In [None]:
import os
from pathlib import Path
from functools import reduce

import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.optimize import minimize
import plotly.express as px

from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error

In [None]:
# Root directory of the dataset; the CSV and image paths in the cells below are joined onto it.
PATH = Path("./dataset/")

In [None]:
# Training annotations; later cells read its `ImageId` and `PredictionString` columns.
train = pd.read_csv(PATH / "train.csv")
# The sample submission doubles as the list of test rows.
test = pd.read_csv(PATH / "sample_submission.csv")

# Pinhole intrinsics laid out as [[fx, 0, cx], [0, fy, cy], [0, 0, 1]].
# fy matches the default used by `convert_3d_to_2d`, so both projection
# paths in this notebook use the same focal length.
camera_matrix = np.array([[2304.5479, 0, 1686.2379],
                          [0, 2305.8757, 1354.9849],
                          [0, 0, 1]])
camera_matrix_inv = np.linalg.inv(camera_matrix)

In [None]:
# Peek at the first rows of the training annotations.
train.head()

In [None]:
def imread(path, fast_mode=False):
    """Read an image from `path` with OpenCV.

    Unless `fast_mode` is set, a successfully loaded 3-D image gets its last
    (channel) axis reversed and copied into a fresh array (OpenCV loads BGR,
    so the result is RGB). Whatever `cv2.imread` returned is passed through
    unchanged otherwise, including `None` for an unreadable file.
    """
    img = cv2.imread(str(path))
    if fast_mode or img is None or img.ndim != 3:
        return img
    return np.array(img[:, :, ::-1])

In [None]:
# Identifier of the first training image (used to build its file name below).
train.iloc[0]["ImageId"]

In [None]:
# Load the first training image. Its shape is stored in IMG_SHAPE, which
# `optimize_xy` later uses to convert pixel coordinates to grid cells.
img = imread(PATH / 'train_images/{}.jpg'.format(train.iloc[0]["ImageId"]))
IMG_SHAPE = img.shape
plt.figure(figsize=(15, 8))
plt.imshow(img)

In [None]:
def str2coords(s, names=['id', 'yaw', 'pitch', 'roll', 'x', 'y', 'z']):
    """Parse a space-separated prediction string into one dict per object.

    Args:
        s: String of numbers separated by single spaces, seven per object.
        names: Keys assigned, in order, to the seven values of each object.

    Returns:
        A list of dicts mapping each name to its value. Values are floats,
        except `id`, which is converted to a Python int when present.

    Raises:
        ValueError: If a token is not numeric or the number of values is
            not a multiple of seven.
    """
    # The builtin `float` replaces the `np.float` alias, which NumPy 1.24
    # removed; both denote the same float64 dtype.
    values = np.array(s.split(" ")).astype(float).reshape([-1, 7])
    coords = []
    for row in values:
        record = dict(zip(names, row))
        if 'id' in record:
            record['id'] = int(record['id'])
        coords.append(record)
    return coords

In [None]:
# Sanity check: parse the prediction string of the first training row.
str2coords(train.iloc[0]["PredictionString"])

In [None]:
# Number of annotated cars in each training image.
lens = [len(str2coords(s)) for s in train['PredictionString']]
plt.figure(figsize=(15, 6))
# Pass the counts by keyword: newer seaborn releases treat the first
# positional argument as `data`, not as the `x` variable.
sns.countplot(x=lens)
plt.xlabel("Number of cars in image")

In [None]:
# One row per annotated car across the whole training set.
# Every prediction string is parsed exactly once; the pose columns are then
# selected from the parsed records.
point_cols = ['x', 'y', 'z', 'yaw', 'pitch', 'roll']
car_records = [car for ps in train['PredictionString'] for car in str2coords(ps)]
points_df = pd.DataFrame(car_records, columns=point_cols)

print('len(points_df) ', len(points_df))
points_df.head()

In [None]:
# Distribution of the x coordinate over all annotated cars.
fig, ax = plt.subplots(figsize=(15, 6))
sns.distplot(points_df['x'], bins=500, ax=ax)
ax.set_xlabel('x')
plt.show()

