# Two-Wheeler Traffic Rule Violation Detection


## Detected Objects
- Helmet Detection Project: Two-wheeler/motorcyclist, Helmet, License Plate
- Face Detection: Human Face
- Two Wheeler Lane Detection: Front-facing motorcycle, Rear-facing motorcycle

## Violations
- Wrong Lane: Driving away from the camera.
- No Helmet: Any rider not wearing a helmet.
- Triple riding: More than two riders.


1. **Motorcycle Detection:**
   - Detects all two-wheelers/motorcycles in a frame.
2. **Bounding Box Extraction:**
   - For each detected motorcycle, extracts its bounding box.
3. **Orientation Check:**
   - Determines if the motorcycle is front-facing or rear-facing.
   - Flags a "Wrong Lane Violation" if the motorcycle is rear-facing.
4. **Face and Helmet Detection:**
   - Detects faces and helmets within the cropped image.
   - Counts the number of faces.
   - Reduces the face count if the detected face and helmet areas overlap by more than 60%.
5. **No Helmet Violation:**
   - Detects helmets again and counts them.
   - Flags a "No Helmet Violation" if no helmets are detected or if the number of faces is greater than 1.
6. **Triple Riding Violation:**
   - Sums up the final counts of helmets and faces.
   - Flags a "Triple Riding Violation" if the sum is greater than 2.
