# Packages Import

In [None]:
import pandas as pd
import numpy as np
from PIL import Image
import cv2
import torch
import torchvision
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import torchvision.datasets as datasets
from sklearn.model_selection import train_test_split
import torchvision.transforms as transforms
import torch.nn.functional as F
from tqdm import tqdm, tqdm_notebook
import albumentations as A
from albumentations.pytorch import ToTensorV2

In [None]:
# Select the compute device once; later cells move models and tensors to it.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# get_device_name() raises on machines without CUDA, so only query it when a GPU was selected.
torch.cuda.get_device_name(device) if device == 'cuda' else 'cpu (no CUDA device available)'

In [None]:
pip install pretrainedmodels #for pretrained Xception model

In [None]:
import pretrainedmodels 

In [None]:
print(pretrainedmodels.model_names)

In [None]:
# Build the Xception backbone from the `pretrainedmodels` registry with ImageNet weights.
model_name = 'xception'
# num_classes=1000 matches the ImageNet checkpoint; `last_linear` is replaced further down.
model = pretrainedmodels.__dict__[model_name](num_classes=1000, pretrained='imagenet')

In [None]:
model

In [None]:
#128-D embedding space
# Swap the final layer for a 2048 -> 128 projection followed by ReLU; its output
# is what the contrastive loss receives during stage 1.
model.last_linear = nn.Sequential(
    nn.Linear(2048, 128),
    nn.ReLU()
)

In [None]:
pip install pytorch-metric-learning #for supervised contrastive loss function

In [None]:
from pytorch_metric_learning import losses

In [None]:
df = pd.read_csv('../input/aptos-csv/train.csv')

In [None]:
# Prefix the datasets prepend to `<id_code>.png` when opening an image.
image_dir = '../input/cropped-clahe-dr/Cropped_CLAHE_train_images/'

# 80/20 split stratified on the diagnosis column; the fixed seed keeps it reproducible.
train_df, val_df = train_test_split(
    df,
    test_size=0.20,
    shuffle=True,
    random_state=44,
    stratify=df['diagnosis'],
)

In [None]:
df['diagnosis'].value_counts()

In [None]:
val_df['diagnosis'].value_counts()

In [None]:
# Deterministic preprocessing applied to both views: resize to 224x224, then convert to a tensor.
# NOTE(review): nearest-neighbour resampling is used for the resize -- confirm this is intended.
transform = transforms.Compose([
            transforms.Resize(size=(224, 224), interpolation=Image.NEAREST),
            transforms.ToTensor()
])

In [None]:
model.to(device)

In [None]:
# Augmentation used to build the second view of each image: OneOf with p=1 picks
# exactly one of the listed rotations/flips on every call.
aug = A.Compose([
    #A.RandomBrightnessContrast(brightness_limit=1, contrast_limit=1, p=1.0),
    A.OneOf([
        A.Rotate(limit=90, p=1, border_mode=cv2.BORDER_CONSTANT),
        A.Rotate(limit=270, p=1, border_mode=cv2.BORDER_CONSTANT),
        A.HorizontalFlip(p=1),
        A.VerticalFlip(p=1)
    ], p=1)
])

In [None]:
class CustomDataset(Dataset):
    """Dataset yielding two views of an image together with its label.

    Each item is ``(image, image2, label)``: ``image`` is the picture as read
    from disk, ``image2`` is its augmented view (a plain copy when no
    augmentation is configured) and ``label`` is the ``diagnosis`` value
    wrapped in a tensor.

    Args:
        df: Table exposing ``id_code`` and ``diagnosis`` columns.
        image_dir: Prefix concatenated as-is with ``<id_code>.png``, so it
            must end with a path separator.
        transform: Optional callable applied to both views.
        aug: Optional callable invoked as ``aug(image=ndarray)`` that returns
            a mapping with an ``'image'`` entry.
    """

    def __init__(self, df, image_dir, transform= None, aug = None):
        super(CustomDataset, self).__init__()
        self.image_ids = list(df['id_code'])
        self.labels = list(df['diagnosis'])
        self.image_dir = image_dir
        self.transform = transform
        self.aug = aug

    def __getitem__(self, idx):
        """Load item ``idx`` and return ``(image, image2, label)``."""
        file_name = self.image_ids[idx]
        label = self.labels[idx]
        image = Image.open(self.image_dir+file_name+'.png').convert('RGB')
        if self.aug:
            augmented = self.aug(image=np.array(image))
            image2 = transforms.ToPILImage()(augmented['image'])
        else:
            # Previously `image2` was never bound on this path, which raised
            # UnboundLocalError; fall back to an un-augmented copy.
            image2 = image.copy()
        if self.transform:
            image2 = self.transform(image2)
            image = self.transform(image)
        label = torch.tensor(label)
        return image, image2, label

    def __len__(self):
        """Return the number of images listed in the table."""
        return len(self.image_ids)

In [None]:
# Training pairs: every item holds the original image and its augmented view.
train_dataset = CustomDataset(train_df, image_dir, transform=transform, aug= aug)
# drop_last=True discards a final incomplete batch, so every batch has exactly 8 samples.
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=8, pin_memory=True, drop_last=True)

In [None]:
# Held-out split. The augmentation is still passed because CustomDataset builds its
# second view from it; the evaluation loops below delete that view without using it.
test_dataset = CustomDataset(val_df, image_dir, transform=transform, aug = aug)
test_dataloader = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=8, pin_memory=True)

In [None]:
class Siamese_Encoder(nn.Module):
    """Wrapper that runs a single shared encoder on whatever batch it is given.

    The two views of a pair are embedded by calling this module twice, so both
    branches use the same weights.
    """

    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder

    def forward(self, x):
        """Return the encoder output for batch ``x``."""
        embedding = self.encoder(x)
        return embedding

