In [None]:
import os
import tensorflow as tf
import cv2
import imghdr

import numpy as np
from matplotlib import pyplot as plt

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, Flatten, Dropout, SpatialDropout2D, GlobalAveragePooling2D
from keras.applications import MobileNetV2, EfficientNetV2B0 # pretrained model

In [None]:
data_dir = 'Detect_solar_dust'

image_exts = ['jpeg','jpg', 'bmp', 'png']

# Sanity-check every file under data_dir/<class>/ before building the dataset:
#   * files whose detected type is not listed in image_exts are deleted,
#   * files that OpenCV cannot decode are reported but kept for inspection.
for image_class in os.listdir(data_dir):
    class_dir = os.path.join(data_dir, image_class)
    # Only sub-directories are class folders; a stray file in the dataset
    # root would otherwise make the inner os.listdir() raise.
    if not os.path.isdir(class_dir):
        continue
    for image in os.listdir(class_dir):
        image_path = os.path.join(class_dir, image)
        try:
            # NOTE: imghdr is deprecated since Python 3.11 and removed in 3.13.
            tip = imghdr.what(image_path)
            if tip not in image_exts:
                print('Image not in ext list {}'.format(image_path))
                os.remove(image_path)
                continue
            # cv2.imread does not raise on an unreadable file, it returns None,
            # so the failure has to be detected explicitly.
            img = cv2.imread(image_path)
            if img is None:
                print('Failed to load image {}'.format(image_path))
        except Exception as e:
            # Best-effort scan: report the problem and keep going.
            print('Failed to load image {}: {}'.format(image_path, e))
            # os.remove(image_path)

In [None]:
# Seed Python's `random`, NumPy and TensorFlow in one call so the stochastic
# parts of the pipeline (e.g. weight initialisation, dropout) are reproducible.
# np.random.seed alone leaves the TensorFlow/Keras generators unseeded.
tf.keras.utils.set_random_seed(0)

In [None]:
# TODO: explore params
# Build a labelled image dataset from the class sub-folders of data_dir,
# resizing every image to 224x224; all other loader options keep their defaults.
data = tf.keras.utils.image_dataset_from_directory(
    directory=data_dir,
    image_size=(224, 224),
)

In [None]:
# Walk the dataset once and collect the label of every image.
labels = []
for images, labels_batch in data:
    labels.extend(labels_batch.numpy())

# Count images per class index. A bar chart of exact counts is used instead of
# plt.hist, whose default 10 float bins do not line up with the class ticks.
# Assumes integer class-index labels (the loader default) — confirm if the
# loader's label_mode is ever changed.
n_classes = len(data.class_names)
class_counts = np.bincount(np.asarray(labels, dtype=int), minlength=n_classes)

dist_fig, dist_ax = plt.subplots()
dist_ax.bar(range(n_classes), class_counts)
dist_ax.set_xlabel('Class Name')
dist_ax.set_ylabel('Count')
dist_ax.set_title('Distribution of image classes')

# Set the x-axis tick labels to the class names
dist_ax.set_xticks(range(n_classes))
dist_ax.set_xticklabels(data.class_names)
plt.show()

In [None]:
def _scale_pixels(images, labels):
    """Divide the image tensor by 255 and pass the labels through unchanged."""
    return images / 255, labels


# Scale pixel values from 0-255 (RGB) down to 0-1 to optimize learning time.
# NOTE: this rebinds `data`; re-running the cell would scale a second time.
data = data.map(_scale_pixels)

# Pull one batch (images, labels) out of the dataset as NumPy arrays.
batch = next(data.as_numpy_iterator())

In [None]:
# Show the first four images of the sampled batch, each titled with its label
# (1 representing dirty, 0 clean).
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for axis, picture, label in zip(ax, batch[0][:4], batch[1]):
    axis.imshow(picture)
    axis.title.set_text(label)

In [None]:
# Size of the dataset as reported by len(). `data` comes from the directory
# loader, so this is presumably the number of batches, not images — confirm.
len(data)

In [None]:
# Split sizes are counted in dataset elements, i.e. whatever len(data) counts.
n_elements = len(data)
val_size = int(n_elements * .2) # To finetune the model
test_size = int(n_elements * .1) # To evaluate the model
# Train takes the remainder (~70%). Truncating three independent int() results
# could add up to less than len(data) and silently drop elements.
train_size = n_elements - val_size - test_size # To train the model

In [None]:
# Carve three consecutive ranges out of `data`:
#   first val_size elements -> val, next test_size elements -> test,
#   then train_size elements after those -> train.
# NOTE(review): if the loader reshuffles `data` on every iteration, these
# take/skip ranges may not select the same images each epoch, which would leak
# samples between the splits — confirm the loader's shuffle behaviour.
holdout_size = val_size + test_size
val = data.take(val_size)
test = data.skip(val_size).take(test_size)
train = data.skip(holdout_size).take(train_size)

In [None]:
# Accumulates (model label, fit() result) pairs so the models can be compared
# in the plots further down. Re-running this cell discards earlier results.
hist = []

In [None]:
# Baseline CNN: a single convolution block followed by a small dense head
# with a sigmoid output for binary classification.
model = Sequential()

