# Traditional PyTorch on Databricks

This notebook, which is supposed to run within a Databricks cluster, is complementary to the SparkTorch version notebook. Here, we present the same approach as seen from a viewpoint more common in educational/academic circles, where distributed learning and cloud computing is not so common. For this reason, we will still upload the data from the Blob Storage, but then we will load them into a Pandas dataframe and work without making actual use of Spark's engine.

In [0]:
import numpy as np

# Same as in the other notebook
dlname = dbutils.secrets.get(scope = "testtuts", key = "lakename")
service_credential = dbutils.secrets.get(scope="testtuts", key="adcredential")
application_id = dbutils.secrets.get(scope="testtuts", key="DBappID")
directory_id = dbutils.secrets.get(scope="testtuts", key="DBtenantID")

spark.conf.set("fs.azure.account.auth.type."+dlname+".dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type."+dlname+".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id."+dlname+".dfs.core.windows.net", application_id)
spark.conf.set("fs.azure.account.oauth2.client.secret."+dlname+".dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint."+dlname+".dfs.core.windows.net", "https://login.microsoftonline.com/"+directory_id+"/oauth2/token")

containername = "torchtut"

# Train dataset
train_data_url = f"abfss://{containername}@{dlname}.dfs.core.windows.net/train.csv"
traindf = spark.read.format("csv").load(train_data_url, inferSchema = True, header = True).toPandas().astype(np.int8)

# Test dataset
test_data_url = f"abfss://{containername}@{dlname}.dfs.core.windows.net/test.csv"
testdf = spark.read.format("csv").load(test_data_url, inferSchema = True, header = True).toPandas().astype(np.int8)

As discussed, the `traindf` and `testdf` are now Pandas instead of Spark dataframes, so all of our data are collected in the machine's RAM and are not going to be processed in a distributed manner.

In [0]:
traindf.head(3)

Unnamed: 0,pixel1,pixel2,pixel3,pixel4,pixel5,pixel6,pixel7,pixel8,pixel9,pixel10,...,pixel776,pixel777,pixel778,pixel779,pixel780,pixel781,pixel782,pixel783,pixel784,label
0,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,5
1,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,4
3,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1
4,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,9


We proceed by performing the min-max scaling as before, only now using scikit-learn's MinMaxScaler.

In [0]:
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler().fit(traindf.iloc[:,:784])
traindf.iloc[:,:784] = scaler.transform(traindf.iloc[:,:784])
testdf.iloc[:,:784] = scaler.transform(testdf.iloc[:,:784])

The dataframes must now be split into feature and label dataframes and then further split in order to obtain a validation set for the model's training further down the road.

In [0]:
x_train, y_train, x_test, y_test = traindf.iloc[:,:784], traindf.iloc[:,784], testdf.iloc[:,:784], testdf.iloc[:,784]

from sklearn.model_selection import train_test_split

# get 30% of train data for validation
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.3, random_state=42)

# The following are done so that the CNN model accepts the data as 2D images
a, b = x_train.shape
b = int(np.sqrt(b))
x_train = x_train.to_numpy().reshape(a,b,b)

y_train = y_train.to_numpy()

a, b = x_val.shape
b = int(np.sqrt(b))
x_val = x_val.to_numpy().reshape(a,b,b)

y_val = y_val.to_numpy()

a, b = x_test.shape
b = int(np.sqrt(b))
x_test = x_test.to_numpy().reshape(a,b,b)

At this point, we define a `Dataset` class, in order to load our datasets into PyTorch loaders.

In [0]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision import transforms

class CustomDatasetNN(Dataset):
    def __init__(self, feats, labels):
        self.feats = feats
        self.labels = labels

    def __len__(self):
        return len(self.feats)

    def __getitem__(self, item):
        img = self.feats[item]
        tensoring = transforms.ToTensor()

        img = tensoring(img).float()
        return img, self.labels[item]
    
