# Screw Classification

## 1) Download and import dependencies

In [None]:
pip install -r requirements.txt

In [None]:
import torch
import torchvision
import matplotlib
import sklearn
import os
import zipfile
import gdown

## 2) Download MVTec Screws Dataset

Run the cell below to download the dataset from Google Drive.

In [None]:
def download_dataset():
    """Fetch the screws dataset archive from Google Drive and unpack it.

    If an ``archive`` path already exists in the current working directory,
    a notice is printed and nothing is downloaded. Otherwise the zip file is
    downloaded with ``gdown``, extracted into the current working directory,
    and then deleted.
    """
    # Nothing to do when the dataset directory is already present.
    if os.path.exists('archive'):
        print("Dataset already exists.")
        return

    source_url = 'https://drive.google.com/uc?id=11ozVs6zByFjs9viD3VIIP6qKFgjZwv9E'
    archive_name = 'screws.zip'

    print("Downloading dataset...")
    gdown.download(source_url, output=archive_name, quiet=False)

    # Unpack into the current directory, then remove the zip file.
    with zipfile.ZipFile(archive_name, 'r') as archive:
        archive.extractall('.')
    os.remove(archive_name)

    print("Dataset downloaded!")


# Fetch the dataset when this cell runs (prints a notice and returns early if it is already present).
download_dataset()


## 3) Prepare Dataset for Training

Run the cell below to load the dataset from the downloaded files and create data loaders with a batch size of 32. The loaders are stored in the variables `train_loader` and `test_loader` for use in later cells.

In [None]:
from torchvision import transforms, datasets
from torch.utils.data import DataLoader, random_split

def prepare_dataset(data_path: str, batch_size: int, train_split: float, test_split=None, transform=None):
    """Load an image-folder dataset and split it into train/test data loaders.

    Args:
        data_path: Root directory handed to ``datasets.ImageFolder``.
        batch_size: Batch size used by both data loaders.
        train_split: Fraction of the dataset assigned to the training set.
        test_split: Fraction assigned to the test set. When omitted (or
            falsy), every sample not used for training goes to the test set.
        transform: Optional transform passed to ``ImageFolder``.

    Returns:
        ``(train_loader, test_loader)``. The training loader shuffles its
        data; the test loader does not.

    Raises:
        ValueError: If the requested split sizes are negative or exceed the
            number of samples in the dataset.
    """
    # torch.manual_seed(14)

    # Build the dataset from the class sub-folders under data_path.
    dataset = datasets.ImageFolder(root=data_path,
                                   transform=transform,
                                   target_transform=None)

    total = len(dataset)
    train_size = int(train_split * total)  # compute train set size
    if not 0 <= train_size <= total:
        raise ValueError(
            f"train_split={train_split} yields {train_size} training samples "
            f"for a dataset of {total}"
        )

    if not test_split:
        # Use the exact remainder: truncating both sizes with int() can leave
        # their sum short of len(dataset), which random_split rejects.
        test_size = total - train_size
    else:
        test_size = int(test_split * total)  # compute test set size
        if test_size < 0 or train_size + test_size > total:
            raise ValueError(
                f"train_split={train_split} and test_split={test_split} do not "
                f"fit a dataset of {total} samples"
            )

    # random_split needs lengths that sum to len(dataset); samples covered by
    # neither split go into a third subset that is discarded.
    lengths = [train_size, test_size]
    leftover = total - train_size - test_size
    if leftover:
        lengths.append(leftover)
    train_data, test_data = random_split(dataset, lengths)[:2]

    # os.cpu_count() may return None, which DataLoader does not accept.
    num_workers = os.cpu_count() or 0

    # Create Dataloaders (batches)
    train_loader = DataLoader(dataset=train_data,
                              batch_size=batch_size,
                              num_workers=num_workers,
                              shuffle=True)
    test_loader = DataLoader(dataset=test_data,
                             batch_size=batch_size,
                             num_workers=num_workers,
                             shuffle=False)

    return train_loader, test_loader


