In [None]:
# 1. Prepare dataset

# Dataset CIFAR-10
# downloaded from: https://www.cs.toronto.edu/~kriz/cifar.html
# introduced in: Learning Multiple Layers of Features from Tiny Images, Alex Krizhevsky, 2009.

import pickle
import os
import tarfile
import urllib.request

# Directory that receives the downloaded CIFAR-10 archive and its extracted contents
DATA_DIR = 'dataset/'
# Sub-directory of DATA_DIR in which the batch files are looked up after extraction
EXTRACTED_DATA_DIR = 'cifar-10-batches-py/'

def download_extract_if_necessary(dest_dir, data_url, expected_tarfile, expected_extracted_file):
    """
    Download (if necessary) and extract (if necessary) a tar.gz archive
    into a given directory.

    Both arguments 'expected_*' help to avoid unnecessary download or extraction.

    :param dest_dir: directory that receives the archive and its extracted
        contents; created (including parents) when missing
    :param data_url: URL the archive is fetched from when it is not on disk yet
    :param expected_tarfile: file name of the archive inside 'dest_dir';
        when it already exists the download is skipped
    :param expected_extracted_file: path (relative to 'dest_dir') of a file or
        directory that exists once the archive is extracted; when it already
        exists the extraction is skipped
    """
    # exist_ok avoids the check-then-create race of a separate exists() test
    os.makedirs(dest_dir, exist_ok=True)

    dest_filename = os.path.join(dest_dir, expected_tarfile)
    if not os.path.exists(dest_filename):
        print('Downloading data from %s...' % data_url)
        # Download under a temporary name and rename only on success, so that an
        # interrupted transfer is never mistaken for a complete archive later on.
        partial_filename = dest_filename + '.part'
        try:
            urllib.request.urlretrieve(data_url, partial_filename)
            os.replace(partial_filename, dest_filename)
        finally:
            if os.path.exists(partial_filename):
                os.remove(partial_filename)
        print('Download finished')

    if not os.path.exists(os.path.join(dest_dir, expected_extracted_file)):
        print('Extracting archive...')
        with tarfile.open(dest_filename, "r:gz") as tar:
            if hasattr(tarfile, 'data_filter'):
                # Reject members that would land outside dest_dir
                # (absolute paths, '..' components, escaping links).
                tar.extractall(dest_dir, filter='data')
            else:
                # Python without extraction filters: the archive must be trusted
                tar.extractall(dest_dir)

    print('Extracted file(s) ready in directory: %s' % dest_dir)
    
def unpickle(file):
    """
    Load a single pickled object from disk.

    :param file: path of the pickle file
    :return: the unpickled object; 8-bit strings pickled by Python 2 are
        returned as 'bytes' (hence keys such as b'data' used on the result)
    """
    # SECURITY: unpickling can execute arbitrary code -- only use on trusted files.
    with open(file, 'rb') as fo:
        # returned directly so that the builtin 'dict' is not shadowed by a local
        return pickle.load(fo, encoding='bytes')


# Make sure the CIFAR-10 archive is on disk and unpacked
download_extract_if_necessary(
    DATA_DIR,
    'https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz',
    'cifar-10-python.tar.gz',
    EXTRACTED_DATA_DIR,
)

# Load the five training batches (data_batch_1 .. data_batch_5) ...
batch_train_dicts = list(map(
    unpickle,
    (os.path.join(DATA_DIR, EXTRACTED_DATA_DIR, 'data_batch_%d' % batch_no)
     for batch_no in range(1, 6)),
))
# ... and the single test batch
batch_test_dict = unpickle(os.path.join(DATA_DIR, EXTRACTED_DATA_DIR, 'test_batch'))

In [None]:
# Join training set batches together, for simplified processing

import numpy as np
np.random.seed(42)

