In [None]:
# Cell 00: Robust Environment Setup with Force Installation

import sys
import subprocess
import importlib

# 0. Function to force package installation with system override
def force_install_package(package_spec):
    """Force installation ignoring system packages"""
    subprocess.check_call([sys.executable, '-m', 'pip', 'install',
                          '--force-reinstall', '--no-deps', package_spec])

def install_with_deps(package_spec):
    """Install package with dependencies"""
    subprocess.check_call([sys.executable, '-m', 'pip', 'install',
                          '--force-reinstall', package_spec])

# 1. Upgrade pip first
print("--- Step 0: Upgrading pip ---")
subprocess.check_call([sys.executable, '-m', 'pip', 'install', '--upgrade', 'pip'])

# 2. CRITICAL: Uninstall Colab's default packages that conflict
print("\n--- Step 1: Removing conflicting Colab defaults ---")
# Uninstall packages that Colab pre-installs which conflict with our requirements
subprocess.run([sys.executable, '-m', 'pip', 'uninstall', '-y',
               'Pillow', 'numpy', 'protobuf'], capture_output=True)

# 3. Install core dependencies with explicit force
print("\n--- Step 2: Installing critical base packages ---")
# Install NumPy 1.26.4 first as many packages depend on it
force_install_package('numpy==1.26.4')

# Install Pillow 9.5.0 to fix the is_directory error
force_install_package('Pillow==9.5.0')

# Install protobuf early (use 4.x since MediaPipe is removed)
force_install_package('protobuf==4.25.3')

# *** NEW STEP: Install a stable google-api-core ***
print("\n--- Step 2.5: Installing stable google-api-core ---")
install_with_deps('google-api-core[grpc]~=2.11.1') # Or another stable 2.x like 2.15.0

# 4. Verify critical packages before proceeding
print("\n--- Verification 1: Base packages ---")
importlib.invalidate_caches()
import numpy
import PIL
import google.protobuf
import google.api_core # Verify this new addition

print(f"NumPy version: {numpy.__version__}")
print(f"Pillow version: {PIL.__version__}")
print(f"Protobuf version: {google.protobuf.__version__}")
print(f"google-api-core version: {google.api_core.__version__}") # Check its version

assert numpy.__version__ == "1.26.4", f"NumPy version mismatch: {numpy.__version__}"
assert PIL.__version__ == "9.5.0", f"Pillow version mismatch: {PIL.__version__}"
assert google.api_core.__version__.startswith("2.11.1"), f"google-api-core version mismatch: {google.api_core.__version__}"

# Test critical imports
try:
    from PIL import ImageFont
    print("✅ PIL.ImageFont imported successfully")
except ImportError as e:
    print(f"❌ PIL.ImageFont import failed: {e}")
    raise

# 5. Install PyTorch with CUDA support
print("\n--- Step 3: Installing PyTorch stack ---")
torch_cmd = [sys.executable, '-m', 'pip', 'install',
             'torch==2.2.1+cu118', 'torchvision==0.17.1+cu118',
             'torchaudio==2.2.1+cu118',
             '--index-url', 'https://download.pytorch.org/whl/cu118']
subprocess.check_call(torch_cmd)

# 6. Install transformers ecosystem with specific versions
print("\n--- Step 4: Installing Transformers stack ---")
# Quote version constraints to avoid shell interpretation
subprocess.check_call([sys.executable, '-m', 'pip', 'install', "transformers>=4.30.0,<4.41.0"])
subprocess.check_call([sys.executable, '-m', 'pip', 'install', "huggingface-hub>=0.20.0"])
subprocess.check_call([sys.executable, '-m', 'pip', 'install', "tokenizers>=0.14.0"])

# 7. Install ftfy before open-clip
print("\n--- Step 5: Installing ftfy and open-clip ---")
install_with_deps('ftfy>=6.0')
force_install_package('open-clip-torch==2.23.0')

# 8. Verify torch and open-clip
print("\n--- Verification 2: Torch and vision stack ---")
importlib.invalidate_caches()
try:
    import torch
    import open_clip
    print(f"✅ PyTorch version: {torch.__version__}")
    print("✅ open-clip imported successfully")
except ImportError as e:
    print(f"❌ Import error: {e}")
    raise

# 9. Install Google Cloud Libraries
print("\n--- Step 6: Installing Google Generative AI ---")
install_with_deps('google-generativeai==0.5.2')
# install_with_deps('google-cloud-vision~=3.4')

# 10. Pre-install compatible versions for Whisper dependencies
print("\n--- Step 7: Pre-installing Whisper dependencies ---")
# Install numba compatible with numpy 1.26.4
install_with_deps('numba==0.58.1')

