In [1]:
import pandas as pd
from os.path import join
import glob
from scipy.spatial.transform import Rotation
import numpy as np
import laspy
import os
from scipy.spatial.transform import Rotation
from PIL import Image
import matplotlib.pyplot as plt
import math
import json

In [2]:
DEBUG_MODE = False
VALID_HEIGHT_ONLY = False
CLASSIFY_CATEGORY = False

POINT_ATTR = {
    'X': 0,
    'Y': 1,
    'Z': 2,
    'INTENSITY': 3,
    'GPS': 4,
    'INDEX': 5,
    'R':6,
    'G':7,
    'B':8,
    'CATEGORY':9
}

SENSOR_TYPE = {
    'CAMERA': 0,
    'LIDAR': 1
}

SENSOR = {
    'FC': {'id': 0, 'name':'ring_front_center', 'type':SENSOR_TYPE['CAMERA'], 'valid_height':[1125, 1825]},
    'FL': {'id': 1, 'name':'ring_front_left', 'type':SENSOR_TYPE['CAMERA'], 'valid_height':[825, 1550]},
    'FR': {'id': 2, 'name':'ring_front_right', 'type':SENSOR_TYPE['CAMERA'], 'valid_height':[825, 1550]},
    'RL': {'id': 3, 'name':'ring_rear_left', 'type':SENSOR_TYPE['CAMERA'], 'valid_height':[885, 1285]},
    'RR': {'id': 4, 'name':'ring_rear_right', 'type':SENSOR_TYPE['CAMERA'], 'valid_height':[885, 1285]},
    'SL': {'id': 5, 'name':'ring_side_left', 'type':SENSOR_TYPE['CAMERA'], 'valid_height':[825, 1550]},
    'SR': {'id': 6, 'name':'ring_side_right', 'type':SENSOR_TYPE['CAMERA'], 'valid_height':[825, 1550]},
    'SFL': {'id': 7, 'name':'stereo_front_left', 'type':SENSOR_TYPE['CAMERA'], 'valid_height':[1125, 1825]},
    'SFR': {'id': 8, 'name':'stereo_front_right', 'type':SENSOR_TYPE['CAMERA'], 'valid_height':[1125, 1825]},
    'UL': {'id': 9, 'name':'up_lidar', 'type':SENSOR_TYPE['LIDAR']},
    'DL': {'id': 10, 'name':'down_lidar', 'type':SENSOR_TYPE['LIDAR']},
}

CATEGORY = {
    'NONE': 0,
    'REGULAR_VEHICLE': 1,
    'PEDESTRIAN': 2,
    'BOLLARD': 3,
    'CONSTRUCTION_CONE': 3,
    'CONSTRUCTION_BARREL': 3,
    'STOP_SIGN': 4,
    'BICYCLE': 1,
    'LARGE_VEHICLE': 1,
    'WHEELED_DEVICE': 1,
    'BUS': 1,
    'BOX_TRUCK': 1,
    'SIGN': 5,
    'TRUCK': 1,
    'MOTORCYCLE': 1,
    'BICYCLIST': 2,
    'VEHICULAR_TRAILER': 1,
    'TRUCK_CAB': 1,
    'MOTORCYCLIST': 2,
    'DOG': 2,
    'SCHOOL_BUS': 1,
    'WHEELED_RIDER': 2,
    'STROLLER': 1,
    'ARTICULATED_BUS': 1,
    'MESSAGE_BOARD_TRAILER': 1,
    'MOBILE_PEDESTRIAN_SIGN': 5,
    'WHEELCHAIR': 1,
    'RAILED_VEHICLE': 1,
    'OFFICIAL_SIGNALER': 2,
    'TRAFFIC_LIGHT_TRAILER': 1,
    'ANIMAL': 2
}

MIN_DISTANCE = 1.0
MAX_DISTANCE = 50.0

In [3]:

def get_timestamp_from_path(path:str)->int:
    filename = os.path.basename(path)
    return int(filename.split('.')[0])

def get_rotation_matrix(w:float, x:float, y:float, z:float)->np.ndarray:
    return Rotation([x, y, z, w]).as_matrix()

def get_transform_matrix(w:float, x:float, y:float, z:float, tx:float, ty:float, tz:float)->np.ndarray:
    mat = np.eye(4)
    mat[:3,:3] = get_rotation_matrix(w, x, y, z)
    mat[:,3] = [tx, ty, tz, 1.0]
    
    return mat
    
def create_points(num_points)->np.ndarray:
    return np.zeros((num_points, len(POINT_ATTR)))

