In [2]:
import os
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

In [3]:
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from timm import create_model

from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import seaborn as sns


  from .autonotebook import tqdm as notebook_tqdm


In [4]:
RAW_REAL = r"C:\Users\EliteLaptop\Desktop\kawtar\GAN_inversion\raw\real"
RAW_FAKE = r"C:\Users\EliteLaptop\Desktop\kawtar\GAN_inversion\raw\fake"

FRAME_DIR = "frames_Adapter"
RESULT_DIR = "results_Adapter"

os.makedirs(f"{FRAME_DIR}/real", exist_ok=True)
os.makedirs(f"{FRAME_DIR}/fake", exist_ok=True)
os.makedirs(RESULT_DIR, exist_ok=True)

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
IMG_SIZE = 224
BATCH_SIZE = 32
EPOCHS = 5


In [4]:
def extract_frames(video_dir, out_dir, label, step=10):
    for vid in os.listdir(video_dir):
        cap = cv2.VideoCapture(os.path.join(video_dir, vid))
        idx = 0
        saved = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if idx % step == 0:
                frame = cv2.resize(frame, (IMG_SIZE, IMG_SIZE))
                name = f"{vid}_{saved}.jpg"
                cv2.imwrite(os.path.join(out_dir, label, name), frame)
                saved += 1
            idx += 1
        cap.release()

extract_frames(RAW_REAL, FRAME_DIR, "real")
extract_frames(RAW_FAKE, FRAME_DIR, "fake")


In [5]:
class DeepFakeDataset(Dataset):
    def __init__(self, root):
        self.samples = []
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize([0.5]*3, [0.5]*3)
        ])

        for label, cls in enumerate(["real", "fake"]):
            path = os.path.join(root, cls)
            for img in os.listdir(path):
                self.samples.append((os.path.join(path, img), label))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        path, label = self.samples[idx]
        img = cv2.imread(path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = self.transform(img)
        return img, label


In [6]:
class GBA(nn.Module):
    def __init__(self, dim=768, bottleneck=64):
        super().__init__()
        self.down = nn.Linear(dim, bottleneck)
        self.up = nn.Linear(bottleneck, dim)
        self.scale = nn.Parameter(torch.ones(1))

    def forward(self, x):
        return self.scale * self.up(F.relu(self.down(x)))


In [7]:
class LSA_Head(nn.Module):
    def __init__(self, dim=768):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(3, 64, 3, 2, 1),
            nn.ReLU(),
            nn.Conv2d(64, 128, 3, 2, 1),
            nn.ReLU(),
            nn.Conv2d(128, dim, 3, 2, 1),
        )

    def forward(self, x):
        x = self.conv(x)
        return x.flatten(2).transpose(1, 2)


class CrossAttention(nn.Module):
    def __init__(self, dim=768):
        super().__init__()
        self.attn = nn.MultiheadAttention(dim, 6, batch_first=True)

    def forward(self, q, kv):
        out, _ = self.attn(q, kv, kv)
        return q + out


In [8]:
class DeepFakeAdapter(nn.Module):
    def __init__(self):
        super().__init__()
        self.vit = create_model("vit_base_patch16_224", pretrained=True, num_classes=0)

        for p in self.vit.parameters():
            p.requires_grad = False

        self.gbas = nn.ModuleList([GBA() for _ in range(12)])
        self.lsa_head = LSA_Head()
        self.cross1 = CrossAttention()
        self.cross2 = CrossAttention()
        self.cls = nn.Linear(768, 2)

    def forward(self, x):
        f_spa = self.lsa_head(x)

        tokens = self.vit.patch_embed(x)
        tokens = self.vit.pos_drop(tokens)

        for i, blk in enumerate(self.vit.blocks):
            tokens = blk.attn(blk.norm1(tokens))
            tokens = blk.mlp(tokens) + self.gbas[i](tokens)

            if i in [0, 4, 8]:
                tokens = self.cross1(tokens, f_spa)
                f_spa = self.cross2(f_spa, tokens)

        feat = f_spa.mean(dim=1)
        return self.cls(feat), feat


In [9]:
from tqdm import tqdm  # Pour la barre de progression

dataset = DeepFakeDataset(FRAME_DIR)
loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

