In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install -q supervision ultralytics opencv-python opencv-python-headless


In [None]:
import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm
from collections import defaultdict, deque
from ultralytics import YOLO
import supervision as sv
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import smtplib

# Email sending function
def send_email(to_email, subject, message):
    sender_email = "dhruv.1si21ad019@gmail.com"
    sender_password = "nsys jspr ifxc wztn"

    msg = MIMEMultipart()
    msg['From'] = sender_email
    msg['To'] = to_email
    msg['Subject'] = subject

    msg.attach(MIMEText(message, 'plain'))

    try:
        server = smtplib.SMTP('smtp.gmail.com', 587)
        server.starttls()
        server.login(sender_email, sender_password)
        server.sendmail(sender_email, to_email, msg.as_string())
        server.quit()
        print(f"Email sent to {to_email}")
    except Exception as e:
        print(f"Failed to send email to {to_email}: {e}")

# Load the dataset
data_path = "Vehicle_Data.xlsx"
df = pd.read_excel(data_path)

# Check the columns to verify the presence of 'Tracked ID'
print(df.columns)  # This will help identify the exact column name

# Video paths and parameters
SOURCE_VIDEO_PATH = "Sequence 03.mp4"
TARGET_VIDEO_PATH = "vehicles-result.mp4"
CONFIDENCE_THRESHOLD = 0.3
IOU_THRESHOLD = 0.5
MODEL_NAME = "yolov8x.pt"
MODEL_RESOLUTION = 1280

# Polygon source and target coordinates
SOURCE = np.array([
    [ 100 ,  598],  # bottom-left
  [1890 ,  605],  # bottom-right
  [1300 ,  198],  # top-right
  [ 850 ,  198]
])
TARGET_WIDTH = 25
TARGET_HEIGHT = 250
TARGET = np.array([
    [0, 0],
    [TARGET_WIDTH - 1, 0],
    [TARGET_WIDTH - 1, TARGET_HEIGHT - 1],
    [0, TARGET_HEIGHT - 1],
])

# Define speed limit (km/h)
SPEED_LIMIT = 90

# ViewTransformer class
class ViewTransformer:
    def __init__(self, source: np.ndarray, target: np.ndarray) -> None:
        source = source.astype(np.float32)
        target = target.astype(np.float32)
        self.m = cv2.getPerspectiveTransform(source, target)

    def transform_points(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points
        reshaped_points = points.reshape(-1, 1, 2).astype(np.float32)
        transformed_points = cv2.perspectiveTransform(reshaped_points, self.m)
        return transformed_points.reshape(-1, 2)

# Initialize components
view_transformer = ViewTransformer(source=SOURCE, target=TARGET)
model = YOLO(MODEL_NAME)
video_info = sv.VideoInfo.from_video_path(video_path=SOURCE_VIDEO_PATH)

frame_generator = sv.get_video_frames_generator(source_path=SOURCE_VIDEO_PATH)
byte_track = sv.ByteTrack(frame_rate=video_info.fps, track_activation_threshold=CONFIDENCE_THRESHOLD)

thickness = sv.calculate_optimal_line_thickness(resolution_wh=video_info.resolution_wh)
text_scale = sv.calculate_optimal_text_scale(resolution_wh=video_info.resolution_wh)
box_annotator = sv.BoxAnnotator(thickness=thickness)
label_annotator = sv.LabelAnnotator(text_scale=text_scale, text_thickness=thickness, text_position=sv.Position.BOTTOM_CENTER)
trace_annotator = sv.TraceAnnotator(thickness=thickness, trace_length=video_info.fps * 2, position=sv.Position.BOTTOM_CENTER)
polygon_zone = sv.PolygonZone(polygon=SOURCE)

coordinates = defaultdict(lambda: deque(maxlen=video_info.fps))

# Set to track emailed tracker IDs
emailed_ids = set()

# Open target video
with sv.VideoSink(TARGET_VIDEO_PATH, video_info) as sink:
    for frame in tqdm(frame_generator, total=video_info.total_frames):
        result = model(frame, imgsz=MODEL_RESOLUTION, verbose=False)[0]
        detections = sv.Detections.from_ultralytics(result)

        # Filter detections
        detections = detections[detections.confidence > CONFIDENCE_THRESHOLD]
        detections = detections[detections.class_id != 0]
        detections = detections[polygon_zone.trigger(detections)]
        detections = detections.with_nms(IOU_THRESHOLD)
        detections = byte_track.update_with_detections(detections=detections)

        points = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
        points = view_transformer.transform_points(points=points).astype(int)

        if detections.tracker_id is not None and len(points) > 0:
            for tracker_id, [_, y] in zip(detections.tracker_id, points):
                coordinates[tracker_id].append(y)

        labels = []
        for tracker_id in detections.tracker_id:
            if len(coordinates[tracker_id]) < video_info.fps / 2:
                labels.append(f"#{tracker_id}")
            else:
                coordinate_start = coordinates[tracker_id][-1]
                coordinate_end = coordinates[tracker_id][0]
                distance = abs(coordinate_start - coordinate_end)
                time = len(coordinates[tracker_id]) / video_info.fps
                speed = distance / time * 3.6

                label = f"#{tracker_id} {int(speed)} km/h"

                if speed > SPEED_LIMIT and tracker_id not in emailed_ids:
                    label += " OVER SPEEDING"

                    # Fetch driver details and send email
                    driver_info = df[df['Tracked ID'] == tracker_id]
                    if not driver_info.empty:
                        name = driver_info.iloc[0]['Name']
                        email = driver_info.iloc[0]['Email ID']
                        subject = "Overspeeding Alert"
                        message = (
                            f"Dear {name},\n\n"
                            f"Our system detected that your vehicle (ID: {tracker_id}) was overspeeding at {int(speed)} km/h, "
                            f"exceeding the limit of {SPEED_LIMIT} km/h. Please adhere to speed limits for safety.\n\n"
                            "Thank you."
                        )
                        send_email(email, subject, message)

                        # Add tracker ID to emailed_ids
                        emailed_ids.add(tracker_id)

                labels.append(label)

        # Annotate frame
        annotated_frame = frame.copy()
        annotated_frame = trace_annotator.annotate(scene=annotated_frame, detections=detections)
        annotated_frame = box_annotator.annotate(scene=annotated_frame, detections=detections)
        annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=detections, labels=labels)

        sink.write_frame(annotated_frame)

print("Processing complete. Video saved and emails sent for overspeeding vehicles.")
