In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
import pickle
from sklearn.model_selection import train_test_split
import os

# Load and preprocess data
def load_data(
    train_path="/content/drive/MyDrive/full_CNN_train.p",
    labels_path="/content/drive/MyDrive/full_CNN_labels.p",
    test_size=0.2,
    random_state=42,
):
    """Load the pickled images and label masks, scale them, and split them.

    Args:
        train_path: Path of the pickle file holding the training images.
        labels_path: Path of the pickle file holding the label masks.
        test_size: Fraction of the samples held out for validation.
        random_state: Seed passed to ``train_test_split`` so the split is
            reproducible.

    Returns:
        The result of ``train_test_split(X, y, ...)``, unpacked by the caller
        as ``X_train, X_val, y_train, y_val``. Both arrays are float32 and
        divided by 255.
    """
    # NOTE(security): pickle.load can execute arbitrary code embedded in the
    # file. Only point these paths at files you created or fully trust.
    with open(train_path, "rb") as f:
        train_images = pickle.load(f)
    with open(labels_path, "rb") as f:
        labels = pickle.load(f)

    # Both arrays are first materialised as uint8, then rescaled below.
    X = np.array(train_images, dtype=np.uint8)
    y = np.array(labels, dtype=np.uint8)

    # Normalize: divide by 255 so the labels suit the binary cross-entropy
    # loss used in training.
    X = X.astype("float32") / 255.0
    y = y.astype("float32") / 255.0

    return train_test_split(X, y, test_size=test_size, random_state=random_state)

# Build the LaneNet model in Keras
def build_model(input_shape=(80, 160, 3)):
    """Build the small encoder-decoder lane segmentation network.

    The encoder downsamples twice with 2x2 max pooling; the decoder upsamples
    twice with stride-2 transposed convolutions and concatenates each result
    with the encoder feature map of the same resolution (skip connections).

    Args:
        input_shape: ``(height, width, channels)`` of the input images.
            Height and width must be multiples of 4 (or ``None``), otherwise
            the upsampled tensors no longer match the skip-connection tensors.

    Returns:
        An uncompiled Keras ``Model`` whose single-channel output holds raw
        logits (no sigmoid is applied inside the network).

    Raises:
        ValueError: If a known height or width is not divisible by 4.
    """
    # Validate before any layer is created so the failure is explicit rather
    # than a shape mismatch deep inside Concatenate.
    for axis_name, size in (("height", input_shape[0]), ("width", input_shape[1])):
        if size is not None and size % 4 != 0:
            raise ValueError(
                f"input {axis_name} must be divisible by 4 for the two "
                f"pool/upsample stages to line up, got {size}"
            )

    inputs = layers.Input(shape=tuple(input_shape))

    # Encoder
    x1 = layers.Conv2D(32, 3, padding="same", activation="relu")(inputs)
    p1 = layers.MaxPooling2D()(x1)

    x2 = layers.Conv2D(64, 3, padding="same", activation="relu")(p1)
    p2 = layers.MaxPooling2D()(x2)

    # Bottleneck
    x3 = layers.Conv2D(128, 3, padding="same", activation="relu")(p2)

    # Decoder stage 1: upsample and merge with the encoder map x2
    u1 = layers.Conv2DTranspose(64, 2, strides=2, padding="same")(x3)
    c1 = layers.Concatenate()([u1, x2])
    x4 = layers.Conv2D(64, 3, padding="same", activation="relu")(c1)

    # Decoder stage 2: upsample and merge with the encoder map x1
    u2 = layers.Conv2DTranspose(32, 2, strides=2, padding="same")(x4)
    c2 = layers.Concatenate()([u2, x1])
    x5 = layers.Conv2D(32, 3, padding="same", activation="relu")(c2)

    x6 = layers.Conv2D(16, 3, padding="same", activation="relu")(x5)
    # No sigmoid here: the output is logits, to be paired with a
    # from_logits loss and an explicit sigmoid at inference time.
    output = layers.Conv2D(1, 1, padding="same")(x6)

    return models.Model(inputs, output)

# Train the model
def train(epochs=30, batch_size=32, model_path="lane_model.h5"):
    """Train the lane model on the pickled dataset and save it to disk.

    Args:
        epochs: Number of passes over the training split.
        batch_size: Number of samples per gradient update.
        model_path: Destination passed to ``model.save``.

    Returns:
        The trained Keras model.
    """
    X_train, X_val, y_train, y_val = load_data()

    model = build_model()
    # from_logits=True makes the loss apply the sigmoid to the model output.
    model.compile(optimizer='adam', loss=tf.keras.losses.BinaryCrossentropy(from_logits=True))

    model.fit(
        X_train,
        y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
    )
    model.save(model_path)

    return model