model = DeepFakeAdapter().to(DEVICE)
opt = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

for epoch in range(EPOCHS):
    model.train()
    loop = tqdm(loader, desc=f"Epoch {epoch+1}/{EPOCHS}")  # barre de progression
    for x, y in loop:
        x, y = x.to(DEVICE), y.to(DEVICE)
        out, _ = model(x)
        loss = criterion(out, y)
        opt.zero_grad()
        loss.backward()
        opt.step()

        # Mettre à jour la barre avec la loss actuelle
        loop.set_postfix(loss=loss.item())

Epoch 1/5:   3%|▎         | 34/1183 [25:39<14:27:02, 45.28s/it, loss=0.538]


KeyboardInterrupt: 

In [2]:
!pip install timm




In [3]:
import os
import cv2
import random
import torch
import timm
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T
import matplotlib.pyplot as plt

from tqdm import tqdm
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder

from sklearn.metrics import (
    confusion_matrix, ConfusionMatrixDisplay,
    accuracy_score, precision_score, recall_score, f1_score
)
from sklearn.manifold import TSNE


  from .autonotebook import tqdm as notebook_tqdm


In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device


device(type='cuda')

In [5]:
RAW_REAL_DIR = r"C:\Users\EliteLaptop\Desktop\kawtar\GAN_inversion\raw\real"
RAW_FAKE_DIR = r"C:\Users\EliteLaptop\Desktop\kawtar\GAN_inversion\raw\fake"

FRAMES_DIR  = "frames_Adapter"     # frames extraites
RESULTS_DIR = "results_Adapter"    # résultats sauvegardés

os.makedirs(FRAMES_DIR, exist_ok=True)
os.makedirs(RESULTS_DIR, exist_ok=True)


In [6]:
def make_dirs():
    for split in ["train", "val", "test"]:
        for cls in ["real", "fake"]:
            os.makedirs(os.path.join(FRAMES_DIR, split, cls), exist_ok=True)

make_dirs()


In [7]:
def split_videos(video_dir, seed=42):
    random.seed(seed)
    videos = [v for v in os.listdir(video_dir) if v.endswith(".mp4")]
    random.shuffle(videos)

    n = len(videos)
    return (
        videos[:int(0.8*n)],
        videos[int(0.8*n):int(0.9*n)],
        videos[int(0.9*n):]
    )


