In [1]:
import os
os.chdir('/Users/nick/Documents/school/research/EfficientLPR')
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn import svm
from rapidfuzz import fuzz
import pickle
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.preprocessing import MinMaxScaler
from tqdm import trange

In [2]:
def get_l2_norm(y_trues, y_preds):
    distances = []
    for i in range(len(y_preds)): # calculate euclidian distance between preds and answer
        y_true, y_pred = y_trues[i], y_preds[i]
        distances.append(np.linalg.norm(y_true - y_pred))
    return np.expand_dims(distances,1)

def get_lev_distance(y_true, y_preds):
    lev_distances = []
    for row in np.hstack([np.expand_dims(y_preds, 1), np.expand_dims(y_true, 1)]):
        lev_distances.append(fuzz.ratio(row[0], row[1]))
    return np.expand_dims(lev_distances,1)

In [258]:
"""
Script that transforms the predictions into three features:
    lp_distance: a similarity metric representing the levenshtein distance between true and predicted LP
    body_distance: distance between true and predicted body
    color_distance: distance between true and predicted color

"""
test = True

predictions_path = f'predictions_nzvd_{"test" if test else "train"}.full.csv'
trues_path = f'data/processed/nzvd/{"test" if test else "train"}_annotations.csv'
lps_path = f'data/raw/nzvd/{"test" if test else "train"}_labels.csv'
classes_path = 'data/processed/classes.csv'
colors_path = 'data/processed/colors.csv'

# get class data
classes = [x[0] for x in pd.read_csv(classes_path, header=None).values]
colors = [x[0] for x in pd.read_csv(colors_path, header=None).values]
class_labels = {x:i for i,x in enumerate(classes)}
color_labels = {x:i for i,x in enumerate(colors)}

# get y_pred 
preds = pd.read_csv(predictions_path)
preds = preds.fillna('') # fill NaN values with empty string

# get y_true
lps = pd.read_csv(lps_path)[['lp-string']].T.squeeze()
lps = lps.apply(lambda x: str(x).replace(' ', ''))
trues = pd.read_csv(trues_path, header=None)
if 'train' in trues_path:
    trues = pd.concat([trues, pd.read_csv(trues_path.replace('train', 'val'), header=None)])
trues.columns = ['file', 't', 'l', 'h', 'w', 'body', 'color']
trues = trues.sort_values(by=['file'])
trues.reset_index(inplace=True)
trues = trues.assign(lp=lps)

if False:
    print(dict(zip(*np.unique(trues[['color']].values, return_counts=True))))
    print(dict(zip(*np.unique(trues[['body']].values, return_counts=True))))

# LICENSE PLATES    
lp_true, lp_pred = trues[['lp']].values.squeeze(), preds[['lp']].values.squeeze()
lp_acc = np.mean([lp_true == lp_pred])
print("LP Exact Accuracy:", f'{lp_acc}%', )

def featurize(trues, preds):
    """Converts [preds, true] into [levenshtein distance, CCE_body, CCE_color]"""

    # levenshtein distance of license plates
    lev_distances = get_lev_distance(trues[['lp']].values.squeeze(), preds[['lp']].values.squeeze())

    # BODY
    body_true = list(map(lambda x: class_labels[x], trues[['body']].values.squeeze().tolist()))
    body_true = tf.one_hot(body_true, depth=len(class_labels))
    body_headers = [header for header in preds.columns if header.startswith('body')]
    body_pred = preds[body_headers].values
    body_cce = np.expand_dims(tf.losses.categorical_crossentropy(body_true, body_pred).numpy(), 1)

    # COLOR
    color_true = list(map(lambda x: color_labels[x], trues[['color']].values.squeeze().tolist()))
    color_true = tf.one_hot(color_true, depth=len(color_labels))
    color_headers = [header for header in preds.columns if header.startswith('color')]
    color_pred = preds[color_headers].values
    color_cce = np.expand_dims(tf.losses.categorical_crossentropy(color_true, color_pred).numpy(), 1)

    # return [100-lev_distances, color_cce, body_cce] #full
    # return [ color_cce, body_cce] #ablate lp
    # return [100-lev_distances, color_cce] #ablate body
    return [100-lev_distances, body_cce] #ablate color
    # return [100-lev_distances] # ablate both
    
y_positive = np.expand_dims(np.repeat([1], len(preds)), 1) # positive samples have class==1
x_positive = np.hstack(featurize(trues, preds))
x_negative = np.empty((0,x_positive.shape[1]))
y_negative = np.empty((0,1))

# create negatives
for i, sample in enumerate(trues.iloc):
    """ Create negative samples. For each true sample, pair with every non-matching sample """
    headers = sample.index.values
    neg_true = [sample.values for _ in range(len(trues)-1)]
    neg_true = pd.DataFrame(neg_true, columns=headers)
    # add all preds except current sample
    neg_pred = pd.concat([preds.iloc[:i], preds.iloc[i+1:]])
    neg_x = featurize(neg_true, neg_pred)
    neg_x = np.hstack(neg_x)
    neg_y = np.zeros((len(neg_x), 1))

    x_negative = np.concatenate([x_negative, neg_x])
    y_negative = np.concatenate([y_negative, neg_y])

