In [None]:
from diffusers import StableDiffusionPipeline, DDIMScheduler
import torch
from PIL import Image
from torchvision import transforms
from torchvision.utils import make_grid
import matplotlib.pyplot as plt

In [None]:
# Load the Stable Diffusion v1.4 pipeline, plus a DDIM scheduler built from the
# scheduler config stored in the same checkpoint.
pipeline = StableDiffusionPipeline.from_pretrained(
    "CompVis/stable-diffusion-v1-4",
)
scheduler = DDIMScheduler.from_pretrained(
    "CompVis/stable-diffusion-v1-4",
    subfolder="scheduler",
)

In [None]:
# Pull the individual sub-models out of the pipeline; the three networks are
# moved to the GPU, the tokenizer is used as-is.
tokenizer = pipeline.tokenizer
vae = pipeline.vae.to("cuda")
unet = pipeline.unet.to("cuda")
text_encoder = pipeline.text_encoder.to("cuda")

In [None]:
# Sample image to analyse, converted to RGB and resized to 512x512.
# (A real photo such as 'real_n01514668_18815.JPEG' can be loaded the same way.)
batch = Image.open('ai_008_sdv5_00084.png')
batch = batch.convert("RGB")
batch = batch.resize((512, 512))

batch

In [None]:
# PIL image -> float tensor, then (x - 0.5) / 0.5 normalisation.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ]
)
# Add a leading batch dimension and move the tensor to the GPU.
image_tensor = transform(batch)[None].cuda()

In [None]:
# Encode the image with the VAE (no gradients needed for inference).
with torch.no_grad():
    vae_output = vae.encode(image_tensor)

# Draw a latent from the encoder's distribution and apply the VAE scaling factor.
latents = vae_output.latent_dist.sample().mul(vae.config.scaling_factor)

In [None]:
# Scheduler configuration: number of inference steps, and the index into
# scheduler.timesteps from which the later cells start.
timesteps, t_start = 30, 20
scheduler.set_timesteps(num_inference_steps=timesteps)

In [None]:
# For every timestep from t_start onwards, draw fresh Gaussian noise and noise
# the *clean* latents directly to that timestep (levels are not chained).
noise_list, latent_list = [], []
for t in scheduler.timesteps[t_start:]:
    noise = torch.randn_like(latents).cuda()
    noisy_latents = scheduler.add_noise(latents, noise, t)
    noise_list.append(noise)
    latent_list.append(noisy_latents)

In [None]:
# Text conditioning for the UNet: the encoder output for an empty prompt.
prompt = ""
prompt_ids = tokenizer(prompt, return_tensors="pt").input_ids
with torch.no_grad():
    text_embeddings = text_encoder(prompt_ids.cuda())[0]

In [None]:
from tqdm.auto import tqdm

# One UNet noise prediction and one scheduler step per (timestep, noisy latent) pair.
denoise_list, pred_noise_list = [], []
step_pairs = zip(scheduler.timesteps[t_start:], latent_list)

for t, lat in tqdm(step_pairs, total=len(latent_list)):
    with torch.no_grad():
        noise_pred = unet(lat, t, text_embeddings).sample
    pred_noise_list.append(noise_pred)

    step_output = scheduler.step(noise_pred, t.item(), lat)
    denoise_lat = step_output.prev_sample
    denoise_list.append(denoise_lat)

In [None]:
import scipy

def _basic_stats(sample: torch.Tensor) -> list[float]:
    """Return [mean, std, skewness, kurtosis, L2 norm] over all elements of ``sample``."""
    # Convert to NumPy once; SciPy's moment functions operate on arrays.
    flat = sample.detach().flatten().cpu().numpy()
    return [
        sample.mean().item(),
        sample.std().item(),
        float(scipy.stats.skew(flat)),
        float(scipy.stats.kurtosis(flat)),
        torch.linalg.norm(sample).item(),
    ]


