In [12]:
!pip install langchain langchain_community unstructured sentence_transformers tiktoken chromadb langchain_chroma langchain_groq
!pip install pypdf
!pip install openai
!pip install --upgrade huggingface_hub
!pip install langchain openai faiss-cpu tiktoken
!pip install ultralytics
!pip install pdfplumber
!pip install fpdf
!pip install inference-sdk




In [13]:
video_path = "/content/rear_collision_2.mp4"

In [14]:
import os
import cv2
import numpy as np
from inference_sdk import InferenceHTTPClient
from ultralytics import YOLO
from openai import OpenAI
import torch
import base64
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.docstore.document import Document
from langchain.text_splitter import CharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
import pdfplumber
import openai
import re


client = OpenAI(api_key="USE YOUR OWN OPENAI API KEY")

vehicle_detection_client = InferenceHTTPClient(
    api_url="https://detect.roboflow.com",
    api_key="A4Vt8QfBWpfdHpRiPcQo"
)
Accident_model = YOLO('/content/best (1).pt')


In [15]:
def calculate_iou(boxA, boxB):
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])
    interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1)
    boxAArea = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1)
    boxBArea = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1)
    return interArea / float(boxAArea + boxBArea - interArea)


In [16]:
output_path = "/content/last_output.mp4"
frames_dir = "/content/frames"
if not os.path.exists(frames_dir):
    os.makedirs(frames_dir)

cap = cv2.VideoCapture(video_path)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))


In [17]:
frame_count = 0
next_id = 1
tracked_objects = {}

highest_confidence = 0.0
highest_confidence_frame = None

BATCH_SIZE = 16
frames_buffer = []

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    frames_buffer.append(frame)
    if len(frames_buffer) == BATCH_SIZE or not ret:
        processed_frames = []

        for frame in frames_buffer:
            img_path = os.path.join(frames_dir, f"frame_{frame_count}.jpg")
            cv2.imwrite(img_path, frame)
            result = vehicle_detection_client.infer(img_path, model_id="vehicle-detection-in-different-environmental-conditions-l6mgz-ufqld-xespf/3")

            current_detections = []
            if result.get("predictions"):
                for obj in result["predictions"]:
                    x_center = int(obj["x"])
                    y_center = int(obj["y"])
                    width = int(obj["width"])
                    height = int(obj["height"])
                    x1 = x_center - width // 2
                    y1 = y_center - height // 2
                    x2 = x_center + width // 2
                    y2 = y_center + height // 2
                    current_detections.append([x1, y1, x2, y2])

            new_tracked_objects = {}
            for detection in current_detections:
                assigned_id = None
                for obj_id, obj_box in tracked_objects.items():
                    iou = calculate_iou(detection, obj_box)
                    if iou > 0.3:
                        assigned_id = obj_id
                        break
                if assigned_id is None:
                    assigned_id = next_id
                    next_id += 1
                new_tracked_objects[assigned_id] = detection
                x1, y1, x2, y2 = detection
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 3)
                label = f"ID: {assigned_id}"
                cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
            tracked_objects = new_tracked_objects

            results = Accident_model(frame, conf=0.7)
            blurred_frame = cv2.GaussianBlur(frame, (51, 51), 0)
            mask = np.zeros_like(frame, dtype=np.uint8)
            if results:
                for obj in results[0].boxes:
                    confidence = obj.conf
                    x1, y1, x2, y2 = obj.xyxy[0].int().tolist()

                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 3)
                    cv2.rectangle(mask, (x1, y1), (x2, y2), (255, 255, 255), -1)

                    if confidence > highest_confidence:
                        highest_confidence = confidence
                        highest_confidence_frame = frame.copy()

            mask_inv = cv2.bitwise_not(mask)
            unblurred = cv2.bitwise_and(frame, mask)
            blurred_background = cv2.bitwise_and(blurred_frame, mask_inv)
            final_frame = cv2.add(blurred_background, unblurred)

            processed_frames.append(final_frame)

        for processed_frame in processed_frames:
            out.write(processed_frame)
            frame_count += 1

        frames_buffer = []

cap.release()
out.release()

if highest_confidence_frame is not None:
    highest_confidence_image_path = "/content/highest_confidence_accident_frame.jpg"
    cv2.imwrite(highest_confidence_image_path, highest_confidence_frame)
    print(f"Highest-confidence accident frame saved at: {highest_confidence_image_path}")




