In [1]:
!pip install -q mediapipe==0.10.32
!pip install -q torch torch-geometric librosa opencv-python scikit-learn matplotlib seaborn tqdm

import os
import cv2
import numpy as np
import librosa
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torch_geometric.nn import GCNConv
from sklearn.metrics import accuracy_score
from google.colab import drive

import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("✅ Device:", device)

if not os.path.exists("/content/drive"):
    drive.mount("/content/drive")


✅ Device: cuda


In [2]:
!wget -q https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task


In [3]:
MP_TO_DLIB_68 = [
    162,234,93,58,172,136,149,148,152,377,378,365,397,288,323,454,389,
    71,63,105,66,107,336,296,334,293,300,
    168,6,195,4,64,60,94,285,292,419,197,19,1,2,98,327,276,283,282,295,294,
    33,246,161,160,159,158,157,173,133,155,154,153,144,145,153,154,155,133,
    78,95,88,178,87,14,317,402,318,324,308,415,310,311,312,13,82,81,80,191,78
]

SELECTED_LANDMARKS = MP_TO_DLIB_68[:68]

def build_edge_index():
    edges = []
    groups = [
        range(0,17), range(17,22), range(22,27),
        range(27,31), range(31,36),
        range(36,42), range(42,48),
        range(48,60), range(60,68)
    ]
    for g in groups:
        g = list(g)
        for i in range(len(g)-1):
            edges.append([g[i], g[i+1]])
            edges.append([g[i+1], g[i]])
    return torch.tensor(edges, dtype=torch.long).t().contiguous().to(device)

STATIC_EDGE_INDEX = build_edge_index()
print("✅ Graph built")


✅ Graph built


In [4]:
base_options = python.BaseOptions(model_asset_path="face_landmarker.task")
options = vision.FaceLandmarkerOptions(
    base_options=base_options,
    num_faces=1
)
face_landmarker = vision.FaceLandmarker.create_from_options(options)

class DeceptionDataset(Dataset):
    def __init__(self, root_dir, frame_limit=60):
        self.samples = []
        self.labels = []
        self.frame_limit = frame_limit
        self.cache = {}

        classes = {"truth": 0, "lie": 1}

        for cls, label in classes.items():
            path = os.path.join(root_dir, cls)
            if not os.path.exists(path):
                continue
            for file in os.listdir(path):
                if file.endswith((".mp4",".avi",".mov")):
                    self.samples.append(os.path.join(path,file))
                    self.labels.append(label)

        print(f"Loaded {len(self.samples)} samples from {root_dir}")

    def extract_audio(self, path):
        try:
            y, sr = librosa.load(path, sr=16000, duration=3.0, mono=True)
            mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13).T
            if np.std(mfcc) > 1e-6:
                mfcc = (mfcc - np.mean(mfcc)) / np.std(mfcc)
        except:
            mfcc = np.zeros((100,13))
        return mfcc

    def process_video(self, path):
        if path in self.cache:
            return self.cache[path]

        mfcc = self.extract_audio(path)
        cap = cv2.VideoCapture(path)
        node_feats = []

        while cap.isOpened() and len(node_feats) < self.frame_limit:
            ret, frame = cap.read()
            if not ret:
                break

            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_img = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)
            result = face_landmarker.detect(mp_img)

            if result.face_landmarks:
                lm = result.face_landmarks[0]
                face = np.array([[lm[i].x,lm[i].y,lm[i].z] for i in SELECTED_LANDMARKS])
                idx = min(int(len(node_feats)*(len(mfcc)/self.frame_limit)), len(mfcc)-1)
                audio = np.tile(mfcc[idx], (68,1))
                fused = np.concatenate((face,audio),axis=1)
            else:
                fused = np.zeros((68,16))

            node_feats.append(fused)

        cap.release()

        if len(node_feats)==0:
            tensor = np.zeros((self.frame_limit,68,16))
        else:
            tensor = np.stack(node_feats)
            if tensor.shape[0] < self.frame_limit:
                pad = np.zeros((self.frame_limit-tensor.shape[0],68,16))
                tensor = np.concatenate((tensor,pad),axis=0)

        tensor = torch.tensor(tensor,dtype=torch.float32)
        self.cache[path]=tensor
        return tensor

    def __len__(self):
        return len(self.samples)

    def __getitem__(self,idx):
        return self.process_video(self.samples[idx]), self.labels[idx]


In [5]:
TRAIN_DIR = "/content/drive/My Drive/Academics/DOLOS_Train"
TEST_DIR = "/content/drive/My Drive/Academics/RLT_Test"

train_dataset = DeceptionDataset(TRAIN_DIR)
test_dataset = DeceptionDataset(TEST_DIR)

BATCH_SIZE = 16

count0 = train_dataset.labels.count(0)
count1 = train_dataset.labels.count(1)
w0 = (count0+count1)/(2*count0) if count0>0 else 1
w1 = (count0+count1)/(2*count1) if count1>0 else 1

class_weights = torch.tensor([w0,w1]).float().to(device)

sample_weights = [w0 if y==0 else w1 for y in train_dataset.labels]
sampler = WeightedRandomSampler(sample_weights,len(sample_weights),replacement=True)