# Sizes are derived from the loaded batches rather than hard-coded.
# All batches are assumed to have the same shape as the first one.
BATCH_CNT = len(batch_train_dicts)
BATCH_SIZE, TRAIN_WIDTH = batch_train_dicts[0][b'data'].shape
TRAIN_HEIGHT = BATCH_SIZE * BATCH_CNT  # total number of training images

# join batches together
X_train = np.empty((TRAIN_HEIGHT, TRAIN_WIDTH), dtype='uint8')
y_train = np.empty(TRAIN_HEIGHT, dtype='uint8')
for i, batch in enumerate(batch_train_dicts):
    # rows of the joined arrays that belong to the i-th batch
    rows = slice(i * BATCH_SIZE, (i + 1) * BATCH_SIZE)
    X_train[rows] = batch[b'data']
    y_train[rows] = batch[b'labels']

# the test set comes as a single batch
X_test = batch_test_dict[b'data']
y_test = np.asarray(batch_test_dict[b'labels'], dtype='uint8')

print('Full training set size: %d' % len(X_train))
print('Full test set size: %d' % len(X_test))

In [None]:
# Convert images to RGB format

def cifar_to_rgb_dataset(imgs):
    """
    Convert images from the flat CIFAR layout to the (height, width, channel)
    layout that matplotlib displays.

    :param imgs: an array of images, each represented by 3072 consecutive pixel
        values: first all red, then all green, then all blue; row-wise
    :return: an array of shape (..., 32, 32, 3), with values of type 'float32'
        scaled to the [0, 1] interval
    """
    # (n, 3072) -> (n, channel, row, column)
    planar = np.asarray(imgs).reshape(-1, 3, 32, 32)
    # move the channel axis last: (n, row, column, channel)
    channels_last = planar.transpose(0, 2, 3, 1)
    as_float = np.asarray(channels_last, dtype='float32')
    return as_float / 255.

In [None]:
# 2. Plot images

import matplotlib.pyplot as plt
%matplotlib inline

CLASS_CNT = 10  # number of classes; labels are taken to be 0 .. CLASS_CNT - 1
CLASS_SAMPLE_SIZE = 10  # class images sample size

fig = plt.figure()
plt.subplots_adjust(wspace=0.1, hspace=0.1)

# One grid row per class, one grid column per sampled image
for cls in range(CLASS_CNT):
    # all training images of this class
    X_class = X_train[y_train == cls]
    # draw distinct random images and convert them to RGB format
    rnd_indices = np.random.choice(len(X_class), CLASS_SAMPLE_SIZE, replace=False)
    X_cls = cifar_to_rgb_dataset(X_class[rnd_indices])
    for x, img in enumerate(X_cls):
        # subplot positions are 1-based and counted row by row
        ax = fig.add_subplot(CLASS_CNT, CLASS_SAMPLE_SIZE, cls * CLASS_SAMPLE_SIZE + x + 1)
        ax.imshow(img)
        ax.axis('off')

plt.show()

In [None]:
# Extract HOG features

from skimage.feature import hog
from skimage import color

def rgb_to_hog(img):
    """
    Compute the HOG descriptor of a single RGB image.

    :param img: an RGB image array, as accepted by skimage.color.rgb2gray
    :return: the HOG features of the grayscale version of the image
    """
    img_gray = color.rgb2gray(img)
    # The former 'visualise' keyword was renamed to 'visualize' and then removed
    # from scikit-image. Leaving it out keeps the default (no visualisation
    # image is returned) on old and new releases alike.
    return hog(img_gray, block_norm='L2-Hys')

def rgb_to_hog_dataset(imgs):
    """ Compute the HOG descriptor of every image and stack them as 'float32' """
    descriptors = [rgb_to_hog(single_img) for single_img in imgs]
    return np.asarray(descriptors, dtype='float32')

def cifar_to_hog(imgs):
    """ Extract HOG features, starting from CIFAR format """
    rgb_imgs = cifar_to_rgb_dataset(imgs)
    return rgb_to_hog_dataset(rgb_imgs)