0: 256x416 (no detections), 7.7ms
Speed: 1.2ms preprocess, 7.7ms inference, 1.4ms postprocess per image at shape (1, 3, 256, 416)

0: 256x416 (no detections), 7.3ms
Speed: 1.7ms preprocess, 7.3ms inference, 0.6ms postprocess per image at shape (1, 3, 256, 416)

0: 256x416 (no detections), 7.1ms
Speed: 1.8ms preprocess, 7.1ms inference, 0.6ms postprocess per image at shape (1, 3, 256, 416)

0: 256x416 (no detections), 7.5ms
Speed: 1.8ms preprocess, 7.5ms inference, 0.6ms postprocess per image at shape (1, 3, 256, 416)

0: 256x416 (no detections), 9.4ms
Speed: 1.8ms preprocess, 9.4ms inference, 0.7ms postprocess per image at shape (1, 3, 256, 416)

0: 256x416 (no detections), 7.6ms
Speed: 1.4ms preprocess, 7.6ms inference, 0.8ms postprocess per image at shape (1, 3, 256, 416)

0: 256x416 (no detections), 7.3ms
Speed: 1.7ms preprocess, 7.3ms inference, 0.6ms postprocess per image at shape (1, 3, 256, 416)

0: 256x416 (no detections), 13.7ms
Speed: 1.8ms preprocess, 13.7ms inference, 0.8m

In [18]:
video = cv2.VideoCapture("/content/last_output.mp4")
base64Frames = []
while video.isOpened():
    success, frame = video.read()
    if not success:
        break
    _, buffer = cv2.imencode(".jpg", frame)
    base64Frames.append(base64.b64encode(buffer).decode("utf-8"))
video.release()

print(len(base64Frames), "frames read.")


160 frames read.


In [19]:
os.environ["OPENAI_API_KEY"] = "USE YOUR OWN OPENAI API KEY"


In [20]:
PROMPT_MESSAGES = [
    {
        "role": "user",
        "content": [
            """This is a video of a dashcam car accident, please analyze the video and describe how the accident happened by following these steps.
1-	Only provide real information extracted from the video.
2-	What’s your opinion of how the drivers drives there cars.
3-	You will get a procced video with bounding boxes and unique IDs for each car.
4-  always consider the dashcam car with ID 0.
4-	Follow the movement of the bounding boxes so you can understand the movement of cars.
5-	While describing the cars, refer to the cars based on the IDs.
6-	If there are any real extra details, you have to put them in the description.
7-	Avoid mention the frames and the car’s name or color.
8-	Do all the steps for all the videos I will provide you.
9-	Review your description and check if you missed something.
10-	Avoid replying  with anything except the description.
""",
            *map(lambda x: {"image": x, "resize": 768}, base64Frames[0::50]),
        ],
    },
]
params = {
    "model": "chatgpt-4o-latest",
    "messages": PROMPT_MESSAGES,
    "max_tokens": 500,
}
response = client.chat.completions.create(**params)

accident_description = response.choices[0].message.content
print("Accident Description:", accident_description)

Accident Description: Car ID 0 (dashcam vehicle) is following a line of vehicles on a two-lane road. The vehicle directly ahead of ID 0 is Car ID 1. The road is fairly narrow, with trees and some greenery on either side.

At a point in the video, Car ID 1 begins to slow down significantly, causing Car ID 0 to close the gap very quickly between the two. Car ID 0 does not appear to reduce speed sufficiently, resulting in a collision with the rear of Car ID 1.

In terms of driving behavior:
- The driver of Car ID 1 seems to be driving cautiously, slowing down for an unknown reason. This might suggest a defensive driving approach, though the sudden deceleration could indicate unawareness of the following traffic or something ahead that is not visible in the footage.
- The driver of Car ID 0 (the dashcam car) does not react quickly enough to Car ID 1’s reduction in speed, failing to maintain an appropriate braking distance. This lack of response or preparedness for potential hazards ahead l

In [21]:

pdf_path = '/content/cleaned_output_file.pdf'
extracted_text = ""

with pdfplumber.open(pdf_path) as pdf:
    for page in pdf.pages:
        extracted_text += page.extract_text()

cleaned_text_safe = extracted_text.encode('latin-1', 'replace').decode('latin-1')

text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=50)
texts = text_splitter.split_text(cleaned_text_safe)

documents = [Document(page_content=t) for t in texts]

