In [None]:
!pip install -U datasets huggingface-hub
!pip install -U accelerate

### Restart the kernel before proceeding further

In [None]:
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.svm import SVC, LinearSVC
from sklearn.metrics import accuracy_score, average_precision_score, precision_score, recall_score
import re
import sklearn

import numpy as np
import pandas as pd
from datasets import Dataset, load_dataset

import torch
import torch.nn as nn
from torch.nn import functional as F
from tqdm import tqdm

# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

* ## Training is done using only ProGAN images.
* ## Testing is done on GAN and Diffusion-generated images.
* ## Below two sections show the training process.
* ## SVM and Neural Network are trained respectively.

# **1) Training using SVM**

### Loading Train Dataset with CLIP Embeddings

In [None]:
# Fetch the "train" split of the training repository from the Hugging Face Hub
# and display its summary.
train_dataset = load_dataset(
    path="rajendrabaskota/progan-train-dataset-all",
    split="train",
)
train_dataset

In [None]:
# Materialize the embedding and label columns as NumPy arrays for scikit-learn.
X, y = (np.array(train_dataset[column]) for column in ('img_embed', 'label'))

In [None]:
# Sanity check: shapes of the feature array and the label array.
X.shape, y.shape

### Loading Test Datasets with CLIP Embeddings

In [None]:
# Fetch the "test" split of both evaluation repositories.
dataset_diffusion = load_dataset(
    path="rajendrabaskota/diffusion-test-dataset",
    split="test",
)
dataset_gan = load_dataset(
    path="rajendrabaskota/gan-test-dataset",
    split="test",
)

In [None]:
# Embedding columns of both test sets as NumPy arrays (same order as the datasets).
X_gan, X_diffusion = (
    np.array(test_split['img_embed'])
    for test_split in (dataset_gan, dataset_diffusion)
)

### Training a Support Vector Classifier

In [None]:
# Linear SVM on the training embeddings. C is scikit-learn's inverse
# regularization strength (smaller C = stronger regularization);
# verbose=1 turns on solver logging.
clf = LinearSVC(C=0.05, verbose=1)
clf.fit(X, y)

### Saving the model

In [None]:
import pickle

# Serialize the fitted classifier to disk in binary mode.
with open('image_detection.pkl', mode='wb') as model_file:
    pickle.dump(clf, model_file)

### Prediction

In [None]:
# Predict labels for both test sets with the fitted SVM.
y_pred_gan, y_pred_diffusion = clf.predict(X_gan), clf.predict(X_diffusion)

In [None]:
# Convert both test datasets to pandas frames for per-generator bookkeeping.
df_gan = dataset_gan.to_pandas()
df_diffusion = dataset_diffusion.to_pandas()

In [None]:
def identify_model(x):
    """Return the leading path component of ``x`` (the text before the first '/').

    An empty string is returned when ``x`` does not start with at least one
    character other than '/'.
    """
    leading_component = re.match(r'^([^/]+)', x)
    return leading_component.group(1) if leading_component else ''

# Tag every row with the first component of its file path.
df_gan['model'] = df_gan['file_path'].apply(identify_model)
df_diffusion['model'] = df_diffusion['file_path'].apply(identify_model)

In [None]:
# Attach the SVM predictions to the frames built from the same datasets the
# features were taken from.
df_gan['y_pred'] = y_pred_gan
df_diffusion['y_pred'] = y_pred_diffusion

In [None]:
# Distinct 'model' values per test set, in order of first appearance.
gan_models = list(df_gan['model'].unique())
diffusion_models = list(df_diffusion['model'].unique())

In [None]:
# One frame with every test image; 'imagenet' and 'laion' are dropped from the
# list of models that get their own results row (list.remove raises if absent).
df = pd.concat([df_gan, df_diffusion], ignore_index=True)
models = df['model'].unique().tolist()
for excluded_source in ('imagenet', 'laion'):
    models.remove(excluded_source)

In [None]:
# Per-generator metrics for the SVM.
# Rows are collected in a list and turned into a DataFrame once, instead of
# concatenating onto an (initially empty) frame on every iteration.
metric_rows = []

