In [None]:
!pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.3.200-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.17-py3-none-any.whl.metadata (14 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.8.0->ultralytics)
  Downloading nv

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F 
import torchvision.transforms as transforms
import cv2
import numpy as np
from PIL import Image
from collections import deque
from ultralytics import YOLO

In [2]:
# --- 1. Configuration ---
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
CHECKPOINT_PATH = "/kaggle/input/new-trained-models/trained_model/best_model.pth.tar"
VIDEO_PATH = "/kaggle/input/test-clips/clip4.mp4"
OUTPUT_VIDEO_PATH = "/kaggle/working/final-object-output-clip4.mp4"

In [3]:
IMG_HEIGHT, IMG_WIDTH = 288, 512
THRESHOLD = 0.4
USE_IMAGENET_NORM = True
SMOOTHING_WINDOW = 7 # Frames to average over

In [4]:
# ===================== U-Net Model =====================
class UNet(nn.Module):
    def __init__(self, in_channels=3, out_channels=1, features=[64, 128, 256, 512]):
        super(UNet, self).__init__()
        self.downs = nn.ModuleList()
        self.ups = nn.ModuleList()
        self.pool = nn.MaxPool2d(2, 2)

        # Downsampling
        for feature in features:
            self.downs.append(self.double_conv(in_channels, feature))
            in_channels = feature

        # Bottleneck
        self.bottleneck = self.double_conv(features[-1], features[-1] * 2)

        # Upsampling
        for feature in reversed(features):
            self.ups.append(nn.ConvTranspose2d(feature * 2, feature, kernel_size=2, stride=2))
            self.ups.append(self.double_conv(feature * 2, feature))

        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)

    def forward(self, x):
        skip_connections = []

        for down in self.downs:
            x = down(x)
            skip_connections.append(x)
            x = self.pool(x)

        x = self.bottleneck(x)
        skip_connections = skip_connections[::-1]

        for idx in range(0, len(self.ups), 2):
            x = self.ups[idx](x)
            skip_connection = skip_connections[idx // 2]
            if x.shape != skip_connection.shape:
                x = F.interpolate(x, size=skip_connection.shape[2:])
            x = torch.cat((skip_connection, x), dim=1)
            x = self.ups[idx + 1](x)

        return self.final_conv(x)

    @staticmethod
    def double_conv(in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )

In [5]:
# --- MODIFIED: Lane History now stores and averages polynomial fits ---
class LaneHistory:
    def __init__(self, queue_size=SMOOTHING_WINDOW, tolerance=10):
        self.left_fit = deque(maxlen=queue_size)
        self.right_fit = deque(maxlen=queue_size)
        self.missed_frames = 0
        self.tolerance = tolerance

    def add_fit(self, left_fit, right_fit):
        if left_fit is None or right_fit is None:
            self.missed_frames += 1
        else:
            self.left_fit.append(left_fit)
            self.right_fit.append(right_fit)
            self.missed_frames = 0

    def get_average_fit(self):
        if self.missed_frames > self.tolerance:
            self.left_fit.clear()
            self.right_fit.clear()
            return None, None
        if not self.left_fit or not self.right_fit:
            return None, None
            
        avg_left_fit = np.mean(self.left_fit, axis=0)
        avg_right_fit = np.mean(self.right_fit, axis=0)
        return avg_left_fit, avg_right_fit

In [6]:
# --- Load Model (same as your version) ---
print("=> Loading trained model...")
model = UNet(in_channels=3, out_channels=1).to(DEVICE)
checkpoint = torch.load(CHECKPOINT_PATH, map_location=DEVICE)
model.load_state_dict(checkpoint["state_dict"])
model.eval()
print("✅ Model loaded successfully!")

=> Loading trained model...
✅ Model loaded successfully!


In [7]:
# --- Load Object Detection Model (YOLOv8) ---
print("=> Loading YOLOv8 object detection model...")
yolo_model = YOLO("yolov8s.pt") 
print("✅ YOLOv8 model loaded successfully!")


=> Loading YOLOv8 object detection model...
✅ YOLOv8 model loaded successfully!


In [8]:
# --- Preprocessing ---
transform_list = [transforms.Resize((IMG_HEIGHT, IMG_WIDTH)), transforms.ToTensor()]
if USE_IMAGENET_NORM:
    transform_list.append(transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                               std=[0.229, 0.224, 0.225]))