In [None]:
Model = Siamese_Encoder(model).to(device) #Xception encoder

# Load Pretrained Weights

In [None]:
# The file holds a fully pickled model; only its state_dict() is used afterwards.
# map_location lets a checkpoint saved on a GPU load on whichever device is active here.
# NOTE: torch.load unpickles arbitrary objects -- only load checkpoints from a trusted source.
trained_100_epoch = torch.load('../input/xception-128-100/Xception_embedding_128_100.pth', map_location=device)

In [None]:
Model.load_state_dict(trained_100_epoch.state_dict())

In [None]:
# Adam over every parameter of the wrapped encoder.
optimizer = torch.optim.Adam(Model.parameters(), lr=0.001)
#criterion = SupervisedContrastiveLoss(temperature=0.07)
# Supervised contrastive loss from pytorch-metric-learning, temperature 0.1.
criterion = losses.SupConLoss(temperature=0.1)

In [None]:
training_loss = []
valid_loss_min = np.inf

In [None]:
import os
# exist_ok keeps this cell safe to re-run once the checkpoint folder already exists
# (os.mkdir raised FileExistsError on the second run).
os.makedirs("./saved_model/", exist_ok=True)

# 1st stage representation learning

In [None]:
# Stage 1: contrastive pre-training of the encoder on (image, augmented image) pairs.
for n in range(100):
    train_loss = 0.0
    num_batches = 0
    Model.train()
    for batch in tqdm_notebook(train_dataloader, leave=False):
        x1, x2, y = batch
        x1 = x1.to(device)
        x2 = x2.to(device)
        # The DataLoader already collates the labels into a tensor, so no NumPy round-trip is needed.
        y = y.to(device)
        out1 = Model(x1)
        out2 = Model(x2)
        # Both views go into one batch with duplicated labels, so each sample's
        # augmented copy is among its same-label positives.
        features = torch.cat((out1, out2), dim=0).squeeze()
        y = torch.cat((y, y), dim=0)
        loss = criterion(features, y)
        train_loss += loss.item()
        num_batches += 1
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    # One scalar is accumulated per batch, so average over batches; dividing by the
    # number of samples understated the loss by roughly the batch size.
    train_loss /= max(num_batches, 1)
    training_loss.append([n,train_loss])
    print("epoch number: ", n)
    print(f"Train Loss: {(100*train_loss):.2f}%")

    # There is no validation pass in this stage: the checkpoint is overwritten
    # whenever the training loss improves.
    if valid_loss_min>train_loss:
        torch.save(Model, './saved_model/Xception_embedding_512.pth')
        print("new added", train_loss)
        valid_loss_min = train_loss

In [None]:
training_loss

In [None]:
loss_df = pd.DataFrame(np.array(training_loss))

In [None]:
loss_df.to_csv('./Suploss.csv')

In [None]:
Model

# 2nd stage fine-tuning

In [None]:
class FinalModel(nn.Module):
    """Encoder followed by a single linear classification layer.

    Args:
        encoder: Module mapping an input batch to ``(batch, in_features)`` features.
        in_features: Width of the encoder output. Defaults to 2048, the value
            that was previously hard-coded.
        num_classes: Number of output logits. Defaults to 5, the value that
            was previously hard-coded.
    """

    def __init__(self, encoder, in_features=2048, num_classes=5):
        super(FinalModel, self).__init__()
        self.encoder = encoder
        self.classifier = nn.Linear(in_features, num_classes)

    def forward(self, x):
        """Return raw class logits (no softmax applied) for batch ``x``."""
        x = self.encoder(x)
        x = self.classifier(x)
        return x

In [None]:
# Load the contrastively trained encoder. map_location lets a GPU-saved checkpoint
# load on the active device.
# NOTE: torch.load unpickles arbitrary objects -- only load checkpoints from a trusted source.
encoder2 = torch.load('../input/xception-128-100/Xception_embedding_128_100.pth', map_location=device)

# Drop the projection head so the encoder emits the 2048-wide features that
# FinalModel's classifier expects.
encoder2.encoder.last_linear = nn.Identity()
model = FinalModel(encoder2).to(device)
# Freeze the encoder: only the new linear classifier is trained in stage 2.
for param in model.encoder.parameters():
    param.requires_grad = False
model


In [None]:
# NOTE(review): looks like reciprocal class counts, but `weights` is never passed to
# the loss below, so training is unweighted -- confirm intent. If it is ever used,
# verify that the order matches class indices 0-4.
weights = torch.FloatTensor([1/1805, 1/999, 1/370, 1/295, 1/193]).to(device)
learning_rate = 0.001
# NOTE(review): `epochs` is not read by the training loop that follows, which uses range(30).
epochs = 10
# Default reduction: the loss returned for a batch is the mean over its samples.
loss_fn = nn.CrossEntropyLoss()
# All parameters are handed to Adam; the encoder's were frozen above, so only the
# classifier receives gradients.
optimizer = torch.optim.Adam(model.parameters(), learning_rate)

In [None]:
Test = 0.0

