In [None]:
import numpy as np
import cv2
import os
import matplotlib.pyplot as plt

################ USER INPUT ####################
# input numpy file
npy_name = '~/ev_snn_percept/spinnaker/sample_pavement/output_16000_5:1_5:225_10:1_20_L24_G3.0.npy'
# directory containing the annotated images
annot_dir = '~/ev_snn_percept/binary/sample_pavement/vis_annot/'

# tracking algorithm parameters
T_HIDDEN = 50
T_VISIBLE = 10000
D_THRES = 10
ALPHA = 0.7
K = 9999

# For faster development and testing of the detection accuracy set REAL_DETECTION = False.
# This provides hardcoded initial values to the detected pavement, to skip this stage.
# If you want to check the initial detection, too, you need to set REAL_DETECTION = True.
REAL_DETECTION = False
ON_R_INIT      = 34
ON_THETA_INIT  = 43
OFF_R_INIT     = 34
OFF_THETA_INIT = 43

# C = height/2 of the trapezoid bounding the detected area
C = 5

# input dimension is RETINA_DIM x RETINA_DIM
RETINA_DIM = 128
# xy dimension is RETINA_DIM/SCALEDOWN x RETINA_DIM/SCALEDOWN
SCALEDOWN = 4
# planar dimension is THETA_NUM x R_NUM
THETA_NUM = 90
R_NUM = 60  

################################################
# do not change
HORIZ_THREAS = -1

X_NUM = round(RETINA_DIM/SCALEDOWN)
Y_NUM = round(RETINA_DIM/SCALEDOWN)
P_NUM = 2

# the directory used to store the outcome (predictions + visualization + metrics)
pred_dir = npy_name.replace('output_', 'pred_')
pred_dir = pred_dir.replace('.npy','_images') + '_TH' + \
                    str(T_HIDDEN) + '_TV' + str(T_VISIBLE) + '_DT' + \
                    str(D_THRES) + '_A' + str(ALPHA) + '_K' + str(K) + '/'

In [None]:
# FUNCTIONS

def add_line_to_img(r, theta, img, color, thickness):
    if theta >= 1.55: # theta E [89, 90] degrees
        y_0 = r
        y_1 = r
        x_0 = 0
        x_1 = RETINA_DIM - 1
    elif theta <= -1.55: # theta E [-89, -90] degrees
        y_0 = r
        y_1 = r
        x_0 = 0
        x_1 = RETINA_DIM - 1
    else:
        y_0 = 0
        y_1 = RETINA_DIM - 1
        x_0 = (r - y_0*np.sin(theta))/np.cos(theta)
        x_1 = (r - y_1*np.sin(theta))/np.cos(theta)

    start_point = (round(x_0), round(y_0))
    end_point   = (round(x_1), round(y_1))
    img = cv2.line(img, start_point, end_point, color, thickness)

    return img

def quant_to_real_128(r, theta):
    # r, theta are quantized here
                
    # r, theta real
    # r in +-32*sqrt(2) pixel range  
    r = (r - R_NUM/2)*(2*X_NUM*np.sqrt(2)/R_NUM)
    theta = (theta - THETA_NUM/2)*(np.pi/THETA_NUM)

    # r in +-128*sqrt(2) pixel range
    r = (RETINA_DIM/X_NUM) * r

    return r, theta

In [None]:
# TRACKING ALGORITHM

spikes_planar_np = np.load(npy_name)
spikes_planar_sorted_t = spikes_planar_np[np.argsort(spikes_planar_np[:, 1])]

ON_clusters = []
OFF_clusters = []

if not REAL_DETECTION:
    init_ON_cluster = {
        'id': -444,
        "t": 0,
        "r": ON_R_INIT,
        "theta": ON_THETA_INIT,
        "t_exp": 0 + T_VISIBLE,
        "n": K
    }

    init_OFF_cluster = {
        'id': -444,
        "t": 0,
        "r": OFF_R_INIT,
        "theta": OFF_THETA_INIT,
        "t_exp": 0 + T_VISIBLE,
        "n": K
    }
    ON_clusters.append(init_ON_cluster)
    OFF_clusters.append(init_OFF_cluster)

