In [None]:
# importing relevant packages 

import torch 
from torchvision import models
import pandas as pd
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn 
import torch.nn.functional as F
import glob
import os
import cv2
import matplotlib.pyplot as plt 
import torchvision.transforms as transforms
from torchsummary import summary
from sklearn.model_selection import train_test_split
import numpy as np
from tqdm import tqdm
import json
from PIL import Image
import random
import albumentations as A
import glob

In [None]:
# Notebook display tweaks: taller scrollable output areas and no truncation of DataFrame columns.
# `IPython.display` is the public import location; importing `display` from
# `IPython.core.display` has been deprecated since IPython 7.14.
from IPython.display import display, HTML
display(HTML("<style>div.output_scroll { height: 45em; }</style>"))
pd.options.display.max_columns = None

In [None]:
class VGG_model(nn.Module):
    """VGG-16-style convolutional network that maps an RGB image to 32 sigmoid outputs.

    The network has three parts, stored under attribute names that match the
    checkpoint's state-dict keys:

    * ``features``   - five stages of 3x3 convolutions, each closed by a 2x2 max-pool.
    * ``avgpool``    - two further conv/pool steps followed by adaptive average pooling to 8x8.
    * ``classifier`` - a two-layer MLP (3200 -> 300 -> 32) ending in a sigmoid.
    """

    def __init__(self):
        super().__init__()

        def conv3x3(in_channels, out_channels):
            return nn.Conv2d(in_channels, out_channels, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))

        def leaky():
            return nn.LeakyReLU(0.1, inplace=True)

        def plain_relu():
            return nn.ReLU(inplace=True)

        def halve():
            return nn.MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)

        # One inner list per stage: (in_channels, out_channels, activation factory) for every conv.
        # The first conv of the last stage uses a plain ReLU while all others use LeakyReLU;
        # that is kept exactly as it was so the layer sequence still matches the saved weights.
        stages = [
            [(3, 64, leaky), (64, 64, leaky)],
            [(64, 128, leaky), (128, 128, leaky)],
            [(128, 256, leaky), (256, 256, leaky), (256, 256, leaky)],
            [(256, 512, leaky), (512, 512, leaky), (512, 512, leaky)],
            [(512, 512, plain_relu), (512, 512, leaky), (512, 512, leaky)],
        ]

        backbone = []
        for stage in stages:
            for in_channels, out_channels, activation in stage:
                backbone.append(conv3x3(in_channels, out_channels))
                backbone.append(activation())
            backbone.append(halve())
        self.features = nn.Sequential(*backbone)

        head = [
            nn.Conv2d(512, 512, kernel_size=3, padding='same'),
            leaky(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(512, 50, kernel_size=3, padding='same'),
            leaky(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.AdaptiveAvgPool2d(output_size=(8, 8)),
        ]
        self.avgpool = nn.Sequential(*head)

        # 50 channels * 8 * 8 positions = 3200 inputs after flattening.
        self.classifier = nn.Sequential(
            nn.Linear(50 * 8 * 8, 300),
            leaky(),
            nn.Dropout(0.3),
            nn.Linear(300, 32),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Run the backbone and head, flatten per sample, and return the (batch, 32) sigmoid outputs."""
        pooled = self.avgpool(self.features(x))
        flat = pooled.view(pooled.size(0), -1)
        return self.classifier(flat)
        

In [None]:
# Select the compute device and restore the pretrained keypoint network onto it.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = VGG_model()
# map_location lets a checkpoint that was saved from a GPU session load on a CPU-only machine.
model.load_state_dict(torch.load('./Model_VGG_7_1000_22100_epoch', map_location=device))
# The keypoint network is only used for inference in this notebook; eval mode switches off its
# Dropout layer so repeated predictions on the same image are deterministic.
model = model.to(device).eval()

In [None]:
def predict_keypoints(img):
    """Predict keypoints for one image with the global keypoint ``model``.

    The image is resized to 224x224, converted to a tensor and standardised with its own
    per-channel mean and standard deviation before being passed through the network.

    Args:
        img: image array accepted by ``cv2.resize`` / ``transforms.ToTensor``
            (presumably an H x W x 3 uint8 array such as ``cv2.imread`` returns - TODO confirm).

    Returns:
        1-D numpy array holding the flattened model output for this image.
    """
    img = cv2.resize(img, (224,224))
    img_tensor = transforms.ToTensor()(img)
    # Per-image statistics over the two spatial axes, one value per channel.
    img_mean = img_tensor.mean(dim = (1,2))
    img_std = img_tensor.std(dim = (1,2))

    img_normalised = transforms.Normalize(img_mean, img_std)(img_tensor)
    img_normalised = img_normalised.to(device)

    # Inference only: disable Dropout and skip building the autograd graph.
    model.eval()
    with torch.no_grad():
        key_points = model(img_normalised[None]).flatten().cpu().numpy()

    return key_points

In [None]:
def plot_keypoints(img, keypoints):
    """Show ``img`` with its keypoints drawn on top as small red dots.

    Args:
        img: image array passed to ``plt.imshow``; its first two dimensions are used as
            height and width. NOTE(review): images loaded with ``cv2.imread`` are BGR, so
            colours will look swapped unless the caller converts them - confirm with callers.
        keypoints: flat sequence of interleaved x, y values (x0, y0, x1, y1, ...) expressed
            as fractions of the image width and height respectively.
    """
    plt.imshow(img)

    keypoints = np.array(keypoints)

    x_points = keypoints[0::2]
    y_points = keypoints[1::2]

    # `color=` states unambiguously that this is one RGB colour for every point. Passing the
    # tuple through `c=` is treated as per-point values when there happen to be three points,
    # and matplotlib warns about the ambiguity otherwise.
    plt.scatter(x_points*img.shape[1], y_points*img.shape[0], s = 4, color=(1,0,0))
    plt.show()

In [None]:
def process_num(x):
    """Return ``x`` scaled by a random factor drawn uniformly from [0.99, 1.01]."""
    jitter = random.uniform(0.99, 1.01)
    return x * jitter

In [None]:
def process_label(x):
    """Translate an object-class name into its integer class id.

    Raises:
        KeyError: if ``x`` is not one of the ten known class names.
    """
    class_names = ('orange', 'pear', 'banana', 'plum', 'egg',
                   'strawberry', 'chicken', 'bayberry', 'redgrape', 'pistachio')
    class_ids = {name: class_id for class_id, name in enumerate(class_names)}
    return class_ids[x]


In [None]:
def transform_image(img):
    """Resize an image to 224x224 and standardise it with its own per-channel statistics.

    Returns the normalised tensor produced by ``transforms.ToTensor`` followed by
    ``transforms.Normalize`` (mean and std are computed over the two spatial axes of this image).
    """
    resized = cv2.resize(img, (224,224))
    tensor = transforms.ToTensor()(resized)
    channel_mean = tensor.mean(dim = (1,2))
    channel_std = tensor.std(dim = (1,2))
    standardise = transforms.Normalize(channel_mean, channel_std)
    return standardise(tensor)

In [None]:
def _random_affine_pipeline():
    """Build a freshly randomised affine pipeline (rotate, translate, shear, scale) that also moves 'xy' keypoints."""
    return A.Compose([
            A.Affine(rotate=random.uniform(-0.1, 0.1), p=1),
            A.Affine(translate_percent={'x': random.uniform(-0.001, 0.001), 'y': random.uniform(-0.001, 0.001)}, p=1),
            A.Affine(shear={'x': random.uniform(-0.1, 0.1), 'y': random.uniform(-0.05, 0.05)}, p=1),
            A.Affine(scale=(0.999, 1.001), p=1)
        ], keypoint_params=A.KeypointParams(format='xy'))


def _predict_pixel_keypoints(image):
    """Run the global keypoint model on ``image`` and return a list of (x, y) positions in pixels of ``image``."""
    network_input = transform_image(image).to(device)
    with torch.no_grad():
        keypoints = model(network_input[None]).flatten().cpu().numpy()
    # The model output is read as interleaved x, y fractions of width / height.
    x_points = keypoints[0::2] * image.shape[1]
    y_points = keypoints[1::2] * image.shape[0]
    return list(zip(x_points, y_points))


def augment_data():
    """Regenerate the augmented grasp dataset on disk.

    For every dataset folder ``<name>`` under ``<cwd>/Grasp_dataset_2`` this function

    1. empties ``<cwd>/Grasp_dataset_augmented/<name>/`` (creating it when missing),
    2. reads ``<name>.csv``, multiplies every float column by a random factor (``process_num``)
       and converts the ``label`` column to integer ids (``process_label``),
    3. predicts keypoints for each image named in the ``image_name`` column with the global
       ``model``, applies a small random affine transform and stores the transformed keypoints
       as fractions of the transformed image's width / height,
    4. writes the processed columns plus the 32 keypoint columns to ``<name>.csv`` in the
       destination folder.

    An image is dropped (and ``'skipping'`` is printed) when 20 random transforms in a row return
    fewer than 16 keypoints - presumably because albumentations discards keypoints that leave
    the frame. Dropped images are left out of the output CSV entirely, so every written row
    carries the keypoints that belong to its own image.

    Raises:
        FileNotFoundError: if an image listed in a CSV cannot be read.
    """
    source_folder = os.path.join(os.getcwd(), 'Grasp_dataset_2')
    destination_folder = os.path.join(os.getcwd(), 'Grasp_dataset_augmented')
    dataset_names = ['Grasp_dataset_orange', 'Grasp_dataset_pear', 'Grasp_dataset_banana', 'Grasp_dataset_plum', 'Grasp_dataset_egg', 'Grasp_dataset_strawberry', 'Grasp_dataset_chicken', 'Grasp_dataset_bayberry', 'Grasp_dataset_redgrape', 'Grasp_dataset_pistachio']

    header = ['p1_x', 'p1_y', 'p2_x', 'p2_y', 'p3_x', 'p3_y', 'p4_x', 'p4_y', 'p5_x', 'p5_y', 'p6_x', 'p6_y',
              'p7_x', 'p7_y', 'p8_x', 'p8_y', 'p9_x', 'p9_y', 'p10_x', 'p10_y', 'p11_x', 'p11_y', 'p12_x', 'p12_y', 'p13_x',
              'p13_y', 'p14_x', 'p14_y', 'p15_x', 'p15_y', 'p16_x', 'p16_y']
    expected_keypoints = len(header) // 2
    max_attempts = 20

    # Inference only: make sure Dropout in the keypoint network is disabled.
    model.eval()

    for f in dataset_names:
        dataset_source = os.path.join(source_folder, f)
        dataset_destination = os.path.join(destination_folder, f)

        # Start from an empty destination folder; create it on the first run.
        os.makedirs(dataset_destination, exist_ok=True)
        for file in glob.glob(os.path.join(dataset_destination, '*')):
            if os.path.isfile(file):
                os.remove(file)

        df = pd.read_csv(os.path.join(dataset_source, f + '.csv'))

        num_cols = df.select_dtypes(include=['float']).columns
        # Element-wise jitter; Series.map is used because DataFrame.applymap is deprecated.
        df[num_cols] = df[num_cols].apply(lambda column: column.map(process_num))
        df['label'] = df['label'].apply(process_label)

        kept_rows = []      # index labels of df rows that received keypoints
        keypoint_rows = []  # matching flattened, normalised keypoint vectors

        for i in range(df.shape[0]):
            image_path = os.path.join(dataset_source, df.iloc[i]['image_name'])
            image_to_be_transformed = cv2.imread(image_path)
            if image_to_be_transformed is None:
                # cv2.imread returns None instead of raising when a file cannot be read.
                raise FileNotFoundError('Could not read image: ' + image_path)

            # The prediction does not depend on the random transform, so compute it once per image.
            pixel_keypoints = _predict_pixel_keypoints(image_to_be_transformed)

            for _ in range(max_attempts):
                transformed = _random_affine_pipeline()(image=image_to_be_transformed, keypoints=pixel_keypoints)
                transformed_image = transformed['image']
                transformed_keypoints = transformed['keypoints']

                if len(transformed_keypoints) != expected_keypoints:
                    continue  # some keypoints were lost; retry with new random parameters

                flatten_coordinates = np.array(transformed_keypoints).flatten()
                flatten_coordinates[0::2] = flatten_coordinates[0::2] / transformed_image.shape[1]
                flatten_coordinates[1::2] = flatten_coordinates[1::2] / transformed_image.shape[0]

                kept_rows.append(df.index[i])
                keypoint_rows.append(flatten_coordinates)
                break
            else:
                print('skipping')

        # Index the keypoints by the row they came from and keep only rows present in both frames,
        # so a skipped image cannot shift later keypoints onto the wrong sample.
        keypoints_df = pd.DataFrame(keypoint_rows, index=kept_rows, columns=header)
        complete_df = pd.concat([df, keypoints_df], axis=1, join='inner')
        complete_df.to_csv(os.path.join(dataset_destination, f + '.csv'), index=False)


In [None]:
# test_image = cv2.imread('/home/dell/Desktop/xxxx/Uni/Kirigami_project/Keypoint_detection_notebooks/Grasp_dataset_augmented/Grasp_dataset_orange/Gimage10.jpg')
# test_keypoints = [0.12771776,0.2779994,0.47523814,0.025530297,0.8148705,0.2775899,0.8246956,0.62812585,0.49508992,0.9256545,0.15821105,0.73071223,0.43649036,0.20082472,0.40966305,0.36028758,0.39995593,0.49125895,0.41422606,0.61616457,0.44915825,0.75999326,0.52863693,0.7681227,0.5525919,0.6140371,0.5610489,0.49106207,0.5443897,0.3638414,0.51417565,0.20984803]
# print(len(test_keypoints))
# plot_keypoints(test_image, test_keypoints)

In [None]:
def combine_datasets():
    """Merge the per-object augmented CSVs into a single file.

    Reads ``<cwd>/Grasp_dataset_augmented/<name>/<name>.csv`` for every dataset name, stacks the
    rows in that order and writes the result, without the index column, to
    ``<cwd>/Grasp_dataset_augmented/Grasp_dataset_augmented.csv``.
    """
    destination_folder = os.path.join(os.getcwd(), 'Grasp_dataset_augmented')
    dataset_names = ['Grasp_dataset_orange', 'Grasp_dataset_pear', 'Grasp_dataset_banana', 'Grasp_dataset_plum', 'Grasp_dataset_egg', 'Grasp_dataset_strawberry', 'Grasp_dataset_chicken', 'Grasp_dataset_bayberry', 'Grasp_dataset_redgrape', 'Grasp_dataset_pistachio']

    # Read everything first and concatenate once; growing a frame inside the loop copies all
    # previously collected rows again on every iteration.
    frames = [pd.read_csv(os.path.join(destination_folder, f, f + '.csv')) for f in dataset_names]
    complete_df_set = pd.concat(frames, axis=0)

    complete_df_set.to_csv(os.path.join(destination_folder, 'Grasp_dataset_augmented.csv'), index=False)


In [None]:
# Build the augmented dataset on disk: regenerate the per-object CSVs, then merge them into
# Grasp_dataset_augmented.csv.
augment_data()
combine_datasets()

In [None]:
# from sklearn.ensemble import RandomForestClassifier

# augment_data()
# combine_datasets()

# grasp_dataset = pd.read_csv(os. getcwd() + '/' + 'Grasp_dataset_augmented/Grasp_dataset_augmented.csv')

# print(grasp_dataset.tail(2))


# data_train, data_test, label_train, label_test = train_test_split(grasp_data, grasp_label, test_size=0.2, random_state=np.random.randint(100))

# clf = RandomForestClassifier(n_estimators=100, max_depth=100, random_state=np.random.randint(100))

# num_folds = 10
# cv_method = KFold(n_splits=num_folds, shuffle=True, random_state=np.random.randint(100))
# cv_results = cross_val_score(clf, grasp_data, grasp_label, cv=cv_method, scoring='accuracy')

# print('Cross-validation results:', cv_results)
# print('Average accuracy:', cv_results.mean())


In [None]:
class ClassifierDataset(Dataset):
    """Torch dataset that serves (feature vector, class id) pairs from a grasp DataFrame.

    The frame must contain the columns ``image_name``, ``label``, ``pressure_reading_1``,
    ``pressure_reading_2``, ``force_reading_1`` and ``force_reading_2``; every column other
    than ``image_name`` and ``label`` is used as a feature.
    """

    def __init__(self, df):
        self.df = df
        self.data, self.label = self.clean_data(df) # clean data

    def __getitem__(self, idx):
        """Return row ``idx`` as a float32 feature tensor and an int64 label tensor on the global ``device``."""
        # Convert the row to a plain array first. A row Series is indexed by column names, and
        # handing it straight to torch.tensor makes torch read it with integer keys (series[0]),
        # which only works through pandas' deprecated positional fallback.
        features = self.data.iloc[idx].to_numpy(dtype=np.float32)
        label = self.label.iloc[idx]

        return torch.tensor(features, dtype=torch.float32).to(device), torch.tensor(label, dtype=torch.int64).to(device)

    def __len__(self):
        return self.data.shape[0]

    def clean_data(self, grasp_dataset):
        """Split ``grasp_dataset`` into a feature frame and a label Series; the input frame is not modified.

        Pressure readings are divided by 100 and force readings by 1000
        (presumably to bring them to a scale similar to the keypoint fractions - TODO confirm).
        """
        grasp_data = grasp_dataset.drop(columns=['image_name', 'label'])
        grasp_data['pressure_reading_1'] = grasp_data['pressure_reading_1'] / 100
        grasp_data['pressure_reading_2'] = grasp_data['pressure_reading_2'] / 100
        grasp_data['force_reading_1'] = grasp_data['force_reading_1'] / 1000
        grasp_data['force_reading_2'] = grasp_data['force_reading_2'] / 1000
        grasp_label = grasp_dataset['label']

        return grasp_data, grasp_label

In [None]:
# grasp_dataset = pd.read_csv(os. getcwd() + '/' + 'Grasp_dataset_augmented/Grasp_dataset_augmented.csv')

# train_df, test_df = train_test_split(grasp_dataset, test_size=0.1)

# train_dataset = ClassifierDataset(train_df)
# test_dataset = ClassifierDataset(test_df)

# train_dataloader = DataLoader(train_dataset, batch_size=2, shuffle=True)
# test_dataloader = DataLoader(test_dataset, batch_size=2, shuffle=True)

# for data in train_dataloader:
#     break

# input_data, target_output = data
# print(input_data)
# print(target_output)

In [None]:
class MLP(nn.Module):
    """Fully connected classifier: 36 inputs -> 64 -> 128 -> 64 -> 32 -> 10 class logits.

    Every hidden layer is followed by LeakyReLU(0.1) and Dropout(0.2); the output layer
    returns raw logits.
    """

    def __init__(self):
        super().__init__()
        # 16 keypoints * 2 coordinates + 4 extra inputs, then the hidden widths.
        widths = [16*2+4, 64, 128, 64, 32]

        blocks = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            blocks.extend([
                nn.Linear(fan_in, fan_out),
                nn.LeakyReLU(0.1, inplace=True),
                nn.Dropout(0.2),
            ])
        blocks.append(nn.Linear(widths[-1], 10))

        self.layers = nn.Sequential(*blocks)

    def forward(self, x):
        """Return the logits for a batch of feature vectors."""
        logits = self.layers(x)
        return logits



In [None]:
# Create the grasp classifier and place it on the selected device.
classification_model = MLP().to(device)

In [None]:
# Print a layer-by-layer summary of the classifier for a dummy input of shape (8, 36);
# 36 = 16*2 + 4 matches the input width of the first Linear layer in MLP.
summary(classification_model, (8,16*2+4))

In [None]:
def update_training_set():
    """Regenerate the augmented data and rebuild the global train/test splits and loaders.

    Side effects: rewrites the augmented CSV files on disk (``augment_data`` and
    ``combine_datasets``) and rebinds the globals ``train_df``, ``test_df``, ``train_dataset``,
    ``test_dataset``, ``train_dataloader`` and ``test_dataloader``.
    """
    global train_df, test_df, train_dataset, test_dataset, train_dataloader, test_dataloader

    augment_data()
    combine_datasets()

    grasp_dataset = pd.read_csv(os.path.join(os.getcwd(), 'Grasp_dataset_augmented', 'Grasp_dataset_augmented.csv'))

    # NOTE(review): the 90/10 split is drawn at random on every call. When this function is
    # called again during training, samples that were used for training can land in the new
    # test split, so the reported test loss is not a clean hold-out estimate.
    train_df, test_df = train_test_split(grasp_dataset, test_size=0.1)

    train_dataset = ClassifierDataset(train_df)
    test_dataset = ClassifierDataset(test_df)

    train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    test_dataloader = DataLoader(test_dataset, batch_size=2, shuffle=True)

    print('Finished.')

In [None]:
# Build the initial train/test split and data loaders (this also regenerates the augmented data on disk).
update_training_set()

In [None]:
def get_essentials():
    """Return the (loss function, optimizer) pair for training the global ``classification_model``.

    The loss is cross-entropy over class logits; the optimizer is Adam with a learning rate of 1e-5.
    """
    criterion = nn.CrossEntropyLoss()
    adam = torch.optim.Adam(classification_model.parameters(), lr=1e-5)
    return criterion, adam

In [None]:
def train_batch(data, model, loss_fun, optimizer):
    """Run one optimisation step on a single batch and return its loss as a float.

    Args:
        data: ``(inputs, targets)`` pair as yielded by the data loader.
        model: network to train; it is switched to training mode.
        loss_fun: callable ``loss_fun(predictions, targets)`` returning a scalar tensor.
        optimizer: optimizer that manages ``model``'s parameters.
    """
    model.train()
    input_data, target_output = data
    # Clear gradients *before* the backward pass: anything left over from an interrupted or
    # external backward call would otherwise be added to this batch's gradients.
    optimizer.zero_grad()
    pred_output = model(input_data)
    loss = loss_fun(pred_output, target_output)
    loss.backward()
    optimizer.step()
    # Kept from the original so that no gradients linger on the parameters after the call.
    optimizer.zero_grad()
    return loss.item()

def val_batch(data, model, loss_fun, optimizer):
    """Compute the loss of one batch without tracking gradients and return it as a float.

    The model is switched to evaluation mode and the inputs are cast to float32 before the
    forward pass. ``optimizer`` is accepted for signature parity with ``train_batch`` and is
    not used.
    """
    with torch.no_grad():
        model.eval()
        features, targets = data
        predictions = model(features.to(torch.float32))
        batch_loss = loss_fun(predictions, targets)
        return batch_loss.item()


In [None]:
# Number of training epochs, and the loss function / optimizer for the classifier.
epochs = 1000
loss_fun, optimizer = get_essentials()

In [None]:
# Train the grasp classifier, recording the mean training and test loss of every epoch.
# torch.save does not create missing directories, so make sure the checkpoint folder exists.
os.makedirs('./Temp_models', exist_ok=True)

train_epoch, val_epoch = [], []
for epoch in tqdm(range(epochs)):
    train_batch_losses, val_batch_losses = [], []
    for data in train_dataloader:
        train_batch_loss = train_batch(data, classification_model, loss_fun, optimizer)
        train_batch_losses.append(train_batch_loss)
    for data in test_dataloader:
        val_batch_loss = val_batch(data, classification_model, loss_fun, optimizer)
        val_batch_losses.append(val_batch_loss)
    train_epoch.append(np.mean(train_batch_losses))
    val_epoch.append(np.mean(val_batch_losses))

    if epoch % 5 == 0:
        # Report the epoch means rather than the loss of whichever mini-batch happened to come
        # last; this also avoids a NameError when a loader yields no batches.
        print(train_epoch[-1], val_epoch[-1])

    if (epoch) % 100 == 0:
        # Checkpoint, then regenerate the augmented data for the following epochs.
        # NOTE(review): this also fires at epoch 0, right after the initial update_training_set() call.
        torch.save(classification_model.state_dict(), './Temp_models/Model_classify_dataset_2' + str(epoch) + '_epoch')
        update_training_set()

In [None]:
# Plot the per-epoch loss curves. The x-axes are derived from the recorded histories rather
# than from `epochs`, so the cell still works when training was interrupted early.
plt.plot(range(len(train_epoch)), train_epoch, label="train_loss")
plt.plot(range(len(val_epoch)), val_epoch, label="test_loss")
plt.legend()
plt.xlabel("Epochs")
plt.ylabel("Loss")
# The curves belong to the grasp classifier trained above, not to a facial-keypoint model.
plt.title("Training grasp classification model")
plt.show()

In [None]:
# Measure classification accuracy on the separate validation CSV.
correct = 0
total = 0

Grasp_dataset_validation = pd.read_csv(os.path.join(os.getcwd(), 'Grasp_dataset_validation', 'Grasp_dataset_validation.csv'))

validation_dataset = ClassifierDataset(Grasp_dataset_validation)

# Sample order does not affect accuracy, so the loader does not need to shuffle.
validation_dataloader = DataLoader(validation_dataset, batch_size=1, shuffle=False)

# since we're not training, we don't need to calculate the gradients for our outputs
with torch.no_grad():
    classification_model.eval()
    for data in validation_dataloader:
        input_data, labels = data
        # calculate outputs by running the feature vectors through the network
        outputs = classification_model(input_data)
        # the class with the highest logit is what we choose as prediction
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

if total == 0:
    # Avoid a ZeroDivisionError when the validation file has no rows.
    print('Validation set is empty; accuracy is undefined.')
else:
    # Report the real sample count instead of a hard-coded "100 test images".
    print(f'Accuracy of the network on the {total} validation samples: {100 * correct // total} %')
