In [None]:
# Fashion-MNIST NN

In [None]:
import numpy as np 
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt
import torch.nn.functional as F


# Print tensors in plain decimal notation instead of scientific notation.
torch.set_printoptions(sci_mode=False)

In [None]:
# Load dataset

# The two splits are built with the same storage root, download flag and a fresh
# ToTensor() transform each; only the `train` flag differs (True first, then False).
train_data, test_data = (
    datasets.FashionMNIST(
        root="data",
        train=is_train_split,
        download=True,
        transform=ToTensor(),
    )
    for is_train_split in (True, False)
)

In [None]:
# Take the first training sample; it unpacks into two values
# (presumably the image tensor and its class label -- confirm against torchvision).
x,y = train_data[0]
# Rearrange the image into a (28, 28, 1) layout before handing it to imshow.
x_ = x.reshape(28,28,1)
# The trailing semicolon keeps the notebook from echoing the returned object.
plt.imshow(x_);

In [None]:
# create dataloaders
def train_loader(batch_size):
    """Build a shuffling ``DataLoader`` over the notebook-level ``train_data``.

    Args:
        batch_size: number of samples per mini-batch.

    Returns:
        A newly constructed ``DataLoader``; every call creates a separate loader.
    """
    loader = DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)
    return loader


def test_loader(batch_size):
    """Build a non-shuffling ``DataLoader`` over the notebook-level ``test_data``.

    Args:
        batch_size: number of samples per mini-batch.

    Returns:
        A newly constructed ``DataLoader``; every call creates a separate loader.
    """
    loader = DataLoader(dataset=test_data, batch_size=batch_size, shuffle=False)
    return loader


In [None]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device} device")

In [None]:
# define FCN model 
class FashionNN(nn.Module):
    """Fully connected classifier: 784 -> 512 -> 128 -> 10 with ReLU in between.

    The last stage is a bare linear layer, so ``forward`` returns raw scores
    (no softmax is applied inside the model).
    """

    def __init__(self):
        super().__init__()
        # Stage 1: flatten each sample and project the 28*28 values to 512 features.
        self.input = nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 512), nn.ReLU())
        # Stage 2: reduce 512 features to 128.
        self.hidden = nn.Sequential(nn.Linear(512, 128), nn.ReLU())
        # Stage 3: map the 128 features to the 10 output scores.
        self.output = nn.Sequential(nn.Linear(128, 10))

    def forward(self, x):
        """Pass ``x`` through the three stages in order and return the 10 scores."""
        for stage in (self.input, self.hidden, self.output):
            x = stage(x)
        return x

In [None]:
def count_parameters_detail(model):
    """Print a per-parameter breakdown of the trainable parameters of ``model``.

    Parameters whose ``requires_grad`` flag is False are skipped. Besides the
    element counts, the storage needed for the trainable parameters is printed,
    computed from each parameter's actual element size rather than assuming
    4 bytes (float32) everywhere.

    Args:
        model: object exposing ``named_parameters()`` (e.g. an ``nn.Module``).

    Returns:
        Total number of trainable parameter elements.
    """
    total_params = 0
    total_bytes = 0
    print('Modules: Parameters')
    print('-------------------')
    for name, parameter in model.named_parameters():
        if not parameter.requires_grad:
            continue
        param = parameter.numel()
        print(name,': ',param)
        total_params += param
        # element_size() is the byte width of this parameter's dtype
        # (4 for float32, 2 for float16, 8 for float64, ...).
        total_bytes += param * parameter.element_size()
    print(f"Total Trainable Params: {total_params}")

    # 1 MB is taken as 1e6 bytes here.
    print(f"Disk space needed for trainable params: {total_bytes/1e6} MB")

    return total_params

In [None]:
def test_acc(model, dataloader, device):
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in dataloader:
            
            images, labels = images.to(device), labels.to(device)
            out = model(images)
            
            preds = np.argmax(out.detach().cpu().numpy(), axis=1)
            labels = labels.cpu().numpy()
        
            correct += np.sum(preds == labels)
            total += len(preds)

    
    acc = correct/total * 100
    print('Accuracy :', round(acc,3), "%")
    return acc

In [None]:
# Build the fully connected network and move it to the selected device.
model = FashionNN()
model = model.to(device)
print(f"Model architecture FCN: {model} \n")
# count_parameters_detail prints its own breakdown and returns the total,
# which is printed once more here.
print(count_parameters_detail(model))

In [None]:
# Train the fully connected model with plain SGD and record per-epoch metrics.
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()
model.to(device)