In [None]:
# 3. Shallow classifier

from sklearn.model_selection import GridSearchCV
from sklearn.preprocessing import FunctionTransformer, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC

from shutil import rmtree
from tempfile import mkdtemp
from time import time

# Temporary directory handed to the pipeline as its 'memory' (transformer cache)
cachedir = mkdtemp()
hogs = FunctionTransformer(cifar_to_hog)
# raw CIFAR rows -> HOG features -> standardization -> SVM
pipe = Pipeline(
    steps=[('hog', hogs), ('norm', StandardScaler()), ('svc', SVC())],
    memory=cachedir,
)

Cs = np.logspace(0, 1, 2)  # not referenced by grid_params below
# Pipeline parameters are addressed as '<step name>__<parameter>'
grid_params = [
    dict(svc__kernel=['rbf'], svc__C=[1., 10.]),
    dict(svc__kernel=['linear'], svc__C=[1.]),
]

clf = GridSearchCV(estimator=pipe, param_grid=grid_params, cv=3, n_jobs=-1)

In [None]:
# Fit estimator using a smaller subset of images

# Number of training images randomly selected for the grid search below.
# TODO: set appropriate value / use full dataset:
SUBSET_SIZE = 100

def choose_random_subset(X, y, subset_size):
    """
    Draw the same random rows from X and y (without repetition).

    :param X: array of samples
    :param y: array of labels, indexed like X
    :param subset_size: number of rows to keep; at most len(X) rows are returned
    :return: tuple (X_subset, y_subset)
    """
    shuffled = np.random.permutation(len(X))
    picked = shuffled[:subset_size]
    return X[picked], y[picked]

X, y = choose_random_subset(X_train, y_train, SUBSET_SIZE)

print('Reduced training set size: %d' % len(X))
print()
print('Fitting ...')
start = time()
try:
    clf.fit(X, y)
    end = time()
finally:
    # Remove the pipeline's cache directory even when fitting fails, so that a
    # crashed run leaves no temporary files behind; ignore_errors keeps the
    # cleanup from failing when the directory is already gone.
    rmtree(cachedir, ignore_errors=True)

print('Fitting done.')
print('Time elapsed: %0.03fs' % (end - start,))

In [None]:
# Grid search summary

print('Best parameters set found on training set:')
print(clf.best_params_)
print()
print('Grid scores on training set:')
means = clf.cv_results_['mean_test_score']
stds = clf.cv_results_['std_test_score']
# One line per parameter combination: mean CV score and twice its standard deviation
for idx, params in enumerate(clf.cv_results_['params']):
    print("%0.3f (+/-%0.03f) for %r" % (means[idx], stds[idx] * 2, params))
print()
print('Score on test set:')
print(clf.score(X_test, y_test))

In [None]:
# 4. Visual features

# Inception v3 model, trained on ImageNet data
# source: http://download.tensorflow.org/models/image/imagenet/inception-2015-12-05.tgz

import tensorflow as tf

MODEL_DIR = 'inception/'  # receives the model archive and its extracted files
MODEL_FILE = 'classify_image_graph_def.pb'  # graph file expected after extraction

# Fetch and unpack the Inception archive unless MODEL_FILE is already present
download_extract_if_necessary(
    MODEL_DIR,
    'http://download.tensorflow.org/models/image/imagenet/inception-2015-12-05.tgz',
    'inception-2015-12-05.tgz',
    MODEL_FILE,
)

# Load the model graph (with pretrained weights)
def create_graph():
  """ Read the saved GraphDef file and import it into the default graph. """
  graph_path = os.path.join(MODEL_DIR, MODEL_FILE)
  with tf.gfile.FastGFile(graph_path, 'rb') as graph_file:
    serialized_graph = graph_file.read()
  graph_def = tf.GraphDef()
  graph_def.ParseFromString(serialized_graph)
  # name='' imports the nodes without a name prefix
  tf.import_graph_def(graph_def, name='')

