In [None]:
!pip install git+https://github.com/Breakthrough/PySceneDetect.git
!pip install scenedetect
!pip install scenedetect
!pip install deepface
!pip install torch torchvision torchaudio
!pip install transformers
!pip install opencv-python-headless
!pip install ultralytics
!pip install matplotlib


In [None]:
from pyngrok import ngrok, conf

# Variant of run_server() that cleans up stale tunnels and retries the ngrok connection.
# NOTE(review): a later cell defines run_server() again; whichever cell ran last wins.
def run_server():
    """Expose the FastAPI app through ngrok (with retries) and start uvicorn.

    Any running ngrok process is stopped first, then up to three attempts are
    made to open a TLS tunnel to port 8000. If every attempt fails the server
    is still started and is reachable locally.

    Relies on the module-level ``app`` object, which is created in a later
    cell; that cell must have been executed before this function is called.
    """
    # Imported locally because this cell sits above the notebook's main import cell.
    import os
    import time

    import nest_asyncio
    import uvicorn

    # Clean up any existing tunnels (best effort: there may be nothing to kill).
    try:
        ngrok.kill()
    except Exception:
        pass

    # Set ngrok configuration. The token comes from the environment so that it
    # never has to be written into the notebook.
    pyngrok_config = conf.get_default()
    auth_token = os.environ.get("NGROK_AUTHTOKEN")
    if auth_token:
        pyngrok_config.auth_token = auth_token
    pyngrok_config.region = "us"  # or other preferred region

    # Create tunnel with retry logic
    max_retries = 3
    public_url = None
    for attempt in range(1, max_retries + 1):
        try:
            public_url = ngrok.connect(8000, bind_tls=True).public_url
            print(f" * Public URL: {public_url}")
            break
        except Exception as e:
            if attempt == max_retries:
                print("Failed to establish ngrok tunnel after multiple attempts")
                print(f"Last ngrok error: {e}")
                print("You can still access the server locally at http://localhost:8000")
            else:
                print(f"Retrying ngrok connection (attempt {attempt})")
                time.sleep(2)

    # nest_asyncio lets uvicorn start inside the notebook's already-running event loop.
    nest_asyncio.apply()
    uvicorn.run(app, host="0.0.0.0", port=8000)

In [None]:
# Install required packages
# Web-serving stack used by the FastAPI app, the ngrok tunnel and the notebook event loop.
!pip install fastapi uvicorn python-multipart pyngrok nest-asyncio
# Model libraries imported below (torch, transformers, ultralytics, deepface).
!pip install torch transformers ultralytics deepface
!apt-get install -y libgl1-mesa-glx  # For OpenCV

# Import libraries
from fastapi import FastAPI, UploadFile, File, Request, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
import torch
import requests
import cv2
from PIL import Image
from transformers import BlipProcessor, BlipForConditionalGeneration
from ultralytics import YOLO
from deepface import DeepFace
import numpy as np
import time
import collections
import os
import uvicorn
from pyngrok import ngrok
import nest_asyncio
import shutil
from pathlib import Path

# Initialize FastAPI app
app = FastAPI()

# Allow CORS for frontend development
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount static files from Google Drive
# drive_path is the project root: index.html is looked up directly in it and
# static assets/uploads live under its static/ sub-directory.
drive_path = "/content/drive/MyDrive/Project"
app.mount("/static", StaticFiles(directory=f"{drive_path}/static"), name="static")
templates = Jinja2Templates(directory=drive_path)

# 🔑 OpenRouter API key. Read from the environment so the secret is never
# stored in the notebook; set OPENROUTER_API_KEY before running this cell.
API_KEY = os.environ.get("OPENROUTER_API_KEY", "")
if not API_KEY:
    print("⚠️ OPENROUTER_API_KEY is not set; OpenRouter requests will fail.")

# ✅ Global Model Initialization
# The BLIP captioning model is loaded once, in half precision, and moved to the
# GPU when one is available.
# NOTE(review): the float16 weights are also used on the CPU fallback — confirm
# half precision is supported there.
blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
blip_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base", torch_dtype=torch.float16)
blip_device = "cuda" if torch.cuda.is_available() else "cpu"
blip_model.to(blip_device)