for model in models:
    temp = df[df['model'] == model]
    if model in diffusion_models:
        # Diffusion generators are scored together with the rows of a second
        # source: 'imagenet' for 'guided', 'laion' for all the others
        # (presumably the matching real images -- confirm against the dataset).
        paired_source = 'imagenet' if model == 'guided' else 'laion'
        temp = pd.concat([temp, df[df['model'] == paired_source]], ignore_index=True)

    # Local names on purpose: the training labels in the global `y` stay intact.
    y_true = temp['label']
    y_pred = temp['y_pred']
    metric_rows.append({
        'model': model,
        'accuracy': accuracy_score(y_true, y_pred),
        'f1_score': sklearn.metrics.f1_score(y_true, y_pred, average='macro'),
    })

results = pd.DataFrame(metric_rows, columns=['model', 'accuracy', 'f1_score'])

In [None]:
# Display the per-generator SVM metrics.
results

In [None]:
# Write the SVM metrics to CSV without the index column.
results.to_csv("results-svm.csv", index=False)

# **2) Training using Logistic Regression and Neural Network**

### Loading Train Dataset

In [None]:
# Loads the same repository and split as the SVM section
# (presumably so this section can be run on its own).
train_dataset = load_dataset("rajendrabaskota/progan-train-dataset-all", split="train")
train_dataset

### Loading Test Dataset

In [None]:
# Same test repositories and splits as in the SVM section.
dataset_diffusion = load_dataset("rajendrabaskota/diffusion-test-dataset", split="test")
dataset_gan = load_dataset("rajendrabaskota/gan-test-dataset", split="test")

### Converting Features into Tensors

In [None]:
# Training features, and labels as a float32 column vector of shape (N, 1)
# so they line up with the single-output network.
X = torch.tensor(train_dataset['img_embed']).to(device)
y = torch.tensor(train_dataset['label'], dtype=torch.float32).to(device).reshape(-1, 1)

# Only the test features are moved to the device; the test labels are read
# from the pandas frames when the metrics are computed.
X_gan = torch.tensor(dataset_gan['img_embed']).to(device)
X_diffusion = torch.tensor(dataset_diffusion['img_embed']).to(device)

### Train-Test Split

In [None]:
# Hold out the last `test_ratio` fraction of the training data.
# NOTE(review): the hold-out is the unshuffled tail of the dataset; confirm the
# dataset order is not grouped by class or category.
test_ratio = 0.02
test_n = int(X.shape[0] * test_ratio)

# Split on an explicit index: with negative slicing, test_n == 0 would make
# X[:-0] an empty training set and X[-0:] the whole dataset.
n_train = X.shape[0] - test_n
X_train, y_train = X[:n_train], y[:n_train]
X_test, y_test = X[n_train:], y[n_train:]

## **Logistic Regression**

In [None]:
# clf = SGDClassifier(loss='log_loss', max_iter=1000)
# # clf.fit(X, y)
# clf.partial_fit(X, y, classes=np.unique(y))

In [None]:
# y_pred_gan = clf.predict(X_gan)
# y_pred_diffusion = clf.predict(X_diffusion)

## **Neural Network**

In [None]:
class NN(nn.Module):
    """One-hidden-layer binary classifier: Linear -> ReLU -> Dropout -> Linear.

    Args:
        input_units: size of each input feature vector.
        hidden_units: width of the hidden layer.
        output_units: number of output logits.
    """

    def __init__(self, input_units, hidden_units, output_units):
        super().__init__()
        self.input_units = input_units
        self.output_units = output_units
        self.hidden_units = hidden_units

        # Replacing this with a single nn.Linear(input_units, output_units)
        # turns the model into plain logistic regression.
        self.network = nn.Sequential(
            nn.Linear(self.input_units, self.hidden_units),
            nn.ReLU(),
            nn.Dropout(p=0.2),
            nn.Linear(self.hidden_units, self.output_units)
        )

    def forward(self, x, y=None):
        """Return ``(probs, loss)`` for a batch.

        Args:
            x: input batch whose last dimension is ``input_units``.
            y: optional float targets with the same shape as the output.

        Returns:
            probs: sigmoid of the network output.
            loss: mean binary cross-entropy against ``y``, or ``None`` when
                ``y`` is not given.
        """
        logits = self.network(x)
        probs = torch.sigmoid(logits)

        # Identity check: `y == None` on a tensor only works by accident of
        # how tensor comparison falls back for non-tensor operands.
        if y is not None:
            # Same quantity as binary_cross_entropy(probs, y), computed from
            # the logits so saturated sigmoid outputs do not lose precision.
            loss = F.binary_cross_entropy_with_logits(logits, y)
        else:
            loss = None

        return probs, loss