In [None]:
# Distribution of the y coordinate over all annotated cars.
fig, ax = plt.subplots(figsize=(15, 6))
sns.distplot(points_df['y'], bins=500, ax=ax)
ax.set_xlabel('y')
plt.show()

In [None]:
# Distribution of the z coordinate over all annotated cars.
fig, ax = plt.subplots(figsize=(15, 6))
sns.distplot(points_df['z'], bins=500, ax=ax)
ax.set_xlabel('z')
plt.show()

In [None]:
# Distribution of yaw, converted from radians to degrees.
fig, ax = plt.subplots(figsize=(15, 6))
sns.distplot(points_df['yaw'] * 180 / np.pi, bins=500, ax=ax)
ax.set_xlabel('yaw (degree)')
plt.show()

In [None]:
# Distribution of pitch, converted from radians to degrees.
fig, ax = plt.subplots(figsize=(15, 6))
sns.distplot(points_df['pitch'] * 180 / np.pi, bins=500, ax=ax)
ax.set_xlabel("pitch (degree)")
plt.show()

In [None]:
# Distribution of roll, converted from radians to degrees.
fig, ax = plt.subplots(figsize=(15, 6))
sns.distplot(points_df['roll'] * 180 / np.pi, bins=500, ax=ax)
ax.set_xlabel("roll (degree)")
plt.show()

In [None]:
def get_img_coords(s):
    """Project the 3-D positions in a prediction string onto the image plane.

    Args:
        s: Prediction string understood by `str2coords`.

    Returns:
        A pair of arrays `(u, v)`: the first and second rows of
        `camera_matrix @ [x, y, z]`, each divided by the third row.
    """
    cars = str2coords(s)
    positions = np.array([[car['x'], car['y'], car['z']] for car in cars])
    projected = np.dot(camera_matrix, positions.T).T
    depth = projected[:, 2]
    return projected[:, 0] / depth, projected[:, 1] / depth

# Overlay the projected car positions (red dots) on training image 2217.
plt.figure(figsize=(14, 14))
plt.imshow(imread(PATH / "train_images/{}.jpg".format(train['ImageId'][2217])))
plt.scatter(*get_img_coords(train['PredictionString'][2217]), color='red', s=100)

In [None]:
# Project every annotated car of the training set onto one faded reference
# image to show where car positions fall within the frame.
plt.figure(figsize=(14, 14))

background = imread(PATH / "train_images/{}.jpg".format(train['ImageId'][2217]))
plt.imshow(background, alpha=0.3)

xs, ys = [], []
for s in train['PredictionString']:
    x, y = get_img_coords(s)
    xs.extend(x)
    ys.extend(y)

plt.scatter(xs, ys, color='red', s=2)
plt.show()

In [None]:
# Scatter of x against sqrt(y^2 + z^2) for every annotated car.
fig, ax = plt.subplots(figsize=(14, 14))

yz_distance = np.sqrt(points_df['y'] ** 2 + points_df['z'] ** 2)
ax.scatter(points_df['x'], yz_distance, color='red', s=2, alpha=0.2)
ax.set_xlim([-50, 50])
ax.set_ylim([0, 100])
plt.show()

In [None]:
# Interactive 3-D scatter of all car positions, coloured by the `pitch`
# column; the axis ranges clip the view to x in [-50, 50], y in [0, 50],
# z in [0, 250].
fig = px.scatter_3d(points_df, x='x', y='y', z='z', color='pitch', 
                    range_x=(-50, 50), range_y=(0, 50), range_z=(0, 250),
                    opacity=0.5)
fig.show()

In [None]:
# Model y as a linear function of z alone.
zy_slope = LinearRegression()
X = points_df[['z']]
y = points_df['y']
zy_slope.fit(X, y)

print('MAE without x: ', mean_absolute_error(y, zy_slope.predict(X)))

# Same model with x added as a second feature; `xzy_slope` is reused by
# `optimize_xy` as a prior on where cars can be.
xzy_slope = LinearRegression()
X = points_df[['x', 'z']]
y = points_df['y']
xzy_slope.fit(X, y)

print('MAE with x: ', mean_absolute_error(y, xzy_slope.predict(X)))

