In [26]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import cv2

In [27]:
# Source corner points used as the quadrilateral for the perspective warp
# (apply_perspective_transform asserts that there are exactly four of them).
points = np.load("../sp_data/Full_data/points.npy")

# Size of the rectified image produced by the warp: 64 px wide, 256 px tall.
width, height = 64, 256

In [60]:
def apply_perspective_transform(frame, points, width, height):
    """Warp a quadrilateral region of ``frame`` onto an upright grayscale image.

    Args:
        frame: Colour image in OpenCV's BGR channel order (it is converted
            with ``COLOR_BGR2GRAY`` after warping).
        points: Four source corners. They are mapped, in order, onto the
            top-left, top-right, bottom-right and bottom-left corners of the
            output rectangle.
        width: Output image width in pixels.
        height: Output image height in pixels.

    Returns:
        The warped region as a single-channel grayscale image of size
        ``width`` x ``height``.

    Raises:
        AssertionError: If ``points`` does not contain exactly four entries.
    """
    assert len(points) == 4, "no of points not equal to 4"

    # Corners of the output rectangle, listed clockwise from the origin.
    target_corners = np.array(
        [[0, 0], [width, 0], [width, height], [0, height]],
        dtype=np.float32,
    )
    source_corners = np.asarray(points, dtype=np.float32)

    # 3x3 homography taking the source quadrilateral to the output rectangle.
    homography = cv2.getPerspectiveTransform(source_corners, target_corners)
    rectified = cv2.warpPerspective(frame, homography, (width, height))

    return cv2.cvtColor(rectified, cv2.COLOR_BGR2GRAY)

def process_input(old_frame, new_frame):
    """Build the network input from two consecutive video frames.

    Both frames are rectified with ``apply_perspective_transform`` using the
    notebook-level ``points``, ``width`` and ``height``, stacked, and scaled
    by 1/255.

    Args:
        old_frame: The earlier frame.
        new_frame: The later frame.

    Returns:
        A tuple ``(input_tensor, new)`` where ``input_tensor`` is a float32
        tensor of shape 1 x 2 x H x W and ``new`` is the rectified later
        frame (useful for display).
    """
    warped_pair = [
        apply_perspective_transform(image, points, width, height)
        for image in (old_frame, new_frame)
    ]

    # Extra outer list adds the leading batch axis: 1 x 2 x H x W.
    batch = np.array([warped_pair])
    input_tensor = torch.tensor(batch, dtype=torch.float32) / 255

    return input_tensor, warped_pair[1]

def load_label_data(path="../sp_data/train.txt"):
    """Read one floating-point label per line from a text file.

    Args:
        path: Location of the label file. Defaults to the training labels
            used by this notebook.

    Returns:
        A list of floats, in file order. Lines that are empty or contain only
        whitespace (for example a trailing newline at the end of the file)
        are skipped.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a non-blank line cannot be parsed as a float.
    """
    labels = []
    with open(path, 'r') as file:
        for line in file:
            value = line.strip()
            if not value:
                # Tolerate blank lines instead of crashing in float("").
                continue
            labels.append(float(value))

    return labels

In [29]:
import torch
import torch.nn as nn