In [None]:
# Seed PyTorch before the model is built so weight initialization and dropout
# masks are the same on every Restart & Run All.
seed = 0
torch.manual_seed(seed)

input_units = len(train_dataset[0]['img_embed']) # 768
hidden_units = 100
output_units = 1 # binary classification
learning_rate = 0.03

clf_nn = NN(input_units, hidden_units, output_units).to(device)
optimizer = torch.optim.Adam(clf_nn.parameters(), lr=learning_rate)

### Creating a Dataset

In [None]:
class CustomDataset(torch.utils.data.Dataset):
    """Map-style dataset that pairs ``x[idx]`` with ``y[idx]``."""

    def __init__(self, x, y):
        super().__init__()
        self.x, self.y = x, y

    def __len__(self):
        # The number of samples is taken from the features.
        return len(self.x)

    def __getitem__(self, idx):
        features = self.x[idx]
        target = self.y[idx]
        return features, target

### Creating a DataLoader

In [None]:
# Shuffled mini-batch loader over the training portion.
# NOTE(review): the training loop does not iterate over this loader; it trains
# full-batch on X_train / y_train.
batch_size = 128

dataloader = torch.utils.data.DataLoader(dataset=CustomDataset(X_train, y_train),
                                        batch_size=batch_size,
                                        shuffle=True)

### Learning Rate Scheduling

In [None]:
from copy import deepcopy

lr_scheduling_rate = 0.1
def lr_scheduling(learning_rate, optim=None):
    """Scale ``learning_rate`` by ``lr_scheduling_rate`` and apply it to the optimizer.

    Args:
        learning_rate: the current learning rate.
        optim: optimizer to update; defaults to the notebook-level ``optimizer``.

    Returns:
        The new (decayed) learning rate.
    """
    if optim is None:
        optim = optimizer

    learning_rate = learning_rate * lr_scheduling_rate
    # Writing to param_groups changes the live optimizer directly; no need to
    # deep-copy and reload the whole state dict.
    # The new rate is applied to every parameter group.
    for param_group in optim.param_groups:
        param_group['lr'] = learning_rate

    return learning_rate

### Model Training

In [None]:
epochs = 2800      # number of full-batch optimization steps
eval_iters = 100   # evaluate on the hold-out split every `eval_iters` steps
patience = 10      # evaluations without improvement before the LR is decayed
train_losses = []
test_losses = []
num_no_consecutive_improvement = 0

# A single progress bar, advanced manually once per step.
progress_bar = tqdm(total=epochs, desc="Training", dynamic_ncols=True)

clf_nn.train()
for i in range(epochs):
    # One full-batch gradient step on the whole training split.
    probs, train_loss = clf_nn(X_train, y_train)
    optimizer.zero_grad()
    train_loss.backward()
    optimizer.step()

    if (i + 1) % eval_iters == 0:
        clf_nn.eval()
        with torch.no_grad():
            _, test_loss = clf_nn(X_test, y_test)
        clf_nn.train()

        test_loss_value = test_loss.item()
        # Compare against the best loss seen BEFORE this evaluation. Comparing
        # after appending makes `min(test_losses) <= test_loss` always true,
        # which decays the learning rate even while the model is improving.
        best_previous = min(test_losses) if test_losses else float("inf")
        if test_loss_value >= best_previous:
            num_no_consecutive_improvement += 1
        else:
            num_no_consecutive_improvement = 0

        test_losses.append(test_loss_value)
        train_losses.append(train_loss.item())
        progress_bar.set_postfix({'Train Loss': train_loss.item(), 'Test Loss': test_loss_value}, refresh=True)

        if num_no_consecutive_improvement == patience:
            learning_rate = lr_scheduling(learning_rate)
            print(f"Changed lr to {optimizer.state_dict()['param_groups'][0]['lr']}")
            num_no_consecutive_improvement = 0

    progress_bar.update(1)

progress_bar.close()

In [None]:
import matplotlib.pyplot as plt

# Losses are recorded after steps eval_iters, 2*eval_iters, ..., so the x axis
# starts at eval_iters (not 0) and has one point per recorded loss.
x_values = range(eval_iters, eval_iters * len(test_losses) + 1, eval_iters)

