In [None]:
import os
import pathlib
import shutil

import matplotlib.pyplot as plt
import numpy as np
import PIL
import tensorflow as tf
from PIL import Image
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img

In [None]:
# Format the unzipped Kaggle download into one folder per class.
# (This cell only restructures and renames files.)

# Unzip kaggle data from https://www.kaggle.com/salmaneunus/mechanical-tools-dataset into a directory called 'data' and run this script from the parent directory
# | computervision_final
# | -> final_submission.ipynb
# | -> data
#    | -> Mechanical Tools Image dataset
#    | -> Annontated.csv
#    | -> etc.

cwd = os.getcwd()
data_root = os.path.join(cwd, 'data')
backup_root = os.path.join(cwd, 'old_data')

# The class folders sit two levels deep, under two folders of the same name.
ARCHIVE_FOLDER = 'Mechanical Tools Image dataset'
# original folder name -> class folder name created under data/
CLASS_FOLDERS = {'Hammer': 'hammer', 'Screw Driver': 'screwdriver', 'Wrench': 'wrench'}

if os.path.isdir(backup_root):
    # A second run would move the already formatted data into old_data/ and
    # renumber (possibly overwrite) the images, so do nothing.
    print("'old_data' already exists; the data directory is already formatted - skipping.")
elif not os.path.isdir(os.path.join(data_root, ARCHIVE_FOLDER, ARCHIVE_FOLDER)):
    print(f"Could not find '{ARCHIVE_FOLDER}/{ARCHIVE_FOLDER}' inside {data_root}; nothing to format.")
else:
    # Keep the untouched download as old_data/ and rebuild data/ from it.
    shutil.move(data_root, backup_root)
    os.mkdir(data_root)
    archive_root = os.path.join(backup_root, ARCHIVE_FOLDER, ARCHIVE_FOLDER)
    for source_name, class_name in CLASS_FOLDERS.items():
        shutil.copytree(os.path.join(archive_root, source_name),
                        os.path.join(data_root, class_name))

    # Rename every image to <class>_image_<n>.jpg, numbering from 1 inside each class.
    # Sorting makes the numbering reproducible (os.listdir order is arbitrary).
    for path in sorted(os.listdir(data_root)):
        class_dir = os.path.join(data_root, path)
        for count, file in enumerate(sorted(os.listdir(class_dir)), start=1):
            os.rename(os.path.join(class_dir, file),
                      os.path.join(class_dir, f'{path}_image_{count}.jpg'))

In [None]:
# One method we used for loading data

# Loader configuration
batch_size = 32
imgh = 120 # height
imgw = 120 # width
data_dir = pathlib.Path(f'{os.getcwd()}/data/train')


def _load_split(subset):
    """Load one side ('training' or 'validation') of the seeded 80/20 split of data_dir."""
    return keras.preprocessing.image_dataset_from_directory(
        data_dir,
        validation_split=0.2,
        subset=subset,
        seed=1,
        image_size=(imgh, imgw),
        batch_size=batch_size,
    )


# training data (80%) and validation data (the held-out 20%); both calls share
# the same seed so the two subsets do not overlap
train_ds = _load_split('training')
val_ds = _load_split('validation')

# class names as reported by the training dataset
classes = train_ds.class_names

In [None]:
# Displaying a sample of our training data

plt.figure(figsize=(10, 10))
# Show images 10..18 of the first training batch in a 3x3 grid
for batch_images, batch_labels in train_ds.take(1):
    for cell, idx in enumerate(range(10, 19), start=1):
        plt.subplot(3, 3, cell)
        plt.imshow(batch_images[idx].numpy().astype("uint8"))
        plt.title(classes[batch_labels[idx]])
        plt.axis("off")

In [None]:
# Our implementation of the Canny edge detector

def auto_canny(image_a, sigma=0.33):
    """Run Canny edge detection with thresholds derived from the image median.

    Args:
        image_a: single-channel image array handed to ``cv2.Canny``.
        sigma: fraction of the median used as the half-width of the
            [lower, upper] threshold band around the median.

    Returns:
        The edge map returned by ``cv2.Canny``.
    """
    # compute the median of the single channel pixel intensities
    v = np.median(image_a)
    # thresholds sit sigma*median below/above the median, clamped to [0, 255]
    lower = int(max(0, (1.0 - sigma) * v))
    upper = int(min(255, (1.0 + sigma) * v))
    # detect edges on the function argument itself (not on a global named `image`)
    return cv2.Canny(image_a, lower, upper)


