## Mini Project 2 - Road Accident Detection And Alert

# Submitted By

Course: MSc Computer Science With Data Analytics

Name(Reg No:)

NIDIN V NANDAN(223039)

ADARSH PS(223004)

ADHISH S SUJAN(223005)

In [1]:
#import necessary libraries
import cv2
import torch
import numpy as np
import torchvision
import time
import torch.nn as nn
import datetime
import requests

In [2]:
# Ensemble class for handling multiple YOLOv5 models

class Ensemble(nn.ModuleList):
    """Container that runs several detection models on the same input.

    Each member is called with the same arguments, and the first element of
    every member's return value is concatenated along dimension 1.
    """

    def __init__(self):
        super().__init__()

    def forward(self, x, augment=False, profile=False, visualize=False):
        """Run every member on ``x`` and join their primary outputs.

        Args:
            x: Input forwarded unchanged to every member model.
            augment: Forwarded positionally to each member.
            profile: Forwarded positionally to each member.
            visualize: Forwarded positionally to each member.

        Returns:
            tuple: ``(joined, None)`` where ``joined`` is the concatenation
            (dim 1) of element ``[0]`` of each member's output.
        """
        primary_outputs = []
        for member in self:
            member_result = member(x, augment, profile, visualize)
            primary_outputs.append(member_result[0])

        joined = torch.cat(primary_outputs, dim=1)
        return joined, None

def attempt_load(weights, device=None, inplace=True, fuse=True):
    """Load one YOLOv5 checkpoint, or an ensemble of several.

    Args:
        weights (str | list): Path to a checkpoint file, or a list of paths.
        device: Device the loaded model(s) are moved to. ``None`` leaves
            them on the CPU they were loaded onto.
        inplace (bool): Value assigned to the ``inplace`` attribute of
            activation / Detect / Model modules.
        fuse (bool): If True and the model exposes ``fuse()``, fuse layers
            before switching to eval mode.

    Returns:
        The single loaded model when one path is given, otherwise an
        ``Ensemble`` carrying ``names``, ``nc``, ``yaml`` and ``stride``.
    """
    from models.yolo import Detect, Model

    model = Ensemble()
    for w in weights if isinstance(weights, list) else [weights]:
        # Load the checkpoint named by the loop variable. The previous code
        # always loaded a hard-coded file, ignoring the `weights` argument.
        # NOTE(review): torch.load unpickles arbitrary objects; only load
        # checkpoints from a trusted source.
        ckpt = torch.load(w, map_location='cpu')
        # Prefer the EMA weights when present, otherwise the plain model.
        ckpt = (ckpt.get('ema') or ckpt['model']).to(device).float()

        # Compatibility defaults for checkpoints lacking these attributes.
        if not hasattr(ckpt, 'stride'):
            ckpt.stride = torch.tensor([32.])
        if hasattr(ckpt, 'names') and isinstance(ckpt.names, (list, tuple)):
            ckpt.names = dict(enumerate(ckpt.names))  # list -> {index: name}

        model.append(ckpt.fuse().eval() if fuse and hasattr(ckpt, 'fuse') else ckpt.eval())

    # Module updates
    for m in model.modules():
        t = type(m)
        if t in (nn.Hardswish, nn.LeakyReLU, nn.ReLU, nn.ReLU6, nn.SiLU, Detect, Model):
            m.inplace = inplace
            if t is Detect and not isinstance(m.anchor_grid, list):
                # Replace a non-list anchor_grid with a list of placeholders,
                # one per detection layer.
                delattr(m, 'anchor_grid')
                setattr(m, 'anchor_grid', [torch.zeros(1)] * m.nl)
        elif t is nn.Upsample and not hasattr(m, 'recompute_scale_factor'):
            m.recompute_scale_factor = None

    # Return model
    if len(model) == 1:
        return model[-1]

    # Return detection ensemble
    print(f'Ensemble created with {weights}\n')
    for k in 'names', 'nc', 'yaml':
        setattr(model, k, getattr(model[0], k))
    # The ensemble takes the stride of the member with the largest stride.
    model.stride = model[torch.argmax(torch.tensor([m.stride.max() for m in model])).int()].stride
    assert all(model[0].nc == m.nc for m in model), f'Models have different class counts: {[m.nc for m in model]}'
    return model


