# P1 - Eucidean Distance

In [1]:
import numpy as np
import tensorflow as tf
from __future__ import print_function
import matplotlib.pyplot as plt

In [5]:
def euclidean_dist(x, z):
    
    #||x - z||^2 = x.T*x - 2*x*z.T + z.T*z
    
    #can also be tried using linalg.norm function
    
    x2 = tf.matmul(x ,tf.transpose(x))
    z2 = tf.matmul(z ,tf.transpose(z))
    
    x2_sum = tf.reduce_sum(x2, axis = 1)
    z2_sum = tf.reduce_sum(z2, axis = 1)
    
    xt = tf.transpose(x2_sum)
    zt = tf.transpose(z2_sum)
    
    xz = tf.matmul(x, tf.transpose(z))
    minus_xz = tf.scalar_mul(-2, xz)
    
    xt = tf.expand_dims(xt, axis = 1)
    zt = tf.expand_dims(zt, axis = 1)
    zt = tf.transpose(zt)
    
    xz2 = xt + zt + minus_xz
    
    return xz2

In [6]:
sess = tf.Session()

a = tf.constant([[1, 1], [2, 2], [3, 3], [4, 4]])
b = tf.constant([[2, 2], [3, 3], [4, 4]])

c = euclidean_dist(a, b)
print(sess.run(c))


[[48 62 76]
 [60 70 80]
 [72 78 84]
 [84 86 88]]


# P2 

In [8]:
def pairwise_dist(data1, data2):
    data1 = tf.convert_to_tensor(data1)
    data2 = tf.convert_to_tensor(data2)
    
    dist = euclidean_dist(data1, data2)
    return dist

def knn_predict(_trainData, _trainTarget, _X, _k):
    dist = pairwise_dist(_trainData, _X)
    near_k, near_id = tf.nn.top_k(dist, _k)
    respon = tf.gather(_trainData, near_id)
    respon = tf.reduce_mean(respon, axis = 1)
    
    return respon
    

In [13]:
sess = tf.Session()

a = tf.constant([[1, 1], [2, 2], [3, 3]])
b = tf.constant([[2, 2], [3, 3], [4, 4]])

c = pairwise_dist(a, b)
print(sess.run(c))

[[40 54 68]
 [44 54 64]
 [48 54 60]]


In [None]:
def mse_loss(trainData, trainTarget, testData, testTarget):
    
    
    return loss

In [10]:
def plot(_trainData, _trainTarget):
    X = np.linspace(0.0, 11.0, num = 1000) [:, np.newaxis]
    X = tf.convert_to_tensor(X)
    
    k_list = [1, 3, 5, 50]
    
    for k in k_list:
        

In [7]:
np.random.seed(521)
Data = np.linspace(1.0 , 10.0 , num =100) [:, np. newaxis]
Target = np.sin( Data ) + 0.1 * np.power( Data , 2) \
+ 0.5 * np.random.randn(100 , 1)
randIdx = np.arange(100)
np.random.shuffle(randIdx)
trainData, trainTarget = Data[randIdx[:80]], Target[randIdx[:80]]
validData, validTarget = Data[randIdx[80:90]], Target[randIdx[80:90]]
testData, testTarget = Data[randIdx[90:100]], Target[randIdx[90:100]]

trainData = tf.convert_to_tensor(trainData)
trainTarget = tf.convert_to_tensor(trainTarget)
testData = tf.convert_to_tensor(testData)
testTarget = tf.convert_to_tensor(testTarget)
validData = tf.convert_to_tensor(validData)
validtarget = tf.convert_to_tensor(validTarget)

# P3 (Using Violet's Code)