def _fft_stats(sample: torch.Tensor) -> list[float]:
    """Return [mean magnitude, mean phase] of the orthonormal 2-D FFT of ``sample``."""
    # fft2 transforms the last two dimensions of the input.
    spectrum = torch.fft.fft2(sample, norm="ortho")
    return [torch.abs(spectrum).mean().item(), torch.angle(spectrum).mean().item()]


def extract_noise_features(pred_noise: torch.Tensor, noise: torch.Tensor) -> list[torch.Tensor]:
    """Summarise predicted noise and its residual against the reference noise.

    Args:
        pred_noise: predicted noise, shape (batch_size, channels, height, width).
        noise: reference noise with the same shape as ``pred_noise``.

    Returns:
        One 1-D float32 CPU tensor of 13 values per batch element, in this order:
        pred mean, std, skew, kurtosis, L2 norm, FFT mean magnitude, FFT mean phase,
        residual mean, std, skew, kurtosis, L2 norm, and the cosine similarity
        between the flattened prediction and the flattened reference noise.
    """
    residual = pred_noise - noise

    batch = []
    for i in range(pred_noise.shape[0]):
        cosine_sim = torch.nn.functional.cosine_similarity(
            pred_noise[i].flatten(), noise[i].flatten(), dim=0
        ).item()

        # Every entry is a plain Python float, so the feature dtype never
        # depends on what scalar type SciPy happens to return.
        values = (
            _basic_stats(pred_noise[i])
            + _fft_stats(pred_noise[i])
            + _basic_stats(residual[i])
            + [cosine_sim]
        )
        batch.append(torch.tensor(values, dtype=torch.float32))

    return batch


def extract_noise_features_no_noise(pred_noise: torch.Tensor) -> list[torch.Tensor]:
    """Summarise predicted noise when no reference noise is available.

    Args:
        pred_noise: predicted noise, shape (batch_size, channels, height, width).

    Returns:
        One 1-D float32 CPU tensor of 7 values per batch element: mean, std,
        skew, kurtosis, L2 norm, FFT mean magnitude, FFT mean phase. These are
        the first seven entries produced by ``extract_noise_features``.
    """
    return [
        torch.tensor(_basic_stats(sample) + _fft_stats(sample), dtype=torch.float32)
        for sample in pred_noise
    ]

In [None]:
# Compute the feature vectors for every (predicted noise, true noise) pair.
extracted_features = []

# zip() has no length, so tqdm needs an explicit total to show progress/ETA.
n_pairs = min(len(pred_noise_list), len(noise_list))
for pred_noise, noise in tqdm(zip(pred_noise_list, noise_list), total=n_pairs):
    features = extract_noise_features(pred_noise, noise)
    extracted_features.append(features)

In [None]:
# Decode every denoised latent back to image space, undoing the VAE scaling
# factor before decoding.
decode_list = []

with torch.no_grad():
    for lat in tqdm(denoise_list):
        decode_output = vae.decode(lat / vae.config.scaling_factor).sample
        decode_list.append(decode_output)

In [None]:
# Concatenate the decoded images along the batch dimension. A separate name is
# used so `decode_list` stays a list and this cell can be re-run safely.
decode_batch = torch.cat(decode_list)
# Clamp to [-1, 1] and rescale to [0, 1] for display.
decode_tensor = (decode_batch.clamp(-1, 1) + 1) / 2

In [None]:
def normalize_noise(tensor):
    """Min-max rescale ``tensor`` to the range [0, 1].

    Args:
        tensor: a non-empty tensor of any shape.

    Returns:
        A tensor of the same shape, with the minimum mapped to 0 and the
        maximum mapped to 1. A constant input (max == min) maps to all zeros
        instead of the NaNs that a 0/0 division would produce.
    """
    shifted = tensor - tensor.min()
    span = shifted.max()
    # Divide by 1 when the range is zero; `shifted` is all zeros in that case.
    safe_span = torch.where(span == 0, torch.ones_like(span), span)
    return shifted / safe_span

In [None]:
# Tile the decoded images five per row and show the grid without axes.
grid = make_grid(decode_tensor, nrow=5)
fig, ax = plt.subplots(figsize=(20, 20))
ax.axis("off")
# make_grid returns (C, H, W); imshow expects (H, W, C).
ax.imshow(grid.permute(1, 2, 0).cpu().numpy())