# Two YOLO instances loaded from the same weights file: one for per-image/frame
# detection, one kept separate for tracking.
yolo_model_large = YOLO("yolov8n.pt")
yolo_model_small = YOLO("yolov8n.pt")

# Create uploads directory if not exists
os.makedirs(f"{drive_path}/static/uploads", exist_ok=True)

def get_task_from_prompt(prompt, timeout=60):
    """Ask the OpenRouter chat model to classify ``prompt`` into a task label.

    Args:
        prompt: The user's request text.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The model's reply, stripped and lower-cased. The system message asks
        for exactly one of the eight task names, but the reply is not
        validated here. Returns the string ``"error"`` if the request fails or
        the response does not have the expected shape.
    """
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    system_msg = (
        "You're a strict classifier. ONLY respond with one of the following exact words:\n"
        "image_captioning, scene_description, object_detection, object_counting, "
        "question_answering, emotion_detection, video_summarization, real_time_tracking.\n"
        "No explanation. No extra text."
    )
    data = {
        "model": "deepseek/deepseek-chat",
        "messages": [
            {"role": "system", "content": system_msg},
            {"role": "user", "content": prompt}
        ],
        # Temperature 0 keeps the classification as deterministic as the API allows.
        "temperature": 0
    }
    try:
        # Without a timeout a stalled connection would block the caller indefinitely.
        response = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers=headers,
            json=data,
            timeout=timeout,
        )
        result = response.json()
        return result['choices'][0]['message']['content'].strip().lower()
    except Exception:
        # Network failures, non-JSON bodies and unexpected payloads all map to
        # the sentinel; KeyboardInterrupt/SystemExit are no longer swallowed.
        return "error"

# 🖊️ Generic Chat

def get_small_talk_reply(prompt, timeout=60):
    """Send ``prompt`` to the OpenRouter chat model and return its free-form reply.

    Args:
        prompt: Text to send as the user message.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The stripped reply text, or ``"❌ Error getting reply."`` if the
        request fails or the response does not have the expected shape.
    """
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    data = {
        "model": "deepseek/deepseek-chat",
        "messages": [
            {"role": "system", "content": "You're a helpful and friendly assistant."},
            {"role": "user", "content": prompt}
        ],
        "temperature": 0.7
    }
    try:
        # Without a timeout a stalled connection would block the caller indefinitely.
        response = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers=headers,
            json=data,
            timeout=timeout,
        )
        return response.json()['choices'][0]['message']['content'].strip()
    except Exception:
        # Best-effort: any request or parsing failure becomes the fallback
        # text; KeyboardInterrupt/SystemExit are no longer swallowed.
        return "❌ Error getting reply."

# 🏷️ Image Captioning
def caption_image(image_path):
    """Generate a caption for the image at ``image_path`` with the global BLIP model.

    The image is converted to RGB, preprocessed by ``blip_processor``, moved to
    ``blip_device`` as float16, and decoded from at most 50 newly generated
    tokens. Returns the first decoded caption with surrounding whitespace
    removed.
    """
    rgb_image = Image.open(image_path).convert("RGB")
    model_inputs = blip_processor(images=rgb_image, return_tensors="pt")
    model_inputs = model_inputs.to(blip_device, torch.float16)
    token_ids = blip_model.generate(**model_inputs, max_new_tokens=50)
    captions = blip_processor.batch_decode(token_ids, skip_special_tokens=True)
    return captions[0].strip()

# 📦 Object Detection
def detect_objects(image_path):
    """Return the distinct class names YOLO detects in the image at ``image_path``.

    Duplicates are removed through a set, so the order of the returned list is
    not guaranteed.
    """
    first_result = yolo_model_large(image_path)[0]
    id_to_name = first_result.names
    detected_ids = first_result.boxes.cls.tolist()
    unique_labels = {id_to_name[int(raw_id)] for raw_id in detected_ids}
    return list(unique_labels)

