In [2]:
import time
import torch
import sys
sys.path.append('LLaVA')
sys.path.append('C:\transf\XAI_DEEPFAKE\LLaVA')

import pandas as pd
from PIL import Image
import numpy as np
import cv2
from torch.nn.functional import cosine_similarity
from pathlib import Path
import json
from llava.model.builder import load_pretrained_model
from llava.mm_utils import get_model_name_from_path
from llava.eval.run_llava import eval_model_frame
from tqdm import tqdm
import clip

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# if this happens, restart the notebook
def safe_load_model(model_path, max_retries=3, delay=5):
    attempt = 0
    while attempt < max_retries:
        try:
            tokenizer, llava_model, image_processor, context_len = load_pretrained_model(
                model_path=model_path,
                model_base=None,
                model_name=get_model_name_from_path(model_path)
            )
            return tokenizer, llava_model, image_processor, context_len
        except ZeroDivisionError:
            print(f"ZeroDivisionError during model loading. Retrying... (Attempt {attempt + 1})")
            time.sleep(delay)
            attempt += 1

    raise RuntimeError("Failed to load the model after multiple attempts")

model_path = "liuhaotian/llava-v1.5-7b"
tokenizer, llava_model, image_processor, context_len = safe_load_model(model_path)

llava_model = llava_model.to(device)

csv_path = r'C:\transf\XAI_DEEPFAKE\dataset.csv'
df = pd.read_csv(csv_path)

unique_videos = df['movie_name'].nunique()
print(f"Total unique videos: {unique_videos}")

# Split dataset into training and validation sets at the video level
df = df.sample(frac=1, random_state=0).reset_index(drop=True)
video_paths = df['movie_name'].unique()
val_videos = video_paths[:100]
train_videos = video_paths[100:]
train_df = df[df['movie_name'].isin(train_videos)]
val_df = df[df['movie_name'].isin(val_videos)]

# Load CLIP model for extracting embeddings
print(device)
clip_model, preprocess = clip.load("RN50", device=device)  # Using ResNet-50 backbone, can be changed to RN101
clip_model.eval()

print("CLIP Model Architecture:")
#print(clip_model)

# Function to get specific layer embedding from CLIP
def get_clip_embedding(image, layer="last"):
    image = preprocess(image).unsqueeze(0).to(device)
    # Ensure input type matches model weights
    image = image.half() if clip_model.visual.conv1.weight.dtype == torch.half else image

    with torch.no_grad():
        if layer == "first":
            #print("Extracting features after first block...")
            x = clip_model.visual.conv1(image)
            #print(f"Shape after conv1: {x.shape}")
            x = clip_model.visual.bn1(x)
            x = clip_model.visual.relu1(x)
            x = clip_model.visual.conv2(x)
            #print(f"Shape after conv2: {x.shape}")
            x = clip_model.visual.bn2(x)
            x = clip_model.visual.relu2(x)
            x = clip_model.visual.conv3(x)
            #print(f"Shape after conv3: {x.shape}")
            x = clip_model.visual.bn3(x)
            x = clip_model.visual.relu3(x)
            x = clip_model.visual.avgpool(x)
            #print(f"Shape after avgpool: {x.shape}")
            embedding = x


        elif layer == "middle":
            x = clip_model.visual.conv1(image)
            x = clip_model.visual.bn1(x)
            x = clip_model.visual.relu1(x)
            x = clip_model.visual.conv2(x)
            x = clip_model.visual.bn2(x)
            x = clip_model.visual.relu2(x)
            x = clip_model.visual.conv3(x)
            x = clip_model.visual.bn3(x)
            x = clip_model.visual.relu3(x)
            x = clip_model.visual.avgpool(x)  # Apply average pooling

            x = clip_model.visual.layer1(x)
            x = clip_model.visual.layer2(x)

            embedding = x

        elif layer == "last":
            embedding = clip_model.encode_image(image)

        else:
            raise ValueError("Invalid layer specified. Choose from 'first', 'middle', or 'last'.")

        # Flatten the embedding to ensure consistent shape across layers
        embedding = embedding.view(embedding.size(0), -1)  # Flatten spatial dimensions
    return embedding

# logic to detect deepfake artifacts in a frame
def detect_deepfake(frame, custom_prompt):
    if isinstance(frame, np.ndarray):
        frame = frame.astype('uint8')
    else:
        raise ValueError("Invalid frame data. Expected a NumPy array.")
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    image_pil = Image.fromarray(frame_rgb)
    
    local_args = type('Args', (), {
        "model_path": model_path,
        "model_base": None,
        "model_name": get_model_name_from_path(model_path),
        "query": custom_prompt,
        "conv_mode": None,
        "temperature": 0,
        "top_p": None,
        "num_beams": 1,
        "max_new_tokens": 512,
    })()
    
    predicted_sentence = eval_model_frame(local_args, image_pil, tokenizer=tokenizer, model=llava_model, image_processor=image_processor)
    return predicted_sentence

