In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import timm
import os
import torch.nn.functional as F
from torch import nn
from torch.utils.data import Dataset, DataLoader
from skimage import io
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import cv2

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
#Controllo che torch sia installato con CUDA abilitato
def check_cuda():
    print(torch.version.cuda)
    cuda_is_ok = torch.cuda.is_available()
    print(f"CUDA Enabled: {cuda_is_ok}")

In [3]:
print(torch.version.cuda)

11.7


In [4]:
# Ottiene il percorso corrente dello script
current_dir = os.getcwd()

In [5]:
#Directory per i file di train
DATA_DIR = os.path.join(current_dir, "train")

#Grandezza del Batch (iperparametro)
BATCH_SIZE = 32

#Learning Rate (Iperparametro)
LR = 0.001

#Numero di epoche (Iperparametro)
EPOCHS = 30

DEVICE = 'cuda'

In [6]:
#Split del file csv da sottomettere alla rete neurale
df = pd.read_csv("spectrum_paths_dataset.csv")
df.head()
train_df, valid_df = train_test_split(df, test_size = 0.20, random_state = 42)
print(train_df.size)
print(valid_df.size)

48000
12000


In [7]:
# Carico le immagini del file CSV che ho fornito, per settarle in modo corretto e darle in input alla rete neurale
class APN_Dataset(Dataset):

  def __init__(self, df):
    self.df = df

  def __len__(self):
    return len(self.df)

  def __getitem__(self, idx):
    row = self.df.iloc[idx]

    A_img = io.imread(DATA_DIR + row.Anchor)
    P_img = io.imread(DATA_DIR + row.Positive)
    N_img = io.imread(DATA_DIR + row.Negative)

    #Permute because the third channel has to be in first channel in torch

    #A_img = torch.from_numpy(A_img).permute(2, 0, 1) / 255.0
    #P_img = torch.from_numpy(P_img).permute(2, 0, 1) / 255.0
    #N_img = torch.from_numpy(N_img).permute(2, 0, 1) / 255.0


    A_img = np.expand_dims(A_img, 0)
    P_img = np.expand_dims(P_img, 0)
    N_img = np.expand_dims(N_img, 0)

    A_img = torch.from_numpy(A_img) / 255.0
    P_img = torch.from_numpy(P_img) / 255.0
    N_img = torch.from_numpy(N_img) / 255.0

    #A_img = torch.from_numpy(A_img.astype(np.int32)) / 65536.0
    #P_img = torch.from_numpy(P_img.astype(np.int32)) / 65536.0
    #N_img = torch.from_numpy(N_img.astype(np.int32)) / 65536.0

    return A_img, P_img, N_img

Qui di seguito mi stampo il numero che compone l'insieme di dati di train e il numero che compone l'insieme di dati di validation

In [8]:
trainset = APN_Dataset(train_df)
validset = APN_Dataset(valid_df)

print(f"Size of trainset: {len(trainset)}")
print(f"Size of validset: {len(validset)}")

Size of trainset: 16000
Size of validset: 4000


Carichiamo i dati di train e validation nella batch

In [9]:
trainloader = DataLoader(trainset, batch_size = BATCH_SIZE, shuffle = True)
validloader = DataLoader(validset, batch_size = BATCH_SIZE)

In [10]:
print(f"No. of batches in trainloader : {len(trainloader)}")
print(f"No. of batches in validloader : {len(validloader)}")

No. of batches in trainloader : 500
No. of batches in validloader : 125


Questa funzione definisce un modello di rete neurale chiamato APN_Model, che carica un'architettura di rete preaddestrata e sostituisce il classificatore finale con un nuovo classificatore personalizzato.

In [11]:
#Carico il modello di rete neurale
class APN_Model(nn.Module):

    #Viene definita la size del vettore di embedding
  def __init__(self, emb_size = 512):
    super(APN_Model, self).__init__()

    #QUI CAIRCATE IL MODELLO, IN QUESTO CASO EFFICIENTNET VERSIONE B0 (LA PIù LEGGERA DELLA FAMIGLIA)
    self.efficientnet = timm.create_model('tf_efficientnetv2_b0', pretrained = False)
    self.efficientnet.classifier = nn.Linear(in_features=self.efficientnet.classifier.in_features, out_features = emb_size)

  def forward(self, images):
    embeddings = self.efficientnet(images)
    return embeddings