In [3]:
def scale_boxes(img1_shape, boxes, img0_shape, ratio_pad=None):
    """
    Map boxes from the img1_shape coordinate frame back to img0_shape.

    The padding is subtracted from the x columns (0, 2) and y columns (1, 3),
    the first four columns are divided by the gain, and the result is clipped
    to img0_shape. ``boxes`` is modified in place and also returned.

    Args:
        img1_shape (tuple): (height, width) of the frame the boxes are
            currently expressed in.
        boxes: Boxes whose first four columns are (x1, y1, x2, y2).
        img0_shape (tuple): (height, width, ...) of the frame to map back to.
        ratio_pad (tuple): Optional ((gain, ...), (pad_x, pad_y)); when given
            it is used instead of deriving gain and padding from the shapes.

    Returns:
        The same ``boxes`` object, rescaled and clipped.
    """
    if ratio_pad is not None:
        gain = ratio_pad[0][0]
        pad = ratio_pad[1]
    else:
        height_gain = img1_shape[0] / img0_shape[0]
        width_gain = img1_shape[1] / img0_shape[1]
        gain = min(height_gain, width_gain)
        pad_x = (img1_shape[1] - img0_shape[1] * gain) / 2
        pad_y = (img1_shape[0] - img0_shape[0] * gain) / 2
        pad = pad_x, pad_y

    # Undo the padding first, then the resize.
    boxes[..., [0, 2]] -= pad[0]
    boxes[..., [1, 3]] -= pad[1]
    boxes[..., :4] /= gain

    clip_boxes(boxes, img0_shape)
    return boxes

def clip_boxes(boxes, shape):
    """
    Clamp (x1, y1, x2, y2) boxes in place so they lie inside an image.

    Args:
        boxes: torch.Tensor or array-like with box coordinates in the first
            four entries of the last dimension.
        shape (tuple): Image shape as (height, width, ...).

    Returns:
        None. ``boxes`` is modified in place.
    """
    max_y, max_x = shape[0], shape[1]

    if not isinstance(boxes, torch.Tensor):
        # Array path: clip both x columns, then both y columns.
        boxes[..., [0, 2]] = boxes[..., [0, 2]].clip(0, max_x)
        boxes[..., [1, 3]] = boxes[..., [1, 3]].clip(0, max_y)
        return

    # Tensor path: clamp each coordinate column in place.
    for column, upper in enumerate((max_x, max_y, max_x, max_y)):
        boxes[..., column].clamp_(0, upper)
def xywh2xyxy(x):
    """
    Turn centre-format boxes (x, y, w, h) into corner format (x1, y1, x2, y2).

    Args:
        x: torch.Tensor or array-like whose last dimension starts with
            (x_center, y_center, width, height).

    Returns:
        A copy of ``x`` with the first four entries replaced by the corner
        coordinates; the input is left unmodified.
    """
    converted = x.clone() if isinstance(x, torch.Tensor) else np.copy(x)

    half_w = x[..., 2] / 2
    half_h = x[..., 3] / 2

    converted[..., 0] = x[..., 0] - half_w  # left
    converted[..., 1] = x[..., 1] - half_h  # top
    converted[..., 2] = x[..., 0] + half_w  # right
    converted[..., 3] = x[..., 1] + half_h  # bottom
    return converted
def box_iou(box1, box2, eps=1e-7):
    """
    Pairwise Intersection over Union between two sets of corner-format boxes.

    Args:
        box1 (torch.Tensor): First set of boxes, one (x1, y1, x2, y2) row each.
        box2 (torch.Tensor): Second set of boxes, same layout.
        eps (float): Small constant added to the denominator to avoid
            division by zero.

    Returns:
        torch.Tensor: Matrix with one row per box in ``box1`` and one column
        per box in ``box2`` holding their IoU.
    """
    # Split each box into its top-left and bottom-right corner, arranged so
    # the two sets broadcast against each other.
    top_left_1, bottom_right_1 = box1.unsqueeze(1).chunk(2, 2)
    top_left_2, bottom_right_2 = box2.unsqueeze(0).chunk(2, 2)

    overlap_wh = (torch.min(bottom_right_1, bottom_right_2) - torch.max(top_left_1, top_left_2)).clamp(0)
    inter = overlap_wh.prod(2)

    area_1 = (bottom_right_1 - top_left_1).prod(2)
    area_2 = (bottom_right_2 - top_left_2).prod(2)

    # IoU = inter / (area1 + area2 - inter)
    return inter / (area_1 + area_2 - inter + eps)