def create_custom_prompt(annotation):
    base_prompt = (
        "Based on the following description: '{annotation}', analyze the face in the image and identify any signs of deepfake artifacts. "
        "Consider how the description might relate to potential manipulations. "
        "Look for abnormalities such as inconsistent lighting, unnatural facial movements, "
        "blurriness around the edges, strange reflections in the eyes, mismatched facial features, "
        "or any other indications of digital manipulation. Provide a detailed description of any "
        "anomalies found."
    )
    return base_prompt.format(annotation=annotation)

# def create_custom_prompt(annotation):
#     base_prompt = (
#         "Analyze the face in the image based on the description: '{annotation}'. Identify any deepfake artifacts, focusing specifically on the "
#         "affected parts of the face mentioned. Provide a short and direct explanation highlighting the inconsistencies or manipulations."
#     )
#     return base_prompt.format(annotation=annotation)

# Extract relevant information from the CSV
def get_relevant_frames_from_csv(df, num_files=0):
    print(f"Total number of videos before filtering: {df['movie_name'].nunique()}")
    print(f"Total rows in dataset: {len(df)}")

    frames = {}
    total_rows = len(df)
    print(f"Total rows to be processed: {total_rows}")
    if num_files > 0:
        total_rows = min(total_rows, num_files)

    for idx, row in df.iterrows():
        if num_files > 0 and idx >= num_files:
            break
        
        video_name = row['movie_name']
        manipulation_path = Path('C:/transf/XAI_DEEPFAKE/LLaVA') / row['manipulation']
        video_path = (manipulation_path / video_name).resolve()
        click_locations = row['click_locations']
        annotation_text = row['text']
        
        if not click_locations or pd.isna(click_locations) or not annotation_text or pd.isna(annotation_text):
            continue

        try:
            frame_data = json.loads(click_locations)
            relevant_frames = [int(frame) - 1 for frame in frame_data.keys() if frame.isdigit()]
            frames[video_path] = relevant_frames
            print(f"Number of relevant frames for video {video_path}: {len(relevant_frames)}")
        except json.JSONDecodeError as e:
            continue
        except Exception as e:
            continue

    return frames

# Main function
def main(display=True, save=True, num_files=0, layer="last"):
    frames = get_relevant_frames_from_csv(val_df, num_files)  # Process only validation videos
    train_embeddings = {}

    # Encode frames from training set only
    for idx, row in train_df.iterrows():
        video_name = row['movie_name']
        manipulation_path = Path('C:/transf/XAI_DEEPFAKE/LLaVA') / row['manipulation']
        video_path = (manipulation_path / video_name).resolve()
        click_locations = row['click_locations']

        if not click_locations or pd.isna(click_locations):
            continue

        try:
            frame_data = json.loads(click_locations)
            relevant_frames = [int(frame) - 1 for frame in frame_data.keys() if frame.isdigit()]
            for frame_number in relevant_frames:
                cap = cv2.VideoCapture(str(video_path))
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
                ret, frame = cap.read()
                if ret:
                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    image_pil = Image.fromarray(frame_rgb)
                    embedding = get_clip_embedding(image_pil, layer=layer)
                    train_embeddings[(video_path, frame_number)] = (embedding, row['text'])
                cap.release()
        except json.JSONDecodeError as e:
            continue
        except Exception as e:
            continue

    # Analyze validation frames
    results = []
    for video_path, frame_list in tqdm(frames.items(), desc="Analyzing validation frames"):
        for frame_number in frame_list:
            cap = cv2.VideoCapture(str(video_path))
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            ret, frame = cap.read()
            if not ret:
                cap.release()
                continue

            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image_pil = Image.fromarray(frame_rgb)
            val_embedding = get_clip_embedding(image_pil, layer=layer)
            cap.release()

            # Find the most similar frame in the training set
            max_similarity = -1
            best_annotation = None
            for (train_video_path, train_frame_number), (train_embedding, annotation) in train_embeddings.items():
                similarity = cosine_similarity(val_embedding, train_embedding).item()
                if similarity > max_similarity:
                    max_similarity = similarity
                    best_annotation = annotation

            # Create custom prompt using the closest training frame's annotation
            custom_prompt = create_custom_prompt(best_annotation) if best_annotation else "No annotation available"

            # Analyze the validation frame using the custom prompt
            test_deepfake_analysis = detect_deepfake(frame, custom_prompt) if best_annotation else "No analysis available"

            # Store the result
            results.append({
                'validation_video': str(video_path),
                'validation_frame': frame_number,
                'closest_train_annotation': best_annotation,
                'test_deepfake_analysis': test_deepfake_analysis,
                'cosine_similarity': max_similarity,
            })

    # save results to CSV
    if save:
        results_df = pd.DataFrame(results)
        results_csv_path = 'similarity_results_clip_first_resnet101.csv'
        results_df.to_csv(results_csv_path, index=False)
        print(f"Results saved to {results_csv_path}")

    #  display logs for the most similar frames
    if display and results:
        highest_similarity_result = max(results, key=lambda x: x['cosine_similarity'])
        print(f"Most Similar Pair:")
        print(f"Validation Video: {highest_similarity_result['validation_video']} at frame {highest_similarity_result['validation_frame']}")
        print(f"Cosine Similarity (Highest): {highest_similarity_result['cosine_similarity']:.4f}\n")
        print(f"Annotation for the Closest Training Frame:")
        print(highest_similarity_result['closest_train_annotation'])
        print(f"Deepfake Analysis for Validation Frame:")
        print(highest_similarity_result['test_deepfake_analysis'])
    elif not results:
        print("Could not find the most similar pair.")