# coef_ follows the feature order ['x', 'z'], so the second slope is dy/dz.
print('\ndy/dx = {:.3f}\ndy/dz = {:.3f}'.format(*xzy_slope.coef_))

In [None]:
# Inspect the fitted estimator's attributes (e.g. coef_ and intercept_, used in the next plot).
zy_slope.__dict__

In [None]:
# (z, y) of every annotated car with the z-only regression line on top.
fig, ax = plt.subplots(figsize=(14, 14))
z_reg = list(range(0, 500))
y_reg = [zy_slope.intercept_ + zy_slope.coef_[0] * z for z in z_reg]
ax.scatter(points_df['z'], points_df['y'], label='Real points')
ax.plot(z_reg, y_reg, color='orange', label='Regression')
ax.set_xlim([0, 500])
ax.set_ylim([0, 100])
ax.legend()

In [None]:
from math import sin, cos

def eular2rot(yaw, pitch, roll):
    """Compose a 3x3 rotation matrix from three angles in radians.

    The result is `Ry(yaw) . Rx(pitch) . Rz(roll)`: a rotation about the
    second axis by `yaw`, about the first axis by `pitch` and about the
    third axis by `roll`, multiplied in that order.
    """
    cos_yaw, sin_yaw = cos(yaw), sin(yaw)
    cos_pitch, sin_pitch = cos(pitch), sin(pitch)
    cos_roll, sin_roll = cos(roll), sin(roll)

    about_second = np.array([[cos_yaw, 0, sin_yaw],
                             [0, 1, 0],
                             [-sin_yaw, 0, cos_yaw]])
    about_first = np.array([[1, 0, 0],
                            [0, cos_pitch, -sin_pitch],
                            [0, sin_pitch, cos_pitch]])
    about_third = np.array([[cos_roll, -sin_roll, 0],
                            [sin_roll, cos_roll, 0],
                            [0, 0, 1]])
    return about_second.dot(about_first.dot(about_third))

In [None]:
def draw_line(image, points):
    """Draw a closed quadrilateral through `points[0..3]` on `image`.

    Only the first two components of each point are used as pixel
    coordinates. The image is drawn on in place and also returned.
    """
    color = (255, 0, 0)
    thickness = 16
    # Four edges of the quadrilateral 0-1-2-3, starting with the closing edge.
    for start, end in ((0, 3), (0, 1), (1, 2), (2, 3)):
        cv2.line(image, tuple(points[start][:2]), tuple(points[end][:2]),
                 color, thickness)
    return image

def draw_points(image, points):
    """Draw a filled green circle at each point, sized inversely to depth.

    Args:
        image: Image to draw on; modified in place.
        points: Iterable of `(p_x, p_y, p_z)` triples, where `(p_x, p_y)` is
            the pixel position and `p_z` the depth used for the radius
            `int(1000 / p_z)`.

    Returns:
        The same `image` object.

    Points with `p_z <= 0` are skipped: a zero depth (possible after integer
    truncation of a small z) would divide by zero, and a negative depth would
    give a negative radius.
    """
    for (p_x, p_y, p_z) in points:
        if p_z <= 0:
            continue
        cv2.circle(image, (p_x, p_y), int(1000 / p_z), (0, 255, 0), -1)
    return image

In [None]:
def visualize(img, coords):
    """Return a copy of `img` with one projected rectangle and dot per car.

    For every dict in `coords` (keys x, y, z, yaw, pitch, roll) a rectangle
    with half-extents 1.02 x 2.31 at y = -0.80 in the car's own frame, plus
    the car origin, is transformed by the car pose, projected with
    `camera_matrix`, and drawn with `draw_line` / `draw_points`.
    """
    half_x, offset_y, half_z = 1.02, 0.80, 2.31
    # Homogeneous model points: four rectangle corners followed by the origin.
    model_points = np.array([[half_x, -offset_y, -half_z, 1],
                             [half_x, -offset_y, half_z, 1],
                             [-half_x, -offset_y, half_z, 1],
                             [-half_x, -offset_y, -half_z, 1],
                             [0, 0, 0, 1]]).T

    canvas = img.copy()
    for car in coords:
        # The dict's 'pitch' feeds the yaw argument and 'yaw' the pitch
        # argument, with all three angles negated.
        rotation = eular2rot(-car['pitch'], -car['yaw'], -car['roll']).T
        pose = np.eye(4)
        pose[:3, 3] = np.array([car['x'], car['y'], car['z']])
        pose[:3, :3] = rotation

        projected = np.dot(camera_matrix, np.dot(pose[:3, :], model_points)).T
        projected[:, 0] /= projected[:, 2]
        projected[:, 1] /= projected[:, 2]
        pixels = projected.astype(int)

        canvas = draw_line(canvas, pixels)
        # The last row is the projected car origin.
        canvas = draw_points(canvas, pixels[-1:])

    return canvas

