In [7]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms
import numpy as np
import cv2
import glob
from PIL import Image
from collections import deque
import supervision as sv
from inference import get_model
from time import time



In [2]:
# Read the detection-service API key from the environment so the secret is
# never stored in (or committed with) the notebook. Export ROBOFLOW_API_KEY
# before starting the kernel.
# NOTE(review): the key that used to be hard-coded on this line should be
# treated as leaked and rotated.
detection_api_key = os.environ.get("ROBOFLOW_API_KEY")
if not detection_api_key:
    print("ROBOFLOW_API_KEY is not set; loading the detection model will likely fail.")

In [8]:

class ViolenceDetectionModel(nn.Module):
    """Clip-level binary classifier: per-frame ResNet-18 features fed to an LSTM.

    Each frame is encoded by a ResNet-18 whose final ``fc`` layer is replaced
    by ``nn.Identity`` (512 features per frame, matching the LSTM input size).
    The LSTM consumes the per-frame features in order, and the output at the
    last time step is mapped to a single sigmoid score.

    Args:
        sequence_length: Stored on the instance as ``self.sequence_length``.
            ``forward`` does not read it; the sequence length actually used is
            taken from the input tensor.
        hidden_size: Hidden size of the single-layer LSTM.
        pretrained: Forwarded to ``models.resnet18``. Pass ``False`` to skip
            fetching pretrained backbone weights, e.g. when a full checkpoint
            is loaded with ``load_state_dict`` right after construction.
    """

    def __init__(self, sequence_length, hidden_size=256, pretrained=True):
        super().__init__()
        self.resnet = models.resnet18(pretrained=pretrained)
        self.resnet.fc = nn.Identity()  # Remove final classification layer
        self.sequence_length = sequence_length

        self.lstm = nn.LSTM(input_size=512, hidden_size=hidden_size, num_layers=1, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)  # Final classification layer

    def forward(self, x):
        """Score a batch of clips.

        Args:
            x: Tensor of shape ``(batch, seq_len, channels, height, width)``.

        Returns:
            Tensor of shape ``(batch, 1)`` with sigmoid scores in ``(0, 1)``.
        """
        batch_size, seq_len, c, h, w = x.size()
        # reshape (not view) so non-contiguous inputs, e.g. the result of a
        # transpose/permute, are accepted instead of raising a RuntimeError.
        frames = x.reshape(batch_size * seq_len, c, h, w)
        features = self.resnet(frames)
        features = features.reshape(batch_size, seq_len, -1)
        lstm_out, _ = self.lstm(features)
        last_step = lstm_out[:, -1, :]  # summary of the whole sequence
        return torch.sigmoid(self.fc(last_step))

In [9]:
# Build the clip classifier and restore its trained weights on the CPU.
# sequence_length=7 is only stored on the model; forward() takes the sequence
# length from the input tensor, so the 30-frame buffer below is what is used.
# NOTE(review): torch.load unpickles the file; only load checkpoints from a
# trusted source.
classification_model = ViolenceDetectionModel(sequence_length=7, hidden_size=512)
classification_model.load_state_dict(torch.load(r"best_violence_detection_model.pth", map_location=torch.device('cpu')))
classification_model.eval()

# Object-detection model used to annotate frames once a clip is flagged.
detection_model = get_model(model_id="emergency-response/1", api_key=detection_api_key)

# Per-frame preprocessing: resize to 224x224 and convert to a float tensor.
# NOTE(review): no mean/std normalisation is applied - confirm this matches
# how the checkpoint was trained.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Rolling window of the most recent frames handed to the classifier.
frame_buffer = deque(maxlen=30)  # 30 frames, i.e. about 1 second at 30 FPS (not 5 seconds)
violence_detected = False
frames_to_check = 30  # Number of most recent buffered frames to annotate after a detection

def classify_frames(frame_buffer, threshold=0.5):
    """Decide whether the buffered frames show violence.

    Args:
        frame_buffer: Iterable of BGR frames (as produced by ``cv2``), oldest
            first.
        threshold: Score strictly above which the clip counts as violent.

    Returns:
        bool: ``True`` when the classifier score exceeds ``threshold``;
        ``False`` for an empty buffer.
    """
    frames = list(frame_buffer)
    if not frames:
        # torch.stack raises on an empty list; nothing buffered means nothing
        # to flag.
        return False

    # Convert each BGR frame to an RGB PIL image before applying the transform.
    clip = torch.stack([
        transform(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
        for frame in frames
    ]).unsqueeze(0)  # Add batch dimension

    with torch.no_grad():
        score = classification_model(clip)

    return score.item() > threshold

def detect_violence(frame, detection_model):
    """Run the detection model on one frame and return an annotated copy.

    Args:
        frame: Image array for a single video frame.
        detection_model: Object exposing ``infer(frame)``; the first element
            of its result is converted with ``sv.Detections.from_inference``.

    Returns:
        A new image with bounding boxes and labels drawn on it. ``frame``
        itself is left untouched.
    """
    # Run detection model on the current frame
    results = detection_model.infer(frame)[0]
    detections = sv.Detections.from_inference(results)

    box_annotator = sv.BoxAnnotator()
    label_annotator = sv.LabelAnnotator()

    # The annotators draw onto the array passed as `scene`. Work on a copy so
    # frames still held in the rolling buffer are not fed back to the
    # classifier (or shown in the live feed) with boxes painted on them.
    annotated_frame = box_annotator.annotate(scene=frame.copy(), detections=detections)
    annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=detections)

    return annotated_frame