if __name__ == "__main__":

    print("Now running 1sr prompt, resnet50, middle layer")

    main(display=True, save=True, num_files=0, layer="middle")


  from .autonotebook import tqdm as notebook_tqdm
You are using a model of type llava to instantiate a model of type llava_llama. This is not supported for all configurations of models and can yield errors.
  return self.fget.__get__(instance, owner)()
Loading checkpoint shards: 100%|██████████| 2/2 [00:09<00:00,  4.84s/it]


Total unique videos: 816
cuda
CLIP Model Architecture:
Now running 2nd prompt, resnet101, middle layer
Total number of videos before filtering: 100
Total rows in dataset: 100
Total rows to be processed: 100
Number of relevant frames for video C:\transf\XAI_DEEPFAKE\LLaVA\end_to_end\304_M019.mp4: 6
Number of relevant frames for video C:\transf\XAI_DEEPFAKE\LLaVA\dfdc_train_part_49\hoaweiathp.mp4: 6
Number of relevant frames for video C:\transf\XAI_DEEPFAKE\LLaVA\dfdc_train_part_48\ghwlogkoic.mp4: 4
Number of relevant frames for video C:\transf\XAI_DEEPFAKE\LLaVA\end_to_end\443_M137.mp4: 13
Number of relevant frames for video C:\transf\XAI_DEEPFAKE\LLaVA\dfdc_train_part_2\yirhsptlko.mp4: 4
Number of relevant frames for video C:\transf\XAI_DEEPFAKE\LLaVA\dfdc_train_part_2\uqvxjfpwdo.mp4: 3
Number of relevant frames for video C:\transf\XAI_DEEPFAKE\LLaVA\end_to_end\384_M027.mp4: 6
Number of relevant frames for video C:\transf\XAI_DEEPFAKE\LLaVA\end_to_end\311_M135.mp4: 12
Number of relevan

Analyzing validation frames: 100%|██████████| 100/100 [4:48:34<00:00, 173.14s/it] 

Results saved to similarity_results_clip_first_resnet101.csv
Most Similar Pair:
Validation Video: C:\transf\XAI_DEEPFAKE\LLaVA\end_to_end\304_M019.mp4 at frame 345
Cosine Similarity (Highest): 0.0000

Annotation for the Closest Training Frame:
Light flickering on face.
Deepfake Analysis for Validation Frame:
The face in the image is described as having a light flickering on it. However, there are no deepfake artifacts or manipulations visible in the image. The lighting in the room might be causing a reflection on the man's face, but it is not a deepfake or manipulation.





In [3]:
import pandas as pd
import os

dataset_path = r'C:\transf\XAI_DEEPFAKE\dataset.csv'
similarity_results_path = r'C:\transf\XAI_DEEPFAKE\LLaVA\llava\similarity_results_clip_first_resnet101.csv'

video_annotations = pd.read_csv(dataset_path)
similarity_results = pd.read_csv(similarity_results_path)

video_annotations = video_annotations[['movie_name', 'text']]

# Remove duplicates to ensure unique movie_name for mapping
unique_video_annotations = video_annotations.drop_duplicates(subset='movie_name')

# Extract movie names from validation_video paths in the second dataset
similarity_results['movie_name'] = similarity_results['validation_video'].apply(lambda x: os.path.basename(x))

# Map the GT column using the unique video annotations
similarity_results['GT'] = similarity_results['movie_name'].map(
    unique_video_annotations.set_index('movie_name')['text']
)

output_path = r'C:\transf\XAI_DEEPFAKE\LLaVA\llava\similarity_results_clip_first_resnet101_merged.csv'
similarity_results.to_csv(output_path, index=False)

