In [17]:
#import libraries
import torch
import torch.nn as nn
import shutil
from torchvision import datasets, models, transforms
from torch.utils.data import Dataset,DataLoader,random_split
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import os
from PIL import Image,ImageFile
import torch.optim as optim
import pandas as pd
import copy
from torch.optim import lr_scheduler
from torch.utils.tensorboard import SummaryWriter
import cv2
from torchvision import transforms
import albumentations as augment
from albumentations.pytorch import ToTensorV2
import numpy as np

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

cuda


In [18]:
seed_val = 420
torch.manual_seed(seed_val)

<torch._C.Generator at 0x76ee3eb536b0>

In [19]:
base_data_dir = '../data'
fake_dir = os.path.join(base_data_dir, 'generated')  # Fake artworks directory
real_dir = os.path.join(base_data_dir, 'real')  # Real artworks directory
writer = SummaryWriter() 

ARTWORK DATASET

In [20]:
#Class to manage artworks with respect to their authenticity

class ArtworkDataset(Dataset):
  def __init__(self,links,transform):
      self.data = links
      self.transform = transform

  def __len__(self):
    return self.data.index.shape[0]
    
  def __getitem__(self,idx):
        img = Image.open(self.data.iloc[idx,0])
        label_index = self.data.iloc[idx, 1]
        if (img.mode != 'RGB'):
            img = img.convert('RGB')

        if self.transform:
            img = np.array(img)
            augmented = self.transform(image = img)
            img = augmented['image']
        return img, label_index


In [21]:
# Create a CSV file with paths to artwork images and their labels (real or fake)
data = [] 

# Iterate over the fake artworks and add their paths and labels to the list
for dirpath, dirnames, filenames in os.walk(fake_dir):
    for filename in filenames:
        if filename.endswith(".jpg"): # only consider jpg files
            filepath = os.path.join(dirpath, filename)
            data.append((filepath, "0"))


# Iterate over the real artworks and add their paths and labels to the list
for dirpath, dirnames, filenames in os.walk(real_dir):
    for filename in filenames:
        if filename.endswith(".jpg"):
            filepath = os.path.join(dirpath, filename)
            data.append((filepath, "1"))  # Label 1 for real artworks

# Convert the list "data" to a pandas dataframe
df = pd.DataFrame(data, columns=["path", "label"])

# Save the dataframe to a CSV file
csv_output_path = os.path.join(base_data_dir, "image_labels.csv")
df.to_csv(csv_output_path, index=False)
print(f"CSV file saved at {csv_output_path}")


CSV file saved at ../data/image_labels.csv


LOAD PRETRAINED MODEL

In [None]:
%pip install timm
import timm

model = timm.create_model('convnext_base',pretrained=True, num_classes=2)

model = model.to(device)
print(model)

Note: you may need to restart the kernel to use updated packages.


In [None]:
#Setting the model weights to non-trainable
for param in model.parameters(): 
    param.requires_grad = False

In [None]:
#Make the last layer of the model trainable
for p in model.head.parameters(): #instead of fc, we use head
    p.requires_grad=True


SPLIT IN TRAINING AND VALIDATION SET

In [None]:
dataset = df
dataset['label'] = dataset['label'].astype(int)
dataset

Unnamed: 0,path,label
0,../data/generated/stylegan3-t-metfaces-1024x10...,0
1,../data/generated/stylegan3-t-metfaces-1024x10...,0
2,../data/generated/stylegan3-t-metfaces-1024x10...,0
3,../data/generated/stylegan3-t-metfaces-1024x10...,0
4,../data/generated/stylegan3-t-metfaces-1024x10...,0
...,...,...
88146,../data/real/paul-albert-besnard_robert-de-mon...,1
88147,../data/real/nikolai-ge_christ-and-the-discipl...,1
88148,../data/real/paul-bril_view-of-a-port-1607.jpg,1
88149,../data/real/felix-vallotton_the-port-of-marse...,1


In [None]:
#train, validation = train_test_split(dataset.values, stratify=dataset.values[:, 1], test_size=.3, random_state = 1) 

