In [2]:
import os
import pickle
from pathlib import Path

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pytorch_lightning as pl
import torch
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from torch import Tensor, flatten, nn, optim, utils
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision.datasets import MNIST
from torchvision.io import read_image
from torchvision.transforms import ToTensor


In [2]:
df = pd.DataFrame()
imagefolder = Path("notMNIST_small/")
for folder in imagefolder.iterdir():
    label = str(folder.name)
    filenames = list(map(lambda x:str(x),folder.iterdir()))
    tempdf = pd.DataFrame({"file":filenames})
    tempdf["label"] = label
    df = pd.concat([df,tempdf])
df = df.reset_index()
lb = LabelEncoder()
df["label"] = lb.fit_transform(df["label"])

In [4]:
joblib.dump(lb,"labelencoder.joblib")

['labelencoder.joblib']

In [3]:


class ImageDataset(Dataset):
    def __init__(self, labels_df:pd.DataFrame, transform=None, target_transform=None):
        self.img_labels = labels_df
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        img_path = self.img_labels.iloc[idx]["file"]
        try:
            image = read_image(img_path)
        except:
            print(idx,img_path)
        label = self.img_labels.iloc[idx]["label"]
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label

In [4]:
class ReScale:
    def __init__(self) -> None:
        pass
    def __call__(self,sample):
        return sample/255

In [6]:
ds = ImageDataset(df,ReScale())
trainds,valds,testds = random_split(ds,[round(0.8*len(ds)),round(0.1*len(ds)),round(0.1*len(ds))+1])

In [6]:
trainloader = DataLoader(trainds,batch_size=64,pin_memory=True,num_workers=12,persistent_workers=True)
testloader = DataLoader(testds,batch_size=64,pin_memory=True,num_workers=12,persistent_workers=True)

In [7]:
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Sequential(nn.Conv2d(1,6,25,padding="same"),
                                nn.ReLU(),
                                nn.MaxPool2d(2,2))
        self.conv2 = nn.Sequential(nn.Conv2d(6,9,15,padding="same"),
                                nn.ReLU(),
                                nn.MaxPool2d(2,2))
        self.conv3 = nn.Sequential(nn.Conv2d(9,12,5,padding="same"),
                                nn.ReLU(),
                                nn.MaxPool2d(2,2))
        self.fc = nn.Sequential(nn.Linear(108,100),
                                nn.ReLU(),
                                nn.Linear(100,80),
                                nn.ReLU(),
                                nn.Linear(80,10))

    def forward(self, x):
        x = x.reshape(-1,1,28,28)
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = flatten(x,1)
        x= self.fc(x)
        return x

In [8]:
# define the LightningModule
class LitModel(pl.LightningModule):
    def __init__(self, model):
        super().__init__()
        self.model= model
        self.save_hyperparameters()
    
    def forward(self,x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        # training_step defines the train loop.
        # it is independent of forward
        x, y = batch
        x,y = x.cuda().float(),y.cuda()
        y = torch.flatten(y)
        x = x.reshape(-1,1,28,28)
        x_hat = self.model(x)
        criterion = nn.CrossEntropyLoss()
        loss = criterion(x_hat,y)
        # Logging to TensorBoard by default
        self.log("train_loss", loss)
        return loss
    
    def test_step(self, *args, **kwargs):
        return super().test_step(*args, **kwargs)

    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), lr=1e-3)
        return optimizer
        
    def predict_step(self, batch, batch_idx: int, dataloader_idx: int = 0):
        x = batch
        if isinstance(batch,list):
            x,y=batch
        x = x.reshape(-1,1,28,28)
        return self.model(x)