# Install spacy 3.4.4 to get thinc 8.1.x (compatible with numpy 1.26.4)
# This prevents whisper from pulling thinc 8.3.6 which requires numpy 2.x
install_with_deps('spacy==3.4.4')
install_with_deps('thinc>=8.1.0,<8.2.0')

# 11. Install Whisper
print("\n--- Step 8: Installing OpenAI Whisper ---")
install_with_deps('openai-whisper==20231117')

# 12. Install remaining utilities
print("\n--- Step 9: Installing other utilities ---")
install_with_deps('ffmpeg-python==0.2.0')
install_with_deps('opencv-python-headless==4.9.0.80')
install_with_deps('nest-asyncio==1.6.0')

# 13. Force reinstall our exact versions one more time to ensure they stick
print("\n--- Step 10: Final version enforcement ---")
force_install_package('numpy==1.26.4')
force_install_package('Pillow==9.5.0')

# 14. Final comprehensive verification
print("\n--- FINAL VERIFICATION ---")
importlib.invalidate_caches()

overall_setup_ok_final = True

def verify_import(module_name, version_attr='__version__', expected_version=None, critical=False):
    global overall_setup_ok_final # Use the renamed global
    try:
        module = importlib.import_module(module_name)
        version = getattr(module, version_attr, 'N/A')
        status = "✅"
        message = f"{module_name}: {version}"
        if expected_version:
            message += f" (expected: {expected_version})"
            if version != expected_version:
                status = "⚠️"
                if critical: overall_setup_ok_final = False # Fail build on critical mismatch
        print(f"{status} {message}")

        if module_name == "PIL" and expected_version == "9.5.0" and version == "9.5.0":
             from PIL import ImageFont # Test only if Pillow is our target version
             print("    ✅ PIL.ImageFont works with Pillow 9.5.0")
        return True
    except ImportError as e:
        print(f"❌ {module_name}: Import failed - {e}")
        if critical: overall_setup_ok_final = False
        return False
    except Exception as e_gen:
        print(f"❌ {module_name}: Verification error - {e_gen}")
        if critical: overall_setup_ok_final = False
        return False

# Critical version checks
verify_import('PIL', expected_version='9.5.0', critical=True)
verify_import('numpy', expected_version='1.26.4', critical=True)
verify_import('google.api_core', expected_version='2.11.1', critical=True) # Verify pinned GAC

# Other important checks
verify_import('torch', expected_version='2.2.1+cu118')
verify_import('transformers') # No strict version, just check import
verify_import('huggingface_hub') # No strict version
verify_import('open_clip')
verify_import('whisper', expected_version='20231117') # package version
verify_import('cv2')
verify_import('google.generativeai', version_attr='VERSION', expected_version='0.5.2')
# verify_import('google.cloud.vision') # Vision API client import (commented out)

print("\n" + "="*50)
if overall_setup_ok_final:
    print("✅ Environment setup targeted critical versions. Check ⚠️ for non-critical or resolved versions.")
else:
    print("❌ Critical issues remain in environment setup. Check errors above.")
print("="*50)

--- Step 0: Upgrading pip ---

--- Step 1: Removing conflicting Colab defaults ---

--- Step 2: Installing critical base packages ---

--- Step 2.5: Installing stable google-api-core ---

--- Verification 1: Base packages ---
NumPy version: 1.26.4
Pillow version: 9.5.0
Protobuf version: 4.25.7
google-api-core version: 2.11.1
✅ PIL.ImageFont imported successfully

--- Step 3: Installing PyTorch stack ---

--- Step 4: Installing Transformers stack ---

--- Step 5: Installing ftfy and open-clip ---

--- Verification 2: Torch and vision stack ---
✅ PyTorch version: 2.2.1+cu118
✅ open-clip imported successfully

--- Step 6: Installing Google Generative AI ---

--- Step 7: Pre-installing Whisper dependencies ---

--- Step 8: Installing OpenAI Whisper ---

--- Step 9: Installing other utilities ---

--- Step 10: Final version enforcement ---

--- FINAL VERIFICATION ---
✅ PIL: 9.5.0 (expected: 9.5.0)
    ✅ PIL.ImageFont works with Pillow 9.5.0
✅ numpy: 1.26.4 (expected: 1.26.4)
✅ google.api_co

In [47]:
# Cell 1: Imports, API Key & Model Setup, Verifications

import os
import sys
import json
import time
import uuid
import asyncio
import importlib  # Added missing import
from pathlib import Path
from typing import Dict, Any, List, Optional
from IPython.display import display, Video, Markdown
import io

# Append /content to sys.path to ensure Colab can find our .py files
if '/content' not in sys.path:
    sys.path.append('/content')