transform = transforms.Compose(transform_list)

In [9]:
# --- Perspective Transform ---
def get_perspective_transform(frame_w, frame_h):
    src = np.float32([
        [frame_w*0.45, frame_h*0.6],
        [frame_w*0.55, frame_h*0.6],
        [frame_w*0.1, frame_h*0.95],
        [frame_w*0.9, frame_h*0.95]
    ])
    dst = np.float32([
        [frame_w*0.25, 0],
        [frame_w*0.75, 0],
        [frame_w*0.25, frame_h],
        [frame_w*0.75, frame_h]
    ])
    return cv2.getPerspectiveTransform(src, dst), cv2.getPerspectiveTransform(dst, src)


In [10]:
def fit_polynomial(mask):
    h, w = mask.shape
    ys, xs = np.where(mask > 0)
    if len(ys) < 100:
        return None, None
    midpoint = w // 2
    left_ys, left_xs = ys[xs < midpoint], xs[xs < midpoint]
    right_ys, right_xs = ys[xs >= midpoint], xs[xs >= midpoint]
    left_fit, right_fit = None, None
    if len(left_ys) > 100:
        left_fit = np.polyfit(left_ys, left_xs, 2)
    if len(right_ys) > 100:
        right_fit = np.polyfit(right_ys, right_xs, 2)
    return left_fit, right_fit

In [11]:
# --- Main Video Loop (MODIFIED to use smoothing) ---
cap = cv2.VideoCapture(VIDEO_PATH)
frame_w, frame_h = int(cap.get(3)), int(cap.get(4))
fps = int(cap.get(cv2.CAP_PROP_FPS))
fps = fps if fps > 0 else 30
out = cv2.VideoWriter(OUTPUT_VIDEO_PATH, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_w, frame_h))

M, Minv = get_perspective_transform(frame_w, frame_h)
lane_history = LaneHistory()