In [12]:
#QUI FATE UNA PICCOLA MODIFICA ALLA RETE PER FARLE AVERE IN INPUT IMMAGINI IN SCALA DI GRIGIO DELLO SPETTRO DI FOURIER
model = APN_Model()
model.efficientnet.conv_stem = nn.Conv2d(1, 32, 3, 2, 1, bias=False);

model.to(DEVICE);

In [13]:
#FUNZIONE DI TRAINING
def train_fn(model, dataloader, optimizer, criterion):
  model.train() #ON Dropout
  total_loss = 0.0

  for A, P, N in tqdm(dataloader):
    A, P, N = A.to(DEVICE), P.to(DEVICE), N.to(DEVICE)

    A_embs = model(A)
    P_embs = model(P)
    N_embs = model(N)

    loss = criterion(A_embs, P_embs, N_embs)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    total_loss += loss.item()

    return total_loss / len(dataloader)

In [14]:
#FUNZIONE DI EVALUATION
def eval_fn(model, dataloader, criterion):
  model.eval() #OFF Dropout
  total_loss = 0.0

  with torch.no_grad():
    for A, P, N in tqdm(dataloader):
      A, P, N = A.to(DEVICE), P.to(DEVICE), N.to(DEVICE)

      A_embs = model(A)
      P_embs = model(P)
      N_embs = model(N)

      loss = criterion(A_embs, P_embs, N_embs)

      total_loss += loss.item()

    return total_loss / len(dataloader)

In [15]:
#QUI UTILIZZATE LA TRIPLET LOSS E COME OTTIMIZZATORE ADAM. PER IL MOMENTO VI SCONSIGLIO DI CAMBIARE ADAM
criterion = nn.TripletMarginLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = LR)

In [16]:
#QUI FATE IL TRAINING VERO E PROPRIO UTILIZZANDO TUTTE LE FUNZIONI CHE AVETE VISTO IN PRECEDENZA
best_valid_loss = np.Inf

for i in range(EPOCHS):

  train_loss = train_fn(model, trainloader, optimizer, criterion)
  valid_loss = eval_fn(model, validloader, criterion)

  if valid_loss < best_valid_loss:
    torch.save(model.state_dict(), 'trained_model.pt')
    best_valid_loss = valid_loss
    print("SAVED_WEIGHTS_SUCCESS")

  print(f"EPOCHS : {i+1} train_loss : {train_loss} valid_loss : {valid_loss}")

  0%|                                                                                          | 0/500 [00:00<?, ?it/s]


FileNotFoundError: No such file: 'C:\Users\marco\Desktop\Gruppo1_Detective_Project\DP_DeepFake\traindataset\big_gan\big\biggan-spectrum\50\img004466.jpg'

In [None]:
#QUESTA E' LA FUNZIONE PER GENERARE I VETTORI DI ENCODING
def get_encoding_csv(model, anc_img_names, dirFolder):
  anc_img_names_arr = np.array(anc_img_names)
  encodings = []

  model.eval()

  with torch.no_grad():
    for i in tqdm(anc_img_names_arr):
      A = io.imread(dirFolder + i)
      #A = torch.from_numpy(A).permute(2, 0, 1) / 255.0
      A = np.expand_dims(A, 0)
      A = torch.from_numpy(A.astype(np.int32)) / 255.0
      A = A.to(DEVICE)
      A_enc = model(A.unsqueeze(0))
      encodings.append(A_enc.squeeze().cpu().detach().numpy())

    encodings = np.array(encodings)
    encodings = pd.DataFrame(encodings)
    df_enc = pd.concat([anc_img_names, encodings], axis = 1)

    return df_enc

In [None]:
#QUI RICARICO IL MODELLO UNA VOLTA TRAINATO
model.load_state_dict(torch.load('trained_model.pt'))

#QUI CREO IL DATABASE DI FEATURE VECTORS DEL TRAINING SET
df_enc = get_encoding_csv(model, df['Anchor'], DATA_DIR)

