In [1]:
from collections import Counter
from PIL import Image as im
import numpy as np
from matplotlib import pyplot as plt
import os
import keras as k
import tensorflow as tf
from keras.models import Sequential
from keras.utils import to_categorical                 
from keras.layers import Conv2D, Dense, Flatten, MaxPooling2D, RandomRotation, RandomFlip, RandomZoom, RandomContrast
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
from PIL import ImageFile
from sklearn.metrics import classification_report
from tqdm.auto import tqdm
import seaborn as sns
import tensorflow_addons as tfa

tf.config.set_soft_device_placement(True) 
ImageFile.LOAD_TRUNCATED_IMAGES = True

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# load dataset

images = []
labels = []
img_res = [224, 224]
dataset = "dataset-v2"
categories = os.listdir(os.getcwd() + "/" + dataset)
weird_count = 0
count = 0

for i in tqdm(range(len(categories))):
    for filename in tqdm(os.listdir(os.getcwd() + '/' + dataset + '/' + categories[i] + '/'), leave=False):
        image = im.open(os.getcwd() + '/' + dataset + '/' + categories[i] + '/' + filename)
        np_img = np.array(image.resize((img_res[1], img_res[0])))
        if np_img.shape == (img_res[0], img_res[1], 3):
            images.append(np_img)
            labels.append(i)
            count += 1

# split into train and test sets 
X_train, X_test, y_train, y_test = train_test_split(images, labels, train_size=0.8, random_state=1)

X_train = np.array(X_train)
X_test = np.array(X_test)
y_train = np.array(y_train)
y_test = np.array(y_test)


# reshape data into 2d arrays so it can be processed
nsamples, nx, ny, d3 = X_train.shape
X_train = X_train.reshape((nsamples,d3*nx*ny))

# Randomly oversample minority classes
X_train_ros, y_train_ros= SMOTE().fit_resample(X_train, y_train)

# Check distribution of data across classes
print(sorted(Counter(y_train_ros).items()))

# resize data back into original shape
nsamples = X_train_ros.shape[0]
X_train_ros = X_train_ros.reshape((nsamples, nx, ny, d3))

X_train = X_train_ros
y_train = y_train_ros

# some preprocessing on the data

# normalizing the data to be from 0-1 instead 1-255
X_train = X_train.astype('float32') / 255
X_test = X_test.astype('float32') / 255

# format the target data to match the output data of the cnn
# so you can compare the two
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)

100%|██████████| 6/6 [00:07<00:00,  1.20s/it]


[(0, 486), (1, 486), (2, 486), (3, 486), (4, 486), (5, 486)]


In [3]:
# data augmentation
with tf.device('/cpu:0'):
    data_augmentation = k.Sequential([
    RandomFlip("horizontal_and_vertical"),
    RandomRotation(0.2, input_shape=(img_res[0], img_res[1], 3)),
    RandomZoom(0.2, 0.2),
    RandomContrast(0.2),
    ])

base_model = k.applications.MobileNetV2(
    weights="imagenet",  # Load weights pre-trained on ImageNet.
    input_shape=(img_res[0], img_res[1], 3),
    include_top=False,
)  # Do not include the ImageNet classifier at the top.

base_model.trainable = False

inputs = k.Input(shape=(img_res[0], img_res[1], 3))
x = data_augmentation(inputs)
x = base_model(x)
x = k.layers.GlobalAveragePooling2D() (x)
x = k.layers.Dense(128) (x)
x = k.layers.Dropout(0.2) (x)
x = k.layers.Dense(64) (x)
x = k.layers.Dropout(0.2) (x)
x = k.layers.BatchNormalization() (x)
outputs = k.layers.Dense(len(categories), activation = 'softmax') (x)

model = k.Model(inputs, outputs)

base_learning_rate = 0.0016

model.compile(optimizer=k.optimizers.Adam(learning_rate = base_learning_rate),
              loss=k.losses.CategoricalCrossentropy(),
              metrics=['accuracy'])

print(model.summary())



Metal device set to: Apple M1 Pro

systemMemory: 32.00 GB
maxCacheSize: 10.67 GB