In [2]:
def euclideanDistance(x, z):
    '''
    We vectorized the operation with matrix operations
    the ||x - z ||^2 is = Sum(x - z)^2 = Sum from 1 to D
    of (x^2 - 2xz + z^2)
    
    x^2 = x^T times x
    z^2 = z^T times z
    we can find the sum of each row by using the reduce sum function
    
    sum of -2xz 
    we can use matrix multiplication, x times z^T to obtain xz,
    then multiply by -2
    '''
    n1 = x.shape[0]
    n2 = z.shape[0]    
    
    x_squared = tf.square(x)
    z_squared = tf.square(z) #tf.matmul(z ,tf.transpose(z))
    #print("x_squared.eval()")
    #print(x_squared.eval())
    x_2_sum = tf.reduce_sum(x_squared, 1)
    z_2_sum = tf.reduce_sum(z_squared, 1)
    
    x_2_sum = tf.transpose(x_2_sum)
    z_2_sum = tf.transpose(z_2_sum)
    #x_2_sum = tf.reshape(x_2_sum, [-1, 1])
    #z_2_sum = tf.reshape(z_2_sum, [-1, 1])
    #print("x_2_sum.eval()")
    #print(x_2_sum.eval())
    
    xz = tf.matmul(x, tf.transpose(z))
    #print(xz.eval())
    minus_2xz = tf.scalar_mul(-2, xz)
    #print(minus_2xz.eval())
 
    x_2_tile =tf.tile(tf.expand_dims(x_2_sum, 1), [1, n2])
    z_2_tile = tf.tile(tf.expand_dims(z_2_sum ,1), [1, n1])
    z_2_tile_T = tf.transpose(z_2_tile)
    #print(x_2_tile.eval())
    #print(z_2_tile_T.eval())
    
    result = x_2_tile + minus_2xz +z_2_tile_T
    
    return result

"""VERIFIED"""

def testi():
    A = tf.constant([[1, 1], [2,2], [3, 3], [4,4]])
    B = tf.constant([[1, 1], [2, 2],[3,3]])
    res_mine = euclideanDistance(A, B)
    
    print("----my func----")
    print(res_mine.eval())
    print("---diff square---")
    res_lib = PairwiseDistances(A, B)
    print(res_lib.eval())
    

In [3]:
def nearestIndices(_dist_mat, _k):

    nearest_k_data, nearest_k_indices = tf.nn.top_k(tf.negative(_dist_mat), _k)
    return nearest_k_data, nearest_k_indices


#unit testing
#dist_mat = tf.constant([ 4, 9, 16, 25 ], tf.int32)
#topk = nearestIndices(dist_mat, 2)
#responsibility(topk, 2, 4)

def pairDist(_data, _data1):
    data_t = tf.convert_to_tensor(_data)
    data1_t = tf.convert_to_tensor(_data1)
    dist = euclideanDistance(data_t, data1_t)
    return dist

In [4]:
def data_segmentation(data_path, target_path, task):
    # task = 0 >> select the name ID targets for face recognition task
    # task = 1 >> select the gender ID targets for gender recognition task data = np.load(data_path)/255
    data = np.load(data_path)/255
    data = np.reshape(data, [-1, 32*32])
    target = np.load(target_path)
    np.random.seed(45689)
    rnd_idx = np.arange(np.shape(data)[0])
    np.random.shuffle(rnd_idx)
    trBatch = int(0.8*len(rnd_idx))
    validBatch = int(0.1*len(rnd_idx))
    trainData, validData, testData = data[rnd_idx[1:trBatch],:], \
                                   data[rnd_idx[trBatch+1:trBatch + validBatch],:],\
                                   data[rnd_idx[trBatch + validBatch+1:-1],:]
    trainTarget, validTarget, testTarget = target[rnd_idx[1:trBatch], task], \
                              target[rnd_idx[trBatch+1:trBatch + validBatch], task],\
                              target[rnd_idx[trBatch + validBatch + 1:-1], task]
    #print("train data dim", trainData.shape, "valid data dim", validData.shape,
             #"test data dim", testData.shape, "trainTarget shape", trainTarget.shape,
             #"validTarget SHAPE", validTarget.shape, "testTarget shape", testTarget.shape)
    return trainData, validData, testData, trainTarget, validTarget, testTarget


