In [1]:
import numpy as np
import cv2
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import glob
import open3d as o3d

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [2]:
image_files = sorted(glob.glob("data/img/*.png"))
point_files = sorted(glob.glob("data/velodyne/*.pcd"))
label_files = sorted(glob.glob("data/label/*.txt"))
calib_files = sorted(glob.glob("data/calib/*.txt"))

In [None]:
index = 0
pcd_file = point_files[index]
image = cv2.cvtColor(cv2.imread(image_files[index]), cv2.COLOR_BGR2RGB)
cloud = o3d.io.read_point_cloud(pcd_file)
points= np.asarray(cloud.points)

In [3]:
f, (ax1) = plt.subplots(1, 1, figsize=(20,10))
ax1.imshow(image)
ax1.set_title('Image', fontsize=30)

In [None]:
import pypotree 
cloudpath = pypotree.generate_cloud_for_display(points)
pypotree.display_cloud_colab(cloudpath)

/usr/local/lib/python3.8/dist-packages/bin/PotreeConverter .tmp.txt -f xyz -o point_clouds -p 7be179 --material ELEVATION --edl-enabled --overwrite
server on port 20582: thread <Thread(Thread-13, started 139811974719232)> 
https://localhost:20582/point_clouds/pointclouds/7be179


In [None]:
class LiDAR2Camera(object):
    def __init__(self, calib_file):
        calibs = self.read_calib_file(calib_file)
        self.P = calibs["P2"]
        self.P = np.reshape(self.P, [3, 4])
        # Rigid transform from Velodyne coord to reference camera coord
        self.valeo2cam = calibs["Tr_velo_to_cam"]
        self.valeo2cam = np.reshape(self.valeo2cam, [3,4])
        # Rotation from reference camera coord to rect camera coord
        self.R0 = calibs["R0_rect"]
        self.R0 = np.reshape(self.R0, [3,3])
    def read_calib_file(self, filepath):
        """ Read in a calibration file and parse into a dictionary.
        Ref: https://github.com/utiasSTARS/pykitti/blob/master/pykitti/utils.py
        """
        data = {}
        with open(filepath, "r") as f:
            for line in f.readlines():
                line = line.rstrip()
                if len(line) == 0:
                    continue
                key, value = line.split(":", 1)
                # The only non-float values in these files are dates, which
                # we don't care about anyway
                try:
                    data[key] = np.array([float(x) for x in value.split()])
                except ValueError:
                    pass
        return data

In [None]:
lidar2cam = LiDAR2Camera(calib_files[index])
print("P :"+str(lidar2cam.P))
print("-")
print("RO "+str(lidar2cam.R0))
print("-")
print("Velo 2 Cam " +str(lidar2cam.valeo2cam))
print("-")

P :[[7.215377e+02 0.000000e+00 6.095593e+02 4.485728e+01]
 [0.000000e+00 7.215377e+02 1.728540e+02 2.163791e-01]
 [0.000000e+00 0.000000e+00 1.000000e+00 2.745884e-03]]
-
RO [[ 0.9999239   0.00983776 -0.00744505]
 [-0.0098698   0.9999421  -0.00427846]
 [ 0.00740253  0.00435161  0.9999631 ]]
-
Velo 2 Cam [[ 7.533745e-03 -9.999714e-01 -6.166020e-04 -4.069766e-03]
 [ 1.480249e-02  7.280733e-04 -9.998902e-01 -7.631618e-02]
 [ 9.998621e-01  7.523790e-03  1.480755e-02 -2.717806e-01]]
-