# Replace every image under `path` (one sub-folder per class) with its edge map.
# NOTE(review): `path` (dataset root), `IMG_SIZ` (target side length in pixels) and
# `cv2` (OpenCV) are not defined/imported in the cells above - set them before running.
# The images are overwritten in place, so run this once, on a copy of the data.
for foldername in os.listdir(path):
    class_dir = os.path.join(path, foldername)
    if not os.path.isdir(class_dir):
        continue  # ignore stray files next to the class folders
    for filename in os.listdir(class_dir):
        image_path = os.path.join(class_dir, filename)
        # close the source file before the same path is overwritten below
        with Image.open(image_path) as oriimg:
            img1 = oriimg.convert('RGB')
        img2 = img1.resize((int(IMG_SIZ), int(IMG_SIZ)), Image.LANCZOS)  # resize image
        gray = cv2.cvtColor(np.float32(img2), cv2.COLOR_RGB2GRAY)  # rgb to grayscale
        img_b = cv2.GaussianBlur(gray, (3, 3), 0)  # Gaussian blur
        img3 = auto_canny(np.uint8(img_b), sigma=0.33)  # Canny edge detector
        cv2.imwrite(image_path, img3)  # replace original figure
    print(foldername)


In [None]:
# Our first baseline implementation of a CNN

num_classes = 3

_preproc = layers.experimental.preprocessing

# Augmentation applied inside the model: random horizontal flip, rotation and zoom
augment_layers = Sequential([
    _preproc.RandomFlip("horizontal", input_shape=(imgh, imgw, 3)),
    _preproc.RandomRotation(0.1),
    _preproc.RandomZoom(0.1),
])

# Baseline network: augmentation -> rescale to [0, 1] -> three conv/pool/dropout
# stages (16, 32, 64 filters) -> dense classifier with softmax output
model = Sequential()
model.add(augment_layers)
model.add(_preproc.Rescaling(1./255, input_shape=(imgh, imgw, 3)))
for n_filters in (16, 32, 64):
    model.add(layers.Conv2D(n_filters, 3, padding='same', activation='relu'))
    model.add(layers.MaxPooling2D())
    model.add(layers.Dropout(0.2))
model.add(layers.Flatten())
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dense(num_classes, activation='softmax'))

In [None]:
# Offline data augmentation: for every image in data/train/<class folder>, write
# AUGMENTED_PER_IMAGE randomly transformed copies to data/train_b/<class folder>.

AUGMENTED_PER_IMAGE = 20  # copies generated from 1 original image

datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    zoom_range=0.3)

for foldername in os.listdir('data/train/'):  # e.g. data/train/Hammer, data/train/Screw Driver, data/train/Wrench
    source_dir = os.path.join('data/train', foldername)
    if not os.path.isdir(source_dir):
        continue  # ignore stray files next to the class folders
    output_dir = os.path.join('data/train_b', foldername)  # e.g. data/train_b/Hammer
    os.makedirs(output_dir, exist_ok=True)

    for file_name in os.listdir(source_dir):
        img = load_img(os.path.join(source_dir, file_name))
        x = img_to_array(img)
        x = x.reshape((1,) + x.shape)  # add a leading batch axis for flow()

        stem = os.path.splitext(file_name)[0]
        # Each copy is saved explicitly under a name built from the source image and
        # the copy index, so no two copies can share a file name. (Names generated
        # through save_to_dir differ only by a random suffix and may collide.)
        # zip() consults range() first, so the generator is read exactly
        # AUGMENTED_PER_IMAGE times.
        for i, batch in zip(range(1, AUGMENTED_PER_IMAGE + 1), datagen.flow(x, batch_size=1)):
            array_to_img(batch[0]).save(
                os.path.join(output_dir, f'{foldername}_b{i}_{stem}.jpg'))

In [None]:
# A different method we used for loading data that includes the preprocessing
# Requires a slightly different directory structure

batch_size = 32
imgh = 224 # height
imgw = 224 # width

# Instances of a Keras data generator class
train_datagen = ImageDataGenerator(
    rescale=1. / 255,             # rescale the image
    rotation_range=20,            # random rotate up to 20 degrees
    brightness_range=[0.2, 1.0],  # random brightness shift
    horizontal_flip=True,         # horizontally flip
    vertical_flip=True,           # vertically flip
    zoom_range=0.2)               # random zoom

# Validation images are only rescaled, never augmented
valid_datagen = ImageDataGenerator(
    rescale=1. / 255,
)

# Settings shared by the training and validation iterators.
# class_mode="categorical" yields one-hot labels, so models trained on these
# iterators need a categorical (not sparse) cross-entropy loss.
flow_options = dict(
    target_size=(imgh, imgw),
    color_mode="rgb",
    batch_size=batch_size,
    class_mode="categorical",
    shuffle=True,
    seed=42)