# Import custom utility functions
# (Make sure you have uploaded these .py files to your Colab environment's /content/ directory)
try:
    import video
    import models
    import signals
    import gemini
    import fusion
    print("✅ Custom utility modules imported successfully.")
except ImportError as e:
    print(f"❌ Error importing utility modules: {e}")
    print("Ensure helper files are uploaded to /content/ and have correct typing imports:")
    print("  - video.py needs: from typing import List, Tuple, Optional")
    print("  - models.py needs: from typing import List, Dict, Any, Optional")
    print("  - signals.py needs: from typing import List, Dict, Optional, Tuple, Any")
    print("  - gemini.py needs: from typing import List, Tuple, Any")
    raise  # Stop execution if utilities can't be imported

# Standard ML/AI library imports
import torch
import numpy
import PIL
import open_clip
import whisper
import transformers
import huggingface_hub
import google.protobuf
from google.cloud import vision
import google.generativeai as genai
import nest_asyncio

# Apply nest_asyncio for running asyncio code in Colab cells
nest_asyncio.apply()
print(f"\nTorch CUDA available: {torch.cuda.is_available()}")
print(f"Current Python version: {sys.version.split()[0]}")

# --- AUTHENTICATE GOOGLE CLOUD SERVICES ---
print("\n--- Authenticating Google Cloud Services ---")
try:
    from google.colab import auth
    auth.authenticate_user()
    print("✅ Google Cloud authentication successful")

    # Set the project ID
    import os
    os.environ['GOOGLE_CLOUD_PROJECT'] = 'fakecheck-461121'
    print("✅ Set Google Cloud project to: fakecheck-461121")

except ImportError:
    print("⚠️ Not running in Google Colab - please set up authentication manually")
except Exception as e:
    print(f"❌ Error during Google Cloud authentication: {e}")

# --- Verification of Critical Library Versions ---
print("\n--- Verifying critical library versions after all imports ---")
critical_versions_ok = True

def check_version(module_name_to_check, expected_version, actual_module_instance=None, version_attr='__version__'):
    """Check module version and report status"""
    global critical_versions_ok
    try:
        # If actual_module_instance is provided, use it; otherwise import the module
        if actual_module_instance is not None:
            module = actual_module_instance
        else:
            module = importlib.import_module(module_name_to_check)

        version = getattr(module, version_attr, 'N/A')

        if expected_version:
            if version == expected_version:
                print(f"✅ {module_name_to_check} version: {version} (Matches target)")
            else:
                print(f"⚠️ {module_name_to_check} version: {version} (Target: {expected_version} - MISMATCH!)")
                if module_name_to_check in ["PIL", "numpy"]:  # These are critical mismatches
                    critical_versions_ok = False
        else:
            # No expected version provided, just print current version
            print(f"ℹ️ {module_name_to_check} version: {version}")

    except ImportError:
        print(f"❌ {module_name_to_check} not imported for version check.")
        critical_versions_ok = False
    except Exception as e:
        print(f"❌ Error checking version for {module_name_to_check}: {e}")
        critical_versions_ok = False

# Check critical versions
check_version("PIL", "9.5.0", actual_module_instance=PIL)
try:
    from PIL import ImageFont
    print("    ✅ PIL.ImageFont imported successfully (Pillow integrity check).")
except ImportError as e_font:
    print(f"    ❌ PIL.ImageFont import ERROR: {e_font}")
    critical_versions_ok = False

check_version("numpy", "1.26.4", actual_module_instance=numpy)
check_version("torch", "2.2.1+cu118", actual_module_instance=torch)

# For these, we don't enforce specific versions anymore
check_version("transformers", expected_version=None, actual_module_instance=transformers)
check_version("huggingface_hub", expected_version=None, actual_module_instance=huggingface_hub)
check_version("google.protobuf", expected_version=None, actual_module_instance=google.protobuf)
check_version("open_clip", expected_version=None, actual_module_instance=open_clip)
check_version("whisper", expected_version="1.1.10", actual_module_instance=whisper)
check_version("google.generativeai", expected_version="0.5.2", actual_module_instance=genai, version_attr='__version__')

# For google.cloud.vision, check if it's importable
try:
    import google.cloud.vision
    print("ℹ️ google.cloud.vision: Imported successfully")
except ImportError as e:
    print(f"❌ google.cloud.vision: Import failed - {e}")
    critical_versions_ok = False

if not critical_versions_ok:
    print("🔥🔥🔥 WARNING: Critical Pillow or NumPy versions do not match targets. This may lead to errors.")