id = -1

pavement = {
    'detected' : False,
    'r': 0,
    'theta': 0,
}
pred_pav_list = []

for spike in spikes_planar_sorted_t:

    # ON EVENTS    
    if spike[0] < THETA_NUM*R_NUM:
        
        t = spike[1]
        r = spike[0]//THETA_NUM
        theta = spike[0]%THETA_NUM

        # deals with 0 and THETA_NUM actually referring to lines with same angle (-pi/2 and pi/2)
        # mathematicaly this means theta E (-pi/2, pi/2]
        if theta <= HORIZ_THREAS:
            theta = THETA_NUM
            r = 2*abs(R_NUM/2 - r) + r
        
        if not pavement['detected']:
            for ON_cluster in ON_clusters:
                if t > ON_cluster['t_exp']:
                    ON_clusters.remove(ON_cluster)

        if len(ON_clusters) != 0:    

            ds = []
            for ON_cluster in ON_clusters:       
                d = np.sqrt((ON_cluster['r']-r)*(ON_cluster['r']-r) + 
                            (ON_cluster['theta']-theta)*(ON_cluster['theta']-theta))
                ds.append(d)
            d_min = min(ds)
            m = np.argmin(ds)

            if d_min <= D_THRES:
                ON_clusters[m]['r'] = (1 - ALPHA)*ON_clusters[m]['r'] + ALPHA*r
                ON_clusters[m]['theta'] = (1 - ALPHA)*ON_clusters[m]['theta'] + ALPHA*theta

                if ON_clusters[m]['n'] >= K - 1:
                    ON_clusters[m]['n'] = K
                    ON_clusters[m]['t_exp'] = t + T_VISIBLE
                else:
                    ON_clusters[m]['n'] += 1
                    ON_clusters[m]['t_exp'] = t + T_HIDDEN
            else:
                if not pavement['detected']:
                    id += 1
                    ON_cluster = {
                        'id': id,
                        "t": t,
                        "r": r,
                        "theta": theta,
                        "t_exp": t + T_HIDDEN,
                        "n": 0
                    }
                    ON_clusters.append(ON_cluster)
        else:
            if not pavement['detected']:
                id += 1  
                ON_cluster = {
                    'id': id,
                    "t": t,
                    "r": r,
                    "theta": theta,
                    "t_exp": t + T_HIDDEN,
                    "n": 0
                }
            ON_clusters.append(ON_cluster)

        for ON_cluster in ON_clusters:
            ON_cluster['t'] = t
    
    # OFF EVENTS 
    else:
        
        t = spike[1]
        r = spike[0]//THETA_NUM - R_NUM
        theta = spike[0]%THETA_NUM

        # deals with 0 and THETA_NUM actually referring to lines with same angle (-pi/2 and pi/2)
        # mathematicaly this means theta E (-pi/2, pi/2]
        if theta <= HORIZ_THREAS:
            theta = THETA_NUM
            r = 2*abs(R_NUM/2 - r) + r
        
        if not pavement['detected']:
            for OFF_cluster in OFF_clusters:
                if t > OFF_cluster['t_exp']:
                    OFF_clusters.remove(OFF_cluster)

        if len(OFF_clusters) != 0:    

            ds = []
            for OFF_cluster in OFF_clusters:    
                d = np.sqrt((OFF_cluster['r']-r)*(OFF_cluster['r']-r) + 
                            (OFF_cluster['theta']-theta)*(OFF_cluster['theta']-theta))   
                ds.append(d)
            d_min = min(ds)
            m = np.argmin(ds)

            if d_min <= D_THRES:
                OFF_clusters[m]['r'] = (1 - ALPHA)*OFF_clusters[m]['r'] + ALPHA*r
                OFF_clusters[m]['theta'] = (1 - ALPHA)*OFF_clusters[m]['theta'] + ALPHA*theta

                if OFF_clusters[m]['n'] >= K - 1:
                    OFF_clusters[m]['n'] = K
                    OFF_clusters[m]['t_exp'] = t + T_VISIBLE
                else:
                    OFF_clusters[m]['n'] += 1
                    OFF_clusters[m]['t_exp'] = t + T_HIDDEN
            else:
                if not pavement['detected']:
                    id += 1
                    OFF_cluster = {
                        'id': id,
                        "t": t,
                        "r": r,
                        "theta": theta,
                        "t_exp": t + T_HIDDEN,
                        "n": 0
                    }
                    OFF_clusters.append(OFF_cluster)
        else:
            if not pavement['detected']:
                id += 1
                OFF_cluster = {
                    'id': id,
                    "t": t,
                    "r": r,
                    "theta": theta,
                    "t_exp": t + T_HIDDEN,
                    "n": 0
                }
                OFF_clusters.append(OFF_cluster)

        for OFF_cluster in OFF_clusters:
            OFF_cluster['t'] = t

    ###################### higher level detection (pavement) #########################
    if not pavement['detected']:
        # the logic below assumes that the first detected pair of ON-OFF 
        # clusters is the desired pavement
        for ON_cluster in ON_clusters:
            if ON_cluster['n'] == K:
                for OFF_cluster in OFF_clusters:
                    if OFF_cluster['n'] == K:
                        pavement['detected'] = True
                        pav_id_on = ON_cluster['id']
                        pav_id_off = OFF_cluster['id']
        if pavement['detected']:
            print('Pavement detected: ON id =', pav_id_on, ', OFF id =', pav_id_off) 
    
    if pavement['detected']:
        for ON_cluster in ON_clusters:
            if ON_cluster['id'] == pav_id_on:
                for OFF_cluster in OFF_clusters:
                    if OFF_cluster['id'] == pav_id_off:
                        pavement['r'] = (ON_cluster['r'] + OFF_cluster['r'])/2
                        pavement['theta'] = (ON_cluster['theta'] + OFF_cluster['theta'])/2    
    
    r, theta = quant_to_real_128(pavement['r'], pavement['theta'])
    pred_pav_list.append((r, theta, t))

    ################################# visualization #################################
    if (not os.path.exists(pred_dir)):
        os.mkdir(pred_dir)
            
    img = (RETINA_DIM - 1)*np.ones((RETINA_DIM, RETINA_DIM, 3), dtype = "uint8") 

    # ON clusters visualization
    for ON_cluster in ON_clusters:
        if ON_cluster['n'] == K:
            r, theta = quant_to_real_128(ON_cluster['r'], ON_cluster['theta'])
            img = add_line_to_img(r, theta, img, (0,255,0), 2)

    # OFF clusters visualization
    for OFF_cluster in OFF_clusters:
        if OFF_cluster['n'] == K:
            r, theta = quant_to_real_128(OFF_cluster['r'], OFF_cluster['theta'])
            img = add_line_to_img(r, theta, img, (0,0,255), 2)

    if pavement['detected']:
        # pavement visualization
        r, theta = quant_to_real_128(pavement['r'], pavement['theta'])
        img = add_line_to_img(r, theta, img, (0,0,0), 1)

    img = cv2.resize(img, (256, 256), interpolation = cv2.INTER_AREA)
    cv2.imwrite(pred_dir + str(int(t)).zfill(6) + '.png', img)

