<a href="https://colab.research.google.com/github/ldeluigi/supermarket-2077-product-vision/blob/master/ProductDetection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Download datasets

In [None]:
!rm -rf sample_data
!gdown --id 1fDr4g4wbnSRkuCYyS3wpuJS7Ax22bVB_ -O all.zip
!unzip -oq all.zip

%matplotlib inline

In [None]:
#!pip install opencv-python==3.4.2.17
#!pip install opencv-contrib-python==3.4.2.17

## Imports

In [None]:
import scipy.io
import os
from pathlib import Path
import re
import cv2
import matplotlib.pyplot as plt
import numpy as np
import math
import itertools
import shutil
from tqdm.notebook import tqdm
from keras.preprocessing.image import ImageDataGenerator
import tensorflow as tf
from sklearn.metrics import confusion_matrix

## Data loaders

In [None]:
# Root directory of the training set: it holds the class index file and one
# sub-directory of product images per class.
training_dirname = 'Training'

def create_class_label(class_index, class_name):
  """Return the display label for a class; currently just its name.

  `class_index` is accepted but not used.
  """
  return class_name

def read_classes():
  """Load the class index shipped with the training set.

  Returns:
    A pair of dicts keyed by 1-based class index: the first maps to the label
    produced by `create_class_label`, the second to the raw class name.
  """
  index_path = os.path.join(training_dirname, 'TrainingClassesIndex.mat')
  class_entries = scipy.io.loadmat(index_path)['classes'][0]
  raw_by_index = {i: entry[0] for i, entry in enumerate(class_entries, start=1)}
  labels_by_index = {i: create_class_label(i, name) for i, name in raw_by_index.items()}
  return labels_by_index, raw_by_index

def read_training_data(classes):
  """Load every training image together with its class index.

  Args:
    classes: dict mapping class index -> name of the class sub-directory
      inside `training_dirname`.

  Returns:
    A numpy record array with fields `image` (RGB pixel array, stored as an
    object) and `class_index` (int32).

  Raises:
    IOError: if a file in a class directory cannot be decoded as an image.
  """
  result = []
  for class_index, class_name in classes.items():
    dirname_images = os.path.join(training_dirname, class_name)
    # Sort the listing so the record order does not depend on the filesystem.
    for filename in sorted(os.listdir(dirname_images)):
      path = os.path.join(dirname_images, filename)
      img = cv2.imread(path)
      if img is None:
        # cv2.imread reports failure by returning None instead of raising.
        raise IOError(f'Could not read image: {path}')
      result.append((cv2.cvtColor(img, cv2.COLOR_BGR2RGB), class_index))
  return np.rec.array(result, dtype=[('image', 'O'), ('class_index', 'i4')])

def read_store_data(storename):
  """Load the shelf images of one store together with their annotations.

  Every `.mat` file in `<storename>/annotation` is paired with the image
  `<storename>/images/<number>.jpg`, where `<number>` is the digit group
  captured from the annotation file name.

  Args:
    storename: path of the store directory.

  Returns:
    A numpy record array, ordered by image number, with fields `image` (RGB
    pixel array) and `items` (list of (bbox, label, class_index) tuples).

  Raises:
    ValueError: if a `.mat` file name does not match the expected pattern.
    IOError: if the image belonging to an annotation cannot be read.
  """
  dirname_anno = os.path.join(storename, 'annotation')
  dirname_images = os.path.join(storename, 'images')

  numbered_files = []
  for filename in os.listdir(dirname_anno):
    if not filename.endswith(".mat"):
      continue
    # NOTE(review): the dots in this pattern are unescaped and therefore match
    # any character - confirm against the real file names before tightening.
    match = re.search(r'^anno.(\d+).mat$', filename)
    if match is None:
      raise ValueError(f'Unexpected annotation file name: {filename}')
    numbered_files.append((int(match.group(1)), match.group(1), filename))

  result = []
  # Sort numerically so that positional access (e.g. store[1]) is reproducible.
  for _, number, filename in sorted(numbered_files):
    mat = scipy.io.loadmat(os.path.join(dirname_anno, filename))
    image_path = os.path.join(dirname_images, number + '.jpg')
    img = cv2.imread(image_path)
    if img is None:
      # cv2.imread reports failure by returning None instead of raising.
      raise IOError(f'Could not read image: {image_path}')

    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img_annotation = mat['annotation'][0, 0]
    bboxes = [x[0] for x in img_annotation[0][0]]
    labels = [str(x[0][0][0]) for x in img_annotation[1][0]]
    class_indexes = img_annotation[2][0]
    result.append((img_rgb, list(zip(bboxes, labels, class_indexes))))
  return np.rec.array(result, dtype=[('image', 'O'), ('items', 'O')])

## Data visualization utilities

In [None]:
def show_image(img):
  """Display `img` on the current matplotlib axes with the axis hidden."""
  plt.axis('off')
  plt.imshow(img)

