In [None]:
%tensorflow_version 2.x
import tensorflow as tf
print("Tensorflow version " + tf.__version__)

# Look up the TPU attached to this runtime and build a distribution strategy
# on top of it.
# NOTE(review): the rest of the visible notebook trains with PyTorch and never
# reads tpu_strategy -- confirm this cell is still required.
try:
  tpu = tf.distribute.cluster_resolver.TPUClusterResolver()  # TPU detection
  print('Running on TPU ', tpu.cluster_spec().as_dict()['worker'])
except ValueError as err:
  # RuntimeError instead of a bare BaseException: BaseException bypasses
  # `except Exception` handlers and is not meant to be raised by user code.
  # The original ValueError is chained so its details stay in the traceback.
  raise RuntimeError('ERROR: Not connected to a TPU runtime; please see the previous cell in this notebook for instructions!') from err

tf.config.experimental_connect_to_cluster(tpu)
tf.tpu.experimental.initialize_tpu_system(tpu)
tpu_strategy = tf.distribute.experimental.TPUStrategy(tpu)

In [None]:
# importing required libraries
import tensorflow as tf 
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
import cv2
from skimage import color

import sklearn
from sklearn.model_selection import train_test_split

from google.colab import drive
# Mount Google Drive at /content/drive; the image directories and the model
# checkpoint path used in this notebook all live underneath this mount point.
drive.mount('/content/drive')

In [None]:
def rescale(i, image, width, height):
  """Resize `image` to `width` x `height` pixels with cv2.resize.

  `i` is accepted but not used. Pixel values are returned as cv2.resize
  produces them; no normalisation is applied here.
  """
  target_size = (width, height)
  return cv2.resize(image, target_size)

In [None]:
# Load the casting images from Drive into an image array X and a label
# vector y, resizing every image to the network input size.

# FOR INITIAL RAW IMAGES
faulty_dir = '/content/drive/Shareddrives/Senior Thesis/Casting_image/casting_512x512/casting_512x512/def_front/'
ok_dir = '/content/drive/Shareddrives/Senior Thesis/Casting_image/casting_512x512/casting_512x512/ok_front/'

# FOR AUGMENTED IMAGES
#faulty_dir = '/content/drive/Shareddrives/Senior Thesis/Casting_image/casting_data/casting_data/images/def_front/'
#ok_dir = '/content/drive/Shareddrives/Senior Thesis/Casting_image/casting_data/casting_data/images/ok_front/'
directories = (faulty_dir, ok_dir)

# List every directory exactly once and in sorted order:
#  - a single listing guarantees the arrays are sized for exactly the files
#    that get loaded (two separate listings can disagree if the folder changes)
#  - os.listdir returns names in arbitrary order, so sorting keeps the row
#    order, and therefore the seeded train/valid/test split, reproducible
labelled_paths = []
for j, direct in enumerate(directories):
  # first directory holds the faulty parts (label 1), second the ok parts (label 0)
  label = 1 if j == 0 else 0
  for file in sorted(os.listdir(direct)):
    labelled_paths.append((os.path.join(direct, file), label))

# set the values of the X and y arrays
count = len(labelled_paths)
num_images = count
input_shape = (80, 80, 3)
height = input_shape[0]
width = input_shape[1]
channels = input_shape[2]

X = np.zeros((num_images,height,width,channels))
y = np.zeros((num_images))

# populate the arrays
for i, (path, label) in enumerate(labelled_paths):
  im = plt.imread(path)
  X[i,:,:,:] = rescale(i, im, width, height)
  y[i] = label

# labels stay as scalar 0/1 values (no one-hot encoding is applied)

In [None]:
# split the arrays into training, validation, and test sets (70%, 15%, 15%)
# with scikit-learn's train_test_split: hold out 30% first, then cut that
# hold-out in half
X_train, X_valid_test, y_train, y_valid_test = train_test_split(
    X, y, test_size=0.3, random_state=1
)
X_valid, X_test, y_valid, y_test = train_test_split(
    X_valid_test, y_valid_test, test_size=0.5, random_state=1
)

