In [None]:
!pip install torch torchmetrics torchvision

In [2]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Subset

import torch.optim as optim

import torchvision
from torchvision.transforms import transforms
from torchmetrics import Accuracy, Recall, Precision

import numpy as np
from datetime import datetime
import os

from image_transformers import train_transforms

In [3]:
device = torch.device( 'cuda:0' if torch.cuda.is_available() else 'cpu' )
"Using "+device.type

'Using cpu'

In [14]:
# Other vars
this_training_session_rank = 1

# Hyperparams
epoch = 10
batch_size = 80
lr=.01
momentum = .9
data_to_train_on=100000

In [5]:
# Loading train and test dataset
train_dataset = torchvision.datasets.EMNIST( root="./data", train=True, download=True, transform=train_transforms, split="byclass")
test_dataset = torchvision.datasets.EMNIST( root="./data", train=False, download=True, transform=train_transforms, split="byclass")

In [20]:
# using only 10,000 datas to train
train_dataset = Subset( train_dataset, np.arange(data_to_train_on*(this_training_session_rank-1), data_to_train_on*this_training_session_rank))

In [7]:
# Dataloaders
train_dataloader = DataLoader( train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader( test_dataset, batch_size=batch_size, shuffle=False)

In [13]:
# Data probing
# next(iter(train_dataloader))
train_dataset[0][0].shape

torch.Size([1, 64, 64])

In [65]:
class OCRModel(nn.Module):
    def __init__(self, num_classes: int):
        super().__init__()

        self.image_conv_net = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),  # --> 32x64x64
            nn.ReLU(),
            nn.MaxPool2d(2,2), # --> 32x32x32

            nn.Conv2d(32, 16, kernel_size=3, padding=1), # --> 16x32x32
            nn.ReLU(),
            nn.MaxPool2d(2,2),    # --> 16x16x16

            nn.Conv2d(16, 10, kernel_size=3 , padding=1), # --> 10x16x16
            nn.ReLU(),
            nn.MaxPool2d(2,2), # --> 10x8x8

            nn.Flatten() # --> 10x8x8
        )

        self.fc_layer = nn.Sequential(
            nn.Linear(10*8*8, 10),
            nn.ReLU(),

            nn.BatchNorm1d(10), # Normalizes each batch, that is, tries to make input distribution equal to output distribution.

            nn.Linear(10, 10),
            nn.ReLU(),

            nn.Dropout(.3), # Randomly drops some neurons with prob. of 3% reducing chances of overfitting

            nn.Linear(10, 5),
            nn.ReLU(),

            nn.Linear(5, num_classes)
        )

    def forward(self, features):
        out = self.image_conv_net(features)
        out = self.fc_layer(out)

        return out


In [66]:
# Model initialization
num_classes = test_dataset.classes.__len__()
model = OCRModel(num_classes).to(device)

In [67]:
# Loss and optimizers
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

In [68]:
# Saving model
def save_model(model, final_model: bool=False):
    cdt = datetime.now()
    now = cdt.strftime("%Y-%m-%d_%H_%M_%S")
    saving_dir = f"intermediate-models" if not final_model else f"."

    try:
        os.mkdir("intermediate-models")
    except:
        pass
    model_name = f"model-{now}.pth" if not final_model else "model-final.pth"
    torch.save(model.state_dict(), f"{saving_dir}/{model_name}")

In [69]:
# Training loop
epoch_loss_store = []
for i in range(epoch):
    running_loss = .0
    for features, labels in train_dataloader:

        features = features.to(device)
        labels = labels.to(device)

        # Forward pass and loss calcuation
        outputs = model(features)
        loss = criterion(outputs, labels)

        # Backward pass ans optimizing
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    running_loss /= len(train_dataloader)
    print(f"Epoch: {i+1}, Loss: {running_loss} ")
    epoch_loss_store.append(running_loss)
    
    # Saving model in every 1 epoch
    print("Saving model...")
    save_model(model)

Epoch: 1, Loss: 2.684645977783203 
Epoch: 2, Loss: 2.385517695426941 
Epoch: 3, Loss: 2.320366182804108 
Epoch: 4, Loss: 2.275113148021698 
Epoch: 5, Loss: 2.247555594444275 
Epoch: 6, Loss: 2.212896706390381 
Epoch: 7, Loss: 2.2067477807044984 
Epoch: 8, Loss: 2.1937913893699648 
Epoch: 9, Loss: 2.1833752579689025 
Epoch: 10, Loss: 2.1698438510894777 


In [70]:
# Testing loop

all_accuracy = Accuracy(task="multiclass", num_classes=num_classes, average="micro").to(device)
all_precision = Precision(task="multiclass", num_classes=num_classes, average="micro").to(device)
all_recall = Recall(task="multiclass", num_classes=num_classes, average="micro").to(device)

total_loss = .0
model.eval() # Changin model to evaluation mode so it doesnot change weignts.
with torch.no_grad():

    for features, labels in test_dataloader:

        features = features.to(device)
        labels = labels.to(device)


        outputs = model(features)
        loss = criterion(outputs, labels)

        total_loss += loss.item()

        # Metrics
        all_accuracy(outputs, labels)
        all_recall(outputs, labels)
        all_precision(outputs, labels)

average_loss = total_loss / len(test_dataloader)
accuracy = all_accuracy.compute()
recall = all_recall.compute()
precision = all_precision.compute()

print(F"Total loss: {total_loss}")
print(F"Avg loss: {average_loss}")
print(F"{accuracy=}")
print(F"{recall=}")
print(F"{precision=}")

Total loss: 2667.3486499786377
Avg loss: 1.8332293127000947
accuracy=tensor(0.4804, device='cuda:0')
recall=tensor(0.4804, device='cuda:0')
precision=tensor(0.4804, device='cuda:0')


In [71]:
# Saving model

cdt = datetime.now()
now = cdt.strftime("%Y-%m-%d_%H_%M_%S")
saving_dir = f"models/model-{now}"

try:
    os.mkdir("models")
except:
    pass
os.mkdir(saving_dir)

with open(f"{saving_dir}/params.txt" , "w") as f:
    f.write(f"# Hyperparameters \n")
    f.write(f"{epoch=}\n")
    f.write(f"{batch_size=}\n")
    f.write(f"{lr=}\n")
    f.write(f"{momentum=}\n")
    f.write(f"{data_to_train_on=} datas\n\n")
    f.write(f"{auto_contrast_probability=}\n\n")

    f.write(f"# Metrics \n")
    f.write(f"{average_loss=}\n")
    f.write(f"{total_loss=}\n")
    f.write(f"{recall=}\n")
    f.write(f"{precision=}\n")
    f.write(f"{accuracy=}\n")

    f.write("# Epoch and loss\n")
    for loss in epoch_loss_store:
      f.write(f"{loss}")

torch.save(model.state_dict(), f"./models/model-{now}/model.pt")

In [81]:
try:
  import shutil
  from google.colab import files
  import zipfile
  def zip_folder(folder_path, output_zip_path):

    """Zips the contents of a folder into a zip file.

    Args:
        folder_path: The path to the folder to be zipped.
        output_zip_path: The path to the output zip file.
    """

    with zipfile.ZipFile(output_zip_path, "w") as zip_file:
        for root, dirs, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                zip_file.write(file_path)
  zip_folder("/content/models", "/content/ts-models.zip")
except:
  ...