# Names of graph tensors; 'input_tensor_name' is the tensor images are fed into
input_tensor_name = 'DecodeJpeg:0'
resized_image_tensor_name = 'ResizeBilinear:0'

# Import the pretrained graph, open a session on it and look up the tensor
# whose values are used as CNN codes
create_graph()
sess = tf.Session()
bottleneck = sess.graph.get_tensor_by_name('pool_3:0')

In [None]:
# Methods for visual features extraction from CIFAR10 images with Inception model

def extract_cnn_codes(rgb_img):
    """
    Feed one image into the network and return the bottleneck activations.

    :param rgb_img: an array (?, ?, 3) with pixels in range [0,255] (*NOT* [0,1]).
    :return: an array (2048,) of calculated CNN codes
    """
    feed = {input_tensor_name: rgb_img}
    activations = sess.run(bottleneck, feed_dict=feed)
    # drop the singleton dimensions of the fetched tensor
    return np.squeeze(activations)

def cifar_to_cnn_codes_dataset(imgs):
    """
    Compute CNN codes for a set of images given in CIFAR format.

    :param imgs: an array (?, 3072) of images in CIFAR format, i.e. the input
        of 'cifar_to_rgb_dataset'. That function divides the values by 255, so
        raw pixel values in range [0,255] are expected here (*NOT* [0,1]).
    :return: an array (?, 2048) of calculated CNN codes, of type 'float32'
    """
    # cifar_to_rgb_dataset scales to [0,1]; scale back, because
    # extract_cnn_codes expects pixel values in range [0,255].
    # NOTE(review): the /255 followed by *255 is done in float32, so the values
    # fed to the network may not be exact integers -- confirm that the input
    # tensor tolerates this.
    rgb_imgs = cifar_to_rgb_dataset(imgs) * 255
    codes = [extract_cnn_codes(img) for img in rgb_imgs]
    return np.asarray(codes, dtype='float32')

def load_or_compute_cnn_codes(X, codes_file):
    """
    Return CNN codes for dataset X, using 'codes_file' as an on-disk cache.

    If the file exists, can be loaded and holds as many rows as X, its content
    is returned. Otherwise the codes are computed and written to 'codes_file'.

    :param X: an array of images in CIFAR format
    :param codes_file: filepath (String)
    :return: an array (?, 2048) of calculated CNN codes
    """
    X_codes = None
    if os.path.exists(codes_file):
        # Load codes from file
        try:
            cached_codes = np.load(codes_file)
        except (IOError, ValueError, EOFError):
            # EOFError: raised by recent numpy versions for an empty file,
            # e.g. one left behind by an interrupted save
            print('Error during codes loading')
        else:
            # check if dataset matches codes length
            # (weak condition of data integrity, but it's enough here)
            if len(X) == len(cached_codes):
                X_codes = cached_codes
                print('CNN codes loaded successfully from file %s' % codes_file)
            else:
                print('Invalid codes present in file, replacing with new ones...')

    if X_codes is None:
        # Compute codes
        print('Start computing CNN codes...')
        start = time()
        X_codes = cifar_to_cnn_codes_dataset(X)
        end = time()

        print('Computing done.')
        print('  Time elapsed: %0.02fs.' % (end - start,))
        if len(X) > 0:
            # an average per image is undefined for an empty dataset
            print('  Average time: %0.02fs/image' % ((end - start) / len(X)))

        # Save codes to file. An explicit file handle is used because
        # np.save(path, ...) appends '.npy' to names lacking that suffix, and
        # the existence check above would then never find the file.
        with open(codes_file, 'wb') as codes_fh:
            np.save(codes_fh, X_codes)
        print('Codes saved succesfully to file %s' % codes_file)
        print()

    return X_codes

In [None]:
# Compute CNN codes in batches (to save checkpoints)

from math import ceil

NUM_FEATURES = 2048  # length of bottleneck layer