# Split the dataset into two parts (train + validation, and test)
train_val_data, test = train_test_split(dataset.values, test_size=0.1, random_state=seed_val)

# Split the train + validation part into training and validation sets
train, validation = train_test_split(train_val_data, test_size=0.1, random_state=seed_val)

In [None]:
train_links = pd.DataFrame(train, columns = dataset.columns)
validation_links = pd.DataFrame(validation, columns = dataset.columns)
test_links = pd.DataFrame(test, columns = dataset.columns)

BUILDING DATA LOADERS

In [None]:
data_transforms = transforms.Compose([
                                transforms.Resize(224),
                                transforms.CenterCrop(224),
                                transforms.ToTensor(),
                                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# # Data augmentation for training
# train_transforms = transforms.Compose([
#     transforms.Resize(256),  # Resize to 256x256 to allow for random crops
#     transforms.RandomCrop(224),  # Randomly crop the image to 224x224
#     transforms.RandomHorizontalFlip(),  # Random horizontal flip
#     transforms.RandomRotation(10),  # Random rotation ±10 degrees
#     transforms.ToTensor(),  # Convert image to tensor
#     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # Normalize the image
# ])

# No data augmentation for validation and testing
# val_test_transforms = transforms.Compose([
#     transforms.Resize(224),  # Resize directly to 224x224
#     transforms.CenterCrop(224),  # Center crop to 224x224 (optional, but recommended)
#     transforms.ToTensor(),
#     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # Normalize the image
# ])

train_transforms = augment.Compose([
    augment.Resize(224, 224),                          # Resize to model input size (224x224)
    augment.HorizontalFlip(p=0.5),                     # Flip image horizontally with 50% chance
    augment.RandomBrightnessContrast(                  # Randomly adjust brightness and contrast
        brightness_limit=0.2, contrast_limit=0.2, p=0.5
    ),
    augment.HueSaturationValue(                        # Adjust color slightly to handle color variations
        hue_shift_limit=10, sat_shift_limit=10, val_shift_limit=10, p=0.3
    ),
    augment.GaussianBlur(blur_limit=(3, 5), p=0.3),    # Add slight blur to mimic imperfections
    augment.ISONoise(color_shift=(0.01, 0.05), p=0.2), # Add light noise to simulate digital artifacts
    augment.CoarseDropout(                             # Randomly mask parts to prevent overfitting on specific details
        max_holes=4, max_height=8, max_width=8, min_holes=1, p=0.3
    ),
    augment.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),  # Standard normalization
    ToTensorV2()                                 # Convert to PyTorch tensor
])

val_test_transforms = augment.Compose([
    augment.Resize(224, 224),
    augment.CenterCrop(224, 224),
    augment.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])


batch_size = 32

train_set = ArtworkDataset( train_links, train_transforms)

validation_set = ArtworkDataset( validation_links, val_test_transforms)

test_set = ArtworkDataset( test_links,val_test_transforms)


train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, 
                               drop_last=False,num_workers=2)

validation_loader = DataLoader(validation_set, batch_size=batch_size, shuffle=False, 
                               drop_last=False,num_workers=2)

test_loader = DataLoader(test_set,batch_size=batch_size, shuffle = False,
                              drop_last=False,num_workers=2)

TRAINING

In [None]:
class EarlyStopping():
    """
    Early stopping to stop the training when the loss does not improve after
    certain epochs.
    """
    def __init__(self, patience=5, min_delta=0.001):
        """
        :param patience: how many epochs to wait before stopping when loss is
               not improving
        :param min_delta: minimum difference between new loss and old loss for
               new loss to be considered as an improvement
        """
        self.patience = patience
        self.min_delta = min_delta
        self.wait = 0
        self.best_loss = None
        self.early_stop = False
        # Define the directory for saving the model
        self.model_dir = os.path.join(os.getcwd(), 'saved_models/convnext_model')  # Save in 'vit_model' folder in the current directory
        # Create the directory if it doesn't exist
        os.makedirs(self.model_dir, exist_ok=True)

    def __call__(self, current_loss):
        if self.best_loss == None:
            self.best_loss = current_loss
        elif (current_loss - self.best_loss) < -self.min_delta:
            self.best_loss = current_loss
            self.wait = 0
            
             # Save the model state
            model_path = os.path.join(self.model_dir, 'RealArt_vs_FakeArt_convnext_base.pt')

            # Remove the old model if it exists in the same directory
            if os.path.exists(model_path):
                os.remove(model_path)
            
            # save the new model 
            torch.save(model.state_dict(), model_path)
        else:
            self.wait = self.wait + 1
            print(f"INFO: Early stopping counter {self.wait} of {self.patience}")
            if self.wait >= self.patience:
                self.early_stop = True