In [8]:
def extract_frames(video_path, out_dir, target_fps=1):
    os.makedirs(out_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    step = max(1, int(fps / target_fps))

    count, saved = 0, 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        if count % step == 0:
            frame = cv2.resize(frame, (224,224))
            cv2.imwrite(
                os.path.join(out_dir, f"frame_{saved:04d}.jpg"),
                frame
            )
            saved += 1

        count += 1

    cap.release()


In [9]:
def process_class(video_dir, label):
    train, val, test = split_videos(video_dir)

    splits = {
        "train": train,
        "val": val,
        "test": test
    }

    for split, vids in splits.items():
        for vid in tqdm(vids, desc=f"{label}-{split}"):
            vid_path = os.path.join(video_dir, vid)
            out_dir = os.path.join(
                FRAMES_DIR, split, label,
                vid.replace(".mp4","")
            )
            extract_frames(vid_path, out_dir)


In [13]:
# ⚠️ À lancer UNE SEULE FOIS
process_class(RAW_REAL_DIR, "real")
process_class(RAW_FAKE_DIR, "fake")


real-train: 100%|██████████| 126/126 [01:03<00:00,  1.98it/s]
real-val: 100%|██████████| 16/16 [00:09<00:00,  1.76it/s]
real-test: 100%|██████████| 16/16 [00:09<00:00,  1.68it/s]
fake-train: 100%|██████████| 636/636 [04:41<00:00,  2.26it/s]
fake-val: 100%|██████████| 79/79 [00:38<00:00,  2.06it/s]
fake-test: 100%|██████████| 80/80 [00:41<00:00,  1.92it/s]


In [10]:
transform = T.Compose([
    T.Resize((224,224)),
    T.ToTensor(),
    T.Normalize(
        [0.485,0.456,0.406],
        [0.229,0.224,0.225]
    )
])


In [11]:
train_set = ImageFolder("frames_Adapter/train", transform=transform)
test_set  = ImageFolder("frames_Adapter/test", transform=transform)

train_loader = DataLoader(train_set, batch_size=8, shuffle=True)
test_loader  = DataLoader(test_set, batch_size=8, shuffle=False)


In [12]:
class GBA(nn.Module):
    def __init__(self, dim=768, bottleneck=64):
        super().__init__()
        self.down = nn.Linear(dim, bottleneck)
        self.up = nn.Linear(bottleneck, dim)
        self.scale = nn.Parameter(torch.ones(1))

    def forward(self, x):
        return self.scale * self.up(F.relu(self.down(x)))


In [13]:
class LSA_Head(nn.Module):
    def __init__(self, dim=768):
        super().__init__()
        self.base = nn.Sequential(
            nn.Conv2d(3,64,3,2,1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )

        self.c1 = nn.Conv2d(64,128,3,2,1)
        self.c2 = nn.Conv2d(128,256,3,2,1)
        self.c3 = nn.Conv2d(256,256,3,2,1)

        self.p1 = nn.Conv2d(128,dim,1)
        self.p2 = nn.Conv2d(256,dim,1)
        self.p3 = nn.Conv2d(256,dim,1)

    def forward(self, x):
        x = self.base(x)
        f1 = self.c1(x)
        f2 = self.c2(f1)
        f3 = self.c3(f2)

        f1 = self.p1(f1).flatten(2).transpose(1,2)
        f2 = self.p2(f2).flatten(2).transpose(1,2)
        f3 = self.p3(f3).flatten(2).transpose(1,2)

        return torch.cat([f1,f2,f3],1)


In [14]:
class CrossAttention(nn.Module):
    def __init__(self, dim=768, heads=6):
        super().__init__()
        self.attn = nn.MultiheadAttention(dim, heads, batch_first=True)

    def forward(self, q, kv):
        out,_ = self.attn(q,kv,kv)
        return out


In [15]:
class DeepFakeAdapter(nn.Module):
    def __init__(self):
        super().__init__()
        self.vit = timm.create_model(
            "vit_base_patch16_224",
            pretrained=True,
            num_classes=0
        )

        for p in self.vit.parameters():
            p.requires_grad = False

        self.lsa = LSA_Head()
        self.ca_vit = CrossAttention()
        self.ca_spa = CrossAttention()
        self.gbas = nn.ModuleList([GBA() for _ in range(12)])
        self.fc = nn.Linear(768,2)

    def forward(self, x, return_feat=False):
        spa = self.lsa(x)

        tok = self.vit.patch_embed(x)
        tok = tok + self.vit.pos_embed[:,1:]
        cls = self.vit.cls_token.expand(x.size(0),-1,-1)
        tok = torch.cat([cls,tok],1)

        for i,blk in enumerate(self.vit.blocks):
            tok[:,1:] += self.ca_vit(tok[:,1:], spa)
            tok = blk(tok)
            tok[:,1:] += self.gbas[i](tok[:,1:])
            spa += self.ca_spa(spa, tok[:,1:])

        feat = tok[:,0]
        out = self.fc(feat)

        if return_feat:
            return out, feat
        return out


In [16]:
model = DeepFakeAdapter().to(device)

optimizer = torch.optim.Adam(
    filter(lambda p: p.requires_grad, model.parameters()),
    lr=1e-4
)
criterion = nn.CrossEntropyLoss()

epochs = 5

for e in range(epochs):
    model.train()
    loss_sum = 0

    for x,y in tqdm(train_loader):
        x,y = x.to(device), y.to(device)

        optimizer.zero_grad()
        out = model(x)
        loss = criterion(out,y)
        loss.backward()
        optimizer.step()

        loss_sum += loss.item()

    print(f"Epoch {e+1} | Loss = {loss_sum/len(train_loader):.4f}")


100%|██████████| 1292/1292 [2:34:33<00:00,  7.18s/it] 


Epoch 1 | Loss = 0.4721


100%|██████████| 1292/1292 [2:31:41<00:00,  7.04s/it] 


Epoch 2 | Loss = nan


  8%|▊         | 102/1292 [12:05<2:20:59,  7.11s/it]


KeyboardInterrupt: 