In [ ]:
import osimport numpy as npimport matplotlib.pyplot as pltimport tensorflow as tffrom tensorflow.keras import layers, models, callbacks

In [ ]:
gpus = tf.config.list_physical_devices('GPU')if gpus:    try:        for g in gpus:            tf.config.experimental.set_memory_growth(g, True)        print("Enabled memory growth for GPUs")    except Exception as e:        print("Could not set memory growth:", e)

In [ ]:
BASE_DIR = "/home/amit/code_only/code/app_training/Apple"IMAGE_SIZE = (256, 256)BATCH_SIZE = 12CHANNELS = 3NUM_CLASSES = 4EPOCHS = 50AUTOTUNE = tf.data.AUTOTUNE

In [ ]:
train_dir = os.path.join(BASE_DIR, "Train")val_dir   = os.path.join(BASE_DIR, "Val")test_dir  = os.path.join(BASE_DIR, "Test")train_ds = tf.keras.preprocessing.image_dataset_from_directory(    train_dir,    labels="inferred",    label_mode="int",    image_size=IMAGE_SIZE,    batch_size=BATCH_SIZE,    shuffle=True,    seed=123)val_ds = tf.keras.preprocessing.image_dataset_from_directory(    val_dir,    labels="inferred",    label_mode="int",    image_size=IMAGE_SIZE,    batch_size=BATCH_SIZE,    shuffle=True,    seed=123)test_ds = tf.keras.preprocessing.image_dataset_from_directory(    test_dir,    labels="inferred",    label_mode="int",    image_size=IMAGE_SIZE,    batch_size=BATCH_SIZE,    shuffle=False)class_names = train_ds.class_namesprint("Classes:", class_names)

In [ ]:
train_ds = train_ds.shuffle(buffer_size=2000).prefetch(buffer_size=1)val_ds   = val_ds.prefetch(buffer_size=1)test_ds  = test_ds.prefetch(buffer_size=1)

In [ ]:
AUTOTUNE = tf.data.AUTOTUNEdef augment(x, y):    return x, y

In [ ]:
input_shape = IMAGE_SIZE + (CHANNELS,)augmentation = tf.keras.Sequential([    layers.RandomFlip("horizontal_and_vertical"),    layers.RandomRotation(0.10),    layers.RandomZoom(0.1),    layers.RandomContrast(0.1),], name="augmentation")model = models.Sequential([    layers.Input(shape=input_shape),    augmentation,    layers.Rescaling(1./255),    layers.Conv2D(32, 3, activation='relu', padding='same'),    layers.BatchNormalization(),    layers.MaxPooling2D(2),    layers.Conv2D(64, 3, activation='relu', padding='same'),    layers.BatchNormalization(),    layers.MaxPooling2D(2),    layers.Conv2D(128, 3, activation='relu', padding='same'),    layers.BatchNormalization(),    layers.MaxPooling2D(2),    layers.Conv2D(128, 3, activation='relu', padding='same'),    layers.BatchNormalization(),    layers.MaxPooling2D(2),    layers.GlobalAveragePooling2D(),    layers.Dense(128, activation='relu'),    layers.Dropout(0.5),    layers.Dense(NUM_CLASSES, activation='softmax')], name="apple_disease_cnn")model.compile(    optimizer=tf.keras.optimizers.Adam(),    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),    metrics=['accuracy'])model.summary()

In [ ]:
checkpoint_dir = "../models"os.makedirs(checkpoint_dir, exist_ok=True)checkpoint_path = os.path.join(checkpoint_dir, "best_model.keras")cb = [    callbacks.EarlyStopping(monitor='val_loss', patience=6, restore_best_weights=True),    callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6),    callbacks.ModelCheckpoint(checkpoint_path, monitor='val_loss', save_best_only=True, verbose=1)]

In [ ]:
history = model.fit(    train_ds,    validation_data=val_ds,    epochs=EPOCHS,    callbacks=cb,    verbose=1)

In [ ]:
scores = model.evaluate(test_ds, verbose=1)print("Test loss:", scores[0], "Test accuracy:", scores[1])