def show_grayscale_image(img):
  """Display a single-channel image by replicating it into three channels."""
  show_image(cv2.merge([img, img, img]))

def plot_grid(images, columns, show_axis=False, labels=None):
  """Plot a sequence of images in a grid, one subplot per image.

  Args:
    images: sequence of image arrays; the first one determines the figure dpi.
    columns: number of subplots per row. Nothing is drawn if it is <= 0.
    show_axis: keep the axis ticks and frame when True.
    labels: optional sequence of subplot titles, cycled when shorter than
      `images`. When omitted or empty, subplots are titled with their 1-based
      position.
  """
  if len(images) == 0 or columns <= 0:
    return
  rows = math.ceil(len(images) / columns)
  height = 1 + rows * 2
  width = columns * 4
  # Keep the dpi positive even when the first image is a single pixel.
  dpi = max(1, max(images[0].shape[0], images[0].shape[1]) // 2)
  fig = plt.figure(figsize=(width, height), dpi=dpi)
  fig.subplots_adjust(hspace=0.4)
  has_labels = labels is not None and len(labels) > 0
  for index, img in enumerate(images, start=1):
    # `dtype.str` looks like '<f4' and never contains 'float', so the kind is
    # tested through numpy's type hierarchy instead.
    if np.issubdtype(img.dtype, np.floating):
      # Float images are taken to be in [0, 1]; clip so the cast cannot wrap.
      img = (np.clip(img, 0, 1) * 255).astype('uint8')
    sp = fig.add_subplot(rows, columns, index)
    if not show_axis:
      sp.axis('off')
    sp.imshow(img)
    title = labels[(index - 1) % len(labels)] if has_labels else index
    sp.set_title(title, fontsize=10)

def dataset_plot_grid(indexes, columns, dataset, draw_item):
  """Draw selected dataset rows in a grid, delegating rendering to a callback.

  Args:
    indexes: positions of the rows of `dataset` to draw.
    columns: number of subplots per row.
    dataset: indexable collection of rows.
    draw_item: callable invoked as `draw_item(row, subplot)` for every row.
  """
  figure = plt.figure(figsize=(12, 6), dpi=120)
  for position, dataset_index in enumerate(indexes, start=1):
    axes = figure.add_subplot(math.ceil(len(indexes) / columns), columns, position)
    draw_item(dataset[dataset_index], axes)

# Image search

## Prepare products class dictionary

In [None]:
# Label and raw-name lookups, both keyed by 1-based class index.
classes, raw_classes = read_classes()

def class_name(class_index):
  """Look up the label of a class; negative indexes yield None."""
  if class_index >= 0:
    return classes[class_index]
  return None

## Load training raw images

In [None]:
# Every training image with its class index, loaded into memory at once.
products = read_training_data(raw_classes)

## Products visualization

In [None]:
def show_products_with_class(indexes, columns, dataset):
  """Plot the selected products in a grid, each titled with its class name."""
  def draw_product(row, axes):
    # Draws on the current axes, i.e. the subplot the grid just created.
    plt.axis('off')
    plt.imshow(row.image)
    axes.set_title(class_name(row.class_index), fontsize=10)
  dataset_plot_grid(indexes, columns, dataset, draw_product)

# Preview six randomly drawn products (indexes may repeat), three per row.
show_products_with_class(np.random.randint(0, len(products), 6), 3, products)

## Image preprocessing

### Background removal

In [None]:
# code taken from https://www.kaggle.com/vadbeg/opencv-background-removal and modified

def remove_background(img, threshold):
    """Cut the largest dark object out of a light background.

    Pixels whose gray level is at most `threshold` are treated as foreground.
    The largest external contour of the (morphologically closed) foreground
    is filled into the mask, and the image is cropped to its bounding box.

    Args:
        img: RGB image array with three channels.
        threshold: gray level above which a pixel counts as background.

    Returns:
        RGBA crop around the largest foreground contour; the alpha channel is
        the foreground mask.

    Raises:
        ValueError: if the image contains no foreground at all.
    """
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    _, threshed = cv2.threshold(gray, threshold, 255, cv2.THRESH_BINARY_INV)

    # Keep the kernel at least one pixel wide so that small images do not
    # produce a zero-sized structuring element.
    kernel_size = max(1, round(max(img.shape[0], img.shape[1]) * 0.02))
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size))
    morphed = cv2.morphologyEx(threshed, cv2.MORPH_CLOSE, kernel)

    # findContours returns (contours, hierarchy) in OpenCV 4 but
    # (image, contours, hierarchy) in OpenCV 3; index -2 is the contour list
    # in both layouts.
    cnts = cv2.findContours(morphed,
                            cv2.RETR_EXTERNAL,
                            cv2.CHAIN_APPROX_SIMPLE)[-2]
    if len(cnts) == 0:
        raise ValueError('No foreground found: every pixel is above the threshold')

    largest = sorted(cnts, key=cv2.contourArea)[-1]

    # drawContours paints into `threshed` in place, so the mask is the raw
    # threshold result plus the filled largest contour.
    mask = cv2.drawContours(threshed, [largest], 0, [255], cv2.FILLED)
    masked_data = cv2.bitwise_and(img, img, mask=mask)

    x, y, w, h = cv2.boundingRect(largest)
    dst = masked_data[y: y + h, x: x + w]

    alpha = mask[y: y + h, x: x + w]
    r, g, b = cv2.split(dst)

    return cv2.merge([r, g, b, alpha])