In [None]:
#QUI IL DATABASE COME CSV IN MODO TALE DA NON DOVER FARE QUESTA OPERAZIONE OGNI VOLTA
#OVVIAMENTE, SE DEVO FARE UN NUOVO TRAINING DEVO ANCHE RICREARE GLI ENCODINGS
df_enc.to_csv('database.csv', index = False)

df_enc = pd.read_csv('database.csv')
df_enc.head()

In [None]:
def euclidean_dist(img_enc, anc_enc_arr):
    #dist = np.sqrt(np.dot(img_enc-anc_enc_arr, (img_enc- anc_enc_arr).T))
    dist = np.dot(img_enc-anc_enc_arr, (img_enc- anc_enc_arr).T)
    #dist = np.sqrt(dist)
    return dist

In [None]:
df = pd.read_csv('testList.csv')
print(df['real'])
print(df.size)
df.head()

In [None]:
def getImageEmbeddings(img, model):

    img = np.expand_dims(img, 0);
    img = torch.from_numpy(img) / 255;
    model.eval();

    with torch.no_grad():
        img = img.to(DEVICE);
        img_enc = model(img.unsqueeze(0));
        img_enc = img_enc.detach().cpu().numpy();
        img_enc = np.array(img_enc);

    return img_enc;

In [None]:
def searchInDatabase(img_enc, database):
    anc_enc_arr = database.iloc[:, 1:].to_numpy();
    anc_img_names = database['Anchor'];

    distance = [];
    for i in range(anc_enc_arr.shape[0]):
        dist = euclidean_dist(img_enc, anc_enc_arr[i : i+1, :]);
        distance = np.append(distance, dist);

    closest_idx = np.argsort(distance);

    return database['Anchor'][closest_idx[0]];

In [None]:
DataTestReal = os.path.join(current_dir, "test")
y_true = []
y_pred = []
tempDf = df
tempDf.head()
tempDf.shape

In [None]:
#Testo i fake
currentTest = 'fake'
database = df_enc
# Prendo i primi 500 Fake
for index, row in tqdm(tempDf.iterrows()):
    img_name = DataTestReal + row[currentTest];

    img = io.imread(img_name);

    img_enc = getImageEmbeddings(img, model);

    closestLabel = searchInDatabase(img_enc, database);

    if "real" in closestLabel:
        y_pred.append("real");
    else:
        y_pred.append("fake");

In [None]:
print(len(y_true))
print(len(y_pred))
print(y_pred)

In [None]:
database = df_enc

In [None]:
#Testo i real
currentTest = 'real'
## Prendo i primi 500 Fake
for index, row in tqdm(tempDf.iterrows()):
    img_name = DataTestReal + row[currentTest]
    img = io.imread(img_name)

    img_enc = getImageEmbeddings(img, model)

    closestLabel = searchInDatabase(img_enc, database)
    if "real" in closestLabel:
        y_pred.append("real")
    else:
        y_pred.append("fake")

In [None]:
print(len(y_true))
print(len(y_pred))
print(y_pred)

In [None]:
#Creo i vettori di ground truth
y_true = np.array(['fake'] * 1523)
print(y_true.shape)

temp = np.array(['real'] * 1523)
print(temp.shape)

y_true = np.concatenate([y_true, temp])
print(y_true.shape)

#Calcolo la matrice di confusione
from sklearn.metrics import confusion_matrix
confusion_matrix(y_true, y_pred, labels=["real", "fake"])

In [None]:
#Estraggo dalla matrice di confusione i True Negative, False Positive, False Negative, True Positive
TN, FP, FN, TP = confusion_matrix(y_true, y_pred, labels=["real", "fake"]).ravel()

In [None]:
#Calcolo alcune metriche per vedere come si comporta
accuracy = round((TP + TN) /(TP + TN + FP + FN), 4) * 100
precision = round((TP) / (TP + FP), 4) * 100
sensitivy_recall = round((TP) / (TP + FN), 4) * 100
specificity = round((TN) / (TN + FP) * 100, 4)
F1_score = round((2* precision * sensitivy_recall) / (precision + sensitivy_recall), 2)

print({"Accuracy":accuracy,"Precision":precision,"Sensitivity_recall":sensitivy_recall, "Specificity": specificity, "F1_score":F1_score})