In [None]:
# import torch and numpy and pretrained model
from torchvision import models
import torch
import numpy as np

# Build a ResNet-50 initialised with torchvision's default pretrained weights.
# `progress=True` asks torchvision to display download progress when the
# weights have to be fetched.
pretrained_cnn = models.resnet50(
    weights="DEFAULT",
    progress=True,
)

In [None]:
# Replace the fully connected classifier with a fresh layer for us to train:
# 15 outputs, one per leaf species. The input width is read from the layer
# being replaced instead of hard-coding 2048, so the cell keeps working if the
# backbone is swapped for one with a different feature size.
pretrained_cnn.fc = torch.nn.Linear(pretrained_cnn.fc.in_features, 15)

In [None]:
# Freeze everything except the fc layer: the pretrained backbone should not be retrained.
# Step 1: turn gradient tracking off for every parameter of the network.
pretrained_cnn.requires_grad_(False)

# Step 2: turn it back on for the replacement classifier only.
pretrained_cnn.fc.requires_grad_(True)

In [None]:
# Loss function (cross-entropy; this could also be written out by hand).
criterion = torch.nn.CrossEntropyLoss()

# Optimizer: Adam over the parameters of the fc layer only.
# TODO: tune the learning rate; an optimizer for a custom softmax function is still an open question.
optimizer = torch.optim.Adam(
    params=pretrained_cnn.fc.parameters(),
    lr=1e-3,
)

In [None]:
# # Get Files
import sys, os
from scripts import ProcessData
from multiprocessing import Pool, cpu_count
from functools import partial
from io import BytesIO

data_dir = "./data/"

def get_urls():
    """Return the download URLs of the leaf archives leaf1.zip ... leaf15.zip, in order."""
    return [
        f"https://www.cvl.isy.liu.se/en/research/datasets/swedish-leaf/leaf{i}.zip"
        for i in range(1, 16)
    ]

if __name__ == "__main__":
    # exist_ok=True keeps this cell re-runnable: without it the second run
    # raises FileExistsError because ./data/ was created by the first run.
    os.makedirs(data_dir, exist_ok=True)

    # Fetch all archives in parallel, one worker process per CPU.
    # NOTE(review): ProcessData.download_zip is defined outside this notebook;
    # it presumably downloads and extracts one archive into `file_path` - confirm.
    print(f"There are {cpu_count()} CPUs on this machine ")
    download_func = partial(ProcessData.download_zip, file_path = data_dir)
    with Pool(cpu_count()) as p:
        # p.map returns one result per URL, in the same order as get_urls()
        print(p.map(download_func, get_urls()))







In [None]:
#import data
from PIL import Image
import os
from sklearn.preprocessing import OneHotEncoder

# gets the label based on the number
def getLabel(s):
    """Map a leaf class number to its species name.

    `s` may be anything `int()` accepts (for example 3 or "3"). Returns the
    species name for class numbers 1-15 and None for any other number.
    """
    # Species listed in class-number order: position 0 is class 1.
    species_in_class_order = (
        "Ulmus carpinifolia",
        "Acer",
        "Salix aurita",
        "Quercus",
        "Alnus incan",
        "Betula pubescens",
        "Salix alba 'Sericea'",
        "Populus tremula",
        "Ulmus glabra",
        "Sorbus aucuparia",
        "Salix sinerea",
        "Populus",
        "Tilia",
        "Sorbus intermedia",
        "Fagus silvatica",
    )
    species_by_class = dict(enumerate(species_in_class_order, start=1))
    return species_by_class.get(int(s))

imgs = [] # image file paths
labels = [] # species names; kept for reference only, strings cannot be fed to the model
ohe = [] # integer class ids (1-15) that get one-hot encoded below

# sorted() makes the sample order reproducible across runs and machines
for leaf_folder in sorted(os.listdir(data_dir)):
    folder_path = os.path.join(data_dir, leaf_folder)
    # skip stray files (downloaded archives, .DS_Store, ...) next to the class folders
    if not os.path.isdir(folder_path):
        continue
    # List the folder through its full path: the bare folder name only
    # resolves when the notebook happens to run from inside data_dir.
    for leaf in sorted(os.listdir(folder_path)):
        imgs.append(os.path.join(folder_path, leaf))
        # NOTE(review): assumes every file name starts with the class number
        # followed by the text 'leaf' - confirm against the extracted files.
        # int() makes the id match the integer class range used for the encoding.
        val = int(leaf.split('leaf')[0])
        labels.append(getLabel(val))
        ohe.append(val)

if not imgs:
    raise FileNotFoundError(f"no leaf images found under {data_dir!r}")

# one hot encoding: class id k (1-15) becomes row k-1 of a 15x15 identity matrix
class_ids = np.asarray(ohe, dtype=int)
if class_ids.min() < 1 or class_ids.max() > 15:
    raise ValueError("class ids must be in the range 1-15")
