In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import timm
import os
import torch.nn.functional as F
from torch import nn
from torch.utils.data import Dataset, DataLoader
from skimage import io
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import cv2

In [2]:
#Controllo che torch sia installato con CUDA abilitato
def check_cuda():
    print(torch.version.cuda)
    cuda_is_ok = torch.cuda.is_available()
    print(f"CUDA Enabled: {cuda_is_ok}")

In [3]:
print(torch.version.cuda)

11.7


In [4]:
model_weights = True

In [5]:
#Directory per i file di train
DATA_DIR = ''

#Grandezza del Batch (iperparametro)
BATCH_SIZE = 32

#Learning Rate (Iperparametro)
LR = 0.001

#Numero di epoche (Iperparametro)
EPOCHS = 80

DEVICE = 'cuda'

In [6]:
# Carica il file csv
#df = pd.read_csv("spectrum_train_dataset.csv")

df_real = pd.read_csv("real_image_dataset.csv")

df_fake = pd.read_csv("fake_image_dataset.csv")

train_df_real, valid_df_real = train_test_split(df_real, test_size = 0.20, random_state = 42)
train_df_fake, valid_df_fake = train_test_split(df_fake, test_size = 0.20, random_state = 42)

test_df_real = train_df_real.sample(500)
test_df_fake = train_df_fake.sample(500)
train_df_real = train_df_real.drop(test_df_real.index)
train_df_fake = train_df_fake.drop(test_df_fake.index)

train_df = pd.concat([train_df_real, train_df_fake])
valid_df = pd.concat([valid_df_real, valid_df_fake])
test_df = pd.concat([test_df_real, test_df_fake])

train_df= train_df.sample(frac= 1)
valid_df= valid_df.sample(frac= 1)

# Unisci tutti i dataframe in un unico dataframe finale per il train set
final_df = pd.concat([train_df, valid_df])

# Salva il dataframe finale di trian in un file CSV
final_df.to_csv("spectrum_train_dataset.csv", index=False)
df = pd.read_csv("spectrum_train_dataset.csv")

# Creazione di liste concatenate delle immagini reali e false
real_images = pd.concat([test_df_real['Anchor'], test_df_real['Positive']]).sample(500, random_state=42).tolist()
fake_images = pd.concat([test_df_fake['Anchor'], test_df_fake['Positive']]).sample(500, random_state=42).tolist()

# Assicurati che la lunghezza delle liste sia esattamente 500
real_images = real_images[:500]
fake_images = fake_images[:500]

# Crea il dataframe del test set con due colonne: 'real' e 'fake'
test_df = pd.DataFrame({
    'real': real_images,
    'fake': fake_images
})

# Salva il dataframe di test in un file CSV
test_df.to_csv("test_set.csv", index=False)

print('Size of train set:', len(train_df))
print('Size of valid set:', len(valid_df))
print('Size of test set:', len(test_df))

Size of train set: 15000
Size of valid set: 4000
Size of test set: 500


In [7]:
# Carico le immagini del file CSV che ho fornito, per settarle in modo corretto e darle in input alla rete neurale
class APN_Dataset(Dataset):

  def __init__(self, df):
    self.df = df

  def __len__(self):
    return len(self.df)

  def __getitem__(self, idx):
    row = self.df.iloc[idx]

    A_img = io.imread(DATA_DIR + row.Anchor, as_gray= True)
    P_img = io.imread(DATA_DIR + row.Positive, as_gray= True)
    N_img = io.imread(DATA_DIR + row.Negative, as_gray= True)

    
    #Permute because the third channel has to be in first channel in torch
    A_img = np.expand_dims(A_img, 0)
    P_img = np.expand_dims(P_img, 0)
    N_img = np.expand_dims(N_img, 0)

    A_img = torch.from_numpy(A_img)/ 255.0
    P_img = torch.from_numpy(P_img)/ 255.0
    N_img = torch.from_numpy(N_img)/ 255.0
      
    #A_img = torch.from_numpy(A_img.astype(np.int32)) / 65536.0
    #P_img = torch.from_numpy(P_img.astype(np.int32)) / 65536.0
    #N_img = torch.from_numpy(N_img.astype(np.int32)) / 65536.0

    return A_img, P_img, N_img

In [8]:
data = APN_Dataset(train_df)
item1, item2, item3 = data.__getitem__(0)
print(item1.shape)

