In [1]:
%matplotlib inline
import sys
import time
import random

from multiprocessing import Queue
from concurrent.futures import ProcessPoolExecutor

import cv2
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import xgboost as xgb

from numpy.linalg import norm
from sklearn.svm import SVC
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.metrics.scorer import accuracy_scorer

from utils import get_cache, set_cache

path = 'LP_data/dataset/P{}/G{}/R{}_{}.png'



In [2]:
training_lpaths = []
training_rpaths = []
labels = []
for p in range(1, 5):
    for g in range(1, 11):
        for r in range(1, 26):
            fname = path.format(p, g, r, 'l')
            training_lpaths.append(fname)
            fname = path.format(p, g, r, 'r')
            training_rpaths.append(fname)
            labels.append(g)

total = len(labels)
print('Total images: {}'.format(total))


def preprocess_img(current, path):
    data = cv2.imread(path, 0)
    # data = cv2.fastNlMeansDenoising(data)
    return (current, data)



def get_imgs(training_paths, name, load_cache=True):
    imgs = get_cache(name)
    if not load_cache or imgs is None:
        imgs = [None for k in range(total)]
        tasks = []
        def callback(future):
            idx, data = future.result()
            imgs[idx] = data
        with ProcessPoolExecutor() as executor:
            for idx, p in enumerate(training_lpaths):
                future = executor.submit(preprocess_img, idx, p)
                future.add_done_callback(callback)
                tasks.append(future)
            while True:
                finished = sum(f.done() for f in tasks)
                sys.stdout.flush()
                sys.stdout.write('Finished {:>4d} / {:>4d}\r'.format(finished, total))
                if all(f.done() for f in tasks):
                    print()
                    break
                time.sleep(1)
        set_cache(imgs, name)
    return imgs


limgs = get_imgs(training_lpaths, 'limgs', False)
rimgs = get_imgs(training_rpaths, 'rimgs', False)

Total images: 1000
Finished 1000 / 1000
Finished 1000 / 1000


In [4]:
def hog(digits, cellx=20, celly=20):
    samples = []
    for current, img in enumerate(digits, 1):
        w, h = img.shape
        sys.stdout.flush()
        sys.stdout.write('Processing {:>4d} / {:>4d}\r'.format(current, total))
        gx = cv2.Sobel(img, cv2.CV_32F, 1, 0)
        gy = cv2.Sobel(img, cv2.CV_32F, 0, 1)
        mag, ang = cv2.cartToPolar(gx, gy)
        bin_n = 16
        bin = np.int32(bin_n*ang/(2*np.pi))
        bin_cells = []
        mag_cells = []
        cellxn = w // cellx
        cellyn = h // celly
        for x in range(cellxn+1):
            for y in range(cellyn+1):
                bin_cells.append(bin[x*cellx:(x+1)*cellx, y*celly:(y+1)*celly])
                mag_cells.append(mag[x*cellx:(x+1)*cellx, y*celly:(y+1)*celly])
        hists = [np.bincount(b.ravel(), m.ravel(), bin_n) for b, m in zip(bin_cells, mag_cells)]
        hist = np.hstack(hists)

        # transform to Hellinger kernel
        eps = 1e-7
        hist /= hist.sum() + eps
        hist = np.sqrt(hist)
        hist /= norm(hist) + eps

        samples.append(hist)
    print('\n', samples[0].shape)
    return np.float32(samples)


def hog_single(idx, img, cellx=20, celly=20):
    w, h = img.shape
    gx = cv2.Sobel(img, cv2.CV_32F, 1, 0)
    gy = cv2.Sobel(img, cv2.CV_32F, 0, 1)
    mag, ang = cv2.cartToPolar(gx, gy)
    bin_n = 16
    bin = np.int32(bin_n*ang/(2*np.pi))
    bin_cells = []
    mag_cells = []
    cellxn = w // cellx
    cellyn = h // celly
    for x in range(cellxn+1):
        for y in range(cellyn+1):
            bin_cells.append(bin[x*cellx:(x+1)*cellx, y*celly:(y+1)*celly])
            mag_cells.append(mag[x*cellx:(x+1)*cellx, y*celly:(y+1)*celly])
    hists = [np.bincount(b.ravel(), m.ravel(), bin_n) for b, m in zip(bin_cells, mag_cells)]
    hist = np.hstack(hists)
    # transform to Hellinger kernel
    eps = 1e-7
    hist /= hist.sum() + eps
    hist = np.sqrt(hist)
    hist /= norm(hist) + eps
    return (idx, np.float32(hist))

In [5]:
def get_hog_imgs(imgs, name, load_cache=True):
    hog_imgs = get_cache(name)
    if not load_cache or hog_limgs is None: 
        hog_imgs = [None for k in range(total)]
        tasks = []
        def hog_callback(future):
            idx, data = future.result()
            hog_imgs[idx] = data
        with ProcessPoolExecutor() as executor:
            for idx, img in enumerate(imgs):
                future = executor.submit(hog_single, idx, img)
                future.add_done_callback(hog_callback)
                tasks.append(future)
            while True:
                finished = sum(f.done() for f in tasks)
                sys.stdout.flush()
                sys.stdout.write('Finished {:>4d} / {:>4d}\r'.format(finished, total))
                if finished == total:
                    print()
                    break
                time.sleep(1)
        set_cache(hog_imgs, name)
    return hog_imgs


hog_limgs = get_hog_imgs(limgs, 'hog_limgs', False)
hog_rimgs = get_hog_imgs(rimgs, 'hog_rimgs', False)

Finished 1000 / 1000
Finished 1000 / 1000


In [9]:
res = []
for i in range(20):
    X_ltrain, X_ltest, X_rtrain, X_rtest, y_train, y_test = train_test_split(hog_limgs, hog_rimgs, labels, test_size=0.2)
    X_train = np.append(np.array(X_ltrain), np.array(X_rtrain), axis=1)
    X_test = np.append(np.array(X_ltest), np.array(X_rtest), axis=1)
    y_train = np.array(y_train)-1
    y_test = np.array(y_test)-1
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dtest = xgb.DMatrix(X_test, label=y_test)
    watchlist = [(dtrain, 'train'), (dtest, 'test')]
    param = {'max_depth': 2, 'eta': 1, 'silent': 1, 'objective': 'multi:softmax', 'num_class': 10}
    num_round = 50
    bst = xgb.train(param, dtrain, num_round)
    preds = bst.predict(dtest)
    accuracy = np.sum(preds == y_test) / len(y_test)
    res.append(accuracy)
    print('Round {:2d} Accuracy: {:5.2%}'.format(i+1, accuracy))

Round  1 Accuracy: 70.00%


KeyboardInterrupt: 

In [None]:
res = np.array(res)
print()
print('Mean: {:>5.2%}'.format(res.mean()))
print('Max: {:>5.2%}'.format(res.max()))
print('Min: {:>5.2%}'.format(res.min()))