# Demo: show a random product next to its background-removed version.
n = np.random.randint(products.shape[0])
print(f'Index: {n}')
print(f'Class: {class_name(products[n].class_index)}')
# 250 is the gray-level threshold handed to remove_background.
plot_grid([products[n].image, remove_background(products[n].image, 250)], 2, show_axis=True)

### Image resize

In [None]:
def resize_image(img, size, color=(0, 0, 0, 0)):
  """Scale `img` to fit inside `size` keeping its aspect ratio, then pad.

  Args:
    img: image array of shape (height, width) or (height, width, channels).
    size: target (width, height) in pixels.
    color: border fill value handed to cv2.copyMakeBorder.

  Returns:
    An image of exactly `size` with the scaled original centered; an odd
    amount of padding puts the extra pixel at the bottom/right.
  """
  target_w, target_h = size
  # Only the first two axes are needed, so 2-D (grayscale) input works too.
  original_h, original_w = img.shape[:2]
  target_ar = target_w / target_h
  original_ar = original_w / original_h

  # Scale by the dimension that would overflow the target first.
  scale_factor = target_h / original_h if target_ar > original_ar else target_w / original_w
  # Clamp to [1, target]: extreme aspect ratios must not round down to an
  # empty image, and rounding must never exceed the target size.
  scaled_w = min(target_w, max(1, round(original_w * scale_factor)))
  scaled_h = min(target_h, max(1, round(original_h * scale_factor)))
  resized = cv2.resize(img, (scaled_w, scaled_h))

  delta_h = target_h - scaled_h
  delta_w = target_w - scaled_w
  top    = delta_h // 2
  left   = delta_w // 2
  bottom = delta_h - top
  right  = delta_w - left

  return cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)

# Demo: remove the background of a random product, then fit it into a
# 400x400 canvas.
n = np.random.randint(products.shape[0])
image = products[n].image
image = remove_background(image, 250)
plot_grid([image, resize_image(image, (400, 400))], 2, show_axis=True)

## Dataset preparation



### Image cleaning

In [None]:
# Target (width, height) of every cleaned image.
size = (224, 224)

def clean_image(img):
  """Strip the background (gray level above 250) and fit the result to `size`."""
  return resize_image(remove_background(img, 250), size)

### Export new dataset on disk

Optionally export only a random subset of the dataset to save memory.

In [None]:
# Number of images exported when `reduce_dataset` is True.
number_of_images_to_save = 1000
# Set to True to export only a random subset of the products.
reduce_dataset = False

In [None]:
train_data_directory = 'Temp'
# Start from an empty directory so images of a previous export cannot linger.
shutil.rmtree(train_data_directory, ignore_errors=True)

# Export index -> class index of the product written at that position.
index_to_class_map = dict()
def index_to_class(index):
  """Return the class index of the exported image with the given index."""
  # We could search in products for O(n)
  # Instead we use a map for O(1)
  return index_to_class_map[index]

if reduce_dataset:
  # Never request more samples than exist: choice(replace=False) would raise.
  products_size = min(number_of_images_to_save, products.shape[0])
  products_to_dump = np.random.choice(products.shape[0], products_size, replace = False)
else:
  products_size = products.shape[0]
  products_to_dump = np.arange(products_size)

# Every image is written to its own zero-padded directory. The padding must
# cover the largest index so that the directory names sort in numeric order:
# the label assigned by the directory-based loader is later used as this
# export index (see product_classification).
index_width = max(4, len(str(max(products_size - 1, 0))))

for index, product_index in tqdm(enumerate(products_to_dump), total=products_size, desc='Writing files...'):
  image, class_index = products[product_index]
  index_str = f'{index:0{index_width}d}'
  output_dir = os.path.join(train_data_directory, index_str)
  Path(output_dir).mkdir(parents=True, exist_ok=True)
  # cv2.imwrite expects BGRA channel order, the cleaned image is RGBA.
  out = cv2.cvtColor(clean_image(image), cv2.COLOR_RGBA2BGRA)
  cv2.imwrite(os.path.join(output_dir, f'{index_str}.png'), out)
  index_to_class_map[index] = class_index

## Data Augmentation

### 3D rotation

