In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import cv2
from scipy import ndimage
from sklearn.model_selection import train_test_split

## **Data Analysis**

In [2]:
labels_df = pd.read_csv("../data/train_info.csv")
labels_df

Unnamed: 0,file_name,hamiltonian
0,graph4443.png,yes
1,graph1905.png,no
2,graph7719.png,no
3,graph4902.png,no
4,graph4114.png,yes
...,...,...
4423,graph4672.png,yes
4424,graph3437.png,yes
4425,graph3649.png,no
4426,graph893.png,yes


In [3]:
def rgb2gray(rgb):
    img = np.dot(rgb[...,:3], [0.2989, 0.5870, 0.1140])
    return img / 255

def convert_to_array(data_dir, labels):
    files = os.listdir(data_dir)
    dataset_x = []
    dataset_y = []
    for picture in files:
        img = cv2.imread(data_dir + f"/{picture}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (400, 400))
        img = rgb2gray(img)
        
        img = img.reshape(1, 400, 400)
        
        label = labels[labels["file_name"] == f'{picture}']["hamiltonian"].to_list()
        
        dataset_x.append(img)
        dataset_y.append(label[0])
    
    return np.array(dataset_x), dataset_y

data_train = convert_to_array("../data/train_images/train_images", labels_df)

In [4]:
images, labels = data_train

In [5]:
images_train, images_test, labels_train, labels_test = train_test_split(images, labels, test_size=0.2, random_state=123)

In [6]:
images_train.shape, images_test.shape

((3542, 1, 400, 400), (886, 1, 400, 400))

In [7]:
def plot_random_sample(images, label):
    random_indices = np.random.choice(images.shape[0], 12, replace=False)
    plt.figure(figsize=(20, 10), dpi=200)
    for i, idx in enumerate(random_indices):
        plt.subplot(3, 4, i + 1)
        plt.imshow(images[idx], cmap='gray')
        plt.title(f"Hamiltonian = {label.iloc[idx]['hamiltonian']}")
        plt.axis('on')
        plt.xticks([])
        plt.yticks([])
        plt.tight_layout()

# plot_random_sample(images_train, labels_df)

In [8]:
def augment_data(images, labels, num_augmentations):
    augmented_images = []
    augmented_labels = []

    for i in range(images.shape[0]):
        for _ in range(num_augmentations):
            angle = np.random.uniform(-90, 90)
            rotated_image = ndimage.rotate(images[i], angle, reshape=True)

            shift = np.random.uniform(-5, 5, 2)
            shifted_image = ndimage.shift(rotated_image, shift)

            if np.random.rand() > 0.5:
                flipped_image = np.fliplr(shifted_image)
            else:
                flipped_image = shifted_image

            augmented_images.append(flipped_image)
            augmented_labels.append(labels.iloc[i])

    max_shape = np.max([img.shape for img in augmented_images], axis=0)

    resized_images = []
    for img in augmented_images:
        resized_img = cv2.resize(img, (max_shape[1], max_shape[0]), interpolation=cv2.INTER_AREA)
        resized_images.append(resized_img)

    return np.array(resized_images), pd.DataFrame(augmented_labels)

In [9]:
# augmented_data = augment_data(images, label["hamiltonian"], 2)

## **Moedel Design**

In [10]:
import torch
from skimage import io, transform
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torchvision import transforms, utils
import torch.nn as nn
import torch.nn.functional as F

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")

In [24]:
if torch.cuda.is_available():
    print("CUDA is available!")
else:
    print("CUDA is not available.")
    
device_name = "cpu" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)

CUDA is available!


In [25]:
labels_train = [1.0 if i == "yes" else 0.0 for i in labels_train]
labels_test = [1.0 if i == "yes" else 0.0 for i in labels_test]

In [26]:
torch_images_train = torch.from_numpy(images_train).type(torch.float).to(device)
torch_labels_train = torch.from_numpy(np.array(labels_train)).type(torch.float).to(device)

torch_images_test = torch.from_numpy(images_test).type(torch.float).to(device)
torch_labels_test = torch.from_numpy(np.array(labels_test)).type(torch.float).to(device)

torch_images_train.size(), torch_labels_train.size(), torch_images_test.size(), torch_labels_test.size()

(torch.Size([3542, 1, 400, 400]),
 torch.Size([3542]),
 torch.Size([886, 1, 400, 400]),
 torch.Size([886]))

In [27]:
dataset_train = TensorDataset(torch_images_train, torch_labels_train)
dataset_test = TensorDataset(torch_images_test, torch_labels_test)