# Loading in data using the generators
train_ds = train_datagen.flow_from_directory('data/train', **flow_options)
val_ds = valid_datagen.flow_from_directory('data/validation', **flow_options)

In [None]:
# One example of a different model structure that we experimented with
def getModel(input_shape, num_classes=3):
    """Build the deeper experimental CNN (uncompiled).

    Architecture: one 16-filter convolution, then convolution groups with
    (32, 64, 32, 64), (32, 64) and (128) filters, each group followed by 2x2
    max-pooling (and 25% dropout after the first stage), then dense layers of
    1024 and 512 units and a softmax output.

    Args:
        input_shape: shape of a single input image, e.g. (height, width, channels).
        num_classes: size of the softmax output layer.

    Returns:
        The assembled ``Sequential`` model.
    """
    def conv(filters, **kwargs):
        # every convolution here is 3x3, same-padded, with ReLU activation
        return layers.Conv2D(filters, (3, 3), padding="same", activation='relu', **kwargs)

    model = Sequential()
    model.add(conv(16, input_shape=input_shape))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))

    for filters in (32, 64, 32, 64):
        model.add(conv(filters))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))

    for filters in (32, 64):
        model.add(conv(filters))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))

    model.add(conv(128))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))

    model.add(layers.Flatten())
    model.add(layers.Dense(1024, activation='relu'))
    model.add(layers.Dense(512, activation='relu'))
    model.add(layers.Dense(num_classes, activation='softmax'))

    return model

# Build the model for RGB images of the configured size; neither loader's
# train_ds object has a `.shape` attribute to read this from.
model = getModel((imgh, imgw, 3))

In [None]:
# Code to compile the model
# We manually set learning rate to enable experimenting with different values
# We used both categorical and sparse categorical cross-entropy loss:
#   - sparse expects integer class labels (image_dataset_from_directory's default)
#   - categorical expects one-hot labels (flow_from_directory with class_mode="categorical")
# The models above end in a softmax layer, so the loss receives probabilities
# and must not be told to expect logits.

opt = keras.optimizers.Adam(learning_rate=0.001)

# Pick the loss that matches the label format of the loader that produced train_ds
if getattr(train_ds, 'class_mode', None) == 'categorical':
    loss_fn = keras.losses.CategoricalCrossentropy(from_logits=False)
else:
    loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=False)

model.compile(
    optimizer=opt,
    loss=loss_fn,
    metrics=['accuracy'],
)

In [None]:
# Setting the callbacks for training
# EarlyStopping monitors validation loss and will stop the model if it does not
#   continue to decrease for <patience> epochs. Set more or less depending on expected
#   number of epochs. Prevents overfitting from occurring.
# Checkpointing to save the model weights in case of an issue, to enable training in
#    multiple sessions, and loading to test
# Write logs for debugging

patience = 30
# The callback classes are reached through the already imported `keras` module
my_callbacks = [
    keras.callbacks.EarlyStopping(patience=patience),
    # one file per epoch, named after the epoch number and validation loss
    keras.callbacks.ModelCheckpoint(filepath='model.{epoch:02d}-{val_loss:.2f}.h5'),
    keras.callbacks.TensorBoard(log_dir='./logs'),
]

In [None]:
# Train the model

epochs = 400  # upper bound; EarlyStopping may end training sooner
# train_ds / val_ds are the objects created by the data-loading cells
history = model.fit(train_ds, epochs=epochs, validation_data=val_ds, shuffle=True, callbacks=my_callbacks)

In [None]:
# Our implementation of a classifier using the VGG16 model for feature extraction

# load pre-trained convolutional base (include_top=False drops the ImageNet
# classifier, so no `classes` argument is needed)
base_model = keras.applications.VGG16(weights='imagenet', include_top=False, pooling=None,
                                      input_shape=(224, 224, 3))

# freeze pre-trained model
for layer in base_model.layers:
    layer.trainable = False

# Add our dense layer to new model
x = base_model.output
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(1024, activation='relu')(x)
predictions = layers.Dense(3, activation='softmax')(x)

# new model
model = keras.Model(inputs=base_model.input, outputs=predictions)

# optimizer, learning rate and compile
opt = keras.optimizers.Adam(learning_rate=0.0001)

# The head ends in softmax, so the loss receives probabilities (from_logits=False).
# One-hot labels (flow_from_directory with class_mode="categorical") need the
# categorical loss; integer labels need the sparse one.
if getattr(train_ds, 'class_mode', None) == 'categorical':
    loss_fn = keras.losses.CategoricalCrossentropy(from_logits=False)
else:
    loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=False)

model.compile(
    optimizer=opt,
    loss=loss_fn,
    metrics=['accuracy']
)