# Mini-batch size shared by the training and both evaluation loaders.
batch = 128

epochs = []
losses, test_accuracies, train_accuracies = [], [], []

# range(1,10) yields epochs 1..9, i.e. nine passes over the training data.
for epoch in range(1,10):
    epochs.append(epoch)
    running_loss = 0
    print(f"Epoch: {epoch}")

    # Make the training mode explicit in case an evaluation helper changed it.
    model.train()
    for data, labels in train_loader(batch):

        data, labels = data.to(device), labels.to(device)

        optimizer.zero_grad()
        out = model(data)
        loss = criterion(out,labels)
        # running_loss is the SUM of the per-batch losses over the epoch.
        running_loss += loss.item()
        loss.backward()
        optimizer.step()

    losses.append(running_loss)
    train_acc_ = test_acc(model=model, dataloader=train_loader(batch), device=device)
    test_acc_ = test_acc(model=model, dataloader=test_loader(batch), device=device)
    test_accuracies.append(test_acc_)
    train_accuracies.append(train_acc_)
    print('Final Test Accuracy:',test_acc_,'%')
    print("Final Train Accuracy:", train_acc_, '%')
    print('Last Train Loss:', running_loss)
    print()


# Loss (summed over batches) and accuracy (percent) live on very different
# scales, so they get separate, labelled axes plotted against the epoch number.
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 4))

loss_ax.plot(epochs, losses, '--o')
loss_ax.set_xlabel('Epochs', size=14)
loss_ax.set_ylabel('Loss', size=14)
loss_ax.set_title('Training loss (sum over batches)')
loss_ax.grid()

acc_ax.plot(epochs, train_accuracies, '--o', label='Train')
acc_ax.plot(epochs, test_accuracies, '--o', label='Test')
acc_ax.set_xlabel('Epochs', size=14)
acc_ax.set_ylabel('Accuracy (%)', size=14)
acc_ax.set_title('Accuracy')
acc_ax.grid()
acc_ax.legend()

fig.tight_layout()
plt.show()

In [None]:
# Freeze Parameters
# Turn off gradient tracking on every model parameter, one tensor at a time.
for param in model.parameters():
    param.requires_grad_(False)

In [None]:
# Uniform-noise image with gradient tracking; indexing with None twice adds the
# two leading singleton axes, giving shape (1, 1, 28, 28).
random_image = torch.rand((28,28), requires_grad=True)[None, None]
# One-element int64 tensor holding class index 0.
class_label = torch.zeros(1, dtype=torch.long)

# Show the starting noise image.
image = random_image[0,0].detach().numpy()
plt.imshow(image, cmap="gray",interpolation="none")
plt.show()

In [None]:
# Gradient descent on the INPUT image: the image is updated so that the model's
# cross-entropy loss against `class_label` goes down.
criterion = nn.CrossEntropyLoss()
model.to(device)

# Move the inputs to the device once, and make the image a fresh leaf tensor
# there so gradients can be taken with respect to it.
class_label = class_label.to(device)
random_image = random_image.detach().to(device).requires_grad_(True)

# range(1,10000) runs steps 1..9999; progress is reported every 1000 steps.
for epoch in range(1,10000):

    out = model(random_image)

    loss = criterion(out, class_label)

    random_image_grad = torch.autograd.grad(loss,random_image)

    # Plain gradient step with step size 0.1. Detaching keeps the autograd graph
    # from growing by one node per iteration; requires_grad_ re-arms the new leaf.
    random_image = (random_image - 0.1*random_image_grad[0]).detach().requires_grad_(True)

    if (epoch%1000==0):
        print("Epoch",epoch)
        print('Loss:',loss.item())

        # .cpu() is required before .numpy() when the image lives on the GPU.
        image = random_image[0,0].detach().cpu().numpy()
        plt.imshow(image, cmap="gray",interpolation="none")
        plt.show()

        print('Preds:',np.argmax(out.detach().cpu().numpy(), axis=1))

        print()


In [None]:
# using CNN

In [None]:
from torch.utils.tensorboard import SummaryWriter
import time
import pandas as pd
import json 
from collections import OrderedDict
from collections import namedtuple
from itertools import product
import torchvision
from IPython.display import clear_output