def compute_codes_in_batches(X, b_size, filename_pattern):
    """
    Compute CNN codes for the images X, 'b_size' images at a time.

    Working in batches means that every batch is checkpointed on disk.
    The batch with index i is stored in the file named 'filename_pattern % i',
    so the pattern must contain exactly one integer placeholder (batch_index).

    :return: an array (len(X), NUM_FEATURES) of type 'float32' with all codes
    """
    X_codes = np.empty((len(X), NUM_FEATURES), dtype='float32')

    batch_cnt = ceil(len(X) / float(b_size))
    for batch_idx in range(batch_cnt):
        lo = batch_idx * b_size
        hi = lo + b_size  # slicing clips this for a shorter last batch
        checkpoint_file = filename_pattern % batch_idx
        X_codes[lo:hi] = load_or_compute_cnn_codes(X[lo:hi], checkpoint_file)
    print('All codes computed!')
    return X_codes

    
# Directory for the per-batch checkpoint files written below
if not os.path.exists('codes/'):
    os.makedirs('codes/')

# Training images are checkpointed in batches of 1000, test images in batches of 100
X_codes = compute_codes_in_batches(X_train, 1000, 'codes/codes_train_%d.npy')
X_codes_test = compute_codes_in_batches(X_test, 100, 'codes/codes_test_%d.npy')

In [None]:
# Plot CNN codes in 2 dimensions

from sklearn.decomposition import PCA, KernelPCA
from sklearn.manifold import TSNE
from sklearn.pipeline import make_pipeline

# Normalize data first

# The scaler is fitted on the training codes only and then re-used for the test codes
normalizer = StandardScaler()
X_codes_norm = normalizer.fit_transform(X_codes)
X_codes_norm_test = normalizer.transform(X_codes_test)

# Reduce dimensionality using first PCA, then t-SNE

# TODO: plot more than 100 examples
X_vis, y_vis = choose_random_subset(X_codes_norm, y_train, 100)

# PCA down to 10 components, followed by t-SNE down to 2
reduction = make_pipeline(PCA(10), TSNE(2))
%time X_vis_2d = reduction.fit_transform(X_vis)

# Plot
# dividing by 9 assumes the labels run from 0 to 9
y_normalized = y_vis.astype('float32') / 9.  # scale to range [0,1]
# one colour per class label
colors = plt.cm.rainbow(y_normalized)
plt.scatter(X_vis_2d[:, 0], X_vis_2d[:, 1], c=colors, alpha=0.2)

In [None]:
# Save and load models from disk

from sklearn.externals import joblib

def save_model(filename, model):
    """
    Persist a model to disk with joblib.

    :param filename: path of the file to write
    :param model: the object to serialize
    """
    # joblib.dump takes the object first and the target file second
    joblib.dump(model, filename)
    
def load_model(filename):
    """ Read a model stored with joblib from 'filename' and return it """
    model = joblib.load(filename)
    return model

# TODO: train_and_test

In [None]:
# 7. Train SVM model on top of CNN codes

from sklearn.svm import LinearSVC, SVC

grid_params = [{'svc__kernel': ['rbf'], 'svc__C': [1., 10.]},
               {'svc__kernel': ['linear'], 'svc__C': [1.]}]

clf = GridSearchCV(SVC(), grid_params, cv=3, n_jobs=-1)

X, y = choose_random_subset(X_codes_norm, y_train, 5000)
%time clf.fit(X, y)
# TODO: print GridSearch summary
save_model('svm_cv_7.pkl', clf)

# Grid search summary

print('Best parameters set found on training set:')
print(clf.best_params_)
print()
print('Grid scores on training set:')
means = clf.cv_results_['mean_test_score']
stds = clf.cv_results_['std_test_score']
for mean, std, params in zip(means, stds, clf.cv_results_['params']):
    print("%0.3f (+/-%0.03f) for %r"
          % (mean, std * 2, params))
print()
print('Score on test set:')
print(clf.score(X_test, y_test))