In [None]:
def project_velo_to_image(self, pts_3d_velo):
    '''
    Input: 3D points in Velodyne Frame [nx3]
    Output: 2D Pixels in Image Frame [nx2]
    '''
    r0_homo = np.vstack((self.R0, [0,0,0]))
    r0_homo_2 = np.column_stack([r0_homo, [0, 0, 0, 1]])
    p_dot_R0 = np.dot(self.P, r0_homo_2)
    p_R0_rt = np.dot(p_dot_R0, np.vstack((self.valeo2cam, [0,0,0,1])))
    pts_3d_homo = np.column_stack([pts_3d_velo, np.ones((pts_3d_velo.shape[0],1))])
    p_r0_rt_x = np.dot(p_R0_rt, np.transpose(pts_3d_homo))
    pts_2d_homo = np.transpose(p_r0_rt_x)
    
    pts_2d_homo[:, 0] /= pts_2d_homo[:, 2]
    pts_2d_homo[:, 1] /= pts_2d_homo[:, 2]
    pts_2d = pts_2d_homo[:, 0:2]
    return pts_2d
  
LiDAR2Camera.project_velo_to_image = project_velo_to_image
# print(points[:5,:3])
# print("Euclidean Pixels "+str(lidar2cam.project_velo_to_image(points[:5,:3])))

In [None]:
def get_lidar_in_image_fov(self,pc_velo, xmin, ymin, xmax, ymax, return_more=False, clip_distance=2.0):
    """ Filter lidar points, keep those in image FOV """
    pts_2d = self.project_velo_to_image(pc_velo)
    fov_inds = (
        (pts_2d[:, 0] < xmax)
        & (pts_2d[:, 0] >= xmin)
        & (pts_2d[:, 1] < ymax)
        & (pts_2d[:, 1] >= ymin)
    )
    fov_inds = fov_inds & (pc_velo[:, 0] > clip_distance) #Remove points that are closer than the clip distance
    imgfov_pc_velo = pc_velo[fov_inds, :]
    if return_more:
        return imgfov_pc_velo, pts_2d, fov_inds
    else:
        return imgfov_pc_velo
    
LiDAR2Camera.get_lidar_in_image_fov = get_lidar_in_image_fov

In [None]:
def show_lidar_on_image(self, pc_velo, img, debug="False"):
    """ Project LiDAR points to image """
    imgfov_pc_velo, pts_2d, fov_inds = self.get_lidar_in_image_fov(
        pc_velo, 0, 0, img.shape[1], img.shape[0], True
    )

    self.imgfov_pts_2d = pts_2d[fov_inds, :]
    # print(self.imgfov_pts_2d)

    cmap = plt.cm.get_cmap("hsv", 256)
    cmap = np.array([cmap(i) for i in range(256)])[:, :3] * 255
    
    self.imgfov_pc_velo = imgfov_pc_velo
    
    for i in range(self.imgfov_pts_2d.shape[0]):
        depth = imgfov_pc_velo[i,0]
        # if depth is 2, then 510 / 2 will be  255, white color
        # 510 has been calculated according to the clip distance, usually clip distance is 2
        # therefore we get depth color in the range (0, 255)
        color = cmap[int( 510.0 / depth), :]
        cv2.circle(
            img,(int(np.round(self.imgfov_pts_2d[i, 0])), int(np.round(self.imgfov_pts_2d[i, 1]))),1,
            color=tuple(color),
            thickness=-1,
        )
    return img

LiDAR2Camera.show_lidar_on_image = show_lidar_on_image

In [4]:
#img_3 = lidar2cam.show_lidar_on_image(points[:,:3], image)
img_3 = image.copy()
img_3 = lidar2cam.show_lidar_on_image(points[:,:3], img_3)
plt.figure(figsize=(14,7))
plt.imshow(img_3)
plt.show()

In [None]:
video_images = sorted(glob.glob("data/video 4/images/*.png"))
video_points = sorted(glob.glob("data/video 4/points/*.pcd"))

# Build a LiDAR2Cam object
lidar2cam_video = LiDAR2Camera(calib_files[0])

result_video = []

for idx, img in enumerate(video_images):
    image = cv2.cvtColor(cv2.imread(img), cv2.COLOR_BGR2RGB)
    point_cloud = np.asarray(o3d.io.read_point_cloud(video_points[idx]).points)
    result_video.append(lidar2cam_video.show_lidar_on_image(point_cloud, image))

