In [1]:
from os import listdir
import os, random, copy
from PIL import Image
import numpy as np
from collections import defaultdict
import math

In [2]:
#define sigmoid and its derivative for activation & backprop
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def derivSigmoid(x):
    return sigmoid(x) * (1 - sigmoid(x))

## Question 1 - Import Data

In [3]:
################################################################################
# CSE 253: Programming Assignment 1
# Code snippet by Michael
# Winter 2020
################################################################################
# We've provided you with the dataset in PA1.zip
################################################################################
# To install PIL, refer to the instructions for your system:
# https://pillow.readthedocs.io/en/5.2.x/installation.html
################################################################################
# If you don't have NumPy installed, please use the instructions here:
# https://scipy.org/install.html
################################################################################


''' 
list of face expressions (contempt, neutral are excluded) are:
1. anger
2. disgust
3. fear
4. happiness
5. sadness
6. surprise
'''

def load_data(data_dir="./aligned/"):
	""" Load all PNG images stored in your data directory into a list of NumPy
	arrays.

	Args:
		data_dir: The relative directory path to the CK+ image directory.
	Returns:
		images: A dictionary with keys as emotions and a list containing images associated with each key.
		cnt: A dictionary that stores the # of images in each emotion
	"""
	images = defaultdict(list)

	# Get the list of emotional directory:
	for e in listdir(data_dir):
		# excluding any non-directory files
		if not os.path.isdir(os.path.join(data_dir, e)):
			continue
		# Get the list of image file names
		all_files = listdir(os.path.join(data_dir, e))

		for file in all_files:
			# Load only image files as PIL images and convert to NumPy arrays
			if '.png' in file:
				img = Image.open(os.path.join(data_dir, e, file))
				images[e].append(np.array(img))

	print("Emotions: {} \n".format(list(images.keys())))

	cnt = defaultdict(int)
	for e in images.keys():
		print("{}: {} # of images".format(e, len(images[e])))
		cnt[e] = len(images[e])
	return images, cnt

def balanced_sampler(dataset, cnt, emotions):
	# this ensures everyone has the same balanced subset for model training, don't change this seed value
	random.seed(20)
	print("\nBalanced Set:")
	min_cnt = min([cnt[e] for e in emotions])
	balanced_subset = defaultdict(list)
	for e in emotions:
		balanced_subset[e] = copy.deepcopy(dataset[e])
		random.shuffle(balanced_subset[e])
		balanced_subset[e] = balanced_subset[e][:min_cnt]
		print('{}: {} # of images'.format(e, len(balanced_subset[e])))
	return balanced_subset

def display_face(img):
	""" Display the input image and optionally save as a PNG.

	Args:
		img: The NumPy array or image to display

	Returns: None
	"""
	# Convert img to PIL Image object (if it's an ndarray)
	if type(img) == np.ndarray:
		print("Converting from array to PIL Image")
		img = Image.fromarray(img)

	# Display the image
	img.show()


# example on how to use it
if __name__ == '__main__':
	# The relative path to your image directory
	data_dir = "./aligned/"
	dataset, cnt = load_data(data_dir)
	# test with happiness and anger
	images = balanced_sampler(dataset, cnt, emotions=['happiness', 'anger'])
	display_index = 0
	display_face(images['anger'][display_index])

Emotions: ['fear', 'surprise', 'sadness', 'happiness', 'anger', 'disgust'] 

fear: 25 # of images
surprise: 83 # of images
sadness: 28 # of images
happiness: 69 # of images
anger: 45 # of images
disgust: 59 # of images

Balanced Set:
happiness: 45 # of images
anger: 45 # of images
Converting from array to PIL Image


In [4]:
K = 10 # number of sections after folding
M = 50 # maximum number of epochs


## Training Procedure

In [5]:
#list happiness as a 1, anger as a 0
X = images['happiness'] + images['anger']
y = [1] * 45 + [0] * 45

#get my feature - label pairs zipped together
all_happy_angry = list(zip(X,y))

#randomize the dataset so I can fold properly

random.shuffle(all_happy_angry)