In [None]:
# Show the first few training images next to the same image with the
# annotated cars drawn in.
n_rows = 6

for idx in range(n_rows):
    img = imread(PATH / "train_images/{}.jpg".format(train["ImageId"][idx]))
    img_vis = visualize(img, str2coords(train['PredictionString'][idx]))

    fig, axes = plt.subplots(1, 2, figsize=(20, 20))
    for ax, panel in zip(axes, (img, img_vis)):
        ax.imshow(panel)
    plt.show()

In [None]:
# Width, in pixels, that `preprocess_image` resizes to.
IMG_WIDTH = 1024
# Height derived from the width at a 16:5 ratio (320 for a width of 1024).
IMG_HEIGHT = IMG_WIDTH // 16 * 5
# Down-sampling factor between the resized image and the mask/regression grids.
MODEL_SCALE = 8

def rotate(x, angle):
    """Add `angle` to `x` and wrap the sum into the interval [-pi, pi)."""
    shifted = x + angle
    whole_turns = (shifted + np.pi) // (2 * np.pi)
    return shifted - whole_turns * 2 * np.pi

def _regr_preprocess(regr_dict, flip=False):
    """Turn one parsed car dict into regression targets, in place.

    When `flip` is set, `x`, `pitch` and `roll` are negated first. Then
    x/y/z are divided by 100, `roll` is shifted by pi via `rotate`, `pitch`
    is replaced by `pitch_sin` / `pitch_cos`, and `id` is removed.

    Returns:
        The same (mutated) dict.
    """
    negated_keys = ('x', 'pitch', 'roll') if flip else ()
    for key in negated_keys:
        regr_dict[key] = -regr_dict[key]

    for axis in ('x', 'y', 'z'):
        regr_dict[axis] /= 100

    regr_dict['roll'] = rotate(regr_dict['roll'], np.pi)
    pitch = regr_dict.pop('pitch')
    regr_dict['pitch_sin'], regr_dict['pitch_cos'] = sin(pitch), cos(pitch)
    del regr_dict['id']
    return regr_dict

def _regr_back(regr_dict):
    """Undo the scaling and angle encoding applied by `_regr_preprocess`.

    x/y/z are multiplied by 100, `roll` is shifted back by -pi via `rotate`,
    and `pitch` is recovered from `pitch_sin` / `pitch_cos`. The dict is
    modified in place (the sin/cos entries are left in) and returned.
    """
    for name in ['x', 'y', 'z']:
        regr_dict[name] = regr_dict[name] * 100
    regr_dict['roll'] = rotate(regr_dict['roll'], -np.pi)
    # arctan2 needs no normalisation of the (sin, cos) pair and covers the
    # cases the arccos(cos) * sign(sin) form gets wrong: sin == 0 with a
    # negative cos yields pi rather than 0, and sin == cos == 0 yields 0
    # rather than NaN.
    regr_dict['pitch'] = np.arctan2(regr_dict['pitch_sin'], regr_dict['pitch_cos'])
    return regr_dict