In [5]:
def knnVote(_trainData, _trainTarget, _new_data, _new_target, _k):
    '''
    KNN using majority vote
    '''
    #nearest indices
    dist_mat = pairDist( _new_data, _trainData ) 
    
    nearest_k, nearest_k_idx = nearestIndices(dist_mat, _k)
    neighbours = tf.gather(_trainTarget, nearest_k_idx)
    
    s1 = neighbours.shape[0]
    n_unstack = tf.unstack(neighbours, axis = 0)

    nearest_k_y, idx, votes = [], [], []
    predict_res = []
    
    check = False
    curr_id = 0
    
    for i in n_unstack:
        y, i, v = tf.unique_with_counts(i) 
        nearest_k_y.append(y)
        idx.append(i)
        votes.append(v)
        predict_res.append(y[tf.argmax(v)])
        votes.append(tf.argmax(v)) #record the indices chosen
        
        if k == 10 and check == False:
            display_k10(_trainData, _trainTarget, _new_data, _new_target, nearest_k_idx, curr_id, y, v)
            curr_id += 1
            check = True
        
    predict_res = tf.convert_to_tensor(predict_res)

    return predict_res


In [6]:
def testQ2KNN(trainData, trainTarget, testData, testTarget):
    num_neighbour_list = [1, 5, 10, 25, 50, 100, 200]
    loss_list = []
    for j in num_neighbour_list:
        #print("running knn, k = ", j)
        y_hat = knnVote(trainData, trainTarget, testData, testTarget, j)
        y_hat = tf.cast(y_hat, tf.float16)
        testTarget = tf.cast(testTarget, tf.float16)
        # all the indices where y = target, bool
        loss = tf.not_equal(y_hat, testTarget )
        #cast bool to int
        as_ints = tf.cast(loss, tf.int32)
        error = tf.reduce_sum(as_ints)
        loss_list.append(error.eval())
    return loss_list


In [7]:
def display_k10(_trainData, _trainTarget, _newData, _newTarget, k_id, curr_id, y, v):
    
    test_data = tf.gather(_newData, curr_id)
    neighbour = tf.gather(_trainData, k_id)
    curr_target = tf.cast(_newTarget[curr_id], tf.uint8)
    if tf.not_equal(y[tf.argmax(v)], curr_target):
        plot_k10(test_data, neighbour)
        plt.imshow(test_data)
        plt.Title("Failure Case")
        plt.show()
        
        for i in range(10):
            neighbor_img = tf.reshape(neighbor[i, :], [32, 32])
            plt.imshow(neighbor_img)
            title = i + "st nearest neighbor"
            plt.Title(title)
            plt.show()

In [8]:
def q2Task(t):
    data_path = 'data.npy'
    target_path = 'target.npy'
    trainData, validData, testData, trainTarget, validTarget, testTarget = data_segmentation(data_path, target_path, t)
       
    # convert numpy array to tensors
    trainData = tf.stack(trainData)
    trainTarget = tf.stack(trainTarget)
    testData = tf.stack(testData)
    testTarget = tf.stack(testTarget)
    validData = tf.stack(validData)
    validtarget = tf.stack(validTarget)
    
    print("********** BEGIN Q2 task ", t, " ***********")
    loss_train = testQ2KNN(trainData, trainTarget, trainData, trainTarget) 
    print("q2 task", t, "loss train", loss_train)
    loss_test = testQ2KNN(trainData, trainTarget, testData, testTarget)
    print("q2 task", t,  "loss test", loss_test) 
    loss_valid = testQ2KNN(trainData, trainTarget, validData, validTarget)
    print("q2 task", t , "loss valid", loss_valid)
    print("********** END Q2 task ", t, " ***********")



In [9]:
from scipy import spatial as sp
from sklearn import metrics as skm


if __name__ == "__main__":
    
    init = tf.global_variables_initializer()
    sessMain = tf.InteractiveSession()
    sessMain.run(init)
    #q1()
    q2Task(0)
    q2Task(1)
    
    

********** BEGIN Q2 task  0  ***********


TypeError: Failed to convert object of type <class 'list'> to Tensor. Contents: [1, Dimension(747)]. Consider casting elements to a supported type.