# sanity check: shapes of the three image arrays and the container types
print(X_train.shape, X_valid.shape, X_test.shape)
print(type(X_train), type(y_train))

In [None]:
# preview the first nine images of X (channel 0 only) on a 3x3 grid
f, axarr = plt.subplots(3, 3, figsize=(5, 5))
for i in range(9):
  # position i fills the grid row by row
  row, col = divmod(i, 3)
  axarr[row, col].imshow(X[i, :, :, 0], cmap='gray')
plt.show()

In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.autograd import Variable
from Networks_Implementation import Classifier_CNN
model = Classifier_CNN()
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.0001)
# Binary cross-entropy matches how the rest of the notebook uses the model:
# train() feeds float labels reshaped to (N, 1) and testAccuracy() rounds the
# outputs to 0/1. CrossEntropyLoss expects one score per class and is
# degenerate for a single-column output.
# NOTE(review): assumes Classifier_CNN ends in a sigmoid and returns (N, 1)
# probabilities -- confirm; if it returns raw logits use BCEWithLogitsLoss
# (and threshold at 0 when predicting) instead.
loss_fn = torch.nn.BCELoss()

In [None]:
  BATCH_SIZE = 128

  # The image arrays are channels-last, (N, H, W, C). Move the channel axis
  # right behind the batch axis -> (N, C, H, W), the layout torch.nn.Conv2d
  # takes (assumes Classifier_CNN is built on Conv2d -- TODO confirm).
  # np.transpose needs one axis index per dimension, hence four entries.
  # This cell rebinds X_train / X_valid to tensors, so re-run the split cell
  # before running it a second time.
  X_train = torch.Tensor(np.transpose(X_train, (0, 3, 1, 2)))
  y_train = torch.Tensor(y_train).type(torch.FloatTensor)
  dataset_train = TensorDataset(X_train,y_train)

  X_valid = torch.Tensor(np.transpose(X_valid, (0, 3, 1, 2)))
  y_valid = torch.Tensor(y_valid).type(torch.FloatTensor)
  dataset_valid = TensorDataset(X_valid,y_valid)

  # mini-batch iterators over the training and validation sets
  train_loader = DataLoader(dataset_train, batch_size=BATCH_SIZE, shuffle=True)
  valid_loader = DataLoader(dataset_valid, batch_size=BATCH_SIZE, shuffle=True)

In [None]:
def saveModel():
    """Write the global model's state_dict to the shared-drive checkpoint file."""
    weights = model.state_dict()
    torch.save(weights, "/content/drive/Shareddrives/Senior Thesis/Models/Classifier.pth")

def testAccuracy(dataloader):
    model.eval()
    accuracy = 0.0
    total = 0.0
    
    with torch.no_grad():
        for data in dataloader:
            images, labels = data
            # run the model on the test set to predict labels
            outputs = model(images)
            # the label with the highest probability will be our prediction
            predicted = torch.squeeze(torch.round(outputs))
            total += labels.shape[0]
            accuracy += (predicted == labels).sum().item()
    
    # compute the accuracy over all test images
    accuracy = (100 * accuracy / total)
    return(accuracy) 

