In [2]:
import albumentations as A
import cv2
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms as transforms

from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm

# https://machinelearningmastery.com/learning-curves-for-diagnosing-machine-learning-model-performance/
#https://discuss.pytorch.org/t/how-to-plot-train-and-validation-accuracy-graph/105524
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
from torch.utils.tensorboard import SummaryWriter

In [3]:
# Central hyper-parameters and runtime settings used by every cell below.
config = {
    'batch_size': 64,
    'num_workers': 2,
    'lr': 0.001,
    'epochs': 10,
    # Fall back to the CPU when no CUDA device is visible, so the notebook
    # still runs end-to-end instead of failing on the first `.to('cuda')`.
    'device': 'cuda' if torch.cuda.is_available() else 'cpu',
    'image_size': 224
}
# Last expression of the cell: display whether CUDA is usable.
torch.cuda.is_available()

In [4]:
# transforms = A.Compose(
#     [A.Resize(height=config["image_size"], width=config["image_size"], p=1)],
#     torchvision.transforms.RandomHorizontalFlip(p=0.5)
#     p=1,
# )
#------------------------------------(Preprocessing)-------------------------------------
#https://pytorch.org/vision/main/generated/torchvision.transforms.Compose.html
#https://pytorch.org/vision/main/transforms.html
# Resize to an explicit (height, width) pair. A bare int only rescales the
# shorter edge and keeps the aspect ratio, so non-square images would produce
# tensors of different shapes that the DataLoader cannot stack into a batch.
# Square inputs are resized exactly as before.
# The Normalize constants are the ImageNet channel mean / std commonly used
# with torchvision models.

# Training pipeline: resize, random horizontal flip, tensor conversion, normalisation.
transform_train = transforms.Compose([
    transforms.Resize((config['image_size'], config['image_size'])),
    transforms.RandomHorizontalFlip(0.5),
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])
# Evaluation / inference pipeline: same as training but without the random flip.
transform_test = transforms.Compose([
    transforms.Resize((config['image_size'], config['image_size'])),
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])

In [17]:
#------------------------------------Datasets process--------------------------------------
#https://pillow.readthedocs.io/en/stable/handbook/tutorial.html
#https://www.geeksforgeeks.org/python-pil-image-open-method/
from PIL import Image
class RealFakeDataset(Dataset):
    """Map-style dataset that loads images from disk for real-vs-fake classification.

    Args:
        data_path: Sequence of image file paths; its length is the dataset size.
        target: Sequence of labels aligned with ``data_path``. Only read when
            ``is_test`` is false.
        is_test: When true, ``__getitem__`` returns only the image (no label).
        augmentation: Optional callable applied to the RGB ``PIL.Image``; its
            return value is what ``__getitem__`` yields as the image. Without
            it the raw PIL image is returned.
    """

    def __init__(self, data_path, target=None, is_test=False, augmentation=None):
        super().__init__()
        self.data_path = data_path
        self.target = target
        self.is_test = is_test
        self.augmentation = augmentation

    def __len__(self):
        """Return the number of image paths."""
        return len(self.data_path)

    def __getitem__(self, item):
        """Load sample ``item``.

        Returns:
            The (optionally augmented) image when ``is_test`` is true,
            otherwise ``(image, label)`` with the label as a float32 scalar
            tensor.
        """
        # Force three channels so grayscale / RGBA files have a uniform layout.
        image = Image.open(self.data_path[item]).convert('RGB')
        if self.augmentation:
            image = self.augmentation(image)

        if self.is_test:
            # Return the sample exactly as the augmentation produced it, the
            # same way the labelled branch does. The former
            # np.moveaxis(..., -1, 0) step was a leftover from the cv2
            # channels-last pipeline and must not be applied here.
            return image
        return image, torch.tensor(self.target[item], dtype=torch.float)
# image = Image.open(r"../input/cmc-robust-real-vs-fake/test/100.jpg").convert('RGB')
# image.show()

In [6]:
#----------------------------------- Read the training table -----------------------------------
train_csv_path = "../input/cmc-robust-real-vs-fake/train.csv"
df = pd.read_csv(train_csv_path)
# Last expression of the cell: display the label column.
df["label"]