# 🔢 Object Counting
def count_objects(image_path):
    """Count YOLO detections per class name for the image at ``image_path``.

    Returns a plain dict mapping each detected class name to the number of
    boxes with that class, in order of first appearance.
    """
    first_result = yolo_model_large(image_path)[0]
    id_to_name = first_result.names
    labels = [id_to_name[int(raw_id)] for raw_id in first_result.boxes.cls.tolist()]
    # Counter keeps first-seen order; convert back so callers still get a dict.
    return dict(collections.Counter(labels))

# 😊 Emotion Detection
def detect_emotion_np(image_np):
    """Return the dominant emotion of each face DeepFace analyses in ``image_np``.

    Args:
        image_np: Image passed straight through as DeepFace's ``img_path``
            argument (callers in this file pass OpenCV frames).

    Returns:
        A list of dominant-emotion strings, one per analysed result. An empty
        list is returned when the analysis fails for any reason.
    """
    try:
        # enforce_detection=False asks DeepFace not to raise when it finds no face.
        result = DeepFace.analyze(img_path=image_np, actions=["emotion"], enforce_detection=False)
        # DeepFace returns either a list of per-face dicts or a single dict.
        if isinstance(result, list):
            return [r["dominant_emotion"] for r in result]
        return [result['dominant_emotion']]
    except Exception:
        # Best-effort: analysis errors yield no emotions, but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        return []

# 📝 Scene Description
def describe_caption_with_deepseek(caption, objects, timeout=60):
    """Ask the OpenRouter chat model for a scene description.

    Args:
        caption: Caption text for the image.
        objects: Iterable of detected object names; ``None`` is treated as
            no objects.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The stripped description text, or
        ``"❌ Error generating scene description."`` if the request fails or
        the response does not have the expected shape.
    """
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    # objects comes from the client request and may be null or hold non-strings;
    # normalise it so building the prompt cannot raise.
    object_names = ', '.join(str(name) for name in (objects or []))
    prompt = f"Describe the scene based on the following details:\nCaption: {caption}\nDetected Objects: {object_names}"
    data = {
        "model": "deepseek/deepseek-chat",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.7
    }
    try:
        # Without a timeout a stalled connection would block the caller indefinitely.
        response = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers=headers,
            json=data,
            timeout=timeout,
        )
        return response.json()['choices'][0]['message']['content'].strip()
    except Exception:
        # Best-effort: any request or parsing failure becomes the fallback text.
        return "❌ Error generating scene description."

