# Knot ID

## Setup

### Libraries

In [1]:
import os
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torchvision.datasets.vision import VisionDataset
from torch.utils.data import DataLoader
from google.colab import drive
from PIL import Image

### Google Drive setup

In [None]:
# Mount Google Drive at /content/drive; PROJECT_ROOT below points inside this mount.
drive.mount('/content/drive')

Mounted at /content/drive


### GPU setup

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

### Constants

In [None]:
# Training hyperparameters.
batch_size = 1
img_size = 128
num_epochs = 10
learning_rate = 1e-4

# Dataset locations on the mounted Google Drive.
PROJECT_ROOT = '/content/drive/MyDrive/datasets/knot-id'
# The joined components must be relative: os.path.join() throws away every
# earlier component as soon as it meets an absolute one, so a leading '/'
# here would silently drop PROJECT_ROOT.
DATA_ROOT_PROCESSED = os.path.join(PROJECT_ROOT, 'data', 'processed')
DATA_DIR = f'10Knots_{img_size}'
DATA_PATH = os.path.join(DATA_ROOT_PROCESSED, DATA_DIR)

# Class names; the position of a name in this list is its integer label.
CLASSES = [
    'Alpine Butterfly Knot',
    'Bowline Knot',
    'Clove Hitch',
    'Figure-8 Knot',
    'Figure-8 Loop',
    'Fisherman\'s Knot',
    'Flemish Bend',
    'Overhand Knot',
    'Reef Knot',
    'Slip Knot',
]

## Model

### Dataset

