In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
import cv2
import os
from sklearn.model_selection import train_test_split

# Set random seed for reproducibility
tf.random.set_seed(42)
np.random.seed(42)

# 1. Dataset Preparation
def load_culane_data(data_dir, img_size=(288, 800)):
    """
    Load and preprocess CULane dataset images and lane annotations.
    Args:
        data_dir: Path to CULane dataset directory
        img_size: Target image size (height, width)
    Returns:
        images: List of preprocessed images
        masks: List of corresponding lane masks
    """
    images = []
    masks = []
    
    # Assuming dataset structure: data_dir/images and data_dir/lanes
    img_dir = os.path.join(data_dir, 'images')
    mask_dir = os.path.join(data_dir, 'lanes')
    
    for img_file in os.listdir(img_dir):
        if img_file.endswith('.jpg'):
            # Load image
            img_path = os.path.join(img_dir, img_file)
            img = cv2.imread(img_path)
            img = cv2.resize(img, img_size[::-1])  # Resize to (width, height)
            img = img / 255.0  # Normalize to [0, 1]
            
            # Load corresponding mask
            mask_file = img_file.replace('.jpg', '.png')
            mask_path = os.path.join(mask_dir, mask_file)
            if os.path.exists(mask_path):
                mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
                mask = cv2.resize(mask, img_size[::-1])  # Resize mask
                mask = (mask > 0).astype(np.float32)  # Binarize mask
                mask = np.expand_dims(mask, axis=-1)  # Add channel dimension
                
                images.append(img)
                masks.append(mask)
    
    return np.array(images), np.array(masks)

# 2. Model Development: Simplified SCNN-inspired Model
def build_lane_detection_model(input_shape=(288, 800, 3)):
    """
    Build a CNN model for lane detection inspired by SCNN.
    Args:
        input_shape: Shape of input images (height, width, channels)
    Returns:
        model: Compiled Keras model
    """
    inputs = layers.Input(shape=input_shape)
    
    # Encoder: Convolutional layers
    x = layers.Conv2D(64, (3, 3), padding='same', activation='relu')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D((2, 2))(x)  # 144x400
    
    x = layers.Conv2D(128, (3, 3), padding='same', activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D((2, 2))(x)  # 72x200
    
    x = layers.Conv2D(256, (3, 3), padding='same', activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D((2, 2))(x)  # 36x100
    
    # Spatial CNN-like layer: Simplified message passing
    x = layers.Conv2D(256, (3, 3), padding='same', activation='relu')(x)
    x = layers.Conv2D(256, (1, 9), padding='same', activation='relu')(x)  # Horizontal message passing
    x = layers.Conv2D(256, (9, 1), padding='same', activation='relu')(x)  # Vertical message passing
    
    # Decoder: Upsampling
    x = layers.Conv2DTranspose(128, (3, 3), strides=(2, 2), padding='same', activation='relu')(x)  # 72x200
    x = layers.BatchNormalization()(x)
    
    x = layers.Conv2DTranspose(64, (3, 3), strides=(2, 2), padding='same', activation='relu')(x)  # 144x400
    x = layers.BatchNormalization()(x)
    
    x = layers.Conv2DTranspose(32, (3, 3), strides=(2, 2), padding='same', activation='relu')(x)  # 288x800
    x = layers.BatchNormalization()(x)
    
    # Output layer: Binary segmentation
    outputs = layers.Conv2D(1, (1, 1), padding='same', activation='sigmoid')(x)
    
    model = models.Model(inputs, outputs)
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
                 loss='binary_crossentropy',
                 metrics=['accuracy'])
    
    return model

# 3. Training the Model
def train_model(model, images, masks, batch_size=8, epochs=10, val_split=0.2):
    """
    Train the lane detection model.
    Args:
        model: Compiled Keras model
        images: Preprocessed images
        masks: Corresponding lane masks
        batch_size: Batch size for training
        epochs: Number of training epochs
        val_split: Fraction of data for validation
    """
    # Split data into training and validation sets
    X_train, X_val, y_train, y_val = train_test_split(images, masks, test_size=val_split, random_state=42)
    
    # Data augmentation
    data_gen = tf.keras.preprocessing.image.ImageDataGenerator(
        rotation_range=10,
        width_shift_range=0.1,
        height_shift_range=0.1,
        shear_range=0.1,
        zoom_range=0.1,
        horizontal_flip=True,
        fill_mode='nearest'
    )
    
    # Train the model
    model.fit(
        data_gen.flow(X_train, y_train, batch_size=batch_size),
        steps_per_epoch=len(X_train) // batch_size,
        validation_data=(X_val, y_val),
        epochs=epochs,
        verbose=1
    )
    
    return model

# 4. Evaluation
def evaluate_model(model, images, masks):
    """
    Evaluate the model on test data.
    Args:
        model: Trained Keras model
        images: Test images
        masks: Ground truth masks
    Returns:
        loss, accuracy: Evaluation metrics
    """
    loss, accuracy = model.evaluate(images, masks, verbose=0)
    print(f"Test Loss: {loss:.4f}")
    print(f"Test Accuracy: {accuracy:.4f}")
    
    # Additional metrics: IoU
    predictions = model.predict(images)
    predictions = (predictions > 0.5).astype(np.float32)
    intersection = np.logical_and(masks, predictions).sum()
    union = np.logical_or(masks, predictions).sum()
    iou = intersection / (union + 1e-7)
    print(f"Mean IoU: {iou:.4f}")
    
    return loss, accuracy, iou

# 5. Main Execution
if __name__ == "__main__":
    # Dataset path (update with your local path to CULane dataset)
    DATA_DIR = "/path/to/culane/dataset"
    
    # Load and preprocess data
    print("Loading CULane dataset...")
    images, masks = load_culane_data(DATA_DIR)
    print(f"Loaded {len(images)} images and masks.")
    
    # Build model
    print("Building model...")
    model = build_lane_detection_model()
    model.summary()
    
    # Train model
    print("Training model...")
    model = train_model(model, images, masks, batch_size=8, epochs=10)
    
    # Evaluate model
    print("Evaluating model...")
    loss, accuracy, iou = evaluate_model(model, images, masks)
    
    # Save model
    model.save("lane_detection_model.h5")
    print("Model saved as lane_detection_model.h5")