# Function to capture video from the camera and process frames
def process_live_camera():
    """Watch the default camera, flag violent windows and save annotated frames.

    Frames are collected into the module-level ``frame_buffer``. Each time the
    buffer fills, the window is classified; if it is flagged, its last
    ``frames_to_check`` frames are run through the detection model and the
    annotated results are appended to ``output_annotated_live.mp4``. The
    buffer is then cleared, so consecutive windows never overlap and no frame
    is classified or written twice.

    The raw feed is shown in a window titled 'Live Feed'; press 'q' to stop.
    The camera, the preview window and the video writer are released even if
    an error interrupts the loop.
    """
    global violence_detected
    frame_buffer.clear()  # Clear buffer

    output_video_path = "output_annotated_live.mp4"
    writer = None  # created lazily, once the first annotated frame's size is known

    # Capture live video from the default camera (0)
    cap = cv2.VideoCapture(0)
    try:
        if not cap.isOpened():
            print("Cannot open camera")
            return

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Add frame to buffer
            frame_buffer.append(frame)

            # Classify once per full window
            if len(frame_buffer) == frame_buffer.maxlen:
                violence_detected = classify_frames(frame_buffer)

                if violence_detected:
                    print(f"Violence detected! Processing the last {frames_to_check} frames...")
                    for buffered_frame in list(frame_buffer)[-frames_to_check:]:
                        annotated_frame = detect_violence(buffered_frame, detection_model)
                        if writer is None:
                            height, width = annotated_frame.shape[:2]
                            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                            writer = cv2.VideoWriter(output_video_path, fourcc, 30, (width, height))
                        # Stream to disk instead of keeping every annotated
                        # frame in memory until the session ends.
                        writer.write(annotated_frame)
                    violence_detected = False  # Reset after detection

                # Start a fresh window; otherwise every following frame would
                # re-run the classifier on an almost identical window.
                frame_buffer.clear()

            # Display live video feed
            cv2.imshow('Live Feed', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):  # Press 'q' to quit
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()
        if writer is not None:
            writer.release()
            print(f"Annotated video saved as {output_video_path}")




In [10]:

# Run live camera processing. This call blocks until the camera stops
# delivering frames or 'q' is pressed in the preview window.
process_live_camera()

qt.qpa.plugin: Could not find the Qt platform plugin "wayland" in "/home/jivan-acharya/Documents/Code/Emergency-Response/venv/lib/python3.10/site-packages/cv2/qt/plugins"


In [7]:
import cv2

# Camera smoke test: show the raw feed until the stream ends or 'q' is pressed.
cap = cv2.VideoCapture(0)  # or try 1, 2, etc.

try:
    if not cap.isOpened():
        # Do not call exit() here: inside a notebook it stops the kernel
        # rather than just this cell.
        print("Cannot open camera")
    else:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Can't receive frame (stream end?). Exiting ...")
                break

            cv2.imshow('Camera Feed', frame)

            # Mask to the low byte, as the other capture loops in this
            # notebook do, so the comparison with ord('q') is reliable.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    # Always free the device and close the window, even on an interrupt.
    cap.release()
    cv2.destroyAllWindows()


In [13]:
import cv2
import streamlit as st
import torch
from collections import deque
import numpy as np
from PIL import Image
from torchvision import transforms
import os
from datetime import datetime
import gradio as gr

# Violence detection and detection models.
# NOTE(review): ViolenceDetectionModel, get_model and detection_api_key are not
# imported or defined in this cell; it relies on earlier cells having been run.
# NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
classification_model = ViolenceDetectionModel(sequence_length=7, hidden_size=512)
classification_model.load_state_dict(torch.load(r"best_violence_detection_model.pth", map_location=torch.device('cpu')))
classification_model.eval()

detection_model = get_model(model_id="emergency-response/1", api_key=detection_api_key)

# Per-frame preprocessing: resize to 224x224 and convert to a float tensor.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Rolling window of the most recent 30 frames passed to the classifier.
frame_buffer = deque(maxlen=30)
violence_detected = False  # True while a detected event is still "active"
logs = []  # Human-readable "Violence detected at <timestamp>" entries
saved_clips = []  # File paths of the clips written by save_clip