In [7]:
#----------------------------------- Randomly split into training and validation sets -----------------------------------
# A fixed random_state makes the split reproducible across kernel restarts.
# This matters because training resumes from a saved checkpoint: with a fresh
# random split every session, images the model was already trained on would
# end up in the validation set and inflate the validation metrics.
train, val = train_test_split(df, random_state=42)
print(train[0:3], val[0:3])

In [8]:
# One image path per training row, in the same order as the labels below.
train_ids = train["id"].values
train_paths = [
    f"../input/cmc-robust-real-vs-fake/train/{image_id}.jpg"
    for image_id in train_ids
]
train_target = train["label"].values
# Last expression of the cell: peek at the first three labels.
train_target[:3]

In [10]:
# Validation images live in the same `train/` folder; only the row subset differs.
valid_ids = val["id"].values
valid_paths = [
    f"../input/cmc-robust-real-vs-fake/train/{image_id}.jpg"
    for image_id in valid_ids
]
valid_target = val["label"].values

In [15]:
# Labelled datasets: the training set gets the random-flip pipeline, the
# validation set the deterministic one.
train_dataset = RealFakeDataset(train_paths, train_target, is_test=False, augmentation=transform_train)
valid_dataset = RealFakeDataset(valid_paths, valid_target, is_test=False, augmentation=transform_test)

# Settings shared by both loaders.
loader_kwargs = {
    "batch_size": config["batch_size"],
    "num_workers": config["num_workers"],
}

# Training: reshuffle every epoch and drop the incomplete last batch.
train_loader = DataLoader(train_dataset, shuffle=True, drop_last=True, **loader_kwargs)
# Validation: fixed order, keep every sample.
valid_loader = DataLoader(valid_dataset, shuffle=False, drop_last=False, **loader_kwargs)

In [None]:
# materials: 1) https://arxiv.org/abs/1512.03385 2) https://github.com/pytorch/vision/blob/main/torchvision/models/resnet.py 
# 3) https://programmerall.com/article/8807556331/ 4) https://blog.csdn.net/weixin_36979214/article/details/108879684
                
class ResNetBasicBlock(nn.Module):
    """Residual block with an identity shortcut.

    Computes ``relu(x + bn2(conv2(relu(bn1(conv1(x))))))``. Because the input
    is added back unchanged, the convolutional branch must preserve the tensor
    shape. Use ``ResNetDownBlock`` when the channel count or the spatial
    resolution has to change.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by both convolutions; must
            equal ``in_channels``.
        stride: Stride of the first convolution; must be 1.

    Raises:
        ValueError: If the arguments would give the branch output a different
            shape than the input, which the identity shortcut cannot handle.
    """

    def __init__(self, in_channels, out_channels, stride):
        super(ResNetBasicBlock, self).__init__()
        # Fail at construction time instead of with a shape error (or a silent
        # broadcast) deep inside forward().
        stride_pair = tuple(stride) if isinstance(stride, (tuple, list)) else (stride, stride)
        if in_channels != out_channels or stride_pair != (1, 1):
            raise ValueError(
                "ResNetBasicBlock uses an identity shortcut and needs "
                "in_channels == out_channels and stride == 1 "
                f"(got in_channels={in_channels}, out_channels={out_channels}, stride={stride}); "
                "use ResNetDownBlock to change shape"
            )
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        # Only the first convolution carries the block stride; the second one
        # always keeps the resolution (it previously reused `stride`).
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        """Apply the block; the output has the same shape as ``x``."""
        output = self.conv1(x)
        output = F.relu(self.bn1(output))
        output = self.conv2(output)
        output = self.bn2(output)
        return F.relu(x + output)