2023-02-27 16:21:03.586213: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2023-02-27 16:21:03.586362: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 sequential (Sequential)     (None, 224, 224, 3)       0         
                                                                 
 mobilenetv2_1.00_224 (Funct  (None, 7, 7, 1280)       2257984   
 ional)                                                          
                                                                 
 global_average_pooling2d (G  (None, 1280)             0         
 lobalAveragePooling2D)                                          
                                                                 
 dense (Dense)               (None, 128)               163968    
                                                                 
 dropout (Dropout)           (None, 128)               0     

Normal model:

In [8]:
# data augmentation
with tf.device('/cpu:0'):
    data_augmentation_light = k.Sequential([
    RandomRotation(0.1, input_shape=(img_res[0], img_res[1], 3)),
    ])

    data_augmentation_heavy = k.Sequential([
    RandomFlip("horizontal_and_vertical"),
    RandomRotation(0.2, input_shape=(img_res[0], img_res[1], 3)),
    RandomZoom(0.2, 0.2),
    #RandomContrast(0.2),
    ])

base_model = k.applications.MobileNetV2(
    weights="imagenet",  # Load weights pre-trained on ImageNet.
    input_shape=(img_res[0], img_res[1], 3),
    include_top=False,
)  # Do not include the ImageNet classifier at the top.

base_model.trainable = False

inputs = k.Input(shape=(img_res[0], img_res[1], 3))
x = data_augmentation_heavy(inputs)
x = base_model(x)
x = k.layers.GlobalAveragePooling2D() (x)
x = k.layers.Dense(128) (x)
x = k.layers.Dropout(0.2) (x)
x = k.layers.Dense(64) (x)
x = k.layers.Dropout(0.2) (x)
x = k.layers.BatchNormalization() (x)
outputs = k.layers.Dense(len(categories), activation = 'softmax') (x)

model = k.Model(inputs, outputs)

learning_rate = 0.0015

model.compile(optimizer=k.optimizers.Adam(learning_rate = learning_rate),
                loss=k.losses.CategoricalCrossentropy(),
                metrics=['accuracy'])

Model with keras tuning:

In [15]:
import keras_tuner as kt

# data augmentation
with tf.device('/cpu:0'):
    data_augmentation_light = k.Sequential([
    RandomRotation(0.1, input_shape=(img_res[0], img_res[1], 3)),
    ])

    data_augmentation_heavy = k.Sequential([
    RandomFlip("horizontal_and_vertical"),
    RandomRotation(0.2, input_shape=(img_res[0], img_res[1], 3)),
    RandomZoom(0.2, 0.2),
    #RandomContrast(0.2),
    ])

base_model = k.applications.MobileNetV2(
    weights="imagenet",  # Load weights pre-trained on ImageNet.
    input_shape=(img_res[0], img_res[1], 3),
    include_top=False,
)  # Do not include the ImageNet classifier at the top.

base_model.trainable = False


def build_model(hp):
    inputs = k.Input(shape=(img_res[0], img_res[1], 3))
    if hp.Boolean("data_aug_heavy"):
        x = data_augmentation_heavy(inputs)
    else:
        x = data_augmentation_light(inputs)
    x = base_model(x)
    x = k.layers.GlobalAveragePooling2D() (x)
    x = k.layers.Dense(units=hp.Int("dense1", min_value=32, max_value=1024, step=32),) (x)
    if hp.Boolean("dropout1"):
        x = k.layers.Dropout(0.2) (x)
    x = k.layers.Dense(units=hp.Int("dense2", min_value=32, max_value=512, step=32),) (x)
    if hp.Boolean("dropout2"):
        x = k.layers.Dropout(0.2) (x)
    if hp.Boolean("batchnorm"):
        x = k.layers.BatchNormalization() (x)
    outputs = k.layers.Dense(len(categories), activation = 'softmax') (x)

    model = k.Model(inputs, outputs)

    learning_rate = hp.Float("lr", min_value=1e-4, max_value=1e-2, sampling="log")

    model.compile(optimizer=k.optimizers.Adam(learning_rate = learning_rate),
                  loss=k.losses.CategoricalCrossentropy(),
                  metrics=['accuracy'])
    return model

tuner = kt.RandomSearch(
    hypermodel=build_model,
    objective="val_accuracy",
    max_trials=10,
    executions_per_trial=3,
    overwrite=True,
    directory="keras_tuner-model",
    project_name="waste_classification_model",
)

tuner.search_space_summary()