# store the timestamped pavement detections {r(pixels), theta(rad), t(msec)} in numpy format
np.save(pred_dir + '__pred_pav__.npy', pred_pav_list)

In [None]:
# FEATURES EXTRACTION

pred_pav_feature_list = []

for i, pred_pav in enumerate(pred_pav_list):

    r = pred_pav[0]
    theta = pred_pav[1]
    t = pred_pav[2]

    # avoid divisions with 0
    if np.cos(theta) == 0 or np.sin(theta) == 0:
        continue

    r_big = r + C
    r_small = r - C

    ps = []
    distances = []
    best_fits = []

    for r in [r_small, r_big]: 
        for y in [0, 127]:
            x = (r - y*np.sin(theta))/np.cos(theta)
            if (x >= 0) and (x <= 127):
                ps.append((round(x),round(y)))

        for x in [0, 127]:
            y = (r - x*np.cos(theta))/np.sin(theta)
            if (y >= 0) and (y <= 127):
                ps.append((round(x),round(y)))

    # avoid wrong features, if detected line is to close to the frame limits
    if len(ps) < 4:
        continue

    ref_ps = [(round(128/2 + C), 127), (round(128/2 - C), 127), (round(128/2 - C), 0), (round(128/2 + C), 0)]

    for i in range(4):
        for j in range(4):
            if j != i:
                for k in range(4):
                    if k != i and k != j:
                        for l in range(4):
                            if l != i and l != j and l != k:
                                a = ps[i]
                                b = ps[j]
                                c = ps[k]
                                d = ps[l]

                                dist = (a[0] - ref_ps[0][0]) * (a[0] - ref_ps[0][0]) + (a[1] - ref_ps[0][1]) * (a[1] - ref_ps[0][1]) + \
                                       (b[0] - ref_ps[1][0]) * (b[0] - ref_ps[1][0]) + (b[1] - ref_ps[1][1]) * (b[1] - ref_ps[1][1]) + \
                                       (c[0] - ref_ps[2][0]) * (c[0] - ref_ps[2][0]) + (c[1] - ref_ps[2][1]) * (c[1] - ref_ps[2][1]) + \
                                       (d[0] - ref_ps[3][0]) * (d[0] - ref_ps[3][0]) + (d[1] - ref_ps[3][1]) * (d[1] - ref_ps[3][1])
                                distances.append([dist, a, b, c, d])

    min = 9999999
    for distance in distances:
        if distance[0] < min:
            min = distance[0]

    for distance in distances:
        if distance[0] == min:
            best_fits.append(distance)

    A_max = -1
    for best_fit in best_fits:
        p1 = best_fit[1]
        p2 = best_fit[2]
        p3 = best_fit[3]
        p4 = best_fit[4]

        # A = 1/2 * (x1*y2 - x2*y1 + x2*y3 - x3*y2 + ... + xn*y1 - x1*yn)
        A = (p1[0] * p2[1] - p2[0] * p1[1] + \
            p2[0] * p3[1] - p3[0] * p2[1] + \
            p3[0] * p4[1] - p4[0] * p3[1] + \
            p4[0] * p1[1] - p1[0] * p4[1]) # * (1/2)
        
        if A > A_max:
            A_max = A
            feature = [p1[0], p1[1], p2[0], p2[1], p3[0], p3[1], p4[0], p4[1], t]
        
    pred_pav_feature_list.append(feature)

