In [None]:
# -*- coding: utf-8 -*-
"""
UCF Crime Detection - Video Processing and Inference
Author: matapv01
Created: 2025-03-15 09:27:27
"""
import os
import cv2
import torch
import numpy as np
from tqdm import tqdm
from PIL import Image
import torch.nn as nn
from datetime import datetime
from torchvision import transforms
from transformers import ViTImageProcessor, ViTForImageClassification
from huggingface_hub import login
from IPython.display import HTML
from base64 import b64encode
from google.colab import files
# Install required packages
!pip install transformers torch torchvision opencv-python pillow tqdm
import cv2
from IPython.display import HTML
from base64 import b64encode
from transformers import ViTForImageClassification, ViTImageProcessor
import torch
# Load model
# Import the necessary modules
from huggingface_hub import login, HfApi





In [None]:
# --- Hugging Face configuration ---------------------------------------------
# Repository that hosts the fine-tuned video (frame-chunk) classifier.
MODEL_ID_2 = "mata01/crime-20frame-detection-vit-model"

# The access token is read from the Colab secret named 'HF_TOKEN' rather than
# being pasted into the notebook (e.g. HF_TOKEN = "hf_xxx").
from google.colab import userdata

# `token` is a second module-level name bound to the same value as HF_TOKEN.
token = HF_TOKEN = userdata.get('HF_TOKEN')

# Confirm only when a non-empty token came back; the token itself is never printed.
if HF_TOKEN:
    print("Loaded token success")


Loaded token success


In [None]:
# --- Inference configuration -------------------------------------------------
FRAME_CHUNK_SIZE = 20  # number of consecutive frames that share one prediction
BATCH_SIZE = 32        # NOTE(review): not referenced by the code visible in this notebook
# Prefer the GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Class index -> human-readable label; the list position is the class index.
LABELS = dict(enumerate([
    "Abuse",
    "Arrest",
    "Arson",
    "Assault",
    "Burglary",
    "Explosion",
    "Fighting",
    "Normal",
    "Road Accident",
    "Robbery",
    "Shooting",
    "Shoplifting",
    "Stealing",
    "Vandalism",
]))

