Test Script for testing loading of the data

In [1]:
#Loading necessary libraries
import os
import torch
import torch.nn as nn
from torchvision import datasets, transforms
from PIL import Image
import numpy as np
from torchvision.io import read_image
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

In [2]:
#Loading Image Names for Stop and Non-Stop Images
stop_image_dir = "./Data/stop/"
not_stop_image_dir = "./Data/not_stop/"
stop_image_list =  os.listdir(stop_image_dir)
not_stop_image_list =  os.listdir(not_stop_image_dir)

In [3]:
#Find distribution of widths and heights
n_images = len(stop_image_list) + len(not_stop_image_list)
width_distr = np.zeros(n_images)
heigth_distr = np.zeros(n_images)

#Iterate Over Images
for i in range(len(stop_image_list)):
    stop_image_name = stop_image_list[i]
    temp_image = Image.open(stop_image_dir + stop_image_name)
    width_distr[i] = temp_image.width
    heigth_distr[i] = temp_image.height

for i in range(len(not_stop_image_list)):
    not_stop_image_name = not_stop_image_list[i]
    temp_image = Image.open(not_stop_image_dir + not_stop_image_name)
    width_distr[i+len(stop_image_list)] = temp_image.width
    heigth_distr[i+len(stop_image_list)] = temp_image.height

#Calculating means as target widths and heights
reshape_width = np.round_(np.mean(width_distr)).astype(int)
reshape_heigth = np.round_(np.mean(heigth_distr)).astype(int)


In [4]:
#Creating Custom DataSet
class stopNotStopData(Dataset):
    def __init__(self, stop_dir, not_stop_dir, transform=None, targetTransform=None):
        self.stop_dir = stop_dir
        self.not_stop_dir = not_stop_dir

        #Finding Number of Stop and Non-Stop Images
        self.stop_image_list =  os.listdir(stop_image_dir)
        self.not_stop_image_list =  os.listdir(not_stop_image_dir)
        self.n_stop_images = len(self.stop_image_list)
        self.n_not_stop_images = len(self.not_stop_image_list)
        self.n_images = self.n_not_stop_images + self.n_stop_images

        #Creating initial labels for images
        self.labels = np.zeros(self.n_images)

        self.transform = transform
        self.targetTransform = targetTransform

    def __len__(self):
        return self.n_images

    def __getitem__(self, idx):
        #First find the corresponding image
        if idx+1<=self.n_stop_images:
            image_name = self.stop_image_list[idx]
            image_path = os.path.join(self.stop_dir, image_name)
            #Image is a stop image first load it
            #image = Image.open(self.stop_dir+str(int(idx+1))+".JPG")
            image = Image.open(image_path)
            #image = np.asarray(image).astype(np.uint8)
            label = 0 # 0 is for stop image and 1 is for non-stop image
            self.labels[i] = 0 #Set it also in labels array
            if self.transform:
                image = self.transform(image)
            if self.targetTransform:
                label = self.targetTransform(label) 
        else:
            idx = idx - self.n_stop_images
            image_name = self.not_stop_image_list[idx]
            image_path = os.path.join(self.not_stop_dir, image_name)
            #image = read_image(self.not_stop_dir+str(int(idx+1))+".JPG")
            #image = Image.open(self.not_stop_dir+str(int(idx+1))+".JPG")
            image = Image.open(image_path)
            #image = np.asarray(image).astype(np.uint8)
            label = 1 # 0 is for stop image and 1 is for non-stop image
            self.labels[i] = 1 #Set it also in labels array
            if self.transform:
                image = self.transform(image)
            if self.targetTransform:
                label = self.targetTransform(label)

        return image, label



In [5]:
#Test of Splitting DataSet into train, validation and test
#Define Transformation on Image
image_transform = transforms.Compose([transforms.Resize(size=(reshape_width, reshape_heigth)), transforms.ToTensor()])
#Create DataSet object
full_data_set = stopNotStopData(stop_dir=stop_image_dir, not_stop_dir=not_stop_image_dir, transform=image_transform, targetTransform=None)

#First Find Number of Elements of each dataset
num_elements = np.round_([len(full_data_set)*0.8, len(full_data_set)*0.1, len(full_data_set)*0.1]).astype(int)
num_elements[0] = num_elements[0] - (np.sum(num_elements)-len(full_data_set))
#Split dataset into three parts
train_dataset, valid_dataset, test_dataset = torch.utils.data.random_split(full_data_set, num_elements, generator=torch.Generator().manual_seed(75))


In [15]:
#Test of the training part for basic structure
class tevfik_nn(nn.Module):
    #Constructor
    def __init__(self, out_channel_1 = 16, out_channel_2 = 32):
        super(tevfik_nn, self).__init__()
        self.out_channel_1 = out_channel_1
        self.out_channel_2 = out_channel_2
        
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=self.out_channel_1, kernel_size=5, padding=3, stride=2)
        self.maxpool1 = nn.MaxPool2d(kernel_size=5)
        self.conv2 = nn.Conv2d(in_channels=self.out_channel_1, out_channels=self.out_channel_2, kernel_size=3, padding=1, stride=1)
        self.maxpool2 = nn.MaxPool2d(kernel_size=3)

        self.linear1 = nn.Linear(out_channel_2*43*31, 1024)
        self.linear2 = nn.Linear(1024, 1)

        
    def forward(self, x):
        x = self.conv1(x)
        x = torch.relu(x)
        x = self.maxpool1(x)
        x = self.conv2(x)
        x = torch.relu(x)
        x = self.maxpool2(x)
        print(x.shape)
        #x = self.flatten(x)
        x = torch.flatten(x)
        print(x.shape)
        x = self.linear1(x)
        x = torch.relu(x)
        x = self.linear2(x)
        x = torch.sigmoid(x)
        return x

