# Modules

In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


import os
import numpy as np
import cv2
import urllib.request
import glob

# Download and Organize Data

In [2]:
# Create the download directory. exist_ok=True keeps the cell re-runnable:
# a second execution no longer raises FileExistsError.
os.makedirs("data", exist_ok=True)

In [3]:
# Category names to fetch; each one is also used as the stem of the local file.
classes = [
    'saw',
    'crown',
    'cup',
    'cloud',
    'pizza',
    'camera',
    'face',
]

In [None]:
# Download the numpy-bitmap archive of every category into data/<name>.npy.
base = 'https://storage.googleapis.com/quickdraw_dataset/full/numpy_bitmap/'
for c in classes:
    # Underscores in a category name are sent as URL-encoded spaces.
    cls_url = c.replace('_', '%20')
    path = base + cls_url + '.npy'
    target = os.path.join("data", c + '.npy')

    if os.path.exists(target):
        # Already on disk: skip the transfer so re-running the notebook is cheap.
        print(path, '(cached)')
        continue

    print(path)
    # Download under a temporary name and rename only after the transfer has
    # finished, so an interrupted download never leaves a truncated file that
    # would later be picked up as a complete '*.npy'.
    partial = target + '.part'
    urllib.request.urlretrieve(path, partial)
    os.replace(partial, target)

In [5]:
def load_data(root, vfold_ratio=0.2, max_items_per_class=25000, seed=None):
    """Load per-class ``.npy`` bitmap files and split them into train/test sets.

    Every ``*.npy`` file directly inside ``root`` is treated as one class. Files
    are visited in sorted path order, so the integer label of a class equals its
    position in the returned ``class_names`` and is the same on every run.

    Args:
        root: Directory holding one ``.npy`` file per class. Each file must
            contain a 2-D array with 784 columns.
        vfold_ratio: Fraction of all loaded rows that goes to the test split.
        max_items_per_class: Maximum number of leading rows kept from each file.
        seed: Optional seed for the shuffle. ``None`` (the default) draws the
            permutation from NumPy's global random state.

    Returns:
        ``(X_train, y_train, X_test, y_test, class_names)``. The ``X`` arrays
        are float64 with 784 columns, the ``y`` arrays are float64 label
        indices, and ``class_names`` lists the file stems in label order.
    """
    # glob returns files in arbitrary order; sorting pins label <-> class.
    all_files = sorted(glob.glob(os.path.join(root, '*.npy')))

    # Start each list with an empty float64 array: this keeps the float64
    # result dtype and lets an empty directory return empty arrays.
    feature_chunks = [np.empty([0, 784])]
    label_chunks = [np.empty([0])]
    class_names = []

    # Load a subset of every class file into memory.
    for idx, file in enumerate(all_files):
        data = np.load(file)
        # Copy the slice so the full file buffer can be released right away.
        data = np.array(data[0:max_items_per_class, :])
        feature_chunks.append(data)
        label_chunks.append(np.full(data.shape[0], idx))

        class_name, _ = os.path.splitext(os.path.basename(file))
        class_names.append(class_name)

    # Concatenate once instead of re-copying the growing arrays per file.
    x = np.concatenate(feature_chunks, axis=0)
    y = np.concatenate(label_chunks)
    del feature_chunks, label_chunks

    # Shuffle samples and labels with the same permutation.
    if seed is None:
        permutation = np.random.permutation(y.shape[0])
    else:
        permutation = np.random.RandomState(seed).permutation(y.shape[0])
    x = x[permutation, :]
    y = y[permutation]

    # Expression kept unchanged so split sizes match earlier runs exactly.
    vfold_size = int(x.shape[0]/100*(vfold_ratio*100))

    X_test = x[0:vfold_size, :]
    y_test = y[0:vfold_size]

    X_train = x[vfold_size:x.shape[0], :]
    y_train = y[vfold_size:y.shape[0]]
    return X_train, y_train, X_test, y_test, class_names

In [None]:
# Load the downloaded bitmaps and split them into train/test sets.
# Note: `classes` is rebound here to the class names returned by load_data,
# replacing the list that was used for downloading.
X_train, y_train, X_test, y_test, classes = load_data("data")

# Preprocess Data

In [7]:

def _to_image_tensor(flat_pixels):
    """Convert (N, 784) grayscale rows into an (N, 1, 3, 28, 28) float64 tensor."""
    images = flat_pixels.reshape(flat_pixels.shape[0], 28, 28, 1).astype("uint8")
    # Bitwise NOT on uint8 maps v -> 255 - v (swaps light and dark); this is
    # the same operation cv2.bitwise_not performed, without the OpenCV call.
    images = np.invert(images)
    # Replicate the single gray channel into three identical channels.
    images = np.repeat(images, 3, -1)
    # Scale to [0, 1]; the true division promotes the array to float64.
    images = images / 255.
    # (N, 28, 28, 3) -> (N, 3, 28, 28). The axis order (0, 3, 2, 1) also swaps
    # the two spatial axes; it is kept as is so inputs stay compatible with
    # models trained on this layout.
    images = np.transpose(images, (0, 3, 2, 1))
    # Insert a singleton axis after the sample axis so that indexing one
    # sample yields a batch of size 1.
    return torch.from_numpy(images).unsqueeze(1)


def preprocess(X_train, X_test):
    """Convert flat bitmap arrays into normalized image tensors.

    Both splits go through the same steps: reshape to 28x28, invert the
    intensities, replicate to 3 channels, scale to [0, 1] and move channels
    first.

    Args:
        X_train: Array of shape (N_train, 784) with pixel values in 0..255.
        X_test: Array of shape (N_test, 784) with pixel values in 0..255.

    Returns:
        ``(X_train, X_test)`` as float64 tensors of shape (N, 1, 3, 28, 28).
    """
    return _to_image_tensor(X_train), _to_image_tensor(X_test)



In [8]:
# Convert both splits to image tensors. This rebinds X_train/X_test to the
# preprocessed tensors, so the cell should not be re-run on its own:
# preprocess expects the raw arrays produced by load_data.
X_train, X_test = preprocess(X_train, X_test)

In [9]:
# Wrap the training labels in a tensor. torch.as_tensor converts a NumPy array
# exactly like torch.from_numpy, but also accepts an existing tensor, so
# re-running this cell is harmless instead of raising a TypeError.
y_train = torch.as_tensor(y_train)

# CNN Architecture

In [None]:

class Net(nn.Module):
    """Small CNN classifier for 3-channel 28x28 images.

    Three 3x3 convolutions, each followed by a leaky ReLU and then batch
    normalization, with 2x2 max-pooling after the second and third
    convolution. A 28x28 input is reduced to 32 feature maps of 5x5, which
    three linear layers map to class logits.

    Args:
        num_classes: Number of output logits. Defaults to 7, the size the
            notebook uses; layer names are unchanged, so state dicts saved
            from the 7-class network still load.
    """

    def __init__(self, num_classes=7):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 8, 3)
        self.bn1 = nn.BatchNorm2d(8)
        self.conv2 = nn.Conv2d(8, 16, 3)
        self.bn2 = nn.BatchNorm2d(16)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv3 = nn.Conv2d(16, 32, 3)
        self.bn3 = nn.BatchNorm2d(32)
        # Spatial size: 28 -> 26 -> 24 -> pool 12 -> 10 -> pool 5.
        self.fc1 = nn.Linear(32 * 5 * 5, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc5 = nn.Linear(64, num_classes)

    def forward(self, x):
        """Return raw class logits of shape (batch, num_classes).

        Args:
            x: Float tensor of shape (batch, 3, 28, 28).
        """
        x = self.bn1(F.leaky_relu(self.conv1(x)))
        x = self.pool(self.bn2(F.leaky_relu(self.conv2(x))))
        x = self.pool(self.bn3(F.leaky_relu(self.conv3(x))))
        x = torch.flatten(x, 1)  # flatten all dimensions except batch
        x = F.leaky_relu(self.fc1(x))
        x = F.leaky_relu(self.fc2(x))
        # No softmax here: nn.CrossEntropyLoss expects unnormalized logits.
        return self.fc5(x)


# class Net(nn.Module):   
#   def __init__(self):
#       super(Net, self).__init__()

#       self.cnn_layers = nn.Sequential(
#           # Defining a 2D convolution layer
#           nn.Conv2d(3, 5, kernel_size=3),
#           #nn.BatchNorm2d(5),
#           nn.ReLU(inplace=True),
#           nn.MaxPool2d(kernel_size=2, stride=2),
#           # Defining another 2D convolution layer
#           #nn.Conv2d(5, 8, kernel_size=4, stride=1, padding=1),
#           #nn.BatchNorm2d(8),
#           #nn.ReLU(inplace=True),
#           #nn.MaxPool2d(kernel_size=2, stride=2),
#       )

#       self.linear_layers = nn.Sequential(
#           nn.Linear(5*13*13, 10)
#       )

#   # Defining the forward pass    
#   def forward(self, x):
#       x = self.cnn_layers(x)
#       x = x.view(x.size(0), -1)
#       x = self.linear_layers(x)
#       return x



In [11]:
# Instantiate the network with freshly initialized weights and print its
# layer summary.
net =  Net()
print(net)

Net(
  (conv1): Conv2d(3, 8, kernel_size=(3, 3), stride=(1, 1))
  (bn1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(8, 16, kernel_size=(3, 3), stride=(1, 1))
  (bn2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1))
  (bn3): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc1): Linear(in_features=800, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=64, bias=True)
  (fc5): Linear(in_features=64, out_features=7, bias=True)
)


