<a href="https://colab.research.google.com/github/sameerdewan/DecentralizedHousingMarket/blob/master/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [77]:
pip install requests opencv-python opencv-python-headless numpy pytz supervision inference flask pyngrok



In [85]:
import os
import signal
import sys
import time
from datetime import datetime
import threading
import cv2
import numpy as np
import requests
import pytz
import supervision as sv
from inference import get_model
import matplotlib.pyplot as plt
from flask import Flask, Response, jsonify, stream_with_context
from pyngrok import ngrok

# Configuration
IMAGE_URL = 'https://webcams.nyctmc.org/api/cameras/04e09ed5-2d97-4e29-8438-b87748850dbb/image'
FRAME_RATE = .5
VIDEO_SIZE = (640, 480)
OUTPUT_FILE = "annotated_timelapse.mp4"
NGROK_AUTHTOKEN = "2hqxnQqMdvTNdeJpBd7G5OgRlIs_4x1G5iRf4bHwWLk9CZQFk"

# Initialize model
model = get_model(model_id="yolov8n-640")

# Initialize data collection
detection_data = {
    'timestamp': [],
    'person': [],
    'car': [],
    'bus': [],
    'truck': [],
    'motorcycle': [],
    'bike': []
}

def process_image(image_url):
    # Perform inference
    results = model.infer(
        image=image_url,
        confidence=0.1,
        iou_threshold=.5
    )

    detections = results[0].predictions

    # Filter detections for vehicles or people
    valid_classes = {'person', 'car', 'bus', 'truck', 'motorcycle', 'bike'}
    filtered_detections = [det for det in detections if det.class_name in valid_classes]

    # Fetch the image to annotate
    response = requests.get(image_url, stream=True)
    if response.status_code == 200:
        response.raw.decode_content = True
        image = np.asarray(bytearray(response.raw.read()), dtype="uint8")
        image = cv2.imdecode(image, cv2.IMREAD_COLOR)
    else:
        print(f"Failed to fetch image for annotation. Status code: {response.status_code}")
        return None

    # Annotate frame
    for det in filtered_detections:
        x1 = int(det.x - det.width / 2)
        y1 = int(det.y - det.height / 2)
        x2 = int(det.x + det.width / 2)
        y2 = int(det.y + det.height / 2)
        label = f"{det.class_name}: {det.confidence:.2f}"
        if det.class_name == 'person':
            color = (0, 0, 255)  # Red color for people
        else:
            color = (0, 255, 0)  # Green color for vehicles
        cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
        cv2.putText(image, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    return image, filtered_detections

def setup_video_writer(filename, size, fps):
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(filename, fourcc, fps, size)
    return writer

def setup_signal_handler(writer):
    def signal_handler(sig, frame):
        writer.release()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

def main():
    writer = setup_video_writer(OUTPUT_FILE, VIDEO_SIZE, FRAME_RATE)
    setup_signal_handler(writer)

    try:
        while True:
            # Process the image URL directly
            annotated_image, detections = process_image(IMAGE_URL)

            if annotated_image is not None:
                # Resize the image to match video size
                annotated_image = cv2.resize(annotated_image, VIDEO_SIZE)

                # Write the frame to the video file
                writer.write(annotated_image)

                # Collect data for the detected objects
                timestamp = datetime.now(pytz.utc)
                detection_data['timestamp'].append(timestamp)
                detection_data['person'].append(sum(1 for det in detections if det.class_name == 'person'))
                detection_data['car'].append(sum(1 for det in detections if det.class_name == 'car'))
                detection_data['bus'].append(sum(1 for det in detections if det.class_name == 'bus'))
                detection_data['truck'].append(sum(1 for det in detections if det.class_name == 'truck'))
                detection_data['motorcycle'].append(sum(1 for det in detections if det.class_name == 'motorcycle'))
                detection_data['bike'].append(sum(1 for det in detections if det.class_name == 'bike'))

            # Wait for the next frame
            time.sleep(1 / FRAME_RATE)
    finally:
        writer.release()

# Flask App
app = Flask(__name__)

@app.route('/detection_data', methods=['GET'])
def get_detection_data():
    return jsonify(detection_data)

@app.route('/video_feed')
def video_feed():
    def generate():
        cap = cv2.VideoCapture(OUTPUT_FILE)
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            _, buffer = cv2.imencode('.mp4', frame)
            frame = buffer.tobytes()
            yield (b'--frame\r\n'
                   b'Content-Type: video/mp4\r\n\r\n' + frame + b'\r\n')
        cap.release()
    return Response(stream_with_context(generate()), mimetype='multipart/x-mixed-replace; boundary=frame')

def run_flask():
    port = 1000
    ngrok.set_auth_token(NGROK_AUTHTOKEN)
    public_url = ngrok.connect(port).public_url
    print(f" * ngrok tunnel \"{public_url}\" -> \"http://127.0.0.1:{port}\"")
    app.config["BASE_URL"] = public_url
    app.run(port=port, use_reloader=False)

# Start the Flask app in a separate thread
threading.Thread(target=run_flask).start()

if __name__ == "__main__":
    main()


 * ngrok tunnel "https://80cb-34-125-70-188.ngrok-free.app" -> "http://127.0.0.1:1000"
 * Serving Flask app '__main__'
 * Debug mode: off


Address already in use
Port 1000 is in use by another program. Either identify and stop that program, or start the server with a different port.


SystemExit: 0