#Device Selection
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
    
print(device)

#model = tevfik_nn(out_channel_1=8, out_channel_2=16).to(device)
#x_test = torch.rand(3, 1300, 946, device=device)
#y_yest = model(x_test)

cuda


In [16]:
#Test of writing function to train model
# Model Training Function
def train_model(model, n_epochs, train_loader, validation_loader, optimizer, criterion, N_test):
    cost_list = []
    accuracy_list = []
    # Loops for each epoch
    for epoch in range(n_epochs):
        # Keeps track of cost for each epoch
        COST=0
        # For each batch in train loader
        for x, y in train_loader:
            x.to(device)
            y.to(device)
            # Resets the calculated gradient value, this must be done each time as it accumulates if we do not reset
            optimizer.zero_grad()
            # Makes a prediction based on X value
            x.to(device)
            z = model(x)
            # Measures the loss between prediction and acutal Y value
            loss = criterion(z, y)
            # Calculates the gradient value with respect to each weight and bias
            loss.backward()
            # Updates the weight and bias according to calculated gradient value
            optimizer.step()
            # Cumulates loss 
            COST+=loss.data
        
        # Saves cost of training data of epoch
        cost_list.append(COST)
        # Keeps track of correct predictions
        correct=0
        # Perform a prediction on the validation  data  
        for x_test, y_test in validation_loader:
            # Makes a prediction
            z = model(x_test)
            # The class with the max value is the one we are predicting
            _, yhat = torch.max(z.data, 1)
            # Checks if the prediction matches the actual value
            correct += (yhat == y_test).sum().item()
        
        # Calcualtes accuracy and saves it
        accuracy = correct / N_test
        accuracy_list.append(accuracy)
        print('Epoch' + str(epoch+1) + 'Finished!')
    return model, cost_list, accuracy_list

In [17]:
import matplotlib.pyplot as plt

#Test part of the model
model = tevfik_nn(out_channel_1=8, out_channel_2=16).to(device)
n_epochs = 5

# We create a criterion which will measure loss
criterion = nn.CrossEntropyLoss()
learning_rate = 0.1
# Create an optimizer that updates model parameters using the learning rate and gradient
optimizer = torch.optim.SGD(model.parameters(), lr = learning_rate)

#Creating DataLoaders
train_dataloader = DataLoader(train_dataset, batch_size=32 ,shuffle=True)
valid_dataloader = DataLoader(dataset=valid_dataset, batch_size=len(valid_dataset), shuffle=True)
N_test = len(valid_dataset)

#Training the model
model, cost_list, accuracy_list = train_model(model=model, n_epochs=n_epochs, train_loader=train_dataloader, validation_loader=valid_dataloader, optimizer=optimizer, criterion=criterion, N_test=N_test)

plt.plot(range(1,n_epochs+1), cost_list)
plt.show()

plt.plot(range(1,n_epochs+1), accuracy_list)
plt.show()

RuntimeError: stack expects each tensor to be equal size, but got [3, 1300, 946] at entry 0 and [4, 1300, 946] at entry 20

In [18]:
for x, y in train_dataloader:
    print(x.shape)

torch.Size([32, 3, 1300, 946])


RuntimeError: stack expects each tensor to be equal size, but got [3, 1300, 946] at entry 0 and [4, 1300, 946] at entry 1

In [None]:
#Test can it be transferrable to dataLoader
train_dataloader = DataLoader(train_dataset, batch_size=32 ,shuffle=True)
valid_dataloader = DataLoader(dataset=valid_dataset, batch_size=len(valid_dataset), shuffle=True)

train_features, train_labels = next(iter(train_dataloader))
print(len(train_labels))

In [None]:
#Test of Convolution and max pooling
conv_test = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=5, padding=3, stride=2)
pooling = nn.MaxPool2d(kernel_size=5)

test_idx = 5
test_x = train_features[test_idx]

print(test_x.shape)

aft_conv1 = conv_test(test_x)
print(type(aft_conv1))
print(aft_conv1.shape)

aft_pool = pooling(aft_conv1)
print(aft_pool.shape)

conv_2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1, stride=1)
aft_conv2 = conv_2(aft_pool)
pool_2 = nn.MaxPool2d(kernel_size=3)
final = pool_2(aft_conv2)
print(final.shape)
final = torch.flatten(final)
print(final.shape)


In [None]:
import matplotlib.pyplot as plt
from random import randrange

idx = randrange(32)

#Load the Data
data = train_features[idx]
[rgb, n_y, n_x] = data.shape

image = np.zeros([n_y, n_x ,rgb])
for i in range(rgb):
    image[:,:,i] = data[i][:][:]

plt.figure(figsize=(8,8))
plt.imshow(image)
plt.title(str(train_labels[idx]))
plt.show()
print(image.shape)

In [None]:
print(reshape_width)
print(reshape_heigth)