## 1. Imports

In [None]:
import os

import matplotlib.pyplot as plt
import tensorflow as tf
import deeplake

# Show the TensorFlow version this kernel is running.
print(tf.__version__)

## 2. Load Dataset

In [None]:
# Project root. The previous hard-coded location stays as the default so
# existing setups keep working; set the PLANT_LENS_ROOT environment variable
# to run the notebook on another machine without editing this cell.
ROOT_PATH = os.environ.get("PLANT_LENS_ROOT", "D:/Programming/Projects/Public/plant-lens/ai")
DATASET_DATA_PATH = f"{ROOT_PATH}/data/dataset"

# Load the two Deep Lake datasets stored under the dataset directory.
training_dataset = deeplake.load(f'{DATASET_DATA_PATH}/training')
testing_dataset = deeplake.load(f'{DATASET_DATA_PATH}/testing')

# Print a summary of each dataset.
training_dataset.summary()
testing_dataset.summary()

In [None]:
# Class names must be listed in label-index order: the integer label is fed to
# `tf.one_hot`, and the confusion matrix labels row/column i with
# data_classes[i]. Building the list through a `set` gave an arbitrary order
# and dropped any class that happens to have no training sample, so the names
# are read from the label tensor's metadata instead.
data_classes = list(training_dataset.labels.info.class_names or [])
if not data_classes:
  # Fallback when the tensor carries no class_names: unique label texts in a
  # deterministic (sorted) order.
  # NOTE(review): sorted order is only assumed to match the label indices — confirm.
  data_classes = sorted({name for names in training_dataset.labels.data()['text'] for name in names})

data_classes_count = len(data_classes)
print("Total Number of Classes", data_classes_count)

### 2.2 Create Data Pipeline

In [None]:
# Share of the training samples used for fitting; the rest becomes validation data.
TRAIN_FRACTION = 0.8

total_samples = len(training_dataset.labels)
training_size = int(total_samples * TRAIN_FRACTION)
print("Train Data Size", training_size, "Validation Data Size", total_samples - training_size)

# Convert both datasets through their `.tensorflow()` method.
# NOTE(review): the names are rebound here, so re-running this cell would call
# `.labels` / `.tensorflow()` on the converted objects — presumably the
# datasets have to be reloaded first; confirm.
training_dataset, testing_dataset = training_dataset.tensorflow(), testing_dataset.tensorflow()

# The first `training_size` elements are used for training, the remainder for
# validation. Both right-hand sides are built from the full dataset before the
# names are rebound.
# NOTE(review): the split follows the stored order (nothing is shuffled before
# take/skip) — confirm the dataset is not ordered by class.
training_dataset, validation_dataset = (
  training_dataset.take(training_size),
  training_dataset.skip(training_size),
)

In [None]:
# Define a preprocessing function
def preprocess_image(args)->tuple:
  """Turn one dataset sample into an `(image, one_hot_label)` pair.

  Args:
    args: mapping with an 'images' entry and a 'labels' entry.

  Returns:
    A tuple of the image cast to float32 and divided by 255.0, and the label
    cast to int32, squeezed, and one-hot encoded with depth
    `data_classes_count` (read from the notebook's global scope).
  """
  scaled_image = tf.math.divide(tf.cast(args['images'], tf.float32), 255.0)
  class_index = tf.squeeze(tf.cast(args['labels'], tf.int32))
  return (scaled_image, tf.one_hot(class_index, depth=data_classes_count))

# Apply the preprocessing function to every split.
training_dataset = training_dataset.map(preprocess_image)
validation_dataset = validation_dataset.map(preprocess_image)
testing_dataset = testing_dataset.map(preprocess_image)

# Shuffle and batch the datasets
BATCH_SIZE = 32
# The shuffle buffer holds already-preprocessed (float32) samples.
# NOTE(review): with large images a 10000-sample buffer can use a lot of
# memory — confirm it fits on the training machine.
SHUFFLE_BUFFER_SIZE = 10000