In [None]:
def image_3D_rotation(img, theta = 0, phi = 0, gamma = 0, dx = 0, dy = 0, dz = 0):
  """
  Warp an image as if it had been rotated in 3D space and projected back.

  Parameters:
      img       : the image data as numpy array (a third, channel axis is required)
      theta     : rotation around the x axis, in degrees
      phi       : rotation around the y axis, in degrees
      gamma     : rotation around the z axis, in degrees (basically a 2D rotation)
      dx        : translation along the x axis
      dy        : translation along the y axis
      dz        : translation along the z axis (distance to the image);
                  NOTE: the passed value is overwritten with the computed
                  focal length before it is used
  Output:
      image     : the rotated image, with the same width and height as the input

  Reference:
      1.        : http://stackoverflow.com/questions/17087446/how-to-calculate-perspective-transform-for-opencv-from-rotation-angles
      2.        : http://jepsonsblog.blogspot.tw/2012/11/rotation-in-3d-using-opencvs.html
      3.        : Code taken from https://github.com/eborboihuc/rotate_3d/blob/master/image_transformer.py
  """
  def deg_to_rad(deg):
    """Convert an angle from degrees to radians."""
    return deg * math.pi / 180.0
  def get_M(theta, phi, gamma, dx, dy, dz, size, focal):
    """Compose the 3x3 perspective matrix; angles here are in radians."""
    w = size[0]
    h = size[1]
    f = focal
    # Projection 2D -> 3D matrix
    # (shifts the origin to the image center)
    A1 = np.array([ [1, 0, -w/2],
                    [0, 1, -h/2],
                    [0, 0, 1],
                    [0, 0, 1]])
    # Rotation matrices around the X, Y, and Z axis
    RX = np.array([ [1, 0, 0, 0],
                    [0, np.cos(theta), -np.sin(theta), 0],
                    [0, np.sin(theta), np.cos(theta), 0],
                    [0, 0, 0, 1]])
    RY = np.array([ [np.cos(phi), 0, -np.sin(phi), 0],
                    [0, 1, 0, 0],
                    [np.sin(phi), 0, np.cos(phi), 0],
                    [0, 0, 0, 1]])
    RZ = np.array([ [np.cos(gamma), -np.sin(gamma), 0, 0],
                    [np.sin(gamma), np.cos(gamma), 0, 0],
                    [0, 0, 1, 0],
                    [0, 0, 0, 1]])
    # Composed rotation matrix with (RX, RY, RZ)
    R = np.dot(np.dot(RX, RY), RZ)
    # Translation matrix
    T = np.array([  [1, 0, 0, dx],
                    [0, 1, 0, dy],
                    [0, 0, 1, dz],
                    [0, 0, 0, 1]])
    # Projection 3D -> 2D matrix
    # (shifts the origin back to the top-left corner)
    A2 = np.array([ [f, 0, w/2, 0],
                    [0, f, h/2, 0],
                    [0, 0, 1, 0]])
    # Final transformation matrix
    return np.dot(A2, np.dot(T, np.dot(R, A1)))
  height = img.shape[0]
  width = img.shape[1]
  # Not used below, but reading it requires `img` to have a third axis.
  num_channels = img.shape[2]
  rtheta = deg_to_rad(theta)
  rphi = deg_to_rad(phi)
  rgamma = deg_to_rad(gamma)
  # Length of the image diagonal.
  d = np.sqrt(height**2 + width**2)
  # The focal length depends on gamma only: d / (2 * sin(gamma)), or d itself
  # when sin(gamma) is exactly zero.
  focal = d / (2 * np.sin(rgamma) if np.sin(rgamma) != 0 else 1)
  # Overrides the `dz` argument.
  dz = focal
  mat = get_M(rtheta, rphi, rgamma, dx, dy, dz, (width, height), focal)
  return cv2.warpPerspective(img.copy(), mat, (width, height))

def random_spatial_rotation(theta_range, phi_range, gamma_range):
  """Build a function that applies a random 3D rotation to an image.

  Each range is an inclusive (min, max) pair of whole degrees. Fresh angles
  are drawn every time the returned function is called.
  """
  def rotate(img):
    theta = np.random.randint(theta_range[0], theta_range[1] + 1)
    phi = np.random.randint(phi_range[0], phi_range[1] + 1)
    gamma = np.random.randint(gamma_range[0], gamma_range[1] + 1)
    return image_3D_rotation(img, theta = theta, phi = phi, gamma = gamma)
  return rotate

### Data generator parameters definition

In [None]:
# Generator without any augmentation configured: create_flow uses it to read
# the exported images from disk.
real_datagen = ImageDataGenerator(
    data_format = 'channels_last',
)