print(f"Updated dataset with GT column saved to {output_path}")


Updated dataset with GT column saved to C:\transf\XAI_DEEPFAKE\LLaVA\llava\similarity_results_clip_first_resnet101_merged.csv


In [4]:
import pandas as pd
import torch
from transformers import BertTokenizer, BertModel
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

file_path = r'C:\transf\XAI_DEEPFAKE\LLaVA\llava\similarity_results_clip_first_resnet101_merged.csv'
df = pd.read_csv(file_path)

# Group data by validation video ID
video_groups = df.groupby('validation_video')

# Load pre-trained BERT model and tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert_model = BertModel.from_pretrained('bert-base-uncased')

# Load pre-trained Sentence BERT model
sentence_model = SentenceTransformer('all-MiniLM-L6-v2')

# Function to get BERT embeddings
def get_bert_embedding(text):
    inputs = tokenizer(text, return_tensors='pt', truncation=True, padding=True, max_length=512)
    with torch.no_grad():
        outputs = bert_model(**inputs)
    # Use the average of token embeddings (excluding padding tokens) for the embedding
    token_embeddings = outputs.last_hidden_state  # Shape: (batch_size, seq_length, hidden_size)
    attention_mask = inputs['attention_mask'].unsqueeze(-1)  # Shape: (batch_size, seq_length, 1)
    masked_embeddings = token_embeddings * attention_mask
    sum_embeddings = masked_embeddings.sum(dim=1)  # Shape: (batch_size, hidden_size)
    valid_token_count = attention_mask.sum(dim=1)  # Shape: (batch_size, 1)
    average_embedding = sum_embeddings / valid_token_count  # Shape: (batch_size, hidden_size)
    return average_embedding.squeeze(0).numpy()

# Function to get Sentence BERT embeddings
def get_sbert_embedding(texts):
    return sentence_model.encode(texts, convert_to_tensor=True)

# Function to calculate cosine similarity between embeddings
def calculate_cosine_similarity(embedding1, embedding2):
    return cosine_similarity(embedding1, embedding2)[0][0]

# Function to calculate average cosine similarity per video
def calculate_average_cosine_similarity(group, embed_func):
    test_texts = group['test_deepfake_analysis'].astype(str).tolist()
    gt_texts = group['GT'].astype(str).tolist()
    
    if not test_texts or not gt_texts:
        return 0.0
    
    test_embeddings = [embed_func(text) for text in test_texts]
    gt_embeddings = [embed_func(text) for text in gt_texts]
    
    # Calculate cosine similarities for each pair of test and ground truth text
    cosine_similarities = [
        calculate_cosine_similarity(test_emb.reshape(1, -1), gt_emb.reshape(1, -1))
        for test_emb, gt_emb in zip(test_embeddings, gt_embeddings)
    ]
    
    # Return the average cosine similarity for the video group
    return sum(cosine_similarities) / len(cosine_similarities) if cosine_similarities else 0.0

# Calculate average cosine similarity per video using BERT
bert_similarities = [
    calculate_average_cosine_similarity(group, get_bert_embedding)
    for _, group in video_groups
]
average_cosine_similarity_bert = sum(bert_similarities) / len(bert_similarities) if bert_similarities else 0.0
print(f'Average Cosine Similarity (BERT): {average_cosine_similarity_bert}')

# Calculate average cosine similarity per video using Sentence BERT
sbert_similarities = []
for _, group in video_groups:
    test_texts = group['test_deepfake_analysis'].astype(str).tolist()
    gt_texts = group['GT'].astype(str).tolist()
    
    if not test_texts or not gt_texts:
        sbert_similarities.append(0.0)
        continue
    
    # Get embeddings for both columns using Sentence BERT
    test_embeddings_sbert = get_sbert_embedding(test_texts)
    gt_embeddings_sbert = get_sbert_embedding(gt_texts)
    
    # Calculate cosine similarities for each pair of test and ground truth text
    cosine_similarities_sbert = [
        calculate_cosine_similarity(test_emb.cpu().unsqueeze(0).numpy(), gt_emb.cpu().unsqueeze(0).numpy())
        for test_emb, gt_emb in zip(test_embeddings_sbert, gt_embeddings_sbert)
    ]
    
    sbert_similarities.append(sum(cosine_similarities_sbert) / len(cosine_similarities_sbert) if cosine_similarities_sbert else 0.0)

average_cosine_similarity_sbert = sum(sbert_similarities) / len(sbert_similarities) if sbert_similarities else 0.0
print(f'Average Cosine Similarity (Sentence BERT): {average_cosine_similarity_sbert}')


Average Cosine Similarity (BERT): 0.6026665626833675
Average Cosine Similarity (Sentence BERT): 0.4046939964821095