# Convert to TFLite
def convert_to_tflite(model):
    """Export a Keras model as ``lane_model.tflite`` with default optimizations.

    Args:
        model: The Keras model to convert.

    The converted model bytes are written to the working directory and a
    confirmation line is printed.
    """
    tflite_converter = tf.lite.TFLiteConverter.from_keras_model(model)
    tflite_converter.optimizations = [tf.lite.Optimize.DEFAULT]
    flatbuffer = tflite_converter.convert()

    # Binary mode: convert() returns the serialized model as bytes.
    with open("lane_model.tflite", "wb") as out_file:
        out_file.write(flatbuffer)

    print("✅ TFLite model saved as 'lane_model.tflite'")

# Run everything: train the network (train() also saves the Keras model to
# disk), then export the trained model to TFLite. `model` remains bound at
# module level after this cell runs.
model = train()
convert_to_tflite(model)


Epoch 1/30
[1m320/320[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 81ms/step - loss: 0.2887 - val_loss: 0.0999
Epoch 2/30
[1m320/320[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 56ms/step - loss: 0.0738 - val_loss: 0.0699
Epoch 3/30
[1m320/320[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 57ms/step - loss: 0.0560 - val_loss: 0.0477
Epoch 4/30
[1m320/320[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 57ms/step - loss: 0.0466 - val_loss: 0.0416
Epoch 5/30
[1m320/320[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 58ms/step - loss: 0.0410 - val_loss: 0.0389
Epoch 6/30
[1m320/320[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 59ms/step - loss: 0.0391 - val_loss: 0.0377
Epoch 7/30
[1m320/320[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 58ms/step - loss: 0.0368 - val_loss: 0.0364
Epoch 8/30
[1m320/320[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 58ms/step - loss: 0.0364 - val_loss: 0.0356
Epoch 9/30
[1m320/320[



Saved artifact at '/tmp/tmp187ixxgm'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 80, 160, 3), dtype=tf.float32, name='keras_tensor')
Output Type:
  TensorSpec(shape=(None, 80, 160, 1), dtype=tf.float32, name=None)
Captures:
  137824935276240: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137824935277584: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137824935280272: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137824935279120: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137824935279312: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137824935278160: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137824935276816: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137824935280080: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137824935278736: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137824928597584: TensorSpec(shape=(), dtype=tf.resource, name=None)
  137824

In [None]:
import cv2
import numpy as np
import tensorflow as tf
from moviepy.editor import VideoFileClip

# Load the TFLite model from the working directory and allocate its tensors;
# allocation must happen before set_tensor/invoke are called.
interpreter = tf.lite.Interpreter(model_path="lane_model.tflite")
interpreter.allocate_tensors()
# Tensor metadata (index, shape, ...) used below to feed frames in and read
# predictions out.
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Axes 1 and 2 of the input shape are read as height and width, i.e. the
# input is treated as (batch, height, width, channels).
input_height = input_details[0]['shape'][1]
input_width = input_details[0]['shape'][2]

class LaneDetector:
    """Run the TFLite lane model on video frames and blend the result in.

    The class relies on the module-level ``interpreter``, ``input_details``,
    ``output_details``, ``input_height`` and ``input_width``. It keeps a short
    history of recent masks so the overlay is smoothed over time.
    """

    # Contours whose area is not larger than this (in mask pixels) are dropped.
    MIN_CONTOUR_AREA = 100
    # Number of most recent masks averaged for temporal smoothing.
    SMOOTHING_WINDOW = 3

    def __init__(self):
        # Most recent cleaned masks, oldest first.
        self.recent_preds = []
        # Structuring element for the morphological opening in postprocess().
        self.kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))

    @staticmethod
    def _sigmoid(logits):
        """Overflow-free logistic function.

        Logits are clipped to [-60, 60] first: exp(60) still fits in float32,
        and sigmoid(+-60) is already indistinguishable from 1 / 0 once the
        result is quantized to uint8, so the clip only removes the
        "overflow encountered in exp" warning, not information.
        """
        bounded = np.clip(logits, -60.0, 60.0)
        return 1.0 / (1.0 + np.exp(-bounded))

    def preprocess(self, frame):
        """Resize a frame to the model input size and scale it to [0, 1].

        Returns a float32 array with a leading batch axis of length 1.
        """
        resized = cv2.resize(frame, (input_width, input_height))
        norm = resized.astype(np.float32) / 255.0
        return np.expand_dims(norm, axis=0)

    def postprocess(self, output, orig_shape):
        """Turn raw model logits into a green overlay at the frame's size.

        Args:
            output: Raw model output (logits); singleton axes are squeezed.
            orig_shape: Shape of the original frame; indices 0 and 1 are used
                as target height and width.

        Returns:
            A uint8 3-channel image whose channel 1 (green) carries the
            smoothed lane mask, resized to the original frame size.
        """
        # The model has no final sigmoid, so apply it here.
        pred = self._sigmoid(output.squeeze())
        pred = (pred * 255).astype(np.uint8)
        # Opening removes small speckles; the blur softens the map before
        # Otsu picks a global threshold.
        pred = cv2.morphologyEx(pred, cv2.MORPH_OPEN, self.kernel)
        pred = cv2.GaussianBlur(pred, (5, 5), 0)

        _, mask = cv2.threshold(pred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

        # Keep only sufficiently large connected regions, drawn filled.
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        clean_mask = np.zeros_like(mask)
        for cnt in contours:
            if cv2.contourArea(cnt) > self.MIN_CONTOUR_AREA:
                cv2.drawContours(clean_mask, [cnt], -1, 255, -1)

        # Temporal smoothing: average the last SMOOTHING_WINDOW masks.
        self.recent_preds.append(clean_mask)
        if len(self.recent_preds) > self.SMOOTHING_WINDOW:
            self.recent_preds.pop(0)
        avg_mask = np.mean(self.recent_preds, axis=0).astype(np.uint8)

        # Build the overlay from the mask's own shape so it always matches.
        overlay = np.zeros(avg_mask.shape + (3,), dtype=np.uint8)
        overlay[..., 1] = avg_mask  # Green channel

        # Resize to original size (cv2.resize takes (width, height)).
        overlay = cv2.resize(overlay, (orig_shape[1], orig_shape[0]), interpolation=cv2.INTER_LANCZOS4)

        return overlay

    def process_frame(self, frame):
        """Run inference on one frame and return the frame with the overlay.

        Args:
            frame: An RGB uint8 image, as delivered by moviepy's ``fl_image``.

        Returns:
            The weighted blend of the frame (0.8) and the lane overlay (0.4).
        """
        orig_shape = frame.shape
        input_tensor = self.preprocess(frame)

        interpreter.set_tensor(input_details[0]['index'], input_tensor)
        interpreter.invoke()
        output = interpreter.get_tensor(output_details[0]['index'])

        overlay = self.postprocess(output, orig_shape)

        # Limit overlay to road area (optional): keep only bright,
        # low-saturation pixels. Frames are RGB, so convert with RGB2HSV.
        # The thresholds constrain only S and V, which do not depend on
        # channel order, so the mask is the same as with BGR2HSV.
        road_mask = cv2.inRange(cv2.cvtColor(frame, cv2.COLOR_RGB2HSV),
                                (0, 0, 100), (180, 50, 255))
        overlay[road_mask == 0] = 0

        return cv2.addWeighted(frame, 0.8, overlay, 0.4, 0)

def process_video(input_path, output_path):
    """Run lane detection over a video file and write the annotated result.

    Args:
        input_path: Path of the source video.
        output_path: Path of the video to write (H.264, no audio track).
    """
    detector = LaneDetector()
    clip = VideoFileClip(input_path)
    try:
        # fl_image applies detector.process_frame to every frame lazily; the
        # frames are actually processed while write_videofile runs.
        processed_clip = clip.fl_image(detector.process_frame)
        processed_clip.write_videofile(output_path, audio=False, codec='libx264')
    finally:
        # Release the underlying reader even if processing or encoding fails.
        clip.close()

# Example usage: annotate the clip stored on the mounted drive and write the
# result into the current working directory, then report completion.
process_video("/content/drive/MyDrive/lanes_clip.mp4", "lanes_output_tflite.mp4")
print("✅ TFLite Inference complete!")


  if event.key is 'enter':

  pred = 1 / (1 + np.exp(-pred))  # Apply sigmoid manually



Moviepy - Building video lanes_output_tflite.mp4.
Moviepy - Writing video lanes_output_tflite.mp4





Moviepy - Done !
Moviepy - video ready lanes_output_tflite.mp4
✅ TFLite Inference complete!