# Augmenting generator: random brightness, shifts and zoom, plus a random 3D
# rotation as `preprocessing_function`. flow_to_augmented_tuple calls both
# `preprocessing_function` and `random_transform` on it explicitly.
augmented_datagen = ImageDataGenerator(
    brightness_range = [0.5, 1.2],
    width_shift_range = size[0] // 10,
    height_shift_range = size[1] // 10,
    zoom_range = 0.1,
    fill_mode = 'constant',
    cval = 0,
    data_format = 'channels_last',
    preprocessing_function = random_spatial_rotation(
      theta_range = (-20, 20),
      phi_range = (-30, 30),
      gamma_range = (-10, 10)
    )
)

def create_flow(single_pass = False):
  """Open a shuffled, one-image-per-batch RGBA flow over the exported dataset.

  Args:
    single_pass: when True, the returned iterator stops after as many batches
      as there are samples instead of being the directory iterator itself.
  """
  flow = real_datagen.flow_from_directory(
    directory = train_data_directory,
    target_size = size,
    color_mode = 'rgba',
    class_mode = 'sparse',
    batch_size = 1,
    shuffle = True
  )
  return itertools.islice(flow, flow.samples) if single_pass else flow

def flow_to_augmented_tuple(t):
  """Turn a one-image batch into (original, augmented, label).

  Both images are rescaled by 1/255; the augmented one is first passed
  through the generator's preprocessing function and then its random
  transform.
  """
  batch_images, batch_labels = t[0], t[1]
  rescale = 1./255
  original = batch_images[0] * rescale
  rotated = augmented_datagen.preprocessing_function(batch_images[0])
  transformed = augmented_datagen.random_transform(rotated) * rescale
  return original, transformed, batch_labels[0]

def flow_to_tuple(t):
  """Turn a one-image batch into (image rescaled by 1/255, label)."""
  batch_images, batch_labels = t[0], t[1]
  return batch_images[0] * (1./255), batch_labels[0]

def mismatched_images():
  """Endlessly yield (image, augmented image) pairs with different labels.

  Two independent flows are read in lockstep. Whenever both happen to return
  the same label the pair is dropped and the first flow is advanced once more.
  """
  first_flow = create_flow()
  second_flow = create_flow()
  while True:
    first = next(first_flow)
    second = next(second_flow)
    if first[1][0] == second[1][0]:
      next(first_flow) # used for misaligning iterators
      continue
    image, _ = flow_to_tuple(first)
    _, transformed, _ = flow_to_augmented_tuple(second)
    yield image, transformed

def matched_images(single_pass = False):
  """Yield (image, augmented version of the same image) pairs."""
  for triple in map(flow_to_augmented_tuple, create_flow(single_pass)):
    yield triple[0], triple[1]

def single_images(single_pass = False):
  """Return an iterator of (rescaled image, label) over the exported dataset."""
  return map(flow_to_tuple, create_flow(single_pass))

def single_altered_images(single_pass = False):
  """Return an iterator of (augmented image, label) over the exported dataset."""
  flow = create_flow(single_pass)
  # flow_to_augmented_tuple gives (original, transformed, label); drop the original.
  return map(lambda t: flow_to_augmented_tuple(t)[1:], flow)

# Ten matched pairs: each original next to its own augmented version.
res = []
it = matched_images()
for _ in range(10):
  t = next(it)
  res.extend((t[0], t[1]))

plot_grid(res, 2, labels=["Original", "Same, altered"])

# Ten mismatched pairs: each original next to another product's augmented image.
res = []
it = mismatched_images()
for _ in range(10):
  t = next(it)
  res.extend((t[0], t[1]))

plot_grid(res, 2, labels=["Original", "Other, altered"])
del res, it

In [None]:
def probability_merge(it_1, it_2, p = 0.5):
  """Interleave two pair iterators at random, tagging each pair with its source.

  Args:
    it_1: iterator yielding (original, transformed) pairs; tagged with 1.
    it_2: iterator yielding (original, transformed) pairs; tagged with 0.
    p: probability of drawing the next pair from `it_1`.

  Yields:
    (original, transformed, label) triples. The generator ends as soon as the
    iterator selected for the next draw is exhausted.
  """
  while True:
    source, label = (it_1, 1) if np.random.random() < p else (it_2, 0)
    try:
      original, transformed = next(source)
    except StopIteration:
      # A StopIteration escaping a generator body is turned into a
      # RuntimeError (PEP 479), so finish explicitly instead.
      return
    yield original, transformed, label

# Twenty pairs drawn with equal probability from the mismatched and matched streams.
it = probability_merge(mismatched_images(), matched_images(), p = 0.5)
res = []
for _ in range(20):
  t = next(it)
  res.extend((t[0], t[1]))

plot_grid(res, 2, labels=["First", "Second"])
del res, it

## Model definition

### Performance Evaluation

