# Object-Aware Video Search System with BLIP & Grounding DINO

This notebook implements a semantic video search engine that:
- Extracts frames from videos with similarity-based filtering
- Detects objects using Grounding DINO (optional)
- Generates captions using BLIP (scene or object-focused)
- Stores embeddings in Pinecone for semantic search
- Enables natural language queries to find exact timestamps

---

## Step 1: Installation & Setup

Clone the repository and install all dependencies.

In [None]:
# Clone the repository
# The project sources are fetched and made the working directory because later
# cells open files such as embedding_generator.py / object_detector.py by
# relative path and import video_search_engine by bare module name.
!git clone https://github.com/pranavacchu/capstone-BLIP.git
%cd capstone-BLIP

# Install dependencies
# Three quiet (-q) pip invocations: imaging/data utilities, the PyTorch +
# transformers stack, then the Pinecone client and remaining model packages.
print("üì¶ Installing dependencies... This will take 3-5 minutes")
!pip install -q opencv-python-headless pillow numpy pandas tqdm python-dotenv
!pip install -q torch torchvision transformers sentence-transformers
!pip install -q pinecone FlagEmbedding timm supervision

# NOTE(review): this message is printed unconditionally; nothing here inspects
# whether the pip commands above succeeded.
print("\n‚úÖ Installation complete!")

# Report which accelerator, if any, PyTorch can see on this runtime
import torch

if not torch.cuda.is_available():
    print("\n‚ö†Ô∏è No GPU detected. Using CPU (slower but functional)")
else:
    # Device 0 is the only device reported on.
    gpu_name = torch.cuda.get_device_name(0)
    gpu_memory_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
    print(f"\nüöÄ GPU detected: {gpu_name}")
    print(f"   Memory: {gpu_memory_gb:.1f} GB")

## Step 2: Apply Hotfixes (If Needed)

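Patches known issues in the pipeline: adds a missing `deduplicate_embeddings` method to `embedding_generator.py`, and checks that Grounding DINO in `object_detector.py` is loaded in float32.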

In [None]:
print("üîß Applying hotfixes...")

# Hotfix 1: Add deduplicate_embeddings method to TextEmbeddingGenerator
# The source file is patched textually: the new method is inserted immediately
# before the existing `get_embedding_statistics` definition. Files are read and
# written with an explicit UTF-8 encoding so the patch does not depend on the
# platform's default locale.
with open('embedding_generator.py', 'r', encoding='utf-8') as f:
    content = f.read()

if 'def deduplicate_embeddings' not in content:
    print("   - Adding deduplicate_embeddings method...")
    marker = '    def get_embedding_statistics'
    if marker in content:
        dedupe_method = '''    def deduplicate_embeddings(self,
                              embedded_frames: List[EmbeddedFrame],
                              similarity_threshold: float = 0.95) -> List[EmbeddedFrame]:
        """
        Remove duplicate embeddings based on similarity threshold
        """
        if not embedded_frames or len(embedded_frames) <= 1:
            return embedded_frames
        
        logger.info(f"Deduplicating {len(embedded_frames)} embeddings with threshold {similarity_threshold}")
        
        embeddings = np.array([ef.embedding for ef in embedded_frames])
        keep_mask = np.ones(len(embedded_frames), dtype=bool)
        
        for i in range(len(embeddings)):
            if not keep_mask[i]:
                continue
            for j in range(i + 1, len(embeddings)):
                if not keep_mask[j]:
                    continue
                similarity = np.dot(embeddings[i], embeddings[j]) if self.normalize else \\
                    np.dot(embeddings[i], embeddings[j]) / (np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[j]))
                if similarity >= similarity_threshold:
                    keep_mask[j] = False
        
        unique_frames = [ef for ef, keep in zip(embedded_frames, keep_mask) if keep]
        logger.info(f"Removed {len(embedded_frames) - len(unique_frames)} duplicates, kept {len(unique_frames)} unique")
        return unique_frames

'''
        # count=1: insert the method once, even if the marker text happens to
        # appear more than once in the file.
        content = content.replace(marker, dedupe_method + marker, 1)
        with open('embedding_generator.py', 'w', encoding='utf-8') as f:
            f.write(content)
        print("   ‚úì Added deduplicate_embeddings method")
    else:
        # Previously this case was silent, so the run looked successful even
        # though nothing had been patched.
        print("   WARNING: insertion point 'def get_embedding_statistics' not found in "
              "embedding_generator.py; deduplicate_embeddings was NOT added")
else:
    print("   ‚úì deduplicate_embeddings already exists")

# Hotfix 2: Fix Grounding DINO dtype mismatch
# NOTE: this step only *checks* for the float32 setting and tells the user
# where to look; it does not modify object_detector.py.
print("   - Fixing Grounding DINO dtype mismatch...")
with open('object_detector.py', 'r', encoding='utf-8') as f:
    content = f.read()

if 'torch_dtype=torch.float32' not in content:
    # Ensure model is loaded with float32
    print("   ‚ö† Grounding DINO should load with float32. Check object_detector.py line 79-82")
else:
    print("   ‚úì Grounding DINO dtype fix already applied")

print("\n‚úÖ Hotfixes complete!")

In [None]:
# Reload modules after hotfixes
# Dropping the cached entries from sys.modules means the next `import` of any
# of these names re-reads the (possibly patched) source from disk.
import sys

_PATCHED_MODULES = (
    'object_detector',
    'object_caption_pipeline',
    'video_search_engine',
    'embedding_generator',
)
for module_name in _PATCHED_MODULES:
    # pop(..., None) is a no-op for modules that were never imported.
    sys.modules.pop(module_name, None)
print("‚úÖ Modules reloaded")

## Step 3: Configure Pinecone Credentials

In [None]:
import os

# Set your Pinecone credentials
PINECONE_API_KEY = "your_api_key_here"  # Replace with your actual key
PINECONE_HOST = "https://your-index-host.pinecone.io"  # Replace with your host
PINECONE_ENVIRONMENT = "us-east-1"

# Write to .env file
# One KEY=value line per setting, written to `.env` in the current working
# directory (any existing file is overwritten).
env_lines = [
    f"PINECONE_API_KEY={PINECONE_API_KEY}\n",
    f"PINECONE_HOST={PINECONE_HOST}\n",
    f"PINECONE_ENVIRONMENT={PINECONE_ENVIRONMENT}\n",
]
with open('.env', 'w') as env_file:
    env_file.writelines(env_lines)

print("‚úÖ Configuration saved!")

## Step 4: Test Pinecone Connection

In [None]:
from video_search_engine import VideoSearchEngine

print("üîå Connecting to Pinecone...")
engine = VideoSearchEngine()

# Get database stats
stats = engine.get_index_stats()

# Fixed header lines first, then the two values read from the stats mapping.
for header_line in (
    "\n‚úÖ Successfully connected to Pinecone!",
    "\nüìä Database Statistics:",
    "   Index: capstone",
):
    print(header_line)

total_vectors = stats.get('total_vectors', 0)
print(f"   Total vectors: {total_vectors:,}")
dimension = stats.get('dimension', 1024)
print(f"   Dimension: {dimension}")

## Step 5: Upload Video

Choose how to get your video:
1. Upload from computer
2. Download from URL
3. Download from YouTube

In [None]:
from google.colab import files
import subprocess
from urllib.parse import urlparse
import cv2

def upload_from_computer():
    """Prompt for a file upload through Colab and return the uploaded name.

    Returns:
        The first key of the mapping returned by ``files.upload()``, or
        ``None`` when that mapping is empty.
    """
    print("üìÅ Please select your video file...")
    uploaded = files.upload()

    if not uploaded:
        print("‚ùå No file uploaded")
        return None

    # Only the first uploaded entry is used.
    video_path = next(iter(uploaded))
    print(f"‚úÖ Uploaded: {video_path}")
    return video_path

def download_from_url(url):
    """Download a video from a direct URL, trying ``wget`` first, then ``curl``.

    The local file name is the last path component of the URL, or
    ``downloaded_video.mp4`` when the URL path has no file name.

    Args:
        url: Direct link to the video file.

    Returns:
        The local file name on success, or ``None`` if both tools failed.
        A tool that is not installed, or that exceeds the 300-second timeout,
        counts as a failed attempt rather than raising, so the fallback still
        gets a chance to run.
    """
    filename = os.path.basename(urlparse(url).path) or "downloaded_video.mp4"
    print(f"‚¨áÔ∏è Downloading from URL to {filename}...")

    def _attempt(command):
        """Run one downloader; True only if it exited 0 and left a non-empty file."""
        try:
            result = subprocess.run(command, capture_output=True, timeout=300)
        except (OSError, subprocess.TimeoutExpired):
            # OSError covers a missing executable; TimeoutExpired a stalled transfer.
            return False
        return (
            result.returncode == 0
            and os.path.exists(filename)
            and os.path.getsize(filename) > 0
        )

    # NOTE(review): --no-check-certificate disables TLS certificate verification
    # for wget (the curl fallback below does verify). Kept for compatibility;
    # consider removing it if downloads from untrusted hosts are a concern.
    if _attempt(['wget', '-O', filename, url, '--no-check-certificate', '-q', '--show-progress']):
        print(f"‚úÖ Downloaded: {filename}")
        return filename

    # Fallback to curl
    print("üîÑ Trying alternative method (curl)...")
    if _attempt(['curl', '-L', '-o', filename, url, '--silent', '--show-error']):
        print(f"‚úÖ Downloaded: {filename}")
        return filename

    print("‚ùå Download failed")
    return None

def download_from_youtube(url):
    """Download a YouTube video to ``youtube_video.mp4`` using yt-dlp.

    yt-dlp is installed (or confirmed installed) with pip on every call.

    Args:
        url: URL of the YouTube video.

    Returns:
        ``"youtube_video.mp4"`` on success, otherwise ``None``. A timeout or a
        missing executable is reported as a failed download instead of raising.
    """
    import sys  # local import: only needed to locate the running interpreter

    print("‚¨áÔ∏è Downloading from YouTube...")

    filename = "youtube_video.mp4"

    # The output name is fixed, so a file left over from an earlier download
    # would satisfy the success check below (and yt-dlp skips files that
    # already exist). Remove it so the result always belongs to this URL.
    if os.path.exists(filename):
        os.remove(filename)

    try:
        # `python -m pip` installs into the interpreter running this notebook.
        subprocess.run([sys.executable, '-m', 'pip', 'install', '-q', 'yt-dlp'], check=False)
        result = subprocess.run(
            ['yt-dlp', '-f', 'best[ext=mp4]/best', '-o', filename, '--no-playlist', '--quiet', '--progress', url],
            capture_output=True, timeout=600
        )
    except (OSError, subprocess.TimeoutExpired):
        # OSError: executable not found; TimeoutExpired: download took > 600 s.
        result = None

    if (
        result is not None
        and result.returncode == 0
        and os.path.exists(filename)
        and os.path.getsize(filename) > 0
    ):
        print(f"‚úÖ Downloaded: {filename}")
        return filename

    print("‚ùå YouTube download failed")
    print("üí° Tips: Ensure video is public and not age-restricted")
    return None

def validate_video(video_path):
    """Check that a video file exists and can be opened by OpenCV.

    Prints the file size and, when the file opens, its duration, FPS and
    frame count.

    Args:
        video_path: Path of the file to check.

    Returns:
        ``True`` if OpenCV opened the file, ``False`` if the path does not
        exist or the file could not be opened.
    """
    if not os.path.exists(video_path):
        return False

    size_mb = os.path.getsize(video_path) / (1024*1024)
    print(f"\nüìπ Video ready: {video_path} ({size_mb:.1f} MB)")

    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        print("‚ö†Ô∏è Unable to read video file. It may be corrupted.")
        return False

    fps = capture.get(cv2.CAP_PROP_FPS)
    frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    # Guard against a zero/negative FPS so the division cannot fail.
    if fps > 0:
        duration = frame_count / fps
    else:
        duration = 0
    print(f"   Duration: {duration:.1f}s | FPS: {fps:.1f} | Frames: {frame_count:,}")
    capture.release()
    return True

# Main upload logic
for menu_line in (
    "üì§ Choose video source:\n",
    "1. Upload from computer (recommended for small files < 100MB)",
    "2. Download from URL (direct video file)",
    "3. Download from YouTube URL\n",
):
    print(menu_line)

choice = input("Enter choice (1/2/3): ").strip()
video_path = None

# Options 2 and 3 differ only in the prompt shown and the downloader called.
url_sources = {
    "2": ("\nEnter video URL: ", download_from_url),
    "3": ("\nEnter YouTube URL: ", download_from_youtube),
}

if choice == "1":
    video_path = upload_from_computer()
elif choice in url_sources:
    url_prompt, downloader = url_sources[choice]
    url = input(url_prompt).strip()
    if url:
        video_path = downloader(url)
else:
    print("‚ö†Ô∏è Invalid choice")

# Validate
# video_path is reset to None on failure so later cells can test it for truthiness.
if not (video_path and validate_video(video_path)):
    print("\n‚ùå No valid video available. Please run this cell again.")
    video_path = None
else:
    print("\n‚úÖ Video is ready for processing")

## Step 6: Choose Captioning Method

**Option 1: Standard BLIP** - Faster, generates general scene descriptions  
**Option 2: Object Detection + BLIP** - Slower, focuses on detected objects with attributes

In [None]:
for menu_line in (
    "Choose captioning method:\n",
    "1. Standard BLIP (faster, general scene captions)",
    "2. Object Detection + BLIP (slower, object-focused)\n",
):
    print(menu_line)

# An empty answer falls back to "1"; only an exact "2" enables object detection.
method_choice = input("Enter choice (1/2, default=1): ").strip() or "1"
use_object_detection = (method_choice == "2")

if not use_object_detection:
    print("\n‚ÑπÔ∏è Using Standard BLIP captioning")
else:
    print("\n‚ÑπÔ∏è Using Object Detection + BLIP pipeline")
    print("   Detects: bags, laptops, helmets, phones, people, vehicles, etc.")

## Step 7: Process Video

This will:
1. Extract frames (with similarity filtering)
2. Generate captions (object-focused or scene-based)
3. Create embeddings for semantic search
4. Upload to Pinecone database

**Expected time:**  
- 1 min video: ~2-3 min with GPU  
- 5 min video: ~8-10 min with GPU

In [None]:
import time
from datetime import datetime

# Requires `video_path` (upload cell) and `engine` (Pinecone connection cell).
if 'video_path' not in locals() or not video_path:
    print("‚ùå No video available. Please run the upload cell first.")
else:
    # Set video name
    video_name = input("Enter a name for this video (or press Enter for auto-name): ").strip()
    if not video_name:
        video_name = f"video_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    print(f"\nüé¨ Processing video: {video_name}")
    print("‚è≥ This will take a few minutes... Please wait.\n")
    print("=" * 60)

    start_time = time.time()

    try:
        # Process the video
        # Only the pipeline call is guarded: a problem while *printing* the
        # statistics must not be reported as a processing failure.
        stats = engine.process_video(
            video_path=video_path,
            video_name=video_name,
            save_frames=False,
            upload_to_pinecone=True,
            # Fall back to standard captioning if the method-selection cell
            # was skipped (matches that cell's own default choice).
            use_object_detection=globals().get('use_object_detection', False)
        )
    except Exception as e:
        # Top-level notebook boundary: report the error and keep the kernel usable.
        print(f"\n‚ùå Error processing video: {e}")
        print("\nüí° Troubleshooting tips:")
        print("   - GPU memory error: Restart runtime and try again")
        print("   - Video format error: Convert to MP4 format")
    else:
        processing_time = time.time() - start_time

        # Save video name for search
        # Recorded before reporting so the search cells can filter by this
        # video even if the summary below hits an unexpected stats shape.
        processed_video_name = video_name

        print("\n" + "=" * 60)
        print("\n‚úÖ VIDEO PROCESSING COMPLETE!\n")
        print(f"üìä Processing Statistics:")
        print(f"   Video name: {video_name}")
        # .get(..., 0) throughout: a missing counter is shown as 0 instead of
        # raising KeyError after a successful run.
        print(f"   Frames extracted: {stats.get('total_frames_extracted', 0):,}")
        print(f"   Frames with captions: {stats.get('frames_with_captions', 0):,}")
        print(f"   Unique embeddings: {stats.get('embeddings_generated', 0):,}")
        print(f"   Uploaded to Pinecone: {stats.get('embeddings_uploaded', 0):,}")
        print(f"   Processing time: {processing_time/60:.1f} minutes")
        print(f"   Frame reduction: {stats.get('frame_reduction_percent', 0):.1f}%")

## Step 8: Search Your Video

Use natural language to find specific content in your video.

**Example queries:**
- "person walking"
- "black bag"
- "someone with a laptop"
- "red backpack"
- "student on phone"

In [None]:
# Single search query
query = input("üîç Enter your search query: ")

print(f"\nSearching for: '{query}'...")
print("=" * 60)

results = engine.search(query=query, top_k=5, similarity_threshold=0.5)

if not results:
    print("\n‚ùå No results found. Try:")
    for suggestion in (
        "   - Different search terms",
        "   - More general queries",
        "   - Lowering the similarity threshold",
    ):
        print(suggestion)
else:
    print(f"\n‚úÖ Found {len(results)} results:\n")
    # Results are numbered from 1 in the order the engine returned them.
    for rank, hit in enumerate(results, start=1):
        print(f"{rank}. ‚è±Ô∏è Timestamp: {hit['time_formatted']}")
        print(f"   üìù Caption: {hit['caption']}")
        print(f"   üìä Confidence: {hit['similarity_score']:.1%}")
        print(f"   üé• Video: {hit['video_name']}")
        print()

## Step 9: Batch Search (Multiple Queries)

In [None]:
# Define multiple queries
queries = ["person walking", "backpack", "laptop", "phone", "outdoor scene"]

print("üîç Running batch search...\n")
print("=" * 60)

batch_results = engine.batch_search(queries, top_k=3)

for query, results in batch_results.items():
    print(f"\nüìå Query: '{query}'")
    print(f"   Found {len(results)} results")

    if not results:
        print("   ‚îî‚îÄ No results")
        continue

    # Show top 2, with the caption cut to its first 50 characters.
    for hit in results[:2]:
        timestamp = hit['time_formatted']
        caption_preview = hit['caption'][:50]
        score = hit['similarity_score']
        print(f"   ‚îî‚îÄ {timestamp} - {caption_preview}... ({score:.0%})")

print("\n" + "=" * 60)

## Step 10: Advanced Search with Filters

Search with additional constraints:
- Filter by specific video
- Search within time range
- Adjust confidence threshold

In [None]:
# Advanced search: one query with optional time-window and video-name filters.
# Requires `engine` from the Pinecone connection cell.
query = input("Enter search query: ")

# Optional: Time range filter
use_time_filter = input("Filter by time range? (y/n): ").strip().lower() == 'y'
time_window = None
if use_time_filter:
    try:
        start_time = float(input("Start time (seconds): "))
        end_time = float(input("End time (seconds): "))
    except ValueError:
        # A typo should not abort the whole cell; fall back to an unfiltered search.
        print("Invalid time value (expected a number of seconds); searching without a time filter.")
    else:
        # Accept the bounds in either order.
        if start_time > end_time:
            start_time, end_time = end_time, start_time
        time_window = (start_time, end_time)

# Optional: Video filter
# Only offered when a video was processed earlier in this session.
video_filter = None
if 'processed_video_name' in locals():
    filter_video = input(f"Search only in '{processed_video_name}'? (y/n): ").strip().lower() == 'y'
    if filter_video:
        video_filter = processed_video_name

# Perform search
print(f"\nüîç Searching with filters...")
results = engine.search(
    query=query,
    top_k=10,
    similarity_threshold=0.4,
    video_filter=video_filter,
    time_window=time_window
)

if results:
    print(f"\n‚úÖ Found {len(results)} results:\n")
    for i, result in enumerate(results, 1):
        # Captions are cut to their first 60 characters for a one-line listing.
        print(f"{i}. {result['time_formatted']} - {result['caption'][:60]}... ({result['similarity_score']:.1%})")
else:
    # Report an empty result set as such instead of a "Found 0 results" success banner.
    print("\n‚ùå No results found. Try a different query.")

## Step 11: Interactive Search Interface

Enter queries one after another. Type 'quit', 'exit', or 'q' to stop.

In [None]:
print("üéØ INTERACTIVE VIDEO SEARCH")
print("=" * 60)
print("Enter your search queries (type 'quit' to exit)\n")

# Prompt until the user types one of the exit words; blank input just re-prompts.
while True:
    query = input("\nüîç Search: ").strip()

    if not query:
        continue

    if query.lower() in ['quit', 'exit', 'q']:
        print("\nüëã Goodbye!")
        break

    results = engine.search(query, top_k=5)

    if not results:
        print("\n‚ùå No results found. Try a different query.")
        continue

    print(f"\n‚úÖ Found {len(results)} results:")
    for rank, hit in enumerate(results, 1):
        score = hit['similarity_score']
        # Three marker tiers: above 0.7, above 0.5, and everything else.
        if score > 0.7:
            score_emoji = "üü¢"
        elif score > 0.5:
            score_emoji = "üü°"
        else:
            score_emoji = "üü†"
        print(f"\n{rank}. {score_emoji} {hit['time_formatted']} ({score:.0%})")
        print(f"   {hit['caption']}")

## Cleanup (Optional)

Free up GPU memory after processing.

In [None]:
# Clear GPU cache and unload models
# All teardown is delegated to the engine's own cleanup() method; exactly what
# it releases is defined in video_search_engine, not in this notebook.
# Requires the `engine` object created in the Pinecone connection cell.
engine.cleanup()
print("‚úÖ Resources cleaned up")