# Color Coherence Vectors for Image Classification
*Implemented by [Jing Yu Koh](http://kohjingyu.com).*

This notebook implements the method of using color coherence vectors for image classification. For more information, refer to the [CCV paper](https://www.cs.cornell.edu/~rdz/Papers/PZM-MM96.pdf).

In [4]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from PIL import Image

import cv2

import glob
import random

from tqdm import tqdm

%matplotlib inline

In [5]:
def is_adjacent(x1, y1, x2, y2):
    ''' Returns true if (x1, y1) is adjacent to (x2, y2), and false otherwise '''
    x_diff = abs(x1 - x2)
    y_diff = abs(y1 - y2)
    return not (x_diff == 1 and y_diff == 1) and (x_diff <= 1 and y_diff <= 1)

def find_max_cliques(arr, n):
    ''' Returns a 2*n dimensional vector
    v_i, v_{i+1} describes the number of coherent and incoherent pixels respectively a given color
    '''
    tau = int(arr.shape[0] * arr.shape[1] * 0.01) # Classify as coherent is area is >= 1%
    ccv = [0 for i in range(n**3 * 2)]
    unique = np.unique(arr)
    for u in unique:
        x, y = np.where(arr == u)
        groups = []
        coherent = 0
        incoherent = 0
                
        for i in range(len(x)):
            found_group = False
            for group in groups:
                if found_group:
                    break

                for coord in group:
                    xj, yj = coord
                    if is_adjacent(x[i], y[i], xj, yj):
                        found_group = True
                        group[(x[i], y[i])] = 1
                        break
            if not found_group:
                groups.append({(x[i], y[i]): 1})
        
        for group in groups:
            num_pixels = len(group)
            if num_pixels >= tau:
                coherent += num_pixels
            else:
                incoherent += num_pixels
        
        assert(coherent + incoherent == len(x))
        
        index = int(u)
        ccv[index*2] = coherent
        ccv[index*2+1] = incoherent
    
    return ccv
    
def get_ccv(img, n):
    # Blur pixel slightly using avg pooling with 3x3 kernel
    blur_img = cv2.blur(img, (3,3))
    blur_flat = blur_img.reshape(32*32, 3)
    
    # Discretize colors
    hist, edges = np.histogramdd(blur_flat, bins=n)
    
    graph = np.zeros((img.shape[0], img.shape[1]))
    result = np.zeros(blur_img.shape)
    
    total = 0 
    for i in range(0, n):
        for j in range(0, n):
            for k in range(0, n):
                rgb_val = [edges[0][i+1], edges[1][j+1], edges[2][k+1]]
                previous_edge = [edges[0][i], edges[1][j], edges[2][k]]
                coords = ((blur_img <= rgb_val) & (blur_img >= previous_edge)).all(axis=2)
                result[coords] = rgb_val
                graph[coords] = i + j * n + k * n**2
    
    result = result.astype(int)
    return find_max_cliques(graph, n)

In [7]:
n = 2 # indicating 2^3 discretized colors
feature_size = n**3 * 2 # Number of discretized colors * 2 for coherent and incoherent

def extract_features(image):
    return get_ccv(image, n) # image.flatten()

def shuffle_data(data, labels):
    p = np.random.permutation(len(data))
    return data[p], labels[p]

def load_data(dataset="train", classes=["airplane", "automobile", "bird", "cat"]):
    random.seed(1337)
    
    data = []
    labels = []
    
    for i, c in enumerate(classes):
        for file in glob.glob("data/{}/{}/*.jpg".format(dataset, c)):
            one_hot_label = np.zeros(len(classes))
            one_hot_label[i] = 1
            labels.append(one_hot_label)
            
            img = np.array(Image.open(file))
            features = extract_features(img)
            data.append(features)
    
    data, labels = np.array(data), np.array(labels)
    
    if dataset == "train":
        data, labels = shuffle_data(data, labels)
    
    return data, labels

# Binary Classification
We try out CCV on binary classification of two classes: bird and cat. Training set consists of 20 images from CIFAR-10 for each class. Testing set also contains a different set of 20 images for each class.

In [9]:
np.random.seed(1337)

classes = ["bird", "cat"]
num_classes = len(classes)

train_data, train_labels = load_data(dataset="train", classes=classes)
test_data, test_labels = load_data(dataset="test", classes=classes)

batch_size = 16

lr = tf.placeholder(tf.float32, shape=[])
base_lr = 1

x = tf.placeholder(tf.float32, [None, feature_size]) # Placeholder for image input
y = tf.placeholder(tf.float32, [None, num_classes]) # Placeholder for labels

# Model parameters
# Weights and bias
w = tf.Variable(tf.zeros([feature_size, num_classes]))
b = tf.Variable(tf.zeros([num_classes]))

pred = tf.matmul(x, w) + b
cost = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=pred, labels=y))
# cost = tf.reduce_mean(tf.exp(-tf.reduce_sum(tf.multiply(tf.scalar_mul(2,y) - 1, pred), axis=1))) # Logistic loss
# cost = tf.reduce_mean(tf.maximum(1 - tf.multiply(tf.scalar_mul(2, y) - 1, pred), 0)) # Hinge loss
# cost = tf.reduce_mean(tf.losses.hinge_loss(y, pred))