x_negative = np.array(x_negative)
y_negative = np.array(y_negative)

np.random.seed(1)

# make some "close" samples where LD is the same, but body/color is wrong
close_idxs = np.random.randint(0, len(x_positive), len(x_positive))
x_close = np.hstack([x_positive[:,0].reshape(-1,1), x_negative[close_idxs,1:]])

# balance data
neg_idxs = np.random.randint(0, len(x_positive), len(x_positive))
x_negative = x_negative[neg_idxs]
y_negative = y_negative[neg_idxs]

x1 = np.vstack([x_positive, x_negative])
x2 = np.vstack([x_positive, x_close])
y = np.vstack([y_positive, y_negative])

scaler1 = MinMaxScaler().fit(x1)
scaler2 = MinMaxScaler().fit(x2)

x1 = scaler1.transform(x1)
x2 = scaler2.transform(x2)

LP Exact Accuracy: 0.74%


In [262]:
import joblib
if not test:
    print('Training models')
    
    # classifies if in db
    gater = LogisticRegression().fit(x1, y)

    # picks which one
    assigner = LogisticRegression().fit(x2, y)

    joblib.dump(gater, 'logistic1.joblib')
    joblib.dump(assigner, 'logistic2.joblib')

else:
    gater = joblib.load('logistic1.joblib')
    assigner = joblib.load('logistic2.joblib')

score_accs = []
unmatched = []

import time
elapsed = 0

# iterate over predictions, comparing each pred against whole GT database
# if system works, the i-th GT entry should be the max fit
for i, prediction in enumerate(preds.iloc):
    
    # initialize y_array
    y_true = np.zeros(len(trues))
    
    # set single true match
    y_true[i] = 1

    # repeat sample
    colnames = prediction.index.values
    prediction = prediction.values
    x_pred = [prediction for _ in range(len(trues))]
    x_pred = pd.DataFrame(x_pred, columns=colnames)

    s = time.time()
    
    # compare sample against all samples
    x_pred = featurize(trues, x_pred)
    x_pred = np.hstack(x_pred)

    candidate_scores = gater.predict_proba(scaler1.transform(x_pred))[:,1]
    proposals = np.where(candidate_scores > 0.5)[0]

    # get proposal(s) by lowest LD
    min_ld_idx = np.where(x_pred[:,0] == x_pred[:,0].min())
    x_pred = x_pred[min_ld_idx]
    y_cand = y_true[min_ld_idx]

    candidate_scores = assigner.predict_proba(scaler2.transform(x_pred))[:,1]
    max_score_idx = np.argmax(candidate_scores)
    matched = y_cand[max_score_idx]
    
    pred_lp = prediction[-1]
    true_lp = trues.iloc[i][['lp']].values[0]
    fname = trues.iloc[i][['file']].values[0]

    if not matched or len(proposals) < 1:
        unmatched.append([pred_lp, true_lp, fname])
        score_accs.append(False)
    else:
        score_accs.append(matched)
    elapsed += time.time() - s

    
print('Matched', np.mean(score_accs)*100, '%')
print('Gating model coeffs', gater.coef_[0])
print('Assignment model coeffs', assigner.coef_[0])
print('Compared {} instances to {} others in {}s'.format(len(preds), len(preds)-1, elapsed))
print('Unmatched', unmatched)

Matched 97.0 %
Gating model coeffs [-6.84179102 -2.69650473]
Assignment model coeffs [ 0.00838192 -4.77696708]
Compared 100 instances to 99 others in 0.5820460319519043s
Unmatched [['J66', 'UA7896', 'test/429.jpg'], ['', 'KQ2057', 'test/440.jpg'], ['LCJ513', 'LCJ510', 'test/471.jpg']]


In [260]:
# testing only the gating model
threshold = 0.5

prec = tf.metrics.Precision()
rec = tf.metrics.Recall()
acc = tf.metrics.Accuracy()

for x_, y_ in zip(x1, y):
    if gater.predict_proba(np.expand_dims(x_, 0))[0,1] > threshold:
        p = 1
    else:
        p = 0
    prec.update_state(y_,[p])
    rec.update_state(y_,[p])
    acc.update_state(y_,[p])
    # if not p == y_:
    #     print(x_, y_)
print('Precision', prec.result().numpy())
print('Recall', rec.result().numpy())
print('Accuracy', acc.result().numpy())

Precision 1.0
Recall 0.97
Accuracy 0.985


In [261]:
# testing the assignment model
# this is where the color and body needs to be useful
prec = tf.metrics.Precision()
rec = tf.metrics.Recall()
acc = tf.metrics.Accuracy()
for x_,y_ in zip(x2,y):
    p = assigner.predict_proba(np.expand_dims(x_,0))[0,1] > 0.5
    prec.update_state(y_,[p])
    rec.update_state(y_,[p])
    acc.update_state(y_,[p])