dataloader_train = DataLoader(
    dataset_train, 
    batch_size=32,
    shuffle=True
)

dataloader_test = DataLoader(
    dataset_test, 
    batch_size=32,
    shuffle=True
)

In [28]:
for i, sample in enumerate(dataloader_train):
    if i == 0:
        res = sample[0]
    else:
        break

In [29]:
res.shape

torch.Size([32, 1, 400, 400])

In [30]:
class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        
        self.conv1 = nn.Conv2d(
            in_channels=1,
            out_channels=64,
            kernel_size=6,
            stride=1,
            padding="same"
        ) 
        self.batch_norm_1 = nn.BatchNorm2d(
            num_features=64
        )

        self.conv2 = nn.Conv2d(
            in_channels=64, 
            out_channels=128,
            kernel_size=16, 
            stride=1,
            padding="same"
        )
        self.batch_norm_2 = nn.BatchNorm2d(
            num_features=128
        )
        
        self.dropout_1 = nn.Dropout2d(p=0.25)
        self.dropout_2 = nn.Dropout(p=0.5)
        
        self.fc1 = nn.Linear(
            in_features=12800, 
            out_features=128
        )
        self.fc2 = nn.Linear(
            in_features=128, 
            out_features=1
        )

    
    def forward(self, x):
        print(x.size())
        x = self.conv1(x)
        x = F.relu(x)
        x = self.batch_norm_1(x)
        x = F.max_pool2d(x, kernel_size=2)
        x = self.dropout_1(x)
        
        print(x.size())
        
        x = self.conv2(x)
        x = F.relu(x)
        x = self.batch_norm_2(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout_1(x)
        
        print(x.size())
        
        x = x.view(-1, 12800)
        print(x.size())
        
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout_2(x)
        
        x = self.fc2(x)
        output = F.sigmoid(x)
        # x = self.dropout_2(x)
        
        return output
        


In [31]:
model = Model()
model.to(device)

Model(
  (conv1): Conv2d(1, 64, kernel_size=(6, 6), stride=(1, 1), padding=same)
  (batch_norm_1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(64, 128, kernel_size=(16, 16), stride=(1, 1), padding=same)
  (batch_norm_2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (dropout_1): Dropout2d(p=0.25, inplace=False)
  (dropout_2): Dropout(p=0.5, inplace=False)
  (fc1): Linear(in_features=12800, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=1, bias=True)
)

In [32]:
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
loss_fn = nn.BCELoss()

In [33]:
def train_one_epoch(epoch_index, tb_writer):
    running_loss = 0
    last_loss = 0
    
    for i, data in enumerate(dataloader_train):
        inputs, labels = data
        optimizer.zero_grad()
        outputs = model(inputs)
        
        loss = loss_fn(outputs, labels)
        loss.backward()
        
        optimizer.step()
        
        running_loss += loss.item()
        
        # Gather data and report
        running_loss += loss.item()
        if i % 1000 == 999:
            last_loss = running_loss / 1000 # loss per batch
            print('  batch {} loss: {}'.format(i + 1, last_loss))
            tb_x = epoch_index * len(dataloader_train) + i + 1
            tb_writer.add_scalar('Loss/train', last_loss, tb_x)
            running_loss = 0.

    return last_loss

In [34]:
from torch.utils.tensorboard import SummaryWriter
from datetime import datetime

In [None]:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
writer = SummaryWriter('runs/fashion_trainer_{}'.format(timestamp))
epoch_number = 0

EPOCHS = 5

best_vloss = 1_000_000.

for epoch in range(EPOCHS):
    print('EPOCH {}:'.format(epoch_number + 1))

    model.train(True)
    avg_loss = train_one_epoch(epoch_number, writer)


    running_vloss = 0.0
    model.eval()

    with torch.no_grad():
        for i, vdata in enumerate(dataloader_test):
            vinputs, vlabels = vdata
            voutputs = model(vinputs)
            vloss = loss_fn(voutputs, vlabels)
            running_vloss += vloss

    avg_vloss = running_vloss / (i + 1)
    print('LOSS train {} valid {}'.format(avg_loss, avg_vloss))


    writer.add_scalars('Training vs. Validation Loss',
                    { 'Training' : avg_loss, 'Validation' : avg_vloss },
                    epoch_number + 1)
    writer.flush()

    if avg_vloss < best_vloss:
        best_vloss = avg_vloss
        model_path = 'model_{}_{}'.format(timestamp, epoch_number)
        torch.save(model.state_dict(), model_path)

    epoch_number += 1

EPOCH 1:
torch.Size([32, 1, 400, 400])
torch.Size([32, 64, 200, 200])