optimizer = tf.train.GradientDescentOptimizer(lr).minimize(cost)

init = tf.global_variables_initializer()

# For early stopping
eps = 0.0001 # If loss decreases below this amount, stop training
last_loss = None
losses_to_consider = 5 # If the mean of the last 3 losses < eps, stop
losses = []

with tf.Session() as sess:
    sess.run(init)
    
    for epoch in range(100):
        batch_start = 0
        batch_end = batch_size
        train_batch, train_label = train_data[batch_start:batch_end], train_labels[batch_start:batch_end]

        _, batch_cost = sess.run([optimizer, cost], feed_dict={x: train_batch,
                                                      y: train_label, lr: base_lr / (epoch+1)})
        losses.append(batch_cost)
        
        if epoch % 10 == 0:
            print("Epoch: {}, loss: {}".format(epoch+1, batch_cost))
        
        last_losses = np.mean(losses[-1-losses_to_consider:-1])

        if abs(last_losses) < eps:
            break
        
        # Shuffle data again
        train_data, train_labels = shuffle_data(train_data, train_labels)

    correct = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1))
    accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
    print("Train accuracy:", accuracy.eval({x: train_data, y: train_labels}))
    print("Test accuracy:", accuracy.eval({x: test_data, y: test_labels}))

Epoch: 1, loss: 0.6931471824645996
Epoch: 11, loss: 644.6712646484375
Epoch: 21, loss: 90.36966705322266
Epoch: 31, loss: 313.6954650878906
Epoch: 41, loss: 250.98727416992188
Epoch: 51, loss: 315.92041015625
Epoch: 61, loss: 98.74835205078125
Epoch: 71, loss: 238.23143005371094
Epoch: 81, loss: 47.603675842285156
Epoch: 91, loss: 114.12418365478516
Train accuracy: 0.775
Test accuracy: 0.65


  out=out, **kwargs)
  ret = ret.dtype.type(ret / rcount)


# Multi-Class Using kNN
Attempt multi-class classification using kNN.

In [12]:
classes = ["airplane", "automobile", "bird", "cat"]
num_classes = len(classes)

train_data, train_labels = load_data(dataset="train", classes=classes)
test_data, test_labels = load_data(dataset="test", classes=classes)

print(train_data.shape)

(80, 16)


In [13]:
from sklearn.neighbors import NearestNeighbors
k = 2
nn = NearestNeighbors(n_neighbors=k).fit(train_data)