out = cv2.VideoWriter('output/out_video4.avi',cv2.VideoWriter_fourcc(*'DIVX'), 15, (image.shape[1],image.shape[0]))
 
for i in range(len(result_video)):
    out.write(cv2.cvtColor(result_video[i], cv2.COLOR_BGR2RGB))
    #out.write(result_video[i])
out.release()

In [None]:
class Detector():
    def __init__(self, conf_threshold=0.4, classes_to_detect=None, nms_threshold=0.4, input_size=(416, 416), scale=1.0/255):
        self.conf_threshold = conf_threshold
        self.classes_to_detect = classes_to_detect
        self.nms_threshold = nms_threshold
        self.input_size = input_size
        self.scale = scale
        self.model = None
        self.ln = None
        self.names = None
        cmap = plt.cm.get_cmap("hsv", 256)
        self.cmap = np.array([cmap(i) for i in range(256)])[:, :3] * 255

    def get_layers(self):
        ln = self.model.getLayerNames()
        return [ln[i - 1] for i in self.model.getUnconnectedOutLayers()]

    def read_names(self, names_path):
        f = open(names_path, "r")
        return [n.split('\n')[0] for n in f.readlines()]

    def load_weights(self, weights_path, config_path):
        return cv2.dnn.readNet(weights_path, config_path)

    def load_model(self, weights_path, config_path, names_path):
        self.model = self.load_weights(weights_path, config_path)
        self.ln = self.get_layers()
        self.names = self.read_names(names_path)
        a = 0

    def detect(self, image, draw_bboxes=False, display_labels=False):
        img = image.copy()
        (H, W) = img.shape[:2]
        blob = cv2.dnn.blobFromImage(img, self.scale, self.input_size, swapRB=True, crop=False)
        self.model.setInput(blob)
        layerOutputs = self.model.forward(self.ln)

        boxes = []
        confidences = []
        classIDs = []
        for output in layerOutputs:
            # loop over each of the detections
            for detection in output:
                # extract the class ID and confidence (i.e., probability) of
                # the current object detection
                scores = detection[5:]
                classID = np.argmax(scores)
                confidence = scores[classID]
                # filter out weak predictions by ensuring the detected
                # probability is greater than the minimum probability
                if confidence > self.conf_threshold:
                    # scale the bounding box coordinates back relative to the
                    # size of the image, keeping in mind that YOLO actually
                    # returns the center (x, y)-coordinates of the bounding
                    # box followed by the boxes' width and height
                    box = detection[0:4] * np.array([W, H, W, H])
                    (centerX, centerY, width, height) = box.astype("int")
                    # use the center (x, y)-coordinates to derive the top and
                    # and left corner of the bounding box
                    x = int(centerX - (width / 2))
                    y = int(centerY - (height / 2))
                    # update our list of bounding box coordinates, confidences,
                    # and class IDs
                    if self.classes_to_detect is None or self.names[classID] in self.classes_to_detect:
                        boxes.append([x, y, int(width), int(height)])
                        confidences.append(float(confidence))
                        classIDs.append(classID)

        idxs = cv2.dnn.NMSBoxes(boxes, confidences, self.conf_threshold, self.nms_threshold)

        detections = []
        # ensure at least one detection exists
        if len(idxs) > 0:
            # loop over the indexes we are keeping
            for i in idxs.flatten():
                class_id = classIDs[i]
                conf = confidences[i]
                # extract the bounding box coordinates
                (x, y) = (boxes[i][0], boxes[i][1])
                (w, h) = (boxes[i][2], boxes[i][3])
                bbox = [x, y, w, h]
                # draw a bounding box rectangle and label on the image
                if draw_bboxes:
                    color = self.cmap[int(255.0 / (class_id + 1)), :]
                    cv2.rectangle(img, (x, y), (x + w, y + h), color=tuple(color), thickness=2)
                    if display_labels:
                        cv2.putText(img, str(self.names[class_id]), (int(x), int(y)), cv2.FONT_HERSHEY_PLAIN, 1,
                                    (0, 0, 255), 2, 16)
                        cv2.putText(img, str(self.names[class_id]), (int(x), int(y)), cv2.FONT_HERSHEY_PLAIN, 1,
                                    (255, 255, 255), 1, 16)

                detections.append([class_id, bbox, conf])

        return np.array(detections), img

