In [None]:
%matplotlib inline

import sys

import matplotlib.pyplot as plt

import os

import matplotlib as mpl

import numpy as np

from PIL import Image

from pyquaternion import Quaternion

In [None]:
sys.path.append("/home/nils/data_fusion/nuscenes-devkit/python-sdk")

In [None]:
from nuscenes.nuscenes import NuScenes

nusc = NuScenes(version='v1.0-mini', dataroot='nuscenes', verbose=True)

In [None]:
from nuscenes.utils.data_classes import LidarPointCloud

from nuscenes.utils.geometry_utils import view_points


# Task 1

## Picking a random sample

In [None]:
my_sample = nusc.sample[10]

my_scene_token =my_sample['scene_token']

my_scene = nusc.get('scene', my_scene_token)

In [None]:
first_sample_token = my_scene['first_sample_token']

first_sample_from_scene = nusc.get('sample', first_sample_token)

## Picking the van in front by hand

In [None]:
vehicle_annotation_tokens = []
for i in range(len(first_sample_from_scene['anns'])):
    first_annotation_token_of_first_sample = first_sample_from_scene['anns'][i]
    if nusc.get('sample_annotation', first_annotation_token_of_first_sample)['category_name'] == 'vehicle.car':
        vehicle_annotation_tokens.append(first_annotation_token_of_first_sample)

# annotation of first frame
van_annotation_token = vehicle_annotation_tokens[7]

In [None]:
def get_lidar_pointcloud(sample_token):
    sample_record = nusc.get('sample', sample_token)
    pointsensor_channel = 'LIDAR_TOP'
    pointsensor_token = sample_record['data'][pointsensor_channel]
    pointsensor = nusc.get('sample_data', pointsensor_token)
    pcl_path = nusc.get('sample_data', pointsensor_token)['filename']
    pc =LidarPointCloud.from_file(os.path.join('nuscenes',pcl_path))
    return pointsensor_token, pc.points

In [None]:
def rotate_pointcloud(cloud, quaternion):
    def rot(an_array):
        return quaternion.rotate(an_array)
    return np.array(list(map(rot, cloud)))

In [None]:
def visualize_absolute(ax, absolute_points, ego_pose, van_translation, relativeboxvectors):
    #plt.axis('equal')
    axes = plt.gca()
    #axes.set_xlim([xmin,xmax])
    #axes.set_ylim([ymin,ymax])
    #ax.set(xlim=(300, 500), ylim=(1100, 1300))
    return ax.scatter(absolute_points[0], absolute_points[1], c='red', s=0.3), ax.scatter(ego_pose['translation'][0], ego_pose['translation'][1]),ax.plot([van_translation[0] + relativeboxvector[0] for relativeboxvector in relativeboxvectors], [van_translation[1] + relativeboxvector[1] for relativeboxvector in relativeboxvectors])[0]

In [None]:
def create_bounding_box(annotation_token):
    '''
    creates bounding box used for plotting
    '''
    van_annotation = nusc.get('sample_annotation', annotation_token)
    relativeboxvectors = [(van_annotation['size'][1]/2,van_annotation['size'][0]/2, 0),
                         (-van_annotation['size'][1]/2,van_annotation['size'][0]/2, 0),
                         (-van_annotation['size'][1]/2,-van_annotation['size'][0]/2, 0),
                         (van_annotation['size'][1]/2,-van_annotation['size'][0]/2, 0),
                         (van_annotation['size'][1]/2,van_annotation['size'][0]/2, 0)]
    return np.array(relativeboxvectors), van_annotation['translation']

In [None]:
#create list of annotation tokens
current_van_annotation_token = van_annotation_token
annotation_tokens = []
while current_van_annotation_token != '':
    annotation_tokens.append(current_van_annotation_token)
    #next annotaton token
    current_van_annotation_token = nusc.get('sample_annotation', current_van_annotation_token)['next']

## Create Video

In [None]:
# same as last time. Now we just rotate back.