encoded = np.eye(15)[class_ids - 1]

#joining image and label: one [path, one-hot row] pair per sample
labeled_images = [[image, ohe_label] for image, ohe_label in zip(imgs, encoded)]

# print(la)
# print(im)
# print(joined[1])

In [None]:
#making dataset
from torch.utils.data import Dataset
import torchvision.transforms as transforms

# Preprocessing for the pretrained ResNet:
#   1. resize every image to the 224x224 input size,
#   2. convert the PIL image to a float tensor,
#   3. normalise each channel with the ImageNet mean/std that torchvision's
#      pretrained weights were trained with (previously missing).
resNetTransform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

# makes a custom dataset based on pytorch dataset class
class PlantDataset(Dataset):
    """Dataset of leaf images backed by a list of (image path, label) pairs.

    Args:
        data_dir: root folder of the data; stored on the instance, not read by
            the methods of this class.
        arr: sequence of [image_path, label] pairs; the label is returned
            untouched by __getitem__.
        transform: callable applied to each loaded PIL image. Pass None to get
            the PIL image back unchanged.
    """
    def __init__(self, data_dir, arr, transform = resNetTransform):
        self.data_dir = data_dir
        # image paths only, in the same order as `arr`
        self.data = [img for img, _ in arr]
        self.arr = arr

        # transform to normalize/resize all images
        self.transform = transform

    def __len__(self):
        """Number of samples."""
        return len(self.arr)

    def __getitem__(self, idx):
        """Load sample `idx` and return (transformed image, label)."""
        img_path, label = self.arr[idx]

        # The context manager closes the file handle once the pixels are
        # loaded; convert("RGB") guarantees the 3 channels the network
        # expects, even for greyscale / palette / RGBA files.
        with Image.open(img_path) as raw:
            img = raw.convert("RGB")

        if self.transform is not None:
            img = self.transform(img)

        return (img, label)


In [None]:
# check whether the Apple-silicon (MPS) backend is usable with this PyTorch build

print(torch.backends.mps.is_available())
print(torch.backends.mps.is_built())

# set device: prefer MPS, then CUDA, and fall back to the CPU so the notebook
# also runs on machines without an accelerator
if torch.backends.mps.is_available():
    device = torch.device('mps')
elif torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# from tqdm import tqdm # progress

In [None]:
# K Fold Cross Validation
from sklearn.model_selection import KFold
from torch import nn
from torch.utils.data import DataLoader, ConcatDataset
from torchvision import transforms
import os

# copy is only needed to snapshot the starting weights (see below)
import copy

# init stuff
k_folds = 5
num_epochs = 1
batch_size = 10
dataset = PlantDataset(data_dir, labeled_images)
loss_function = nn.CrossEntropyLoss()

# For fold results: fold index -> accuracy in percent
results = {}

# Set fixed random number seed
torch.manual_seed(42)

# random_state pins the fold assignment; torch.manual_seed does not seed scikit-learn
kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)


def _as_class_indices(targets):
    """Turn a batch of one-hot rows into integer class indices; index batches pass through."""
    return targets.argmax(dim=1) if targets.ndim > 1 else targets.long()


# Run on the selected device and remember the starting weights so that every
# fold trains from the same initial model. Without the reset, later folds are
# tested on images the model already saw while training on earlier folds.
pretrained_cnn.to(device)
initial_state = copy.deepcopy(pretrained_cnn.state_dict())

# Start print
print('--------------------------------')