In [None]:

weights = "/content/drive/MyDrive/visual_fusion/data/yolo/yolov4.weights"
config = "/content/drive/MyDrive/visual_fusion/data/yolo/yolov4.cfg"
names = "/content/drive/MyDrive/visual_fusion/data/yolo/coco.names"

detector =Detector(0.4)
detector.load_model(weights, config, names)
detections, image = detector.detect(image, True, True)

In [None]:
def rectContains(rect, pt, shrink_factor=0.0):
    x_min = rect[0]
    y_min = rect[1]
    width = rect[2]
    height = rect[3]

    center_x = x_min + width * 0.5
    center_y = y_min + height * 0.5

    new_width = width * (1 - shrink_factor)
    new_height = height * (1 - shrink_factor)

    x1 = int(center_x - new_width * 0.5)
    y1 = int(center_y - new_height * 0.5)
    x2 = int(center_x + new_width * 0.5)
    y2 = int(center_y + new_height * 0.5)

    return x1 < pt[0] < x2 and y1 < pt[1] < y2

In [None]:
import statistics
import random

def filter_outliers(distances):
    inliers = []
    mu = statistics.mean(distances)
    std = statistics.stdev(distances)
    for x in distances:
      if abs(x-mu) < std:
            # This is an INLIER
            inliers.append(x)
    return inliers


In [None]:
def get_best_distance(distances, technique="closest"):
    if technique == "closest":
        return min(distances)
    elif technique =="average":
        return statistics.mean(distances)
    elif technique == "random":
        return random.choice(distances)
    else:
        return statistics.median(sorted(distances))

In [None]:
def lidar_camera_fusion(self, detections, lid_image):
    img_bis = lid_image.copy()
    pred_bboxes = detections[:, 1]
    # print(pred_bboxes)
    cmap = plt.cm.get_cmap("hsv", 256)
    cmap = np.array([cmap(i) for i in range(256)])[:, :3] * 255
    distances = []
    for box in pred_bboxes:
        distances = []
        # print(box)
        for i in range(self.imgfov_pts_2d.shape[0]):
            #depth = self.imgfov_pc_rect[i, 2]
            depth = self.imgfov_pc_velo[i,0]
            if (rectContains(box, self.imgfov_pts_2d[i], shrink_factor=0.1)==True):
                distances.append(depth)
                # print(distances)
                color = cmap[int(510.0 / depth), :]
                cv2.circle(img_bis,(int(np.round(self.imgfov_pts_2d[i, 0])), int(np.round(self.imgfov_pts_2d[i, 1]))),2,color=tuple(color),thickness=-1,)
        h, w, _ = img_bis.shape
        if (len(distances)>2):
            distances = filter_outliers(distances)
            best_distance = get_best_distance(distances, technique="average")
            cv2.putText(img_bis, '{0:.2f} m'.format(best_distance), (int(box[0]*w),int(box[1]*h)), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 0, 0), 3, cv2.LINE_AA)    
        distances_to_keep = []
    
    return img_bis, distances
  
      

In [None]:
def pipeline (self, image, point_cloud):
    "For a pair of 2 Calibrated Images"
    # img = image.copy()
    # Run obstacle detection in 2D
    detections, result = detector.detect(image, True, True)
    # Show LidAR on Image
    lidar_img = self.show_lidar_on_image(point_cloud[:,:3], result)
    # Fuse Point Clouds & Bounding Boxes
    img_final, _ = self.lidar_camera_fusion(detections, lidar_img)
    fig_keeping = plt.figure(figsize=(14, 7))
    ax_keeping = fig_keeping.subplots()
    ax_keeping.imshow(final_result)
    plt.show()
    return img_final

