In [2]:
import os
import pickle
import cv2
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

In [3]:
with open("CUDatamap_line2.pkl","rb") as f:
    data_line_map = pickle.load(f)
with open("CUDatamap_stline2.pkl","rb") as f:
    data_stline_map = pickle.load(f)

In [4]:
y_col = np.linspace(400,580,19)

class LaneDataset(Dataset):
    """Face Landmarks dataset."""

    def __init__(self, data_line_map,data_stline_map,scale=None,reg_scale=10):

        self.new_data_keys = list(data_line_map.keys())
        self.data_line_map = data_line_map
        self.data_stline_map = data_stline_map
        self.scale = scale
        image_loc = self.new_data_keys[0]
        image = cv2.imread(image_loc)
        height,width,channel = image.shape
        self.height = height
        self.width = width
        self.reg_scale = reg_scale




    def __len__(self):
        return len(self.new_data_keys)




    def get_lane_angle(self,myline):
        global y_col #its actually row
        row = y_col[-1]
        col = myline[-1]

        new_col = col - self.width//2
        new_row = row + self.height

        angle = -np.arctan2(new_col,new_row)
        # if angle<0:
        #     angle = 2*np.pi+angle

        return angle

    def get_lane_bins(self,stline):
        angle = self.get_lane_angle(stline)
        angle = np.clip(angle,-1,1)
        mybin = int(round(angle/0.125))+8 #17 bins -1 to 1 as 0 to 17

        return mybin,angle


    def get_regression_confidence_mat(self,stlines,lines):
        regression_mat = np.zeros((19,17),dtype=np.float32) #19 rows and 17 for 17 bins
        confidence_mat = np.zeros((17,),dtype=np.float32)
        for stline,line in zip(stlines,lines):
            mybin,angle = self.get_lane_bins(stline=stline)
            regression_mat[:,mybin] = line
            confidence_mat[mybin] = 1
        return regression_mat.flatten(),confidence_mat

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
        image_loc = self.new_data_keys[idx]
        lines = self.data_line_map[image_loc] #Since
        image = cv2.imread(image_loc)
        image_org = image.copy()

        if self.scale is not None:
            image = cv2.resize(image,self.scale)

        stlines = self.data_stline_map[image_loc]


        #visualize_bundle = (image,lines,stlines,idx) #use only for visualization


        image_tensor = torch.tensor(image,dtype=torch.float32)/255 #convert to tensor and normalise
        image_tensor2 = image_tensor.permute(2,0,1) #bring the channel to front

        regression_mat,confidence_mat  = self.get_regression_confidence_mat(stlines,lines)
        regression_mat_tensor = torch.tensor(regression_mat,dtype=torch.float32)
        regression_mat_tensor = torch.clip(regression_mat_tensor,-1500,2500)/self.reg_scale #Clip and also reduce the scale
        confidence_mat_tensor = torch.tensor(confidence_mat,dtype=torch.float32)

        #print(image_loc,sum(regression_mat))


        return image_tensor2,regression_mat_tensor,confidence_mat_tensor,image_org

In [11]:
lane_dataset = LaneDataset(data_line_map=data_line_map,data_stline_map=data_stline_map,scale=(224,224))
dataloader = DataLoader(lane_dataset, batch_size=1,shuffle=False) #One for one image

In [6]:
import torch.nn as nn
import torchvision.models as models
from torch.nn import functional as F
class VGGBlock(nn.Module):
    def __init__(self):
        super(VGGBlock, self).__init__()
        self.block1 = nn.Sequential(*list(vgg16.features.children())[:5])
        self.block1t4 = nn.Sequential(*list(vgg16.features.children())[5:16])  # Block 4 of VGG16

    def forward(self, x):
      x1_b = self.block1(x)  # Output will be of shape (b, 64, 112, 112)
      x2 = self.block1t4(x1_b)  # Output will be of shape (b, 256, 56, 56)
      return x1_b,x2