np.save(pred_dir + '__pred_pav_features__.npy', pred_pav_feature_list)

In [None]:
# METRICS AND QUANTITATIVE VALIDATION

annot_pav_list = np.load(annot_dir.replace('vis_annot/', 'annot_pav.npy'))
pred_pav_list = np.load(pred_dir + '__pred_pav__.npy')

###################### R ######################
fig = plt.figure()
x_annot = []
y_annot = []

for i in range(len(annot_pav_list)):
    y_annot.append(annot_pav_list[i][0])
    x_annot.append(annot_pav_list[i][2])  

x_pred = []
y_pred = []
for i in range(len(pred_pav_list)):
    y_pred.append(pred_pav_list[i][0])
    x_pred.append(pred_pav_list[i][2])

plt.figure(facecolor='white')
plt.plot(x_annot, y_annot, label = "annotated r")
plt.plot(x_pred, y_pred, label = "predicted r")
plt.xlabel('t (ms)')
plt.ylabel('r (pixels in 128x128 grid)')
plt.legend()
plt.savefig(pred_dir + '__r__.png')
plt.close(fig)

#################### THETA ####################
fig = plt.figure()
x_annot = []
y_annot = []

for i in range(len(annot_pav_list)):
    y_annot.append(annot_pav_list[i][1])
    x_annot.append(annot_pav_list[i][2])    

x_pred = []
y_pred = []
for i in range(len(pred_pav_list)):
    y_pred.append(pred_pav_list[i][1])
    x_pred.append(pred_pav_list[i][2])

plt.figure(facecolor='white')
plt.plot(x_annot, y_annot, label = "annotated θ")
plt.plot(x_pred, y_pred, label = "predicted θ")
plt.xlabel('t (ms)')
plt.ylabel('θ (rad)')
plt.legend()
plt.savefig(pred_dir + '__theta__.png')
plt.close(fig)