In [1]:
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

^C
Note: you may need to restart the kernel to use updated packages.


## Training 



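your_project/
│
├── data/
│   └── datasets.py        # Contains: class LaneDataset(Dataset): ...
│
├── models/
│   └── unet.py            # Contains: def create_unet_model(): ...
│
├── inference/
│   ├── lane_detector.py   # Contains: class LaneDetector: ...
│   └── controller.py      # Contains: class PIDController: ...
│
├── train.py               # Your training script (where you write: from data.datasets import LaneDataset)
└── main.py                # Real-time loop (where you write: from inference.lane_detector import LaneDetector)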


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from data.datasets import LaneDataset  # we need to implement this
from models.unet import create_unet_model  # function to create the model
import segmentation_models_pytorch as smp

# Hyperparameters
# These module-level names are read by the training loop further down.
batch_size = 8
learning_rate = 0.001
num_epochs = 30
image_size = (256, 256)  # passed unchanged to LaneDataset, which hands it to cv2.resize

# Dataset and DataLoader
# NOTE(review): 'path/to/train' and 'path/to/val' look like placeholders —
# replace them with the real annotation/image roots before running.
train_dataset = LaneDataset('path/to/train', image_size=image_size, augment=True)
val_dataset = LaneDataset('path/to/val', image_size=image_size, augment=False)

# Only the training loader is shuffled; validation keeps a fixed order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Model
# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# U-Net with a MobileNetV2 encoder: 3 input channels, 1 output channel, and a
# sigmoid on the output. The inference-side LaneDetector builds the same
# architecture (with encoder_weights=None) so the saved state dict loads.
model = smp.Unet(
    encoder_name="mobilenet_v2",
    encoder_weights="imagenet",
    in_channels=3,
    classes=1,
    activation='sigmoid'
)
model = model.to(device)

# Loss and optimizer
# BCELoss operates on probabilities, which matches the sigmoid activation
# configured on the model above.
criterion = nn.BCELoss()
# The optimizer is created after model.to(device), so it references the
# parameters that live on the selected device.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
# Each epoch runs one optimisation pass over train_loader followed by one
# gradient-free evaluation pass over val_loader.
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, masks in train_loader:
        images = images.to(device)
        masks = masks.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, masks)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # max(..., 1) keeps the average defined when the loader yields no batches.
    train_loss = running_loss / max(len(train_loader), 1)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss:.4f}')

    # Validation
    # eval() switches layers such as batch-norm/dropout to inference behaviour;
    # no_grad() avoids building the autograd graph for these forward passes.
    model.eval()
    val_running_loss = 0.0
    with torch.no_grad():
        for images, masks in val_loader:
            images = images.to(device)
            masks = masks.to(device)

            outputs = model(images)
            val_running_loss += criterion(outputs, masks).item()

    val_loss = val_running_loss / max(len(val_loader), 1)
    print(f'Epoch [{epoch+1}/{num_epochs}], Val Loss: {val_loss:.4f}')

# Save the model
# Only the weights (state dict) are stored; the loader must rebuild the same
# architecture before calling load_state_dict.
torch.save(model.state_dict(), 'lane_detection_model.pth')

## data/datasets.py

In [None]:
import json
import os

import cv2
import numpy as np
import torch
from torch.utils.data import Dataset

class LaneDataset(Dataset):
    """Lane-segmentation dataset built from JSON-lines annotation files.

    Every ``*.json`` file directly inside ``root`` is read line by line; each
    non-blank line must be a JSON object providing:

    * ``raw_file``  - image path relative to ``root``
    * ``lanes``     - list of lanes, each a list of x coordinates
                      (negative x means "no lane point at this row")
    * ``h_samples`` - list of y coordinates paired element-wise with each lane

    Args:
        root: Directory holding the annotation files and the images they reference.
        image_size: Target size passed straight to ``cv2.resize`` (which
            interprets it as ``(width, height)``).
        augment: When True, each sample is horizontally flipped with
            probability 0.5.

    Each item is a tuple ``(image, mask)`` of float32 tensors: ``image`` has
    shape ``(3, H, W)`` with values in ``[0, 1]``; ``mask`` has shape
    ``(1, H, W)`` with values in ``{0, 1}``.
    """

    def __init__(self, root, image_size=(256, 256), augment=False):
        self.root = root
        self.image_size = image_size
        self.augment = augment

        # Collect all json files in the root. Sorting makes the sample order
        # independent of the (arbitrary) order returned by os.listdir.
        self.annotations = []
        for file in sorted(os.listdir(root)):
            if not file.endswith('.json'):
                continue
            with open(os.path.join(root, file)) as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        # Tolerate blank/trailing lines in JSON-lines files.
                        continue
                    self.annotations.append(json.loads(line))

    def __len__(self):
        """Return the number of annotated samples."""
        return len(self.annotations)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, mask)`` tensors.

        Raises:
            FileNotFoundError: If the referenced image cannot be read.
        """
        data = self.annotations[idx]
        image_path = os.path.join(self.root, data['raw_file'])
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f'Could not read image: {image_path}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Create mask: lanes are rasterised with value 1 on a zero background.
        mask = np.zeros(image.shape[:2], dtype=np.uint8)
        for lane in data['lanes']:
            points = [(x, y) for x, y in zip(lane, data['h_samples']) if x >= 0]
            if len(points) < 2:
                # A polyline needs at least two vertices.
                continue
            points = np.array(points, dtype=np.int32)
            cv2.polylines(mask, [points], isClosed=False, color=1, thickness=5)

        # Resize image and mask. Nearest-neighbour keeps the mask strictly
        # binary; the default bilinear filter would blend 0 and 1.
        image = cv2.resize(image, self.image_size)
        mask = cv2.resize(mask, self.image_size, interpolation=cv2.INTER_NEAREST)

        # Normalize image to [0, 1]. The mask is already 0/1 (drawn with
        # color=1), so it must NOT be divided by 255.
        image = image / 255.0

        # Augmentations (flip, rotate, etc.)
        if self.augment:
            # Example: horizontal flip, applied identically to image and mask.
            if np.random.rand() < 0.5:
                # .copy() removes the negative strides torch.from_numpy rejects.
                image = np.fliplr(image).copy()
                mask = np.fliplr(mask).copy()

        # Convert to tensor: HWC -> CHW for the image, add a channel axis to the mask.
        image = torch.from_numpy(image).permute(2, 0, 1).float()
        mask = torch.from_numpy(mask).unsqueeze(0).float()

        return image, mask

## lane_detector.py

In [None]:
import cv2
import torch
import numpy as np
from models.unet import create_unet_model  # or directly use the model definition

class LaneDetector:
    """Runs a trained U-Net lane-segmentation model on single BGR frames.

    Args:
        model_path: Path to a state dict saved with ``torch.save(model.state_dict(), ...)``.
        image_size: Size the frame is resized to before inference; passed
            straight to ``cv2.resize`` (which interprets it as ``(width, height)``).
    """

    def __init__(self, model_path, image_size=(256, 256)):
        # Imported locally: this module's import block does not bring
        # segmentation_models_pytorch into scope, so a bare `smp` reference
        # would raise NameError.
        import segmentation_models_pytorch as smp

        self.image_size = image_size
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # Must mirror the architecture used at training time so the state
        # dict keys and shapes match.
        self.model = smp.Unet(
            encoder_name="mobilenet_v2",
            encoder_weights=None,  # we are loading our own weights
            in_channels=3,
            classes=1,
            activation='sigmoid'
        )
        # NOTE(security): torch.load unpickles the file; only load checkpoints
        # from a trusted source.
        self.model.load_state_dict(torch.load(model_path, map_location=self.device))
        self.model.to(self.device)
        self.model.eval()

    def detect(self, image):
        """Segment lanes in a BGR image.

        Args:
            image: ``(H, W, 3)`` uint8 BGR array (e.g. a frame from ``cv2.VideoCapture``).

        Returns:
            ``(H, W)`` uint8 array at the input resolution; 1 marks lane
            pixels, 0 marks background.

        Raises:
            ValueError: If ``image`` is None.
        """
        if image is None:
            raise ValueError('detect() received None instead of an image')

        # Preprocess: same colour order, resize and scaling as the training dataset.
        original_size = image.shape[:2]
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, self.image_size)
        image = image / 255.0
        image = torch.from_numpy(image).permute(2, 0, 1).float().unsqueeze(0).to(self.device)

        with torch.no_grad():
            output = self.model(image)
        # Index the batch and channel axes explicitly; squeeze() would also
        # drop a spatial axis that happens to have size 1.
        output = output[0, 0].cpu().numpy()
        output = (output > 0.5).astype(np.uint8)
        # cv2.resize takes (width, height). Nearest-neighbour keeps the mask
        # strictly binary when scaling back to the frame size.
        output = cv2.resize(
            output,
            (original_size[1], original_size[0]),
            interpolation=cv2.INTER_NEAREST,
        )
        return output

## controller.py

In [1]:
class PIDController:
    """Minimal discrete PID controller.

    Args:
        kp: Proportional gain.
        ki: Integral gain.
        kd: Derivative gain.
    """

    def __init__(self, kp, ki, kd):
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.integral = 0
        self.prev_error = 0

    def reset(self):
        """Clear the accumulated integral and the remembered previous error."""
        self.integral = 0
        self.prev_error = 0

    def compute(self, error, dt):
        """Return the control output for ``error`` observed ``dt`` seconds after the last call.

        Args:
            error: Current error (setpoint minus measurement).
            dt: Time elapsed since the previous call. A non-positive value is
                treated as "no time elapsed": the integral is left untouched
                and the derivative term is zero, instead of dividing by zero
                or integrating backwards.

        Returns:
            ``kp * error + ki * integral + kd * derivative``.
        """
        if dt > 0:
            self.integral += error * dt
            derivative = (error - self.prev_error) / dt
        else:
            derivative = 0.0
        output = self.kp * error + self.ki * self.integral + self.kd * derivative
        self.prev_error = error
        return output

## main.py

In [None]:
import cv2
from inference.lane_detector import LaneDetector
from inference.controller import PIDController

import numpy as np  # used by the lane-fitting code below

# Initialize lane detector and controller
detector = LaneDetector('lane_detection_model.pth')
pid = PIDController(kp=0.1, ki=0.0, kd=0.0)

# Fixed controller time step, assuming the camera delivers 30 fps.
dt = 1/30

cap = cv2.VideoCapture(0)  # webcam

# try/finally guarantees the camera and the windows are released even if
# detection or fitting raises part-way through the loop.
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            # No frame available (camera missing, unplugged, or stream ended).
            break

        # Detect lanes: mask is a (height, width) uint8 array of 0/1.
        mask = detector.detect(frame)

        # Find lane points in the mask
        # We can use a histogram to find the base of the lane, then use sliding window to extract points.
        # Alternatively, we can use Hough transform or just take the non-zero points.

        # Simple method: take the bottom half of the mask and find the center of the lane.
        height, width = mask.shape
        half_height = height // 2
        bottom_half = mask[half_height:, :]

        # Find the coordinates of the lane pixels; argwhere yields (row, col) pairs.
        points = np.argwhere(bottom_half > 0)
        if len(points) > 0:
            # We'll fit a line to these points to get the lane center at the bottom
            x = points[:, 1]
            y = points[:, 0] + half_height  # adjust y coordinate back to full-frame rows

            # A line x = a*y + b is only determined when the pixels span at
            # least two different rows; otherwise polyfit is rank-deficient.
            if y.min() != y.max():
                # Fit a polynomial (linear for simplicity)
                coeffs = np.polyfit(y, x, 1)
                # Get the x value at the bottom of the image
                x_bottom = np.polyval(coeffs, height-1)

                # The center of the lane at the bottom is x_bottom
                # The center of the image is width/2
                error = (width/2 - x_bottom) / width  # normalized error

                # Use PID controller to compute steering angle
                steering_angle = pid.compute(error, dt)

                # Clamp steering angle to [-1, 1] for example
                steering_angle = np.clip(steering_angle, -1, 1)

                # You can then send this steering_angle to the vehicle's control system.

        # Display the mask and frame (mask is scaled 0/1 -> 0/255 so it is visible)
        cv2.imshow('Frame', frame)
        cv2.imshow('Mask', mask*255)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    cv2.destroyAllWindows()