def non_max_suppression(
        prediction,
        conf_thres=0.25,
        iou_thres=0.45,
        classes=None,
        agnostic=False,
        multi_label=False,
        labels=(),
        max_det=300,
        nm=0,  # number of masks
):
    """
    Perform Non-Maximum Suppression (NMS) on bounding box predictions.

    Each prediction row is read as
    ``[x, y, w, h, objectness, class scores (nc), mask values (nm)]``.
    The box part is converted with ``xywh2xyxy`` before NMS.

    Args:
        prediction (torch.Tensor | list | tuple): Model predictions indexed as
            (batch, candidates, 5 + nc + nm). If a list/tuple is passed, only
            its first element is used.
        conf_thres (float): Confidence threshold in [0, 1]; applied to the
            objectness score and again to objectness * class score.
        iou_thres (float): IoU threshold in [0, 1] passed to torchvision NMS.
        classes (list): If not None, keep only detections of these class ids.
        agnostic (bool): If True, NMS is class-agnostic (no per-class offset).
        multi_label (bool): If True (and nc > 1), a box may yield one
            detection per class whose score exceeds ``conf_thres``.
        labels (tuple): Optional per-image a-priori labels for autolabelling;
            each row is read as (class, x, y, w, h) and appended as a
            detection with objectness and class score 1.0.
        max_det (int): Maximum number of detections to keep per image.
        nm (int): Number of mask values per prediction row.

    Returns:
        list: One tensor per image, each of shape (n, 6 + nm) with rows
        ``[x1, y1, x2, y2, conf, class, masks...]``. Images with no
        detections keep the initial empty (0, 6 + nm) tensor.

    Raises:
        AssertionError: If ``conf_thres`` or ``iou_thres`` is outside [0, 1].
    """


    assert 0 <= conf_thres <= 1, f'Invalid Confidence threshold {conf_thres}, valid values are between 0.0 and 1.0'
    assert 0 <= iou_thres <= 1, f'Invalid IoU {iou_thres}, valid values are between 0.0 and 1.0'
    if isinstance(prediction, (list, tuple)):
        prediction = prediction[0]  # keep only the first element

    device = prediction.device
    mps = 'mps' in device.type
    if mps:
        # On MPS devices the work is done on the CPU; results are moved back
        # to the original device at the end of each image's iteration.
        prediction = prediction.cpu()
    bs = prediction.shape[0]  # batch size
    nc = prediction.shape[2] - nm - 5  # number of classes
    xc = prediction[..., 4] > conf_thres  # candidates


    max_wh = 7680  # per-class coordinate offset used below; assumes box coordinates stay below this value
    max_nms = 30000  # maximum number of boxes handed to torchvision NMS
    time_limit = 0.5 + 0.05 * bs  # NOTE: not referenced again in this function
    redundant = True  # only consulted inside the (disabled) merge branch
    multi_label &= nc > 1  # multiple labels only make sense with more than one class
    merge = False  # merge branch below is disabled

    t = time.time()  # NOTE: not referenced again in this function
    mi = 5 + nc  # index of the first mask column
    # Same empty tensor object repeated bs times; entries are replaced per image below.
    output = [torch.zeros((0, 6 + nm), device=prediction.device)] * bs
    for xi, x in enumerate(prediction):

        x = x[xc[xi]]  # keep rows whose objectness exceeds conf_thres

        # Cat apriori labels if autolabelling
        if labels and len(labels[xi]):
            lb = labels[xi]
            v = torch.zeros((len(lb), nc + nm + 5), device=x.device)
            v[:, :4] = lb[:, 1:5]  # box
            v[:, 4] = 1.0  # objectness
            v[range(len(lb)), lb[:, 0].long() + 5] = 1.0  # class score
            x = torch.cat((x, v), 0)

        # Nothing left for this image: keep the empty placeholder
        if not x.shape[0]:
            continue

        # Compute conf
        x[:, 5:] *= x[:, 4:5]  # conf = obj_conf * cls_conf

        # Box/Mask
        box = xywh2xyxy(x[:, :4])  # (x, y, w, h) -> (x1, y1, x2, y2)
        mask = x[:, mi:]  # empty when nm == 0

        if multi_label:
            # One output row per (box, class) pair whose score exceeds conf_thres
            i, j = (x[:, 5:mi] > conf_thres).nonzero(as_tuple=False).T
            x = torch.cat((box[i], x[i, 5 + j, None], j[:, None].float(), mask[i]), 1)
        else:
            # Best class only, then drop rows at or below conf_thres
            conf, j = x[:, 5:mi].max(1, keepdim=True)
            x = torch.cat((box, conf, j.float(), mask), 1)[conf.view(-1) > conf_thres]

        # Optional filter by class id (column 5)
        if classes is not None:
            x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)]


        n = x.shape[0]  # number of boxes
        if not n:
            continue
        x = x[x[:, 4].argsort(descending=True)[:max_nms]]  # sort by confidence, keep at most max_nms

        # Offset boxes by class id * max_wh so NMS treats classes separately
        # (offset is 0 when agnostic).
        c = x[:, 5:6] * (0 if agnostic else max_wh)
        boxes, scores = x[:, :4] + c, x[:, 4]
        i = torchvision.ops.nms(boxes, scores, iou_thres)
        i = i[:max_det]  # limit detections
        if merge and (1 < n < 3E3):
            # Not executed while merge is False: replaces each kept box with
            # the score-weighted mean of the boxes overlapping it.
            iou = box_iou(boxes[i], boxes) > iou_thres
            weights = iou * scores[None]
            x[i, :4] = torch.mm(weights, x[:, :4]).float() / weights.sum(1, keepdim=True)
            if redundant:
                i = i[iou.sum(1) > 1]

        output[xi] = x[i]
        if mps:
            output[xi] = output[xi].to(device)

    return output