In [None]:
def accuracy(extract_features, compute_feature_distance, threshold, n = 100):
  """Evaluate a same/different-product classifier on random image pairs.

  Pairs are drawn with equal probability from mismatched (label 1) and
  matched (label 0) images; a pair is predicted as 1 when the distance between
  its features exceeds `threshold`. The confusion matrix is printed and the
  misclassified pairs are plotted.

  Args:
    extract_features: callable mapping an image to its features.
    compute_feature_distance: callable mapping two feature sets to a distance.
    threshold: distance above which a pair counts as mismatched.
    n: number of pairs to evaluate.
  """
  actual, predicted = [], []
  # Misclassified pairs grouped by the (wrong) predicted label.
  wrong_pairs = {0: [], 1: []}
  pair_source = probability_merge(mismatched_images(), matched_images(), p = 0.5)
  for _ in tqdm(range(n), total=n, desc='Calculating accuracy...'):
    original, transformed, label = next(pair_source)
    original_features = extract_features(original)
    transformed_features = extract_features(transformed)
    distance = compute_feature_distance(original_features, transformed_features)
    prediction = 1 if distance > threshold else 0
    actual.append(label)
    predicted.append(prediction)
    if label != prediction:
      wrong_pairs[prediction].append((original, transformed))
  print(confusion_matrix(actual, predicted))

  # Predicted "same" (0) but different -> false positive, and vice versa.
  for wrong_prediction, title in ((0, 'False positive'), (1, 'False negative')):
    to_be_plotted = [img for pair in wrong_pairs[wrong_prediction] for img in pair]
    plot_grid(to_be_plotted, 2, labels=['Original', title])

In [None]:
feature_cache = None
feature_cache_name = None
# The extractor function the cache was built with; None while there is no cache.
feature_cache_extractor = None
def cache_features(extract_features):
  """Return the features of every exported image, computing them only once.

  The cache is tied to the extractor function object itself. Comparing names
  is not enough: this notebook defines several extractors that are all called
  `extract_features`, and a cache built by one must not be served to another.

  Args:
    extract_features: callable mapping an image to its features.

  Returns:
    List of (features, label) tuples, one per exported image.
  """
  global feature_cache, feature_cache_name, feature_cache_extractor
  if feature_cache is None or feature_cache_extractor is not extract_features:
    # Build into a local list first so that a failure half-way through does
    # not leave a partially filled cache behind.
    features = [
      (extract_features(image), index)
      for image, index in tqdm(single_images(single_pass = True), desc = 'Creating feature cache...')
    ]
    feature_cache = features
    feature_cache_name = getattr(extract_features, '__name__', None)
    feature_cache_extractor = extract_features
  return feature_cache

In [None]:
def product_classification(img, extract_features, compute_feature_distance):
  """Classify an image by its nearest neighbour among the cached features.

  Returns:
    (index of the best matching exported image, its class index, its class
    name, distance to it).
  """
  img_features = extract_features(img)
  feature_db = cache_features(extract_features)
  distances = [compute_feature_distance(entry[0], img_features) for entry in feature_db]
  # Position of the smallest distance; the first one wins on ties.
  index = min(range(len(distances)), key=distances.__getitem__)
  best_match_index = int(feature_db[index][1])
  class_index = index_to_class(best_match_index)
  return best_match_index, class_index, class_name(class_index), distances[index]

def product_classifications(imgs, extract_features, compute_feature_distance):
  """Classify every image of `imgs`; returns the list of result tuples."""
  return [
    product_classification(im, extract_features, compute_feature_distance)
    for im in imgs
  ]

### Method 1


#### Feature extractor

In [None]:
def rmse(predictions, targets):
  """Root-mean-square difference between two arrays."""
  squared_error = (predictions - targets) ** 2
  return np.sqrt(squared_error.mean())

def create_feature_extractor():
  """Build the MobileNetV2 network used as feature extractor.

  ImageNet weights, no classification top, max pooling, and an input of
  `size` with three channels.
  """
  return tf.keras.applications.MobileNetV2(
    input_shape = (*size, 3),
    alpha = 1.0,
    include_top = False,
    weights = 'imagenet',
    pooling = 'max'
  )

feature_extractor = create_feature_extractor()

def extract_features(img):
  """Run the network on one image; only its first three channels are used."""
  # NOTE(review): images arrive here scaled to [0, 1] - confirm that this is
  # the input range the pretrained network is meant to receive.
  rgb = img[:, :, :3]
  batch = np.asarray([rgb])
  return feature_extractor.predict(batch, batch_size=1)

def stats(it, n = 1):
  """Print summary statistics of the feature RMSE over `n` image pairs.

  Args:
    it: iterator yielding (first image, second image) pairs.
    n: number of pairs to consume from `it`.
  """
  rmses = []
  for _ in range(n):
    first, second = next(it)
    first_features = extract_features(first)
    second_features = extract_features(second)
    rmses.append(rmse(first_features, second_features))

  summaries = (
    ('\tAvg:', np.mean),
    ('\tMedian:', np.median),
    ('\tMax:', np.max),
    ('\tMin:', np.min),
    ('\tStdev:', np.std),
  )
  for title, summarize in summaries:
    print(title, summarize(rmses))