def preprocess_image(img, flip=False):
    """Crop, pad, resize and scale an image for the network.

    Keeps the bottom half of `img`, pads it left and right with strips one
    sixth of its width filled with each row's mean colour, resizes to
    (IMG_WIDTH, IMG_HEIGHT), optionally mirrors it horizontally, and returns
    the result divided by 255 as float32.
    """
    lower_half = img[img.shape[0] // 2:]
    row_mean = lower_half.mean(1, keepdims=True).astype(lower_half.dtype)
    side_pad = (np.ones_like(lower_half) * row_mean)[:, :lower_half.shape[1] // 6]
    padded = np.concatenate([side_pad, lower_half, side_pad], 1)
    resized = cv2.resize(padded, (IMG_WIDTH, IMG_HEIGHT))
    if flip:
        resized = resized[:, ::-1]
    return (resized / 255).astype('float32')

def get_mask_and_regr(img, labels, flip=False):
    """Build the detection mask and regression target grids for one image.

    Args:
        img: The original (un-preprocessed) image; only its shape is used.
        labels: Prediction string for the image.
        flip: Whether targets are built for a horizontally mirrored image.

    Returns:
        `(mask, regr)`: a float32 grid of shape
        (IMG_HEIGHT // MODEL_SCALE, IMG_WIDTH // MODEL_SCALE) set to 1 at each
        car's cell, and a matching grid with 7 channels holding the output of
        `_regr_preprocess`, ordered by sorted key name.
    """
    grid_rows = IMG_HEIGHT // MODEL_SCALE
    grid_cols = IMG_WIDTH // MODEL_SCALE
    mask = np.zeros([grid_rows, grid_cols], dtype='float32')
    regr = np.zeros([grid_rows, grid_cols, 7], dtype='float32')

    cars = str2coords(labels)
    u_pixels, v_pixels = get_img_coords(labels)

    for u, v, regr_dict in zip(u_pixels, v_pixels, cars):
        # Vertical pixel position -> grid row (accounts for the top-half crop).
        row = (v - img.shape[0] // 2) * IMG_HEIGHT / (img.shape[0] // 2) / MODEL_SCALE
        row = np.round(row).astype('int')
        # Horizontal pixel position -> grid column (accounts for the side padding).
        col = (u + img.shape[1] // 6) * IMG_WIDTH / (img.shape[1] * 4 / 3) / MODEL_SCALE
        col = np.round(col).astype('int')
        if 0 <= row < grid_rows and 0 <= col < grid_cols:
            mask[row, col] = 1
            regr_dict = _regr_preprocess(regr_dict, flip)
            regr[row, col] = [regr_dict[key] for key in sorted(regr_dict)]

    if flip:
        mask = np.array(mask[:, ::-1])
        regr = np.array(regr[:, ::-1])
    return mask, regr

In [None]:
# Run the preprocessing on the first training image and inspect the outputs.
img0 = imread(PATH / Path("train_images") / Path(train['ImageId'][0] + '.jpg'))
img = preprocess_image(img0)
mask, regr = get_mask_and_regr(img0, train['PredictionString'][0])

for label, array in (("img.shape", img), ("mask.shape", mask), ("regr.shape", regr)):
    print(label, array.shape, "std:", np.std(array))

# Regression channels are ordered by sorted key name, which puts yaw at -2.
for title, panel in (("processed image", img),
                     ("Detection Mask", mask),
                     ("Yaw values", regr[:, :, -2])):
    plt.figure(figsize=(16, 16))
    plt.title(title)
    plt.imshow(panel)
    plt.show()

In [None]:
# Two predictions closer than this 3-D Euclidean distance are treated as
# duplicates by `clear_duplicates`.
DISTANCE_THRESH_CLEAR = 2

def convert_3d_to_2d(x, y, z, 
                     fx=2304.5479, fy=2305.8757,
                     cx=1686.2379, cy=1354.9849):
    return x * fx / z + cx, y * fy / z + cy

def optimize_xy(r, c, x0, y0, z0, flipped=False):
    """Refine a 3-D position so it projects onto grid cell (r, c).

    Starting from `(x0, y0, z0)`, Powell's method minimises the sum of two
    clipped terms: the squared distance between the point's projected grid
    position and `(r, c)` (floored at 0.2), and the squared gap between `y`
    and the `xzy_slope` regression prediction (floored at 0.4). When
    `flipped` is set, the regression is evaluated at `-x`.

    Returns:
        The optimised `(x, y, z)` tuple.
    """
    def distance_fn(xyz):
        x, y, z = xyz
        regr_x = -x if flipped else x
        slope_err = (xzy_slope.predict([[regr_x, z]])[0] - y) ** 2

        u, v = convert_3d_to_2d(x, y, z)
        # Same pixel -> grid mapping as used when building the training mask.
        row = (v - IMG_SHAPE[0] // 2) * IMG_HEIGHT \
            / (IMG_SHAPE[0] // 2) / MODEL_SCALE
        col = (u + IMG_SHAPE[1] // 6) * IMG_WIDTH \
            / (IMG_SHAPE[1] * 4 / 3) / MODEL_SCALE
        cell_err = (row - r) ** 2 + (col - c) ** 2
        return max(0.2, cell_err) + max(0.4, slope_err)

    result = minimize(distance_fn, [x0, y0, z0], method='Powell')
    x_new, y_new, z_new = result.x
    return x_new, y_new, z_new

def clear_duplicates(coords):
    """Drop predictions that sit next to a more confident one.

    For every pair of predictions closer than DISTANCE_THRESH_CLEAR in
    (x, y, z), the one with the lower `confidence` has its confidence set to
    -1 (in place). Predictions with confidence > 0 are returned.
    """
    def position(car):
        return np.array([car['x'], car['y'], car['z']])

    for candidate in coords:
        candidate_xyz = position(candidate)
        for rival in coords:
            gap = np.sqrt(((candidate_xyz - position(rival)) ** 2).sum())
            if gap < DISTANCE_THRESH_CLEAR and candidate['confidence'] < rival['confidence']:
                candidate['confidence'] = -1
    return [car for car in coords if car['confidence'] > 0]

def extract_coords(prediction, flipped=False):
    """Decode a network output into a list of car dicts.

    Args:
        prediction: Array whose first channel holds detection logits and
            whose remaining channels hold the regression outputs, ordered by
            the sorted names x, y, z, yaw, pitch_sin, pitch_cos, roll.
        flipped: Passed on to `optimize_xy`.

    Returns:
        One dict per cell with a positive logit, after `_regr_back`, with a
        sigmoid `confidence`, refined x/y/z, and duplicates removed.
    """
    logits, regr_output = prediction[0], prediction[1:]
    col_names = sorted(['x', 'y', 'z', 'yaw',
                        'pitch_sin', 'pitch_cos', 'roll'])
    coords = []
    for r, c in np.argwhere(logits > 0):
        car = _regr_back(dict(zip(col_names, regr_output[:, r, c])))
        car['confidence'] = 1 / (1 + np.exp(-logits[r, c]))
        car['x'], car['y'], car['z'] = optimize_xy(
            r, c, car['x'], car['y'], car['z'], flipped)
        coords.append(car)
    return clear_duplicates(coords)

def coords2str(coords, names=['yaw', 'pitch', 'roll', 'x', 'y', 'z', 'confidence']):
    """Serialise car dicts into one space-separated string.

    For each dict in `coords`, the values of `names` are emitted in order;
    a missing key is written as 0.
    """
    return ' '.join(str(car.get(field, 0)) for car in coords for field in names)

In [None]:
# Round-trip check: encode the labels into mask/regression grids, decode them
# with `extract_coords`, and draw the result, both unflipped and flipped.
for idx in range(2):
    fig, axes = plt.subplots(1, 2, figsize=(20, 20))

    for ax_i, flip in enumerate((False, True)):
        img0 = imread(PATH / Path("train_images/{}.jpg".format(train['ImageId'].iloc[idx])))
        if flip:
            img0 = img0[:, ::-1]
        img = preprocess_image(img0, flip)
        mask, regr = get_mask_and_regr(img0, train['PredictionString'][idx], flip)
        # Channels first, with the mask standing in for the logits channel.
        regr = np.rollaxis(regr, 2, 0)
        coords = extract_coords(np.concatenate([mask[None], regr], 0), flip)

        axes[ax_i].set_title('Flip = {}'.format(flip))
        axes[ax_i].imshow(visualize(img0, coords))
    plt.show()

In [None]:
import tensorflow as tf

# Hold out 1% of the training rows as a dev set, with a fixed random_state.
df_train, df_dev = train_test_split(train, test_size=0.01, random_state=42)

# Datasets sliced row by row from the raw DataFrame values.
train_dataset = tf.data.Dataset.from_tensor_slices(df_train.values)
dev_dataset = tf.data.Dataset.from_tensor_slices(df_dev.values)
test_dataset = tf.data.Dataset.from_tensor_slices(test.values)

In [None]:
# Peek at one dataset element and load the matching image.
for d in train_dataset.take(1):
    raw_image_id = d[0].numpy()
    print(raw_image_id)
    # The tensor yields bytes; decode once so the printed path is the same
    # path that is actually loaded.
    image_path = PATH / "train_images/{}.jpg".format(raw_image_id.decode())
    print(image_path)
    img0 = imread(image_path)


In [None]:
# Image file names for each split, built from the shared PATH root.
train_dir = (PATH / "train_images").as_posix() + "/"
# NOTE(review): assumes the dataset ships test images under `test_images/`;
# the ids in sample_submission.csv are not expected in `train_images/`. Confirm.
test_dir = (PATH / "test_images").as_posix() + "/"

train_fnames = train_dir + df_train["ImageId"].values + ".jpg"
dev_fnames = train_dir + df_dev["ImageId"].values + ".jpg"
test_fnames = test_dir + test["ImageId"].values + ".jpg"

train_fnames_ds = tf.data.Dataset.from_tensor_slices(train_fnames)
dev_fnames_ds = tf.data.Dataset.from_tensor_slices(dev_fnames)
test_fnames_ds = tf.data.Dataset.from_tensor_slices(test_fnames)

In [None]:
def load_image(fname):
    """Read the file at `fname` and decode its contents as a JPEG image."""
    encoded = tf.io.read_file(fname)
    return tf.image.decode_jpeg(encoded)

In [None]:
# Map each file-name dataset through `load_image` to get image datasets.
train_image_ds = train_fnames_ds.map(load_image)
dev_image_ds = dev_fnames_ds.map(load_image)
test_image_ds = test_fnames_ds.map(load_image)

In [None]:
# Label datasets built from the `PredictionString` column of each split.
train_labels_ds = tf.data.Dataset.from_tensor_slices(df_train["PredictionString"])
dev_labels_ds = tf.data.Dataset.from_tensor_slices(df_dev["PredictionString"])
test_labels_ds = tf.data.Dataset.from_tensor_slices(test["PredictionString"])

# Pair every image with its label string as (image, labels) elements.
train_ds = tf.data.Dataset.zip((train_image_ds, train_labels_ds))
dev_ds = tf.data.Dataset.zip((dev_image_ds, dev_labels_ds))
test_ds = tf.data.Dataset.zip((test_image_ds, test_labels_ds))

In [None]:
def load_and_preprocess(data, training=True):
    """Load one training image and build its network input and targets.

    Args:
        data: Pair of eager string tensors `(image_id, prediction_string)`.
        training: When true, the sample is flipped with probability 1/10.

    Returns:
        The decoded, un-preprocessed image tensor.
    """
    # `.numpy()` on a string tensor yields bytes; both the file name and
    # `str2coords` (which splits on a str separator) need text.
    image_id = data[0].numpy().decode()
    labels = data[1].numpy().decode()
    flip = False
    if training:
        flip = np.random.randint(10) == 1

    image_path = str(PATH / "train_images/{}.jpg".format(image_id))
    # decode_jpeg expects the encoded file contents, not a path, so the file
    # is read first.
    img0 = tf.image.decode_jpeg(tf.io.read_file(image_path), channels=3)
    # The preprocessing helpers use NumPy/OpenCV calls, so they get an ndarray.
    img0_array = img0.numpy()

    img = preprocess_image(img0_array, flip=flip)
    img = np.rollaxis(img, 2, 0)

    mask, regr = get_mask_and_regr(img0_array, labels, flip=flip)
    regr = np.rollaxis(regr, 2, 0)
    # NOTE(review): img, mask and regr are computed but only the raw image is
    # returned; presumably (img, mask, regr) is the intended result. Confirm
    # before changing the return value.
    return img0