LiDAR2Camera.pipeline = pipeline

In [5]:
image_files = sorted(glob.glob("/content/drive/MyDrive/visual_fusion/data/img/*.png"))
point_files = sorted(glob.glob("/content/drive/MyDrive/visual_fusion/data/velodyne/*.pcd"))
label_files = sorted(glob.glob("/content/drive/MyDrive/visual_fusion/data/label/*.txt"))
calib_files = sorted(glob.glob("/content/drive/MyDrive/visual_fusion/data/calib/*.txt"))
index = 0
pcd_file = point_files[index]
image = cv2.cvtColor(cv2.imread(image_files[index]), cv2.COLOR_BGR2RGB)
cloud = o3d.io.read_point_cloud(pcd_file)
points= np.asarray(cloud.points)

lidar2cam = LiDAR2Camera(calib_files[index])
cloud = o3d.io.read_point_cloud(pcd_file)
points= np.asarray(cloud.points)


image = cv2.cvtColor(cv2.imread(image_files[index]), cv2.COLOR_BGR2RGB)

plt.figure(figsize=(14,7))
final_result = lidar2cam.pipeline(image, points)
plt.imshow(final_result)
plt.show()

In [None]:
video_images = sorted(glob.glob("/content/drive/MyDrive/visual_fusion/data/video 2/images/*.png"))
video_points = sorted(glob.glob("/content/drive/MyDrive/visual_fusion/data/video 2/points/*.pcd"))

# Build a LiDAR2Cam object
lidar2cam_video = LiDAR2Camera(calib_files[0])

result_video = []

for idx, img in enumerate(video_images):
    image = cv2.cvtColor(cv2.imread(img), cv2.COLOR_BGR2RGB)
    point_cloud = np.asarray(o3d.io.read_point_cloud(video_points[idx]).points)
    result_video.append(lidar2cam_video.pipeline(image, point_cloud))

#out = cv2.VideoWriter('output/out.avi',cv2.VideoWriter_fourcc(*'DIVX'), 15, (image.shape[1],image.shape[0]))
out = cv2.VideoWriter('/content/drive/MyDrive/visual_fusion/output/early_fusion_2.mp4',cv2.VideoWriter_fourcc(*'MP4V'), 15, (image.shape[1],image.shape[0]))

for i in range(len(result_video)):
    out.write(cv2.cvtColor(result_video[i], cv2.COLOR_BGR2RGB))
    #out.write(result_video[i])
out.release()

  return np.array(detections), img


[[0 list([1105, 175, 96, 143]) 0.988852858543396]
 [0 list([769, 166, 104, 196]) 0.9884436130523682]
 [2 list([299, 168, 151, 118]) 0.9043078422546387]
 [1 list([888, 200, 68, 36]) 0.782404899597168]
 [9 list([20, 92, 57, 39]) 0.7820451259613037]
 [11 list([120, 219, 35, 67]) 0.5969938039779663]
 [0 list([223, 192, 17, 26]) 0.5824441909790039]
 [9 list([934, 118, 20, 26]) 0.5664930939674377]
 [26 list([1163, 238, 25, 31]) 0.5137186050415039]
 [9 list([124, 20, 44, 109]) 0.4420454204082489]
 [0 list([49, 196, 24, 29]) 0.4159887731075287]
 [1 list([744, 191, 36, 41]) 0.4146764576435089]]
[[0 list([778, 163, 106, 199]) 0.9852901101112366]
 [0 list([1139, 168, 76, 144]) 0.9758712649345398]
 [1 list([903, 198, 79, 38]) 0.8154870271682739]
 [9 list([4, 84, 65, 41]) 0.7979365587234497]
 [1 list([757, 190, 35, 41]) 0.7894946932792664]
 [2 list([296, 166, 156, 116]) 0.737259566783905]
 [1 list([785, 270, 110, 101]) 0.516626238822937]
 [26 list([1184, 236, 34, 28]) 0.42031389474868774]
 [0 list(