In [1]:
import numpy as np
import pandas as pd
import torch
import os
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import torch.optim as optim
from torch.utils.data import ConcatDataset, DataLoader, Subset, Dataset
from torchvision.datasets import DatasetFolder, VisionDataset
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
import random
from sklearn.model_selection import train_test_split
from sklearn.metrics import log_loss
import re 

In [2]:
# Training pipeline: resize to 224x224, apply random geometric and photometric
# augmentation, equalize the histogram, then convert to a tensor and normalize
# each channel with mean 0.5 and std 0.5.
train_tfm = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.RandomAffine(
            degrees=(0, 135),
            translate=(0.1, 0.3),
            scale=(0.8, 0.95),
        ),
        transforms.RandomInvert(p=0.5),
        transforms.RandomAdjustSharpness(sharpness_factor=2, p=0.5),
        transforms.ColorJitter(
            brightness=0.4,
            contrast=2,
            saturation=1.5,
            hue=0.5,
        ),
        # A plain function is used as a pipeline step here; it runs before
        # ToTensor, i.e. on whatever the previous step returns.
        transforms.functional.equalize,
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ]
)

# Evaluation pipeline: the deterministic subset of the steps above
# (resize, tensor conversion, identical normalization).
test_tfm = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ]
)


In [3]:
class cdDataset(Dataset):
    """Cat/dog image dataset backed by the ``.jpg`` files of one directory.

    Args:
        path: Directory that is scanned (non-recursively) for ``.jpg`` files.
        tfm: Callable applied to every loaded PIL image.
        files: Accepted for backward compatibility; currently unused.
        mode: ``'train'`` keeps the 80% part of the split, ``'val'`` keeps the
            remaining 20%, ``'test'`` keeps every file ordered by the first
            integer in its file name. Any other value keeps every file in
            lexicographic order.
        seed: Random state of the train/validation split. Instances built
            with the same ``seed`` on the same directory contents compute the
            same partition, so a 'train' and a 'val' instance never share a
            file.

    Each item is ``(image, label)``. The label is 0 when the file name starts
    with ``cat.``, 1 for any other name, and -1 in ``'test'`` mode.
    """

    def __init__(self, path, tfm=test_tfm, files=None, mode=None, seed=42):
        super().__init__()
        self.path = path
        self.transform = tfm
        self.mode = mode

        jpg_paths = [
            os.path.join(path, name)
            for name in os.listdir(path)
            if name.endswith(".jpg")
        ]
        if mode == 'test':
            self.files = sorted(jpg_paths, key=self._numeric_key)
            return

        self.files = sorted(jpg_paths)
        if mode in ('train', 'val'):
            # The sorted file list plus a fixed random_state makes the
            # partition reproducible across separately constructed instances.
            train_files, valid_files = train_test_split(
                self.files, test_size=0.2, random_state=seed
            )
            self.files = train_files if mode == 'train' else valid_files

    @staticmethod
    def _numeric_key(file_path):
        """Return the first integer found in the file name of ``file_path``."""
        return int(re.search(r'\d+', os.path.basename(file_path)).group())

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        fname = self.files[idx]
        # Force three channels so the 3-channel Normalize step also works for
        # grayscale / palette / RGBA files, and close the file handle promptly.
        with Image.open(fname) as handle:
            im = handle.convert('RGB')
        im = self.transform(im)
        if self.mode != 'test':
            # os.path.basename is separator-agnostic; splitting on a literal
            # backslash only works for Windows-style paths.
            label_str = os.path.basename(fname).split(".")[0]
            label = 0 if label_str == 'cat' else 1
        else:
            label = -1
        return im, label

In [4]:
def Conv1(in_planes, places, stride=2):
    """Build the network stem.

    The stem is a bias-free 7x7 convolution (padding 3, the given ``stride``)
    followed by batch normalization, an in-place ReLU and a 3x3 max-pool with
    stride 2 and padding 1.

    Args:
        in_planes: Number of input channels.
        places: Number of output channels.
        stride: Stride of the 7x7 convolution.

    Returns:
        An ``nn.Sequential`` holding the four layers in that order.
    """
    stem_conv = nn.Conv2d(
        in_planes, places, kernel_size=7, stride=stride, padding=3, bias=False
    )
    stem_layers = [
        stem_conv,
        nn.BatchNorm2d(places),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
    ]
    return nn.Sequential(*stem_layers)
class Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 -> 3x3 -> 1x1 convolutions plus a shortcut.

    Args:
        in_places: Number of input channels.
        places: Channel width of the first two convolutions.
        stride: Stride of the 3x3 convolution and of the projection shortcut.
        downsampling: When true, the shortcut is a 1x1 convolution + batch
            norm projection; otherwise the input itself is added back.
        expansion: The block outputs ``places * expansion`` channels.
    """

    def __init__(self, in_places, places, stride=1, downsampling=False, expansion=4):
        super().__init__()
        self.expansion = expansion
        self.downsampling = downsampling

        out_places = places * self.expansion

        # Main path; every convolution is bias-free and followed by batch norm.
        main_path = [
            nn.Conv2d(in_places, places, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(places),
            nn.ReLU(inplace=True),
            nn.Conv2d(places, places, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(places),
            nn.ReLU(inplace=True),
            nn.Conv2d(places, out_places, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(out_places),
        ]
        self.bottleneck = nn.Sequential(*main_path)

        # The projection shortcut exists only when requested.
        if self.downsampling:
            projection = nn.Conv2d(in_places, out_places, kernel_size=1, stride=stride, bias=False)
            self.downsample = nn.Sequential(projection, nn.BatchNorm2d(out_places))
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        out = self.bottleneck(x)
        shortcut = self.downsample(x) if self.downsampling else x
        out += shortcut
        return self.relu(out)
class ResNet(nn.Module):
    """Bottleneck ResNet with a sigmoid head.

    Args:
        blocks: Sequence of four integers, the number of bottleneck blocks in
            each of the four stages.
        num_classes: Output width of the final linear layer.
        expansion: Channel expansion factor of every bottleneck block. It is
            used for the stage input widths, passed to each block and used
            for the input width of the linear layer.

    ``forward`` takes a 3-channel image batch and returns sigmoid-activated
    scores; with ``num_classes=1`` the class dimension is squeezed away so the
    result has one value per sample.
    """

    def __init__(self, blocks, num_classes=1, expansion=4):
        super().__init__()
        self.expansion = expansion

        self.conv1 = Conv1(in_planes=3, places=64)

        # Each stage receives the previous stage's width times the expansion
        # factor (256 / 512 / 1024 for the default expansion of 4).
        self.layer1 = self.make_layer(in_places=64, places=64, block=blocks[0], stride=1)
        self.layer2 = self.make_layer(in_places=64 * expansion, places=128, block=blocks[1], stride=2)
        self.layer3 = self.make_layer(in_places=128 * expansion, places=256, block=blocks[2], stride=2)
        self.layer4 = self.make_layer(in_places=256 * expansion, places=512, block=blocks[3], stride=2)

        # Global max-pool to 1x1. On a 7x7 feature map this equals
        # MaxPool2d(7, stride=1) and it also accepts other spatial sizes.
        self.MaxPool2d = nn.AdaptiveMaxPool2d(1)
        self.fc = nn.Linear(512 * expansion, num_classes)
        self.sigmoid = nn.Sigmoid()

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def make_layer(self, in_places, places, block, stride):
        """Build one stage: a projecting block followed by ``block - 1`` plain ones."""
        layers = [
            Bottleneck(in_places, places, stride, downsampling=True, expansion=self.expansion)
        ]
        for _ in range(1, block):
            layers.append(
                Bottleneck(places * self.expansion, places, expansion=self.expansion)
            )
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.MaxPool2d(x)
        x = torch.flatten(x, 1)
        # squeeze(1) drops the class dimension only when num_classes == 1.
        x = self.fc(x).squeeze(1)
        return self.sigmoid(x)
def ResNet50():
    """Return a ``ResNet`` with 3, 4, 6 and 3 bottleneck blocks per stage."""
    stage_depths = [3, 4, 6, 3]
    return ResNet(blocks=stage_depths)

In [5]:
batch_size = 64
_dataset_dir = r"D:\sast"
# Use the GPU when one is available instead of failing on CPU-only machines.
device = "cuda" if torch.cuda.is_available() else "cpu"
n_epochs = 40
patience = 300
split_seed = 42
# Pinned host memory is only used for transfers to a CUDA device.
_pin_memory = device == "cuda"

# cdDataset partitions the directory with train_test_split. If that call is
# not given a random_state of its own it draws from NumPy's global RNG, so the
# RNG is reseeded before each construction to make both datasets compute the
# same partition.
np.random.seed(split_seed)
train_set = cdDataset(os.path.join(_dataset_dir, "train"), tfm=train_tfm, mode='train')
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=_pin_memory)

# Validation uses the held-out part of the *train* directory: labels are
# parsed from the file name, and the test directory is read by numeric file
# name in 'test' mode, so it cannot supply cat/dog labels.
np.random.seed(split_seed)
valid_set = cdDataset(os.path.join(_dataset_dir, "train"), tfm=test_tfm, mode='val')
# Evaluation does not need shuffling.
valid_loader = DataLoader(valid_set, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=_pin_memory)

In [6]:
model = ResNet50().to(device)

# The model already applies a sigmoid, so the loss compares probabilities
# with the 0/1 labels directly.
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=2e-4, weight_decay=1e-5, betas=(0.9, 0.999))

best_loss = 0
best_acc = 0

for epoch in range(n_epochs):
    # ---------------- training ----------------
    model.train()
    train_loss = []
    correct_train = 0
    total_train = 0
    for batch in tqdm(train_loader):
        imgs, labels = batch
        imgs = imgs.to(device)
        label = labels.to(device).float()

        logits = model(imgs)
        loss = criterion(logits, label)

        optimizer.zero_grad()
        loss.backward()
        # Clip the global gradient norm to keep updates bounded.
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=8)
        optimizer.step()

        train_loss.append(loss.item())
        # A prediction counts as correct only when it lies within 0.2 of its
        # label, which is stricter than thresholding at 0.5.
        correct_train += ((logits - label).abs() < 0.2).sum().item()
        total_train += labels.size(0)

    train_loss = sum(train_loss) / len(train_loss)
    train_acc = correct_train / total_train
    print(f"[ Train | {epoch + 1:03d}/{n_epochs:03d} ] loss = {train_loss:.5f} accuracy = {train_acc:.3f}")

    # ---------------- validation ----------------
    model.eval()
    valid_loss = []
    correct_valid = 0
    total_valid = 0
    # No gradients are needed for either the forward pass or the loss.
    with torch.no_grad():
        for batch in tqdm(valid_loader):
            imgs, labels = batch
            label = labels.to(device).float()

            logits = model(imgs.to(device))
            loss = criterion(logits, label)

            valid_loss.append(loss.item())
            correct_valid += ((logits - label).abs() < 0.2).sum().item()
            total_valid += labels.size(0)

    valid_loss = sum(valid_loss) / len(valid_loss)
    valid_acc = correct_valid / total_valid

    is_best = valid_acc > best_acc
    log_line = f"[ Valid | {epoch + 1:03d}/{n_epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f}"
    if is_best:
        log_line += " -> best"
    print(log_line)
    # Append the same line to a single log file so the run leaves a record.
    with open("./sample_log.txt", "a") as log_file:
        log_file.write(log_line + "\n")

    if is_best:
        print(f"Best model found at epoch {epoch + 1}, saving model")
        checkpoint = {
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
        }
        torch.save(checkpoint, "sample_best.ckpt")
        best_acc = valid_acc


  0%|          | 0/245 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
# Predict on the test directory with the best checkpoint and write the
# submission file.
test_set = cdDataset(os.path.join(_dataset_dir, "test"), tfm=test_tfm, mode='test')
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=True)
model_best = ResNet50().to(device)
# map_location lets a checkpoint saved on a GPU be loaded on whichever device
# is in use now.
checkpoint = torch.load("sample_best.ckpt", map_location=device)
model_best.load_state_dict(checkpoint['model_state_dict'])
model_best.eval()
prediction = []
with torch.no_grad():
    for data, _ in test_loader:
        test_pred = model_best(data.to(device))
        # reshape(-1) always yields a 1-D tensor, so tolist() returns a list
        # even when the final batch contains a single image (a bare squeeze()
        # would produce a float there and break the accumulation).
        prediction.extend(test_pred.cpu().reshape(-1).tolist())

df = pd.DataFrame()
# IDs are 1..N in the dataset's order (files sorted by the number in their name).
df["ID"] = list(range(1, len(test_set) + 1))
df['TARGET'] = pd.Series(prediction).round(2)
df.to_csv("submission.csv", index=False)