In [None]:
ims = []
fig = plt.figure()
ax = fig.add_subplot(111)  # fig and axes created once
frontimages = []
raw_measurements = []
timestamps = []
annotated_positions = []
# Display points and bounding box in global coordinates over whole scene
for i,current_van_annotation_token in enumerate(annotation_tokens):

    # filter out points outside bounding box


    ## Grab the data

    van_annotation = nusc.get('sample_annotation', current_van_annotation_token)

    #get respective frame token
    sample_token = van_annotation['sample_token']

    timestamps.append(nusc.get('sample', sample_token)['timestamp'])


    pointsensor_token, points = get_lidar_pointcloud(sample_token)
    calibrated_sensor_token = nusc.get('sample_data', pointsensor_token)['calibrated_sensor_token']
    calibrated_sensor = nusc.get('calibrated_sensor', calibrated_sensor_token)
    ego_pose = nusc.get('ego_pose', nusc.get('sample_data', pointsensor_token)['ego_pose_token'])

    ## translate points to global reference frame

    ### Rotate points around sensor and ego rotation

    pointsensor_quaternion = Quaternion(calibrated_sensor['rotation'])
    ego_quaternion = Quaternion(ego_pose['rotation'])

    sensor_rotated_points = np.dot(pointsensor_quaternion.rotation_matrix, points[:3,:])
    rotated_points = np.dot(ego_quaternion.rotation_matrix, sensor_rotated_points)

    # use broadcasting to add translation to x and y dimension
    absolute_points = rotated_points+np.array(ego_pose['translation'][:3]).reshape(-1,1)


    ### rotate points by inverse bounding box rotation (translation to its reference frame unnecessary for only filtering)
    reverse_van_quaternion = Quaternion(w=-1*van_annotation['rotation'][0], x=van_annotation['rotation'][1], y=van_annotation['rotation'][2], z=van_annotation['rotation'][3])
    points_bounding_rotation = np.dot(reverse_van_quaternion.rotation_matrix, absolute_points)

    #new stuff
    relativeboxvectors, position = create_bounding_box(current_van_annotation_token)

    annotated_positions.append(position)
    rotated_box = relativeboxvectors+reverse_van_quaternion.rotate(position)

    x_max = rotated_box[:,0].max()
    x_min = rotated_box[:,0].min()

    y_max = rotated_box[:,1].max()
    y_min = rotated_box[:,1].min()

    ## Remove points that are outside the bounding box
    mask = np.ones(points_bounding_rotation.shape[1], dtype=bool)

    mask = np.logical_and(mask, points_bounding_rotation[0] > x_min)
    mask = np.logical_and(mask, points_bounding_rotation[0] < x_max)

    mask = np.logical_and(mask, points_bounding_rotation[1] > y_min)
    mask = np.logical_and(mask, points_bounding_rotation[1] < y_max)

    points_filtered_rotated = points_bounding_rotation[:,mask]

    ### rotate points back

    van_quaternion = Quaternion(van_annotation['rotation'])
    points_filtered_absolute = np.dot(van_quaternion.rotation_matrix, points_filtered_rotated)
    # append list of plots to movie
    rotated_relativeboxvectors = rotate_pointcloud(relativeboxvectors, van_quaternion)
    plotlist = visualize_absolute(ax, points_filtered_absolute, ego_pose, van_annotation['translation'], rotated_relativeboxvectors)
    raw_measurements.append(points_filtered_absolute)
    ims.append(plotlist)     

ax.axis('equal')


## Create Movie

In [None]:
ani = mpl.animation.ArtistAnimation(fig, ims, repeat=False)
ani.save('filtered_im.gif', writer='pillow')

# Task 2

In [None]:
def prediction_step(state_vector, covariance_matrix, sigma_v, t):
    F = np.array(((1,0,t,0),(0,1,0,t),(0,0,1,0),(0,0,0,1)))
    transition_error = np.array((0.5*t**2, 0.5*t**2, t, t))
    Q = sigma_v**2*np.dot(transition_error.reshape(-1,1),transition_error.reshape(-1,1).T)
    
    state_vector_next = np.dot(F, state_vector)
    C_next = np.dot(np.dot(F, covariance_matrix), F.T) + Q
    return state_vector_next, C_next

## b)

In [None]:
def measuring_step(state_vector, C, measurement, R):
    H = np.array(((1, 0, 0, 0), (0,1,0,0)))
    S = np.dot(np.dot(H,C), H.T) + R
    K = np.dot(np.dot(C, H.T), np.linalg.inv(S))
    C_next = C - np.dot(np.dot(K, S),K.T)
    state_vector_next = state_vector + np.dot(K,np.array(measurement) - np.dot(H, state_vector))
    return state_vector_next, C_next

## c)

In [None]:
measurements = []
t_last = timestamps[0]
for r, t in zip(raw_measurements, timestamps):
    x_mean = r[0].mean()
    y_mean = r[1].mean()
    measurements.append((x_mean, y_mean, (t-t_last)/1000000))
    t_last = t
# throw first 11 away because some are empty
cleaned_measurements = measurements[11:]

In [None]:
def run_filter(C,sigma_v,R):
    initial_position = np.array(cleaned_measurements[1][:2])
    initial_velocity = (np.array(cleaned_measurements[1][:2]) - np.array(cleaned_measurements[0][:2]))/cleaned_measurements[0][2]
    state_vector = np.append(initial_position,initial_velocity)

    estimated_positions = []
    for i,m in enumerate(cleaned_measurements):
        state_vector, C = prediction_step(state_vector, C, sigma_v, t = m[2])
        estimated_positions.append(state_vector)
        state_vector, C = measuring_step(state_vector, C, m[:2], R)
    return estimated_positions


In [None]:
def squared_distance(v1, v2):
    assert len(v1) == len(v2), "vectors have different lengths"
    summe = 0
    for i in range(len(v1)):
        summe += (v1[i][0]-v2[i][0])**2
        summe += (v1[i][1]-v2[i][1])**2
    return summe

In [None]:
C = 0.3*np.eye(4)
sigma_v= 0.8
r = 0.01
R = np.array(((r,0),(0,r)))

estimated_positions = run_filter(C,sigma_v,R)

print(f'error:{squared_distance(estimated_positions, annotated_positions[11:])}')

fig = plt.figure()
ax = fig.add_subplot(111)
plt.plot([pos[0] for pos in estimated_positions], [pos[1] for pos in estimated_positions])
plt.plot([pos[0] for pos in annotated_positions[11:]], [pos[1] for pos in annotated_positions[11:]])
ax.axis('equal')