7. **License Plate Detection:**
   - If any violation is detected, captures the license plate using the [OCR.Space API](https://ocr.space/OCRAPI).
8. **Saving Violation Data:**
   - Saves the violated motorcycle image along with its license plate image and text.
   - Records the list of violations for each image.

In [1]:
import os
from PIL import Image, ImageDraw
import cv2
from datetime import datetime, timezone, timedelta
import requests
import json
import re
from tqdm import tqdm
from decouple import config
from inference_sdk import InferenceHTTPClient, InferenceConfiguration

In [15]:
OCR_SPACE_API='K83477182488957'
ROBOFLOW_API_KEY='BvPPy1sFAKaldwdqJk55'

# `ocr_space_file`

1. **Function Definition**: The function `ocr_space_file` takes four parameters:
   - `filename`: The path to the image file from which text will be extracted.
   - `overlay`: A boolean that indicates whether overlay information should be included in the response.
   - `api_key`: The API key required for authenticating requests to the OCR.Space API.
   - `language`: A string representing the language code for OCR, such as 'eng' for English.

2. **Payload Preparation**: A dictionary `payload` is created to hold the parameters required for the API request. This includes the overlay requirement, the API key, the language, and the OCR engine version.

3. **File Handling**: The image file specified by `filename` is opened in binary mode (`'rb'`). The file is then sent in a POST request to the OCR.Space API.

4. **Response Handling**: The response is decoded and converted from JSON format into a Python dictionary. It checks whether the `ParsedResults` key exists and if it contains results.

5. **Text Extraction**: The function retrieves lines of text from the response and concatenates them into a single string representing the license plate number.

6. **Text Cleaning**: The extracted text is cleaned using a regular expression to remove any characters that are not alphanumeric.

7. **Return Value**: The cleaned license plate number is returned. If no text is found, the function returns `None`.

In [2]:
def ocr_space_file(filename, overlay, api_key, language):
    payload = {
                'isOverlayRequired': overlay,
                'apikey': api_key,
                'language': language,
                'OCREngine': 2,
            }
    with open(filename, 'rb') as f:
        r = requests.post('https://api.ocr.space/parse/image',
                        files={filename: f},
                        data=payload,
                        )
    data = json.loads(r.content.decode())

    lines = data["ParsedResults"][0]["TextOverlay"]["Lines"]

    lpnum = "".join(line["LineText"] for line in lines)
    lpnum = re.sub(r'[^a-zA-Z0-9]', '', lpnum)
    
    return lpnum

# `draw_detections`

1. **Function Definition**: 
   - The function `draw_detections` takes in three prediction dictionaries (`p1`, `p2`, `p3`) and an image (`img`) as inputs.
   - Each prediction dictionary contains a list of detected objects, each represented as a dictionary with properties for position, size, class, and confidence.

2. **Class Color Mapping**:
   - A dictionary `class_colors` maps detection classes (like helmets and motorcyclists) to specific colors. This helps visually distinguish different types of objects in the image.

3. **Drawing Context**:
   - The function creates a drawing context using `ImageDraw.Draw(img)` from the Pillow library, allowing the function to draw shapes and text on the image.

4. **Combining Predictions**:
   - The predictions from all three input dictionaries are combined into a single list for easier processing.

5. **Iterating Through Predictions**:
   - The function loops over each prediction to extract its bounding box coordinates (`x`, `y`, `width`, and `height`).
   - It calculates the corners of the bounding box (`x1`, `y1`, `x2`, `y2`) based on the center position (`x`, `y`) and dimensions.

6. **Label Positioning**:
   - Depending on the class of the detected object, the function determines where to draw the label background rectangle (above or below the bounding box) and sets the label position accordingly.

7. **Drawing Bounding Boxes**:
   - A rectangle is drawn around the detected object using `draw.rectangle`, outlining the bounding box in the color corresponding to the object's class.

8. **Labeling**:
   - The label text, which includes the class name and confidence score (formatted to two decimal places), is drawn on the image using the `draw.text` method.

9. **Return Value**:
   - Finally, the modified image, with all detections visualized, is returned.

In [3]:
def draw_detections(p1, p2, p3, img):
    class_colors = {
        'helmet': 'blue',
        'motorcyclist': 'green',
        'license_plate': 'red',
        'face': 'darkmagenta',
        'front': 'darkgoldenrod',
        'rear': 'darkorchid'
    }
    
    draw = ImageDraw.Draw(img)

    preds = {'predictions': p1['predictions'] + p2['predictions'] + p3['predictions']}

    for prediction in preds['predictions']:
        x, y, width, height = (
            prediction['x'],
            prediction['y'],
            prediction['width'],
            prediction['height']
        )
        
        x1 = x - width / 2
        y1 = y - height / 2
        x2 = x + width / 2
        y2 = y + height / 2
        
        class_name = prediction['class']
        confidence = prediction['confidence']
        
        label_color = class_colors.get(class_name, 'black')

        if class_name=='motorcyclist':
            draw.rectangle([x1, y1, x2, y1+14], fill=label_color)
            label_position = (x1 + 5, y1 + 2)
        else:
            draw.rectangle([x1, y1-14, x2, y1], fill=label_color)
            label_position = (x1 + 5, y1-12)
            
        draw.rectangle([x1, y1, x2, y2], outline=label_color, width=2)

        label = f"{class_name} ({confidence:.2f})"
        draw.text(label_position,label, fill='white')

    return img

In [4]:
# Roboflow API keys
roboflow_api_key = config("ROBOFLOW_API_KEY")

In [5]:
custom_configuration = InferenceConfiguration(confidence_threshold=0.4, iou_threshold=0.4)
client1 = InferenceHTTPClient(
    api_url="https://detect.roboflow.com",
    api_key=roboflow_api_key
)
client1.configure(custom_configuration)

<inference_sdk.http.client.InferenceHTTPClient at 0x1514e6bd0>

In [6]:
custom_configuration = InferenceConfiguration(confidence_threshold=0.4, iou_threshold=0.3)
client2 = InferenceHTTPClient(
    api_url="https://detect.roboflow.com",
    api_key=roboflow_api_key
)
client2.configure(custom_configuration)

<inference_sdk.http.client.InferenceHTTPClient at 0x156428d10>

In [7]:
custom_configuration = InferenceConfiguration(confidence_threshold=0.1, iou_threshold=0.1)
client3 = InferenceHTTPClient(
    api_url="https://detect.roboflow.com",
    api_key=roboflow_api_key
)
client3.configure(custom_configuration)

<inference_sdk.http.client.InferenceHTTPClient at 0x157926f50>

In [8]:
video_path = 'image_239_jpg.rf.d30643c4a7f593d85fe0a063af8061f7.jpg'
cap = cv2.VideoCapture(video_path)

In [9]:
# Get video details
frame_width = int(cap.get(3))
frame_height = int(cap.get(4))
fps = cap.get(5)
total_frames = int(cap.get(7))

In [10]:
# Violate Date folder
current_date = datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=5, minutes=30))).strftime("%d-%m-%Y")
folder_path = os.path.join(os.getcwd(), f"Violations/{current_date}")
os.makedirs(folder_path, exist_ok=True)

## Video Frame Processing for Violations Detection

1. **Frame Extraction**: The code processes every 180th frame of the video to reduce computational load. It reads the video frame using OpenCV and converts it into a format compatible with the Pillow library for further processing.

2. **Model Inference**:
   - The frame is saved temporarily and analyzed by three different models:
     - **Helmet Detection**: Checks for the presence of helmets on the motorcyclist.
     - **Face Detection**: Identifies the faces of the motorcyclist and any passengers.
     - **Lane Detection**: Determines if the motorcyclist is riding in the correct lane.