# `prefetch` lets the input pipeline prepare the next batches while the model
# is busy with the current one; it does not change the batches or their order.
training_dataset = (
  training_dataset
  .shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
  .batch(BATCH_SIZE)
  .prefetch(tf.data.AUTOTUNE)
)
# Validation and test data are only batched (never shuffled), so their element
# order is the same on every pass.
validation_dataset = validation_dataset.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
testing_dataset = testing_dataset.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

In [None]:
# Pull a single batch from each split and report the image / label shapes.
splits_to_inspect = (
  ("Training", training_dataset),
  ("Testing", testing_dataset),
  ("Validation", validation_dataset),
)
for split_name, split_dataset in splits_to_inspect:
  for images, labels in split_dataset.take(1):
    print(f"{split_name} Batch images shape:", images.shape)
    print(f"{split_name} Batch labels shape:", labels.shape)

## 3. Build Model

In [None]:
from tensorflow.keras import Model, layers, optimizers, models, callbacks
import tensorflow_hub as hub


def build_model(num_classes=None, learning_rate=0.0001, hidden_units=512):
    """Build and compile the image classifier.

    The network is a TF Hub MobileNetV3 feature-vector module (loaded with
    trainable=True), followed by a ReLU dense layer and a softmax output
    layer. It is compiled with Adam, categorical cross-entropy loss and the
    accuracy metric.

    Args:
        num_classes: number of output units. Defaults to the notebook-level
            `data_classes_count` when omitted, as before.
        learning_rate: learning rate passed to the Adam optimizer.
        hidden_units: width of the dense layer before the output layer.

    Returns:
        The compiled Keras model.
    """
    if num_classes is None:
        num_classes = data_classes_count

    # Input resolution; kept in sync with the "_224" variant in the module URL below.
    dimensions = 224
    input_layer = layers.Input(shape=(dimensions, dimensions, 3))
    feature_extractor = hub.KerasLayer('https://tfhub.dev/google/imagenet/mobilenet_v3_small_075_224/feature_vector/5', trainable=True)(input_layer)
    # Flatten is kept so the layer structure stays the same as in models built
    # by earlier versions of this function.
    flatten_layer = layers.Flatten()(feature_extractor)
    hidden_layer = layers.Dense(hidden_units, activation='relu')(flatten_layer)
    predictions = layers.Dense(num_classes, activation='softmax')(hidden_layer)
    model = Model(inputs=input_layer, outputs=predictions)

    optimizer = optimizers.Adam(learning_rate=learning_rate)

    # categorical_crossentropy matches the one-hot labels produced by the data pipeline.
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

    return model

DEVELOPMENT_MODEL_PATH = f"{ROOT_PATH}/model/develop"
VERSION_TAG = "0.2.0-46"
# True: train a fresh model. False: continue from the saved model tagged VERSION_TAG.
BUILD_MODEL = True

if BUILD_MODEL:
  model = build_model()
else:
  # The save cell of this notebook writes a SavedModel directory named
  # `v<tag>` (no extension), while this cell used to look only for
  # `v<tag>.keras`. Prefer the `.keras` file when it exists (previous
  # behaviour) and fall back to the SavedModel directory otherwise.
  saved_model_dir = f'{DEVELOPMENT_MODEL_PATH}/v{VERSION_TAG}'
  keras_file = f'{DEVELOPMENT_MODEL_PATH}/v{VERSION_TAG}.keras'
  model_path = keras_file if os.path.exists(keras_file) else saved_model_dir
  # NOTE(review): custom_objects is passed so the hub.KerasLayer inside the
  # model can be resolved when deserializing a single-file format — confirm
  # against the installed Keras / TF Hub versions.
  model = models.load_model(model_path, custom_objects={'KerasLayer': hub.KerasLayer})

model.summary()

## 4. Train Model