# Compare the RMSE distribution of 100 matched pairs with that of 100
# mismatched pairs.
print("Summary of matched images feature comparison:")
stats(matched_images(), 100)
print("Summary of mismatched images feature comparison:")
stats(mismatched_images(), 100)

#tf.keras.utils.plot_model(feature_extractor, show_shapes=True, show_layer_names=True)

#### Feature classification

In [None]:
# Pairs whose feature RMSE exceeds this value are classified as mismatched.
rmse_threshold = 2.311 # TODO find it procedurally

#### Evaluate performance

In [None]:
# Evaluate Method 1 on 400 random matched/mismatched pairs.
accuracy(extract_features, rmse, rmse_threshold, n = 400)

In [None]:
def column(M, c):
  """Return element `c` of every row of `M`, as a list."""
  values = []
  for row in M:
    values.append(row[c])
  return values
# Classify 400 augmented images by their nearest cached feature vector.
test_size = 400
test_images = list(itertools.islice(single_altered_images(), test_size))
predictions = product_classifications(column(test_images, 0), extract_features, rmse)
# Compare every image's label with the index of its best match; the trace of
# the confusion matrix counts the agreements.
cm = confusion_matrix(column(test_images, 1), column(predictions, 0))
print("Accuracy: ", np.trace(cm) / test_size)
plot_grid(column(test_images[:20], 0), 5, labels=column(predictions[:20], 2))
# Release the test images and results.
del test_images, predictions, cm

### Method 2

#### Feature extractor

In [None]:
def compare_features(predictions, targets):
  """Distance between two keypoint feature sets; smaller means more similar.

  The descriptors are cross-check matched and the 50 closest matches are
  combined as 1 / sum(1 / distance^2).

  Args:
    predictions: (keypoints, descriptors) of the first image.
    targets: (keypoints, descriptors) of the second image.

  Returns:
    The combined distance, or 1 / epsilon (a very large value) when either
    image has no descriptors or no match survives cross-checking.
  """
  _, d_1 = predictions
  _, d_2 = targets
  epsilon = 1e-19
  no_match_distance = 1 / epsilon
  if d_1 is None or d_2 is None:
    return no_match_distance
  # NOTE(review): OpenCV recommends NORM_HAMMING for binary descriptors such
  # as ORB; switching would change the distance scale and any tuned threshold.
  bf = cv2.BFMatcher(cv2.NORM_L1, crossCheck=True)
  matches = sorted(bf.match(d_1, d_2), key = lambda x:x.distance)
  top_matches = matches[:50]
  if not top_matches:
    # Without this guard the sum below is 0 and the division raises.
    return no_match_distance
  return 1 / sum(map(lambda m: 1 / (m.distance ** 2 + epsilon), top_matches))


def create_feature_extractor():
  """Create the ORB keypoint detector and descriptor extractor."""
  # https://docs.opencv.org/master/d1/d89/tutorial_py_orb.html
  return cv2.ORB_create(edgeThreshold=31)

feature_extractor = create_feature_extractor()

def extract_features(img):
  """Detect ORB keypoints on a blurred, 8-bit grayscale copy of `img`.

  The image is multiplied by 255 before the cast to 8 bit.

  Returns:
    (keypoints, descriptors); descriptors is None when nothing was detected.
  """
  blurred = cv2.GaussianBlur(img, (5, 5), 0)
  gray = cv2.cvtColor(blurred * 255, cv2.COLOR_RGB2GRAY).astype('uint8')
  keypoints, descriptors = feature_extractor.detectAndCompute(gray, None) # None is the mask
  if descriptors is None:
    print('WARNING: no keypoints found for an image! Try lowering the threshold')
  return keypoints, descriptors

def stats(it, n = 1):
  """Print summary statistics of the feature distance over `n` image pairs.

  Args:
    it: iterator yielding (first image, second image) pairs.
    n: number of pairs to consume from `it`.
  """
  results = []
  for _ in range(n):
    first, second = next(it)
    first_features = extract_features(first)
    second_features = extract_features(second)
    results.append(compare_features(first_features, second_features))

  summaries = (
    ('\tAvg:', np.mean),
    ('\tMedian:', np.median),
    ('\tMax:', np.max),
    ('\tMin:', np.min),
    ('\tStdev:', np.std),
  )
  for title, summarize in summaries:
    print(title, summarize(results))


# Shared iterators: the comparison demo and the statistics below both draw
# from these two streams.
match_iter = matched_images()
mismatch_iter = mismatched_images()