In [None]:
class CNN_Network(nn.Module):
    """Small CNN: two conv(3x3) + ReLU + max-pool(2x2) stages, then two linear layers.

    With a 1x28x28 input the spatial size goes 28 -> 26 -> 13 -> 11 -> 5, so the
    flattened feature vector has 8 * 5 * 5 = 200 entries, matching ``fc1``.
    The output is the raw 10-way score vector (no softmax).
    """

    def __init__(self):
        super().__init__()

        self.conv1 = nn.Conv2d(1,4,kernel_size=3,stride=1,padding=0)
        self.pool1 = nn.MaxPool2d(kernel_size=2,stride=2,padding=0)

        self.conv2 = nn.Conv2d(4,8,kernel_size=3,stride=1,padding=0)
        self.pool2 = nn.MaxPool2d(kernel_size=2,stride=2,padding=0)

        self.fc1 = nn.Linear(200,50)
        self.fc2 = nn.Linear(50,10)

    def forward(self,x,test=False):
        """Return the 10 class scores for a batch ``x``.

        Args:
            x: input batch; ``conv1`` expects a single input channel.
            test: accepted for interface compatibility; it is not used.
        """
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))

        # Flatten everything except the batch dimension. A functional call avoids
        # constructing a new nn.Flatten module on every forward pass.
        x = torch.flatten(x, start_dim=1)

        x = F.relu(self.fc1(x))
        x = self.fc2(x)

        return x

In [None]:
# put all hyper params into a OrderedDict, easily expandable
# Each key maps to the list of candidate values to try for that hyper-parameter.
params = OrderedDict([
    ("lr", [.01, .001]),
    ("batch_size", [64, 128, 256]),
    ("shuffle", [True, False]),
])
# Number of epochs, kept separate from the grid above.
epochs = 3

In [None]:
class RunBuilder():
    """Expands a mapping of hyper-parameter value lists into all combinations."""

    @staticmethod
    def get_runs(params):
        """Return one ``Run`` namedtuple per combination of the values in ``params``.

        Declared as a static method so it works both as ``RunBuilder.get_runs(p)``
        and on an instance.

        Args:
            params: mapping from hyper-parameter name to a list of candidate values;
                the key order determines the namedtuple field order.

        Returns:
            List of ``Run`` namedtuples in ``itertools.product`` order.
        """
        Run = namedtuple('Run', params.keys())
        return [Run(*values) for values in product(*params.values())]

In [None]:
# Expand the hyper-parameter grid into the list of run configurations.
runs = RunBuilder.get_runs(params)
# Bare expression on the last line so the notebook displays the list.
runs