Search space summary
Default search space size: 7
data_aug_heavy (Boolean)
{'default': False, 'conditions': []}
dense1 (Int)
{'default': None, 'conditions': [], 'min_value': 32, 'max_value': 1024, 'step': 32, 'sampling': None}
dropout1 (Boolean)
{'default': False, 'conditions': []}
dense2 (Int)
{'default': None, 'conditions': [], 'min_value': 32, 'max_value': 512, 'step': 32, 'sampling': None}
dropout2 (Boolean)
{'default': False, 'conditions': []}
batchnorm (Boolean)
{'default': False, 'conditions': []}
lr (Float)
{'default': 0.0001, 'conditions': [], 'min_value': 0.0001, 'max_value': 0.01, 'step': None, 'sampling': 'log'}


In [12]:
tuner.search(X_train, y_train, epochs=3, validation_split = 0.1, batch_size = 8)


Trial 10 Complete [02h 17m 22s]
val_accuracy: 0.39726026852925617

Best val_accuracy So Far: 0.4383561611175537
Total elapsed time: 03h 06m 18s
INFO:tensorflow:Oracle triggered exit


In [13]:
best_hps = tuner.get_best_hyperparameters(5)
model = build_model(best_hps[0])
print(model.summary())

Model: "model_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 sequential_2 (Sequential)   (None, 224, 224, 3)       0         
                                                                 
 mobilenetv2_1.00_224 (Funct  (None, 7, 7, 1280)       2257984   
 ional)                                                          
                                                                 
 global_average_pooling2d_1   (None, 1280)             0         
 (GlobalAveragePooling2D)                                        
                                                                 
 dense_3 (Dense)             (None, 128)               163968    
                                                                 
 dense_4 (Dense)             (None, 128)               1651

In [16]:
epoch_counter = 1

# save the data to a file that can later be converted to the CoreML format
class SaveModelCallback(k.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        global epoch_counter
        print("Saving model...")
        self.model.save("model_epoch_" + str(epoch_counter) + ".h5")
        epoch_counter += 1
        
# actually running the cnn and fitting/training neural network on the data
# batch size = after 64 samples, make a small adjustment
# epoch = every time all the data is run through, make a big adjustment
with tf.device("/gpu:0"):
    model.compile(optimizer='adam', 
                loss=k.losses.CategoricalCrossentropy(),
                metrics=[k.metrics.CategoricalCrossentropy(name='categorical_crossentropy'),'accuracy'])           
    history = model.fit(X_train, y_train, epochs=5, batch_size=8, validation_split=0.1, shuffle = True, callbacks=[SaveModelCallback()])


# evalutating the loss/accuracy of the model on the test set
loss, accuracy = model.evaluate(X_test, y_test)
print("loss / crossentropy / accuracy:")
print(loss)
print(accuracy)

"""
IF MODEL DOESNT KNOW, RETURN GARBAGE AND SAY WE'RE NOT QUITE SURE
Resource: https://www.ridwell.com/
"""

acc = history.history['accuracy'] # get history report of the model

val_acc = history.history['val_accuracy'] # get history of the validation set

loss = history.history['loss'] #get the history of the lossses recorded on the train set
val_loss = history.history['val_loss'] #get the history of the lossses recorded on the validation set

y_pred = model.predict(X_test)

y_pred = (y_pred > 0.5) 

print(classification_report(y_test, y_pred, target_names=categories, digits=4))



# visualize 24 random results
sns.set(font_scale=1)
index = np.random.choice(np.arange(len(X_test)), 12, replace=False)    # pick 24 random smamples
figure, axes = plt.subplots(nrows=3, ncols=4, figsize=(16,9))           # set dimensions
for item in zip(axes.ravel(), X_test[index], y_test[index], y_pred[index]):          # put each sample into a "slot" in the table
    axes, image, target, predict = item
    axes.imshow(image, cmap=plt.cm.gray_r)
    axes.set_xticks([])
    axes.set_yticks([])
    print(target)
    axes.set_title("label: " + categories[np.argmax(target)] + '''
    predicted: ''' + categories[np.argmax(predict)])
plt.tight_layout()
plt.show()             

Epoch 1/5


2023-02-27 22:17:52.664119: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:113] Plugin optimizer for device_type GPU is enabled.




2023-02-27 22:18:08.131988: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:113] Plugin optimizer for device_type GPU is enabled.


Saving model...
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


ValueError: too many values to unpack (expected 2)