# 🎩 Video Summarization
def summarize_video(video_path):
    """Summarise a video from roughly ten evenly spaced sample frames.

    YOLO detections and DeepFace emotions are collected over the sampled
    frames and handed to the chat model as a text prompt.

    Args:
        video_path: Path of the video file to open with OpenCV.

    Returns:
        The summary text produced by ``get_small_talk_reply``, or
        ``"❌ Could not open video."`` when the file cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return "❌ Could not open video."

    frame_samples = []
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Step so that about ten frames are sampled; never less than one frame per step.
        sample_interval = max(1, total_frames // 10)
        for i in range(0, total_frames, sample_interval):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = cap.read()
            if ret:
                frame_samples.append(frame)
    finally:
        # Release the capture even if seeking or reading raises.
        cap.release()

    detected = collections.Counter()
    emotions = []
    for frame in frame_samples:
        results = yolo_model_large.predict(frame)
        boxes = results[0].boxes
        if boxes:
            class_ids = boxes.cls.tolist()
            class_names = [results[0].names[int(c)] for c in class_ids]
            detected.update(class_names)
        emotions.extend(detect_emotion_np(frame))

    summary_prompt = f"Summarize this video:\nDetected objects: {dict(detected)}\nEmotions: {collections.Counter(emotions)}"
    summary = get_small_talk_reply(summary_prompt)
    print("\n🎩 Video Summary:\n", summary)
    # The /process endpoint embeds the return value in its response, so the
    # summary has to be returned as well as printed.
    return summary
# 🔄 Real-Time Tracking
def track_objects_from_video(video_path, output_path="output_tracked.avi"):
    """Track objects through a video and write an annotated copy.

    Every 10th frame is resized to 640x360, passed to the YOLO tracker, drawn
    with boxes and track IDs, and written to ``output_path`` at 2.5 fps. The
    number of distinct track IDs per class is then summarised by the chat
    model.

    Args:
        video_path: Path of the video file to open with OpenCV.
        output_path: Where the annotated XVID video is written.

    Returns:
        A dict with the keys the ``/process`` endpoint reads:
        ``"summary"`` (chat-model summary text), ``"object_summary"``
        (one ``"class: count"`` line per class) and ``"video_path"``
        (``output_path``). If the video cannot be opened, ``"summary"`` holds
        the error message, ``"object_summary"`` is empty and ``"video_path"``
        is ``None``.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("❌ Could not open video.")
        return {
            "summary": "❌ Could not open video.",
            "object_summary": "",
            "video_path": None,
        }

    frame_id = 0
    unique_objects = collections.defaultdict(set)
    out = None
    try:
        out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'XVID'), 2.5,
                              (640, 360))  # Half speed & resized

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_id += 1
            if frame_id % 10 != 0:  # Process every 10th frame
                continue

            frame = cv2.resize(frame, (640, 360))  # Resize for speed
            # persist=True keeps tracker state between successive calls.
            results = yolo_model_small.track(frame, persist=True)
            boxes = results[0].boxes
            if boxes:
                class_ids = boxes.cls.tolist()
                # boxes.id is None when no track IDs were assigned; zip() then yields nothing.
                track_ids = boxes.id.tolist() if boxes.id is not None else []
                for box, cls_id, track_id in zip(boxes.xyxy, class_ids, track_ids):
                    x1, y1, x2, y2 = map(int, box)
                    class_name = results[0].names[int(cls_id)]
                    unique_objects[class_name].add(int(track_id))
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    cv2.putText(frame, f"{class_name} ID:{int(track_id)}", (x1, y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

            out.write(frame)
    finally:
        # Release both handles even if detection or writing raises.
        cap.release()
        if out is not None:
            out.release()

    object_summary = "\n".join([f"{cls}: {len(ids)}" for cls, ids in unique_objects.items()])
    summary_prompt = f"Summarize the video:\nDetected objects:\n{object_summary}"
    summary = get_small_talk_reply(summary_prompt)
    print("\n🧠 AI Summary of the video:\n", summary)
    print("\n📦 Final Unique Object Count:\n", object_summary)
    # The original printed FileLink(output_path), but FileLink was never imported.
    print(f"\n🎞️ Tracked video saved to: {output_path}")
    return {
        "summary": summary,
        "object_summary": object_summary,
        "video_path": output_path,
    }
def upload_file():
    """Prompt for an upload via ``files.upload()`` and return the first filename.

    Returns ``None`` when nothing was uploaded.

    NOTE(review): ``files`` is not imported anywhere in the visible source
    (presumably ``google.colab.files`` — confirm), and the ``/upload`` endpoint
    defined below reuses the name ``upload_file``, replacing this helper.
    """
    uploaded_names = files.upload().keys()
    return next(iter(uploaded_names), None)


# API Endpoints
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    """Store an uploaded file under static/uploads and pre-process images.

    Images are captioned and run through object detection immediately; videos
    (``.mp4``, ``.avi``, ``.mov``) are only stored.

    Returns:
        A dict with ``filename``, ``filepath`` (URL path under ``/static``),
        ``is_video``, ``caption`` and ``objects`` (both ``None`` for videos).

    Raises:
        HTTPException: 400 if the filename is empty or not a plain file name,
            500 if saving or processing fails.
    """
    try:
        # The filename is client-controlled: keep only its last path component
        # so values such as "../../x" cannot escape the uploads directory.
        raw_name = (file.filename or "").replace("\\", "/")
        safe_name = Path(raw_name).name
        if safe_name in ("", ".", ".."):
            raise HTTPException(status_code=400, detail="Invalid filename.")

        # Save the uploaded file
        file_location = f"{drive_path}/static/uploads/{safe_name}"
        with open(file_location, "wb+") as file_object:
            shutil.copyfileobj(file.file, file_object)

        is_video = safe_name.lower().endswith(('.mp4', '.avi', '.mov'))

        # Process the file
        caption = caption_image(file_location) if not is_video else None
        objects = detect_objects(file_location) if not is_video else None

        return {
            "filename": safe_name,
            "filepath": f"/static/uploads/{safe_name}",
            "is_video": is_video,
            "caption": caption,
            "objects": objects
        }
    except HTTPException:
        # Preserve deliberate HTTP errors instead of re-wrapping them as 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/process")
async def process_request(request_data: dict):
    """Classify the user's prompt into a task and run it on the uploaded file.

    Expected keys in ``request_data`` (all optional): ``prompt``,
    ``filepath`` (the ``/static/...`` path returned by ``/upload``),
    ``is_video``, ``caption`` and ``objects``.

    Returns:
        A dict with ``response`` (text for the user) and ``type``
        (``small_talk``, ``info``, ``error`` or ``task_response``); tracking
        results also carry ``video_path``.

    Raises:
        HTTPException: 500 if processing fails unexpectedly.
    """
    try:
        # `or` fallbacks tolerate explicit JSON nulls as well as missing keys.
        prompt = (request_data.get("prompt") or "").strip().lower()
        filepath = request_data.get("filepath") or ""
        is_video = request_data.get("is_video", False)
        caption = request_data.get("caption") or ""
        objects = request_data.get("objects") or []

        # Handle small talk
        small_talk_phrases = ["hi", "hello", "hey", "yo", "howdy", "sup",
                             "good morning", "good evening", "what's up"]
        if prompt in small_talk_phrases:
            return {
                "response": "🤖 Hey there! 👋\n✨ By the way, I can help you with tasks like:\n- 🖼️ Captioning\n- 📷 Descriptions\n- 🔍 Detection\n- 🔢 Counting\n- 😊 Emotions\n- ❓ Q&A\n- 🎬 Summarization\n- 🔄 Tracking",
                "type": "small_talk"
            }

        video_tasks = ("real_time_tracking", "video_summarization")
        image_tasks = ("image_captioning", "scene_description", "object_detection",
                       "object_counting", "emotion_detection", "question_answering")
        # Tasks that read the uploaded file from disk (the others reuse the
        # caption/objects sent back by the client).
        file_tasks = ("object_counting", "emotion_detection") + video_tasks

        # Get task from prompt
        task = get_task_from_prompt(prompt)

        # The classifier helper reports failures with the sentinel "error".
        if task == "error":
            return {
                "response": "❌ Could not determine the task for this request. Please try again.",
                "type": "error"
            }

        # Empty or unrecognised labels would otherwise fall through every
        # branch below and produce a payload without a "response".
        if not task or task not in video_tasks + image_tasks:
            return {
                "response": "🤖 I'm here for anything you need! Feel free to ask me about an image or video task anytime.\n🎯 I'm capable of:\n- 🖼️ Captioning\n- 📷 Descriptions\n- 🔍 Detection\n- 🔢 Counting\n- 😊 Emotions\n- ❓ Q&A\n- 🎬 Summarization\n- 🔄 Tracking",
                "type": "info"
            }

        if task in video_tasks and not is_video:
            return {
                "response": "❌ This task requires a video file.",
                "type": "error"
            }
        elif task in image_tasks and is_video:
            return {
                "response": "❌ This task requires an image file.",
                "type": "error"
            }

        full_path = f"{drive_path}{filepath}"
        if task in file_tasks:
            # filepath is client-controlled: only accept existing files that
            # resolve to somewhere inside the static directory.
            static_root = Path(drive_path, "static").resolve()
            candidate = Path(full_path).resolve()
            if static_root not in candidate.parents or not candidate.is_file():
                return {
                    "response": "❌ File not found. Please upload a file first.",
                    "type": "error"
                }
            full_path = str(candidate)

        # Process the task
        result = {}
        if task == "image_captioning":
            result["response"] = f"🖼️ Caption: {caption}"
        elif task == "scene_description":
            description = describe_caption_with_deepseek(caption, objects)
            result["response"] = f"📝 Scene Description: {description}"
        elif task == "object_detection":
            result["response"] = f"📦 Detected Objects: {objects}"
        elif task == "object_counting":
            count = count_objects(full_path)
            result["response"] = f"🔢 Object Count: {count}"
        elif task == "emotion_detection":
            img = cv2.imread(full_path)
            emotions = detect_emotion_np(img)
            result["response"] = f"😊 Detected Emotions: {collections.Counter(emotions)}"
        elif task == "question_answering":
            answer = get_small_talk_reply(f"Based on the image caption: '{caption}', answer: {prompt}")
            result["response"] = f"💡 Answer: {answer}"
        elif task == "real_time_tracking":
            tracking_result = track_objects_from_video(full_path)
            # Guard against a helper that produced no result dict.
            if not isinstance(tracking_result, dict):
                return {
                    "response": "❌ Tracking did not produce a result.",
                    "type": "error"
                }
            result["response"] = f"🧠 AI Summary of the video:\n{tracking_result['summary']}\n\n📦 Final Unique Object Count:\n{tracking_result['object_summary']}"
            result["video_path"] = tracking_result["video_path"]
        elif task == "video_summarization":
            summary = summarize_video(full_path)
            # Guard against a helper that produced no summary text.
            if summary is None:
                return {
                    "response": "❌ Could not generate a video summary.",
                    "type": "error"
                }
            result["response"] = f"🎩 Video Summary:\n{summary}"

        result["type"] = "task_response"
        return result

    except HTTPException:
        # Preserve deliberate HTTP errors instead of re-wrapping them as 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/", response_class=HTMLResponse)
async def serve_frontend(request: Request):
    """Render ``index.html`` from the templates directory for the site root."""
    template_context = {"request": request}
    return templates.TemplateResponse("index.html", template_context)

# Run the FastAPI server with ngrok
def run_server():
    """Open an ngrok tunnel to port 8000 and serve the FastAPI app with uvicorn.

    The ngrok auth token is read from the ``NGROK_AUTHTOKEN`` environment
    variable so that it is never stored in the notebook. ``uvicorn.run``
    blocks until the server is stopped.
    """
    # Set up ngrok (token supplied through the environment)
    auth_token = os.environ.get("NGROK_AUTHTOKEN")
    if auth_token:
        ngrok.set_auth_token(auth_token)
    else:
        print("⚠️ NGROK_AUTHTOKEN is not set; the ngrok tunnel may be refused.")
    public_url = ngrok.connect(8000).public_url
    print(f" * Public URL: {public_url}")

    # Configure FastAPI to run with uvicorn
    # nest_asyncio lets uvicorn start inside the notebook's already-running event loop.
    nest_asyncio.apply()
    uvicorn.run(app, host="0.0.0.0", port=8000)

# Run this to start the server
run_server()

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
libgl1-mesa-glx is already the newest version (23.0.4-0ubuntu1~22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 34 not upgraded.
 * Public URL: https://5cfe-34-145-190-158.ngrok-free.app


INFO:     Started server process [37384]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


INFO:     139.135.44.42:0 - "GET / HTTP/1.1" 200 OK
INFO:     139.135.44.42:0 - "GET /static/img/ai-avatar.png HTTP/1.1" 404 Not Found
INFO:     139.135.44.42:0 - "GET /static/css/style.css HTTP/1.1" 200 OK
INFO:     139.135.44.42:0 - "GET /static/js/script.js HTTP/1.1" 200 OK
INFO:     139.135.44.42:0 - "GET /static/img/robot.png HTTP/1.1" 200 OK

image 1/1 /content/drive/MyDrive/Project/static/uploads/Screenshot 2025-04-07 224942.png: 608x640 1 person, 259.1ms
Speed: 5.6ms preprocess, 259.1ms inference, 2.5ms postprocess per image at shape (1, 3, 608, 640)
INFO:     139.135.44.42:0 - "POST /upload HTTP/1.1" 200 OK
INFO:     139.135.44.42:0 - "GET /favicon.ico HTTP/1.1" 404 Not Found
INFO:     139.135.44.42:0 - "GET /static/uploads/Screenshot%202025-04-07%20224942.png HTTP/1.1" 200 OK
INFO:     139.135.44.42:0 - "POST /process HTTP/1.1" 200 OK