In [None]:
# Helper class, help track loss, accuracy, epoch time, run time, 
# hyper-parameters etc. Also record to TensorBoard and write into csv, json
class RunManager():
    """Bookkeeping for a hyper-parameter sweep.

    Expected call order per run:
    ``begin_run`` -> (``begin_epoch`` -> ``track_loss`` / ``track_num_correct``
    per batch -> ``end_epoch``) per epoch -> ``end_run``; finally ``save``.
    """

    def __init__(self):
        # per-epoch counters: epoch index, weighted loss sum, weight sum, correct count, start time
        self.epoch_count = 0
        self.epoch_loss = 0
        self.epoch_loss_weight = 0
        self.epoch_num_correct = 0
        self.epoch_start_time = None

        # per-run state: hyper-params in use, run index, accumulated result rows, start time
        self.run_params = None
        self.run_count = 0
        self.run_data = []
        self.run_start_time = None

        # model, data loader and TensorBoard writer of the current run
        self.network = None
        self.loader = None
        self.tb = None

    def begin_run(self, run, network, loader):
        """Start a run: store its settings and log sample images and the model graph.

        Args:
            run: namedtuple of hyper-parameters (``_asdict()`` is used in ``end_epoch``).
            network: the model being trained.
            loader: the data loader; its first batch is used for the image grid and graph.
        """
        self.run_start_time = time.time()

        self.run_params = run
        self.run_count += 1

        self.network = network
        self.loader = loader
        self.tb = SummaryWriter(comment=f'-{run}')

        images, labels = next(iter(self.loader))
        grid = torchvision.utils.make_grid(images)

        self.tb.add_image('images', grid)
        self.tb.add_graph(self.network, images)

    def end_run(self):
        """Close the TensorBoard writer and reset the epoch counter for the next run."""
        self.tb.close()
        self.epoch_count = 0

    def begin_epoch(self):
        """Advance the epoch counter and zero the per-epoch accumulators."""
        self.epoch_start_time = time.time()

        self.epoch_count += 1
        self.epoch_loss = 0
        self.epoch_loss_weight = 0
        self.epoch_num_correct = 0

    def _epoch_metrics(self):
        """Return ``(loss, accuracy)`` for the batches tracked so far in this epoch."""
        # Normalise by the weight that was actually accumulated, so a partial last
        # batch cannot inflate the average. With nothing tracked the loss is 0.0.
        if self.epoch_loss_weight:
            loss = self.epoch_loss / self.epoch_loss_weight
        else:
            loss = 0.0
        accuracy = self.epoch_num_correct / len(self.loader.dataset)
        return loss, accuracy

    def _log_parameter_histograms(self):
        """Write a histogram of every parameter, and of its gradient when one exists."""
        for name, param in self.network.named_parameters():
            self.tb.add_histogram(name, param, self.epoch_count)
            # A parameter with no gradient yet has grad None; skip it instead of crashing.
            if param.grad is not None:
                self.tb.add_histogram(f'{name}.grad', param.grad, self.epoch_count)

    def end_epoch(self):
        """Finish an epoch: log metrics to TensorBoard, append a result row, show the table."""
        # epoch duration and run duration so far
        epoch_duration = time.time() - self.epoch_start_time
        run_duration = time.time() - self.run_start_time

        loss, accuracy = self._epoch_metrics()

        # Record epoch loss and accuracy to TensorBoard
        self.tb.add_scalar('Loss', loss, self.epoch_count)
        self.tb.add_scalar('Accuracy', accuracy, self.epoch_count)

        # Record params to TensorBoard
        self._log_parameter_histograms()

        # Write into 'results' (OrderedDict) for all run related data
        results = OrderedDict()
        results["run"] = self.run_count
        results["epoch"] = self.epoch_count
        results["loss"] = loss
        results["accuracy"] = accuracy
        results["epoch duration"] = epoch_duration
        results["run duration"] = run_duration

        # Record hyper-params into 'results'
        results.update(self.run_params._asdict())
        self.run_data.append(results)
        df = pd.DataFrame.from_dict(self.run_data, orient = 'columns')

        # display epoch information and show progress
        # (`display` is the notebook built-in; it is not imported in this file)
        clear_output(wait=True)
        display(df)

    def track_loss(self, loss, batch_size=None):
        """Accumulate one batch loss into the epoch loss.

        Args:
            loss: scalar tensor; ``loss.item()`` is multiplied by the batch weight.
            batch_size: number of samples in this batch. When omitted, the loader's
                nominal ``batch_size`` is used; pass the real size (e.g.
                ``len(labels)``) for an exact per-sample average when the last
                batch is smaller.
        """
        # weight by batch size so runs with different batch sizes can be compared
        weight = self.loader.batch_size if batch_size is None else batch_size
        self.epoch_loss += loss.item() * weight
        self.epoch_loss_weight += weight

    def track_num_correct(self, preds, labels):
        """Add the number of correct predictions in this batch to the epoch total."""
        self.epoch_num_correct += self._get_num_correct(preds, labels)

    @torch.no_grad()
    def _get_num_correct(self, preds, labels):
        """Count rows of ``preds`` whose argmax over dim 1 equals the matching label."""
        return preds.argmax(dim=1).eq(labels).sum().item()

    def save(self, fileName):
        """Write all accumulated result rows to ``<fileName>.csv`` and ``<fileName>.json``."""
        pd.DataFrame.from_dict(
            self.run_data,
            orient = 'columns',
        ).to_csv(f'{fileName}.csv')

        with open(f'{fileName}.json', 'w', encoding='utf-8') as f:
            json.dump(self.run_data, f, ensure_ascii=False, indent=4)

In [None]:
run_manager = RunManager()

# get all runs from params using RunBuilder class
for run in RunBuilder.get_runs(params):

    # if params changes, following line of code should reflect the changes too
    network = CNN_Network()
    # Build the loader directly so that run.shuffle is honoured; the train_loader
    # helper always shuffles, which would make the `shuffle` setting a no-op.
    loader = DataLoader(train_data, batch_size=run.batch_size, shuffle=run.shuffle)
    optimizer = torch.optim.Adam(network.parameters(), lr=run.lr)

    # The network and the batches are not moved to a device here, and begin_run
    # traces the network with a batch taken straight from the loader.
    run_manager.begin_run(run, network, loader)
    for epoch in range(epochs):

        run_manager.begin_epoch()
        for images, labels in loader:

            preds = network(images)
            loss = F.cross_entropy(preds, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            run_manager.track_loss(loss)
            run_manager.track_num_correct(preds, labels)

        run_manager.end_epoch()
    run_manager.end_run()

# when all runs are done, save results to files
run_manager.save('results')