In [14]:
distance, indices = nn.kneighbors(test_data)
pred_sum = np.zeros(test_labels.shape)

for i in range(len(indices)):
    index = indices[i]
    for j in range(k):
        pred_sum[i] += train_labels[index[j]]

class_pred = np.argmax(pred_sum, axis=1)
preds = np.zeros(pred_sum.shape)
preds[range(len(class_pred)), class_pred] = 1 # Convert highest in axis to one hot

num_correct = np.all(np.equal(preds, test_labels),axis=1) # Check number of rows that are equal
accuracy = np.mean(num_correct.astype(int)) # Get accuracy
print(accuracy)

0.3875


# Multi-Class Using Logistic Regression
We use logistic regression here to do multi-class classification. Generally, this seems to perform better than kNN.

In [16]:
np.random.seed(1337)

# classes = ["airplane", "automobile", "bird", "cat"]
num_classes = len(classes)

# train_data, train_labels = load_data(dataset="train", classes=classes)
# test_data, test_labels = load_data(dataset="test", classes=classes)
batch_size = 16

lr = tf.placeholder(tf.float32, shape=[])
base_lr = 1

x = tf.placeholder(tf.float32, [None, feature_size]) # Placeholder for image input
y = tf.placeholder(tf.float32, [None, num_classes]) # Placeholder for labels

# Model parameters
# Weights and bias
w = tf.Variable(tf.zeros([feature_size, num_classes]))
b = tf.Variable(tf.zeros([num_classes]))

pred = tf.matmul(x, w) + b
cost = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=pred, labels=y))
# cost = tf.reduce_mean(tf.exp(-tf.reduce_sum(tf.multiply(tf.scalar_mul(2,y) - 1, pred), axis=1))) # Logistic loss
# cost = tf.reduce_mean(tf.maximum(1 - tf.multiply(tf.scalar_mul(2, y) - 1, pred), 0)) # Hinge loss
# cost = tf.reduce_mean(tf.losses.hinge_loss(y, pred))

optimizer = tf.train.GradientDescentOptimizer(lr).minimize(cost)

init = tf.global_variables_initializer()

# For early stopping
eps = 0.0001 # If loss decreases below this amount, stop training
last_loss = None
losses_to_consider = 3 # If the mean of the last 3 losses < eps, stop
losses = []

with tf.Session() as sess:
    sess.run(init)
    
    for epoch in range(100):
        batch_start = 0
        batch_end = batch_size
        train_batch, train_label = train_data[batch_start:batch_end], train_labels[batch_start:batch_end]

        _, batch_cost = sess.run([optimizer, cost], feed_dict={x: train_batch,
                                                      y: train_label, lr: base_lr / (epoch+1)})
        losses.append(batch_cost)
        
        if epoch % 10 == 0:
            print("Epoch: {}, loss: {}".format(epoch+1, batch_cost))
        
        last_losses = np.mean(losses[-1-losses_to_consider:-1])

        if abs(last_losses) < eps:
            break
        
        # Shuffle data again
        train_data, train_labels = shuffle_data(train_data, train_labels)

    correct = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1))
    accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
    print("Train accuracy:", accuracy.eval({x: train_data, y: train_labels}))
    print("Test accuracy:", accuracy.eval({x: test_data, y: test_labels}))

Epoch: 1, loss: 0.6931471824645996
Epoch: 11, loss: 519.8790283203125
Epoch: 21, loss: 436.6109924316406
Epoch: 31, loss: 170.10304260253906
Epoch: 41, loss: 189.2510986328125
Epoch: 51, loss: 142.00942993164062
Epoch: 61, loss: 71.0032958984375
Epoch: 71, loss: 60.660945892333984
Epoch: 81, loss: 47.96961212158203
Epoch: 91, loss: 59.73826599121094
Train accuracy: 0.45
Test accuracy: 0.4


  out=out, **kwargs)
  ret = ret.dtype.type(ret / rcount)