print("\nProcessing video with temporal smoothing...")
frame_count = 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # print(f"Processing frame {frame_count}...")

    # --- Preprocess + Predict (same as before) ---
    pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    image_tensor = transform(pil_image).unsqueeze(0).to(DEVICE)
    with torch.no_grad():
        pred = torch.sigmoid(model(image_tensor))
        pred = (pred > THRESHOLD).float()
    pred_mask = pred.squeeze().cpu().numpy()
    pred_mask = cv2.resize(pred_mask, (frame_w, frame_h), interpolation=cv2.INTER_NEAREST)

    # print("Prediction done.")

    # --- Warp, Fit, and Smooth ---
    warped_mask = cv2.warpPerspective(pred_mask, M, (frame_w, frame_h))
    current_left_fit, current_right_fit = fit_polynomial(warped_mask)

    # print(f"  - Current fits: L={current_left_fit is not None}, R={current_right_fit is not None}")

    lane_history.add_fit(current_left_fit, current_right_fit)
    avg_left_fit, avg_right_fit = lane_history.get_average_fit()

    # print(f"  - Average fits: L={avg_left_fit is not None}, R={avg_right_fit is not None}")

    # --- Draw overlay and warnings using the SMOOTHED fit ---
    final_frame = frame.copy()
    y_range = np.linspace(0, frame_h - 1, frame_h)

    overlay = frame.copy()
    lane_detected = False

    if avg_left_fit is not None:
        left_avg_x = avg_left_fit[0]*y_range**2 + avg_left_fit[1]*y_range + avg_left_fit[2]
        pts_left = np.array([np.transpose(np.vstack([left_avg_x, y_range]))]).astype(np.float32)
        unwarped_left = cv2.perspectiveTransform(pts_left, Minv).astype(np.int32)
        cv2.polylines(overlay, [unwarped_left], isClosed=False, color=(0, 255, 255), thickness=8)
        lane_detected = True

    if avg_right_fit is not None:
        right_avg_x = avg_right_fit[0]*y_range**2 + avg_right_fit[1]*y_range + avg_right_fit[2]
        pts_right = np.array([np.transpose(np.vstack([right_avg_x, y_range]))]).astype(np.float32)
        unwarped_right = cv2.perspectiveTransform(pts_right, Minv).astype(np.int32)
        cv2.polylines(overlay, [unwarped_right], isClosed=False, color=(255, 255, 0), thickness=8)
        lane_detected = True

    if avg_left_fit is not None and avg_right_fit is not None:
        # Draw filled polygon between lanes
        pts_left = np.array([np.transpose(np.vstack([left_avg_x, y_range]))])
        pts_right = np.array([np.flipud(np.transpose(np.vstack([right_avg_x, y_range])))])
        pts = np.hstack((pts_left, pts_right)).astype(np.float32)
        unwarped_pts = cv2.perspectiveTransform(pts, Minv).astype(np.int32)
        cv2.fillPoly(overlay, [unwarped_pts], (0, 255, 0))

        # Lane departure warning
        y_bottom = frame_h - 1
        left_x_bottom = np.polyval(avg_left_fit, y_bottom)
        right_x_bottom = np.polyval(avg_right_fit, y_bottom)
        lane_center = (left_x_bottom + right_x_bottom) / 2
        deviation = abs(frame_w/2 - lane_center)
        lane_width = abs(right_x_bottom - left_x_bottom)

        warning_text = "Safe" if deviation <= lane_width * 0.35 else "⚠ Lane Departure!"
        color = (0, 0, 255) if "⚠" in warning_text else (0, 255, 0)
    elif lane_detected:
        # Only one lane detected → partial support
        warning_text = "Partial Lane Detected"
        color = (0, 165, 255)  # orange
    else:
        warning_text = "WARNING: Lane Lost!"
        color = (0, 0, 255)
    


    # Blend overlay
    final_frame = cv2.addWeighted(frame, 1, overlay, 0.4 if lane_detected else 0, 0)
        # --- C. OBSTACLE DETECTION ---
    obstacle_results = yolo_model.predict(frame, imgsz=640, conf=0.4, verbose=False)

    for r in obstacle_results:
        for box in r.boxes:
            class_name = yolo_model.names[int(box.cls[0])]

            if class_name in ['car', 'truck', 'bus', 'motorcycle']:
                x1, y1, x2, y2 = map(int, box.xyxy[0])

                if avg_left_fit is not None and avg_right_fit is not None:
                    # Take bottom-center of the vehicle in image space
                    vehicle_bottom_center = np.array([[(x1 + x2) / 2, y2]], dtype=np.float32)

                    # Transform to bird’s-eye-view
                    vehicle_bev = cv2.perspectiveTransform(vehicle_bottom_center[None, :, :], M)
                    vx, vy = int(vehicle_bev[0][0][0]), int(vehicle_bev[0][0][1])

                    # Lane boundaries at the vehicle’s y-coordinate
                    left_x = np.polyval(avg_left_fit, vy)
                    right_x = np.polyval(avg_right_fit, vy)

                    # Check lane occupancy
                    if vx < left_x or vx > right_x:
                        # Vehicle is outside ego lane → obstacle in adjacent lane
                        cv2.rectangle(final_frame, (x1, y1), (x2, y2), (255, 165, 0), 2) # Orange
                        cv2.putText(final_frame, f"OBSTACLE: {class_name.upper()}",
                                    (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX,
                                    0.9, (255, 165, 0), 2)
                    else:
                        # Optional: draw vehicles in ego lane with green box
                        cv2.rectangle(final_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                        cv2.putText(final_frame, class_name, (x1, y1 - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

    # Draw warning box + text
    cv2.rectangle(final_frame, (40, 15), (420, 70), (0, 0, 0), -1)
    cv2.putText(final_frame, warning_text, (50, 55),
                cv2.FONT_HERSHEY_SIMPLEX, 1.3, color, 3, cv2.LINE_AA)

    # print("Overlay and warnings drawn.\n")
    out.write(final_frame)
    # print("Frame written to output.")
    frame_count += 1

cap.release()
out.release()
print(f"✅ Final video processing complete.")



Processing video with temporal smoothing...
✅ Final video processing complete.