train_data = CustomDatasetNN(x_train,y_train)
train_loader = DataLoader(train_data,batch_size=300,shuffle=True)

val_data = CustomDatasetNN(x_val,y_val)
val_loader = DataLoader(val_data,batch_size=300,shuffle=True)

test_data = CustomDatasetNN(x_test,y_test)
test_loader = DataLoader(test_data,batch_size=1,shuffle=False)

Now that this is done, we may define the model as in the SparkTorch notebook. However, in this case we must also take care of explicitly defining the training, validation, earlystopping, etc. routines, as we did with the data-loading routines.

In [0]:
class CNNModel(nn.Module):
    def __init__(self, input_height, input_width, conv_channels, kernels, maxpools, lin_channels, dropout):
        super(CNNModel, self).__init__()
        self.num_conv_layers = len(kernels)
        
        seq = []
        for i in range(self.num_conv_layers):
            seq.append(nn.Conv2d(in_channels=conv_channels[i], 
                                 out_channels=conv_channels[i+1],
                                 kernel_size=kernels[i], stride=1, padding=1))
            seq.append(nn.ReLU())
            seq.append(nn.MaxPool2d(kernel_size=maxpools[i]))
            
        # Flatten the output of the final convolution layer
        seq.append(nn.Flatten())
        
        convolutions = nn.Sequential(*seq)
        
        # Calculation of first linear layer dimensions
        # We build an empty tensor of appropriate size and let him go through
        # the above sequence, in order to calculate the output's size automatically
        first_lin = convolutions(torch.empty(1,conv_channels[0],input_height,input_width)).size(-1)
        
        self.num_lin_layers = len(lin_channels)
        for i in range(self.num_lin_layers):
            if i == self.num_lin_layers-1:
                seq.append(nn.Linear(lin_channels[i-1], lin_channels[i]))
                break
            elif i == 0:
                seq.append(nn.Linear(first_lin, lin_channels[i]))
            else:
                seq.append(nn.Linear(lin_channels[i-1], lin_channels[i]))
            seq.append(nn.ReLU())
            seq.append(nn.Dropout(dropout))
                
        self.fitter = nn.Sequential(*seq)

    def forward(self, x):
        out = self.fitter(x)
        return out
    
def load_backbone_from_checkpoint(model, checkpoint_path):
    model.load_state_dict(torch.load(checkpoint_path))
    
