<a href="https://colab.research.google.com/github/nspiegeln/iml_2022/blob/main/task3_main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Load data: call Colab's files.upload() helper and keep its return value in
# `uploaded` (the value is not read again by the other cells in this file).
from google.colab import files
uploaded = files.upload()

In [None]:
import torch
import numpy as np
from matplotlib import image
from torch import nn
from torch import optim
import torch.nn.functional as F
from torchvision import datasets, transforms, models
from sklearn.model_selection import train_test_split
from PIL import Image
import matplotlib.pyplot as plt


In [None]:
# Read the triplet definitions. Each line is kept as one raw string (trailing
# newline included); the ids are only split apart when the images are loaded.
with open('train_triplets.txt') as f:
    train_id = f.readlines()

with open('test_triplets.txt') as f:
    test_id = f.readlines()

# Per-channel mean / std passed to Normalize in both pipelines below.
norm_mean = [0.485, 0.456, 0.406]
norm_std = [0.229, 0.224, 0.225]

# Training pipeline: random rotation, random resized crop and random flip,
# then conversion to a tensor and normalisation.
train_transforms = transforms.Compose([
    transforms.RandomRotation(30),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(norm_mean, norm_std),
])

# Evaluation pipeline: resize + centre crop, no randomness.
test_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(norm_mean, norm_std),
])

# Hold out 10% of the training triplets for validation (fixed seed).
train_id, val_id = train_test_split(train_id, test_size=0.1, random_state=42)

# The loaders batch the raw triplet strings; images are loaded per batch later.
trainloader = torch.utils.data.DataLoader(train_id, batch_size=64, shuffle=True)
testloader = torch.utils.data.DataLoader(test_id, batch_size=32)
valloader = torch.utils.data.DataLoader(val_id, batch_size=64, shuffle=True)

In [None]:
# DenseNet-121 with pretrained weights requested from torchvision.
model = models.densenet121(pretrained=True)

# Triplet margin loss: margin 1, p=2 distance, averaged over the batch.
criterion = nn.TripletMarginLoss(margin=1, p=2.0, reduction='mean')

# NOTE(review): the optimizer captures the parameters of whatever
# `model.classifier` is at this point. The classifier is replaced further down
# the file, so confirm the optimizer ends up bound to the head being trained.
optimizer = optim.Adam(model.classifier.parameters(), lr=0.0003)

In [None]:
# Freeze every parameter of the pretrained network; only the replacement head
# created below (whose fresh parameters require gradients by default) is
# meant to learn.
for param in model.parameters():
    param.requires_grad = False

from collections import OrderedDict

# Replacement head: 1024 input features -> 500 -> 1000 -> 10000 outputs.
# The 10000-d output is what the triplet loss compares.
classifier = nn.Sequential(OrderedDict([
    ('fc1', nn.Linear(1024, 500)),
    ('bn1', nn.BatchNorm1d(500)),
    ('relu1', nn.ReLU(inplace=True)),
    ('fc2', nn.Linear(500, 1000)),
    ('output', nn.Linear(1000, 10000)),
]))

model.classifier = classifier

# The optimizer has to be (re)built AFTER the head is swapped in: an optimizer
# created from the previous `model.classifier` holds that old module's
# parameters (now frozen and no longer part of the model), so stepping it
# would never update the new head.
optimizer = optim.Adam(model.classifier.parameters(), lr=0.0003)

epochs = 10
print_every = 40
steps = 0

# Moving the module in place keeps the parameter objects the optimizer holds.
model.to('cuda')

In [None]:
def get_image(triplet_string, train_transforms):
    """Load and transform the three images named by one triplet line.

    Args:
        triplet_string: Whitespace-separated image ids; the first three are
            used, in order, as the three images of the triplet.
        train_transforms: Callable applied to each loaded PIL image. Despite
            the name, any transform pipeline can be passed in.

    Returns:
        A 6-tuple ``(img1, img2, img3, pre_img1, pre_img2, pre_img3)``: the
        three transformed images followed by the three untransformed PIL
        images they were produced from.

    Raises:
        IndexError: If the line contains fewer than three ids.
    """
    ids = triplet_string.split()

    pre_images = []
    for image_nr in (ids[0], ids[1], ids[2]):
        # The context manager releases the file handle deterministically.
        # convert('RGB') forces three channels so the 3-channel Normalize in
        # the pipelines cannot fail on a greyscale / CMYK / palette JPEG.
        with Image.open('food/' + image_nr + '.jpg') as handle:
            pre_images.append(handle.convert('RGB'))

    # Transform in triplet order so random augmentations draw in a fixed order.
    img1, img2, img3 = [train_transforms(pre_img) for pre_img in pre_images]
    pre_img1, pre_img2, pre_img3 = pre_images

    return img1, img2, img3, pre_img1, pre_img2, pre_img3

In [None]:
def batch_gen(labels, train_transforms):
    """Turn a batch of triplet strings into three stacked image tensors.

    Args:
        labels: Sequence of triplet strings, one per sample.
        train_transforms: Transform pipeline forwarded to ``get_image``.

    Returns:
        ``(batch_A, batch_B, batch_C)``, each of shape
        ``(len(labels), 3, 224, 224)``; row ``i`` holds the first, second and
        third transformed image of ``labels[i]`` respectively.
    """
    shape = (len(labels), 3, 224, 224)
    batch_A = torch.zeros(shape)
    batch_B = torch.zeros(shape)
    batch_C = torch.zeros(shape)

    for row, triplet in enumerate(labels):
        # get_image also returns the three untransformed images; unused here.
        first, second, third, _, _, _ = get_image(triplet, train_transforms)
        batch_A[row] = first
        batch_B[row] = second
        batch_C[row] = third

    return batch_A, batch_B, batch_C