In [ ]:
epochs_ran = len(history.history['loss'])plt.figure(figsize=(12,5))plt.subplot(1,2,1)plt.plot(range(1, epochs_ran+1), history.history['accuracy'], label='Train acc')plt.plot(range(1, epochs_ran+1), history.history['val_accuracy'], label='Val acc')plt.xlabel('Epoch'); plt.ylabel('Accuracy'); plt.legend(); plt.title('Accuracy')plt.subplot(1,2,2)plt.plot(range(1, epochs_ran+1), history.history['loss'], label='Train loss')plt.plot(range(1, epochs_ran+1), history.history['val_loss'], label='Val loss')plt.xlabel('Epoch'); plt.ylabel('Loss'); plt.legend(); plt.title('Loss')plt.show()

In [ ]:
def predict_image(model, image_tensor):    img = tf.cast(image_tensor, tf.float32)    img = tf.image.resize(img, IMAGE_SIZE)    img = img / 255.0    img = tf.expand_dims(img, 0)  # batch of 1    preds = model.predict(img)    idx = int(np.argmax(preds[0]))    return class_names[idx], float(np.max(preds[0]) * 100.0)

In [ ]:
plt.figure(figsize=(12,12))for images, labels in test_ds.take(1):    for i in range(min(9, images.shape[0])):        ax = plt.subplot(3,3,i+1)        img = images[i].numpy().astype('uint8')        actual = class_names[int(labels[i])]        pred_name, conf = predict_image(model, img)        plt.imshow(img)        plt.title(f"Actual: {actual}\nPred: {pred_name} ({conf:.1f}%)")        plt.axis('off')plt.show()

In [ ]:
def safe_save_model(m, out_dir="../models"):    os.makedirs(out_dir, exist_ok=True)    existing = [d for d in os.listdir(out_dir) if os.path.isdir(os.path.join(out_dir, d))]    version = (max([int(v) for v in existing if v.isdigit()] + [0]) + 1)    save_path = os.path.join(out_dir, f"{version}.keras")    m.save(save_path)    print("âœ… Saved model to", save_path)safe_save_model(model, checkpoint_dir)

In [ ]:
import pathlib, csvfrom collections import Countertry:    model  # noqa: F821except NameError:    model = tf.keras.models.load_model(checkpoint_path)out_csv = os.path.join(checkpoint_dir, "test_predictions.csv")rows = []test_path = pathlib.Path(test_dir)for class_dir in sorted(test_path.iterdir()):    if not class_dir.is_dir():        continue    for img_path in sorted(class_dir.glob("*")):        if not img_path.is_file():            continue        try:            img_raw = tf.io.read_file(str(img_path))            img = tf.image.decode_image(img_raw, channels=CHANNELS)            pred_name, conf = predict_image(model, img)            actual = class_dir.name            rows.append([str(img_path), actual, pred_name, conf])        except Exception as e:            rows.append([str(img_path), "ERROR", str(e), 0.0])with open(out_csv, "w", newline="") as f:    writer = csv.writer(f)    writer.writerow(["filepath", "actual", "predicted", "confidence"])    writer.writerows(rows)print(f"Saved {len(rows)} predictions to {out_csv}")total = len(rows)correct = sum(1 for r in rows if r[1] == r[2])print(f"File-level accuracy: {correct}/{total} = {correct/total:.3f}" if total else "No files found")print("Top 10 predictions (first 10 rows):")for r in rows[:10]:    print(r)print("Per-class confusion counts (actual->predicted):")cnt = Counter((r[1], r[2]) for r in rows)for k, v in sorted(cnt.items()):    print(f"{k}: {v}")

In [ ]:
model_dir = "../models"model_files = [f for f in os.listdir(model_dir) if f.endswith('.keras')]models = []for model_file in model_files:    model_path = os.path.join(model_dir, model_file)    models.append(tf.keras.models.load_model(model_path))def predict_multiple_models(models, image_tensor):    predictions = []    for model in models:        pred_name, conf = predict_image(model, image_tensor)        predictions.append((pred_name, conf))    return predictionsplt.figure(figsize=(12,12))for images, labels in test_ds.take(1):    for i in range(min(9, images.shape[0])):        ax = plt.subplot(3,3,i+1)        img = images[i].numpy().astype('uint8')        actual = class_names[int(labels[i])]        predictions = predict_multiple_models(models, img)        pred_names = [pred[0] for pred in predictions]        confs = [pred[1] for pred in predictions]        plt.imshow(img)        plt.title(f"Actual: {actual}\nPreds: {pred_names}\nConfidences: {confs}")        plt.axis('off')plt.show()