# Распознование дорожных знаков

In [1]:
import pandas as pd
import sys
import math
import numpy as np

from threading import Thread, Lock
from PIL import Image

from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import StratifiedKFold

Зададим констатны

In [2]:
directions = 12
field_size = 6, 6
image_size = 32, 32
indentation = 4
eps = 1e-5
field_count = (image_size[0] - 2 * indentation) // field_size[0]

## Обработка тренировочных данных

Считывание csv файла

In [3]:
Train = pd.read_csv("../train/gt.csv")

Извлечение признаков из картинки

In [4]:
def extract_hog(img):
    
    img = img.resize(image_size, Image.ANTIALIAS)
    pix = img.convert('RGB').load()
    brightness = [[0.0 for j in range(image_size[1])] for i in range(image_size[0])]
    for i in range(image_size[0]):
        for j in range(image_size[1]):
            brightness[i][j] = (pix[j, i][0] + pix[j, i][1] + pix[j, i][2]) / 3
    
    hog = [[0 for j in range(directions)] for i in range(field_count ** 2)]
    for i in range(indentation, image_size[0] - indentation):
        for j in range(indentation, image_size[1] - indentation):
            gradient_x = brightness[i - 1][j + 1] + 2 * brightness[i][j + 1] + brightness[i + 1][j + 1] -\
                         brightness[i - 1][j - 1] - 2 * brightness[i][j - 1] - brightness[i + 1][j - 1]
            gradient_y = brightness[i + 1][j - 1] + 2 * brightness[i + 1][j] + brightness[i + 1][j + 1] -\
                         brightness[i - 1][j - 1] - 2 * brightness[i - 1][j] - brightness[i + 1][j + 1]
            gradient_norm = (gradient_x ** 2 + gradient_y ** 2) ** (1 / 2)
            gradient_angle = math.atan2(gradient_y, gradient_x)
            hog[((i - 4) // field_size[0]) * field_count + (j - 4) // field_size[0]]\
                [int(abs(gradient_angle + math.pi - eps) * (directions // 2) / math.pi)] += gradient_norm
    
    for field in hog:
        norm = 0
        for i in field:
            norm += i ** 2
        norm = norm ** (1 / 2)
        for i in range(len(field)):
            field[i] /= norm + eps
    
    return hog

Добавим дополнительные столбцы для признаков:

In [5]:
for i in range((field_count ** 2) * directions):
    Train[str(i)] = 0.0

Извлечём признаки для всех картинок из тренировочной выборки

In [6]:
def thread_func(l, k):
    for i in range(Train.shape[0] * l // k, Train.shape[0] * (l + 1) // k):
        mutex.acquire()
        img = Image.open("../train/" + Train['filename'][i])
        mutex.release()
        hog = extract_hog(img)
        mutex.acquire()
        for j in range(len(hog)):
            for r in range(len(hog[j])):
                Train.set_value(i, str(j * 8 + r), hog[j][r])
        mutex.release()

In [7]:
thread_count = 8
threads = []
mutex = Lock()
for i in range(thread_count):
    t = Thread(target=thread_func, args=(i, thread_count))
    threads.append(t)
for i in range(thread_count):
    threads[i].start()
for i in range(thread_count):
    threads[i].join()

Запишем данные файл для быстрого доступа к этим данным при следующих запусках

In [8]:
#Train.to_csv("train.csv")

## Обработка тестовых данных

Создаём пусто DataFrame

In [9]:
Test = pd.DataFrame(index=[i for i in range(12342)])

Функция для обработки имени файла

In [10]:
def sstr(i):
    res = str(i)
    while len(res) < 5:
        res = '0' + res
    return res

Добавим дополнительные столбцы для признаков:

In [11]:
for i in range((field_count ** 2) * directions):
    Test[str(i)] = 0.0

Извлечём признаки для всех картинок из тренировочной выборки

In [12]:
def thread_func_test(l, k):
    for i in range(Test.shape[0] * l // k, Test.shape[0] * (l + 1) // k):
        mutex.acquire()
        img = Image.open("../test/" + sstr(i) + ".png")
        mutex.release()
        hog = extract_hog(img)
        mutex.acquire()
        for j in range(len(hog)):
            for r in range(len(hog[j])):
                Test.set_value(i, str(j * 8 + r), hog[j][r])
        mutex.release()

In [13]:
thread_count = 8
threads = []
mutex = Lock()
for i in range(thread_count):
    t = Thread(target=thread_func_test, args=(i, thread_count))
    threads.append(t)
for i in range(thread_count):
    threads[i].start()
for i in range(thread_count):
    threads[i].join()

Запишем данные файл для быстрого доступа к этим данным при следующих запусках

In [14]:
#Test.to_csv("test.csv")

## Окончательная обработка данных

Считывание данных

In [15]:
# Train = pd.read_csv("train.csv")
# Test = pd.read_csv("test.csv")

Удаление лишних столбцов

In [16]:
del Train["filename"]
# del Train["Unnamed: 0"]
# del Test["Unnamed: 0"]

Запишем ответы

In [17]:
Train_y = pd.DataFrame(data=Train["class_id"], columns=["class_id"])
del Train["class_id"]

## Нахождение вектора ответов методом SVM

Простая кросс-валидация

In [18]:
learn = SVC()
cross_val_score(learn, Train.values, y=Train_y.values[:, 0], n_jobs=-1, cv=StratifiedKFold(n_splits=5, random_state=42)).mean()

0.057384784255010181

Обучение

In [125]:
learn = SVC()
learn.fit(Train.values, Train_y.values[:, 0])

SVC(C=1.0, cache_size=200, class_weight=None, coef0=0.0,
  decision_function_shape=None, degree=3, gamma='auto', kernel='rbf',
  max_iter=-1, probability=False, random_state=None, shrinking=True,
  tol=0.001, verbose=False)

Запишем результат в файл

In [126]:
score = pd.DataFrame(data=np.concatenate([[[sstr(i) + ".png"] for i in range(Test.shape[0])], 
                                         (learn.predict(Test.values)[np.newaxis]).T], axis=1),
                     columns=["filename", "class_id"])
score.to_csv("result.csv", index="False")

In [127]:
score.to_csv("result.csv", index=False)