print('Precision', prec.result().numpy())
print('Recall', rec.result().numpy())
print('Accuracy', acc.result().numpy())
# this test effectively makes our logistic reg choose between two options

Precision 0.61333334
Recall 0.92
Accuracy 0.67


# Graph stuff

In [None]:
import seaborn as sns
sns.set()

d = pd.DataFrame(np.hstack([x[:size//2],y[:size//2]]), columns=['lp', 'color', 'body', 'match'])
ax = sns.catplot(data=d[['lp', 'color', 'body']])
ax.set(xlabel='feature', ylabel='normalized value')

In [None]:
import seaborn as sns
sns.set()

d = pd.DataFrame(np.hstack([x[size//2:],y[size//2:]]), columns=['lp', 'color', 'body', 'match'])
ax = sns.catplot(data=d[['lp', 'color', 'body']])
ax.set(xlabel='feature', ylabel='normalized value')

# Ignore below 

In [None]:

import matplotlib.pyplot as plt
import seaborn as sns
sns.set()

scores = []

# step 1: assuming body == color weighting, adjust LP weighting to find highest accuracy
k_values = np.linspace(0, 5, 51)
for j in k_values:
    score_accs = []

    # compare each prediction against entire GT "database"
    for i, prediction in enumerate(preds.iloc):
        # initialize y_array
        y_true = np.zeros(len(trues))
        # set single true match
        y_true[i] = 1
        # repeat sample
        colnames = prediction.index.values
        prediction = prediction.values
        x_pred = [prediction for _ in range(len(trues))]
        x_pred = pd.DataFrame(x_pred, columns=colnames)

        # compare sample against all samples
        x_pred = featurize(trues, x_pred)
        x_pred = np.hstack(x_pred)

        score = similarity(x_pred, [j, 1, 1])
        imax = np.argmax(score)
        score_accs.append(y_true[imax] == 1)
    scores.append(np.mean(score_accs))

In [None]:
k=2.8 # best k factor

# step 2: see which threshold gets optimal accuracy
thresholds = np.linspace(0, k, k*10+1)
metrics = np.zeros(shape=(len(thresholds), 3))

for threshold in thresholds:
    prec = tf.metrics.Precision()
    rec = tf.metrics.Recall()
    acc = tf.metrics.Accuracy()
    for x_, y_ in zip(x, y):
        if similarity(x_, [k, 1, 1]) > threshold:
            prec.update_state(y_,[1])
            rec.update_state(y_,[1])
            acc.update_state(y_,[1])
        else:
            prec.update_state(y_,[0])
            rec.update_state(y_,[0])
            acc.update_state(y_,[0])
    index = int(threshold*10)
    metrics[index,0] = prec.result().numpy()
    metrics[index,1] = rec.result().numpy()
    metrics[index,2] = acc.result().numpy()

In [None]:
# step 3: establish impact of color and body, need to compare against many thresholds
thresholds = np.linspace(0,5,51)
res = np.zeros(shape=(len(thresholds),2,2))

for threshold in thresholds:
    for b_color in [0,1]:   
        for b_body in [0,1]:
            prec = tf.metrics.Precision()
            rec = tf.metrics.Recall()
            acc = tf.metrics.Accuracy() 
            for x_, y_ in zip(x, y):
                if similarity(x_, [k, b_color, b_body]) > threshold:
                    prec.update_state(y_,[1])
                    rec.update_state(y_,[1])
                    acc.update_state(y_,[1])
                else:
                    prec.update_state(y_,[0])
                    rec.update_state(y_,[0])
                    acc.update_state(y_,[0])
            res[int(threshold*10), b_color, b_body] = acc.result().numpy()

In [None]:
scores_df = pd.DataFrame(scores, columns=['acc'])

ax = sns.lineplot(data=scores_df)
ax.set(xlabel='k', ylabel='acc')
ax.set_xticklabels([0,0, 1, 2, 3, 4, 5])
plt.show()
i_max_score = np.argmax(scores_df["acc"])
print('Score acc maximized at:', i_max_score/10)
print(scores_df.iloc[i_max_score])

In [None]:
metrics_df = pd.DataFrame(metrics, columns=['precision', 'recall', 'accuracy'])
ax = sns.lineplot(data=metrics_df)
ax.set_xticklabels([0,0,1,2,3,4,5])
ax.set(xlabel="threshold")
plt.show()
i_max_acc = np.argmax(metrics_df["accuracy"])
print('Max acc idx:', i_max_acc/10)
print(metrics_df.iloc[i_max_acc])

In [None]:
# illustrate influence of the ancillary features
# (0,0) cell shows accuracy without body and color
for confusion in res:
    print(confusion[1,1], confusion[0,0])
res_df = pd.DataFrame(res)
sns.heatmap(data=res, annot=True)