def kfold(data, k=10):
    folds = []
    size = int((1/k) * len(data))
    for i in range(0,k):
        folds.append(data[i*size : (i+1)*size])
    return folds


In [6]:
folds = kfold(all_happy_angry)
print(len(folds[0]) == len(folds[1]))
print(len(folds))

True
10


## Task 5 - Logistic Regression

In [7]:
class LogisticRegression:
    
    def __init__(self, lr, dim):
        self.lr = lr
        self.w = np.zeros(dim) 
    
    def stochastic_gradient_descent(self, X, labels):
        indices = [i for i in range(len(labels))]
        np.random.shuffle(indices)
        for i in indices:
            
            # make predition
            data = X[i]
            label = labels[i]
            predicted = sigmoid(data.dot(self.w))
            error = label - predicted
            
            # update weights
            for i in range(len(self.w)):
                grad = error * data[i]
                self.w[i] += self.lr * grad
            
    def probabilities(self, X):
        return sigmoid(X.dot(self.w)).reshape(-1)

    
    def batch_gradient_descent(self, X, labels):
        predicted = self.probabilities(X)
        error = labels - predicted
        grad = X.T.dot(error)
        self.w += self.lr * grad
        

    def loss(self, labels, predicted):
        #cost = np.log(predicted+ 0.0001) * labels + (1 - labels) * np.log(1 - predicted)
        #return - cost.sum() / len(labels)
        #print(cost)
        
        Yis1 = labels == 1
        cost = -(np.log(predicted[Yis1]).sum() + np.log(1 - predicted[~Yis1]).sum())/len(labels)
        return cost

In [8]:
lr = LogisticRegression(0.1, 3)

In [9]:
lr.loss(np.array([1,1]), np.array([1,1]))

-0.0

In [10]:
lr.loss(np.array([1,1]), np.array([0.8,1]))

0.11157177565710485

In [11]:
test_data = np.array([[0.9,0.22,0.44], [0.0,0.22,0.44]])

for i in range(50):
    lr.stochastic_gradient_descent(test_data, np.array([0, 1]))

In [12]:
lr.w

array([-1.52246431,  0.16109949,  0.32219898])

In [13]:
lr.probabilities(test_data)

array([0.23272131, 0.54418679])

In [14]:
lr.loss(np.array([0, 1]), lr.probabilities(test_data))

0.4366839636474693

In [15]:
5 

[0, 0, 0, 0, 1, 0]

[0, 0, 0, 0, 1, 0]

## Softmax

In [16]:
emotions_mapping = ["fear", "surprise", "sadness", "happiness", "anger", "disgust"]


class SoftmaxRegression:
    
    def __init__(self, lr, dim, c):
        self.lr = lr
        self.c = c
        self.w = np.zeros((dim, c)) 
    
    def stochastic_gradient_descent(self, X, labels):
        indices = [i for i in range(len(labels))]
        np.random.shuffle(indices)
        for i in indices:
            
            # make predition
            data = X[i]
            label = labels[i]
            predicted = self.probabilities(data)
            error = label - predicted
            
            # update weights
            for i in range(len(self.w)):
                grad = error * data[i]
                self.w[i] += self.lr * grad
            
    def probabilities(self, X):
        return np.exp(X.dot(self.w)) / np.sum(np.exp(X.dot(self.w)), axis=1)

    
    def batch_gradient_descent(self, X, labels):
        predicted = self.probabilities(X)
        error = labels - predicted
        grad = X.T.dot(error)
        #grad = np.sum(grad, axis=1).reshape(grad.shape[0], 1)
        self.w += self.lr * grad
        

    def loss(self, labels, predicted):
        #return -np.sum(np.sum(labels.T.dot(np.log(predicted)))) # can't have zero values 
        sum_of_score = 0
        for n in range(predicted.shape[0]):
            for cat in range(self.c):
                sum_of_score += labels[n, cat] * np.log(predicted[n, cat])

        return - sum_of_score / predicted.shape[0]


In [17]:
softmax = SoftmaxRegression(0.02, 4, 2)

test_data = np.array([[0.3,0.22,0.44, 0.44], [0.0,0.12,0.2, 0.2]])



