# Config

In [1]:
class Config:
    """Configuration class to manage environment variables and settings."""

    # Email settings
    FROM_EMAIL = "vhp08072004@gmail.com"
    FROM_EMAIL_NICKNAME = "Traffic Management System (TMS)"
    EMAIL_PASSWORD = "wayp qcui muhs ietx"  # Ensure no spaces and is a valid app password
    TO_EMAIL = "vhp08071974@gmail.com"

    # Gemini API
    GEMINI_API_KEY = "AIzaSyBf1wGteqDPnWEjRaCM5JvVRptFe5UMUQg"

    # Model paths
    CNN_MODEL_PATH = "CNN_model.onnx"
    LSTM_MODEL_PATH = "LSTM_model.onnx"
    ULTRALYTICS_MODEL_PATH = "yolo11n.pt"

    # Speed Estimator Model Path
    SPEED_ESTIMATOR_MODEL_PATH = "yolo11n.pt"

    # Violation Thresholds
    VIOLATION_THRESHOLDS = {
        'large_vehicles': 10,
        'motorcycles': 15,
        'illegal_parking': 5,
        'red_light': 7
    }

    # Logging settings
    LOG_FILE = "system.log"
    LOG_MAX_BYTES = 5 * 1024 * 1024  # 5 MB
    LOG_BACKUP_COUNT = 5

In [2]:
import librosa
import torch
import onnxruntime as ort
import google.generativeai as genai
import re
import json
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


# Inference

In [None]:
def configure_gemini():
    """
    Configures the Gemini API using the provided API key.
    Returns the configured Gemini model.
    """
    try:
        genai.configure(api_key=Config.GEMINI_API_KEY)
        model = genai.GenerativeModel(model_name="gemini-1.5-flash-8b")
        print("Gemini API configured successfully.")
        return model
    except Exception as e:
        print(f"Error configuring Gemini API: {e}")
        return None

model_gemini = configure_gemini()

def parse_gemini_response(response_text):
    """
    Parses the Gemini API response to extract ambulance bounding box data.
    """
    try:
        # Extract JSON code block using regex
        json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
        if not json_match:
            return None
        json_str = json_match.group(0)
        data = json.loads(json_str)
        if "ambulance" in data:
            return data["ambulance"]
    except Exception as e:
        print(f"Error parsing Gemini response: {e}")
    return None

# --------------------- PyTorch Configuration --------------------- #

def configure_pytorch():
    """
    Configures PyTorch to use MPS, CUDA, or CPU.
    Returns the device being used.
    """
    if torch.backends.mps.is_available():
        device = torch.device('mps')
        print("Using device: MPS")
    elif torch.cuda.is_available():
        device = torch.device('cuda')
        cuda_name = torch.cuda.get_device_name(device)
        print(f"Using device: {cuda_name}")
    else:
        device = torch.device('cpu')
        print("Using device: CPU")
    return device

device_pt = configure_pytorch()

# --------------------- Feature Extraction and Prediction Functions --------------------- #

def features_extractor(audio, sample_rate):
    """
    Extracts MFCC features from audio data.
    """
    try:
        mfccs_features = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=80)
        mfccs_scaled_features = np.mean(mfccs_features.T, axis=0)
        return mfccs_scaled_features
    except Exception as e:
        print(f"Error extracting features: {e}")
        return None

def load_onnx_model(model_path):
    """
    Loads an ONNX model and returns an ONNX Runtime session.
    """
    try:
        session = ort.InferenceSession(model_path)
        print(f"Loaded ONNX model from {model_path}")
        return session
    except Exception as e:
        print(f"Error loading ONNX model: {e}")
        return None

# Load models
cnn_session = load_onnx_model(Config.CNN_MODEL_PATH)
lstm_session = load_onnx_model(Config.LSTM_MODEL_PATH)

def predict_cnn(features, session):
    """
    Predicts the class using a CNN ONNX model.
    """
    try:
        # Prepare input
        input_name = session.get_inputs()[0].name
        input_data = features.astype(np.float32).reshape(1, -1, 1)
        predictions = session.run(None, {input_name: input_data})
        predicted_class = np.argmax(predictions[0], axis=1)
        return predicted_class[0], predictions[0][0]
    except Exception as e:
        print(f"CNN prediction failed: {e}")
        return None, None