# 16 filters of 3x3, stride 1, on 224x224 RGB inputs.
model.add(
    Conv2D(
        filters=16,
        kernel_size=(3, 3),
        strides=1,
        activation="leaky_relu",
        input_shape=(224, 224, 3),
    )
)
# Keeps the maximum value of each pooling window (default window is (2, 2)).
model.add(MaxPooling2D())

# Collapse the feature maps into a single dimension for the dense layers.
model.add(Flatten())

model.add(Dense(units=128, activation="leaky_relu"))
model.add(Dropout(rate=0.5))
model.add(Dense(units=1, activation="sigmoid"))

In [None]:
# Adam optimizer with binary cross-entropy loss; accuracy is tracked as metric.
model.compile(
    optimizer='adam',
    loss=tf.losses.BinaryCrossentropy(),
    metrics=['accuracy'],
)

In [None]:
# Print the layer-by-layer summary of the baseline CNN.
model.summary()

In [None]:
# Train the baseline CNN for 26 epochs, validating on `val`, and record the
# result under the label 'CNN' (re-running this cell appends another entry).
cnn_history = model.fit(train, epochs=26, validation_data=val)
hist.append(('CNN', cnn_history))

In [None]:
# Transfer learning using a pre-trained MobileNetV2 network
mobilenet_base = MobileNetV2(input_shape=(224, 224, 3), include_top=False, weights='imagenet')
# Weights are used from imagenet, don't re-train.
# The base is frozen through its own variable rather than `layers[0]`, because
# the first layer of the Sequential model below is the rescaling layer.
mobilenet_base.trainable = False

mobilenet_pretrained_model = Sequential([
    tf.keras.Input(shape=(224, 224, 3)),
    # The dataset was scaled to [0, 1] earlier in the notebook, but the ImageNet
    # MobileNetV2 weights expect inputs in [-1, 1]; map x -> 2*x - 1 in-model so
    # training and inference both use the right range.
    tf.keras.layers.Rescaling(scale=2.0, offset=-1.0),
    # base
    mobilenet_base,
    Flatten(),
    Dropout(0.5),
    Dense(1, activation='sigmoid'),
])

# show model summary
mobilenet_pretrained_model.summary()

In [None]:
# Same training setup as the baseline CNN: Adam, binary cross-entropy, accuracy.
mobilenet_pretrained_model.compile(
    optimizer='adam',
    loss=tf.losses.BinaryCrossentropy(),
    metrics=['accuracy'],
)

In [None]:
# Print the summary of the transfer-learning model again, now that it is compiled.
mobilenet_pretrained_model.summary()

In [None]:
# Train the transfer-learning model for 17 epochs, validating on `val`, and
# record the result under its own label (re-running appends another entry).
mobilenet_history = mobilenet_pretrained_model.fit(train, epochs=17, validation_data=val)
hist.append(('Pretrained MobileNet', mobilenet_history))

In [None]:
# Names of the quantities recorded during training, taken from the first
# entry in `hist`.
first_label, first_result = hist[0]
hist_elements = list(first_result.history)
hist_elements

In [None]:
# One figure per recorded quantity, with one curve per trained model so the
# models can be compared directly. Each figure carries its own title and axis
# labels so it can be read without the surrounding output.
for metric_name in hist_elements:
    metric_fig, metric_ax = plt.subplots()
    for model_label, fit_result in hist:
        values = fit_result.history[metric_name]
        metric_ax.plot(range(len(values)), values, label=model_label)
    metric_ax.set_title(metric_name)
    metric_ax.set_xlabel('Epoch')
    metric_ax.set_ylabel(metric_name)
    # Build the legend once, after all curves have been drawn.
    metric_ax.legend()
    plt.show()

In [None]:
from tensorflow.keras.metrics import Precision, Recall, BinaryAccuracy

# Evaluate the baseline CNN on the held-out test split, accumulating the
# metrics batch by batch.
pre = Precision()
re = Recall()
acc = BinaryAccuracy()

# Unpack directly so the notebook-level `batch` used by the preview cell is
# not overwritten by the evaluation loop.
for X, y in test.as_numpy_iterator():
    # verbose=0: avoid printing one progress bar per batch.
    yhat = model.predict(X, verbose=0)
    pre.update_state(y, yhat)
    re.update_state(y, yhat)
    acc.update_state(y, yhat)

# Print plain, labelled numbers instead of raw tensor objects.
print(
    f'CNN - Precision: {float(pre.result()):.4f}, '
    f'Recall: {float(re.result()):.4f}, '
    f'Accuracy: {float(acc.result()):.4f}'
)

In [None]:
from tensorflow.keras.metrics import Precision, Recall, BinaryAccuracy

# Evaluate the transfer-learning model on the held-out test split, using fresh
# metric objects so nothing carries over from an earlier evaluation.
pre = Precision()
re = Recall()
acc = BinaryAccuracy()

# Unpack directly so the notebook-level `batch` used by the preview cell is
# not overwritten by the evaluation loop.
for X, y in test.as_numpy_iterator():
    # verbose=0: avoid printing one progress bar per batch.
    yhat = mobilenet_pretrained_model.predict(X, verbose=0)
    pre.update_state(y, yhat)
    re.update_state(y, yhat)
    acc.update_state(y, yhat)

# Print plain, labelled numbers instead of raw tensor objects.
print(
    f'Pretrained MobileNet - Precision: {float(pre.result()):.4f}, '
    f'Recall: {float(re.result()):.4f}, '
    f'Accuracy: {float(acc.result()):.4f}'
)