In [None]:
class Knots(VisionDataset):
    """Knot image dataset read from ``DATA_PATH/<split>/<class name>/``.

    Each file found in a class directory becomes one sample whose target is
    the index of that class name in ``CLASSES``.

    Args:
        split: Name of the sub-directory of ``DATA_PATH`` to read
            (the notebook uses ``'train'`` and ``'test'``).
        transform: Optional callable applied to the PIL image in
            ``__getitem__``.
    """

    def __init__(self, split, transform=None):
        self.transform = transform
        self.filepaths = []
        self.targets = []
        self.split = split

        super().__init__(DATA_ROOT_PROCESSED, transforms=None, transform=transform)

        for idx, class_name in enumerate(CLASSES):
            class_path = os.path.join(DATA_PATH, self.split, class_name)
            # os.listdir() returns entries in arbitrary order; sort so the
            # sample order is reproducible between runs.
            for file in sorted(os.listdir(class_path)):
                # Skip hidden entries such as '.DS_Store' or
                # '.ipynb_checkpoints' - they are not images.
                if file.startswith('.'):
                    continue
                self.filepaths.append(os.path.join(class_path, file))
                self.targets.append(idx)

    def __len__(self):
        """Return the number of samples found for this split."""
        return len(self.filepaths)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        The image is always converted to 3-channel RGB, because the model's
        first convolution and the normalisation transform both expect three
        channels; grayscale or RGBA files would otherwise fail downstream.
        """
        # Image.open() is lazy and keeps the file handle open; convert()
        # loads the pixels and returns an independent image, so the handle
        # can be released right away by the context manager.
        with Image.open(self.filepaths[idx]) as raw:
            img = raw.convert('RGB')
        target = self.targets[idx]

        if self.transform is not None:
            img = self.transform(img)

        return img, target

### Model

In [None]:
class KnotID(nn.Module):
    """Convolutional classifier for 3x128x128 images with 10 output classes.

    Five conv/ReLU/max-pool stages halve the spatial size each time
    (128 -> 4) while doubling the channel count (10 -> 160); the flattened
    160*4*4 = 2560 features then go through a four-layer MLP that produces
    10 raw scores (logits) per image.
    """

    def __init__(self):
        super().__init__()

        # Layers for learning features
        # input shape (N, 3, 128, 128)
        self.feature_learning = nn.Sequential(
            nn.Conv2d(3, 10, 3, 1, 1),      # (N, 10, 128, 128)
            nn.ReLU(),
            nn.MaxPool2d(2),                # (N, 10, 64, 64)
            nn.Conv2d(10, 20, 3, 1, 1),     # (N, 20, 64, 64)
            nn.ReLU(),
            nn.MaxPool2d(2),                # (N, 20, 32, 32)
            nn.Conv2d(20, 40, 3, 1, 1),     # (N, 40, 32, 32)
            nn.ReLU(),
            nn.MaxPool2d(2),                # (N, 40, 16, 16)
            nn.Conv2d(40, 80, 3, 1, 1),     # (N, 80, 16, 16)
            nn.ReLU(),
            nn.MaxPool2d(2),                # (N, 80, 8, 8)
            nn.Conv2d(80, 160, 3, 1, 1),    # (N, 160, 8, 8)
            nn.ReLU(),
            nn.MaxPool2d(2),                # (N, 160, 4, 4)
        )

        # Layers for classifying images
        self.classification = nn.Sequential(
            nn.Flatten(1),                  # (N, 2560)
            nn.Linear(2560, 768),           # (N, 768)
            nn.ReLU(),
            nn.Linear(768, 256),            # (N, 256)
            nn.ReLU(),
            nn.Linear(256, 64),             # (N, 64)
            nn.ReLU(),
            nn.Linear(64, 10),              # (N, 10)
        )

    def forward(self, x):
        """Return the (N, 10) class scores for a batch of (N, 3, 128, 128) images."""
        x = self.feature_learning(x)
        x = self.classification(x)
        return x

    def save(self, model_id, data, train_time):
        """Write the metrics table, plot, text summary and weights of a run.

        Files are written below ``PROJECT_ROOT/models`` as
        ``tables/knot-id_XXXX.csv``, ``summaries/knot-id_XXXX.txt`` and
        ``serialized/knot-id_XXXX.pt`` where ``XXXX`` is ``model_id``
        zero-padded to four digits.

        Args:
            model_id: Integer identifying this run.
            data: Dict with the keys ``img_size``, ``batch_size``,
                ``learning_rate``, ``num_epochs``, ``epochs``,
                ``train_losses``, ``test_losses``, ``train_accuracies`` and
                ``test_accuracies``.
            train_time: Training duration, written verbatim to the summary.
        """
        # pandas is not part of the notebook's import cell, so import it here.
        import pandas as pd

        run_name = f'knot-id_{model_id:04}'
        # Path components must be relative: os.path.join() discards
        # PROJECT_ROOT if a later component starts with '/'.
        tables_dir = os.path.join(PROJECT_ROOT, 'models', 'tables')
        summaries_dir = os.path.join(PROJECT_ROOT, 'models', 'summaries')
        serialized_dir = os.path.join(PROJECT_ROOT, 'models', 'serialized')
        for directory in (tables_dir, summaries_dir, serialized_dir):
            os.makedirs(directory, exist_ok=True)

        df = pd.DataFrame(data)
        df.to_csv(os.path.join(tables_dir, f'{run_name}.csv'))

        # NOTE(review): `save_plot` and `summary` are neither defined nor
        # imported in the visible notebook cells. `summary` is presumably
        # torchinfo.summary - confirm and add the import/definition.
        save_plot(
            model_id,
            data['train_losses'],
            data['test_losses'],
            data['train_accuracies'],
            data['test_accuracies'],
            data['epochs']
        )

        model_summary = str(summary(self, input_size=(1, 3, 128, 128), verbose=0))

        with open(os.path.join(summaries_dir, f'{run_name}.txt'), 'w') as file:
            file.write(f"img_size: {data['img_size']}\n")
            file.write(f"batch_size: {data['batch_size']}\n")
            file.write(f"learning_rate: {data['learning_rate']}\n")
            file.write(f"num_epochs: {data['num_epochs']}\n\n")
            file.write(f'training time: {train_time}\n\n')
            file.write(f'{str(self)}\n\n')
            file.write(model_summary)

        torch.save(self.state_dict(), os.path.join(serialized_dir, f'{run_name}.pt'))

## Train and test functions

In [None]:
def train(model, train_loader, loss_fn, optimizer, epoch):
    """Run one optimisation pass over ``train_loader``.

    For every batch: move it to ``device``, reset the gradients, compute the
    loss, back-propagate and take an optimizer step. Progress is printed for
    every 100th batch (starting with batch 0); ``epoch`` is only used in that
    message. Returns nothing.
    """
    model.train()

    for step, batch in enumerate(train_loader):
        inputs, labels = batch
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        batch_loss = loss_fn(model(inputs), labels)
        batch_loss.backward()
        optimizer.step()

        # Only report on every 100th batch.
        if step % 100 != 0:
            continue

        progress = f'Epoch {epoch}: [{step*len(inputs)}/{len(train_loader.dataset)}]'
        progress += f'Loss: {batch_loss.item():.8f}'
        print(progress)

In [None]:
def test(model, test_loader, loss_fn, epoch):
    """Evaluate ``model`` on ``test_loader`` and report loss and accuracy.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        test_loader: Iterable of ``(images, targets)`` batches.
        loss_fn: Loss called as ``loss_fn(output, targets)``.
        epoch: Epoch number, used only in the printed message.

    Returns:
        ``(test_loss, test_accuracy)`` as Python floats: the average loss per
        sample and the percentage of correctly classified samples.

    Raises:
        ValueError: If the loader yields no samples.
    """
    model.eval()
    total_loss = 0.0
    correct = 0
    num_samples = 0

    with torch.no_grad():
        for images, targets in test_loader:
            images = images.to(device)
            targets = targets.to(device)

            output = model(images)
            batch_len = targets.size(0)

            batch_loss = loss_fn(output, targets).item()
            # With the default 'mean' reduction the loss is already averaged
            # over the batch; weight it by the batch size so the final
            # division yields a true per-sample average for any batch size.
            if getattr(loss_fn, 'reduction', 'mean') != 'sum':
                batch_loss *= batch_len
            total_loss += batch_loss

            pred = output.argmax(dim=1)
            # .item() keeps the running count a plain int instead of a
            # (possibly GPU-resident) tensor.
            correct += (pred == targets).sum().item()
            num_samples += batch_len

    if num_samples == 0:
        raise ValueError('test_loader yielded no samples to evaluate')

    test_loss = total_loss / num_samples
    test_accuracy = 100.0 * correct / num_samples
    print(
        f'Test result on epoch {epoch}: '
        f'Avg loss is {test_loss:.4f}, '
        f'Accuracy: {test_accuracy:.2f}%'
    )

    # The training loop unpacks these to record per-epoch metrics.
    return test_loss, test_accuracy

## Training and testing

In [None]:
# Convert PIL images to float tensors and normalise each RGB channel.
# NOTE(review): the mean/std values are presumably statistics of the training
# images - confirm where they were computed.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.7019, 0.4425, 0.1954), (0.1720, 0.1403, 0.1065))
])

train_data = Knots(split='train', transform=transform)
test_data = Knots(split='test', transform=transform)

train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
# Evaluation metrics do not depend on sample order, so the test loader is not
# shuffled; this keeps evaluation deterministic and avoids extra RNG use.
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

In [None]:
# Build the model, loss and optimizer.
model = KnotID()
model.to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Per-epoch metrics, one entry per epoch.
train_losses, test_losses = [], []
train_accuracies, test_accuracies = [], []
epochs = range(1, num_epochs+1)

train_start = time.time()
for epoch in epochs:
    train(model, train_loader, loss_fn, optimizer, epoch)

    # The loop body is indented with spaces only; mixing tabs and spaces
    # inside one block is a TabError in Python 3.
    print('Testing model on train dataset')
    loss, accuracy = test(model, train_loader, loss_fn, epoch)
    train_losses.append(loss)
    train_accuracies.append(accuracy)
    print('Testing model on test dataset')
    loss, accuracy = test(model, test_loader, loss_fn, epoch)
    test_losses.append(loss)
    test_accuracies.append(accuracy)

# Format the elapsed time as HH:MM:SS by hand; gmtime()/strftime() would wrap
# around to 00 once training takes longer than 24 hours.
elapsed_seconds = int(time.time() - train_start)
elapsed_hours, remainder = divmod(elapsed_seconds, 3600)
elapsed_minutes, elapsed_secs = divmod(remainder, 60)
train_time = f'{elapsed_hours:02}:{elapsed_minutes:02}:{elapsed_secs:02}'

data = {
    'batch_size': batch_size,
    'img_size': img_size,
    'num_epochs': num_epochs,
    'learning_rate': learning_rate,
    'epochs': epochs,
    'train_losses': train_losses,
    'test_losses': test_losses,
    'train_accuracies': train_accuracies,
    'test_accuracies': test_accuracies
}
# NOTE(review): `model_id` is not defined in any visible cell; it must be set
# to an integer run id before this cell runs, otherwise this line raises
# NameError only after training has finished.
model.save(model_id, data, train_time)

torch.Size([64, 10])
Epoch 1: [0/1152]Loss: 2.2909
torch.Size([64, 10])


KeyboardInterrupt: ignored

In [None]:
# Sanity check: number of training samples, then number of batches per epoch.
print(len(train_data), len(train_loader), sep='\n')

1152
18