def predict_lstm(features, session):
    """
    Predicts the class using an LSTM ONNX model.
    """
    try:
        # Prepare input
        input_name = session.get_inputs()[0].name
        input_data = features.astype(np.float32).reshape(1, -1, 80)
        predictions = session.run(None, {input_name: input_data})
        predicted_class = np.argmax(predictions[0], axis=1)
        return predicted_class[0], predictions[0][0]
    except Exception as e:
        print(f"LSTM prediction failed: {e}")
        return None, None

def ensemble_predictions(cnn_probs, lstm_probs):
    """
    Averages the probabilities from CNN and LSTM models for ensemble prediction.
    """
    try:
        ensemble_probs = (cnn_probs + lstm_probs) / 2  # Average probabilities
        return ensemble_probs
    except Exception as e:
        print(f"Ensemble prediction failed: {e}")
        return None

In [None]:
class GeminiDetectionThread(QThread):
    """
    Thread to handle ambulance detection using the Gemini API.
    Emits a signal indicating whether bounding boxes were detected.
    """
    bounding_boxes_detected = pyqtSignal(bool)  # Emits True if bounding boxes are detected

    def __init__(self, video_file, region3, start_frame, fps, parent=None):
        super().__init__(parent)
        self.video_file = video_file
        self.region3 = region3
        self.start_frame = start_frame
        self.fps = fps
        self._run_flag = True
        self.model = model_gemini
        self.logger = logging.getLogger(self.__class__.__name__)

    def run(self):
        if not self.model:
            self.logger.error("Gemini model not initialized.")
            self.bounding_boxes_detected.emit(False)
            return

        cap = cv2.VideoCapture(self.video_file)
        if not cap.isOpened():
            self.logger.error("Failed to open video file.")
            self.bounding_boxes_detected.emit(False)
            return

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_interval = int(self.fps * 3 / 5)
        frames_to_extract = [self.start_frame + i * frame_interval for i in range(5)]
        responses = []

        for frame_num in frames_to_extract:
            if frame_num >= total_frames:
                break
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ret, frame = cap.read()
            if not ret:
                continue
            roi = self.crop_to_region(frame, self.region3)
            has_bbox = self.send_to_gemini(roi)
            responses.append(has_bbox)

        cap.release()

        bbox_count = sum(responses)
        no_bbox_count = len(responses) - bbox_count
        if bbox_count > no_bbox_count:
            # Switch traffic light to green
            self.bounding_boxes_detected.emit(True)
            self.logger.info("Bounding boxes detected more than non-detections.")
            # Continue detection every 2 seconds
            while self._run_flag:
                time.sleep(2)
                next_frame = min(self.start_frame + int(self.fps * 3), total_frames - 1)
                cap = cv2.VideoCapture(self.video_file)
                cap.set(cv2.CAP_PROP_POS_FRAMES, next_frame)
                ret, frame = cap.read()
                cap.release()
                if not ret:
                    break
                roi = self.crop_to_region(frame, self.region3)
                has_bbox = self.send_to_gemini(roi)
                if has_bbox:
                    continue  # Continue looping
                else:
                    # No bounding box detected, reset traffic light
                    self.bounding_boxes_detected.emit(False)
                    self.logger.info("No bounding boxes detected in subsequent frames.")
                    break
        else:
            # Not enough bounding boxes detected
            self.bounding_boxes_detected.emit(False)
            self.logger.info("Bounding boxes not detected sufficiently.")

    def crop_to_region(self, frame, region):
        pts = np.array(region, np.int32)
        mask = np.zeros(frame.shape[:2], dtype=np.uint8)
        cv2.fillPoly(mask, [pts], 255)
        roi = cv2.bitwise_and(frame, frame, mask=mask)
        x, y, w, h = cv2.boundingRect(pts)
        roi_cropped = roi[y:y+h, x:x+w]
        return roi_cropped

    def send_to_gemini(self, image):
        """
        Sends the cropped region to the Gemini API for ambulance detection.
        """
        if image.size == 0:
            self.logger.warning("Empty ROI image, skipping Gemini detection.")
            return False

        pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        try:
            response = self.model.generate_content(
                [PROMPT, pil_image],
            ).text
            bbox = parse_gemini_response(response)
            if bbox:
                self.logger.info("Bounding box detected by Gemini.")
                return True
            else:
                self.logger.info("No bounding box detected by Gemini.")
                return False
        except Exception as e:
            self.logger.error(f"Gemini API call failed: {e}")
            return False

    def stop(self):
        """
        Stops the thread gracefully.
        """
        self._run_flag = False
        self.wait()