# --- Load API Keys & Initialize Models ---
print("\n--- Configuring Models and API Keys ---")
GEMINI_API_KEY = None
try:
    from google.colab import userdata
    GEMINI_API_KEY = userdata.get('GEMINI_API_KEY')
    if not GEMINI_API_KEY:
        print("⚠️ GEMINI_API_KEY not found in Colab Secrets. Gemini features will be disabled.")
    else:
        genai.configure(api_key=GEMINI_API_KEY)
        print("✅ Gemini API configured for Generative AI.")
except ImportError:  # Not in Colab
    GEMINI_API_KEY = os.getenv('GEMINI_API_KEY')
    if GEMINI_API_KEY:
        genai.configure(api_key=GEMINI_API_KEY)
        print("✅ Gemini API configured from OS env.")
    else:
        print("⚠️ GEMINI_API_KEY OS environment variable not found.")

# Device selection
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {DEVICE}")

# Initialize models (this can take a moment)
CLIP_MODEL, CLIP_PREPROCESS_FN = None, None
try:
    CLIP_MODEL, _, CLIP_PREPROCESS_FN = open_clip.create_model_and_transforms(
        "ViT-L-14", pretrained="laion2b_s32b_b82k", device=DEVICE
    )
    CLIP_MODEL.eval()
    print("✅ CLIP Model (ViT-L-14) loaded.")
except Exception as e:
    print(f"❌ Error loading CLIP model: {e}")

WHISPER_ASR_MODEL = None
try:
    WHISPER_ASR_MODEL = whisper.load_model("base.en", device=DEVICE)
    print("✅ Whisper Model (base.en) loaded.")
except Exception as e:
    print(f"❌ Error loading Whisper model: {e}")
    if "numpy.dtype size changed" in str(e):
        print("    This Whisper load error is likely due to a NumPy ABI mismatch. Ensure NumPy 1.26.4 is active.")

GEMINI_MODEL_INSTANCE = None
if GEMINI_API_KEY:
    try:
        GEMINI_MODEL_INSTANCE = genai.GenerativeModel("gemini-2.5-pro-preview-05-06")
        print(f"✅ Gemini Model ('{GEMINI_MODEL_INSTANCE.model_name}') initialized for generative tasks.")
    except Exception as e:
        print(f"❌ Error initializing Gemini model: {e}")
else:
    print("⏩ Gemini generative model not initialized (API key missing/config error).")

VISION_API_CLIENT = None
try:
    VISION_API_CLIENT = vision.ImageAnnotatorClient()
    print("✅ Google Cloud Vision API client initialized.")
except Exception as e:
    print(f"❌ Error initializing Google Cloud Vision API client: {e}")
    print("   Ensure you have authenticated with 'auth.authenticate_user()' above")
    print("   and that the Vision API is enabled in your GCP project.")

# Check if all essential models for the current pipeline are loaded
essential_models_loaded_simplified = all([CLIP_MODEL, WHISPER_ASR_MODEL, VISION_API_CLIENT])

if not essential_models_loaded_simplified:
    print("🔥🔥🔥 WARNING: One or more essential ML/Cloud models (CLIP, Whisper, Vision API) failed to load.")
    print("     Pipeline will be severely limited or fail.")
else:
    print("✅✅✅ All currently essential models/clients appear to be initialized.")

# Print final status
print("\n" + "="*60)
print("ENVIRONMENT STATUS SUMMARY:")
print("="*60)
print(f"✓ Pillow 9.5.0: {'YES' if PIL.__version__ == '9.5.0' else 'NO - ' + PIL.__version__}")
print(f"✓ NumPy 1.26.4: {'YES' if numpy.__version__ == '1.26.4' else 'NO - ' + numpy.__version__}")
print(f"✓ CLIP Model: {'YES' if CLIP_MODEL else 'NO'}")
print(f"✓ Whisper Model: {'YES' if WHISPER_ASR_MODEL else 'NO'}")
print(f"✓ Vision API: {'YES' if VISION_API_CLIENT else 'NO'}")
print(f"✓ Gemini Model: {'YES' if GEMINI_MODEL_INSTANCE else 'NO (optional)'}")
print("="*60)

✅ Custom utility modules imported successfully.

Torch CUDA available: True
Current Python version: 3.11.12

--- Authenticating Google Cloud Services ---
✅ Google Cloud authentication successful
✅ Set Google Cloud project to: fakecheck-461121

--- Verifying critical library versions after all imports ---
✅ PIL version: 9.5.0 (Matches target)
    ✅ PIL.ImageFont imported successfully (Pillow integrity check).