torch.Size([1, 200, 200])


Qui di seguito mi stampo il numero che compone l'insieme di dati di train, test e validation

In [9]:
trainset = APN_Dataset(train_df)
validset = APN_Dataset(valid_df)
testset = APN_Dataset(test_df)

print(f"Size of trainset: {len(trainset)}")
print(f"Size of validset: {len(validset)}")
print(f"Size of validset: {len(testset)}")

Size of trainset: 15000
Size of validset: 4000
Size of validset: 500


Carichiamo i dati di train e validation nella batch

In [10]:
trainloader = DataLoader(trainset, batch_size = BATCH_SIZE, shuffle = True)
validloader = DataLoader(validset, batch_size = BATCH_SIZE)
testloader = DataLoader(testset, batch_size = 1)

In [11]:
print(f"No. of batches in trainloader : {len(trainloader)}")
print(f"No. of batches in validloader : {len(validloader)}")
print(f"No. of batches in testloader : {len(testloader)}")

No. of batches in trainloader : 469
No. of batches in validloader : 125
No. of batches in testloader : 500


Questa funzione definisce un modello di rete neurale chiamato APN_Model, che carica un'architettura di rete preaddestrata e sostituisce il classificatore finale con un nuovo classificatore personalizzato.

In [12]:
#Carico il modello di rete neurale
class APN_Model(nn.Module):

    #Viene definita la size del vettore di embedding
  def __init__(self, emb_size = 512):
    super(APN_Model, self).__init__()

    #QUI CAIRCATE IL MODELLO, IN QUESTO CASO EFFICIENTNET VERSIONE B0 (LA PIù LEGGERA DELLA FAMIGLIA)
    self.efficientnet = timm.create_model('tf_efficientnetv2_b0', pretrained = False)
    self.efficientnet.classifier = nn.Linear(in_features=self.efficientnet.classifier.in_features, out_features = emb_size)

  def forward(self, images):
    embeddings = self.efficientnet(images)
    return embeddings

In [13]:
#QUI FATE UNA PICCOLA MODIFICA ALLA RETE PER FARLE AVERE IN INPUT IMMAGINI IN SCALA DI GRIGIO DELLO SPETTRO DI FOURIER
model = APN_Model()
model.efficientnet.conv_stem = nn.Conv2d(1, 32, 3, 2, 1, bias=False);

if model_weights == True :
    model.load_state_dict(torch.load('trained_model_on_20000.pt'))

model.to(DEVICE);

In [14]:
#FUNZIONE DI TEST
def test_fn(model, dataloader, criterion):
  model.train() #ON Dropout
  total_loss = 0.0

  with torch.no_grad():  
    for A, P, N in tqdm(dataloader):
        A, P, N = A.to(DEVICE), P.to(DEVICE), N.to(DEVICE)

        A_embs = model(A)
        P_embs = model(P)
        N_embs = model(N)
      
        loss = criterion(A_embs, P_embs, N_embs)
      
        total_loss += loss.item()

    return total_loss / len(dataloader)

In [15]:
#FUNZIONE DI TRAINING
def train_fn(model, dataloader, optimizer, criterion):
  model.train() #ON Dropout
  total_loss = 0.0

  for A, P, N in tqdm(dataloader):
    A, P, N = A.to(DEVICE), P.to(DEVICE), N.to(DEVICE)

    A_embs = model(A)
    P_embs = model(P)
    N_embs = model(N)

    loss = criterion(A_embs, P_embs, N_embs)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    total_loss += loss.item()

    return total_loss / len(dataloader)

In [16]:
#FUNZIONE DI EVALUATION
def eval_fn(model, dataloader, criterion):
  model.eval() #OFF Dropout
  total_loss = 0.0

  with torch.no_grad():
    for A, P, N in tqdm(dataloader):
      A, P, N = A.to(DEVICE), P.to(DEVICE), N.to(DEVICE)

      A_embs = model(A)
      P_embs = model(P)
      N_embs = model(N)

      loss = criterion(A_embs, P_embs, N_embs)

      total_loss += loss.item()

    return total_loss / len(dataloader)

In [17]:
criterion = nn.TripletMarginLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = LR)