# Streamlit page header.
# NOTE(review): the captured output of this cell suggests Streamlit expects to
# be started with `streamlit run` rather than from a notebook kernel - confirm.
st.title("Real-Time Violence Detection")
st.write("Live video feed and logging detected violence instances")

# Placeholders that the capture loop overwrites on every frame.
video_placeholder = st.empty()
logs_placeholder = st.empty()

def classify_frames(frame_buffer, threshold=0.5):
    """Decide whether the buffered frames show violence.

    Args:
        frame_buffer: Iterable of BGR frames (as produced by ``cv2``), oldest
            first.
        threshold: Score strictly above which the clip counts as violent.

    Returns:
        bool: ``True`` when the classifier score exceeds ``threshold``;
        ``False`` for an empty buffer.
    """
    frames = list(frame_buffer)
    if not frames:
        # torch.stack raises on an empty list; nothing buffered means nothing
        # to flag.
        return False

    # Convert each BGR frame to an RGB PIL image before applying the transform.
    clip = torch.stack([
        transform(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
        for frame in frames
    ]).unsqueeze(0)  # Add batch dimension

    with torch.no_grad():
        score = classification_model(clip)

    return score.item() > threshold

def save_clip(frames, clip_path):
    """Write a sequence of frames to ``clip_path`` as an XVID video at 30 FPS.

    Args:
        frames: Sequence of equally sized image arrays; the first frame
            determines the output resolution.
        clip_path: Destination file path. Its parent directory is created if
            it does not exist yet.

    Returns:
        None. When ``frames`` is empty nothing is written and no directory is
        created.
    """
    if len(frames) == 0:
        # No frame to take the resolution from; nothing to save.
        return

    # Make sure the target folder exists before handing the path to OpenCV.
    target_dir = os.path.dirname(clip_path)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    height, width = frames[0].shape[:2]
    video_writer = cv2.VideoWriter(clip_path, cv2.VideoWriter_fourcc(*'XVID'), 30, (width, height))
    try:
        for frame in frames:
            video_writer.write(frame)
    finally:
        # Release the writer even if a frame fails to encode.
        video_writer.release()

def detect_violence(frame, detection_model=None):
    """Placeholder detector: return ``frame`` unchanged.

    Args:
        frame: The frame to "annotate".
        detection_model: Accepted and ignored, so that callers written for a
            two-argument ``detect_violence(frame, detection_model)`` keep
            working when this placeholder definition is the active one.

    Returns:
        The same ``frame`` object that was passed in.
    """
    # Placeholder function to simulate detection, you would put your model here
    return frame

# Video capture setup
cap = cv2.VideoCapture(0)  # 0 for webcam
buffer_frames = []  # kept for compatibility with other cells; no longer used here
fps = 30  # Assuming 30 FPS
clip_duration = 15  # Seconds to wait after a detection before re-arming
frames_since_detection = 0  # frames seen since the current event was logged

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_buffer.append(frame)

        # Display live video in Streamlit
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        video_placeholder.image(frame_rgb, channels="RGB")

        if violence_detected:
            # An event is already logged: count frames until the cool-down
            # ends. (Previously this compared against a list that was never
            # filled, so the detector never re-armed after the first event.)
            frames_since_detection += 1
            if frames_since_detection >= clip_duration * fps:
                violence_detected = False
        elif classify_frames(frame_buffer):
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            logs.append(f"Violence detected at {timestamp}")
            violence_detected = True
            frames_since_detection = 0

            # Save the frames currently in the rolling buffer (the moments
            # leading up to the detection).
            # TODO(review): this is at most frame_buffer.maxlen frames, not a
            # full clip_duration-second clip.
            clip_name = f'violence_clip_{len(saved_clips) + 1}.avi'
            clip_path = os.path.join("saved_clips", clip_name)
            save_clip(list(frame_buffer), clip_path)
            saved_clips.append(clip_path)

        # Update logs in Streamlit
        logs_placeholder.text("\n".join(logs))

        # NOTE(review): no OpenCV window is opened in this cell, so this key
        # check may never see a key press - confirm how the loop should stop.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Free the camera even when the loop is interrupted.
    cap.release()
    cv2.destroyAllWindows()

# Pass saved clips to Gradio
def show_clips():
    """Launch a Gradio interface whose examples are the entries of ``saved_clips``.

    The interface takes a "file" input and produces a "video" output; its
    handler returns the received value unchanged.
    """
    def load_clip(clip_path):
        # Identity handler: hand the received value straight to the video output.
        return clip_path

    clip_viewer = gr.Interface(
        fn=load_clip,
        inputs="file",
        outputs="video",
        examples=saved_clips,
    )
    clip_viewer.launch()


2024-10-06 21:04:37.305 
  command:

    streamlit run /home/jivan-acharya/Documents/Code/Emergency-Response/venv/lib/python3.10/site-packages/ipykernel_launcher.py [ARGUMENTS]


KeyboardInterrupt: 