✅ numpy version: 1.26.4 (Matches target)
✅ torch version: 2.2.1+cu118 (Matches target)
ℹ️ transformers version: 4.40.2
ℹ️ huggingface_hub version: 0.31.4
ℹ️ google.protobuf version: 4.25.7
ℹ️ open_clip version: N/A
⚠️ whisper version: 20231117 (Target: 1.1.10 - MISMATCH!)
✅ google.generativeai version: 0.5.2 (Matches target)
ℹ️ google.cloud.vision: Imported successfully

--- Configuring Models and API Keys ---
✅ Gemini API configured for Generative AI.
Using device: cuda
✅ CLIP Model (ViT-L-14) loaded.
✅ Whisper Model (base.en) loaded.
✅ Gemini Model ('models/gemini-2.5-pro-preview

In [48]:
# Cell 2: Helper function to run async code from notebook ---
def awaitable(coroutine_to_run):
    try:
        loop = asyncio.get_running_loop()
        return loop.run_until_complete(coroutine_to_run)
    except RuntimeError: # No event loop running
        return asyncio.run(coroutine_to_run)

In [45]:
# Cell 3: Main Detection Pipeline Function

import numpy as np  # Add this import for np alias

async def run_full_deepfake_detection(
    video_file_path: str,
    output_dir_base: str = "/content/detections"  # Output directory in Colab
) -> Dict[str, Any]:
    """
    Main simplified deepfake detection pipeline.
    """
    if not os.path.exists(video_file_path):
        return {"error": f"Input video not found: {video_file_path}"}

    video_basename = os.path.basename(video_file_path)
    run_id = f"{os.path.splitext(video_basename)[0]}_{uuid.uuid4().hex[:6]}"
    # output_dir = os.path.join(output_dir_base, run_id)  # No overlay, so dir might not be used much
    # os.makedirs(output_dir, exist_ok=True)  # Create if we save other artifacts later

    detection_results = {
        "input_video": video_basename,
        "run_id": run_id,
        "pipeline_version": "simplified_v1_cloud_vision_blinks" # Change back to simplified_v1_cloud_vision_blinks when using google-vision-api
    }
    temp_audio_path: Optional[str] = None
    processed_frames_pil: List[PIL.Image.Image] = []

    try:
        print(f"\nProcessing: {video_basename}")
        # 1. Sample Video & Audio
        target_fps = 8  # FPS for internal processing for CLIP/Whisper
        max_video_duration = 30  # seconds

        processed_frames_pil, temp_audio_path, original_duration, processed_duration = \
            video.sample_video_content(video_file_path,
                                                target_fps=target_fps,
                                                max_duration_sec=max_video_duration)

        detection_results["video_original_duration_sec"] = round(original_duration, 2)
        detection_results["video_processed_duration_sec"] = round(processed_duration, 2)
        detection_results["num_frames_sampled_for_clip_whisper"] = len(processed_frames_pil)

        if not processed_frames_pil:
            raise RuntimeError("Frame sampling returned no frames.")

        # 2. CLIP Visual Score
        score_visual_clip = 0.0  # Default if model fails
        if CLIP_MODEL and CLIP_PREPROCESS_FN:
            score_visual_clip = models.calculate_visual_clip_score(
                processed_frames_pil, CLIP_MODEL, CLIP_PREPROCESS_FN, DEVICE
            )
        detection_results["score_visual_clip"] = round(score_visual_clip, 3)

        # 3. Whisper ASR
        transcription_text = ""
        if WHISPER_ASR_MODEL and temp_audio_path:
            transcription_data = models.transcribe_audio_content(
                temp_audio_path, WHISPER_ASR_MODEL
            )
            transcription_text = transcription_data["text"]
        detection_results["transcript_snippet"] = transcription_text[:150] + "..." if transcription_text else "[No Speech/Audio Error]"

        # BERT Score Removed

        # rPPG Score Removed

        # Commented out until Google Vision API usage is fixed
        # # 4. Eye Blink Score (using Google Cloud Vision API)
        # score_blink = 0.5  # Neutral default
        # if VISION_API_CLIENT and processed_frames_pil:
        #     # TEMPORARY: Only send 1 frame for testing
        #     print("\n⚠️ TESTING MODE: Sending only 1 frame to Vision API")

        #     # Select middle frame for testing
        #     test_frame_index = len(processed_frames_pil) // 2
        #     frames_for_vision_api_pil = [processed_frames_pil[test_frame_index]]

        #     # Original code for reference (commented out for testing):
        #     # MAX_FRAMES_FOR_VISION_API = 30
        #     # frames_for_vision_api_pil: List[PIL.Image.Image]
        #     # if len(processed_frames_pil) > MAX_FRAMES_FOR_VISION_API:
        #     #     indices = np.linspace(0, len(processed_frames_pil) - 1, MAX_FRAMES_FOR_VISION_API, dtype=int)
        #     #     frames_for_vision_api_pil = [processed_frames_pil[i] for i in indices]
        #     # else:
        #     #     frames_for_vision_api_pil = processed_frames_pil

        #     frames_for_vision_api_bytes: List[bytes] = []
        #     for frame_pil_img in frames_for_vision_api_pil:
        #         byte_arr = io.BytesIO()
        #         frame_pil_img.save(byte_arr, format='JPEG', quality=85)
        #         frames_for_vision_api_bytes.append(byte_arr.getvalue())

        #     print(f"Sending {len(frames_for_vision_api_bytes)} frame(s) to Google Cloud Vision API for blink detection...", file=sys.stderr)

        #     # The duration for blink rate calculation should be the duration spanned by frames_for_vision_api_pil
        #     # For testing with 1 frame, we'll use a small duration
        #     blink_segment_duration = 0.125  # 1/8 second for single frame test

        #     # Original duration calculation (commented out for testing):
        #     # blink_segment_duration = processed_duration

        #     # Call the async helper function
        #     try:
        #         vision_landmarks_per_frame = await signals.get_eye_landmarks_from_vision_api(
        #             frames_for_vision_api_bytes, VISION_API_CLIENT
        #         )

        #         if vision_landmarks_per_frame:  # Check if any landmarks were returned
        #             print(f"✅ Vision API returned landmarks for {len(vision_landmarks_per_frame)} frame(s)")
        #             score_blink = signals.calculate_blink_score_from_vision_api(
        #                 vision_landmarks_per_frame,
        #                 video_segment_duration_sec=blink_segment_duration
        #             )
        #         else:
        #             print("⚠️ Vision API returned no landmarks")
        #     except Exception as e:
        #         print(f"❌ Vision API call failed: {str(e)}")
        #         import traceback
        #         traceback.print_exc()

        # detection_results["score_blink_rate_vision_api"] = round(score_blink, 3)

        # 4. Gemini Inspections (Visual, Lipsync, AND Blinks)
        flag_gemini_visual, flag_gemini_lipsync, flag_gemini_blinks = 0, 0, 0 # Defaults
        if GEMINI_MODEL_INSTANCE:
            # Ensure 'gemini' module (gemini.py) is used
            flag_gemini_visual, flag_gemini_lipsync, flag_gemini_blinks = \
                await gemini.run_gemini_inspections(
                    processed_frames_pil, video_file_path, transcription_text, GEMINI_MODEL_INSTANCE
                )
        detection_results["flag_gemini_visual_artifact"] = flag_gemini_visual
        detection_results["flag_gemini_lipsync_issue"] = flag_gemini_lipsync
        detection_results["flag_gemini_abnormal_blinks"] = flag_gemini_blinks # New flag

        # 5. Fuse Scores
        # Ensure 'fusion' module (fusion.py) is used
        final_confidence, final_label, anomaly_tags_list = fusion.fuse_detection_scores(
            score_visual_clip,
            flag_gemini_visual,
            flag_gemini_lipsync,
            flag_gemini_blinks # Pass the new Gemini blink flag
        )
        detection_results["deepfake_confidence_overall"] = final_confidence
        detection_results["final_predicted_label"] = final_label
        detection_results["anomaly_tags_detected"] = anomaly_tags_list

        detection_results["overlay_video_path"] = "N/A (Overlay generation removed)"

    except Exception as e:
        import traceback
        error_message = f"Pipeline error for {video_basename}: {str(e)}"
        print(f"{error_message}\n{traceback.format_exc()}", file=sys.stderr)
        detection_results["error"] = error_message
    finally:
        if temp_audio_path and os.path.exists(temp_audio_path):
            try: os.remove(temp_audio_path)
            except OSError: pass

    return detection_results


In [49]:
# Cell 4 : Example Usage  (visual-artifact check only)

from pathlib import Path
import os, sys, time, json, asyncio, importlib
from IPython.display import Markdown
import gemini                      # module with v1.7 code
importlib.reload(gemini)           # ensure fresh version every run

# ------------------------------------------------------------------
# Monkey-patch gemini.run_gemini_inspections only once
# ------------------------------------------------------------------
if not getattr(gemini, "_visual_only_patched", False):
    _orig_runner = gemini.run_gemini_inspections

    async def _visual_only(frames, video, transcript, model, **kwargs):
        return await _orig_runner(
            frames, video, transcript, model,
            enable_visual_artifacts=True,
            enable_lipsync=True,
            enable_abnormal_blinks=True,
        )

    gemini.run_gemini_inspections = _visual_only
    gemini._visual_only_patched = True   # sentinel to avoid double-patch
# ------------------------------------------------------------------

# --- Choose / validate test video ---------------------------------
Path("/content/videos_for_testing").mkdir(parents=True, exist_ok=True)
TEST_VIDEO_PATH = "/content/videos_for_testing/Puppramin.mp4"

if not os.path.exists(TEST_VIDEO_PATH) or os.path.getsize(TEST_VIDEO_PATH) < 1000:
    sample = "/usr/local/lib/python3.10/dist-packages/google/colab/files/video_player_test.mp4"
    if os.path.exists(sample) and os.path.getsize(sample) > 1000:
        print(f"Using Colab sample video: {sample}")
        TEST_VIDEO_PATH = sample
    else:
        print(f"⚠️  No valid test video. Upload one to {TEST_VIDEO_PATH}.", file=sys.stderr)

# ------------------------------------------------------------------
async def run_pipeline_and_display():
    if not (os.path.exists(TEST_VIDEO_PATH) and os.path.getsize(TEST_VIDEO_PATH) > 1000):
        display(Markdown("### ⚠️ Pipeline aborted – missing test video"))
        return

    # verify Cell 1 models
    if not all([CLIP_MODEL, WHISPER_ASR_MODEL, GEMINI_MODEL_INSTANCE]):
        display(Markdown("### ⚠️ Pipeline aborted – models not initialised"))
        return

    print(f"\n>>> Starting detection for: {TEST_VIDEO_PATH} <<<")
    t0 = time.time()

    result = await run_full_deepfake_detection(TEST_VIDEO_PATH)

    result["notebook_total_processing_time_sec"] = round(time.time() - t0, 2)

    display(Markdown("#### Final result"))
    display(Markdown(f"```json\n{json.dumps(result, indent=2)}\n```"))

# ------------------------------------------------------------------
if __name__ == "__main__" and "google.colab" in sys.modules:
    print("Running detection pipeline demo (visual-artifact Gemini check only)…")

    if 'awaitable' not in globals():
        def awaitable(coro):
            try:
                loop = asyncio.get_running_loop()
                return loop.run_until_complete(coro)
            except RuntimeError:
                return asyncio.run(coro)

    awaitable(run_pipeline_and_display())
    print("\nDemo finished. Check results above.")


# # --- Define video path for testing ---
# # Option A: Upload a video manually to /content/ and set its path here
# # e.g., from google.colab import files; uploaded = files.upload()
# # TEST_VIDEO_PATH = list(uploaded.keys())[0]

# # Option B: Place video in /content/videos_for_testing/
# Path("/content/videos_for_testing").mkdir(parents=True, exist_ok=True)
# # TEST_VIDEO_PATH = "/content/videos_for_testing/fake_test.mp4"  # <--- CHANGE THIS TO YOUR UPLOADED VIDEO
# TEST_VIDEO_PATH = "/content/videos_for_testing/Puppramin.mp4"  # <--- CHANGE THIS TO YOUR UPLOADED VIDEO

# # Check if the test video exists, otherwise try a Colab default or inform user
# if not os.path.exists(TEST_VIDEO_PATH) or os.path.getsize(TEST_VIDEO_PATH) < 1000 : # Min 1KB
#     print(f"Test video '{TEST_VIDEO_PATH}' not found, empty, or too small.", file=sys.stderr)
#     # Attempt to find a Colab sample if primary test video isn't there
#     colab_default_samples = [
#         '/usr/local/lib/python3.10/dist-packages/google/colab/files/video_player_test.mp4', # Common path
#         # Add other potential Colab sample paths if known
#     ]
#     found_sample = False
#     for sample_path in colab_default_samples:
#         if os.path.exists(sample_path) and os.path.getsize(sample_path) > 1000:
#             TEST_VIDEO_PATH = sample_path
#             print(f"Using Colab default sample video for demo: {TEST_VIDEO_PATH}")
#             found_sample = True
#             break
#     if not found_sample:
#         print(f"No valid test video found. Please upload a video to '{TEST_VIDEO_PATH}' (or similar) and update the path.")
#         # To allow the rest of the cell to run without immediate error if no video,
#         # but the pipeline itself will fail if TEST_VIDEO_PATH is not valid.
#         if not os.path.exists(TEST_VIDEO_PATH): Path(TEST_VIDEO_PATH).touch() # Creates an empty file

# async def run_pipeline_and_display():
#     if not (os.path.exists(TEST_VIDEO_PATH) and os.path.getsize(TEST_VIDEO_PATH) > 1000): # Check if file > 1KB
#         display(Markdown(f"### ⚠️ Pipeline Aborted \n**Reason:** Test video at `{TEST_VIDEO_PATH}` is not valid or too small. Please upload a real video and update the path."))
#         return

#     # Check if essential models from Cell 1 loaded correctly
#     current_essential_models = all([CLIP_MODEL, WHISPER_ASR_MODEL, GEMINI_MODEL_INSTANCE])

#     if not current_essential_models:
#         missing_models_str = []
#         if not CLIP_MODEL: missing_models_str.append("CLIP_MODEL")
#         if not WHISPER_ASR_MODEL: missing_models_str.append("WHISPER_ASR_MODEL")
#         if not GEMINI_MODEL_INSTANCE: missing_models_str.append("GEMINI_MODEL_INSTANCE (needed for blinks and other checks)")
#         display(Markdown(f"### ⚠️ Pipeline Aborted \n**Reason:** Not all essential models/clients were initialized successfully in Cell 1: {', '.join(missing_models_str)}. Please check errors in Cell 1."))
#         return

#     print(f"\n>>> Starting detection for: {TEST_VIDEO_PATH} <<<")
#     start_time_total = time.time()

#     # Since run_full_deepfake_detection is async, we can await it directly
#     result = await run_full_deepfake_detection(TEST_VIDEO_PATH)

#     end_time_total = time.time()
#     result["notebook_total_processing_time_sec"] = round(end_time_total - start_time_total, 2)

#     print("\n--- FINAL DETECTION RESULT (Notebook) ---")
#     # Pretty print JSON
#     display(Markdown(f"```json\n{json.dumps(result, indent=2)}\n```"))

#     if "error" in result:
#         display(Markdown(f"\n**⚠️ An error occurred during processing:** {result['error']}"))
#     else:
#         display(Markdown(f"\n**Processed Video:** `{result.get('input_video', 'N/A')}`"))
#         display(Markdown(f"**Predicted Label:** `{result.get('final_predicted_label', 'N/A')}`"))
#         display(Markdown(f"**Overall Deepfake Confidence:** `{result.get('deepfake_confidence_overall', 'N/A')}`"))
#         display(Markdown(f"**Anomaly Tags:** `{', '.join(result.get('anomaly_tags_detected', [])) if result.get('anomaly_tags_detected') else 'None'}`"))

# # Run the main processing and display
# if __name__ == "__main__" and "google.colab" in sys.modules:
#     print("Running detection pipeline demo...")

#     # Use the awaitable helper from Cell 2 to run the async function
#     # If awaitable is not defined in this scope, use it from globals or redefine
#     if 'awaitable' not in globals():
#         def awaitable(coroutine_to_run):
#             try:
#                 loop = asyncio.get_running_loop()
#                 return loop.run_until_complete(coroutine_to_run)
#             except RuntimeError:
#                 return asyncio.run(coroutine_to_run)
#             except Exception as e:
#                 import traceback
#                 traceback.print_exc()
#                 display(Markdown(f"**⚠️ Uncaught exception during pipeline:** {str(e)}"))

#     # Run the async function using awaitable
#     awaitable(run_pipeline_and_display())

#     print("\nDemo finished. Check results above.")
#     print(f"Output files (if any, like logs) might be in subdirectories under /content/detections/")

Running detection pipeline demo (visual-artifact Gemini check only)…

>>> Starting detection for: /content/videos_for_testing/Puppramin.mp4 <<<

Processing: Puppramin.mp4


Info: Original video duration 72.17s exceeds max_duration_sec 30s. Processing only the first 30.00s.
FFmpeg extracted 240 frames (target max: 240).
100%|██████████| 3000/3000 [00:01<00:00, 2497.05frames/s]
GEMINI_REPLY (gemini_check_visual_artifacts): NO
GEMINI_WARN: connection reset – retry 1/2 in 3s


#### Final result

```json
{
  "input_video": "Puppramin.mp4",
  "run_id": "Puppramin_7378a3",
  "pipeline_version": "simplified_v1_cloud_vision_blinks",
  "video_original_duration_sec": 72.17,
  "video_processed_duration_sec": 30.0,
  "num_frames_sampled_for_clip_whisper": 240,
  "score_visual_clip": 0.56,
  "transcript_snippet": "I tried everything from my depression. Nothing worked. Every day felt heavy. I felt trapped. Then I tried pup-er-min. Our prescription helps your body...",
  "flag_gemini_visual_artifact": 0,
  "flag_gemini_lipsync_issue": 1,
  "flag_gemini_abnormal_blinks": 1,
  "deepfake_confidence_overall": 0.574,
  "final_predicted_label": "UNCERTAIN",
  "anomaly_tags_detected": [
    "GEMINI_LIPSYNC_ISSUE",
    "GEMINI_ABNORMAL_BLINKS"
  ],
  "overlay_video_path": "N/A (Overlay generation removed)",
  "notebook_total_processing_time_sec": 135.69
}
```


Demo finished. Check results above.