In [None]:
# Min-max normalise the true and predicted noise for visualisation only.
# New names are used so `noise_list` / `pred_noise_list` keep their raw values:
# overwriting them would make a later re-run of the feature-extraction cell
# silently operate on normalised data.
noise_norm_list = [normalize_noise(noise) for noise in noise_list]
pred_noise_norm_list = [normalize_noise(noise) for noise in pred_noise_list]

# Normalised difference between the normalised true and predicted noise.
noise_diff_list = [
    normalize_noise(noise - pred_noise)
    for noise, pred_noise in zip(noise_norm_list, pred_noise_norm_list)
]

# Normalised difference between the clean latents and each noisy latent.
latent_diff_list = [normalize_noise(latents - lat) for lat in latent_list]

In [None]:
# Grid of the normalised (true - predicted) noise differences, five per row.
grid = make_grid(torch.cat(noise_diff_list), nrow=5)
fig, ax = plt.subplots(figsize=(20, 20))
ax.axis("off")
# make_grid returns (C, H, W); imshow expects (H, W, C).
ax.imshow(grid.permute(1, 2, 0).cpu().numpy())

In [None]:
# Grid of the normalised (clean - noisy) latent differences, five per row.
grid = make_grid(torch.cat(latent_diff_list), nrow=5)
fig, ax = plt.subplots(figsize=(20, 20))
ax.axis("off")
# make_grid returns (C, H, W); imshow expects (H, W, C).
ax.imshow(grid.permute(1, 2, 0).cpu().numpy())

In [None]:
class MyPipeline:
    """Turn a batch of images into per-step statistics of the UNet's predicted noise.

    Each call encodes the images with the VAE, noises the latents to the
    timestep at index ``t_start`` of the scheduler's schedule, runs the
    remaining denoising steps, and summarises the noise predicted at every
    step with ``extract_noise_features_no_noise``.
    """

    # Number of inference steps the scheduler is configured with.
    timesteps = 30
    # Index into ``scheduler.timesteps`` at which noising/denoising starts.
    t_start = 20

    def __init__(self, vae, unet, tokenizer, text_encoder, scheduler, device='cuda'):
        """Move the models to ``device`` and pre-compute the empty-prompt embedding.

        Args:
            vae: VAE used to encode images into latents.
            unet: noise-prediction network.
            tokenizer: tokenizer matching ``text_encoder``.
            text_encoder: text encoder; only used here to embed the empty prompt.
            scheduler: diffusion scheduler; ``set_timesteps`` is called on it,
                so the object passed in is reconfigured.
            device: device the models and inputs are moved to.
        """
        self.vae = vae.to(device)
        self.unet = unet.to(device)
        self.scheduler = scheduler
        self.device = device

        self.scheduler.set_timesteps(self.timesteps)

        text_encoder = text_encoder.to(device)
        with torch.no_grad():
            self.text_embeddings = text_encoder(tokenizer("", return_tensors="pt").input_ids.to(device))[0]

    def __call__(self, batch: torch.Tensor):
        """Extract predicted-noise features for a batch of images.

        Args:
            batch: image tensor whose first dimension is the batch size.

        Returns:
            Tensor of shape (batch_size, num_steps, num_features), where
            ``num_steps`` is ``len(scheduler.timesteps[t_start:])`` and
            ``num_features`` is the length of the vectors returned by
            ``extract_noise_features_no_noise``.
        """
        batch = batch.to(self.device)
        batch_text_embeddings = self.text_embeddings.repeat(batch.shape[0], 1, 1)
        denoise_steps = self.scheduler.timesteps[self.t_start:]

        pred_noises_list = []
        with torch.no_grad():
            # Encode the images and sample scaled latents.
            vae_output = self.vae.encode(batch)
            latents = vae_output.latent_dist.sample() * self.vae.config.scaling_factor

            # Noise the latents to the first timestep that will be denoised.
            # Use the instance's own scheduler, not a notebook-level global.
            noise = torch.randn_like(latents)
            noisy_latents = self.scheduler.add_noise(latents, noise, denoise_steps[0])

            # Run the remaining steps, keeping the noise predicted at each one.
            for t in tqdm(denoise_steps, desc="Denoising", leave=False):
                noises_pred = self.unet(noisy_latents, t, batch_text_embeddings).sample
                pred_noises_list.append(noises_pred)
                noisy_latents = self.scheduler.step(noises_pred, t.item(), noisy_latents).prev_sample

        # One (batch_size, num_features) tensor per denoising step ...
        per_step_features = [
            torch.stack(extract_noise_features_no_noise(pred_noises))
            for pred_noises in tqdm(pred_noises_list, desc="Extracting features", leave=False)
        ]
        # ... stacked along a new step axis: (batch_size, num_steps, num_features).
        return torch.stack(per_step_features, dim=1)