In [None]:
%%time

EPOCHS = 50
CHECKPOINT_MODEL_PATH = f"{ROOT_PATH}/model/checkpoint/VERSION"

checkpoint_callback = callbacks.ModelCheckpoint(filepath=CHECKPOINT_MODEL_PATH, save_weights_only=True, verbose=1)
# Train the model
history = model.fit(training_dataset, epochs=EPOCHS, validation_data=validation_dataset, callbacks=[checkpoint_callback])


### 4.2 Training Result

In [None]:
# access metrics from training history
history_by_metric = history.history

# One shared template for both summary lines; the printed text is the same as
# before, only the leading tag and the epoch index differ.
SUMMARY_TEMPLATE = "{} | Training loss: {:.4f} | Validation Loss: {:.4f} || Training Accuracy: {:.2f} % | Validation Accuracy: {:.2f} %"
for row_tag, epoch_index in (("\ninitial", 0), (" latest", -1)):
  print(SUMMARY_TEMPLATE.format(
    row_tag,
    history_by_metric["loss"][epoch_index],
    history_by_metric["val_loss"][epoch_index],
    history_by_metric["accuracy"][epoch_index] * 100,
    history_by_metric["val_accuracy"][epoch_index] * 100,
  ))

fig, axis = plt.subplots(1, 2, figsize=(10,4))
# One panel per metric: left = loss, right = accuracy. Legend and axis labels
# are set so each panel can be read without the code next to it.
panels = (("loss", "loss per epoch"), ("accuracy", "accuracy per epoch"))
for panel_axis, (metric_name, panel_title) in zip(axis, panels):
  panel_axis.plot(history_by_metric[metric_name], label="training")
  panel_axis.plot(history_by_metric[f"val_{metric_name}"], color='orange', label="validation")
  panel_axis.set_title(panel_title)
  panel_axis.set_xlabel("epoch")
  panel_axis.set_ylabel(metric_name)
  panel_axis.legend()
fig.tight_layout()
plt.show()

## 5. Test Model

In [None]:
# Run the model over the test batches; the result is unpacked as (loss, accuracy).
evaluation_results = model.evaluate(testing_dataset)
test_loss, test_acc = evaluation_results

print('Test loss:', test_loss, 'Test accuracy:', test_acc)

## 6. Visualize Result

In [None]:
from sklearn import metrics

# Ground-truth class indices: undo the one-hot encoding batch by batch.
true_labels = []
for _, labels in testing_dataset:
    true_labels.extend(tf.math.argmax(labels, axis=1).numpy())

# Predicted class indices from a second pass over the same dataset; this
# relies on the test dataset yielding its elements in the same order on both passes.
predictions = model.predict(testing_dataset)
predicted_labels = tf.math.argmax(predictions, axis=1).numpy()

# Fix the label set to every class index. Without `labels=`, a class that
# never occurs in the true or predicted labels is left out of the matrix,
# which then no longer lines up with `display_labels`.
class_indices = list(range(data_classes_count))
confusion_matrix = metrics.confusion_matrix(true_labels, predicted_labels, labels=class_indices)
cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix=confusion_matrix, display_labels=data_classes)

cm_display.plot()

# Rotate x-axis labels by 90 degrees (vertical)
plt.xticks(rotation=90)
plt.show()

## 7. Save Model

In [None]:
# A freshly built model gets a new tag: the version typed by the user plus the
# test accuracy as a truncated whole-number percentage. A loaded model keeps
# the VERSION_TAG it was loaded with.
if BUILD_MODEL:
  entered_version = input("Enter Version Tag (e.g 0.0.0):")
  VERSION_TAG = entered_version + f"-{int(test_acc * 100)}"

# Saved with save_format="tf" to a path without a file extension.
export_path = f'{DEVELOPMENT_MODEL_PATH}/v{VERSION_TAG}'
models.save_model(model, export_path, save_format="tf")