# COGS 185 Final Project: Auto Context (2, 2.5k, 2.5k)

## Step 1: Installations and Imports

In [1]:
!pip install dlib

Defaulting to user installation because normal site-packages is not writeable


In [2]:
import warnings; warnings.simplefilter('ignore')
import numpy as np
from sklearn import preprocessing
le = preprocessing.LabelEncoder()
oe = preprocessing.OneHotEncoder(sparse=False)
import dlib
import re
import matplotlib.pyplot as plt
from matplotlib.pyplot import imshow
%matplotlib inline
import timeit
label_encoder = preprocessing.LabelEncoder()
import pickle
import random

## Step 2: Function Definitions

In [3]:
# Windows Length
L = 2
# Number of examples
N = 5000
# Length of a feature
d = 128
# The hyper-parameter for icm search
Niter = 2     

In [4]:
def l2i(a):
        return int(ord(a)-ord('a'))
def i2l(i):
    if i >= 0:
        return chr(i+ord('a'))
    else:
        return '_'
def iors(s):
    try:
        return int(s)
    except ValueError: # if it is a string, return a string
        return s

In [5]:
# Read the entire dataset into lists or list of lists
def read_OCR(filename, n_features):
    F = open(filename)
    dataset = {}
    dataset['ids'] = []#np.zeros(n_examples, dtype=int)
    dataset['labels'] = []#np.zeros(n_examples,dtype=int)
    dataset['labelDic'] = {} # To profile the distribution of labels
    dataset['next_ids'] = []#np.zeros(n_examples,dtype=int)
    dataset['word_ids'] = []#np.zeros(n_examples,dtype=int)
    dataset['positions'] = []#np.zeros(n_examples,dtype=int)
    dataset['folds'] = []#np.zeros(n_examples,dtype=int)
    dataset['features'] = []#np.zeros([n_examples,n_features])
    
    for str_line in F.readlines():
        #line0 = map(iors, filter(None, re.split('\t', str_line.strip())))
        ## ATTENTION: If you are using Python3, use the following line instead
        line0 = list(map(iors, filter(None, re.split('\t', str_line.strip()))))


        dataset['ids'].append(int(line0.pop(0)))
        dataset['labels'].append(l2i(line0.pop(0))) # The label is converted into integer('a'=>0, 'z'=>25)
        if dataset['labels'][-1] in dataset['labelDic']:
            dataset['labelDic'][dataset['labels'][-1]] += 1
        else:
            dataset['labelDic'][dataset['labels'][-1]] = 1
            
        dataset['next_ids'].append(int(line0.pop(0)))
        dataset['word_ids'].append(int(line0.pop(0)))
        dataset['positions'].append(int(line0.pop(0)))
        dataset['folds'].append(int(line0.pop(0)))
        if len(line0) != 128:  # Sanity check of the length
            print (len(line0))
        dataset['features'].append(line0)

    return dataset

In [6]:
dataset1 = read_OCR('OCRdataset/letter.data', d)

# Step 3: Concatenating and Structurizing

1) Find 5000 words, split them into 2500 for training, and 2500 for testing.

2) Chop first 2 characters from each word.

3) Now there are 2500 two-words pairs for training, and 2500 for testing.

4) Construct new structures data based on this pairs.
- Ex: 
    - apple => ap
    - banana => ba

In [7]:
# Extract the first L letters in a word

def structurize1(dataset, N, L):
    d_features = len(dataset['features'][0])
    y = dataset['labels']
    X = dataset['features']
    next_id = dataset['next_ids']

    labels = np.zeros((N, L))
    features = np.zeros((N, L * d_features))
    
    def extract(iN, loc):
        labels[iN] = y[loc:loc + L]
        features[iN] = np.array(X[loc:loc + L]).ravel().tolist()
        iN += 1
        return iN
    
    iN = 0
    iN = extract(iN, 0)
    
    for key, value in enumerate(y):
        if next_id[key] == -1:
            iN = extract(iN, key + 1)
            if iN == N:
                break
    
    c = list(zip(labels, features))
    random.shuffle(c)
    labels, features = zip(*c)
    
    return np.array(labels), np.array(features)

In [8]:
labels1, features1 = structurize1(dataset1, N, L)

# Step 4: dlib Classification