fig, ax = plt.subplots()
ax.plot(x_values, test_losses, color='red', label='test loss')
ax.plot(x_values, train_losses, color='green', label='train loss')
ax.set(xlabel='training step', ylabel='binary cross-entropy', title='Training vs. test loss')
ax.legend();

# Prediction

In [None]:
# Score both test sets with the trained network (dropout disabled, no gradients).
clf_nn.eval()
threshold = 0.5

with torch.no_grad():
    probs_gan, _ = clf_nn(X_gan)
    probs_diffusion, _ = clf_nn(X_diffusion)

# The network has a single output unit, so the outputs are (N, 1); flatten them
# to (N,) so they can be used directly as DataFrame columns and metric inputs.
probs_gan = probs_gan.cpu().numpy().reshape(-1)
probs_diffusion = probs_diffusion.cpu().numpy().reshape(-1)

# Integer 0/1 predictions rather than booleans, for comparison with the labels.
y_pred_gan = (probs_gan > threshold).astype(int)
y_pred_diffusion = (probs_diffusion > threshold).astype(int)

In [None]:
# Fresh pandas copies of both test sets for the neural-network results.
df_gan, df_diffusion = dataset_gan.to_pandas(), dataset_diffusion.to_pandas()

In [None]:
def identify_model(x):
    """Extract the text before the first '/' in ``x``.

    Returns '' when ``x`` is empty or begins with '/'. This repeats the helper
    already defined in the SVM section.
    """
    found = re.match(r'^([^/]+)', x)
    if found is None:
        return ''
    return found.group(1)

# Tag every row of both frames with the first component of its file path.
for frame in (df_gan, df_diffusion):
    frame['model'] = frame['file_path'].apply(identify_model)

In [None]:
# Thresholded predictions from the network, one per row of each frame.
df_gan['y_pred'] = y_pred_gan
df_diffusion['y_pred'] = y_pred_diffusion

# Raw sigmoid outputs, used later as scores for average precision.
df_gan['y_probs'] = probs_gan
df_diffusion['y_probs'] = probs_diffusion

In [None]:
# Distinct 'model' values per test set, in order of first appearance.
gan_models = pd.unique(df_gan['model']).tolist()
diffusion_models = pd.unique(df_diffusion['model']).tolist()

In [None]:
# Stack both test frames; 'imagenet' and 'laion' do not get a results row of
# their own (list.remove raises ValueError if either is missing).
df = pd.concat([df_gan, df_diffusion], ignore_index=True)
models = df['model'].unique().tolist()
for source_name in ('imagenet', 'laion'):
    models.remove(source_name)

In [None]:
# Per-generator metrics for the neural network.
# Rows are collected in a list and turned into a DataFrame once, instead of
# concatenating onto an (initially empty) frame on every iteration.
metric_columns = ['model', 'average_precision', 'accuracy', 'f1_score', 'precision', 'recall']
metric_rows = []

for model in models:
    temp = df[df['model'] == model]
    if model in diffusion_models:
        # Diffusion generators are scored together with the rows of a second
        # source: 'imagenet' for 'guided', 'laion' for all the others
        # (presumably the matching real images -- confirm against the dataset).
        paired_source = 'imagenet' if model == 'guided' else 'laion'
        temp = pd.concat([temp, df[df['model'] == paired_source]], ignore_index=True)

    # Local names on purpose: the training targets in the global `y` stay intact.
    y_true = temp['label']
    y_pred = temp['y_pred']
    y_scores = temp['y_probs']
    metric_rows.append({
        'model': model,
        'average_precision': average_precision_score(y_true, y_scores, average="macro"),
        'accuracy': accuracy_score(y_true, y_pred),
        'f1_score': sklearn.metrics.f1_score(y_true, y_pred, average='macro'),
        'precision': precision_score(y_true, y_pred),
        'recall': recall_score(y_true, y_pred),
    })

results = pd.DataFrame(metric_rows, columns=metric_columns)

In [None]:
# Display the per-generator neural-network metrics.
results

In [None]:
# Write the neural-network metrics to CSV without the index column.
results.to_csv("nn-progan-results.csv", index=False)

In [None]:
from IPython.display import FileLink

# Link to the CSV this notebook writes; the previous target was a filename
# that no cell in this notebook creates.
FileLink(r'nn-progan-results.csv')