#softmax.loss(np.array([[1, 0],[1,0]]), softmax.probabilities(test_data))

In [18]:
softmax.loss(np.array([[1, 0],[1,0]]), np.array([[0.99,0.01], [0.99,0.01]]))

0.01005033585350145

In [19]:
for i in range(50):
    softmax.batch_gradient_descent(test_data, np.array([[1, 0],[0,0]]))

In [20]:
softmax.w

array([[ 0.13271808, -0.13035603],
       [ 0.03345672, -0.15292185],
       [ 0.0882034 , -0.28673456],
       [ 0.0882034 , -0.28673456]])

In [21]:
softmax.probabilities(test_data)

array([[0.61060854, 0.3771708 ],
       [0.56057181, 0.45702117]])

In [22]:
class PCA:
    def __init__(self, square_data, k=10): # k defaults to 10
        self.square_data = square_data
        M, rows, columns = self.square_data.shape
        self.k = k
        self.num_examples = M
        self.image_vec = np.reshape(square_data, (M, rows*columns))
        self.mean_face = np.mean(self.image_vec, axis=0)
        self.std_face = np.std(self.image_vec, axis=0)
        self.components, self.singular_values = self.get_components()
    
    def get_components(self):
        Phi = (self.image_vec - self.mean_face) / self.std_face 
        A = Phi
        C = np.matmul(A, A.T)
        C = np.divide(C, self.num_examples - 1)
        evals, Vi = np.linalg.eigh(C)
        z = list(zip(evals,Vi))
        z.sort(reverse=True)        
        
        idx = np.argsort(evals)[::-1]
        evecs = Vi[:,idx]
        evecs = evecs[:, :self.k]
        pc = evecs
        
        #final components (num pixels by k matrix)
        components = np.matmul(A.T, pc)
        components = components / np.linalg.norm(components, axis=0)
        #get singluar values
        sorted_evals = np.array([x[0] for x in z])
        postive_evals = sorted_evals[:self.k]
        singular_values = np.sqrt(postive_evals.reshape(1, -1))
        assert np.allclose(np.linalg.norm(components, axis=0), 1)

        return components, singular_values
        
    def transform(self, images):
        return np.array([self.transform_single(i) for i in images])
        
    def transform_single(self, image): #take an image, and pc's, and output compressed image
        image =  image.reshape(1, -1) 
        image = (image - self.mean_face) / self.std_face
        compressed_image_vectors = np.matmul(image, self.components) / self.singular_values
        return compressed_image_vectors.reshape(-1,)
    
    '''
    def transform(self, images): #take an image, and pc's, and output compressed image
        M, rows, columns = images.shape
        flat_images = (np.reshape(images, (M, rows*columns)) - self.mean_face) / self.std_face
        print("Shape flat images")
        print(flat_images.shape)
        compressed_image_vectors = np.matmul(flat_images, self.components) / self.singular_values
        return compressed_image_vectors
    '''

In [28]:
import matplotlib.pyplot as plt
#from pca import PCA

EPOCHS = 100
K = 10
LOGISTIC = False
CATEGORIES = 2
LEARNING_RATE = 0.01

class EpochData:
    def __init__(self):
        self.acc = []
        self.error = []
    
    def save(self, error, acc):
        self.error.append(error)
        self.acc.append(acc)
        
    
    def score(self):
        return self.error[-1]
        
        
def transform(pca, data):
    labels = data[1]
    if not LOGISTIC: # if softmax
        labels = one_hot_encode(labels)
    return pca.transform(data[0]), labels
    #np.array([pca.transform(i)[0] for i in data[0]]), labels

def visualize_data(plots, legends, x_label, y_label):
    x = np.arange(1, len(plots[0]) + 1)
    for data in plots:
        plt.plot(x, data)
    plt.ylabel(y_label)
    plt.xlabel(x_label)
    plt.legend(legends)
    plt.show()
    
def split_x_y(data):
    return np.array([item[0] for item in data]), np.array([item[1] for item in data]) 