# callbacks, save the model with highest accuracy
tbCallBack = keras.callbacks.TensorBoard(log_dir='./logs/',
                                         histogram_freq=0,
                                         write_graph=True,
                                         write_images=True,
                                         embeddings_freq=0,
                                         embeddings_metadata=None)

checkpoint = keras.callbacks.ModelCheckpoint(filepath='vgg16_edge.h5', monitor='val_accuracy',
                                             mode='auto', save_best_only=True)

# train new model (train_ds / val_ds come from the data-loading cells)
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=150,
    callbacks=[tbCallBack, checkpoint],
)

In [None]:
# After traing a model and saving the metrics history, we plot 
# training and validation accuracy and loss

acc = history.history['accuracy']
val_acc = history.history['val_accuracy']
loss = history.history['loss']
val_loss = history.history['val_loss']

# One point per epoch that actually ran, so the x-axis always matches the
# recorded metrics (also when training stopped early).
epochs_range = range(len(acc))

plt.figure(figsize=(16, 8))
plt.subplot(1, 2, 1)
plt.plot(epochs_range, acc, label='Training Accuracy')
plt.plot(epochs_range, val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.title('Training and Validation Accuracy')

plt.subplot(1, 2, 2)
plt.plot(epochs_range, loss, label='Training Loss')
plt.plot(epochs_range, val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.show()

In [None]:
# After we have trained a model and saved it to disk, we load it 
#    and test it against our test set

model_path = "final.h5"
# read model from file
trained_model = keras.models.load_model(model_path)

# Load in testing images from directory
# Resize only, no preprocessing done
# NOTE(review): the generator-based training loader rescales images by 1/255; if the
# saved model was trained that way (and has no Rescaling layer of its own), the test
# generator needs the same rescale - confirm against how final.h5 was trained.
test_datagen = ImageDataGenerator()
test_generator = test_datagen.flow_from_directory(
    'data/test',
    target_size=(imgh, imgw),
    color_mode="rgb",
    batch_size=1,
    class_mode="sparse",
    shuffle=False,  # keep predictions aligned with test_generator.classes
)

# displaying the number of test images
filenames = test_generator.filenames
nb_samples = len(filenames)
print(nb_samples)

# Predict on the test images. The result has one row per example and one
# column per class (3 float values: the probability of each class).
Y_pred = trained_model.predict(test_generator)

# most probable class per example, in range {0, 1, 2}
y_pred = Y_pred.argmax(axis=1)

target_names = ['hammer', 'screwdriver', 'wrench']

# confusion matrix of true vs. predicted classes
print('Confusion Matrix')
print(confusion_matrix(test_generator.classes, y_pred))

# per-class precision / recall / f1 report
print('Classification Report')
print(classification_report(test_generator.classes, y_pred, target_names=target_names))


# ROC AUC for both multi-class schemes (one-vs-one, one-vs-rest), each with
# macro and prevalence-weighted averaging
y_test = test_generator.classes
(macro_roc_auc_ovo, weighted_roc_auc_ovo,
 macro_roc_auc_ovr, weighted_roc_auc_ovr) = (
    sk.metrics.roc_auc_score(y_test, Y_pred, multi_class=scheme, average=averaging)
    for scheme in ("ovo", "ovr")
    for averaging in ("macro", "weighted")
)

ovo_scores = (macro_roc_auc_ovo, weighted_roc_auc_ovo)
print("One-vs-One ROC AUC scores:\n{:.6f} (macro),\n{:.6f} "
      "(weighted by prevalence)"
      .format(*ovo_scores))

ovr_scores = (macro_roc_auc_ovr, weighted_roc_auc_ovr)
print("One-vs-Rest ROC AUC scores:\n{:.6f} (macro),\n{:.6f} "
      "(weighted by prevalence)"
      .format(*ovr_scores))

# One-hot encode the true labels (class k -> 1 in column k, 0 elsewhere) so that
# every class can be scored one-vs-rest against its probability column.
n_classes = 3
y_test = np.eye(n_classes, dtype=int)[test_generator.classes]

# compute data for ROC curve plot: one curve and one area per class
fpr = {}
tpr = {}
roc_auc = {}
for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(y_test[:, i], Y_pred[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# Compute micro-average ROC curve and ROC area (all classes pooled together)
fpr["micro"], tpr["micro"], _ = roc_curve(y_test.ravel(), Y_pred.ravel())
roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

# ROC curve of class index 2 against the chance diagonal
lw = 2
fig, ax = plt.subplots()
ax.plot(fpr[2], tpr[2], color='darkorange',
        lw=lw, label='ROC curve (area = %0.2f)' % roc_auc[2])
ax.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Receiver operating characteristic')
ax.legend(loc="lower right")
plt.show()