# adapted code from this repository: https://github.com/Bjarten/early-stopping-pytorch
class EarlyStopping:
    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt', trace_func=print):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.Inf
        self.delta = delta
        self.path = path
        self.trace_func = trace_func
    def __call__(self, val_loss, model):

        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            self.trace_func(f'Validation loss increase spotted. Early stopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        '''Saves model when validation loss decrease.'''
        if self.verbose:
            self.trace_func(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

def training_loop(model, train_dataloader, optimizer):
    model.train()
    batch_losses = []
            
    for batch in train_dataloader:
        x_batch, y_batch = batch
                
        x_batch, y_batch = x_batch.float(), y_batch.type(torch.LongTensor)
                
        # Clear the previous gradients first
        optimizer.zero_grad()
        
        # forward pass
        yhat = model(x_batch) # No unpacking occurs in CNNs
        
        # loss calculation
        loss = loss_function(yhat, y_batch)
        
        # Backward pass
        loss.backward()
        
        # Update weights
        optimizer.step()
        
        batch_losses.append(loss.data.item())
        
    train_loss = np.mean(batch_losses)

    return train_loss

def validation_loop(model, val_dataloader):
    
    model.eval()
    batch_losses = []
    
    for batch in val_dataloader:
        x_batch, y_batch = batch
                
        x_batch, y_batch = x_batch.float(), y_batch.type(torch.LongTensor)
        
        yhat = model(x_batch)
        
        loss = loss_function(yhat, y_batch)
        
        batch_losses.append(loss.data.item())
        
    val_loss = np.mean(batch_losses)

    return val_loss


def train(model, train_dataloader, val_dataloader, optimizer, epochs, patience=-1):

    train_losses = []
    val_losses = []
    print(f"Initiating CNN training.")
    model_path = f'CNN.pt'
    checkpoint_path = 'checkpoint.pt'
        
    if patience != -1:
        early_stopping = EarlyStopping(patience=patience, verbose=False, path=checkpoint_path)

    for epoch in range(epochs):
        
        # Training loop
        train_loss = training_loop(model, train_dataloader, optimizer)    
        train_losses.append(train_loss)

        # Validation loop
        with torch.no_grad():

            val_loss = validation_loop(model, val_dataloader)
            val_losses.append(val_loss)

        if patience != -1:
            early_stopping(val_loss, model)

            if early_stopping.early_stop:
                print("Patience limit reached. Early stopping and going back to last checkpoint.")
                break

    if patience != -1 and early_stopping.early_stop == True:
        load_backbone_from_checkpoint(model,checkpoint_path)        

    torch.save(model.state_dict(), model_path)

    print(f"CNN training finished.\n")
    
    return train_losses, val_losses
    
def evaluate(model, test_dataloader):
    model.eval()
    predictions = []
    labels = []
    
    with torch.no_grad():
        for batch in test_dataloader:
            
            x_batch, y_batch = batch
                
            x_batch, y_batch = x_batch.float(), y_batch.type(torch.LongTensor)
            
            yhat = model(x_batch)
            
            # Calculate the index of the maximum argument
            yhat_idx = torch.argmax(yhat, dim=1)
            
            predictions.append(yhat_idx.cpu().numpy())
            labels.append(y_batch.cpu().numpy())
    
    return predictions, labels  # Return the model predictions

At this point, we may proceed with the training of the model, using the same set of parameters that were used in the SparkTorch notebook.

In [0]:
input_height, input_width = b, b
conv_channels = [1,4,16,32,64]
kernels = [3,3,3,3]
maxpools = [2,2,2,2]
lin_channels = [128,64,10]
dropout = 0.1
learning_rate = 0.001
patience = 20

epochs = 150

model = CNNModel(input_height = input_height, input_width = input_width,
                 conv_channels = conv_channels, kernels = kernels, maxpools = maxpools,
                 lin_channels = lin_channels, dropout = dropout)

loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate)

# Train the model
t_losses, v_losses = train(model, train_loader, val_loader, optimizer, epochs, patience=patience)

Initiating CNN training.
Validation loss increase spotted. Early stopping counter: 1 out of 20
Validation loss increase spotted. Early stopping counter: 1 out of 20
Validation loss increase spotted. Early stopping counter: 1 out of 20
Validation loss increase spotted. Early stopping counter: 1 out of 20
Validation loss increase spotted. Early stopping counter: 1 out of 20
Validation loss increase spotted. Early stopping counter: 2 out of 20
Validation loss increase spotted. Early stopping counter: 1 out of 20
Validation loss increase spotted. Early stopping counter: 2 out of 20
Validation loss increase spotted. Early stopping counter: 1 out of 20
Validation loss increase spotted. Early stopping counter: 2 out of 20
Validation loss increase spotted. Early stopping counter: 1 out of 20
Validation loss increase spotted. Early stopping counter: 2 out of 20
Validation loss increase spotted. Early stopping counter: 3 out of 20
Validation loss increase spotted. Early stopping counter: 4 out o

Finally, we obtain the results after the model's evaluation on the test set.

In [0]:
# Evaluate the model
predictions, labels = evaluate(model, test_loader)

y_pred = np.concatenate(predictions, axis=0)
y_true = np.concatenate(labels, axis=0)

from sklearn.metrics import accuracy_score

acc = accuracy_score(y_true, y_pred)

print(f"The final model's accuracy is {acc*100:.4f}%")

The final model's accuracy is 93.4600%