def one_hot_encode(labels):
    new_labels = np.zeros((len(labels), CATEGORIES))
    new_labels[np.arange(labels.size), labels] = 1
    return new_labels

def train():
    best_model = None
    
    folds = kfold(all_happy_angry)
    k = len(folds)
    for fold in range(k):
        # define the model
        if LOGISTIC:
            model = LogisticRegression(LEARNING_RATE, K)
        else:
            model = SoftmaxRegression(LEARNING_RATE, K, CATEGORIES)

        # split data
        val_data, test_data = split_x_y(folds[fold]), split_x_y(folds[(fold + 1) % k])
        train_data = None
        print("Fold: " + str(fold))
        for i in range(k):
            if i != fold and i != ((fold + 1) % k):
                print("Training index: " + str(i))
                if train_data is None:
                    train_data = folds[i]
                else:
                    train_data = np.concatenate((train_data, folds[i]))
        train_data = split_x_y(train_data)
        print(val_data[0].shape)
        print(test_data[0].shape)
        print(train_data[0].shape)

        print(train_data[0][0])
        pca = PCA(train_data[0], K) # PCA(10) #
        #pca.fit(train_data[0])
        # PCA and one_hot
        train_data, test_data, val_data = transform(pca, train_data), transform(pca, test_data), transform(pca, val_data)        
        print("PCA transformed")
        print(train_data[0][0])
        validation_performance = EpochData()
        training_performance = EpochData() 
        
        
        for epoch in range(EPOCHS):
            model.batch_gradient_descent(train_data[0], train_data[1])
            #print(model.probabilities(train_data[0]))
            training_error = model.loss(train_data[1], model.probabilities(train_data[0]))
            validation_error = model.loss(val_data[1], model.probabilities(val_data[0]))
            
            traning_acc = 0.3 #model.make_prediction()
            validation_acc = 0.4 #model.make_prediction()
            
            print("Training error: {}, validation error: {}, accuracy: {}".format(training_error, validation_error, traning_acc))
        
            # save
            validation_performance.save(validation_error, validation_acc)
            training_performance.save(training_error, traning_acc)
        
        
        # plot the graphs
        data_to_plot = [training_performance.error, validation_performance.error]
        legends = ["Training error", "Validation error"]
        visualize_data(data_to_plot,legends, "Epoch", "Cross entropy error")
                
        # save the validation data to the model
        model.epoch_data = validation_performance 
        
        # save the best model
        if best_model is None:
            best_model = model
        elif best_model.epoch_data.score() > model.epoch_data.score():
            best_model = model
        
            

train()


Fold: 0
Training index: 2
Training index: 3
Training index: 4
Training index: 5
Training index: 6
Training index: 7
Training index: 8
Training index: 9
(9, 224, 192)
(9, 224, 192)
(72, 224, 192)
[[50 50 51 ... 61 60 60]
 [50 49 50 ... 61 62 63]
 [51 51 51 ... 63 64 66]
 ...
 [40 40 40 ... 48 48 49]
 [41 41 40 ... 46 46 45]
 [40 39 38 ... 49 49 48]]
PCA transformed
[ 0.49845016  0.94623586 -0.05710495  0.25494645 -1.41748828  0.11601997
 -0.59675826  0.83598796  0.01925483 -0.31428448]


ValueError: operands could not be broadcast together with shapes (72,2) (72,) 

In [24]:
test_data = np.array([
    [[40, 33, 34], 
     [54, 34, 22]],
    
    [[34, 40, 34], 
     [23, 32, 32]],

    [[60, 30, 64], 
     [73, 62, 72]]])

test_img = np.array([[34, 36, 45], [34, 36, 45]])

In [25]:
pca = PCA2(np.array(test_data), 1)

NameError: name 'PCA2' is not defined

In [None]:
pca.transform_single(test_img)

In [None]:
print(pca.components)

In [None]:
print(pca.singular_values)

In [None]:
from pca import PCA

pca1 = PCA(1)

In [None]:
pca1.fit(test_data)

In [None]:
pca1.transform(test_img)

In [None]:
print(pca1.p_components)

In [None]:
print(pca1.s_vals)