def transform_points(points:np.ndarray, calibration:np.ndarray):
    size = points.shape[0]
    coords = np.concatenate((points[:,:3].T, np.ones((1, size))))
    coords = np.matmul(calibration, coords)
    
    return coords[:3,:].T

def image_to_numpy(path:str):
    image = Image.open(path)
    np_image = np.array(image)
    
    return np_image.transpose(2, 0, 1)

# transform points from world to camera pixel
def point_to_pixel(points:np.ndarray, to_sensor_calib:np.ndarray, intrinsic:np.ndarray):
    coords = transform_points(points, to_sensor_calib)
    size = coords.shape[0]
    depth = coords[:,2]
    
    pixel = np.matmul(intrinsic, coords.T)
    pixel = pixel / pixel[2, :]

    return pixel.T, depth

def get_rgb_bilinear(img:np.ndarray, pixel:np.ndarray):
    x = pixel[:,0]
    y= pixel[:,1]
    
    x0 = np.floor(x).astype(np.int32)
    y0 = np.floor(y).astype(np.int32)
    x1 = np.ceil(x).astype(np.int32)
    y1 = np.ceil(y).astype(np.int32)

    lt = img[:, y0, x0]
    lb = img[:, y1, x0]
    rt = img[:, y0, x1]
    rb = img[:, y1, x1]
    
    wa = np.expand_dims(((x1.astype(np.float16) - x) * (y1.astype(np.float16) - y)), 0)
    wb = np.expand_dims(((x1.astype(np.float16) - x) * (y - y0.astype(np.float16))), 0)
    wc = np.expand_dims(((x - x0.astype(np.float16)) * (y1.astype(np.float16) - y)), 0)
    wd = np.expand_dims(((x - x0.astype(np.float16)) * (y - y0.astype(np.float16))), 0)
    
    return (lt * wa + lb * wb + rt * wc + rb * wd).transpose(1, 0)

def get_color_from_image(image:str, pixel:np.ndarray, depth:np.ndarray, min_height=1, max_height=0):
    np_image = image_to_numpy(image)
    _, h, w = np_image.shape
    size = pixel.shape[0]
    
    if min_height > max_height:
        max_height = h
    
    x = pixel[:, 0]
    y = pixel[:, 1]
    mask = np.ones(size, dtype=bool)
    mask = np.logical_and(mask, depth > 0)
    mask = np.logical_and(mask, x > 1)
    mask = np.logical_and(mask, x < w - 1)
    mask = np.logical_and(mask, y > min_height)
    mask = np.logical_and(mask, y < max_height - 1)
    rgb = get_rgb_bilinear(np_image, pixel[mask,:2])
    
    return rgb, mask

def draw_points_on_image(image:str, pixel:np.ndarray):
    np_image = image_to_numpy(image).transpose(1, 2, 0)
    x = pixel[:,0]
    y = pixel[:,1]
    
    plt.imshow(np_image)
    plt.scatter(x, y, c='blue', marker='x', s=1)
    plt.show()
    
def write_las(path:str, points:np.ndarray):
    header = laspy.LasHeader(point_format=3, version="1.4")
    header.offsets = np.min(points[:,:3], axis=0)
    header.scales = np.array([0.01, 0.01, 0.01])
    # header.vlrs
    
    points_type = points.shape[1]
    
    las = laspy.LasData(header)
    las.x = points[:,0]
    las.y = points[:,1]
    las.z = points[:,2]
    las.intensity = points[:,POINT_ATTR['INTENSITY']]
    if points_type >= POINT_ATTR['GPS']:
        las.gps_time = points[:,POINT_ATTR['GPS']].astype(np.uint64)
    
    if points_type >= POINT_ATTR['B']:
        las.red = points[:,POINT_ATTR['R']].astype(np.uint8)
        las.green = points[:,POINT_ATTR['G']].astype(np.uint8)
        las.blue = points[:,POINT_ATTR['B']].astype(np.uint8)
    if points_type >= POINT_ATTR['CATEGORY']:
        las.classification = points[:,POINT_ATTR['CATEGORY']].astype(np.uint8)
    las.write(path)

In [4]:

class EgoPoseParser():
    def __init__(self, path:str):
        feather = pd.read_feather(path)
        
        qw = feather['qw']
        qx = feather['qx']
        qy = feather['qy']
        qz = feather['qz']
        tx = feather['tx_m']
        ty = feather['ty_m']
        tz = feather['tz_m']
        
        self.timestamps = feather['timestamp_ns']
        self.ego_calibrations = np.stack(
            [get_transform_matrix(qw[i],qx[i],qy[i],qz[i],tx[i],ty[i],tz[i]) for i in range(len(self.timestamps))], axis=0
        )
    
    def get_ego_calibration(self, timestamp: int)->np.ndarray:
        return self.ego_calibrations[self.timestamps == timestamp].squeeze()
                
    
class CalibrationParser():
    def __init__(self, path:str):
        feather = pd.read_feather(path)
        
        qw = feather['qw']
        qx = feather['qx']
        qy = feather['qy']
        qz = feather['qz']
        tx = feather['tx_m']
        ty = feather['ty_m']
        tz = feather['tz_m']
        
        self.sensor_calibrations = np.stack(
            [get_transform_matrix(qw[i],qx[i],qy[i],qz[i],tx[i],ty[i],tz[i]) for i in range(len(qw))], axis=0
        )
    
    def get(self, sensor_id:int)->np.ndarray:
        return self.sensor_calibrations[sensor_id].squeeze()
        

class IntrinsicParser():
    def __init__(self, path:str) -> None:
        feather = pd.read_feather(path)
        
        fx_px = feather['fx_px']
        fy_px = feather['fy_px']
        cx_px = feather['cx_px']
        cy_px = feather['cy_px']
        
        self.intrinsics = np.stack([np.eye(3) for _ in range(len(fx_px))], axis=0)
        self.intrinsics[:, 0, 0] = fx_px
        self.intrinsics[:, 1, 1] = fy_px
        self.intrinsics[:, 0, 2] = cx_px
        self.intrinsics[:, 1, 2] = cy_px
    
    def get(self, sensor_id:int)->np.ndarray:
        return self.intrinsics[sensor_id]


class LogMapArchive():
    def __init__(self, path) -> None:
        with open(path) as f:
            data = json.load(f)
        
        self.lane_segments = []
        self.pedestrian_crossings = []
        
        # drivable_area
        self.drivable_areas = {}
        for id in data['drivable_areas']:
            coords = []
            for boundary in data['drivable_areas'][id]['area_boundary']:
                z = 0 if math.isnan(boundary['z']) else boundary['z']
                coords.append(np.array([boundary['x'], boundary['y'], z]))
            coords = np.stack(coords, axis=0)
            self.drivable_areas[id] = coords
            
    def get_drivable_areas(self)->str:
        keys = []
        for key, _ in self.drivable_areas.items():
            keys.append(key)
        
        return keys
                
    def get_drivable_points(self, id:str)->np.ndarray:
        coords = self.drivable_areas[id]
        points = create_points(coords.shape[0])
        points[:,:3] = coords
        points[:,POINT_ATTR['INTENSITY']] = 255
        
        return points


class Annotation():
    def __init__(self, category:int, length:float, width:float, height:float, transform_mat:np.ndarray, inner_points:int) -> None:
        self.category = category
        self.length = length/2 + 5e-2
        self.width = width/2 + 5e-2
        self.height = height/2 + 5e-2
        self.transform_mat = np.linalg.inv(transform_mat)
        self.inner_points = inner_points
    
    def classify_points(self, points:np.ndarray) -> None:
        size = points.shape[0]
        coords = points[:,:3]
        coords = transform_points(coords, self.transform_mat)
        
        xs = coords[:,0]
        ys = coords[:,1]
        zs = coords[:,2]
        
        mask = xs >= -self.length
        mask = np.logical_and(mask, xs<=self.length)
        mask = np.logical_and(mask, ys>=-self.width)
        mask = np.logical_and(mask, ys<=self.width)
        mask = np.logical_and(mask, zs>=-self.height)
        mask = np.logical_and(mask, zs<=self.height)
        
        categories = np.zeros((size))
        categories[mask] = self.category
        
        diff = self.inner_points - categories[mask].shape[0]
        # if diff > 1:
        #     print(self.category, diff)
        
        return categories
        