for fold, (train_ids, test_ids) in enumerate(kfold.split(list(range(len(dataset))))):

    # Print
    print(f'FOLD {fold}')
    print('--------------------------------')

    # Start this fold from the initial weights
    pretrained_cnn.load_state_dict(initial_state)

    # Sample elements randomly from a given list of ids, no replacement.
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    test_subsampler = torch.utils.data.SubsetRandomSampler(test_ids)

    # Define data loaders for training and testing data in this fold
    trainloader = torch.utils.data.DataLoader(
        dataset, batch_size=batch_size, sampler=train_subsampler)
    testloader = torch.utils.data.DataLoader(
        dataset, batch_size=batch_size, sampler=test_subsampler)

    # Fresh optimizer per fold, over the trainable (unfrozen) parameters only
    optimizer = torch.optim.Adam(
        [p for p in pretrained_cnn.parameters() if p.requires_grad], lr=1e-4)

    # Run the training loop for defined number of epochs
    for epoch in range(num_epochs):

        # Print epoch
        print(f'Starting epoch {epoch+1}')

        pretrained_cnn.train()
        running_loss = 0.0
        num_batches = 0

        # Iterate over the DataLoader for training data
        for inputs, targets in trainloader:

            # Convert the one-hot labels on the CPU first (they are float64,
            # which MPS cannot hold), then move the batch to the device
            inputs = inputs.to(device)
            targets = _as_class_indices(targets).to(device)

            # Zero the gradients
            optimizer.zero_grad()

            # Forward pass, loss, backward pass, parameter update
            outputs = pretrained_cnn(inputs)
            loss = loss_function(outputs, targets)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            num_batches += 1

        # Report once per epoch: the previous every-500-batches report never
        # fired because a fold has far fewer mini-batches than that.
        if num_batches:
            print('Mean loss over %d mini-batches: %.3f' %
                  (num_batches, running_loss / num_batches))

    # Process is complete.
    print('Training process has finished. Saving trained model.')

    # Saving the model
    save_path = f'./model-fold-{fold}.pth'
    torch.save(pretrained_cnn.state_dict(), save_path)

    # Print about testing
    print('Starting testing')

    # Evaluation for this fold
    pretrained_cnn.eval()
    correct, total = 0, 0
    with torch.no_grad():

        # Iterate over the test data and generate predictions
        for inputs, targets in testloader:
            inputs = inputs.to(device)
            targets = _as_class_indices(targets).to(device)

            # Generate outputs
            outputs = pretrained_cnn(inputs)

            # Compare predicted class indices with true class indices
            predicted = outputs.argmax(dim=1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

    # Print accuracy
    accuracy = 100.0 * correct / total
    print('Accuracy for fold %d: %d %%' % (fold, accuracy))
    print('--------------------------------')
    results[fold] = accuracy

# Print fold results
print(f'K-FOLD CROSS VALIDATION RESULTS FOR {k_folds} FOLDS')
print('--------------------------------')
accuracy_total = 0.0  # explicit accumulator: does not shadow the builtin sum
for key, value in results.items():
    print(f'Fold {key}: {value} %')
    accuracy_total += value
# the average is reported once, after every fold has been added up
print(f'Average: {accuracy_total/len(results)} %')

In [18]:
#accuracy of the current model on the samples served by train_loader
# `import tqdm` only binds the module, which is not callable; import the function
from tqdm import tqdm

total_correct = 0
total_instances = 0

# NOTE(review): train_loader is not created in any earlier cell of this
# notebook; make sure it is defined before this cell runs.

# move the model once, before the loop, then switch to inference mode
pretrained_cnn.to(device)
pretrained_cnn.eval()
# iterating through batches without updating gradients
with torch.no_grad():
    for images, batch_labels in tqdm(train_loader):
        images = images.to(device)

        # predicted class = index of the largest output; brought back to the
        # CPU so it can be compared with the labels
        solution_tensor = pretrained_cnn(images)
        classifications = torch.argmax(solution_tensor, dim = 1).cpu()

        # undoing one-hot encoding to get label value as a class index
        if batch_labels.ndim > 1:
            true_classes = batch_labels.argmax(dim = 1)
        else:
            true_classes = batch_labels

        # incrementing counters; counting matches per batch works for any batch size
        total_correct += (classifications == true_classes).sum().item()
        total_instances += len(images)

#print accuracy
print("basline before training")
print(round(total_correct/total_instances, 3))

100%|██████████| 900/900 [00:59<00:00, 15.24it/s]


basline before training
0.072


In [20]:
#accuracy of the trained model on the training samples
# imported here so the cell does not depend on which `tqdm` name an earlier cell bound
from tqdm import tqdm

total_correct = 0
total_instances = 0

#change back batch size
# NOTE(review): train_dataset is not defined in any cell shown in this
# notebook; make sure it exists before this cell runs.
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=1, shuffle=True)

# make sure the model sits on the same device as the batches, in inference mode
pretrained_cnn.to(device)
pretrained_cnn.eval()
# iterating through batches without updating gradients
with torch.no_grad():
    for images, batch_labels in tqdm(train_loader):
        images = images.to(device)

        # predicted class = index of the largest output; brought back to the
        # CPU so it can be compared with the labels
        solution_tensor = pretrained_cnn(images)
        classifications = torch.argmax(solution_tensor, dim = 1).cpu()

        # undoing one-hot encoding to get label value as a class index
        if batch_labels.ndim > 1:
            true_classes = batch_labels.argmax(dim = 1)
        else:
            true_classes = batch_labels

        # incrementing counters; counting matches per batch works for any batch size
        total_correct += (classifications == true_classes).sum().item()
        total_instances += len(images)

#print accuracy
print("after training")
print(round(total_correct/total_instances, 3))

100%|██████████| 900/900 [00:49<00:00, 18.07it/s]

after training
1.0





In [29]:
#dump to pickle
import pickle

# The context manager flushes and closes model.pkl even if pickling fails;
# the previous bare open() left the handle open.
# Note: a pickle runs code when it is loaded - only unpickle files you trust.
with open('model.pkl', 'wb') as model_file:
    pickle.dump(pretrained_cnn, model_file)