In [12]:
# Loss and optimizer used by the training loop.
# CrossEntropyLoss takes the raw logits returned by the network.
criterion = nn.CrossEntropyLoss()
# Earlier optimizer settings, kept for reference:
#optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
#optimizer = optim.Adam(net.parameters(),lr=0.005,betas=(0.9,0.999),eps=1e-08,weight_decay=0,amsgrad=False)
optimizer = optim.Adam(net.parameters(), lr=0.003)

In [13]:
# Sanity check: shape of the preprocessed training tensor.
X_train.shape

torch.Size([140000, 1, 3, 28, 28])

In [14]:
# Sanity check: distinct training labels and how many samples each one has.
y_train.unique(return_counts=True)

(tensor([0., 1., 2., 3., 4., 5., 6.], dtype=torch.float64),
 tensor([19906, 20055, 19903, 20010, 20053, 20053, 20020]))

# Train the Model

In [None]:
# Train for three passes over the training set, one sample per optimizer step.
for epoch in range(3):
    running_loss = 0.0

    for i in range(X_train.shape[0]):
        # X_train[i] is a batch holding a single image; the model runs in float32.
        sample = X_train[i].float()
        # CrossEntropyLoss needs an integer (int64) class index as target.
        target = y_train[i].long()

        # Clear gradients left over from the previous step.
        optimizer.zero_grad()

        # Forward pass, loss on the only row of the batch, backward pass, update.
        logits = net(sample)
        loss = criterion(logits[0], target)
        loss.backward()
        optimizer.step()

        # Report the mean loss over every block of 2000 samples.
        running_loss += loss.item()
        if (i + 1) % 2000 == 0:
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
            running_loss = 0.0

print('Finished Training')

In [21]:
# Serialize the complete `net` object to disk.
PATH = './model_tot.pth'
torch.save(net, PATH)

In [22]:
# Save only the state dict of `net`. PATH is rebound here and now points to
# this file rather than the full-model file.
PATH = './model_val.pth'
torch.save(net.state_dict(), PATH)

# Testing and Measuring Accuracy

In [24]:
# Build a fresh network, load saved weights into it and switch to evaluation
# mode.
# NOTE(review): this reads "model_cls.pth", while the cells in this notebook
# write './model_tot.pth' and './model_val.pth'. Presumably model_cls.pth is a
# checkpoint from an earlier run; confirm which file is meant, otherwise a
# fresh Restart & Run All fails here or evaluates a different model.
model = Net()
model.load_state_dict(torch.load("model_cls.pth"))
model.eval()

Net(
  (conv1): Conv2d(3, 8, kernel_size=(3, 3), stride=(1, 1))
  (bn1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(8, 16, kernel_size=(3, 3), stride=(1, 1))
  (bn2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1))
  (bn3): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc1): Linear(in_features=800, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=64, bias=True)
  (fc5): Linear(in_features=64, out_features=7, bias=True)
)

In [10]:
# Wrap the test labels in a tensor. torch.as_tensor converts a NumPy array
# exactly like torch.from_numpy, but also accepts an existing tensor, so
# re-running this cell is harmless instead of raising a TypeError.
y_test = torch.as_tensor(y_test)

In [None]:
# Evaluate `model` on the test split, one sample at a time.
#   ccount    - number of correct predictions
#   tcount    - number of samples evaluated
#   clount[k] - number of correct predictions whose class index is k
ccount, tcount = 0, 0
clount = np.zeros([7])

# Inference only: no_grad avoids building an autograd graph for every sample.
with torch.no_grad():
    for i in range(len(X_test)):
        inputs = X_test[i]
        label = y_test[i]

        outputs = model(inputs.float())
        # The batch holds one image, so row 0 carries its logits.
        pred = torch.argmax(outputs[0])

        tcount += 1
        if pred == label:
            clount[int(pred)] += 1
            ccount += 1

In [22]:
# Per-class accuracy: correct predictions per class divided by the number of
# test samples of that class. `a[1]` holds the counts of the sorted unique
# labels, so the division presumably relies on every class index 0..6
# occurring in y_test; TODO confirm.
a = y_test.unique(return_counts=True)
arg = np.array(clount)
result = arg/a[1]
# NOTE(review): np.sort returns the accuracies in ascending order, so the
# displayed values are no longer aligned with the class indices.
np.sort(result)

array([0.84042552, 0.85458169, 0.89786367, 0.89958405, 0.91829735,
       0.95110219, 0.96363282])