In [None]:
# Train for `epochs` passes over the training triplets; after every pass,
# report the triplet loss on the validation split.
for e in range(epochs):
    # The validation phase below switches to eval(); without switching back,
    # every epoch after the first would train with BatchNorm in eval mode.
    model.train()

    sum_train_loss = 0
    batches_in_window = 0  # batches summed into sum_train_loss since last print
    training_loss = 0
    validation_loss = 0

    for labels in trainloader:
        steps += 1

        batch_A, batch_B, batch_C = batch_gen(labels, train_transforms)
        batch_A, batch_B, batch_C = batch_A.to('cuda'), batch_B.to('cuda'), batch_C.to('cuda')

        optimizer.zero_grad()

        # The three images of each triplet go through the same network.
        output1 = model(batch_A)
        output2 = model(batch_B)
        output3 = model(batch_C)

        training_loss = criterion(output1, output2, output3)
        training_loss.backward()
        optimizer.step()

        sum_train_loss += training_loss.item()
        batches_in_window += 1

        if steps % print_every == 0:
            # Divide by the number of batches actually summed: the running sum
            # is reset at each epoch start while `steps` keeps counting.
            print("Epoch: {}/{}... ".format(e+1, epochs),
                  "Loss: {:.4f}".format(sum_train_loss / batches_in_window))
            sum_train_loss = 0
            batches_in_window = 0
        print('training loss per batch: {}'.format(training_loss))

    print('validation started')

    model.eval()
    val_loss_sum = 0.0
    val_batches = 0
    with torch.no_grad():
        # `steps` is deliberately not advanced here so that validation
        # batches do not shift the training print cadence.
        for labels in valloader:
            # Deterministic evaluation pipeline: random augmentation would
            # make the validation loss noisy and not comparable across epochs.
            batch_A, batch_B, batch_C = batch_gen(labels, test_transforms)
            batch_A, batch_B, batch_C = batch_A.to('cuda'), batch_B.to('cuda'), batch_C.to('cuda')

            output1 = model(batch_A)
            output2 = model(batch_B)
            output3 = model(batch_C)

            validation_loss = criterion(output1, output2, output3)
            val_loss_sum += validation_loss.item()
            val_batches += 1

            print('validation loss per batch: {}'.format(validation_loss))

    if val_batches:
        print('mean validation loss: {:.4f}'.format(val_loss_sum / val_batches))


In [None]:
def predict(batch_A, batch_B, batch_C):
    """Predict, per triplet, whether image A is closer to image B than to C.

    The three batches are passed through the module-level ``model`` and the
    p=2 pairwise distances between the resulting outputs are compared
    directly, so no separate per-sample loss object is required.

    Args:
        batch_A: Batch of first images, already on the model's device.
        batch_B: Batch of second images, same length as ``batch_A``.
        batch_C: Batch of third images, same length as ``batch_A``.

    Returns:
        ``numpy.ndarray`` of shape ``(batch_A.shape[0], 1)`` and float dtype:
        ``1`` where dist(A, B) < dist(A, C), ``0`` otherwise.
    """
    # Inference must not depend on batch composition (BatchNorm) or update
    # running statistics, so make sure the network is in eval mode.
    model.eval()
    with torch.no_grad():
        output1 = model(batch_A)
        output2 = model(batch_B)
        output3 = model(batch_C)

        dist_ab = F.pairwise_distance(output1, output2, p=2.0)
        dist_ac = F.pairwise_distance(output1, output3, p=2.0)

    closer_to_b = (dist_ab < dist_ac).cpu().numpy()
    return closer_to_b.astype(np.float64).reshape(-1, 1)


In [None]:
# Run the model over the test triplets batch by batch. `testloader` does not
# shuffle, so the collected predictions follow the order of the test file.
batch_predictions = []
for labels in testloader:
    batch_A, batch_B, batch_C = batch_gen(labels, test_transforms)
    batch_A, batch_B, batch_C = batch_A.to('cuda'), batch_B.to('cuda'), batch_C.to('cuda')

    # predict() already disables gradient tracking internally.
    batch_predictions.append(predict(batch_A, batch_B, batch_C))

# Concatenate once at the end instead of re-copying the growing array on
# every batch; an empty loader yields an empty (0, 1) result.
if batch_predictions:
    predictions2 = np.concatenate(batch_predictions, axis=0)
else:
    predictions2 = np.zeros((0, 1))

In [None]:
# Shape the collected predictions into a single column of rounded values.
predictions = predictions2
predictions_tot = np.rint(predictions.reshape((-1, 1)))

import pandas as pd

# Write solutions.csv exactly once, one integer prediction per line.
# header=False matters: with the default header pandas emits the column label
# `0` as an extra first line, which is indistinguishable from a prediction.
# The 2-D array is passed as-is (no squeeze) so a single prediction still
# produces a valid one-row frame.
# NOTE(review): assumes the consumer expects a header-less file of 0/1 values
# with one line per test triplet - confirm against the submission format.
df_pred = pd.DataFrame(predictions_tot.astype(int))
df_pred.to_csv('solutions.csv', index=False, header=False, encoding='utf-8')