In [None]:
# Stage 2: train the linear classifier on top of the frozen encoder and evaluate
# on the held-out split after every epoch.
training_acc = []
Test_Result = []
for n in range(30):
    total = 0
    train_loss = 0.0
    seen = 0
    model.train()
    # The encoder is frozen; keep it in eval mode so normalisation layers do not
    # keep updating their running statistics while only the classifier is trained.
    model.encoder.eval()
    for batch in tqdm_notebook(train_dataloader, leave=False):
        # The augmented second view is not used for classification.
        x1, _, y = batch
        x1 = x1.to(device)
        y = y.to(device)
        pred = model(x1)
        loss = loss_fn(pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        batch_size = y.size(0)
        # loss_fn returns the batch mean; weight it by the batch size so the
        # epoch figure is a true per-sample average.
        train_loss += loss.item() * batch_size
        total += (pred.argmax(1) == y).type(torch.float).sum().item()
        seen += batch_size
    # Divide by the samples actually seen: the training loader uses drop_last=True,
    # so this can be fewer than len(dataset).
    total /= max(seen, 1)
    train_loss /= max(seen, 1)
    training_acc.append({'Accuracy': total, 'Avg loss': train_loss})

    # Evaluation on the held-out split.
    model.eval()
    size = len(test_dataloader.dataset)
    test_loss, correct = 0, 0
    with torch.no_grad():
        for batch in tqdm_notebook(test_dataloader, leave=False):
            x1, _, y = batch
            x1 = x1.to(device)
            y = y.to(device)
            pred = model(x1)
            test_loss += loss_fn(pred, y).item() * y.size(0)
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= size
    correct /= size
    print("epoch number: ", n)
    print(f"Train Accuracy: {(100*total):.2f}%")
    print(f"Test Result: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    Test_Result.append({'Accuracy': correct, 'Avg loss': test_loss})
    # Keep the checkpoint with the best held-out accuracy so far.
    if Test<correct:
        torch.save(model, "test_classifier.pth")
        Test = correct
        print("new added")

In [None]:
training_acc

In [None]:
Test_Result

In [None]:
Test = pd.DataFrame(Test_Result)
Test

In [None]:
Test.to_csv('./test_result.csv')

In [None]:
pd.DataFrame(training_acc).to_csv('./training_acc.csv')

In [None]:
model_100 = torch.load('./test_classifier.pth')

In [None]:
model.load_state_dict(model_100.state_dict())

In [None]:
y_true = []
y_pred = []

In [None]:
# Final evaluation: accuracy, per-sample average loss, confusion matrix, and the
# raw logits/labels collected for the ROC analysis.
nb_classes = 5
confusion_matrix = torch.zeros(nb_classes, nb_classes)
model.eval()
size = len(test_dataloader.dataset)
test_loss, correct = 0, 0
with torch.no_grad():
    for batch in tqdm_notebook(test_dataloader, leave=False):
        # The augmented second view is not used for evaluation.
        x1, _, y = batch
        x1 = x1.to(device)
        y = y.to(device)
        pred = model(x1)
        preds = pred.argmax(1)

        # Rows index the true class, columns the predicted class.
        for t, p in zip(y.view(-1), preds.view(-1)):
            confusion_matrix[t.long(), p.long()] += 1
        # loss_fn returns the batch mean; weight by batch size for a per-sample average.
        test_loss += loss_fn(pred, y).item() * y.size(0)
        correct += (preds == y).type(torch.float).sum().item()
        y_pred.append(pred.cpu().numpy())
        y_true.append(y.cpu().numpy())
test_loss /= size
correct /= size
print(f"Test Result: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
print(confusion_matrix)

In [None]:
def softmax(x):
    """Return the softmax of the scores in ``x``.

    The global maximum of ``x`` is subtracted before exponentiating to avoid
    overflow; normalisation is along axis 0, so a 1-D vector of logits becomes
    a probability vector.
    """
    shifted = x - np.max(x)
    exps = np.exp(shifted)
    totals = exps.sum(axis=0)
    return exps / totals

In [None]:
from sklearn.metrics import roc_curve, auc, roc_auc_score
import matplotlib.pyplot as plt

In [None]:
y = np.concatenate(y_true, axis=0)

In [None]:
y.shape

In [None]:
y_p = np.concatenate(y_pred, axis=0 )

In [None]:
y_p.shape

In [None]:
y_pd = []

In [None]:
# Turn every row of logits into class probabilities.
for logits_row in y_p:
    y_pd.append(list(softmax(logits_row)))

In [None]:
y_y = np.array(y_pd)

In [None]:
y_y.shape

In [None]:
y_y

In [None]:
y

In [None]:
roc_auc_score(y, y_y, multi_class="ovr")

In [None]:
from sklearn.preprocessing import label_binarize

In [None]:
y = label_binarize(y, classes=[0,1, 2, 3, 4])

In [None]:
y.shape

In [None]:
# Per-class ROC curve (that class against the rest) and its area, keyed by class index.
fpr, tpr, ab_auc_ = {}, {}, {}
for i in range(5):
    curve = roc_curve(y[:, i], y_y[:, i])
    fpr[i], tpr[i], _ = curve
    ab_auc_[i] = auc(fpr[i], tpr[i])

In [None]:
s = ["No DR", "Mild", "Moderate", "Severe", "PDR"]

In [None]:
# roc for each class
# One curve per class index on a single axis, saved to disk before being shown.
fig, ax = plt.subplots(figsize=(6, 6))
ax.plot([0, 1], [0, 1], 'k--')  # diagonal reference line
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Receiver operating characteristic curve')
for i in range(5):
    # The legend entry carries the class index and the AUC computed above.
    ax.plot(fpr[i], tpr[i], label='ROC curve (area = %0.2f) for class %i' % (ab_auc_[i], i))
ax.legend(loc="best")
ax.grid(alpha=.4)
#sns.despine()
fig.savefig("margin_0.1.png", dpi=500)
plt.show()

# Visualize with TSNE

In [None]:
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Reload the contrastively trained encoder (projection head included) for visualisation.
# map_location lets a GPU-saved checkpoint load on the active device.
# NOTE: torch.load unpickles arbitrary objects -- only load checkpoints from a trusted source.
encoder2 = torch.load('../input/xception-128-100/Xception_embedding_128_100.pth', map_location=device)

In [None]:
# Embed the training images with the encoder and collect embeddings and labels.
features, labels = [], []
encoder2.eval()
with torch.no_grad():
    for batch in tqdm_notebook(train_dataloader, leave=False):
        x1, x2, y = batch
        del x2  # only the original view is embedded
        x1 = x1.to(device)
        y = torch.from_numpy(np.asarray(y)).to(device)
        pred = encoder2(x1)
        features.append(pred.cpu().numpy())
        # Extending with a 1-D tensor appends one 0-d tensor per label.
        labels.extend(y.cpu())

In [None]:
# Stack the per-batch embedding arrays into one (num_samples, 128) matrix.
# np.concatenate also copes with a smaller final batch, where np.array() on a
# ragged list of arrays would fail.
X = np.concatenate(features, axis=0).reshape(-1, 128)

In [None]:
X.shape

In [None]:
embedded_space = TSNE(n_components=2).fit_transform(X)

In [None]:
# Unwrap the 0-d label tensors into plain Python numbers.
y = [label.item() for label in labels]

In [None]:
len(y)

In [None]:
# Collect the labels and the two t-SNE coordinates in one frame for plotting.
# Note that this rebinds `df`, which previously held the CSV contents.
df = pd.DataFrame()
df["y"] = y
df["Dimension 1"] = embedded_space[:,0]
df["Dimension 2"] = embedded_space[:,1]
#df["Dimension 3"] = embedded_space[:, 2]

In [None]:
# Scatter plot of the 2-D embedding, coloured by label with a five-colour palette.
# Note that this rebinds `s`, which previously held the class-name list.
s = sns.scatterplot(x="Dimension 1", y="Dimension 2", hue=df.y.tolist(),
                palette=sns.color_palette("hls", 5),
                data=df)#.set(title="Diabetic Retinopathy Test Data T-SNE projection")

In [None]:
s.get_figure().savefig("train_points.png")

# End-to-end Xception classifier without SCL

In [None]:
df = pd.read_csv('../input/aptos-csv/train.csv')

In [None]:
# Prefix the datasets prepend to `<id_code>.png` when opening an image.
image_dir = '../input/cropped-clahe-dr/Cropped_CLAHE_train_images/'

# Same 80/20 stratified split and seed as the earlier section.
train_df, val_df = train_test_split(
    df,
    test_size=0.20,
    shuffle=True,
    random_state=44,
    stratify=df['diagnosis'],
)

In [None]:
# Deterministic preprocessing: resize to 224x224, then convert to a tensor.
# NOTE(review): nearest-neighbour resampling is used for the resize -- confirm this is intended.
transform = transforms.Compose([
            transforms.Resize(size=(224, 224), interpolation=Image.NEAREST),
            transforms.ToTensor()
])

In [None]:
class CustomDataset(Dataset):
    """Single-view image dataset backed by a table of ids and labels.

    Items are ``(image, label)`` pairs. The image is read from
    ``image_dir + id_code + '.png'`` and converted to RGB; ``label`` is the
    matching ``diagnosis`` value wrapped in a tensor.
    """

    def __init__(self, df, image_dir, transform=None):
        super().__init__()
        self.image_dir = image_dir
        self.transform = transform
        self.image_ids = list(df['id_code'])
        self.labels = list(df['diagnosis'])

    def __len__(self):
        """Return the number of images listed in the table."""
        return len(self.image_ids)

    def __getitem__(self, idx):
        """Load item ``idx``; the transform is applied only when one was given."""
        path = self.image_dir + self.image_ids[idx] + '.png'
        target = self.labels[idx]
        image = Image.open(path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, torch.tensor(target)

In [None]:
# Single-view training set; drop_last=False keeps a possibly smaller final batch.
train_dataset = CustomDataset(train_df, image_dir, transform=transform)
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True, drop_last=False)

In [None]:
# Held-out split, iterated in a fixed order.
test_dataset = CustomDataset(val_df, image_dir, transform=transform)
test_dataloader = DataLoader(test_dataset, batch_size=8, shuffle=False)

In [None]:
del Model

In [None]:
# Fresh ImageNet-pretrained backbone for the baseline without contrastive pre-training.
model_name = 'xception' # could be fbresnet152 or inceptionresnetv2
model = pretrainedmodels.__dict__[model_name](num_classes=1000, pretrained='imagenet')

In [None]:
# Replace the final layer with a 5-way linear classifier on the 2048-wide features.
model.last_linear = nn.Linear(in_features=2048, out_features=5)

In [None]:
model.to(device)

In [None]:
# NOTE(review): looks like reciprocal class counts, but `weights` is never passed to
# the loss below, so training is unweighted -- confirm intent. If it is ever used,
# verify that the order matches class indices 0-4.
weights = torch.FloatTensor([1/1805, 1/999, 1/370, 1/295, 1/193]).to(device)
learning_rate = 0.001
# NOTE(review): `epochs` is not read by the training loop that follows, which uses range(30).
epochs = 10
# Default reduction: the loss returned for a batch is the mean over its samples.
loss_fn = nn.CrossEntropyLoss()
# Adam over all model parameters.
optimizer = torch.optim.Adam(model.parameters(), learning_rate)

In [None]:
Test = 0.0

In [None]:
# Train the classifier and evaluate on the held-out split after every epoch.
training_acc = []
Test_Result = []
for n in range(30):
    total = 0
    train_loss = 0.0
    seen = 0
    model.train()
    for batch in tqdm_notebook(train_dataloader, leave=False):
        x, y = batch
        x = x.to(device)
        y = y.to(device)
        pred = model(x)
        loss = loss_fn(pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        batch_size = y.size(0)
        # loss_fn returns the batch mean; weight it by the batch size so the
        # epoch figure is a true per-sample average.
        train_loss += loss.item() * batch_size
        total += (pred.argmax(1) == y).type(torch.float).sum().item()
        seen += batch_size
    total /= max(seen, 1)
    train_loss /= max(seen, 1)
    training_acc.append({'Accuracy': total, 'Avg loss': train_loss})

    # Evaluation on the held-out split.
    model.eval()
    size = len(test_dataloader.dataset)
    test_loss, correct = 0, 0
    with torch.no_grad():
        for batch in tqdm_notebook(test_dataloader, leave=False):
            x, y = batch
            x = x.to(device)
            y = y.to(device)
            pred = model(x)
            test_loss += loss_fn(pred, y).item() * y.size(0)
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= size
    correct /= size
    print("epoch number: ", n)
    print(f"Train Accuracy: {(100*total):.2f}%")
    print(f"Test Result: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    Test_Result.append({'Accuracy': correct, 'Avg loss': test_loss})
    # Keep the checkpoint with the best held-out accuracy so far.
    if Test<correct:
        torch.save(model, "test_classifier.pth")
        Test = correct
        print("new added")

In [None]:
Test = pd.DataFrame(Test_Result)
Test

In [None]:
Test.to_csv('./test_result.csv')

In [None]:
pd.DataFrame(training_acc).to_csv('./training_acc.csv')

In [None]:
model_100 = torch.load('./test_classifier.pth')
model.load_state_dict(model_100.state_dict())

In [None]:
y_true = []
y_pred = []

In [None]:
# Final evaluation: accuracy, per-sample average loss, confusion matrix, and the
# raw logits/labels collected for the ROC analysis.
nb_classes = 5
confusion_matrix = torch.zeros(nb_classes, nb_classes)
model.eval()
size = len(test_dataloader.dataset)
test_loss, correct = 0, 0
with torch.no_grad():
    for batch in tqdm_notebook(test_dataloader, leave=False):
        x, y = batch
        x = x.to(device)
        y = y.to(device)
        pred = model(x)
        preds = pred.argmax(1)

        # Rows index the true class, columns the predicted class.
        for t, p in zip(y.view(-1), preds.view(-1)):
            confusion_matrix[t.long(), p.long()] += 1
        # loss_fn returns the batch mean; weight by batch size for a per-sample average.
        test_loss += loss_fn(pred, y).item() * y.size(0)
        correct += (preds == y).type(torch.float).sum().item()
        y_pred.append(pred.cpu().numpy())
        y_true.append(y.cpu().numpy())
test_loss /= size
correct /= size
print(f"Test Result: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
print(confusion_matrix)

In [None]:
from sklearn.metrics import roc_curve, auc, roc_auc_score
import matplotlib.pyplot as plt

In [None]:
def softmax(x):
    """Return the softmax of the scores in ``x``.

    The global maximum of ``x`` is subtracted before exponentiating to avoid
    overflow; normalisation is along axis 0, so a 1-D vector of logits becomes
    a probability vector.
    """
    shifted = x - np.max(x)
    exps = np.exp(shifted)
    totals = exps.sum(axis=0)
    return exps / totals

In [None]:
y = np.concatenate(y_true, axis=0)
y_p = np.concatenate( y_pred, axis=0 )
y_pd = []

In [None]:
# Turn every row of logits into class probabilities, then stack them into one array.
for logits_row in y_p:
    y_pd.append(list(softmax(logits_row)))

y_y = np.array(y_pd)

In [None]:
# One-vs-rest AUC over the predicted class probabilities. It is printed explicitly
# because a notebook only echoes the last expression of a cell, so the bare call
# here was silently discarded.
print(roc_auc_score(y, y_y, multi_class="ovr"))
from sklearn.preprocessing import label_binarize
# One-hot encode the labels so each class column can be scored on its own afterwards.
y = label_binarize(y, classes=[0, 1, 2, 3, 4])

In [None]:
# Per-class ROC curve (that class against the rest) and its area, keyed by class index.
fpr, tpr, ab_auc_ = {}, {}, {}
for i in range(5):
    curve = roc_curve(y[:, i], y_y[:, i])
    fpr[i], tpr[i], _ = curve
    ab_auc_[i] = auc(fpr[i], tpr[i])

In [None]:
# roc for each class
fig, ax = plt.subplots(figsize=(7, 5))
ax.plot([0, 1], [0, 1], 'k--')
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Receiver operating characteristic curve')
for i in range(5):
    ax.plot(fpr[i], tpr[i], label='ROC curve (area = %0.2f) for class %i' % (ab_auc_[i], i))
ax.legend(loc="best")
ax.grid(alpha=.4)
#sns.despine()
plt.savefig("roc_no_SCL.png", dpi=500)
plt.show()

# Other pretrained Models

# 1. Resnet50

In [None]:
model = torchvision.models.resnet50(True)

In [None]:
model

In [None]:
# Replace ResNet-50's final fully connected layer with a 5-way classifier,
# then move the model to the selected device.
model.fc = nn.Linear(in_features=2048, out_features=5)
model.to(device)

In [None]:
df = pd.read_csv('../input/aptos-csv/train.csv')

In [None]:
# Prefix the datasets prepend to `<id_code>.png` when opening an image.
image_dir = '../input/cropped-clahe-dr/Cropped_CLAHE_train_images/'

# Same 80/20 stratified split and seed as the earlier sections.
train_df, val_df = train_test_split(
    df,
    test_size=0.20,
    shuffle=True,
    random_state=44,
    stratify=df['diagnosis'],
)

In [None]:
# Deterministic preprocessing: resize to 224x224, then convert to a tensor.
# NOTE(review): nearest-neighbour resampling is used for the resize -- confirm this is intended.
transform = transforms.Compose([
            transforms.Resize(size=(224, 224), interpolation=Image.NEAREST),
            transforms.ToTensor()
])

In [None]:
class CustomDataset(Dataset):
    """Single-view image dataset backed by a table of ids and labels.

    Items are ``(image, label)`` pairs. The image is read from
    ``image_dir + id_code + '.png'`` and converted to RGB; ``label`` is the
    matching ``diagnosis`` value wrapped in a tensor.
    """

    def __init__(self, df, image_dir, transform=None):
        super().__init__()
        self.image_dir = image_dir
        self.transform = transform
        self.image_ids = list(df['id_code'])
        self.labels = list(df['diagnosis'])

    def __len__(self):
        """Return the number of images listed in the table."""
        return len(self.image_ids)

    def __getitem__(self, idx):
        """Load item ``idx``; the transform is applied only when one was given."""
        path = self.image_dir + self.image_ids[idx] + '.png'
        target = self.labels[idx]
        image = Image.open(path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, torch.tensor(target)

In [None]:
# Single-view training set; drop_last=False keeps a possibly smaller final batch.
train_dataset = CustomDataset(train_df, image_dir, transform=transform)
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True, drop_last=False)

# Held-out split, iterated in a fixed order.
test_dataset = CustomDataset(val_df, image_dir, transform=transform)
test_dataloader = DataLoader(test_dataset, batch_size=8, shuffle=False)

In [None]:
# NOTE(review): looks like reciprocal class counts, but `weights` is never passed to
# the loss below, so training is unweighted -- confirm intent. If it is ever used,
# verify that the order matches class indices 0-4.
weights = torch.FloatTensor([1/1805, 1/999, 1/370, 1/295, 1/193]).to(device)
learning_rate = 0.001
# NOTE(review): `epochs` is not read by the training loop that follows, which uses range(30).
epochs = 10
# Default reduction: the loss returned for a batch is the mean over its samples.
loss_fn = nn.CrossEntropyLoss()
# Adam over all model parameters.
optimizer = torch.optim.Adam(model.parameters(), learning_rate)

In [None]:
Test = 0.0

In [None]:
# Train the classifier and evaluate on the held-out split after every epoch.
training_acc = []
Test_Result = []
for n in range(30):
    total = 0
    train_loss = 0.0
    seen = 0
    model.train()
    for batch in tqdm_notebook(train_dataloader, leave=False):
        x, y = batch
        x = x.to(device)
        y = y.to(device)
        pred = model(x)
        loss = loss_fn(pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        batch_size = y.size(0)
        # loss_fn returns the batch mean; weight it by the batch size so the
        # epoch figure is a true per-sample average.
        train_loss += loss.item() * batch_size
        total += (pred.argmax(1) == y).type(torch.float).sum().item()
        seen += batch_size
    total /= max(seen, 1)
    train_loss /= max(seen, 1)
    training_acc.append({'Accuracy': total, 'Avg loss': train_loss})

    # Evaluation on the held-out split.
    model.eval()
    size = len(test_dataloader.dataset)
    test_loss, correct = 0, 0
    with torch.no_grad():
        for batch in tqdm_notebook(test_dataloader, leave=False):
            x, y = batch
            x = x.to(device)
            y = y.to(device)
            pred = model(x)
            test_loss += loss_fn(pred, y).item() * y.size(0)
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= size
    correct /= size
    print("epoch number: ", n)
    print(f"Train Accuracy: {(100*total):.2f}%")
    print(f"Test Result: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    Test_Result.append({'Accuracy': correct, 'Avg loss': test_loss})
    # Keep the checkpoint with the best held-out accuracy so far.
    if Test<correct:
        torch.save(model, "resnet50_test_classifier.pth")
        Test = correct
        print("new added")

In [None]:
Test = pd.DataFrame(Test_Result)
Test

In [None]:
Test.to_csv('./test_result.csv')

In [None]:
pd.DataFrame(training_acc).to_csv('./training_acc.csv')

In [None]:
model_100 = torch.load('./resnet50_test_classifier.pth')
model.load_state_dict(model_100.state_dict())

In [None]:
y_true = []
y_pred = []

In [None]:
# Final evaluation: accuracy, per-sample average loss, confusion matrix, and the
# raw logits/labels collected per batch.
nb_classes = 5
confusion_matrix = torch.zeros(nb_classes, nb_classes)
model.eval()
size = len(test_dataloader.dataset)
test_loss, correct = 0, 0
with torch.no_grad():
    for batch in tqdm_notebook(test_dataloader, leave=False):
        x, y = batch
        x = x.to(device)
        y = y.to(device)
        pred = model(x)
        preds = pred.argmax(1)

        # Rows index the true class, columns the predicted class.
        for t, p in zip(y.view(-1), preds.view(-1)):
            confusion_matrix[t.long(), p.long()] += 1
        # loss_fn returns the batch mean; weight by batch size for a per-sample average.
        test_loss += loss_fn(pred, y).item() * y.size(0)
        correct += (preds == y).type(torch.float).sum().item()
        y_pred.append(pred.cpu().numpy())
        y_true.append(y.cpu().numpy())
test_loss /= size
correct /= size
print(f"Test Result: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
print(confusion_matrix)

# 2. VGG19

In [None]:
model = torchvision.models.vgg19(True)

In [None]:
model

In [None]:
# Replace entry 6 of VGG-19's classifier stack with a 4096 -> 5 linear layer,
# then move the model to the selected device.
model.classifier[6] = nn.Linear(4096, 5)
model.to(device)

In [None]:
df = pd.read_csv('../input/aptos-csv/train.csv')

In [None]:
# Prefix the datasets prepend to `<id_code>.png` when opening an image.
image_dir = '../input/cropped-clahe-dr/Cropped_CLAHE_train_images/'

# Same 80/20 stratified split and seed as the earlier sections.
train_df, val_df = train_test_split(
    df,
    test_size=0.20,
    shuffle=True,
    random_state=44,
    stratify=df['diagnosis'],
)

In [None]:
# Deterministic preprocessing: resize to 224x224, then convert to a tensor.
# NOTE(review): nearest-neighbour resampling is used for the resize -- confirm this is intended.
transform = transforms.Compose([
            transforms.Resize(size=(224, 224), interpolation=Image.NEAREST),
            transforms.ToTensor()
])

In [None]:
class CustomDataset(Dataset):
    """Single-view image dataset backed by a table of ids and labels.

    Items are ``(image, label)`` pairs. The image is read from
    ``image_dir + id_code + '.png'`` and converted to RGB; ``label`` is the
    matching ``diagnosis`` value wrapped in a tensor.
    """

    def __init__(self, df, image_dir, transform=None):
        super().__init__()
        self.image_dir = image_dir
        self.transform = transform
        self.image_ids = list(df['id_code'])
        self.labels = list(df['diagnosis'])

    def __len__(self):
        """Return the number of images listed in the table."""
        return len(self.image_ids)

    def __getitem__(self, idx):
        """Load item ``idx``; the transform is applied only when one was given."""
        path = self.image_dir + self.image_ids[idx] + '.png'
        target = self.labels[idx]
        image = Image.open(path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, torch.tensor(target)

In [None]:
# Single-view training set; drop_last=False keeps a possibly smaller final batch.
train_dataset = CustomDataset(train_df, image_dir, transform=transform)
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True, drop_last=False)

# Held-out split, iterated in a fixed order.
test_dataset = CustomDataset(val_df, image_dir, transform=transform)
test_dataloader = DataLoader(test_dataset, batch_size=8, shuffle=False)

In [None]:
Test = 0.0

In [None]:
# NOTE(review): looks like reciprocal class counts, but `weights` is never passed to
# the loss below, so training is unweighted -- confirm intent. If it is ever used,
# verify that the order matches class indices 0-4.
weights = torch.FloatTensor([1/1805, 1/999, 1/370, 1/295, 1/193]).to(device)
learning_rate = 0.001
# NOTE(review): `epochs` is not read by the training loop that follows, which uses range(30).
epochs = 10
# Default reduction: the loss returned for a batch is the mean over its samples.
loss_fn = nn.CrossEntropyLoss()
# Adam over all model parameters.
optimizer = torch.optim.Adam(model.parameters(), learning_rate)

In [None]:
# Train the classifier and evaluate on the held-out split after every epoch.
training_acc = []
Test_Result = []
for n in range(30):
    total = 0
    train_loss = 0.0
    seen = 0
    model.train()
    for batch in tqdm_notebook(train_dataloader, leave=False):
        x, y = batch
        x = x.to(device)
        y = y.to(device)
        pred = model(x)
        loss = loss_fn(pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        batch_size = y.size(0)
        # loss_fn returns the batch mean; weight it by the batch size so the
        # epoch figure is a true per-sample average.
        train_loss += loss.item() * batch_size
        total += (pred.argmax(1) == y).type(torch.float).sum().item()
        seen += batch_size
    total /= max(seen, 1)
    train_loss /= max(seen, 1)
    training_acc.append({'Accuracy': total, 'Avg loss': train_loss})

    # Evaluation on the held-out split.
    model.eval()
    size = len(test_dataloader.dataset)
    test_loss, correct = 0, 0
    with torch.no_grad():
        for batch in tqdm_notebook(test_dataloader, leave=False):
            x, y = batch
            x = x.to(device)
            y = y.to(device)
            pred = model(x)
            test_loss += loss_fn(pred, y).item() * y.size(0)
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= size
    correct /= size
    print("epoch number: ", n)
    print(f"Train Accuracy: {(100*total):.2f}%")
    print(f"Test Result: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    Test_Result.append({'Accuracy': correct, 'Avg loss': test_loss})
    # Keep the checkpoint with the best held-out accuracy so far.
    if Test<correct:
        torch.save(model, "test_classifier.pth")
        Test = correct
        print("new added")

In [None]:
Test = pd.DataFrame(Test_Result)
Test

In [None]:
Test.to_csv('./test_result.csv')

In [None]:
pd.DataFrame(training_acc).to_csv('./training_acc.csv')

In [None]:
model_100 = torch.load('./test_classifier.pth')
model.load_state_dict(model_100.state_dict())

In [None]:
y_true = []
y_pred = []

In [None]:
# Final evaluation: accuracy, per-sample average loss, confusion matrix, and the
# raw logits/labels collected per batch.
nb_classes = 5
confusion_matrix = torch.zeros(nb_classes, nb_classes)
model.eval()
size = len(test_dataloader.dataset)
test_loss, correct = 0, 0
with torch.no_grad():
    for batch in tqdm_notebook(test_dataloader, leave=False):
        x, y = batch
        x = x.to(device)
        y = y.to(device)
        pred = model(x)
        preds = pred.argmax(1)

        # Rows index the true class, columns the predicted class.
        for t, p in zip(y.view(-1), preds.view(-1)):
            confusion_matrix[t.long(), p.long()] += 1
        # loss_fn returns the batch mean; weight by batch size for a per-sample average.
        test_loss += loss_fn(pred, y).item() * y.size(0)
        correct += (preds == y).type(torch.float).sum().item()
        y_pred.append(pred.cpu().numpy())
        y_true.append(y.cpu().numpy())
test_loss /= size
correct /= size
print(f"Test Result: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
print(confusion_matrix)

# 3. DenseNet

In [None]:
model = torchvision.models.densenet121(True)

In [None]:
model

In [None]:
# Replace DenseNet-121's classifier with a 1024 -> 5 linear layer,
# then move the model to the selected device.
model.classifier = nn.Linear(1024, 5)
model.to(device)

In [None]:
df = pd.read_csv('../input/aptos-csv/train.csv')

In [None]:
# Prefix the datasets prepend to `<id_code>.png` when opening an image.
image_dir = '../input/cropped-clahe-dr/Cropped_CLAHE_train_images/'

# Same 80/20 stratified split and seed as the earlier sections.
train_df, val_df = train_test_split(
    df,
    test_size=0.20,
    shuffle=True,
    random_state=44,
    stratify=df['diagnosis'],
)

In [None]:
# Deterministic preprocessing: resize to 224x224, then convert to a tensor.
# NOTE(review): nearest-neighbour resampling is used for the resize -- confirm this is intended.
transform = transforms.Compose([
            transforms.Resize(size=(224, 224), interpolation=Image.NEAREST),
            transforms.ToTensor()
])

In [None]:
class CustomDataset(Dataset):
    """Single-view image dataset backed by a table of ids and labels.

    Items are ``(image, label)`` pairs. The image is read from
    ``image_dir + id_code + '.png'`` and converted to RGB; ``label`` is the
    matching ``diagnosis`` value wrapped in a tensor.
    """

    def __init__(self, df, image_dir, transform=None):
        super().__init__()
        self.image_dir = image_dir
        self.transform = transform
        self.image_ids = list(df['id_code'])
        self.labels = list(df['diagnosis'])

    def __len__(self):
        """Return the number of images listed in the table."""
        return len(self.image_ids)

    def __getitem__(self, idx):
        """Load item ``idx``; the transform is applied only when one was given."""
        path = self.image_dir + self.image_ids[idx] + '.png'
        target = self.labels[idx]
        image = Image.open(path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, torch.tensor(target)

In [None]:
# Single-view training set; drop_last=False keeps a possibly smaller final batch.
train_dataset = CustomDataset(train_df, image_dir, transform=transform)
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True, drop_last=False)

# Held-out split, iterated in a fixed order.
test_dataset = CustomDataset(val_df, image_dir, transform=transform)
test_dataloader = DataLoader(test_dataset, batch_size=8, shuffle=False)

In [None]:
Test = 0.0

In [None]:
# NOTE(review): looks like reciprocal class counts, but `weights` is never passed to
# the loss below, so training is unweighted -- confirm intent. If it is ever used,
# verify that the order matches class indices 0-4.
weights = torch.FloatTensor([1/1805, 1/999, 1/370, 1/295, 1/193]).to(device)
learning_rate = 0.001
# NOTE(review): `epochs` is not read by the training loop that follows, which uses range(30).
epochs = 10
# Default reduction: the loss returned for a batch is the mean over its samples.
loss_fn = nn.CrossEntropyLoss()
# Adam over all model parameters.
optimizer = torch.optim.Adam(model.parameters(), learning_rate)

In [None]:
# Train the classifier and evaluate on the held-out split after every epoch.
training_acc = []
Test_Result = []
for n in range(30):
    total = 0
    train_loss = 0.0
    seen = 0
    model.train()
    for batch in tqdm_notebook(train_dataloader, leave=False):
        x, y = batch
        x = x.to(device)
        y = y.to(device)
        pred = model(x)
        loss = loss_fn(pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        batch_size = y.size(0)
        # loss_fn returns the batch mean; weight it by the batch size so the
        # epoch figure is a true per-sample average.
        train_loss += loss.item() * batch_size
        total += (pred.argmax(1) == y).type(torch.float).sum().item()
        seen += batch_size
    total /= max(seen, 1)
    train_loss /= max(seen, 1)
    training_acc.append({'Accuracy': total, 'Avg loss': train_loss})

    # Evaluation on the held-out split.
    model.eval()
    size = len(test_dataloader.dataset)
    test_loss, correct = 0, 0
    with torch.no_grad():
        for batch in tqdm_notebook(test_dataloader, leave=False):
            x, y = batch
            x = x.to(device)
            y = y.to(device)
            pred = model(x)
            test_loss += loss_fn(pred, y).item() * y.size(0)
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= size
    correct /= size
    print("epoch number: ", n)
    print(f"Train Accuracy: {(100*total):.2f}%")
    print(f"Test Result: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    Test_Result.append({'Accuracy': correct, 'Avg loss': test_loss})
    # Keep the checkpoint with the best held-out accuracy so far.
    if Test<correct:
        torch.save(model, "test_classifier.pth")
        Test = correct
        print("new added")

In [None]:
Test = pd.DataFrame(Test_Result)
Test

In [None]:
Test.to_csv('./test_result.csv')

In [None]:
pd.DataFrame(training_acc).to_csv('./training_acc.csv')

In [None]:
model_100 = torch.load('./test_classifier.pth')
model.load_state_dict(model_100.state_dict())

In [None]:
y_true = []
y_pred = []

In [None]:
# Final evaluation: accuracy, per-sample average loss, confusion matrix, and the
# raw logits/labels collected per batch.
nb_classes = 5
confusion_matrix = torch.zeros(nb_classes, nb_classes)
model.eval()
size = len(test_dataloader.dataset)
test_loss, correct = 0, 0
with torch.no_grad():
    for batch in tqdm_notebook(test_dataloader, leave=False):
        x, y = batch
        x = x.to(device)
        y = y.to(device)
        pred = model(x)
        preds = pred.argmax(1)

        # Rows index the true class, columns the predicted class.
        for t, p in zip(y.view(-1), preds.view(-1)):
            confusion_matrix[t.long(), p.long()] += 1
        # loss_fn returns the batch mean; weight by batch size for a per-sample average.
        test_loss += loss_fn(pred, y).item() * y.size(0)
        correct += (preds == y).type(torch.float).sum().item()
        y_pred.append(pred.cpu().numpy())
        y_true.append(y.cpu().numpy())
test_loss /= size
correct /= size
print(f"Test Result: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
print(confusion_matrix)