In [17]:
import glob
import shutil
from sched import scheduler

import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
from PIL import Image
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision import datasets, transforms
from sklearn.model_selection import train_test_split
import numpy as np
from copy import deepcopy
import time
import random
import os

from torchvision.transforms.v2 import RandomVerticalFlip, RandomRotation, ToTensor, Normalize



In [18]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

seed = 42
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)

torch.manual_seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.enabled = False

if device=='cuda':
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

cuda


In [19]:
train_dir = 'train'
test_dir = 'test'

all_train_files = glob.glob(os.path.join(train_dir,'*.jpg'))
test_list = glob.glob(os.path.join(test_dir, '*.jpg'))
train_labels = [path.split('/')[-1].split('.')[0].split('\\')[-1] for path in all_train_files] #cat or dog
train_list, val_list = train_test_split(all_train_files, test_size=0.1, stratify=train_labels, random_state=seed)
len(train_list)

22500

In [20]:
from torchvision import transforms

input_size = 224
transforms_for_train = transforms.Compose([ #augmentation
    transforms.RandomResizedCrop(input_size, scale=(0.5,1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406], [0.229,0.224,0.225])
])
transforms_for_val_test = transforms.Compose([
    transforms.Resize(input_size),
    transforms.CenterCrop(input_size),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])



In [21]:
class CustomDataset(Dataset):
    def __init__(self, file_list, transform=None):
        self.file_list = file_list
        self.transform = transform

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        img_path = self.file_list[idx]
        if img_path.split('/')[-1][-3:]=='jpg':
            img=Image.open(img_path)
            if self.transform is not None:
                img_transform = self.transform(img)
                label = img_path.split('/')[-1].split('.')[0].split('\\')[-1]
                if label =='dog':
                    label=1
                elif label=='cat':
                    label=0
        return img_transform, label

dataset_train = CustomDataset(train_list, transform=transforms_for_train)
dataset_valid = CustomDataset(val_list, transform=transforms_for_train)
dataset_test = CustomDataset(test_list, transform=transforms_for_train)

from torch.utils.data import DataLoader

train_batches = DataLoader(dataset=dataset_train, batch_size=8, shuffle=True)
val_batches = DataLoader(dataset=dataset_valid, batch_size=8, shuffle=True)
test_batches = DataLoader(dataset=dataset_test, batch_size=8, shuffle=True)



In [22]:
import timm

model = timm.create_model('vit_base_patch32_224_in21k', pretrained=True)
model.head = nn.Sequential(
    nn.Linear(768, 21843, bias=True),
    nn.LeakyReLU(),
    nn.BatchNorm1d(21843),
    nn.Linear(21843, 512, bias=True),
    nn.LeakyReLU(),
    nn.BatchNorm1d(512),
    nn.Linear(512, 1, bias=True),
    nn.Sigmoid()
)

model.to(device)
loss_func = nn.BCELoss()
optimizer=torch.optim.Adam(model.parameters(), lr=1e-5)
from transformers import get_linear_schedule_with_warmup
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=10
)

  model = create_fn(


In [23]:
def train_model(model, criterion, optimizer, early_stop, epochs, train_loader, valid_loader):
    train_losses, train_accuracies, valid_losses, valid_accuracies, lowest_loss, lowest_epoch =list(), list(), list(), list(), np.inf, 0
    progress_count = 0

    for epoch in range(epochs):
        train_loss, train_accuracy, train_corrects, valid_loss, valid_accuracy, valid_corrects =  0, 0, 0, 0, 0, 0
        train_correct, valid_correct=0,0
        start = time.time()
        model.train()
        for train_x, train_y in train_loader:
            train_x = train_x.to(device)
            train_y = train_y.to(device).float()
            train_y = train_y.view(train_y.size(0),-1)
            pred= model(train_x)
            loss = criterion(pred,train_y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_loss+=loss.item()

            y_pred = np.round(pred.detach().cpu())
            train_correct+=y_pred.eq(train_y.detach().cpu()).sum().item()

        train_loss = train_loss/len(train_loader)
        train_losses.append(train_loss)
        train_accuracy = train_correct/len(train_loader.dataset)
        train_accuracies.append(train_accuracy)

        model.eval()
        with torch.no_grad():
            for valid_x, valid_y in valid_loader:
                valid_x = valid_x.to(device)
                valid_y = valid_y.to(device).float()
                valid_y=valid_y.view(valid_y.size(0),-1)
                pred=model(valid_x)
                loss=criterion(pred,valid_y)
                valid_loss += loss.item()

                y_pred = np.round(pred.detach().cpu())
                valid_correct+=y_pred.eq(valid_y.detach().cpu()).sum().item()

        valid_loss = valid_loss /len(valid_loader)
        valid_losses.append(valid_loss)
        valid_accuracy = valid_correct/len(valid_loader.dataset)
        valid_accuracies.append(valid_accuracy)

        elapsed_time = time.time()-start

        if valid_losses[-1] < lowest_loss:
            lowest_epoch = epoch
            lowest_loss = valid_losses[-1]
            best_model = deepcopy(model.state_dict())

        else:
            if(early_stop>0) and lowest_epoch+early_stop < epoch:
                print('early stopped', epoch, 'epochs')
                break

        scheduler.step()
    model.load_state_dict(best_model)
    return model, lowest_loss, train_losses,valid_losses,train_accuracies, valid_accuracies

In [None]:
model, lowest_loss, train_losses, valid_losses, train_accuracies, valid_accuracies = train_model(model, loss_func, optimizer, 0, 10, train_batches, val_batches)

In [None]:
model.load_state_dict(torch.load('model_vit_base_patch32_224_in21k_linear_schedule_with_warmup_adam_1e5.pth'))
test_list = glob.glob(os.path.join(test_dir, '*.jpg'))
dataset_test = CustomDataset(test_list, transform=transforms_for_val_test)
test_batches = DataLoader(dataset=dataset_test, batch_size=8, shuffle=False)

def predict(model, data_loader):
    ids = list()
    with torch.no_grad():
        model.eval()
        ret = None
        for img, fileid in data_loader:
            img = img.to(device)
            pred = model(img)
            ids += list(fileid)
            if ret is None:
                ret = pred.cpu().numpy()
            else:
                ret = np.vstack([ret, pred.cpu().numpy()])
    return ret, ids

pred, ids = predict(model, test_batches)

submission = pd.DataFrame({'id': ids, 'label': np.clip(pred, 0.006, 1-0.006).squeeze()})
submission.sort_values(by='id', inplace=True)
submission.reset_index(drop=True, inplace=True)
submission.to_csv('submission.csv', index=False)