3. **Violation Detection**:
   - After extracting predictions from the models, the code checks for various conditions:
     - If the motorcyclist is detected without a helmet.
     - If there are faces detected within the bounds of the motorcyclist's area.
     - If the motorcyclist is in the wrong lane as indicated by lane detection.
     - If the total number of detected individuals exceeds two (indicating triple riding).

4. **License Plate Recognition**:
   - If any violations are detected, the code looks for a license plate in the frame.
   - It crops the detected license plate region and uses an OCR service to extract the license plate number.
   - The results are saved along with the images of the motorcyclist and the license plate in a structured folder.

5. **File Management**: The code creates directories to organize the output based on the detected violations and cleans up temporary files after processing.

### Code Structure
- **Image Drawing Function**: The `draw_detections` function visually annotates the detections on the motorcyclist’s image, highlighting the detected classes (helmet, face, etc.) and their confidence scores.
- **Detection Logic**: Nested loops handle detection logic, ensuring correct identification and preventing overlap between detected faces and helmets.


In [13]:
# Process every 60th frame
for frame_number in tqdm(range(0, total_frames, 180), desc="Processing frames", unit="frames"):
    ret, frame = cap.read()
    if not ret:
        break

    pil_frame = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    image_path = "temp_frame.jpg"
    pil_frame.save(image_path)

    r1 = client1.infer(image_path, model_id="helmet-detection-project/13")
    pred1 = r1['predictions']

    for pr1 in pred1:
        helmet_detected = False
        face_detected = False
        rear_detected = False
        more_than_two_detected = False
        num_faces_detected = 0
        num_helmets_detected = 0

        if pr1['class'] == 'motorcyclist':
            motorcyclist_x, motorcyclist_y, motorcyclist_width, motorcyclist_height = pr1['x'], pr1['y'], pr1['width'], pr1['height']
            
            motorcyclist_x1, motorcyclist_y1 = int(motorcyclist_x - motorcyclist_width / 2), int(motorcyclist_y - motorcyclist_height / 2)
            motorcyclist_x2, motorcyclist_y2 = int(motorcyclist_x + motorcyclist_width / 2), int(motorcyclist_y + motorcyclist_height / 2)
            
            motorcyclist_image = pil_frame.crop((motorcyclist_x1, motorcyclist_y1, motorcyclist_x2, motorcyclist_y2))
            motorcyclist_image.save("temp_motorcyclist_image.jpg")

            # Lane check
            r3 = client3.infer("temp_motorcyclist_image.jpg", model_id="two-wheeler-lane-detection/3")
            lane = r3

            if lane['predictions']:
                max_conf = max(lane['predictions'], key=lambda x: x['confidence'])
                lane['predictions'] = [max_conf]
            
            pred3 = lane['predictions']
            
            for lane_prediction in pred3:
                if lane_prediction['class'] == 'rear':
                    rear_x, rear_y, rear_width, rear_height = lane_prediction['x'], lane_prediction['y'], lane_prediction['width'], lane_prediction['height']

                    if motorcyclist_x1 < rear_x < motorcyclist_x2 and motorcyclist_y1 < rear_y < motorcyclist_y2:
                        rear_detected = True
                        break

            # Face detected
            r2 = client2.infer("temp_motorcyclist_image.jpg", model_id="face-detection-mik1i/21")
            pred2 = r2['predictions']

            for face_prediction in pred2:
                if face_prediction['class'] == 'face':
                    face_x, face_y, face_width, face_height = face_prediction['x'], face_prediction['y'], face_prediction['width'], face_prediction['height']

                    if motorcyclist_x1 < face_x < motorcyclist_x2 and motorcyclist_y1 < face_y < motorcyclist_y2:
                        num_faces_detected += 1

                        # Avoid detecting helmet and face in same area and calculating number of people incorrectly
                        for helmet_prediction in pred1:
                            if helmet_prediction['class'] == 'helmet':
                                helmet_x, helmet_y, helmet_width, helmet_height = helmet_prediction['x'], helmet_prediction['y'], helmet_prediction['width'], helmet_prediction['height']
                                
                                face_x1 = face_x - face_width / 2
                                face_y1 = face_y - face_height / 2
                                face_x2 = face_x + face_width / 2
                                face_y2 = face_y + face_height / 2

                                helmet_x1 = helmet_x - helmet_width / 2
                                helmet_y1 = helmet_y - helmet_height / 2
                                helmet_x2 = helmet_x + helmet_width / 2
                                helmet_y2 = helmet_y + helmet_height / 2

                                overlap_x1 = max(face_x, helmet_x)
                                overlap_y1 = max(face_y, helmet_y)
                                overlap_x2 = min(face_x + face_width, helmet_x + helmet_width)
                                overlap_y2 = min(face_y + face_height, helmet_y + helmet_height)

                                overlap_width = max(0, overlap_x2 - overlap_x1)
                                overlap_height = max(0, overlap_y2 - overlap_y1)

                                overlap_area = overlap_width * overlap_height

                                face_area = face_width * face_height

                                if overlap_area / face_area > 0.6:
                                    num_faces_detected -= 1
                                    break

            if num_faces_detected > 0:
                face_detected = True

            # Helmet check
            for helmet_prediction in pred1:
                if helmet_prediction['class'] == 'helmet':
                    helmet_x, helmet_y, helmet_width, helmet_height = helmet_prediction['x'], helmet_prediction['y'], helmet_prediction['width'], helmet_prediction['height']

                    if motorcyclist_x1 < helmet_x < motorcyclist_x2 and motorcyclist_y1 < helmet_y < motorcyclist_y2:
                        helmet_detected = True
                        num_helmets_detected += 1

            # More than two riding
            if num_faces_detected + num_helmets_detected > 2:
                more_than_two_detected = True

            # r4 = m1.predict("temp_motorcyclist_image.jpg", confidence=60, overlap=40)
            r4 = client1.infer("temp_motorcyclist_image.jpg", model_id="helmet-detection-project/13")
            colored_motorcycle = draw_detections(r4, r2, lane, motorcyclist_image)
            
            # Violated license plate
            if not helmet_detected or face_detected or rear_detected or more_than_two_detected:
                
                violation_names = []
                if not helmet_detected or face_detected:
                    violation_names.append('no_helmet')
                if rear_detected:
                    violation_names.append('wrong_lane')
                if more_than_two_detected:
                    violation_names.append('triple_riding')

                timestamp = datetime.now(timezone.utc).astimezone(timezone(timedelta(hours=5, minutes=30))).strftime("%d-%m-%Y %H %M %S")
                image_name = ", ".join(violation_names) + f" - {timestamp}"
                lp_detected = False

                for pr11 in pred1:
                    if pr11['class'] == 'license_plate':
                        license_plate_x, license_plate_y, license_plate_width, license_plate_height = pr11['x'], pr11['y'], pr11['width'], pr11['height']
                        if motorcyclist_x1 < license_plate_x < motorcyclist_x2 and motorcyclist_y1 < license_plate_y < motorcyclist_y2:
                            license_plate_x1, license_plate_y1 = int(license_plate_x - license_plate_width / 2), int(license_plate_y - license_plate_height / 2)
                            license_plate_x2, license_plate_y2 = int(license_plate_x + license_plate_width / 2), int(license_plate_y + license_plate_height / 2)
            
                            license_plate_image = pil_frame.crop((license_plate_x1, license_plate_y1, license_plate_x2, license_plate_y2))
                            
                            license_plate_image.save("temp_lp.jpg")
                            lpnum = ocr_space_file(filename="temp_lp.jpg", overlay=False, api_key=config("OCR_SPACE_API"), language='eng')   

                            if lpnum.strip():
                                image_name = lpnum + " - " + image_name
                            else:
                                image_name = image_name
                            image_folder_path = os.path.join(folder_path, image_name)
                            os.makedirs(image_folder_path, exist_ok=True)

                            violated_motorcycle_image_path = os.path.join(image_folder_path, f"{lpnum} - motorcyclist.jpg")
                            colored_motorcycle.save(violated_motorcycle_image_path)

                            violated_motorcycle_lp_image_path = os.path.join(image_folder_path, f"{lpnum} - license_plate.jpg")
                            license_plate_image.save(violated_motorcycle_lp_image_path)

                            lp_text_path = os.path.join(image_folder_path, f"{lpnum} - license_plate_number.txt")
                            with open(lp_text_path, 'w') as file:
                                file.write(f"Violated License Plate Number - {lpnum}")

                            lp_detected = True

                            if os.path.exists("temp_lp.jpg"):
                                os.remove("temp_lp.jpg")
                            break

                if not lp_detected:
                    image_folder_path = os.path.join(folder_path, image_name)
                    os.makedirs(image_folder_path, exist_ok=True)
                    violated_motorcycle_image_path = os.path.join(image_folder_path, f"motorcyclist.jpg")

                    colored_motorcycle.save(violated_motorcycle_image_path)


if os.path.exists("temp_motorcyclist_image.jpg"):
    os.remove("temp_motorcyclist_image.jpg")
cap.release()

print("Video processing completed.")

Processing frames:   0%|          | 0/1 [00:00<?, ?frames/s]

Video processing completed.





---