if __name__ == "__main__":
    # Absolute path to the folder holding the training images.
    DATA_PATH = '/content/archive/train'

    #### Define hyperparameters ####
    BATCH_SIZE = 32
    TRAIN_SPLIT = 0.7

    # Preprocessing applied to every image, in order.
    preprocessing_steps = [
        transforms.Resize((224, 224)),  # resize image to feed into resnet
        transforms.ToTensor(),
        # NOTE(review): presumably the ImageNet channel statistics matching the
        # pre-trained weights used later -- confirm.
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
    transform = transforms.Compose(preprocessing_steps)

    ############### Prepare Dataset ###############
    train_loader, test_loader = prepare_dataset(
        data_path=DATA_PATH,
        batch_size=BATCH_SIZE,
        train_split=TRAIN_SPLIT,
        transform=transform,
    )

    # Report how many batches each loader will yield.
    print(f"Number of training batches: {len(train_loader)}")
    print(f"Number of testing batches: {len(test_loader)}")

## 4) Create Model and Define Evaluation Functions

#### 4.1) Define Model `screw_classifier()`

Run the cell below to define the model-building function `screw_classifier()` and create an instance named `model` (a ResNet-50).

In [None]:
from torchvision.models import resnet50, ResNet50_Weights

def screw_classifier():
    """Build a ResNet-50 whose final layer is replaced by a two-output head.

    The network is created with ``ResNet50_Weights.DEFAULT`` and its ``fc``
    layer is swapped for
    ``Linear(in_features, 512) -> ReLU -> Dropout(0.2) -> Linear(512, 2)``.

    Returns:
        The modified ResNet-50 module.
    """
    # Pre-trained weights perform better than untrained ones.
    backbone = resnet50(weights=ResNet50_Weights.DEFAULT)

    nn = torch.nn
    head_layers = [
        nn.Linear(backbone.fc.in_features, 512),
        nn.ReLU(),
        nn.Dropout(0.2),  # regularization: randomly zeroes a fraction of units during training
        nn.Linear(512, 2),  # two outputs, one per class
    ]
    backbone.fc = nn.Sequential(*head_layers)
    return backbone

# Build the classifier once; the testing and training cells below refer to it as `model`.
model = screw_classifier()

### 4.2) Define Evaluation Metrics `evaluate_model()`

**Run** the cell below to define the `evaluate_model()` function. This function evaluates a model using the following metrics: precision, recall, accuracy, and F1 score.

In [None]:
def precision(true_positives, false_positives):
    """Return TP / (TP + FP); a 1e-10 term in the denominator avoids division by zero."""
    predicted_positives = true_positives + false_positives + 1e-10
    return true_positives / predicted_positives

def recall(true_positives, false_negatives):
    """Return TP / (TP + FN); a 1e-10 term in the denominator avoids division by zero."""
    actual_positives = true_positives + false_negatives + 1e-10
    return true_positives / actual_positives

def accuracy(true_positives, true_negatives, false_positives, false_negatives):
    """Return the fraction of correct predictions, (TP + TN) / total.

    Args:
        true_positives: Count of positives predicted as positive.
        true_negatives: Count of negatives predicted as negative.
        false_positives: Count of negatives predicted as positive.
        false_negatives: Count of positives predicted as negative.

    Returns:
        The accuracy, or ``0.0`` when all four counts are zero (no samples),
        instead of dividing by zero.
    """
    total = true_positives + true_negatives + false_positives + false_negatives
    if total == 0:
        return 0.0
    return (true_positives + true_negatives) / total

def f1_score(p, rec):
    """Return the F1 score 2*p*rec / (p + rec); a 1e-10 term avoids division by zero."""
    doubled_product = 2 * (p * rec)
    return doubled_product / (p + rec + 1e-10)

def evaluate_model(true_positives, true_negatives, false_positives, false_negatives, epoch=None):
    """Compute and print precision, recall, accuracy and F1 from confusion counts.

    Args:
        true_positives: Count of positives predicted as positive.
        true_negatives: Count of negatives predicted as negative.
        false_positives: Count of negatives predicted as positive.
        false_negatives: Count of positives predicted as negative.
        epoch: Optional zero-based epoch index. When given, the printed line
            starts with ``Epoch: <epoch + 1>``.

    Returns:
        ``(precision, recall, accuracy, f1)``, whether or not ``epoch`` is given.
    """
    p = precision(true_positives, false_positives)
    rec = recall(true_positives, false_negatives)
    acc = accuracy(true_positives, true_negatives, false_positives, false_negatives)
    f1 = f1_score(p, rec)

    # Precision is printed with three decimals, like the other metrics.
    if epoch is None:
        print(f"Precision: {p:.3f} | recall: {rec:.3f} | accuracy: {acc:.0%} | f1_score: {f1:.3f}")
    else:
        print(f"Epoch: {epoch + 1} | precision: {p:.3f} | recall: {rec:.3f} | accuracy: {acc:.0%} | f1_score: {f1:.3f}")
    return p, rec, acc, f1

### 4.3) Define the `test_model()` function which will be used later

In [None]:
def test_model(model, test_loader):
    """Run ``model`` over every batch of ``test_loader`` and print its metrics.

    The model is moved to CUDA when available (CPU otherwise) and switched to
    evaluation mode for the duration of the pass; its previous training mode
    is restored afterwards. Metrics are printed once, after all batches have
    been processed.

    Args:
        model: Module whose output has one column per class; the predicted
            class is the arg-max over dimension 1.
        test_loader: Iterable of ``(inputs, labels)`` batches. The
            confusion-count arithmetic assumes labels are 0 or 1.

    Returns:
        ``(all_preds, all_labels)``: lists with one predicted and one true
        label per sample. Both are empty when the loader yields no batches.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    all_labels = []
    all_preds = []
    true_positives = 0
    true_negatives = 0
    false_positives = 0
    false_negatives = 0

    was_training = model.training
    model.eval()  # the classifier head contains Dropout; turn it off while testing
    try:
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, y_preds = outputs.max(1)
                all_labels.extend(labels.cpu().tolist())
                all_preds.extend(y_preds.cpu().tolist())

                # With 0/1 labels and predictions, each product selects one
                # cell of the confusion matrix.
                true_positives += (labels * y_preds).sum().item()
                true_negatives += ((1 - labels) * (1 - y_preds)).sum().item()
                false_positives += ((1 - labels) * y_preds).sum().item()
                false_negatives += (labels * (1 - y_preds)).sum().item()
    finally:
        model.train(was_training)

    # Report once over the whole test set; skip when there was nothing to score.
    if all_labels:
        evaluate_model(true_positives, true_negatives, false_positives, false_negatives)
    return all_preds, all_labels

**[Optional Step]** - Run the cell below to test the resnet 50 with its pre-trained weights before training our model.

In [None]:
# Baseline: evaluate the freshly built model on the test set before running the training cell.
test_model(model, test_loader)

## 5) Train & Evaluate Model

### 5.1) Begin Training Below `train_model()`

Run the cell below to train the model. Feel free to edit any hyperparameters in the function call at the very bottom.

In [None]:
import torch

def train_model(model, train_loader, test_loader, epochs, learning_rate, class_weights=(0.3, 0.7)):
    """Train ``model`` and evaluate it on ``test_loader`` after every epoch.

    Uses class-weighted cross-entropy loss and the Adam optimizer, on CUDA
    when available and on CPU otherwise.

    Args:
        model: Module to train; its output has one column per class.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        test_loader: Iterable of ``(inputs, labels)`` evaluation batches. The
            confusion-count arithmetic assumes labels are 0 or 1.
        epochs: Number of passes over ``train_loader``.
        learning_rate: Learning rate given to Adam.
        class_weights: Per-class loss weights, one value per class. The
            default ``(0.3, 0.7)`` weights class 1 more heavily to compensate
            for the unbalanced dataset.

    Returns:
        ``(accuracies, f1_scores, precisions, recalls, num_epochs, all_labels,
        all_preds, training_loss)``. The four metric lists and
        ``training_loss`` (summed batch losses) hold one entry per epoch;
        ``all_labels`` and ``all_preds`` come from the last epoch's evaluation
        pass and are empty when ``epochs`` is 0.
    """
    # torch.manual_seed(14)
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)

    weights = torch.tensor(class_weights, dtype=torch.float32)
    loss_fn = torch.nn.CrossEntropyLoss(weight=weights.to(device))
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    accuracies, f1_scores, precisions, recalls, training_loss = [], [], [], [], []
    num_epochs = epochs
    all_labels, all_preds = [], []  # defined up front so epochs == 0 still returns cleanly

    for epoch in range(epochs):
        model.train()  # Set the model to train mode

        ######### Training loop ########
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            y_preds = model(inputs)  # Forward pass
            loss = loss_fn(y_preds, labels)  # Calculate loss
            loss.backward()  # Backward pass and optimization
            optimizer.step()
            running_loss += loss.item()

        # Each batch loss is already averaged over the batch, so the epoch
        # average is taken over the number of batches, not samples.
        avg_loss = running_loss / max(len(train_loader), 1)
        print(f"Epoch [{epoch + 1}/{epochs}] | Total loss: {running_loss} | Avg Loss: {avg_loss:.4f}")
        training_loss.append(running_loss)

        ######## Evaluation loop ########
        model.eval()
        all_labels, all_preds = [], []
        true_positives, true_negatives, false_positives, false_negatives = 0, 0, 0, 0

        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, y_preds = outputs.max(dim=1)
                all_labels.extend(labels.cpu().numpy())
                all_preds.extend(y_preds.cpu().numpy())

                # .item() keeps the counts as Python numbers, so the recorded
                # metrics are plain floats rather than device tensors.
                true_positives += (labels * y_preds).sum().item()
                true_negatives += ((1 - labels) * (1 - y_preds)).sum().item()
                false_positives += ((1 - labels) * y_preds).sum().item()
                false_negatives += (labels * (1 - y_preds)).sum().item()

        # Evaluate model and record metrics
        p, rec, acc, f1 = evaluate_model(true_positives, true_negatives, false_positives, false_negatives, epoch)
        accuracies.append(acc)
        f1_scores.append(f1)
        precisions.append(p)
        recalls.append(rec)

    return accuracies, f1_scores, precisions, recalls, num_epochs, all_labels, all_preds, training_loss

######### Train Model ##########
# Edit `epochs` and `learning_rate` here to change the training run.
training_results = train_model(
    model=model,
    train_loader=train_loader,
    test_loader=test_loader,
    epochs=30,
    learning_rate=0.00005,
)

# Unpack the per-epoch metrics and the last epoch's labels/predictions.
(accuracies, f1_scores, precisions, recalls,
 num_epochs, all_labels, all_preds, training_loss) = training_results

### 5.2) Save Model

Run the cell below to save the model trained above to the Colab filesystem.

In [None]:
import os

# Folder that will hold the saved weights.
directory = '/content/models'

# Make sure the folder exists, and report which case applied.
if os.path.exists(directory):
    print(f"Directory '{directory}' already exists.")
else:
    os.makedirs(directory)
    print(f"Directory '{directory}' created successfully.")

# Save only the model's state dict (its parameters and buffers) to PATH.
PATH = '/content/models/screw_classifier_model'
trained_state = model.state_dict()
torch.save(trained_state, PATH)

### 5.3) Visualize Training

Run the cell below to view the plots of accuracies, f1 scores, precisions, recalls, and total loss

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def _to_numpy(values):
    """Copy a list of per-epoch values into a NumPy array via a CPU tensor."""
    return np.array(torch.tensor(values, device="cpu"))


# Convert the recorded metrics for plotting.
accuracies = _to_numpy(accuracies)
f1_scores = _to_numpy(f1_scores)
precisions = _to_numpy(precisions)
recalls = _to_numpy(recalls)
total_loss = _to_numpy(training_loss)
epochs = np.arange(1, num_epochs + 1)  # x axis: 1-based epoch numbers

# One stacked panel per metric.
fig, (ax1, ax2, ax3, ax4, ax5) = plt.subplots(5, 1, figsize=(10, 20))

# (axis, series, metric name, whether the x axis gets an 'Epoch' label)
panels = [
    (ax1, accuracies, 'Accuracy', False),
    (ax2, f1_scores, 'F1 Score', True),
    (ax3, precisions, 'Precision', True),
    (ax4, recalls, 'Recall', True),
    (ax5, total_loss, 'Total Loss', True),
]

for axis, series, metric_name, label_x in panels:
    axis.plot(epochs, series, label=metric_name)
    axis.set_title(f'{metric_name} vs. Epoch')
    if label_x:
        axis.set_xlabel('Epoch')
    axis.set_ylabel(metric_name)
    axis.grid(True)

# Adjust layout and display the figure.
plt.tight_layout()
plt.show()

## 6) Test Model & Results

### ***IMPORTANT***: [Optional] Skip this step if you are testing a model that was trained above. Run the cell below if you want to download and test a trained model.

In [None]:
# Google Drive link for the trained weights, and the local file name they are saved under.
# Both are read by download_trained_model() below.
url = 'https://drive.google.com/uc?export=download&id=1orhZKoJ80pVsYtmi1j3W8Fdt9IdqwWNs'
output_filename = 'screws_classifier_trained'

def download_trained_model():
    """Download the trained weights unless they already exist locally.

    Reads the module-level ``url`` and ``output_filename``. When a file named
    ``output_filename`` already exists in the current working directory, a
    notice is printed and nothing is downloaded.
    """
    # Check the same path the download writes to, so the two cannot drift apart.
    if os.path.exists(output_filename):
        print("Model already exists.")
        return

    # Download the model
    print("Downloading model...")
    gdown.download(url, output=output_filename, quiet=False)

    print("Model downloaded!")


if __name__ == "__main__":
  # Fetch the trained weights (returns early if the file already exists).
  download_trained_model()
  # NOTE(review): the download saves to a path relative to the working directory,
  # while this path is absolute -- assumes the working directory is /content; confirm.
  MODEL_PATH = '/content/screws_classifier_trained'

#### Test & Evaluate Model

Note: Ensure functions `screw_classifier()`, `test_model()`, and `evaluate_model()` are defined from above.

In [None]:
from torchmetrics import ConfusionMatrix
from mlxtend.plotting import plot_confusion_matrix

# Prefer the weights saved by the training section; otherwise MODEL_PATH must
# already have been set by the optional download cell.
locally_trained_path = '/content/models/screw_classifier_model'
if os.path.exists(locally_trained_path):
    MODEL_PATH = locally_trained_path

device = "cuda" if torch.cuda.is_available() else "cpu"

# Rebuild the architecture, then load the saved parameters into it.
model_to_test = screw_classifier()
# NOTE(review): torch.load may unpickle arbitrary objects depending on the
# PyTorch version -- only load weight files from a trusted source.
saved_state = torch.load(MODEL_PATH, map_location=torch.device(device))
model_to_test.load_state_dict(saved_state)

# Evaluate on the same test split created earlier.
all_preds, all_labels = test_model(model_to_test, test_loader)

#### Plot Confusion Matrix

In [None]:
# Turn the collected predictions and targets into CPU tensors.
all_preds = torch.tensor(all_preds, device="cpu")
all_labels = torch.tensor(all_labels, device="cpu")

# Compute the binary confusion matrix.
confmat = ConfusionMatrix(task='binary', num_classes=2)
confmat_tensor = confmat(preds=all_preds, target=all_labels)

# Plot results.
# NOTE(review): assumes label 0 is "undamaged" and label 1 is "damaged" -- confirm
# against the dataset's class-to-index mapping.
confusion_counts = confmat_tensor.numpy()
display_names = ["undamaged", "damaged"]
fig, ax = plot_confusion_matrix(
    conf_mat=confusion_counts,
    class_names=display_names,
    figsize=(5, 5),
)