class SpeedNet2(nn.Module):
    """Convolutional regressor over a stack of two grayscale frames.

    ``forward`` inserts a single channel axis, so the two frames form the
    depth axis of a one-channel volume. Three 3D convolutions process that
    volume (the last one uses a depth-2 kernel), the depth axis is squeezed,
    and two 2D convolution/pooling stages followed by three fully connected
    layers produce one value per sample.

    Args:
        in_channels: Input channels of the first 3D convolution. ``forward``
            adds exactly one channel axis, so this is 1 in practice.
        f1: Output channels of the first 3D convolution.
        f2: Output channels of the second 3D convolution.
        f3: Output channels of the third 3D convolution.
        f4: Output channels of the first 2D convolution.
        f5: Output channels of the second 2D convolution.

    Note:
        ``fc1`` is fixed at 1024 input features, so the input size and ``f5``
        must flatten to exactly that many values; an input of shape
        (N, 2, 256, 64) with ``f5=256`` does.
    """

    def __init__(self, in_channels, f1, f2, f3, f4, f5):
        super().__init__()

        # Normalisation layers, one per convolution stage.
        self.batchnorm1 = nn.BatchNorm3d(f1)
        self.batchnorm2 = nn.BatchNorm3d(f2)
        self.batchnorm3 = nn.BatchNorm3d(f3)
        self.batchnorm4 = nn.BatchNorm2d(f4)
        self.batchnorm5 = nn.BatchNorm2d(f5)

        # 3D stage 1: per-frame 5x5 convolution, then pooling along width only.
        self.conv3d_1 = nn.Conv3d(in_channels, f1, kernel_size=(1, 5, 5))
        self.pool3d_1 = nn.MaxPool3d(kernel_size=(1, 1, 3), stride=(1, 1, 2))

        # 3D stage 2: same pattern with a width-2 pooling window.
        self.conv3d_2 = nn.Conv3d(f1, f2, kernel_size=(1, 5, 5))
        self.pool3d_2 = nn.MaxPool3d(kernel_size=(1, 1, 2), stride=(1, 1, 2))

        # 3D stage 3: the depth-2 kernel spans both frames at once.
        self.conv3d_3 = nn.Conv3d(f2, f3, kernel_size=(2, 5, 5))

        # 2D stages applied after the depth axis has been squeezed away.
        self.conv2d_1 = nn.Conv2d(f3, f4, kernel_size=(10, 7))
        self.pool2d_1 = nn.MaxPool2d(kernel_size=(9, 1), stride=(9, 2))

        self.conv2d_2 = nn.Conv2d(f4, f5, kernel_size=(5, 1))
        self.pool2d_2 = nn.MaxPool2d(kernel_size=(5, 1), stride=(5, 1))

        # Fully connected regression head.
        self.fc1 = nn.Linear(1024, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 1)

    def deb_forward(self, x):
        """Run the conv/pool layers while printing the shape after each step.

        The batch-norm layers and the fully connected head are not applied
        here; the flattened features are returned as-is.

        Args:
            x: Input tensor without a channel axis (one is inserted at dim 1).

        Returns:
            The flattened feature tensor of shape (batch, features).
        """
        print("Before unsqueeze:", x.shape)
        x = x.unsqueeze(1)
        print("After unsqueeze:", x.shape)

        # 3D convolution / pooling stages.
        stages_3d = (
            ("After conv3d_1:", self.conv3d_1),
            ("After pool3d_1:", self.pool3d_1),
            ("After conv3d_2:", self.conv3d_2),
            ("After pool3d_2:", self.pool3d_2),
            ("After conv3d_3:", self.conv3d_3),
        )
        for label, layer in stages_3d:
            x = layer(x)
            print(label, x.shape)

        # Drop the depth axis (only removed when it has size 1).
        x = x.squeeze(2)
        print("After squeeze:", x.shape)

        # 2D convolution / pooling stages.
        stages_2d = (
            ("After conv2d_1:", self.conv2d_1),
            ("After pool2d_1:", self.pool2d_1),
            ("After conv2d_2:", self.conv2d_2),
            ("After pool2d_2:", self.pool2d_2),
        )
        for label, layer in stages_2d:
            x = layer(x)
            print(label, x.shape)

        # Flatten everything except the batch axis.
        x = x.view(x.size(0), -1)
        print("After flatten:", x.shape)

        return x

    def forward(self, x):
        """Compute one output value per sample.

        Args:
            x: Input tensor without a channel axis (one is inserted at dim 1).

        Returns:
            Tensor of shape (batch, 1).
        """
        # Insert the single channel axis expected by the 3D convolutions.
        volume = x.unsqueeze(1)

        # 3D stages: convolution -> pooling (first two only) -> batch norm.
        volume = self.batchnorm1(self.pool3d_1(self.conv3d_1(volume)))
        volume = self.batchnorm2(self.pool3d_2(self.conv3d_2(volume)))
        volume = self.batchnorm3(self.conv3d_3(volume))

        # Drop the depth axis (only removed when it has size 1).
        planes = volume.squeeze(2)

        # 2D stages: convolution -> pooling -> batch norm.
        planes = self.batchnorm4(self.pool2d_1(self.conv2d_1(planes)))
        planes = self.batchnorm5(self.pool2d_2(self.conv2d_2(planes)))

        # Flatten everything except the batch axis and apply the head.
        flat = planes.view(planes.size(0), -1)
        hidden = F.relu(self.fc1(flat))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)




