In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, datasets
import numpy as np
import os
import matplotlib.pyplot as plt
from time import perf_counter
import pandas as pd
from skimage import io

In [None]:
# The flag below controls whether to allow TF32 on matmul. It defaulted to True in PyTorch 1.7-1.11 and defaults to False from PyTorch 1.12 onward.
# torch.backends.cuda.matmul.allow_tf32 = True

# The flag below controls whether to allow TF32 on cuDNN. This flag defaults to True.
# torch.backends.cudnn.allow_tf32 = True

# Lets cuDNN benchmark several convolution algorithms and cache the fastest one; this mainly helps when input shapes stay fixed.
# torch.backends.cudnn.benchmark = True

In [None]:
# The directory path of our entire image set
# IMAGES_LOCATION = os.path.join(os.getcwd(), 'all')
# 
# with open('labels_file.csv', 'w') as outfile:
    # for file in os.listdir(IMAGES_LOCATION):
        # if 'cat' in file:
            # outfile.write(f'{file}, 0')
        # elif 'dog' in file:
            # outfile.write(f'{file}, 1')
        # outfile.write('\n')
    # outfile.close()

In [None]:
# Disabled alternative data source: a CSV-driven Dataset, kept as a bare string
# literal so that none of the code inside it executes. The cell only evaluates the
# string itself.
# NOTE(review): pd.read_csv treats the first row as a header by default, while the
# commented-out generator cell above writes no header row -- the first image would
# presumably be dropped; confirm before re-enabling this code.
'''# Path of our csv file
LABELS_LOCATION = 'labels_file.csv'

# The directory path of our entire image set
IMAGES_LOCATION = os.path.join(os.getcwd(), 'all')

# Custom dataset example for the cats and dogs
class CatsAndDogsDataset(Dataset):
    
    # Constructor
    def __init__(self, csv_file, root_dir, transform = None):

        # This CSV file will contain the image names in one column and the labels in the other
        self.annotations = pd.read_csv(csv_file)

        self.root_dir = root_dir

        self.transform = transform

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, index):

        # Return image at row i, hence iloc, column 0, which is the image name
        img_path = os.path.join(self.root_dir, self.annotations.iloc[index, 0])

        image = io.imread(img_path)

        y_label = torch.tensor(int(self.annotations.iloc[index, 1]))

        if self.transform is not None:

            image = self.transform(image)

        return (image, y_label)'''

In [None]:
# Disabled companion to the CSV-driven dataset: transforms, dataset instantiation,
# train/test split and loaders, kept as a bare string literal so nothing in it runs.
# NOTE(review): the split lengths [20000, 4999] are hard-coded; random_split
# presumably requires them to add up to len(dataset) -- confirm before re-enabling.
'''# Image constants and batch size
IMAGE_HEIGHT = 128
IMAGE_WIDTH = 128
BATCH_SIZE = 512

# Declare our transforms ahead of time
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((IMAGE_HEIGHT, IMAGE_WIDTH)),
    transforms.Grayscale()    
])

# Instantiate the dataset
dataset = CatsAndDogsDataset(csv_file = LABELS_LOCATION, root_dir = IMAGES_LOCATION, transform = transform)

print(len(dataset))

# Split data into train and test
train_set, test_set = torch.utils.data.random_split(dataset, [20000, 4999])

train_loader = DataLoader(train_set, batch_size = BATCH_SIZE, shuffle = True)
test_loader = DataLoader(test_set, batch_size = BATCH_SIZE, shuffle = True)
'''

In [None]:
# Root folders of the two splits, resolved against the current working directory
TRAIN_DIR = os.path.join(os.getcwd(), 'train')
TEST_DIR = os.path.join(os.getcwd(), 'test')

# Image geometry and batching shared by both loaders
IMAGE_HEIGHT = 128
IMAGE_WIDTH = 128
BATCH_SIZE = 512

# Per-image pipeline: collapse to a single channel, resize, then convert to a tensor.
# Compose chains the individual steps into the one callable handed to ImageFolder.
preprocessing = transforms.Compose([
    transforms.Grayscale(),
    transforms.Resize(size=(IMAGE_HEIGHT, IMAGE_WIDTH)),
    transforms.ToTensor(),
])

# ImageFolder reads images below a root directory and derives each label from the
# subdirectory the image lives in.
train_set_setup = datasets.ImageFolder(root=TRAIN_DIR, transform=preprocessing)
test_set_setup = datasets.ImageFolder(root=TEST_DIR, transform=preprocessing)

# Wrap both datasets in loaders that batch and shuffle.
# Note that `train_set` / `test_set` are DataLoader objects, not datasets.
train_set = DataLoader(
    dataset=train_set_setup,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=8,
    prefetch_factor=1,
    pin_memory=True,
)
test_set = DataLoader(
    dataset=test_set_setup,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=8,
    prefetch_factor=1,
    pin_memory=True,
)

