In [2]:
# /*
# * Author: Samay Pashine (samay.pashine@tufts.edu)
# * Brief: Binary image classification 3-D printing programming assignment for Internship.
# * Date: 03/26/2023
# * Modified: 03/26/2023
# * Version: 1.0
# */

# Importing the necessary libraries
from tqdm import tqdm
import numpy as np
import os
import time
import torch
import torch.nn as nn
import torch.optim
import torch.utils.data
import torchvision.models as models
import torchvision.transforms as transforms
from torchvision.io import read_image
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import pandas as pd
from sklearn.metrics import f1_score
from PIL import Image

# model_names = sorted(name for name in models.__dict__ if name.islower() and not name.startswith("__") and callable(models.__dict__[name]))
# print("[INFO]. Model Available : ", model_names)

DATA = "./dataset"
LEARNING_RATE = 1e-5
BATCH_SIZE = 16
EPOCH = 10
MOMENTUM = 0.9
WEIGHT_DECAY = 1e-4
GPU = 0

In [3]:
def file_track(path):
    filepaths = dict()
    df = pd.read_csv(path)
    
    path = path.split('/')[:-1]
    path = '/'.join(path) 
    for i in range(len(df)):
        try:
            filepaths[os.path.join(path, 'images', df.iloc[i][0])] = df.iloc[i][3]
        except:
            filepaths[os.path.join(path, 'images', df.iloc[i][0])] = 1
    return filepaths


def train_val_split(filepaths, train_ratio, val_ratio):
    train_files, val_files = dict(), dict()

    for path in  filepaths.keys():
        if np.random.random() < (train_ratio/10):
            train_files[path] = filepaths[path]
        else:
            val_files[path] = filepaths[path]
    return train_files, val_files

In [4]:
class CustomImageDataset(Dataset):
    def __init__(self, img_files_dict, transform=None, target_transform=None):
        
        self.img_files = list(img_files_dict.keys())
        self.img_files_label = list(img_files_dict.values())

        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.img_files)

    def __getitem__(self, idx):
        image = read_image(self.img_files[idx]).float()
        label =  self.img_files_label[idx]
        
        if self.transform:
            image = self.transform(image)

        if self.target_transform:
            label = self.target_transform(label)
        
        return image, label


In [5]:
def validate(model, dataloader):
    model.eval()
    size = len(dataloader.dataset)

    y_true = []
    y_pred = []

    progress_bar = tqdm(range(size//BATCH_SIZE), position=1, desc='[VALIDATION]. ')
    correct = 0
    with torch.no_grad():
        for batch, (X, y) in enumerate(dataloader):
            X = X.to(torch.device('cuda'))
            y = y.to(torch.device('cuda'))
            X = X.float()

            pred = model(X)
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

            y_true.extend(y.to(torch.device('cpu')).tolist())
            y_pred.extend(pred.argmax(1).to(torch.device('cpu')).tolist())
            
            f1 = f1_score(y_true, np.round(y_pred).astype(int))

            progress_bar.set_description(f"[VALIDATION].")
            progress_bar.set_postfix(test_acc=(100*correct/size), F1_score=f1)
            progress_bar.update()

In [6]:
def train(model, train_dataloader, val_dataloader):
    model.train(mode=True)
    size = len(train_dataloader.dataset)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY, momentum=MOMENTUM)

    for ep in range(EPOCH):
        correct = 0
        progress_bar = tqdm(range(size//BATCH_SIZE), position=0, desc='[TRAINING]. ')
        for batch, (X, y) in enumerate(train_dataloader):
            X = X.to(torch.device('cuda'))
            y = y.to(torch.device('cuda'))
            X = X.float()

            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            loss = loss.item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
            
            progress_bar.set_description(f"EPOCH [{ep+1}/{EPOCH}]")
            progress_bar.set_postfix(loss=loss, acc=(100*correct/size))
            progress_bar.update()

        validate(model, val_dataloader)
        torch.save(model, './checkpoints/model_{}.pth'.format(time.time()))
    return model

In [7]:
def test(model, transform_func, DATA):
    submission_df = pd.read_csv(os.path.join(DATA, 'test.csv'))
    outputs_list = []
    for i in range(len(submission_df)):
        image = read_image(os.path.join(DATA, 'images', submission_df.iloc[i][0])).float()
        image = transform_func(image).cuda()
        pred = model(image.unsqueeze(0))
        output = pred.argmax(1)
        outputs_list.append(output.item())

    submission_df = submission_df.drop(['printer_id', 'print_id'], axis=1)
    submission_df['has_under_extrusion'] = outputs_list
    submission_df.to_csv("./submission.csv", index=False)

In [12]:
if GPU is not None:
    device = "cuda" if torch.cuda.is_available() else "cpu"
    if device == "cuda":
        print("[INFO]. Activating the GPU for training.")
        torch.cuda.set_device(0)
        torch.set_default_tensor_type('torch.cuda.FloatTensor')
    else:
        print("[WARN]. CPU is enabled for training.")


path = os.path.join(DATA, 'train.csv')

filepaths = file_track(os.path.join(DATA, 'train.csv'))
train_files, val_files = train_val_split(filepaths, 9, 1)

transform_func = transforms.Compose([transforms.ToPILImage(),
                                     transforms.AutoAugment(),
                                     transforms.ToTensor(),
                                     transforms.Resize(size=(480, 640), antialias=True),
                                     transforms.RandomHorizontalFlip(),
                                     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])])

training_data, val_data = CustomImageDataset(train_files, transform=transform_func), CustomImageDataset(val_files, transform=transform_func)

model = models.resnet18(weights='ResNet18_Weights.IMAGENET1K_V1').cuda()
model_features = model.fc.in_features
model.fc = nn.Sequential(
    nn.Linear(model_features, 256),
    nn.Linear(256, 128),
    nn.Linear(128, 64),
    nn.Linear(64, 8),
    nn.Linear(8, 2)
)

train_dataloader = DataLoader(training_data, batch_size=BATCH_SIZE, shuffle=True, generator=torch.Generator(device=device))
val_dataloader = DataLoader(val_data, batch_size=BATCH_SIZE, shuffle=True, generator=torch.Generator(device=device))

model = train(model, train_dataloader, val_dataloader)
test(model, transform_func, DATA)
torch.save(model, "./model/{}_final_model".format(time.time()))


[INFO]. Activating the GPU for training.


EPOCH [1/10]:   1%|          | 31/4556 [00:20<49:43,  1.52it/s, acc=0.329, loss=0.691] 