class AnnotationParser():
    def __init__(self, path:str) -> None:
        feather = pd.read_feather(path)
        
        timestamps = feather['timestamp_ns']
        categories = feather['category']
        lengths = feather['length_m']
        widths = feather['width_m']
        heights = feather['height_m']
        qw = feather['qw']
        qx = feather['qx']
        qy = feather['qy']
        qz = feather['qz']
        tx = feather['tx_m']
        ty = feather['ty_m']
        tz = feather['tz_m']
        inner_points = feather['num_interior_pts']
        
        num_data = len(timestamps)
        transform_matrix = [get_transform_matrix(qw[i],qx[i],qy[i],qz[i],tx[i],ty[i],tz[i]) for i in range(num_data)]
        
        self.annotations = {}
        for i in range(num_data):
            timestamp = timestamps[i]
            if not timestamp in self.annotations:
                self.annotations[timestamp] = []
            self.annotations[timestamp].append(Annotation(CATEGORY[categories[i]], lengths[i], widths[i], heights[i], transform_matrix[i], inner_points[i]))
            
    def get_annotations(self, timestamp:int)->list[Annotation]:
        return self.annotations[timestamp]
        


class Lidar():
    def __init__(self, timestamp:int, path:str, ego_calibration:np.ndarray, ul_calibration:np.ndarray, dl_calibration:np.ndarray, annotations:list[Annotation]):
        self.timestamp = timestamp
        self.path = path
        self.ego_calibration = ego_calibration
        self.ul_calibration = ul_calibration
        self.dl_calibration = dl_calibration
        self.annotations = annotations
        self.is_init = False
        
    def init(self)->None:

        feather = pd.read_feather(self.path)
        xs = feather['x']
        ys = feather['y']
        zs = feather['z']

        coordinates = np.stack([xs, ys, zs], axis=1)
        laser_number = feather['laser_number']
        intensities = feather['intensity']
        gps_times = feather['offset_ns']
        
        # # distance filter
        # distance = np.sum(np.square(coordinates), axis=1)
        # distance_filter = distance <= MAX_DISTANCE**2
        # coordinates = coordinates[distance_filter]
        # laser_number = laser_number[distance_filter]
        # intensities = intensities[distance_filter]
        # gps_times = gps_times[distance_filter]
        
        num_points = coordinates.shape[0]
        categories = np.zeros((num_points))
        point_index = np.arange(num_points)
        
        
        # transform coordinates from lidar to world
        up_index = laser_number[laser_number >= 32]
        down_index = laser_number[laser_number < 32]
        # lidar to ego vehicle
        coordinates[up_index] = transform_points(coordinates[up_index], self.ul_calibration)
        coordinates[down_index] = transform_points(coordinates[down_index], self.dl_calibration)
        
        # classify points with annotations
        if CLASSIFY_CATEGORY:
            for annotation in self.annotations:
                classified = annotation.classify_points(coordinates)
                valid = classified > 0
                categories[valid] = classified[valid]

        # ego vehicle to world
        coordinates = transform_points(coordinates, self.ego_calibration)
        
        self.coordinates = coordinates
        self.intensities = intensities
        self.gps_times = gps_times + self.timestamp
        self.categories = categories
        self.point_index = point_index
        self.is_init = True
    
    # x, y, z, intensity, gps_time, point-index
    def get_points(self)->np.ndarray:
        if not self.is_init:
            self.init()
        
        points = create_points(self.coordinates.shape[0])
        points[:,:3] = self.coordinates
        points[:,POINT_ATTR['INTENSITY']] = self.intensities
        points[:,POINT_ATTR['GPS']] = self.gps_times
        points[:,POINT_ATTR['INDEX']] = self.point_index
        points[:,POINT_ATTR['CATEGORY']] = self.categories
        
        return points

class Camera():
    def __init__(self, timestamp:int, images:list[str], ego_calibration:np.ndarray, sensor_calibrations:CalibrationParser, intrinsics:IntrinsicParser) -> None:
        self.timestamp = timestamp
        self.images = images
        self.ego_calibraion = ego_calibration
        self.sensor_calibrations = sensor_calibrations
        self.intrinsics = intrinsics
        
    def paint_points(self, points:np.ndarray) -> np.ndarray:
        
        painted_points = []
        # world to vehicle
        coords = transform_points(points, np.linalg.inv(self.ego_calibraion))
        
        for _, item in SENSOR.items():
            sensor_id = item['id']
            if sensor_id >= SENSOR['SFL']['id']:
                continue

            valid_height = item['valid_height'] if VALID_HEIGHT_ONLY else [1, 0]
            image = self.images[sensor_id]
            cam_calib = self.sensor_calibrations.get(sensor_id)
            intrinsic = self.intrinsics.get(sensor_id)
            # project points into camera
            pixel, depth = point_to_pixel(coords, np.linalg.inv(cam_calib), intrinsic)
            rgb, mask = get_color_from_image(image, pixel, depth, valid_height[0], valid_height[1])
            points[mask,POINT_ATTR['R']:POINT_ATTR['R']+3] = rgb
            
            painted_points.append(points[mask])
            points = points[np.logical_not(mask)]
            coords = coords[np.logical_not(mask)]
            
            if DEBUG_MODE:
                draw_points_on_image(image, pixel[mask])
                
        points = np.concatenate(painted_points, 0)
        
        return points
    