In [None]:
import kagglehub
import os
import shutil
import random

datapath = kagglehub.dataset_download("yangsangtai/tiny-genimage")

# Flatten every generator's validation split into data/0_real and data/1_fake,
# prefixing each file name with the generator folder it came from.
os.makedirs('data/0_real', exist_ok=True)
os.makedirs('data/1_fake', exist_ok=True)

for generator in sorted(os.listdir(datapath)):
    for source_name, target_dir in (('nature', 'data/0_real'), ('ai', 'data/1_fake')):
        source_dir = os.path.join(datapath, generator, 'val', source_name)
        # Skip stray entries in the download folder that lack this split.
        if not os.path.isdir(source_dir):
            continue
        for file in os.listdir(source_dir):
            shutil.copy(os.path.join(source_dir, file), f'{target_dir}/{generator}_{file}')

os.makedirs('data_1000/0_real', exist_ok=True)
os.makedirs('data_1000/1_fake', exist_ok=True)

random.seed(42)  # For reproducibility
# os.listdir returns entries in arbitrary order, so sort before sampling;
# otherwise the seed alone does not pin down which files are selected.
random_1000_real = random.sample(sorted(os.listdir('data/0_real')), 1000)
random_1000_fake = random.sample(sorted(os.listdir('data/1_fake')), 1000)
for file in random_1000_real:
    shutil.copy(f'data/0_real/{file}', f'data_1000/0_real/{file}')
for file in random_1000_fake:
    shutil.copy(f'data/1_fake/{file}', f'data_1000/1_fake/{file}')

In [None]:
import torchvision