In [4]:
def letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
    """
    Resize an image to fit ``new_shape`` and pad the remainder with a border.

    Args:
        im (numpy.ndarray): Input image; its first two dimensions are read as
            (height, width).
        new_shape (tuple or int): Target (height, width); an int means square.
        color (tuple): Border colour passed to ``cv2.copyMakeBorder``.
        auto (bool): If True, reduce the padding to its remainder modulo
            ``stride`` (minimum rectangle).
        scaleFill (bool): If True (and ``auto`` is False), stretch the image
            to exactly ``new_shape`` with no padding.
        scaleup (bool): If False, the image is only ever shrunk, never enlarged.
        stride (int): Stride used for the ``auto`` padding computation.

    Returns:
        numpy.ndarray: Resized and padded image.
        tuple: (width ratio, height ratio) applied to the image.
        tuple: (dw, dh) padding per side, before rounding.
    """
    src_h, src_w = im.shape[:2]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)

    # Uniform scale that makes the image fit inside the target shape.
    scale = min(new_shape[0] / src_h, new_shape[1] / src_w)
    if not scaleup:
        scale = min(scale, 1.0)

    ratio = scale, scale
    new_unpad = int(round(src_w * scale)), int(round(src_h * scale))  # (width, height)
    dw = new_shape[1] - new_unpad[0]
    dh = new_shape[0] - new_unpad[1]

    if auto:
        dw, dh = np.mod(dw, stride), np.mod(dh, stride)
    elif scaleFill:
        dw, dh = 0.0, 0.0
        new_unpad = (new_shape[1], new_shape[0])
        ratio = new_shape[1] / src_w, new_shape[0] / src_h

    # Split the padding between the two sides.
    dw /= 2
    dh /= 2

    if (src_w, src_h) != new_unpad:  # resize
        im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)

    # The -0.1 / +0.1 offsets decide which side receives the extra pixel
    # when the total padding is odd.
    top = int(round(dh - 0.1))
    bottom = int(round(dh + 0.1))
    left = int(round(dw - 0.1))
    right = int(round(dw + 0.1))
    im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)
    return im, ratio, (dw, dh)

In [5]:

def plot_one_box(x, img, color=None, label=None, line_thickness=None):
    """
    Draw a bounding box with an optional label on an image (in place).

    Args:
        x (list or tuple): Box corners in the format [x1, y1, x2, y2].
        img (numpy.ndarray): Image to draw on; modified in place.
        color (list): Colour for the box and label background. A random
            colour is chosen when omitted.
            NOTE(review): OpenCV drawing normally interprets this in the
            image's own channel order (BGR for frames read by cv2) - confirm.
        label (str): Optional label text drawn above the box's top-left corner.
        line_thickness (int): Line thickness; derived from the image size
            when omitted.

    Returns:
        None
    """
    tl = line_thickness or round(0.002 * max(img.shape[0:2])) + 1
    if not color:
        # `random` is not among the notebook's imports, so the previous
        # `color or [...]` expression raised NameError whenever no colour
        # was supplied. Import it locally where it is needed.
        import random
        color = [random.randint(0, 255) for _ in range(3)]
    c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
    cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)
    if label:
        tf = max(tl - 1, 1)  # font thickness
        t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]
        # Filled rectangle sized to the text, sitting just above the box corner
        c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3
        cv2.rectangle(img, c1, c2, color, -1, cv2.LINE_AA)
        cv2.putText(
            img,
            label,
            (c1[0], c1[1] - 2),
            0,
            tl / 3,
            [225, 255, 255],
            thickness=tf,
            lineType=cv2.LINE_AA,
        )


# Part 1- Code For Detecting accident Without Sending alert message

In [6]:

# Load the model and open the video
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
weights = 'o&a.pt'  # Path to model weights
model = attempt_load(weights)
model.to(device)
stride = int(model.stride.max())
imgsz = 640  # Input image size

cap = cv2.VideoCapture("accidentvideo.mp4")
accident_frames_threshold = 3  # consecutive accident frames required before alerting
accident_frames = 0
alert_displayed = False