def train(num_epochs):
    """Train the global model for `num_epochs` epochs, keeping the best checkpoint.

    Reads the notebook globals model, optimizer, loss_fn, train_loader and
    valid_loader. After every epoch the accuracy returned by
    testAccuracy(valid_loader) is printed, and saveModel() is called whenever
    that accuracy beats the best value seen so far in this call.

    Args:
        num_epochs: number of full passes over train_loader.
    """
    best_accuracy = 0.0

    # Define your execution device
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("The model will be running on", device, "device")
    # Convert model parameters and buffers to CPU or Cuda
    model.to(device)

    for epoch in range(num_epochs):  # loop over the dataset multiple times
        # testAccuracy() leaves the model in eval mode at the end of every
        # epoch, so training mode has to be switched back on each time.
        model.train()
        running_loss = 0.0
        batches_since_print = 0

        for i, (images, labels) in enumerate(train_loader, 0):
            # move the batch to the execution device
            images = images.to(device)
            # (N,) -> (N, 1): one target column per sample
            labels = labels.to(device).unsqueeze(1)
            # zero the parameter gradients
            optimizer.zero_grad()
            # predict classes using images from the training set
            outputs = model(images)
            # compute the loss based on model output and real labels
            loss = loss_fn(outputs, labels)
            # backpropagate the loss
            loss.backward()
            # adjust parameters based on the calculated gradients
            optimizer.step()

            running_loss += loss.item()     # extract the loss value
            batches_since_print += 1
            if i % 10 == 0:
                # report the mean loss of the batches accumulated since the
                # previous report (1 batch for the first line, 10 afterwards)
                print('[%d, %5d] loss: %.3f' %
                      (epoch + 1, i + 1, running_loss / batches_since_print))
                # start a fresh window
                running_loss = 0.0
                batches_since_print = 0

        # accuracy of this epoch, measured on valid_loader
        accuracy = testAccuracy(valid_loader)
        print('For epoch', epoch+1,'the test accuracy over the whole test set is %d %%' % (accuracy))

        # we want to save the model if the accuracy is the best
        if accuracy > best_accuracy:
            saveModel()
            best_accuracy = accuracy

In [None]:
# run the training loop for 20 epochs
train(num_epochs=20)

In [None]:
# plot training and validation losses and accuracy versus number of epochs
# NOTE(review): model.history.history is read here as a dict of per-epoch
# lists keyed by 'accuracy', 'val_accuracy', 'loss' and 'val_loss'. Nothing in
# the visible notebook builds such a record (train() only prints its metrics),
# so this presumably relies on Classifier_CNN exposing a Keras-style `history`
# attribute -- confirm, otherwise this cell cannot run.
accuracy = model.history.history['accuracy']
val_accuracy = model.history.history['val_accuracy']
loss = model.history.history['loss']
val_loss = model.history.history['val_loss']
# one x position per recorded epoch
epochs = range(len(accuracy))
# first figure: accuracy curves
plt.plot(epochs, accuracy, 'b', label='Training accuracy')
plt.plot(epochs, val_accuracy, 'r', label='Validation accuracy')
plt.title('Training and validation accuracy')
plt.legend()
# axis limits: x clipped to 0..10, y to 0..1
# NOTE(review): the training cell runs 20 epochs and testAccuracy() reports
# percentages (0-100) -- confirm these limits still fit the recorded values.
plt.xlim(0,10)
plt.ylim(0,1.0)
# second figure: loss curves (plt.figure() makes the following calls draw on a new figure)
plt.figure()
plt.plot(epochs, loss, 'b', label='Training loss')
plt.plot(epochs, val_loss, 'r', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()
plt.show()

In [None]:
# 2x2 table filled from entry 9 of each recorded series, laid out as
#   [[true positives,  false positives],
#    [false negatives, true negatives]]
# NOTE(review): index 9 is presumably the 10th epoch -- confirm it is the epoch
# that should be reported. Like the plotting cell, this reads
# model.history.history, which nothing in the visible notebook populates.
metrics = np.zeros((2,2))

metrics[0,0] = model.history.history['true_positives'][9]
metrics[0,1] = model.history.history['false_positives'][9]
metrics[1,0] = model.history.history['false_negatives'][9]
metrics[1,1] = model.history.history['true_negatives'][9]
print(metrics)

In [None]:
# Score the held-out test split. The model is used as a torch.nn.Module
# elsewhere in this notebook, and Module.eval() only switches to inference
# mode -- it accepts no data -- so the accuracy is computed with
# testAccuracy() on a loader built from the test arrays.
# (assumes X_test / y_test are still the channels-last NumPy arrays produced
# by train_test_split -- TODO confirm)
X_test_nchw = torch.Tensor(np.transpose(X_test, (0, 3, 1, 2)))
y_test_tensor = torch.Tensor(y_test).type(torch.FloatTensor)
test_loader = DataLoader(TensorDataset(X_test_nchw, y_test_tensor), batch_size=BATCH_SIZE)
print('Test accuracy: %.2f %%' % testAccuracy(test_loader))