class ResNetDownBlock(nn.Module):
    """Residual block whose shortcut is a 1x1 convolution followed by batch norm.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels of the output feature map.
        stride: Indexable pair. ``stride[0]`` is used by the first 3x3
            convolution and by the 1x1 shortcut convolution, ``stride[1]`` by
            the second 3x3 convolution.
    """

    def __init__(self, in_channels, out_channels, stride):
        super().__init__()
        first_stride = stride[0]
        second_stride = stride[1]

        # Main branch: two 3x3 convolutions, each followed by batch norm.
        self.conv1 = nn.Conv2d(
            in_channels, out_channels, kernel_size=3, stride=first_stride, padding=1
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, stride=second_stride, padding=1
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Shortcut branch: projects the input to the channel count / stride of
        # the first convolution so it can be added to the main branch.
        shortcut_conv = nn.Conv2d(
            in_channels, out_channels, kernel_size=1, stride=first_stride, padding=0
        )
        self.extra = nn.Sequential(shortcut_conv, nn.BatchNorm2d(out_channels))

    def forward(self, x):
        """Return ``relu(extra(x) + bn2(conv2(relu(bn1(conv1(x))))))``."""
        shortcut = self.extra(x)
        branch = F.relu(self.bn1(self.conv1(x)))
        branch = self.bn2(self.conv2(branch))
        return F.relu(shortcut + branch)

class ResNet18_For_FakeAndTrue(nn.Module):
    """ResNet-18-style network that outputs one raw logit per image.

    Layout: a 7x7 stem convolution, four residual stages (64, 128, 256 and 512
    channels; stages 2-4 start with a stride-2 ``ResNetDownBlock``), global
    average pooling and a single-output linear layer.

    Args:
        use_full_stem: When true, the stem output goes through
            ``bn1 -> ReLU -> maxpool`` before the first stage, as in the
            reference ResNet design. The default (false) keeps the historical
            forward pass of this class, in which ``bn1`` and ``maxpool`` are
            created but not applied, so models trained with that forward pass
            keep producing the same outputs.
    """

    # Class-level fallback: a model restored by unpickling a whole saved module
    # does not run __init__, so older checkpoints lack the instance attribute.
    use_full_stem = False

    def __init__(self, use_full_stem=False):
        super(ResNet18_For_FakeAndTrue, self).__init__()
        self.use_full_stem = use_full_stem

        # Stem.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages.
        self.layer1 = nn.Sequential(ResNetBasicBlock(64, 64, 1),
                                    ResNetBasicBlock(64, 64, 1))
        self.layer2 = nn.Sequential(ResNetDownBlock(64, 128, [2, 1]),
                                    ResNetBasicBlock(128, 128, 1))
        self.layer3 = nn.Sequential(ResNetDownBlock(128, 256, [2, 1]),
                                    ResNetBasicBlock(256, 256, 1))
        self.layer4 = nn.Sequential(ResNetDownBlock(256, 512, [2, 1]),
                                    ResNetBasicBlock(512, 512, 1))

        # Head.
        self.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
        self.fc = nn.Linear(512, 1)  # single output unit (binary classification logit)

    def forward(self, x):
        """Map a batch of 3-channel images ``(N, 3, H, W)`` to logits ``(N, 1)``."""
        out = self.conv1(x)
        if self.use_full_stem:
            # Reference stem: normalise, activate and downsample before stage 1.
            out = self.maxpool(F.relu(self.bn1(out)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.avgpool(out)
        # Flatten (N, 512, 1, 1) to (N, 512) for the linear head.
        out = out.reshape(x.shape[0], -1)
        out = self.fc(out)
        return out


In [None]:
#---------------------------------------New model----------------------
# model = ResNet18_For_FakeAndTrue()

In [None]:
#------------------------------------- Resume from a previously saved model -------------------------
# The result is used directly as the model, so the file is presumably a whole
# module written with torch.save(model, ...); unpickling it needs the model
# classes defined above to be in scope.
# SECURITY: unpickling executes arbitrary code - only load checkpoints you trust.
# NOTE(review): on PyTorch releases where torch.load defaults to
# weights_only=True, loading a whole pickled model additionally requires
# weights_only=False.
# map_location places the tensors on the configured device even when the
# checkpoint was saved from a different one.
model = torch.load(
    "../input/final-version-competition/my_model.pt",
    map_location=config["device"],
)

In [None]:
#------------- Did not work :( --------------
# Graph_Network = SummaryWriter('./Graph_CNN')
# image_s,label_s = next(iter(train_loader)) 
# Graph_Network.add_graph(model, image_s)
# Graph_Network.close()

In [None]:
# Move the model's parameters and buffers to the configured device. As the last
# expression of the cell, the returned module is also displayed.
model.to(config["device"])

In [None]:
#----------------------------------- Tried another adaptive optimizer: Adamax ----------------------
# Loss: binary cross-entropy computed on raw logits; the functional form is used
# directly as the criterion, called as criterion(outputs, targets).
criterion = F.binary_cross_entropy_with_logits
# Optimizer over all model parameters with the learning rate from `config`.
optimizer = torch.optim.Adamax(model.parameters(), lr=config["lr"])
#optimizer = torch.optim.SGD(model.parameters(), lr=config["lr"], momentum=0.9, weight_decay=5e-4)
#optimizer = torch.optim.Adam(model.parameters(), lr=config["lr"])

In [None]:
#---------------------------------------- Training function --------------------------------
def train_fn(data_loader, model, optimizer, criterion, device):
    """Run one optimisation pass over ``data_loader``.

    Args:
        data_loader: Sized iterable yielding ``(inputs, targets)`` batches.
        model: Module whose output has a size-1 second dimension.
        optimizer: Optimizer stepping the model's parameters.
        criterion: Callable ``criterion(outputs, targets)`` returning a scalar loss.
        device: Device the batches are moved to.

    Returns:
        Sum of the per-batch losses divided by the number of batches.
    """
    model.train()
    running_loss = 0

    for X, targets in tqdm(data_loader, total=len(data_loader)):
        X = X.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        # (N, 1) -> (N,) so the outputs line up with the 1-D targets.
        logits = model(X).squeeze(1)

        batch_loss = criterion(logits, targets)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()

    return running_loss / len(data_loader)

#-------------------------------------- Evaluation function --------------------------
def eval_fn(data_loader, model, criterion, device):
    """Evaluate ``model`` on labelled data without updating it.

    Args:
        data_loader: Sized iterable yielding ``(inputs, targets)`` batches.
        model: Module whose output has a size-1 second dimension.
        criterion: Callable ``criterion(outputs, targets)`` returning a scalar loss.
        device: Device the batches are moved to.

    Returns:
        ``(mean_batch_loss, roc_auc)``, where the ROC AUC is computed from the
        raw model outputs of all batches.
    """
    model.eval()
    running_loss = 0
    all_targets, all_outputs = [], []

    with torch.no_grad():
        for X, targets in tqdm(data_loader, total=len(data_loader)):
            X = X.to(device)
            targets = targets.to(device)

            # (N, 1) -> (N,) so the outputs line up with the 1-D targets.
            logits = model(X).squeeze(1)
            running_loss += criterion(logits, targets).item()

            all_targets += targets.tolist()
            all_outputs += logits.tolist()

    auc = roc_auc_score(all_targets, all_outputs)
    return running_loss / len(data_loader), auc

def predict_fn(data_loader, model, device):
    """Collect the model's raw outputs for every sample in ``data_loader``.

    Args:
        data_loader: Sized iterable yielding input batches (no targets).
        model: Module whose output has a size-1 second dimension.
        device: Device the batches are moved to.

    Returns:
        Flat Python list with one output value per sample, in iteration order.
    """
    model.eval()
    predictions = []

    with torch.no_grad():
        for X in tqdm(data_loader, total=len(data_loader)):
            # (N, 1) -> (N,) before flattening into the result list.
            logits = model(X.to(device)).squeeze(1)
            predictions.extend(logits.tolist())

    return predictions

In [None]:
# Number of epochs so far that did not beat `best_loss`.
counter_loss = 0
# Validation loss an epoch has to beat before the model is saved.
# NOTE(review): presumably the best loss of an earlier session, since training
# resumes from a saved model - confirm. If no epoch gets below it,
# ./my_model.pt is never written by this cell.
best_loss = 0.0007
# Per-epoch loss history for the learning-curve plot.
val_loss_set = []
train_loss_set = []
for _ in range(config["epochs"]):
    train_loss = train_fn(train_loader, model, optimizer, criterion, config["device"])
    val_loss, metric = eval_fn(valid_loader, model, criterion, config["device"])
    val_loss_set.append(val_loss)
    train_loss_set.append(train_loss)

    print(
        f"""
        Train loss = {train_loss},
        Validation loss = {val_loss},
        ROC AUC = {metric}
        """
    )
    if val_loss < best_loss:
        print("Model saved!")
        best_loss = val_loss
        # Saves the whole module object (not just a state_dict); the inference
        # cell loads this file back with torch.load.
        torch.save(model, "./my_model.pt")
    else:
        counter_loss += 1
        # Stop once 40% of the epoch budget has been spent without improvement.
        # NOTE(review): the counter is never reset after an improvement, so
        # this counts non-improving epochs in total, not consecutively.
        if counter_loss == int(config["epochs"] * 0.4):
            print("The point is lost in the valley")
            break
        else:
            print("The point is trying to find way to bottom of the hill")

# Learning curves: training vs. validation loss per epoch.
plt.title("Model Loss")
plt.plot(train_loss_set, label="train_loss")
plt.plot(val_loss_set, label="val_loss")
plt.xlabel("epoch")
plt.ylabel("Loss")
plt.legend(['train', 'valid'], loc='upper left')
# Save before showing: on backends where show() closes the current figure, a
# later savefig() would write an empty image.
plt.savefig('./loss_graph.png')
plt.show()

In [None]:
class RealFakeDataset(Dataset):
    """Image dataset for real-vs-fake classification, with or without labels.

    NOTE: this shadows the earlier ``RealFakeDataset`` definition in this
    notebook and relies on ``Image`` (PIL) having been imported there.

    Args:
        data_path: Sequence of image file paths; its length is the dataset size.
        target: Sequence of labels aligned with ``data_path``; only read when
            ``is_test`` is false.
        is_test: When true, samples are returned without a label.
        augmentation: Optional callable applied to the RGB PIL image.
    """

    def __init__(self, data_path, target=None, is_test=False, augmentation=None):
        super().__init__()
        self.data_path, self.target = data_path, target
        self.is_test, self.augmentation = is_test, augmentation

    def __len__(self):
        """Return the number of image paths."""
        return len(self.data_path)

    def __getitem__(self, item):
        """Return the image at ``item``, plus a float32 label unless ``is_test``."""
        # Always decode to three channels.
        picture = Image.open(self.data_path[item]).convert('RGB')
        sample = self.augmentation(picture) if self.augmentation else picture

        if not self.is_test:
            label = torch.tensor(self.target[item], dtype=torch.float)
            return sample, label
        return sample

In [None]:

# Reload the model file that the training loop writes when the validation loss
# improves. map_location places the tensors on the configured device even if
# the file was saved from a different one.
# SECURITY: torch.load unpickles the file - only load checkpoints you trust.
model = torch.load("./my_model.pt", map_location=config["device"])
model.to(config["device"])
model.eval()

# The submission template provides the test ids, one image file per id.
submission = pd.read_csv("../input/cmc-robust-real-vs-fake/submission.csv")
test_paths = [f"../input/cmc-robust-real-vs-fake/test/{i}.jpg" for i in submission["id"].values]


# Unlabelled dataset: is_test=True makes each sample just the image.
test_dataset = RealFakeDataset(
    test_paths,
    is_test=True,
    augmentation=transform_test,
)

# shuffle=False / drop_last=False keep exactly one prediction per submission
# row, in row order.
test_loader = DataLoader(
    test_dataset,
    batch_size=config["batch_size"],
    shuffle=False,
    num_workers=config["num_workers"],
    drop_last=False,
)
print("Length of the test_loader:", len(test_loader))

In [None]:
# Run inference over the test loader; predict_fn returns a flat list with one
# model output per image, in loader order.
result = predict_fn(test_loader, model, config["device"])
# NOTE(review): these are raw model outputs (logits), not probabilities - fine
# for a ranking metric such as ROC AUC; confirm what the competition expects.
submission["label"] = result
# Write the predictions without the DataFrame index column.
submission.to_csv("./submission_new.csv", index=None)
# Last expression of the cell: preview the first rows of the submission.
submission.head()