In [None]:
class NeuralNetwork(nn.Module):
    """Small convolutional classifier that emits two logits per input image.

    The first convolution takes single-channel input, and the first linear layer
    expects 64 flattened features. For 1x128x128 images the convolution/pooling
    stack produces 16 channels of 2x2, i.e. exactly those 64 features.
    """

    def __init__(self):
        """Build every layer and register them as one sequential stack."""
        super().__init__()

        # Feature extractor: four strided convolutions with one batch-norm and
        # one max-pool in between.
        feature_layers = [
            nn.Conv2d(in_channels=1, out_channels=32, kernel_size=2, stride=2),
            nn.ReLU(),
            nn.Conv2d(in_channels=32, out_channels=16, kernel_size=5, stride=3),
            nn.ReLU(),
            nn.BatchNorm2d(num_features=16),
            nn.Conv2d(in_channels=16, out_channels=64, kernel_size=2, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(in_channels=64, out_channels=16, kernel_size=4, stride=5),
            nn.ReLU(),
        ]

        # Classifier head: flatten the feature maps and map them to two logits.
        head_layers = [
            nn.Flatten(),
            nn.Linear(in_features=64, out_features=64),
            nn.ReLU(),
            nn.Linear(in_features=64, out_features=2),
        ]

        self.conv_relu_stack = nn.Sequential(*feature_layers, *head_layers)

    def forward(self, x):
        """Run a batch through the stack and return the raw, unnormalised logits."""
        return self.conv_relu_stack(x)

In [None]:
# Pick the compute device up front: CUDA when PyTorch reports it, otherwise the CPU
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f'Using {device} device')

In [None]:
# Build the network, move its parameters onto the selected device, then show its layers
model = NeuralNetwork()
model = model.to(device)
print(model)

In [None]:
# Training hyperparameters
epochs = 8                # number of full passes over the training loader
batch_size = BATCH_SIZE   # same value the DataLoaders were built with
learning_rate = 1e-3      # step size passed to the optimizer

In [None]:
# Cross-entropy over the two output logits, optimised with Adam
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

In [None]:
# PyTorch leaves the training loop to the user, so one epoch is spelled out here
def train_loop(train_set, model, loss_fn, optimizer):
    """Train `model` for one pass over `train_set` and print a one-line summary.

    Args:
        train_set: Iterable of ``(inputs, targets)`` batches that supports
            ``len()`` and exposes a ``.dataset`` attribute with ``len()``
            (e.g. a ``DataLoader``).
        model: The ``nn.Module`` to optimise. Each batch is moved to the device
            of the model's first parameter; a parameterless model leaves the
            batch where it is.
        loss_fn: Callable mapping ``(predictions, targets)`` to a scalar loss.
        optimizer: Optimizer whose ``zero_grad()`` / ``step()`` update the model.

    Returns:
        None. After the final batch, the last batch's loss, the number of
        samples processed and the elapsed time are printed.
    """
    size = len(train_set.dataset)
    num_batches = len(train_set)

    # Follow the model's own device instead of relying on a notebook-level global
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    # Mode-dependent layers (e.g. BatchNorm) must be in training mode, even if
    # an earlier evaluation switched the model to eval mode.
    model.train()

    samples_seen = 0
    start = perf_counter()

    for batch, (x, y) in enumerate(train_set):

        if target_device is not None:
            x = x.to(device=target_device)
            y = y.to(device=target_device)

        # Forward pass and loss
        prediction = model(x)
        loss = loss_fn(prediction, y)

        # Clear stale gradients, back-propagate, then update the weights
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Count real samples: the last batch may be smaller than the others,
        # so `batch * len(x)` would under-report progress.
        samples_seen += len(x)

        # Report once, after the final batch of the epoch
        if batch == num_batches - 1:
            elapsed = perf_counter() - start
            print(f'Loss: {loss.item():>7f} [{samples_seen:>5d}/{size:>5d}] | Time per epoch = {elapsed:>3f}')

In [None]:
# The test loop is somewhat similar, but does not use gradient descent
def test_loop(test_set, model, loss_fn):

    size = len(test_set.dataset)

    num_batches = len(test_set)

    test_loss, correct = 0, 0

    # The inference part. No weight updates, etc.
    with torch.no_grad():

        for x,y in test_set:

            # We have to send each batch to the GPU
            x = x.to(device=device)
            y = y.to(device=device)

            pred = model(x)

            test_loss += loss_fn(pred, y).item()

            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
# pytorch_total_params = sum(p.numel() for p in model.parameters())
# print(pytorch_total_params)
# Run every training epoch first, then evaluate once on the test loader
for epoch_index in range(epochs):
    train_loop(train_set, model, loss_fn, optimizer)

test_loop(test_set, model, loss_fn)
print('Done!')