class UpSampleNeck(nn.Module):
    def __init__(self):
        super(UpSampleNeck, self).__init__()
        # Trainable 3x3 convolution to reduce features from 64 to 32
        self.conv3x3_1 = nn.Conv2d(in_channels=64, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)  # BatchNorm after first conv

        # Reduce Block 4 features from 256 to 128
        self.reduce_conv_block4 = nn.Conv2d(in_channels=256, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(128)  # BatchNorm after Block 4 reduction

        # Upsample
        self.upconv = nn.ConvTranspose2d(in_channels=128, out_channels=64, kernel_size=4, stride=2, padding=1)
        self.bn3 = nn.BatchNorm2d(64)  # BatchNorm after upconv

        # Convolution layers after concatenation
        self.conv3x3_2 = nn.Conv2d(in_channels=96, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.bn4 = nn.BatchNorm2d(128)  # BatchNorm after concat
        self.avg_pool_2 = nn.AvgPool2d(kernel_size=2, stride=2)


    def forward(self,x1_b,x2):
        # Pass through the 3x3 convolution layer, batch norm, and ReLU
        x1 = self.conv3x3_1(x1_b)  # Output will be of shape (b, 32, 112, 112)
        x1 = self.bn1(x1)
        x1 = F.relu(x1)

        # Block 4 forward pass (without max-pooling)
        x2 = self.reduce_conv_block4(x2)
        x2 = self.bn2(x2)
        x2 = self.upconv(x2)  # Output will be of shape (b, 64, 112, 112)
        x2 = self.bn3(x2)
        x2 = F.relu(x2)

        # Concatenate x1 and x2 along the channel dimension
        x_concat = torch.cat((x1, x2), dim=1)  # Output will be of shape (b, 96, 112, 112)

        # Convolution layer after concatenation
        x3 = self.conv3x3_2(x_concat)
        x3 = self.bn4(x3)
        x3 = self.avg_pool_2(x3)  # Output will be of shape (b, 128, 56, 56)

        x5 = F.relu(x3) #Will be also used for confidence calculation

        return x5,x_concat


class LaneConfidenceHead(nn.Module):
    def __init__(self):
        super(LaneConfidenceHead, self).__init__()

        self.conv3x3_9 = nn.Conv2d(in_channels=128,out_channels=256,kernel_size=3,stride=1)
        self.bn11 = nn.BatchNorm2d(256)
        self.maxpool_5 = nn.MaxPool2d(kernel_size=3,stride=3)

        self.conv3x3_10 = nn.Conv2d(in_channels=256,out_channels=64,kernel_size=1,stride=1)
        self.bn12 = nn.BatchNorm2d(64)
        #self.maxpool_6 = nn.MaxPool2d(kernel_size=3,stride=3)

        self.conv3x3_11 = nn.Conv2d(in_channels=64,out_channels=8,kernel_size=1,stride=1)
        self.bn13 = nn.BatchNorm2d(8)
        #self.maxpool_7 = nn.MaxPool2d(kernel_size=3,stride=3)




        self.Linear_c1 = nn.Linear(in_features=2592,out_features=256)
        self.Linear_c2 = nn.Linear(in_features=256,out_features=17)

        self.flatten = nn.Flatten()


    def forward(self,x5):


      x5 = self.conv3x3_9(x5)
      x5 = self.bn11(x5)
      x5 = self.maxpool_5(x5)
      x5 = F.relu(x5)



      x5 = self.conv3x3_10(x5)
      x5 = self.bn12(x5)
      #x5 = self.maxpool_6(x5)
      x5 = F.relu(x5)



      x5 = self.conv3x3_11(x5)
      x5 = self.bn13(x5)

      #x5 = self.maxpool_7(x5)
      x5 = F.relu(x5)

      x5 = self.flatten(x5)



      x5 = self.Linear_c1(x5)
      x5 = F.sigmoid(x5)
      x5 = self.Linear_c2(x5)


      return x5

# Add positional encoding
class PositionalEncoding(nn.Module):
    def __init__(self,device, d_model, max_len=256):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(torch.log(torch.tensor(10000.0)) / d_model))
        self.encoding[:, 0::2] = torch.sin(position * div_term)
        self.encoding[:, 1::2] = torch.cos(position * div_term)
        self.encoding = self.encoding.unsqueeze(0)  # Add batch dimension
        self.encoding = self.encoding.to(device)




    def forward(self, x):

        return x + self.encoding[:, :x.size(1)]


class Regression_Head(nn.Module):
    def __init__(self,device):
        super(Regression_Head, self).__init__()
        self.conv3x3_12 = nn.Conv2d(in_channels=96,out_channels=12,kernel_size=1,stride=1,padding=1)
        # Patch extraction using unfold
        self.patch_size = 7
        self.unfold = nn.Unfold(kernel_size=self.patch_size, stride=self.patch_size)
        self.fold = nn.Fold(output_size=(112, 112), kernel_size=self.patch_size, stride=self.patch_size)
        num_patches = 256  # Number of patches
        embedding_dim = 588  # Each patch dimension
        channels = 12
        self.patch_embedding_layer = nn.Linear(embedding_dim, embedding_dim)

        self.positional_encoding = PositionalEncoding(device,embedding_dim, max_len=num_patches)

        # Define multi-head attention layer
        num_heads = 2
        self.attention_layer = nn.MultiheadAttention(embed_dim=embedding_dim, num_heads=num_heads, dropout=0.1)


        self.Conv2d_13 = nn.Conv2d(in_channels=12,out_channels=6,kernel_size=2,stride=2)
        self.bn_14 = nn.BatchNorm2d(6)
        self.Conv2d_15 = nn.Conv2d(in_channels=6,out_channels=2,kernel_size=2,stride=2)
        self.bn_16 = nn.BatchNorm2d(2)

        self.flatten = nn.Flatten()
        self.linear_final = nn.Linear(in_features=1568,out_features=512)
        self.linear_final2 = nn.Linear(in_features=512,out_features=19*17)




    def forward(self,x_concat):
        x6 = self.conv3x3_12(x_concat)
        x6 = F.relu(x6)

        patches = self.unfold(x6) # Shape: [batch_size,channels * patch_size * patch_size, num_patches]
        #batch_size = patches.size(0)
        #channels = patches.size(1)
        #print(patches.shape)
        patches = patches.permute(0, 2,1)

        patches = self.patch_embedding_layer(patches)
        #print("After Patch Embedding",patches.shape)
        patches = self.positional_encoding(patches)
        #print("After Patch Embedding",patches.shape)

        # Apply multi-head attention
        attention_output, _ = self.attention_layer(patches, patches, patches)
        #print("After Attention",attention_output.shape)

        # Fold the attention output back to the original image size
        attention_output = attention_output.permute(0, 2, 1)
        transformed_feature_map = self.fold(attention_output)
        #print("After Fold",transformed_feature_map.shape)

        conv_out = self.Conv2d_13(transformed_feature_map)
        conv_out = self.bn_14(conv_out)
        cov_out = F.relu(conv_out)

        conv_out = self.Conv2d_15(cov_out)
        conv_out = self.bn_16(conv_out)
        conv_out = F.relu(conv_out)

        conv_out = self.flatten(conv_out)
        conv_out = self.linear_final(conv_out)
        conv_out = F.relu(conv_out)
        conv_out = self.linear_final2(conv_out)

        return conv_out




class LaneDetection_2(nn.Module):
    def __init__(self,device):
        super(LaneDetection_2, self).__init__()

        self.vggblock = VGGBlock()
        self.upsample_neck = UpSampleNeck()
        self.laneconfidence_head = LaneConfidenceHead()

        self.regression_head = Regression_Head(device)





    def forward(self, x):
        # Block 1 forward pass
        x1_b,x2 = self.vggblock(x)

        x5,x_concat = self.upsample_neck(x1_b,x2)

        #print(x5.shape,x_concat.shape)

        x6 = self.laneconfidence_head(x5)

        x7 = self.regression_head(x_concat)

        return x6,x7


  from .autonotebook import tqdm as notebook_tqdm


In [7]:
model = torch.load('LaneViT_regression.pt', map_location=torch.device('cpu'))

In [8]:
assert False,"Stop here"

AssertionError: Stop here

In [15]:
from operator import itemgetter
count = 10000
sig = nn.Sigmoid()
for input,_,_,image in dataloader:
    print("Got one more")
    confidence_pred,regression_pred = model(input)
    confidence_pred = sig(confidence_pred) #Get the confidence as its a logit 
    colors = {0:(255,0,0),1:(0,255,0),2:(0,0,255),3:(255,255,255)}
    lines = []
    mybins = []
    confidence = list(confidence_pred.detach().numpy()[0,:].round())
    regression = regression_pred.detach().numpy()[0,:].reshape(19,17)*lane_dataset.reg_scale
    regco = []
    for i in range(len(confidence)):
        regco.append([confidence[i],regression[:,i]])
    
    regco.sort(key=itemgetter(0))

    for i in range(len(confidence)):
        if round(regco[i][0])==1:
            lines.append(regco[i][1])
            mybins.append(i)
    
    image = image.detach().numpy()[0].astype(np.uint8)

    print("Model predicted, procedding to draw",mybins)

    for j,l in enumerate(lines[0:4]):
        my_line = l.astype(int)
        bin_no = mybins[j]
        line_length = len(l)
        for i,p in enumerate(my_line):
            cv2.circle(image, (p, int(y_col[i])), i+1, colors[j], -1)
            if(i==(line_length-10)):
                cv2.putText(image,str(bin_no),org=(p,int(y_col[i])),fontFace=cv2.FONT_HERSHEY_COMPLEX,fontScale=3,color=(255,255,0),thickness=3)


    print("Drew",input.shape)
  
    count+=1
    cv2.imwrite("results/"+str(count)+".jpg",image)
#The thing finally works, I am so happy :)))))))))))))))))
#https://huggingface.co/docs/transformers/model_doc/detr we will try to use this

Got one more
Model predicted, procedding to draw [13, 14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got one more
Model predicted, procedding to draw [13, 14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got one more
Model predicted, procedding to draw [14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got one more
Model predicted, procedding to draw [13, 14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got one more
Model predicted, procedding to draw [13, 14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got one more
Model predicted, procedding to draw [13, 14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got one more
Model predicted, procedding to draw [13, 14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got one more
Model predicted, procedding to draw [13, 14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got one more
Model predicted, procedding to draw [13, 14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got one more
Model predicted, procedding to draw [13, 14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got 

KeyboardInterrupt: 

### Loading custom video

In [24]:


class LaneDataset2(Dataset):
    """Face Landmarks dataset."""

    def __init__(self, video_file:str):
                # Load the video
    
        self.cap = cv2.VideoCapture(video_file)
        if not self.cap.isOpened():
            print("Error: Could not open video.")
            exit()





    def __len__(self):
        return int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))


    def __del__(self):
        self.cap.release()



    def __getitem__(self, idx):
        self.cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = self.cap.read()
        if not ret:
            raise FileNotFoundError
        resized_224x224 = cv2.resize(frame, (224, 224))
        resized_big = cv2.resize(frame, (1640,590))

        image_tensor = torch.tensor(resized_224x224,dtype=torch.float32)/255 #convert to tensor and normalise
        image_tensor2 = image_tensor.permute(2,0,1) #bring the channel to front


        return image_tensor2,resized_big

In [25]:
lane_dataset = LaneDataset2("lanehighway.mp4")
dataloader = DataLoader(lane_dataset, batch_size=1,shuffle=False) #One for one image

In [38]:
from operator import itemgetter
count = 10000
sig = nn.Sigmoid()
for input,image in dataloader:
    print("Got one more")
    confidence_pred,regression_pred = model(input)
    confidence_pred = sig(confidence_pred) #Get the confidence as its a logit 
    colors = {0:(255,0,0),1:(0,255,0),2:(0,0,255),3:(255,255,255)}
    lines = []
    mybins = []
    confidence = list(confidence_pred.detach().numpy()[0,:].round())
    regression = regression_pred.detach().numpy()[0,:].reshape(19,17)*10
    regco = []
    for i in range(len(confidence)):
        regco.append([confidence[i],regression[:,i]])
    
    regco.sort(key=itemgetter(0),reverse=True)

    for i in range(len(confidence)):
        #if round(regco[i][0])==1:
        lines.append(regco[i][1])
        mybins.append(i)
    
    image = image.detach().numpy()[0].astype(np.uint8)

    print("Model predicted, procedding to draw",mybins)

    for j,l in enumerate(lines[0:4]):
        my_line = l.astype(int)
        bin_no = mybins[j]
        line_length = len(l)
        for i,p in enumerate(my_line):
            cv2.circle(image, (p, int(y_col[i])), i+1, colors[j], -1)
            if(i==(line_length-10)):
                cv2.putText(image,str(bin_no),org=(p,int(y_col[i])),fontFace=cv2.FONT_HERSHEY_COMPLEX,fontScale=3,color=(255,255,0),thickness=3)


    print("Drew",input.shape)
  
    count+=1
    cv2.imwrite("results/"+str(count)+".jpg",image)
#The thing finally works, I am so happy :)))))))))))))))))
#https://huggingface.co/docs/transformers/model_doc/detr we will try to use this

Got one more
Model predicted, procedding to draw [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got one more
Model predicted, procedding to draw [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got one more
Model predicted, procedding to draw [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got one more
Model predicted, procedding to draw [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got one more
Model predicted, procedding to draw [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got one more
Model predicted, procedding to draw [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got one more
Model predicted, procedding to draw [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
Drew torch.Size([1, 3, 224, 224])
Got on

In [36]:
regco.sort(key=itemgetter(0),reverse=True)

In [37]:
regco

[[1.0,
  array([ 830.0215 ,  860.8435 ,  893.28094,  925.56323,  957.8702 ,
          990.82336, 1023.1421 , 1054.845  , 1086.2734 , 1118.5061 ,
         1150.8606 , 1183.5614 , 1217.0872 , 1249.9808 , 1284.5808 ,
         1319.8224 , 1354.7776 , 1392.0265 , 1426.2522 ], dtype=float32)],
 [1.0,
  array([696.24133, 686.55975, 678.3297 , 668.6559 , 659.2444 , 647.9546 ,
         637.929  , 626.19617, 614.451  , 601.91644, 591.09973, 579.4304 ,
         566.83856, 555.2695 , 542.89215, 530.8552 , 518.73376, 508.47995,
         496.97742], dtype=float32)],
 [0.0,
  array([1010.458 , 1090.1995, 1076.5541, 1095.7008, 1176.4377, 1334.9443,
         1509.8718, 1592.3712, 1675.6603, 1757.9766, 1840.3389, 1921.0837,
         2001.5713, 2080.555 , 2159.3896, 2239.1887, 2302.2327, 2355.9617,
         2403.0679], dtype=float32)],
 [0.0,
  array([ 814.176 ,  887.2794,  955.8307, 1021.2803, 1089.9142, 1157.8887,
         1227.6104, 1297.8502, 1368.7085, 1442.8003, 1521.5297, 1594.6251,
         1672.