def show_image_comparison(first_img, second_img):
  """Plot the keypoints of two images and the matches between them.

  Three panels are drawn: the keypoints of each image and the 70 closest
  descriptor matches, the last one titled with the `compare_features` distance.

  Both images are multiplied by 255 before the cast to 8 bit.
  NOTE(review): `extract_features` can return None descriptors, which are
  passed to `bf.match` unchecked here - confirm whether that case can occur.
  """
  first_img_gray = cv2.cvtColor(first_img * 255, cv2.COLOR_RGB2GRAY).astype('uint8')
  second_img_gray = cv2.cvtColor(second_img * 255, cv2.COLOR_RGB2GRAY).astype('uint8')
  first_k, first_d = extract_features(first_img)
  second_k, second_d = extract_features(second_img)
  # The input images are handed over as the third (output image) argument.
  first_sift_image = cv2.drawKeypoints(first_img_gray, first_k, first_img)
  second_sift_image = cv2.drawKeypoints(second_img_gray, second_k, second_img)
  bf = cv2.BFMatcher(cv2.NORM_L1, crossCheck=True)
  matches = bf.match(first_d, second_d)
  # Closest matches first.
  matches = sorted(matches, key = lambda x:x.distance)
  matched_img = cv2.drawMatches(first_img_gray, first_k, second_img_gray, second_k, matches[:70], second_img_gray, flags=2)
  distance = compare_features((first_k, first_d), (second_k, second_d))
  plot_grid([first_sift_image, second_sift_image, matched_img], 2, labels=['Original ORB', 'Altered ORB', f'Distance: {distance:.3f}'])

# Visualise one matched and one mismatched pair.
show_image_comparison(*next(match_iter))
show_image_comparison(*next(mismatch_iter))


# Distance statistics over the next 100 pairs of each stream.
print("Summary of matched images feature comparison:")
stats(match_iter, 100)
print("Summary of mismatched images feature comparison:")
stats(mismatch_iter, 100)

#### Feature classification

In [None]:
# Pairs whose feature distance exceeds this value are classified as mismatched.
distance_threshold = 14000 # TODO find it procedurally

#### Evaluate performance

In [None]:
# Evaluate Method 2 on 400 random matched/mismatched pairs.
accuracy(extract_features, compare_features, distance_threshold, n = 400)

In [None]:
def column(M, c):
  """Return element `c` of every row of `M`, as a list."""
  return [row[c] for row in M]
# Only a single augmented image is classified here.
test_size = 1
test_images = list(itertools.islice(single_altered_images(), test_size))
predictions = product_classifications(column(test_images, 0), extract_features, compare_features)
# Compare every image's label with the index of its best match; the trace of
# the confusion matrix counts the agreements.
cm = confusion_matrix(column(test_images, 1), column(predictions, 0))
print("Accuracy: ", np.trace(cm) / test_size)
plot_grid(column(test_images[:20], 0), 5, labels=column(predictions[:20], 2))
# Release the test images and results.
del test_images, predictions, cm

## Testing model on stores

In [None]:
# Shelf images and annotations of the 'store2' directory.
store = read_store_data('store2')

In [None]:
img, items = store[1]

image_portion = img
# The cached features were computed on float images scaled by 1/255 (see
# flow_to_tuple), and the extractor multiplies its input by 255 again, so the
# 8-bit store image has to be rescaled the same way after resizing.
resized_portion = resize_image(image_portion, size).astype('float32') / 255
show_image(img)
# At this point of the notebook `extract_features` is Method 2's keypoint
# extractor, which returns (keypoints, descriptors) tuples. Those must be
# compared with `compare_features`; `rmse` only works on arrays.
res = product_classification(resized_portion, extract_features, compute_feature_distance = compare_features)
print(res)

In [None]:
def sliding_window(img, size, stride=(0.5, 0.5), scale_values=(0.5, 0.3)):
  """Yield fixed-size crops of `img` taken at several scales.

  Args:
    img: image array whose first two axes are (height, width).
    size: window (width, height) in pixels.
    stride: step between consecutive windows as a fraction of the window
      size, given as (horizontal, vertical).
    scale_values: factors by which the image is scaled before windowing.

  Yields:
    Crops of exactly `size`, row by row, for each scale in turn. Scales at
    which the window does not fit into the scaled image yield nothing.
  """
  window_w, window_h = size
  # Step by at least one pixel, otherwise a tiny stride would never advance.
  step_x = max(1, int(window_w * stride[0]))
  step_y = max(1, int(window_h * stride[1]))
  height, width = img.shape[:2]
  for scale in scale_values:
    scaled_width = int(width * scale)
    scaled_height = int(height * scale)
    if scaled_width < window_w or scaled_height < window_h:
      continue
    scaled_img = resize_image(img, (scaled_width, scaled_height))
    # The "+ 1" keeps the last window that ends exactly on the image border.
    for y_min in range(0, scaled_height - window_h + 1, step_y):
      for x_min in range(0, scaled_width - window_w + 1, step_x):
        # Rows span the window height, columns the window width.
        yield scaled_img[y_min : y_min + window_h, x_min : x_min + window_w]

# Collect every window of the store image and show them nine per row.
res = list(sliding_window(img, size))

plot_grid(res, 9)