class VideoProcessor:
    """Label a video chunk-by-chunk with a fine-tuned ViT classifier.

    Pipeline (see ``process_video``): decode every frame into memory, group the
    frames into consecutive chunks of ``FRAME_CHUNK_SIZE``, classify each chunk
    by averaging the per-frame softmax scores, then write a copy of the video
    with the chunk's label drawn on each of its frames.

    Note: ``extract_frames`` keeps every decoded frame in one Python list, so
    memory use grows with both the length and the resolution of the video.
    """

    # Frame rate used when the video container does not report a usable one.
    _FALLBACK_FPS = 30.0

    def __init__(self):
        """Log in to Hugging Face and load the model, processor and transform.

        Raises:
            Exception: whatever ``login`` or ``from_pretrained`` raised; the
                error is printed and then re-raised unchanged.
        """
        print("Loading model from Hugging Face...")
        # Video properties; filled in by extract_frames().
        self.fps = None
        self.frame_width = None
        self.frame_height = None

        try:
            # Login to Hugging Face
            login(HF_TOKEN)

            # Load model and processor. `token=` replaces the deprecated
            # `use_auth_token=` and matches the image-model loader in this notebook.
            self.model = ViTForImageClassification.from_pretrained(
                MODEL_ID_2,
                token=HF_TOKEN,
                num_labels=len(LABELS),
                id2label=LABELS
            ).to(DEVICE)

            # Loaded alongside the model; process_chunk() preprocesses frames
            # with self.transform below rather than with this processor.
            self.processor = ViTImageProcessor.from_pretrained(
                MODEL_ID_2,
                token=HF_TOKEN
            )

            print("Model loaded successfully!")

        except Exception as e:
            print(f"Error loading from Hugging Face: {str(e)}")
            raise

        self.model.eval()

        # Per-frame preprocessing: resize, centre-crop to 224x224, convert to a
        # tensor and normalise each channel with the mean/std given below.
        self.transform = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            )
        ])

    def extract_frames(self, video_path):
        """Decode every frame of ``video_path`` into a list of RGB frames.

        Also records ``self.fps``, ``self.frame_width`` and ``self.frame_height``
        for use by ``create_output_video``.

        Args:
            video_path: path handed to ``cv2.VideoCapture``.

        Returns:
            list: the decoded frames, converted from BGR to RGB, in order.

        Raises:
            ValueError: if the video cannot be opened.
        """
        print("Extracting frames from video...")
        cap = cv2.VideoCapture(video_path)

        if not cap.isOpened():
            cap.release()
            raise ValueError(f"Could not open video file: {video_path}")

        frames = []
        try:
            # Keep the fractional frame rate (e.g. 29.97): truncating it to an
            # int makes the re-encoded video play at a slightly wrong speed.
            reported_fps = cap.get(cv2.CAP_PROP_FPS)
            self.fps = reported_fps if reported_fps > 0 else self._FALLBACK_FPS
            self.frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            self.frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            print(f"Video properties: {self.fps:.2f} FPS, {self.frame_width}x{self.frame_height}")

            # The reported frame count only sizes the progress bar. Reading
            # continues until the decoder reports no more frames, so a zero or
            # under-reported count no longer drops frames.
            with tqdm(total=total_frames if total_frames > 0 else None,
                      desc="Extracting frames") as progress:
                while True:
                    ret, frame = cap.read()
                    if not ret:
                        break
                    frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                    progress.update(1)
        finally:
            # Release the capture even if decoding raised.
            cap.release()

        return frames

    def create_frame_chunks(self, frames):
        """Split ``frames`` into consecutive chunks of ``FRAME_CHUNK_SIZE``.

        A trailing group with fewer than ``FRAME_CHUNK_SIZE`` frames is
        discarded, so those frames receive no prediction.

        Returns:
            list: slices of ``frames``, each exactly ``FRAME_CHUNK_SIZE`` long.
        """
        print("Creating frame chunks...")
        # The range stops early enough that only complete chunks are produced.
        last_start = len(frames) - FRAME_CHUNK_SIZE + 1
        return [
            frames[start:start + FRAME_CHUNK_SIZE]
            for start in range(0, last_start, FRAME_CHUNK_SIZE)
        ]

    def process_chunk(self, chunk):
        """Classify one chunk of RGB frames.

        Every frame is preprocessed with ``self.transform`` and run through the
        model in a single batch; the per-frame softmax scores are averaged and
        the highest-scoring class is reported.

        Returns:
            tuple: ``(label, confidence)`` where ``label`` comes from ``LABELS``
            and ``confidence`` is the averaged score of that class.
        """
        processed_frames = [self.transform(Image.fromarray(frame)) for frame in chunk]
        batch = torch.stack(processed_frames).to(DEVICE)

        with torch.no_grad():
            outputs = self.model(batch)
            predictions = torch.softmax(outputs.logits, dim=-1)
            chunk_pred = predictions.mean(dim=0)  # average over the frames
            label_idx = chunk_pred.argmax().item()
            confidence = chunk_pred[label_idx].item()

        return LABELS[label_idx], confidence

    def create_output_video(self, frames, predictions, output_path):
        """Write ``frames`` to ``output_path`` with chunk labels drawn on them.

        Args:
            frames: RGB frames as returned by ``extract_frames``.
            predictions: one ``(label, confidence)`` pair per chunk, in chunk
                order; frames beyond the last predicted chunk are written
                without a label.
            output_path: destination file; its parent directory is created
                when missing.

        Raises:
            IOError: if the video writer cannot be opened for ``output_path``.
        """
        print("Creating output video...")
        predictions = list(predictions)

        # The writer cannot create directories itself, so the default
        # 'static/outputs/...' target would fail on a fresh runtime.
        out_dir = os.path.dirname(output_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

        # Fall back to the frames themselves when extract_frames() has not
        # populated the video properties on this instance.
        width, height = self.frame_width, self.frame_height
        if frames and not (width and height):
            height, width = frames[0].shape[:2]
        fps = self.fps if self.fps else self._FALLBACK_FPS

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            out.release()
            raise IOError(f"Could not open video writer for: {output_path}")

        try:
            print("Adding labels to frames...")
            for frame_idx, frame in enumerate(tqdm(frames, desc="Processing frames")):
                # Frame i belongs to chunk i // FRAME_CHUNK_SIZE.
                chunk_idx = frame_idx // FRAME_CHUNK_SIZE
                if chunk_idx < len(predictions):
                    label, conf = predictions[chunk_idx]

                    # Draw on a copy so the caller's frame stays untouched.
                    frame = frame.copy()

                    # Text to display
                    text = f"Action: {label} ({conf:.2f})"

                    # Draw black background sized to the text width
                    (text_width, _), _ = cv2.getTextSize(
                        text, cv2.FONT_HERSHEY_SIMPLEX, 1, 2
                    )
                    cv2.rectangle(
                        frame,
                        (10, 10),
                        (text_width + 20, 40),
                        (0, 0, 0),
                        -1
                    )

                    # Draw white text
                    cv2.putText(
                        frame,
                        text,
                        (15, 30),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1,
                        (255, 255, 255),
                        2,
                        cv2.LINE_AA
                    )

                # Frames are held as RGB; convert back to BGR for the writer.
                out.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        finally:
            # Finalise the file even if drawing or writing raised.
            out.release()

        print(f"Video saved to {output_path}")

    def process_video(self, input_path, output_path = 'static/outputs/video.mp4'):
        """Run the full pipeline on ``input_path`` and write the labelled video.

        Returns:
            ``output_path`` on success, or ``None`` when any step failed (the
            error message is printed instead of being raised).
        """
        try:
            # Extract frames
            frames = self.extract_frames(input_path)
            if not frames:
                raise ValueError("No frames extracted from video")

            print(f"Total frames: {len(frames)}")

            # Create and process chunks
            chunks = self.create_frame_chunks(frames)
            print(f"Total chunks: {len(chunks)}")

            # Process chunks
            print("Processing chunks...")
            predictions = [
                self.process_chunk(chunk)
                for chunk in tqdm(chunks, desc="Processing chunks")
            ]

            # create_output_video() announces itself; only emit the blank line
            # that separates its output from the progress bar above.
            print()
            self.create_output_video(frames, predictions, output_path)

            return output_path

        except Exception as e:
            # Best-effort API: report the failure and signal it with None.
            print(f"Error processing video: {str(e)}")
            return None

def show_video(video_path):
    """
    Displays a video in the notebook.

    The whole file is read and embedded in the returned markup as a base64
    ``data:`` URL, so the size of the output grows with the size of the video.

    Args:
        video_path: path of the MP4 file to embed.

    Returns:
        An ``IPython.display.HTML`` object wrapping a ``<video>`` element.
    """
    # A context manager closes the file handle deterministically instead of
    # leaving that to garbage collection.
    with open(video_path, 'rb') as video_file:
        mp4 = video_file.read()
    data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
    return HTML(f"""
    <video width=600 controls>
        <source src="{data_url}" type="video/mp4">
    </video>
    """)

In [None]:
def loadimgprocessor():
    """Load the single-image classifier and its processor from the Hub.

    Reads the module-level ``token`` for authentication, moves the model to
    the GPU when one is available (CPU otherwise) and switches it to
    evaluation mode.

    Returns:
        tuple: ``(model, processor)``.
    """
    # Define the repository ID
    username = "mata01"  # Replace with your real username
    model_name = "crime_action_cctv_image_detection"
    repo_id = f"{username}/{model_name}"

    # Download the model and its preprocessor from the Hugging Face Model Hub
    model = ViTForImageClassification.from_pretrained(repo_id, token=token)
    processor = ViTImageProcessor.from_pretrained(repo_id, token=token)

    # Check for and select the device (GPU if available)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Move the model to the selected device
    model.to(device)
    model.eval()  # Evaluation mode for inference
    # Report the device actually used; the old message claimed "GPU" even
    # when the model stayed on the CPU.
    print(f"Model moved to {device} successfully!")
    return model, processor

from PIL import Image, ImageDraw, ImageFont
import os

def predict_image(image_path, output_path='static/outputs/output_image.jpg'):
    """Classify one image and save a copy with the predicted label drawn on it.

    Relies on the module-level ``processor_img``, ``model_img`` and ``device``
    that the model-initialisation cell assigns, so that cell must have run
    before this function is called.

    Args:
        image_path: path of the image to classify.
        output_path: where the annotated image is written; the parent
            directory is created when missing.

    Returns:
        ``output_path`` on success, or ``None`` when the image cannot be read
        or any step fails (the error and traceback are printed).
    """
    try:
        # Read the image with OpenCV
        img = cv2.imread(image_path)
        if img is None:
            print(f"Lỗi: Không thể đọc ảnh tại {image_path}")
            return None

        # Convert the image to RGB
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # Preprocess the image
        inputs = processor_img(images=img, return_tensors="pt").to(device)

        # Predict
        with torch.no_grad():
            logits = model_img(**inputs).logits
            predicted_class_id = logits.argmax(dim=-1).item()
            predicted_label = model_img.config.id2label[predicted_class_id]

        # Prepare to draw the predicted label on the image
        pil_img = Image.fromarray(img)
        draw = ImageDraw.Draw(pil_img)
        font_size = 30
        try:
            font = ImageFont.truetype("LiberationSansNarrow-Bold.ttf", font_size)
        except OSError:
            # The font file is not installed on every machine; a missing font
            # should not abort the prediction.
            font = ImageFont.load_default()
        text_position = (10, 10)
        text_color = (255, 0, 0)

        # Draw the predicted label onto the image
        draw.text(text_position, predicted_label, fill=text_color, font=font)

        # Save the output image, creating the target directory first because
        # the default 'static/outputs/' does not exist on a fresh runtime.
        out_dir = os.path.dirname(output_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        pil_img.save(output_path)
        return output_path

    except Exception as e:
        import traceback
        traceback.print_exc()
        print(f"Đã xảy ra lỗi: {e}")
        return None

In [None]:
# Initialise the AI models used by the cells below.
print("Loading models...")

# Device that predict_image() moves its inputs to.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Video pipeline (chunk-level classifier) ...
processor = VideoProcessor()
# ... and the single-image classifier with its preprocessor.
model_img, processor_img = loadimgprocessor()

print("Models loaded successfully!")

Loading models...
Loading model from Hugging Face...
Model loaded successfully!
Using device: cpu
Model moved to GPU successfully!
Models loaded successfully!


In [None]:
# predict_image('/content/frame_01123.jpg')

In [None]:
# output_video_path = '/content/output_video.mp4'
# input_video_path = '/content/Burglary at Colorado business caught on camera.mp4'
# processor.process_video(input_video_path, output_video_path)

Extracting frames from video...
Video properties: 29 FPS, 1920x1080


Extracting frames: 100%|██████████| 1050/1050 [00:16<00:00, 63.92it/s]


Total frames: 1050
Creating frame chunks...
Total chunks: 52
Processing chunks...


Processing chunks: 100%|██████████| 52/52 [08:02<00:00,  9.29s/it]



Creating output video...
Creating output video...
Adding labels to frames...


Processing frames: 100%|██████████| 1050/1050 [00:11<00:00, 90.32it/s] 

Video saved to /content/output_video.mp4





True