In [9]:
class AutoContextProblem:
    C = 1

    def __init__(self, samples, labels, L, K, d, Niter=2):
        self.L = L
        self.K = K
        self.d = d
        self.num_samples = len(samples)
        self.num_dimensions = (L * K * d + 1) + (L - 1)
        self.samples = samples
        self.labels = labels
        self.context = np.zeros((len(samples), L * K), dtype=float)  # Initialize context as float
        self.loss_for_loop = True
        self.Niter = Niter  # Define Niter attribute

    def make_psi(self, x, label, context):
        psi = dlib.vector()
        psi.resize(self.num_dimensions)
        psi[0] = float(x[0])  # The bias

        for i in range(self.L):
            x_offset = 128 * i
            x_offset += 1
            psi_offset = label[i] * 128
            psi_offset += i * self.K * self.d
            psi_offset += 1
            for j in range(self.d):
                psi[psi_offset + j] = float(x[x_offset + j])

            # Incorporate context
            context_index = -(self.L * self.K) + i * self.K + label[i]
            context_value = float(context[i * self.K + label[i]])  # Ensure context value is a float
            psi[context_index] = context_value

        if label[0] == label[1]:
            psi[-(self.L * self.K) - 1] = 0.0
        else:
            psi[-(self.L * self.K) - 1] = 1.0
        return psi

    def get_truth_joint_feature_vector(self, idx):
        return self.make_psi(self.samples[idx], self.labels[idx], self.context[idx])

    def separation_oracle(self, idx, current_solution):
        samp = self.samples[idx]
        psi = [0] * self.num_dimensions
        max1 = -1e10
        max_scoring_label = [0] * self.L
        for k in range(self.Niter):  # Use self.Niter
            for iL in range(self.L):
                for i in range(self.K):
                    tmp_label = max_scoring_label.copy()
                    tmp_label[iL] = i
                    tmp_psi = self.make_psi(samp, tmp_label, self.context[idx])
                    score1 = dlib.dot(current_solution, tmp_psi)

                    loss1 = 0.0
                    if self.loss_for_loop:
                        for j in range(self.L):
                            if self.labels[idx][j] != tmp_label[j]:
                                loss1 += 1.0
                    else:
                        if self.labels[idx] != tmp_label:
                            loss1 += 1.0

                    if max1 < score1 + loss1:
                        max1 = score1 + loss1
                        loss = loss1
                        max_scoring_label[iL] = i
                        psi = tmp_psi

        return loss, psi

    def update_context(self, samples, labels, weights):
        for idx, samp in enumerate(samples):
            prediction = [0] * self.L
            for iL in range(self.L):
                max_score = -1e10
                for i in range(self.K):
                    tmp_label = prediction.copy()
                    tmp_label[iL] = i
                    psi1 = self.make_psi(samp, tmp_label, self.context[idx])
                    score1 = dlib.dot(weights, psi1)

                    if max_score < score1:
                        max_score = score1
                        prediction[iL] = i

            self.context[idx] = np.array([1.0 if l == labels[idx][i] else 0.0 for i, l in enumerate(prediction) for _ in range(self.K)], dtype=float)


In [10]:
def cal_accuracy(samples, labels, problem, weights, K):
    predictions = []
    for idx, samp in enumerate(samples):
        prediction = [0] * problem.L
        for iL in range(problem.L):
            max_score = -1e10
            for i in range(K):
                tmp_label = prediction.copy()
                tmp_label[iL] = i
                if idx < len(problem.context):  # Ensure idx is within bounds
                    psi1 = problem.make_psi(samp, tmp_label, problem.context[idx])
                    score1 = dlib.dot(weights, psi1)

                    if max_score < score1:
                        max_score = score1
                        prediction[iL] = i
        predictions.append(prediction)

    errCnt = 0
    for i in range(len(predictions)):
        if predictions[i] != labels[i]:
            errCnt += 1

    return 1.0 - float(errCnt) / float(len(predictions))

In [11]:
le1 = preprocessing.LabelEncoder()
nplabels1 = le1.fit_transform(labels1.ravel()).reshape(labels1.shape)
npsamples1 = np.hstack([np.ones((N, 1)), features1])
K1 = len(le1.classes_)

tr_labels = nplabels1[int(N*0.5):].astype(int).tolist()
tr_samples = npsamples1[int(N*0.5):].astype(int).tolist()
te_labels = nplabels1[:int(N*0.5)].astype(int).tolist()
te_samples = npsamples1[:int(N*0.5)].astype(int).tolist()

# Step 5: Auto-Context Implementation

In [12]:
problem = AutoContextProblem(tr_samples, tr_labels, L, K1, d, Niter=2)
start_train = timeit.default_timer()
weights = dlib.solve_structural_svm_problem(problem)
end_train = timeit.default_timer()
print("Training time elapsed:", end_train - start_train, "s")

Training time elapsed: 322.1306966559496 s


In [13]:
pickle.dump(weights, open('auto_context_weights2_2.5_2.5.obj', 'wb'))
weights_load = pickle.load(open('auto_context_weights2_2.5_2.5.obj', 'rb'))

print("Training accuracy=", cal_accuracy(tr_samples, tr_labels, problem, weights_load, K1))
print("Test accuracy=", cal_accuracy(te_samples, te_labels, problem, weights_load, K1))

Training accuracy= 0.49160000000000004
Test accuracy= 0.4556