embeddings = OpenAIEmbeddings()

vectorstore = FAISS.from_documents(documents, embeddings)

retriever = vectorstore.as_retriever()
qa_chain = RetrievalQA.from_chain_type(
    llm=ChatOpenAI(model="gpt-4o"),
    chain_type="stuff",
    retriever=retriever
)


query = f"""
Based on the following accident description, please determine the percentage of error for each car involved in the incident:
{accident_description}
Provide the percentage of fault for each car and explain the reasoning behind your decision, citing the relevant traffic regulations or laws.

Make your reply short.
Reply on the following template.

Based on the traffic regulations we decide the following :

Dashcam car responsibility :
Other car responsibility :

Reasoning :
"""

response = qa_chain.invoke(query)

if isinstance(response, dict) and 'result' in response:
    result = response['result']

    car_id_1_fault = re.search(r'Car\s*ID\s*1\s*:\s*(\d+)%', result, re.IGNORECASE)
    car_id_2_fault = re.search(r'Car\s*ID\s*2\s*:\s*(\d+)%', result, re.IGNORECASE)
    source = re.search(r'(Article\s*\d+|Regulation\s*\d+|Section\s*\d+)', result, re.IGNORECASE)

    if car_id_1_fault and car_id_2_fault and source:
        print(f"Car ID 1 Error: {car_id_1_fault.group(1)}%")
        print(f"Car ID 2 Error: {car_id_2_fault.group(1)}%")
        print(f"Source: {source.group(1)}")
    else:
        print("Could not extract fault percentages or source from the response.")
        print("Raw result:", result)
else:
    print("Invalid response format:", response)


Could not extract fault percentages or source from the response.
Raw result: Based on the traffic regulations we decide the following :

Dashcam car responsibility : 100%
Other car responsibility : 0%

Reasoning : According to Article 60/2 and 60/2/1, a driver must maintain a safe following distance to prevent collisions. Car ID 0 failed to do so, directly causing the accident by not reacting appropriately to Car ID 1's deceleration. Car ID 1's cautious driving does not violate any regulations and does not contribute to the accident.


In [22]:
from fpdf import FPDF
from datetime import datetime

def clean_text_for_pdf(text):
    replacements = {
        '’': "'",
        '‘': "'",
        '“': '"',
        '”': '"',
        '–': '-',
        '—': '-',
        '…': '...',
        '\n': ' ',
    }
    for key, value in replacements.items():
        text = text.replace(key, value)
    return text

class PDF(FPDF):
    def header(self):
        self.image('/content/SDAIA logo.png', x=10, y=8, w=25)
        self.image('/content/Tuwaiq logo.png', x=170, y=8, w=25)
        self.set_font('Arial', 'B', 16)
        self.cell(0, 10, 'Technical Committee Decision', ln=True, align='C')
        self.ln(10)

    def footer(self):
        self.set_y(-15)
        self.set_font('Arial', 'I', 10)
        self.cell(0, 10, f'Page {self.page_no()}', 0, 0, 'C')

def create_accident_report(decision_details, accident_image_path, output_path):
    pdf = PDF()
    pdf.add_page()

    pdf.set_font('Arial', '', 10)

    cleaned_decision_details = clean_text_for_pdf(decision_details)

    current_date = datetime.now().strftime('%B %d, %Y')
    pdf.cell(0, 10, f'Date: {current_date}', ln=True, align='L')
    pdf.ln(5)

    if accident_image_path:
        pdf.set_font('Arial', 'B', 12)
        pdf.cell(0, 10, 'Accident Photo', ln=True, align='L')
        pdf.ln(5)
        pdf.image(accident_image_path, x=10, w=100)
        pdf.ln(10)

    pdf.set_font('Arial', 'B', 12)
    pdf.cell(0, 10, 'Committee Decision Details', ln=True, align='L')
    pdf.set_font('Arial', '', 10)
    pdf.multi_cell(0, 10, cleaned_decision_details, align='L')
    pdf.ln(10)

    pdf.cell(0, 10, 'If you disagree with the decision, you may submit an appeal.', ln=True, align='L')

    pdf.output(output_path)
    print(f"PDF created at: {output_path}")

output_pdf = "/content/accident_report.pdf"
accident_image = "/content/highest_confidence_accident_frame.jpg"


create_accident_report(result, accident_image, output_pdf)

PDF created at: /content/accident_report.pdf