# try/finally guarantees the capture and the window are released even if
# inference or drawing raises part-way through the video.
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Preprocess: letterbox to the model input size (padding aligned to
        # the model stride), BGR -> RGB, HWC -> CHW, scale to [0, 1].
        img0 = letterbox(frame, new_shape=imgsz, stride=stride)[0]
        img = img0[:, :, ::-1].transpose(2, 0, 1)
        img = np.ascontiguousarray(img)

        img = torch.from_numpy(img).to(device)
        img = img.float()
        img /= 255.0
        if img.ndimension() == 3:
            img = img.unsqueeze(0)

        # Inference only: no autograd graph is needed
        with torch.no_grad():
            pred = model(img)[0]
        pred = non_max_suppression(pred, 0.4, 0.5, classes=None, agnostic=False)

        accident_detected = False

        for det in pred:
            if det is not None and len(det):
                # Map boxes from the letterboxed input back to the original frame
                det[:, :4] = scale_boxes(img.shape[2:], det[:, :4], frame.shape).round()
                for *xyxy, conf, cls in det:
                    label = f'{model.names[int(cls)]} {conf:.2f}'
                    plot_one_box(xyxy, frame, label=label, color=(0, 255, 0), line_thickness=2)
                    if model.names[int(cls)] == 'accident':
                        accident_detected = True
                        break

        if accident_detected:
            accident_frames += 1
            if accident_frames >= accident_frames_threshold:
                # Draw the banner on every frame while the accident persists.
                # Previously it was drawn on a single frame only, because the
                # overlay was skipped once alert_displayed had been set.
                cv2.putText(frame, 'Accident Detected!', (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                alert_displayed = True
        else:
            accident_frames = 0
            alert_displayed = False

        cv2.imshow('ACCIDENT DETECTOR', frame)
        if cv2.waitKey(50) == ord('q'):
            break
finally:
    cap.release()
    cv2.destroyAllWindows()


Fusing layers... 
Model summary: 157 layers, 7023610 parameters, 0 gradients, 15.8 GFLOPs


In Part 1, the model detects an accident scenario in the video and shows an on-screen alert, but it does not send any alert message.

# Part 2 -Code to Detect Accident and send the accident message along with the accident image and time to a Telegram Channel

In [12]:
import os  # used only for the bot-token lookup below

# Select the device and load the detection model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
weights = 'o&a.pt'  # Path to model weights
model = attempt_load(weights)
model.to(device)
stride = int(model.stride.max())
imgsz = 640  # Input image size

# Load the video capture
cap = cv2.VideoCapture("accidentvideo.mp4")

# Telegram settings used by the alert-sending function.
# The bot token is a secret and must not be committed: it is read from the
# TELEGRAM_BOT_TOKEN environment variable, falling back to the original
# placeholder text when the variable is not set.
bot_token = os.environ.get(
    "TELEGRAM_BOT_TOKEN",
    'Github will not permit the use of private token. So we are not able to provide that',
)
channel_id = "@PROJECT_520"
def send_telegram_message_to_channel(bot_token, chat_id, message, photo=None):
    """Send a text message, optionally with a photo, via the Telegram Bot API.

    Args:
        bot_token (str): Telegram bot token.
        chat_id (str): Target chat or channel identifier.
        message (str): Text to send; used as the photo caption when ``photo``
            is given, otherwise as the message text.
        photo: Optional binary file object (or bytes) uploaded as
            ``accident.jpg``. The caller remains responsible for closing it.

    Returns:
        None. The outcome is reported with ``print``.
    """
    api_base = f"https://api.telegram.org/bot{bot_token}"
    if photo:
        url = f"{api_base}/sendPhoto"
        params = {"chat_id": chat_id, "caption": message}
        files = {'photo': ('accident.jpg', photo)}
    else:
        # sendMessage takes the body in `text`; `caption` is only valid for
        # media methods, so the previous text-only path sent no message text.
        url = f"{api_base}/sendMessage"
        params = {"chat_id": chat_id, "text": message}
        files = None

    try:
        # A timeout keeps a stalled connection from blocking the caller forever.
        response = requests.post(url, params=params, files=files, timeout=10)
    except requests.RequestException as exc:
        # Report network failures the same way as HTTP failures instead of
        # letting them propagate out of the caller's frame loop.
        print(f"Failed to send message. Error: {exc}")
        return

    if response.status_code == 200:
        print("Message sent successfully!")
    else:
        print(f"Failed to send message. Status code: {response.status_code}")
# try/finally guarantees the capture and the window are released even if
# inference, file I/O or the alert call raises part-way through the video.
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Preprocess the frame: letterbox to the model input size (padding
        # aligned to the model stride), BGR -> RGB, HWC -> CHW, scale to [0, 1].
        img0 = letterbox(frame, new_shape=imgsz, stride=stride)[0]
        img = img0[:, :, ::-1].transpose(2, 0, 1)
        img = np.ascontiguousarray(img)

        img = torch.from_numpy(img).to(device)
        img = img.float()
        img /= 255.0
        if img.ndimension() == 3:
            img = img.unsqueeze(0)

        # Inference only: no autograd graph is needed
        with torch.no_grad():
            pred = model(img)[0]
        pred = non_max_suppression(pred, 0.4, 0.5, classes=None, agnostic=False)

        # Display the results and send message if accident detected
        for det in pred:
            if det is not None and len(det):
                # Map boxes from the letterboxed input back to the original frame
                det[:, :4] = scale_boxes(img.shape[2:], det[:, :4], frame.shape).round()
                for *xyxy, conf, cls in det:
                    label = f'{model.names[int(cls)]} {conf:.2f}'
                    # NOTE: an alert is sent for every qualifying detection in
                    # every frame; there is no cooldown between alerts.
                    if model.names[int(cls)] == 'accident' and conf > 0.80:
                        current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        message = f"Accident Detected at {current_time}!"

                        # Capture the accident frame (before this box is drawn on it)
                        accident_frame = frame.copy()

                        # Save the accident frame as an image
                        cv2.imwrite("accident.jpg", accident_frame)

                        # Send the message and photo; the context manager
                        # closes the file handle, which was previously leaked
                        # on every alert.
                        with open("accident.jpg", "rb") as photo_file:
                            send_telegram_message_to_channel(bot_token, channel_id, message, photo=photo_file)

                    plot_one_box(xyxy, frame, label=label, color=(0, 255, 0), line_thickness=2)

        cv2.imshow('ACCIDENT DETECTOR', frame)
        if cv2.waitKey(50) == ord('q'):
            break
finally:
    cap.release()
    cv2.destroyAllWindows()

Fusing layers... 
Model summary: 157 layers, 7023610 parameters, 0 gradients, 15.8 GFLOPs


Message sent successfully!
Message sent successfully!
Message sent successfully!
Message sent successfully!
Message sent successfully!
Message sent successfully!
Message sent successfully!
Message sent successfully!
Message sent successfully!
Message sent successfully!
Message sent successfully!
Message sent successfully!
Message sent successfully!
Message sent successfully!


In Part 2, the model detects accidents in the video. If a detected accident has a confidence above 80%, the frame in which it occurs is captured and saved as accident.jpg. The Telegram Bot API is then used to send an "Accident Detected" message, together with the captured image and the time of the accident, to the configured Telegram channel.