In [18]:
#Training
if model_weights == False :
    best_valid_loss = np.Inf
    training_loss = []
    validation_loss = []
    for i in range(EPOCHS):
      train_loss = train_fn(model, trainloader, optimizer, criterion)
      valid_loss = eval_fn(model, validloader, criterion)
      training_loss.append(train_loss)
      validation_loss.append(valid_loss)

      if valid_loss < best_valid_loss:
        torch.save(model.state_dict(), 'trained_model_on_20000.pt')
        best_valid_loss = valid_loss
        print("SAVED_WEIGHTS_SUCCESS")

      print(f"EPOCHS : {i+1} train_loss : {train_loss} valid_loss : {valid_loss}")

In [19]:
#Salvataggio plottato dei dati
if(model_weights == False):    
    fig, axes = plt.subplots(2, 1, sharex= True, figsize=(10, 6))
    axes[0].plot(training_loss)
    axes[0].set_title('Training Loss')
    axes[1].plot(validation_loss)
    axes[1].set_title('Validation Loss')
    plt.savefig('training_data5_20000(biggan)-80-epochs.png')
    plt.show()

In [20]:
#Test
#best_test_loss = np.Inf
#test_loss = []

#testing_loss = test_fn(model, testloader, criterion)

#print(f"test_loss : {testing_loss}")

In [21]:
#QUESTA E' LA FUNZIONE PER GENERARE I VETTORI DI ENCODING
def get_encoding_csv(model, anc_img_names, dirFolder):
  anc_img_names_arr = np.array(anc_img_names)
  encodings = []

  model.eval()

  with torch.no_grad():
    for i in tqdm(anc_img_names_arr):
      A = io.imread(dirFolder + i)
      #A = torch.from_numpy(A).permute(2, 0, 1) / 255.0
      A = np.expand_dims(A, 0)
      A = torch.from_numpy(A.astype(np.int32)) / 255.0
      A = A.to(DEVICE)
      A_enc = model(A.unsqueeze(0))
      encodings.append(A_enc.squeeze().cpu().detach().numpy())

    encodings = np.array(encodings)
    encodings = pd.DataFrame(encodings)
    df_enc = pd.concat([anc_img_names, encodings], axis = 1)

    return df_enc

In [22]:
#QUI RICARICO IL MODELLO UNA VOLTA TRAINATO
model.load_state_dict(torch.load('trained_model_on_20000.pt'))

#QUI CREO IL DATABASE DI FEATURE VECTORS DEL TRAINING SET
df_enc = get_encoding_csv(model, df['Anchor'], DATA_DIR)

100%|████████████████████████████████████████████████████████████████████████████| 19000/19000 [03:37<00:00, 87.52it/s]


In [23]:
#QUI IL DATABASE COME CSV IN MODO TALE DA NON DOVER FARE QUESTA OPERAZIONE OGNI VOLTA
#OVVIAMENTE, SE DEVO FARE UN NUOVO TRAINING DEVO ANCHE RICREARE GLI ENCODINGS
df_enc.to_csv('database.csv', index = False)

df_enc = pd.read_csv('database.csv')
df_enc.head()

Unnamed: 0,Anchor,0,1,2,3,4,5,6,7,8,...,502,503,504,505,506,507,508,509,510,511
0,dataset\big_gan\big\biggan-spectrum\852\img008...,-0.090214,-0.264717,-0.216647,0.032853,0.378233,0.116311,-0.175489,0.121259,0.067288,...,0.088355,0.235892,-0.088446,-0.12562,0.047661,0.344168,-0.177199,0.076804,-0.046667,0.137589
1,dataset\coco\coco\coco2017\train_spectrum\img0...,-0.053666,-0.144451,0.146673,-0.08222,-0.090097,0.083666,-0.195972,-0.490259,-0.014724,...,0.020914,0.175289,-0.010938,-0.038295,-0.251522,-0.028093,0.342762,0.129735,0.114022,-0.140757
2,dataset\big_gan\big\biggan-spectrum\318\img002...,-0.032845,-0.179673,-0.127086,-0.056337,0.268887,0.084888,-0.050379,-0.07124,-0.025579,...,0.11548,0.185573,-0.077517,-0.09788,-0.041625,0.187829,-0.0217,0.076438,-0.041155,0.152363
3,dataset\big_gan\big\biggan-spectrum\218\img001...,-0.06916,-0.168132,-0.206332,-0.010379,0.451171,0.07923,-0.050864,0.157542,-0.005467,...,0.156118,0.196045,-0.110679,-0.162092,0.046835,0.317521,-0.188559,0.067422,-0.057077,0.22432
4,dataset\big_gan\big\biggan-spectrum\649\img006...,-0.139863,-0.288083,-0.188613,0.095502,0.38393,0.139068,-0.11175,0.102739,0.020576,...,0.106074,0.23703,-0.173063,-0.150995,0.03771,0.372867,-0.203903,0.093567,-0.026282,0.09134