In [None]:
def fine_tune(model, train_loader, validation_loader, criterion, optimizer, scheduler, early_stop , num_epochs = 100, log_name = 'run1'):
    best_model = copy.deepcopy(model)
    best_acc = 0.0
    best_epoch=0
    stop = False
    
    for epoch in range(1, num_epochs + 1):
        if stop:
            break
        print(f'Epoch {epoch}/{num_epochs}')
        print('-'*120)

        data_loader = None
        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
                data_loader = train_loader
            else:
                model.eval()   # Set model to evaluate mode
                data_loader = validation_loader

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data
            for inputs, labels in tqdm(data_loader):
                
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    outputs = nn.Softmax(dim = 1)(outputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / (len(data_loader) * data_loader.batch_size)
            epoch_acc = running_corrects.double() / (len(data_loader) * data_loader.batch_size)
            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
                
            if phase == 'train':
                writer.add_scalar(f'Loss/Train/{log_name}', epoch_loss, epoch)
                writer.add_scalar(f'Accuracy/Train/{log_name}', epoch_acc, epoch)
                scheduler.step()
            else:
                writer.add_scalar(f'Loss/Validation/{log_name}', epoch_loss, epoch)
                writer.add_scalar(f'Accuracy/Validation/{log_name}', epoch_acc, epoch)
            
            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_epoch = epoch
                best_model = copy.deepcopy(model)
               
                
            if phase == 'val':
                early_stop(epoch_loss)
                print('-'*120, end = '\n\n')
                stop=early_stop.early_stop
                        
    print(f'Best val Acc: {best_acc:4f}')
    print(f'Best epoch: {best_epoch:03d}')
    writer.flush()
    # load the best model 
    return best_model         

In [None]:
model_path = os.path.join('saved_models/convnext_model', 'RealArt_vs_FakeArt_convnext_base.pt')

if not 'RealArt_vs_FakeArt_convnext_base.pt' in os.listdir('saved_models/convnext_model'):
   criterion = nn.CrossEntropyLoss()
   optimizer = optim.Adam(model.parameters(), lr=1e-3)
   scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
   early_stop = EarlyStopping(patience = 3, min_delta = 0.001)
   ImageFile.LOAD_TRUNCATED_IMAGES = True


   best_model_head = fine_tune(model, train_loader, validation_loader, criterion, optimizer, scheduler, 
                               early_stop, num_epochs = 30, log_name = 'Albumentation')

   # Save the new best model head in the vit_model folder
   torch.save(best_model_head, model_path)
else: 
     model = timm.create_model('convnext_base',pretrained=True, num_classes=2) 
     model = torch.load(model_path)  # Load the full model
     print("Model loaded successfully from:", model_path)

Epoch 1/30
------------------------------------------------------------------------------------------------------------------------


  0%|          | 0/2232 [00:00<?, ?it/s]


NameError: Caught NameError in DataLoader worker process 0.
Original Traceback (most recent call last):
  File "/home/oem/miniconda3/envs/python3.10/lib/python3.10/site-packages/torch/utils/data/_utils/worker.py", line 309, in _worker_loop
    data = fetcher.fetch(index)  # type: ignore[possibly-undefined]
  File "/home/oem/miniconda3/envs/python3.10/lib/python3.10/site-packages/torch/utils/data/_utils/fetch.py", line 52, in fetch
    data = [self.dataset[idx] for idx in possibly_batched_index]
  File "/home/oem/miniconda3/envs/python3.10/lib/python3.10/site-packages/torch/utils/data/_utils/fetch.py", line 52, in <listcomp>
    data = [self.dataset[idx] for idx in possibly_batched_index]
  File "/tmp/ipykernel_48110/1782497746.py", line 18, in __getitem__
    img = np.array(img)
NameError: name 'np' is not defined


TESTING

In [None]:
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, roc_auc_score
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np

def add_labels_to_image(image, true_label, pred_label):
    """Add true and predicted labels to the image."""
    # Convert the image to uint8 if it's in float format
    if image.dtype != np.uint8:
        image = (image * 255).astype(np.uint8)

    # Convert from RGB to BGR for OpenCV
    bgr_image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

    # Prepare text without brackets
    text = f'True: {int(true_label)}, Pred: {int(pred_label)}'
    cv2.putText(bgr_image, text, (5, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, cv2.LINE_AA)

    # Convert back to RGB before returning
    return cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)

# funzione per il testing del modello
def test_model(model, test_loader):
    model.eval() # imposto il modello in modalità di valutazione
    test_loss = 0
    correct = 0
    pred_list = []
    true_list = []
    misclassified_images = []
    misclassified_preds = []
    misclassified_true = []


    # inizializza la barra di avanzamento
    pbar = tqdm(total=len(test_loader))
    
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.cross_entropy(output, target, reduction='sum').item() # sommo il loss di ogni batch
            
            pred = output.argmax(dim=1, keepdim=True) # ottengo la predizione del modello
            pred_list.extend(pred.cpu().numpy()) # aggiungo la predizione alla lista
            true_list.extend(target.cpu().numpy()) # aggiungo il target alla lista
            
            correct += pred.eq(target.view_as(pred)).sum().item() # aggiorno il contatore di classificazioni corrette

            misclassified = ~pred.eq(target.view_as(pred)).squeeze()
            if misclassified.any():
                misclassified_images.extend(data[misclassified].cpu())
                misclassified_preds.extend(pred[misclassified].cpu().numpy())
                misclassified_true.extend(target[misclassified].cpu().numpy())

            # update progress bar
            pbar.update(1)
            
    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / len(test_loader.dataset)
    recall = recall_score(true_list, pred_list, average='macro') # calcolo la recall
    precision = precision_score(true_list, pred_list, average='macro') # calcolo la precision
    f1 = f1_score(true_list, pred_list, average='macro') # calcolo la F1 score
    auc = roc_auc_score(true_list, pred_list) # calcolo l'AUC
    
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%), Recall: {:.2f}%, Precision: {:.2f}%, F1: {:.2f}%, AUC: {:.2f}%\n'.format(
        test_loss, correct, len(test_loader.dataset), accuracy, recall*100, precision*100, f1*100, auc*100))
    
    #Logging the misclassified images
    for i, img in enumerate(misclassified_images):
        img_np = img.permute(1, 2, 0).numpy()  # Convert tensor to numpy (C, H, W) to (H, W, C)
        
        # Ensure the image is in the right format for TensorBoard
        if img_np.shape[-1] == 1:  # Check if the image is grayscale
            img_np = img_np.squeeze(axis=2)  # Remove the channel dimension if it is 1
        elif img_np.shape[0] == 1:  # If the shape is (1, H, W)
            img_np = img_np.squeeze(axis=0)  # Remove the batch dimension

        img_np = np.clip(img_np, 0, 1)  # Ensure the values are between 0 and 1
        img_np = (img_np * 255).astype(np.uint8)  # Scale to 0-255 and convert to uint8

        labeled_image = add_labels_to_image(img_np, misclassified_true[i], misclassified_preds[i])

        # Log the image to TensorBoard
        writer.add_image(f'Misclassified_True_{misclassified_true[i]}_Pred_{misclassified_preds[i]}_Index_{i}', labeled_image, dataformats='HWC')

    writer.flush()
    return accuracy, recall, precision, f1, auc

use this for logs:
tensorboard --logdir=./runs

In [None]:
# accuracy,recall,precision,f1,auc = test_model(model,test_loader)
writer.close()