# Same preprocessing as for the single sample image, with the resize done here.
transform = transforms.Compose(
    [
        transforms.Resize(size=(512, 512)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ]
)

# ImageFolder derives the class labels from the sub-folder names of data_1000.
dataset = torchvision.datasets.ImageFolder('data_1000', transform=transform)

dataloader = torch.utils.data.DataLoader(dataset, batch_size=8)

In [None]:
# Feature extractor built from the already loaded Stable Diffusion components.
mypipeline = MyPipeline(
    vae=pipeline.vae,
    unet=pipeline.unet,
    tokenizer=pipeline.tokenizer,
    text_encoder=pipeline.text_encoder,
    scheduler=scheduler,
)

# Collect per-batch features and labels.
results, results_labels = [], []
for batch, labels in tqdm(dataloader):
    features = mypipeline(batch)
    results.append(features)
    results_labels.append(labels)

# Keep the tensor versions (*_ts), and also flattened NumPy copies with one
# row per sample.
results_ts = torch.cat(results, dim=0)
results = results_ts.cpu().numpy().reshape(results_ts.shape[0], -1)

results_labels_ts = torch.cat(results_labels, dim=0)
results_labels = results_labels_ts.cpu().numpy()

In [None]:
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

In [None]:
# Two-dimensional projections for visualisation; t-SNE is seeded.
pca, tsne = PCA(n_components=2), TSNE(n_components=2, random_state=42)

In [None]:
# Fit each reducer on the flattened features and keep the 2-D embeddings.
x_pca = pca.fit_transform(X=results)
x_tsne = tsne.fit_transform(X=results)

In [None]:
import matplotlib.pyplot as plt

def plot_embedding(X_embedded, labels, title="Embedding"):
    """Scatter-plot a 2-D embedding, coloured by class label.

    Args:
        X_embedded: array whose first two columns are the plot coordinates.
        labels: one label per row of ``X_embedded``; used for the colours.
        title: figure title.
    """
    fig, ax = plt.subplots(figsize=(6, 6))
    points = ax.scatter(
        X_embedded[:, 0],
        X_embedded[:, 1],
        c=labels,
        cmap='coolwarm',
        alpha=0.6,
    )
    # One legend entry per distinct label value.
    handles, names = points.legend_elements()
    ax.legend(handles, names, title="Class")
    ax.set_title(title)
    ax.set_xlabel("Dim 1")
    ax.set_ylabel("Dim 2")
    ax.grid(True)
    plt.show()

# Show both projections, coloured by the ImageFolder class labels.
for embedding, embedding_title in ((x_pca, "PCA"), (x_tsne, "t-SNE")):
    plot_embedding(embedding, results_labels, title=embedding_title)

In [None]:
class LSTMClassifier(torch.nn.Module):
    """Sequence classifier: an LSTM followed by a linear layer on the last time step."""

    def __init__(self, input_size, hidden_size=64, num_layers=1, num_classes=2):
        """Build the network.

        Args:
            input_size: number of features per time step.
            hidden_size: LSTM hidden-state width.
            num_layers: number of stacked LSTM layers.
            num_classes: number of output logits.
        """
        super().__init__()
        lstm_config = dict(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.lstm = torch.nn.LSTM(**lstm_config)
        self.fc = torch.nn.Linear(in_features=hidden_size, out_features=num_classes)

    def forward(self, x):
        """Map a batch-first input of shape (batch, steps, features) to class logits."""
        sequence_out, _state = self.lstm(x)
        # Classify from the LSTM output at the final time step only.
        last_step = sequence_out[:, -1, :]
        return self.fc(last_step)

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
batch_size = 8
num_epochs = 50

# Seed the split and the model initialisation so a re-run gives the same
# partition and starting weights.
torch.manual_seed(42)

# 80/20 random split into training and held-out samples.
train_indices = torch.randperm(results_ts.shape[0])
train_size = int(0.8 * results_ts.shape[0])
train_indices, val_indices = train_indices[:train_size], train_indices[train_size:]

# Per-feature normalisation statistics come from the training split only, so
# no information from the held-out samples leaks into training. The result is
# stored under a new name: `results_ts` is left untouched and the cell can be
# re-run without normalising twice.
results_mean = results_ts[train_indices].mean(dim=(0, 1))
results_std = results_ts[train_indices].std(dim=(0, 1))
results_norm = (results_ts - results_mean[None, None, :]) / (results_std[None, None, :] + 1e-8)

X_train = results_norm[train_indices]
X_test = results_norm[val_indices]
y_train = results_labels_ts[train_indices]
y_test = results_labels_ts[val_indices]

model = LSTMClassifier(input_size=results_ts.shape[-1]).to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in tqdm(range(num_epochs)):
    model.train()
    running_loss = 0
    correct = 0
    total = 0

    for i in tqdm(range(0, X_train.shape[0], batch_size), leave=False):
        batch = X_train[i:i+batch_size].to(device)
        labels = y_train[i:i+batch_size].to(device)

        logits = model(batch)
        loss = criterion(logits, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Weight the batch loss by its size so the epoch average is per-sample.
        running_loss += loss.item() * batch.size(0)

        preds = logits.argmax(dim=1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    # Evaluate on the held-out split, which was previously created but never used.
    val_acc = float("nan")
    if len(y_test) > 0:
        model.eval()
        with torch.no_grad():
            val_preds = model(X_test.to(device)).argmax(dim=1).cpu()
        val_acc = (val_preds == y_test).float().mean().item()

    print(
        f"Epoch {epoch+1}: Loss = {running_loss / total:.4f}, "
        f"Accuracy = {correct / total:.4f}, Val Accuracy = {val_acc:.4f}"
    )