In [9]:
model = LitModel(Model())
trainer = pl.Trainer(accelerator="gpu", devices=1,max_epochs=400,enable_checkpointing=True,fast_dev_run=False)
trainer.fit(model=model, train_dataloaders=trainloader)

  rank_zero_warn(
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type  | Params
--------------------------------
0 | model | Model | 38.4 K
--------------------------------
38.4 K    Trainable params
0         Non-trainable params
38.4 K    Total params
0.154     Total estimated model params size (MB)


Training: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=400` reached.


In [16]:
model.eval()
with torch.no_grad():
    y_hat = model(testds[1][0])

In [9]:
def accuracy(predictions, labels):
  return (100 * np.sum(np.argmax(predictions,1) == np.argmax(labels,1))
          / predictions.shape[0])

In [22]:
filtersize1 = 25
filtersize2 = 15
filtersize3 = 5


batch_size = 150

depth1 = 6
depth2 = 9
depth3 = 12

numlabels= 10

numhidden1 = 100
numhidden2 = 80

num_labels = 10

graph= tf.Graph()

with graph.as_default():

  # Input data.
  x = tf.placeholder(tf.float32, shape=(1, image_size, image_size, num_channels),name = "X")
  tf_train_dataset = tf.placeholder(
    tf.float32, shape=(batch_size, image_size, image_size, num_channels),name = "Trainset")
  tf_train_labels = tf.placeholder(tf.float32, shape=(batch_size, num_labels),name = "Train_labels")
  tf_valid_dataset = tf.constant(X_val,name = "Validset")
  tf_valid_labels = tf.constant(y_val,name = "Valid_labels")
  tf_test_dataset = tf.constant(X_test,name = "Testset")
  
  # Variables.
  layer1_weights = tf.Variable(tf.truncated_normal(
      [filtersize1, filtersize1, num_channels, depth1], stddev=0.1),name = "Layer1W")
  layer1_biases = tf.Variable(tf.constant(1.0, shape=[depth1]),name = "Layer1B")
    
  layer2_weights = tf.Variable(tf.truncated_normal(
      [filtersize2, filtersize2, depth1, depth2], stddev=0.1),name = "Layer2W")
  layer2_biases = tf.Variable(tf.constant(1.0, shape=[depth2]),name = "Layer2B")
    
  layer3_weights = tf.Variable(tf.truncated_normal(
      [filtersize3, filtersize3, depth2, depth3], stddev=0.1),name = "Layer3W")
  layer3_biases = tf.Variable(tf.constant(1.0, shape=[depth3]),name = "Layer3B")
    
  layer4_weights = tf.Variable(tf.truncated_normal(
      [16*depth3, numhidden1], stddev=0.1),name = "Layer4W")
  layer4_biases = tf.Variable(tf.constant(1.0, shape=[numhidden1]),name = "Layer4B")
    
  layer5_weights = tf.Variable(tf.truncated_normal(
      [numhidden1, numhidden2], stddev=0.1),name = "Layer5W")
  layer5_biases = tf.Variable(tf.constant(1.0, shape=[numhidden2]),name = "Layer5B")
  
  layer6_weights = tf.Variable(tf.truncated_normal(
      [numhidden2, numlabels], stddev=0.1,name = "Layer6W"))
  layer6_biases = tf.Variable(tf.constant(1.0, shape=[numlabels]),name = "Layer6B")
  
  saver = tf.train.Saver()
  # Model.
  def model(data):
    conv = tf.nn.conv2d(data, layer1_weights, [1, 1, 1, 1], padding='SAME',name = "Conv1")
    hidden = tf.nn.relu(conv + layer1_biases,name = "Hidden1")
    pool = tf.nn.max_pool(hidden,[1,2,2,1],[1,2,2,1],padding = 'SAME',name = "MaxPool1")
    
    conv = tf.nn.conv2d(pool, layer2_weights, [1, 1, 1, 1], padding='SAME',name = "Conv2")
    hidden = tf.nn.relu(conv + layer2_biases,name = "Hidden2")
    pool = tf.nn.max_pool(hidden,[1,2,2,1],[1,2,2,1],padding = 'SAME',name = "MaxPool2")
    
    conv = tf.nn.conv2d(pool, layer3_weights, [1, 1, 1, 1], padding='SAME',name = "Conv3")
    hidden = tf.nn.relu(conv + layer3_biases,name = "Hidden3")
    pool = tf.nn.max_pool(hidden,[1,2,2,1],[1,2,2,1],padding = 'SAME',name = "MaxPool3")
    
    shape = pool.get_shape().as_list()
    reshape = tf.reshape(pool, [shape[0], shape[1] * shape[2] * shape[3]])
    hidden = tf.nn.relu(tf.matmul(reshape, layer4_weights) + layer4_biases,name = "Hidden4")
    
    logits = tf.matmul(hidden, layer5_weights) + layer5_biases
    relu = tf.nn.relu(logits,name = "Hidden5")
    
    return tf.matmul(relu, layer6_weights) + layer6_biases
  
  # Training computation.
  logits = model(tf_train_dataset)
  tf.summary.histogram('train_activations', logits)
  with tf.name_scope('cross_entropy'):
    with tf.name_scope('total'):
      loss = tf.reduce_mean(
        tf.nn.softmax_cross_entropy_with_logits_v2(labels=tf_train_labels, logits=logits))
      tf.summary.scalar('cross_entropy', loss)
        
  # Optimizer.
  optimizer = tf.train.AdamOptimizer(0.005).minimize(loss)
  
  # Predictions for the training, validation, and test data.
  train_prediction = tf.nn.softmax(logits)
  valid_prediction = tf.nn.softmax(model(tf_valid_dataset))
  with tf.name_scope('cross_entropy2'):
    with tf.name_scope('total2'):
        cross_entropy2 = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(
            labels=tf_valid_labels, logits=model(tf_valid_dataset)))
        tf.summary.scalar('cross_entropy2', cross_entropy2)
  test_prediction = tf.nn.softmax(model(tf_test_dataset))
  y = tf.nn.softmax(model(x))
  