class Argoverse():
    def __init__(self, root:str) -> None:
        self.root = root
        
        self.sensor_calibrations = CalibrationParser(join(root, 'calibration', 'egovehicle_SE3_sensor.feather'))
        self.intrinsics = IntrinsicParser(join(root, 'calibration', 'intrinsics.feather'))
        
        annotations = AnnotationParser(join(root, 'annotations.feather'))
        ego_poses = EgoPoseParser(join(root, 'city_SE3_egovehicle.feather'))
        self.sensors = self.sync_lidar_camera(self.parse_lidars(ego_poses, annotations), self.parse_cameras(ego_poses))
        
        # parse map
        map_files = glob.glob(join(root, 'map', '*'))
        for file in map_files:
            if 'log_map_archive' in file:
                self.log_map_archive = LogMapArchive(file)
    
    def parse_cameras(self, ego_poses:EgoPoseParser)->list[Camera]:
        num_images = 10e4
        cameras = []
        num_cams = SENSOR['SR']['id'] + 1
        image_files = [[] for _ in range(num_cams)]
        
        # get all jpg from each sensors dir
        for _, item in SENSOR.items():
            if item['id'] >= num_cams:
                continue
            image_files[item['id']] = glob.glob(join(self.root, 'sensors', 'cameras', item['name'], '*.jpg'))
            num_images = min(num_images, len(image_files[item['id']]))
 
        # create Camera instances
        for i in range(num_images):
            timestamp = get_timestamp_from_path(image_files[0][i])
            ego_calibration = ego_poses.get_ego_calibration(timestamp)
            images= []
            for j in range(num_cams):
                images.append(image_files[j][i])
            
            cameras.append(Camera(timestamp, images, ego_calibration, self.sensor_calibrations, self.intrinsics))
        
        return cameras

        
    def parse_lidars(self, ego_poses:EgoPoseParser, annotations:AnnotationParser)->list[Lidar]:
        ul_calibration = self.sensor_calibrations.get([SENSOR['UL']['id']])
        dl_calibration = self.sensor_calibrations.get([SENSOR['DL']['id']])
        
        lidar_files = glob.glob(join(self.root, 'sensors', 'lidar', '*.feather'))
        lidars = []
        for lidar_file in lidar_files:
            timestamp = get_timestamp_from_path(lidar_file)
            ego_calibration = ego_poses.get_ego_calibration(timestamp)
            lidars.append(Lidar(timestamp, lidar_file, ego_calibration, ul_calibration, dl_calibration, annotations.get_annotations(timestamp)))
            
        return lidars
    
    def sync_lidar_camera(self, lidars:list[Lidar], cameras:list[Camera])->list(tuple((Lidar, Camera))):
        sensors = []

        for lidar in lidars:
            timestamp = lidar.timestamp
            
            for camera in cameras:
                if timestamp < camera.timestamp:
                    sensors.append((lidar, camera))
                    break
        
        return sensors

    
    def get_points(self)->np.ndarray:
        num_points = 0
        points_list = []
        
        for sensors in self.sensors:
            lidar:Lidar = sensors[0]
            points = lidar.get_points()
            points[:,POINT_ATTR['INDEX']] += num_points
            num_points += points[-1,POINT_ATTR['INDEX']]
            
            camera:Camera = sensors[1]
            points = camera.paint_points(points)
            points_list.append(points)
        
        # points = self.log_map_archive.get_drivable_points()
        # points[:,POINT_ATTR['CATEGORY']] = 7
        # points_list.append(points)
        
        return np.concatenate(points_list, axis=0)
        

In [5]:
path = '/mnt/d/argoverse/WDC/389069d7-e6db-3d22-9328-e228c002bf75'
argoverse = Argoverse(path)

In [9]:
ids = argoverse.log_map_archive.get_drivable_areas()
for id in ids:
    points = argoverse.log_map_archive.get_drivable_points(id)
    points[:,POINT_ATTR['INDEX']] = np.arange(points.shape[0])
    points[:,POINT_ATTR['INTENSITY']] = 254
    write_las(join(path, f'{id}.las'), points)
    break

In [6]:
write_las(join(path, 'test.las'), argoverse.get_points())