In [None]:
from google.colab import files,drive

drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import torch
import torch.nn as nn 
import torch.nn.functional as F 

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

class Net(nn.Module):

  def __init__(self):

    super().__init__()

    self.conv1 = nn.Conv2d(1,6,5)
    self.conv2 = nn.Conv2d(6,16,(5,5))
    self.pool = nn.MaxPool2d(2,2)
    self.fc1 = nn.Linear(16*4*4,120)
    self.fc2 = nn.Linear(120,84)
    self.fc3 = nn.Linear(84,10)

  def forward(self, x):
    
    x = self.pool(F.relu(self.conv1(x)))
    x = self.pool(F.relu(self.conv2(x)))
    x = x.view(-1,self.num_flat_features(x))
    x = F.relu(self.fc1(x))
    x = F.relu(self.fc2(x))
    x = F.log_softmax(self.fc3(x))

    return x

  def num_flat_features(self,x):
    size = x.size()[1:] 
    num_features = 1
    for s in size:
      num_features *= s

    return num_features

net = Net().to(device)
print(net)

Net(
  (conv1): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1))
  (conv2): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=256, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=84, bias=True)
  (fc3): Linear(in_features=84, out_features=10, bias=True)
)


In [None]:
import os
import glob
import numpy as np  
from skimage import io 

from torch.utils.data import Dataset, DataLoader

class MNISTDataset(Dataset):

  def __init__(self, dir, transform=None):
    self.dir = dir # /content/drive/My Drive/BigDataAI/datasets/MNIST/trainingset/1/
    self.transform = transform

  def __len__(self):
    files = glob.glob(self.dir+'/*.jpg')[:100] # return a list of file names in a given folder 
    return len(files)

  def __getitem__(self, idx):
    if torch.is_tensor(idx):
      idx = idx.tolist()

    all_files = glob.glob(self.dir+'/*.jpg')[:100]
    img_fname = os.path.join(self.dir, all_files[idx])
    image = io.imread(img_fname) # numpy array of that particular image

    digit = int(self.dir.split('/')[-1].strip())
    label = np.array(digit)

    instance = {'image':image,'label':label}

    if self.transform:
      instance = self.transform(instance)

    return instance


In [None]:
from skimage import transform

class Rescale(object):

  def __init__(self, output_size):
    assert isinstance(output_size, (int, tuple))
    self.output_size = output_size

  def __call__(self, sample):
    image, label = sample['image'], sample['label']

    h, w = image.shape[-2:]
    if isinstance(self.output_size, int):
      if h > w:
        new_h, new_w = self.output_size*h/w, self.output_size
      else:
        new_h, new_w = self.output_size, self.output_size*w/h
    else:
      new_h, new_w = self.output_size

    new_h, new_w = int(new_h), int(new_w)

    new_image = transform.resize(image, (new_h, new_w))

    return {'image': new_image, 'label':label}

class ToTensor(object):

  def __call__(self, sample):

    image, label = sample['image'], sample['label']

    image = image.reshape((1,image.shape[0],image.shape[1]))

    return {'image':torch.from_numpy(image) ,'label': torch.from_numpy(label)}

In [None]:
from torch.utils.data import random_split
from torchvision import transforms, utils

batch_size = 32

list_datasets = []
for i in range(10):
  cur_ds = MNISTDataset('/content/drive/My Drive/BigDataAI/datasets/MNIST/trainingset/'+str(i),transform=transforms.Compose([Rescale(28),ToTensor()]))
  list_datasets.append(cur_ds)

dataset = torch.utils.data.ConcatDataset(list_datasets)
print(len(dataset))

train_size = int(len(dataset)*0.7)
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset,[train_size, val_size])

train_dataloader = DataLoader(train_dataset,batch_size,shuffle=True,num_workers=1)
val_dataloader = DataLoader(val_dataset,batch_size,shuffle=True,num_workers=1)

1000


In [None]:
import torch.optim as optim

epochs = 5
learning_rate = 1e-4
optimizer = optim.Adam(net.parameters(),lr=learning_rate)
criterion = nn.CrossEntropyLoss()

for epoch in range(epochs):

  net.train()

  running_loss = 0.0
  for b, batch in enumerate(train_dataloader):
    inputs, targets = batch['image'].to(device,dtype=torch.float), batch['label'].to(device,dtype=torch.long)

    optimizer.zero_grad()
    outputs = net(inputs)
    loss = criterion(outputs,targets)
    loss.backward()
    optimizer.step()

    if (b+1)%10 == 0:
      print('epoch: %d, batch: %d, training loss: %.3f' % (epoch+1, b+1, running_loss/10))
      running_loss = 0.0


  net.eval()

  correct = [0.0]*10
  total = [0.0]*10

  with torch.no_grad():
    for b, batch in enumerate(val_dataloader):
      inputs, labels = batch['image'].to(device,dtype=torch.float), batch['label'].to(device,dtype=torch.long)

      predicted_outputs = net(inputs)
      _,predicted_idx = torch.max(predicted_outputs,1)
      c = (predicted_idx == labels)
      for i in range(len(labels)):
        label = labels[i]
        correct[label] += c[i].item()
        total[label] += 1

  for i in range(10):
    print('\t Validation accuracy for digit %d: %.3f'% (i, 100*correct[i]/total[i]))