In [24]:
def euclidean_dist(img_enc, anc_enc_arr):
    #dist = np.sqrt(np.dot(img_enc-anc_enc_arr, (img_enc- anc_enc_arr).T))
    dist = np.dot(img_enc-anc_enc_arr, (img_enc- anc_enc_arr).T)
    #dist = np.sqrt(dist)
    return dist

In [25]:
def getImageEmbeddings(img, model):

    img = np.expand_dims(img, 0);
    img = torch.from_numpy(img) / 255;
    model.eval();

    with torch.no_grad():
        img = img.to(DEVICE);
        img_enc = model(img.unsqueeze(0));
        img_enc = img_enc.detach().cpu().numpy();
        img_enc = np.array(img_enc);

    return img_enc;

In [26]:
def searchInDatabase(img_enc, database):
    anc_enc_arr = database.iloc[:, 1:].to_numpy();
    anc_img_names = database['Anchor'];

    distance = [];
    for i in range(anc_enc_arr.shape[0]):
        dist = euclidean_dist(img_enc, anc_enc_arr[i : i+1, :]);
        distance = np.append(distance, dist);

    closest_idx = np.argsort(distance);

    return database['Anchor'][closest_idx[0]];

In [27]:
DataTestReal = 'test_set.csv'
y_true = []
y_pred = []
tempDf = df
tempDf.head()
tempDf.shape

(19000, 3)

In [28]:
#Testo i fake
currentTest = 'fake'
database = df_enc
# Prendo i primi 500 Fake
for index, row in tqdm(tempDf.iterrows()):
    img_name = DataTestReal + row[currentTest];

    img = io.imread(img_name);

    img_enc = getImageEmbeddings(img, model);

    closestLabel = searchInDatabase(img_enc, database);

    if "real" in closestLabel:
        y_pred.append("real");
    else:
        y_pred.append("fake");

0it [00:00, ?it/s]


KeyError: 'fake'

In [None]:
print(len(y_true))
print(len(y_pred))
print(y_pred)

In [None]:
database = df_enc

In [None]:
#Testo i real
currentTest = 'real'
## Prendo i primi 500 Fake
for index, row in tqdm(tempDf.iterrows()):
    img_name = DataTestReal + row[currentTest]
    img = io.imread(img_name)

    img_enc = getImageEmbeddings(img, model)

    closestLabel = searchInDatabase(img_enc, database)
    if "real" in closestLabel:
        y_pred.append("real")
    else:
        y_pred.append("fake")

In [None]:
print(len(y_true))
print(len(y_pred))
print(y_pred)

In [None]:
#Creo i vettori di ground truth
y_true = np.array(['fake'] * 1523)
print(y_true.shape)

temp = np.array(['real'] * 1523)
print(temp.shape)

y_true = np.concatenate([y_true, temp])
print(y_true.shape)

#Calcolo la matrice di confusione
from sklearn.metrics import confusion_matrix
confusion_matrix(y_true, y_pred, labels=["real", "fake"])

In [None]:
#Estraggo dalla matrice di confusione i True Negative, False Positive, False Negative, True Positive
TN, FP, FN, TP = confusion_matrix(y_true, y_pred, labels=["real", "fake"]).ravel()

In [None]:
#Calcolo alcune metriche per vedere come si comporta
accuracy = round((TP + TN) /(TP + TN + FP + FN), 4) * 100
precision = round((TP) / (TP + FP), 4) * 100
sensitivy_recall = round((TP) / (TP + FN), 4) * 100
specificity = round((TN) / (TN + FP) * 100, 4)
F1_score = round((2* precision * sensitivy_recall) / (precision + sensitivy_recall), 2)

print({"Accuracy":accuracy,"Precision":precision,"Sensitivity_recall":sensitivy_recall, "Specificity": specificity, "F1_score":F1_score})