# Build the network and restore its trained weights.
model = SpeedNet2(1, 8,16,32,128,256)  # Increased features to 128 
# map_location="cpu" lets a checkpoint that was saved from a CUDA device load
# on a machine without a GPU; the model itself stays on the CPU in this
# notebook, so nothing else changes.
model.load_state_dict(torch.load("Models/SpeedNet_128.pth", map_location="cpu"))

<All keys matched successfully>

In [30]:
# Ground-truth labels as a NumPy array, in the order they appear in the file.
speed_data = np.asarray(load_label_data())

In [79]:
# Replay a slice of the training video and overlay the network's smoothed
# speed estimate next to the ground-truth label.
start_frame = 5000
end_frame = 6500
cap = cv2.VideoCapture("../sp_data/train.mp4")
try:
    if not cap.isOpened():
        raise IOError("Could not open video ../sp_data/train.mp4")
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print("Number of frame is ",frame_count)
    cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
    cv2.namedWindow("Result Calculated Speed")
    cv2.namedWindow("What Neural Net Sees")

    # Inference only: eval() makes the BatchNorm layers use their stored
    # running statistics instead of per-batch statistics, and stops them from
    # updating those statistics on every frame.
    model.eval()

    # Loop-invariant, so compute it once instead of scanning all labels per frame.
    max_speed = speed_data.max()
    margin = 10

    ret, old_frame = cap.read()
    count = start_frame
    old_speed = 0
    while ret:
        ret, frame = cap.read()
        if not ret:
            # End of stream or a decode failure: there is no frame to process.
            break
        input_tensor, pers_transformed_image = process_input(old_frame, frame)
        #print(input_tensor.shape)
        with torch.no_grad():
            speed_tensor = model(input_tensor)
        # Affine rescale that sends an output of -1 to 0 and +1 to max_speed
        # (presumably the inverse of the label normalisation used in
        # training; verify against the training code).
        speed = (speed_tensor.item()+1)*max_speed/2
        # Exponential smoothing: 95% previous estimate, 5% new prediction.
        speed = old_speed*0.95 + 0.05*speed
        old_speed = speed
        # NOTE(review): the label index is count-1 while the frame just read is
        # one past `count`; confirm this is the intended frame/label alignment.
        text = "Calculated: "+ str(round(speed,1))+", Actual: "+str(round(speed_data[count-1:count].mean(),1))+",Frame "+str(count)
        (text_width, text_height), _baseline = cv2.getTextSize(text, cv2.FONT_HERSHEY_COMPLEX, 0.7,1)

        box_width = text_width + 2 * margin
        box_height = text_height + 2 * margin
        box_top_left = (10, 40-box_height+margin)
        box_bottom_right = (box_top_left[0] + box_width, box_top_left[1] + box_height)

        # Draw on a copy so the overlay never ends up in `old_frame`, which is
        # fed to the network on the next iteration.
        annotated = frame.copy()
        cv2.rectangle(annotated, box_top_left, box_bottom_right, (0, 0, 0), -1)
        cv2.putText(annotated,text, (20,40), cv2.FONT_HERSHEY_COMPLEX,0.7, (0,255,0),1)
        cv2.imshow("Result Calculated Speed",annotated)
        cv2.imshow("What Neural Net Sees",pers_transformed_image)

        old_frame = frame
        key = cv2.waitKey(1) & 0xFF
        if key == ord('q') or count>end_frame:
            break
        count+=1
finally:
    # Always free the capture handle and close the windows, even on error.
    cap.release()
    cv2.destroyAllWindows()

Number of frame is  20400