train_loader = DataLoader(train_dataset,batch_size=BATCH_SIZE,sampler=sampler)
test_loader = DataLoader(test_dataset,batch_size=BATCH_SIZE,shuffle=False)

print("⚖️ Truth:",count0,"Lie:",count1)


Loaded 1404 samples from /content/drive/My Drive/Academics/DOLOS_Train
Loaded 121 samples from /content/drive/My Drive/Academics/RLT_Test
⚖️ Truth: 633 Lie: 771


In [6]:
class CGNND(nn.Module):
    def __init__(self,node_feats=16,hidden=64):
        super().__init__()
        self.bn_input = nn.BatchNorm1d(node_feats)
        self.gcn = GCNConv(node_feats,hidden)
        self.bn_gcn = nn.BatchNorm1d(hidden)
        self.lstm = nn.LSTM(hidden,hidden,num_layers=2,batch_first=True,dropout=0.2)
        self.fc = nn.Sequential(
            nn.Linear(hidden,32),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(32,2)
        )

    def forward(self,x,edge_index):
        B,T,N,F_dim = x.shape
        x = self.bn_input(x.view(-1,F_dim)).view(B,T,N,F_dim)

        spatial=[]
        for t in range(T):
            frame = x[:,t,:,:]
            edges=[]
            for b in range(B):
                edges.append(edge_index + b*N)
            edges = torch.cat(edges,dim=1)
            out = self.gcn(frame.reshape(B*N,F_dim),edges)
            out = out.view(B,N,-1).mean(dim=1)
            out = self.bn_gcn(out)
            spatial.append(F.relu(out))

        spatial = torch.stack(spatial,dim=1)
        _,(hn,_) = self.lstm(spatial)
        emb = hn[-1]
        logits = self.fc(emb)
        return logits

model = CGNND().to(device)
optimizer = torch.optim.AdamW(model.parameters(),lr=0.001,weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer,'min',patience=3)
criterion = nn.CrossEntropyLoss(weight=class_weights)


In [None]:
EPOCHS=20
PATIENCE=5
best_val=float("inf")
stop_counter=0

for epoch in range(EPOCHS):
    model.train()
    train_loss=0
    correct=0
    total=0

    loop=tqdm(train_loader)
    for x,y in loop:
        x,y=x.to(device),y.to(device)
        optimizer.zero_grad()
        logits=model(x,STATIC_EDGE_INDEX)
        loss=criterion(logits,y)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(),1.0)
        optimizer.step()

        train_loss+=loss.item()
        preds=logits.argmax(1)
        correct+=(preds==y).sum().item()
        total+=y.size(0)

    train_acc=correct/total
    train_loss/=len(train_loader)

    model.eval()
    val_loss=0
    val_correct=0
    val_total=0
    with torch.no_grad():
        for x,y in test_loader:
            x,y=x.to(device),y.to(device)
            logits=model(x,STATIC_EDGE_INDEX)
            loss=criterion(logits,y)
            val_loss+=loss.item()
            val_correct+=(logits.argmax(1)==y).sum().item()
            val_total+=y.size(0)

    val_loss/=len(test_loader)
    val_acc=val_correct/val_total

    scheduler.step(val_loss)

    print(f"Epoch {epoch+1} | Train Acc {train_acc:.3f} | Val Acc {val_acc:.3f}")

    if val_loss<best_val:
        best_val=val_loss
        stop_counter=0
        torch.save(model.state_dict(),"best_model.pt")
    else:
        stop_counter+=1
        if stop_counter>=PATIENCE:
            print("Early stopping")
            break

model.load_state_dict(torch.load("best_model.pt"))
print("✅ Training Complete")


  0%|          | 0/88 [00:00<?, ?it/s]

  y, sr = librosa.load(path, sr=16000, duration=3.0, mono=True)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)
  y, sr = librosa.load(path, sr=16000, duration=3.0, mono=True)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)
  y, sr = librosa.load(path, sr=16000, duration=3.0, mono=True)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)
  y, sr = librosa.load(path, sr=16000, duration=3.0, mono=True)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)
  y, sr = librosa.load(path, sr=16000, duration=3.0, mono=True)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa ver

In [None]:
# ============================================
# 7️⃣ FINAL EVALUATION
# ============================================

from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns

model.eval()
all_preds = []
all_labels = []
all_probs = []

print("🧪 Evaluating on Real-Life Trial Test Set...")

with torch.no_grad():
    for batch_x, batch_y in tqdm(test_loader):
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        logits = model(batch_x, STATIC_EDGE_INDEX)
        probs = torch.softmax(logits, dim=1)
        preds = torch.argmax(probs, dim=1)

        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(batch_y.cpu().numpy())
        all_probs.extend(probs[:, 1].cpu().numpy())  # Probability of "Lie"

# ==========================
# Confusion Matrix
# ==========================
cm = confusion_matrix(all_labels, all_preds)

plt.figure(figsize=(5, 4))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=['Truth', 'Lie'],
    yticklabels=['Truth', 'Lie']
)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix (Cross-Corpus)')
plt.show()

# ==========================
# Classification Report
# ==========================
print("\n📊 Classification Report:\n")
print(classification_report(all_labels, all_preds, target_names=['Truth', 'Lie']))
