# import data

In [1]:
import pandas as pd
import numpy as np
import math
import matplotlib.pyplot as plt

%matplotlib inline

In [None]:
train_file_path = '/Users/cao.yumin/Desktop/statistic_of_ml/3/2020_assignment3_data/mnist_train.csv'
test_file_path = '/Users/cao.yumin/Desktop/statistic_of_ml/3/2020_assignment3_data/mnist_test.csv'
train_df = pd.read_csv(train_file_path,header=None)
test_df = pd.read_csv(test_file_path,header=None)
train_df.head()

In [None]:
train_df.dtypes

In [None]:
train_label = train_df.pop(0)
train_features = train_df
test_label = test_df.pop(0)
test_features = test_df

In [None]:
train_label.shape

In [None]:
train_features.shape

In [None]:
train_features = train_features.to_numpy()
test_features = test_features.to_numpy()

In [None]:
train_features.shape

In [None]:
train_label = train_label.to_numpy()
test_label = test_label.to_numpy()

In [None]:
fig,axs = plt.subplots(1,3)
ax0,ax1,ax2 = axs.ravel()
ax0.imshow(train_features[0].reshape(28,28),cmap='gray')
ax1.imshow(train_features[1].reshape(28,28),cmap='gray')
ax2.imshow(train_features[2].reshape(28,28),cmap='gray')
plt.show()

## standard scaler

In [None]:
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
train_features=scaler.fit_transform(train_features)
test_features=scaler.fit_transform(test_features)

In [None]:
fig,axs = plt.subplots(1,3)
ax0,ax1,ax2 = axs.ravel()
ax0.imshow(train_features[0].reshape(28,28),cmap='gray')
ax1.imshow(train_features[1].reshape(28,28),cmap='gray')
ax2.imshow(train_features[2].reshape(28,28),cmap='gray')
plt.show()

# PCA using SVD

In [None]:
class PCA:
    def __init__(self,X,n): # X is the input training set(standardised)
                            # n is the number of reduced dimensions (from 256 to 10)
        self.X = X
        self.n = n
    def mean_vector(self,mean_X): # mean_X is a numpy array, [m1.T,m2.T,...]
        mean_x = []
        new_mean_X = mean_X.T
        length = len(new_mean_X)
        for i in range(length):
            sum = np.mean(new_mean_X[i])
            mean_x.append(sum)
        mean_x = np.array(mean_x)
        return mean_x # shape (1,m)
    def covariance_matrix(self): # finding covariance_matrix of X
        cov_matrix = np.cov(self.X)
        return cov_matrix
    def singular_value_decomposition(self): # singular value decomposition and get the n main components
        m,n = self.X.shape
        new_X = (1/(math.sqrt(n-1)))*self.X.T
#         print('new_X',new_X.shape)
        U, sigma, V_T = np.linalg.svd(new_X)
#         svd = np.zeros((U.shape[0],V.shape[1]))     
#         for i in range(0, self.n):                       
#             svd[i, i] = b[i]
#         print('V_T>>>,',V_T.shape)
        V_n = V_T[:self.n]
#         print('V_n>>>>,',V_n.shape)
        return V_n
    def fit(self,X):
        V_n = self.singular_value_decomposition()
        Y = np.matmul(V_n,X)
#         print('Y,',Y.shape)
#         print('----')
        return Y

In [None]:
from sklearn.neighbors import KNeighborsClassifier
neigh = KNeighborsClassifier(n_neighbors=1)
class KNN:
    def __init__(self,train_x,train_label,test_x):
        self.train_x = train_x
        self.train_y = train_label
        self.test_x = test_x
    def predict(self):
        neigh.fit(self.train_x.T, self.train_y)
        pred_y = neigh.predict(self.test_x.T)
        return pred_y
    def error(self,test_y):
        pred_y = self.predict()
        error_score = 0
        for i in range(len(pred_y)):
            if int(pred_y[i]) != int(test_y[i]):
                error_score += 1
        error = error_score/len(pred_y)
        return error

In [None]:
error_plot_list =[] 
for i in range(10,257):
    temp = PCA(train_features.T,i)
    train_x = temp.fit(train_features.T)
    test_x = temp.fit(test_features.T)
    error = KNN(train_x,train_label,test_x).error(test_label)
    error_plot_list.append((error,i))

In [None]:
error2 = []
for i in [512,784]:
    temp = PCA(train_features.T,i)
    train_x = temp.fit(train_features.T)
    test_x = temp.fit(test_features.T)
    error = KNN(train_x,train_label,test_x).error(test_label)
    error2.append((error,i))
print(error2)

In [None]:
print(error_plot_list)

In [None]:
y_list = [i[0] for i in error_plot_list]
x_list = [i[1] for i in error_plot_list]
plt.figure()
plt.title('Error curve of task 2')
plt.xlabel('reduced dimentions')
plt.ylabel('error rate')
plt.plot(x_list,y_list)
plt.show()

# PCA using eigenvalue and eigenvectors

In [None]:
class PCA2:
    def __init__(self,X,n): # X is the input training set(standardised)
                            # n is the number of reduced dimensions (from 256 to 10)
        self.X = X
        self.n = n
    def mean_vector(self,mean_X): # mean_X is a numpy array, [m1.T,m2.T,...]
        mean_x = []
        new_mean_X = mean_X.T
        length = len(new_mean_X)
        for i in range(length):
            sum = np.mean(new_mean_X[i])
            mean_x.append(sum)
        mean_x = np.array(mean_x)
        return mean_x # shape (1,m)
    def covariance_matrix(self): # finding covariance_matrix of X
        cov_matrix = np.cov(self.X)
        return cov_matrix
    def singular_value_decomposition(self): # singular value decomposition and get the n main components
        m,n = self.X.shape
        Rmatrix = np.dot(self.X,self.X.T)
        Rmatrix = (1/(n-1))*Rmatrix
        eigvalue,eigvector = np.linalg.eig(Rmatrix)
        sorted_eigvalue_index = np.argsort(eigvalue)
        new_eigvector = [eigvector[i] for i in sorted_eigvalue_index]
        new_eigvector = np.array(new_eigvector)
#         print('new_eigvector>>>,',new_eigvector.shape)
        V_n = new_eigvector[:self.n]
#         print('V_n>>>>,',V_n.shape)
        return V_n
    def fit(self,X):
        V_n = self.singular_value_decomposition()
        Y = np.dot(V_n,X)
#         print('Y,',Y.shape)
#         print('----')
        return Y

In [None]:
error_plot_list2 =[] 
for i in [10*(m+1) for m in range(25)]:
    temp = PCA2(train_features.T,i)
    train_x = temp.fit(train_features.T)
    test_x = temp.fit(test_features.T)
    error = KNN(train_x,train_label,test_x).error(test_label)
    error_plot_list2.append((error,i))

In [None]:
y_list = [i[0] for i in error_plot_list2]
x_list = [i[1] for i in error_plot_list2]
plt.figure()
plt.title('Error curve of task 2')
plt.xlabel('reduced dimentions')
plt.ylabel('error rate')
plt.plot(x_list,y_list)
plt.show()

In [None]:
np.random.RandomState(32)
random_idx = np.random.permutation(train_features.shape[0])
train_features[random_idx[:10]].shape

In [None]:
train_features[train_label == 1,:].shape

In [None]:
centroids = np.zeros((10,train_features.shape[1]))
for k in range(10):
    centroids[k, :] = np.mean(train_features[train_label == k, :], axis=0)
centroids.shape

# k means task 3

In [None]:
random_state = 32
np.random.RandomState(random_state)

class KMeans:
    def __init__(self,X,y,max_iter = 100,n_clusters = 10):
        self.X = X
        self.y = y
        self.max_iter = max_iter
        self.n_clusters = n_clusters
        
    def initial_centre_points(self):
        random_idx = np.random.permutation(self.X.shape[0])
        centroids = self.X[random_idx[:self.n_clusters]]
        return centroids
             
    def find_closest_cluster(self, distance):
        return np.argmin(distance, axis=1)
    
    def compute_centroids(self, X, labels):
        centroids = np.zeros((self.n_clusters, X.shape[1]))
        for k in range(self.n_clusters):
            centroids[k, :] = np.mean(X[labels == k, :], axis=0)
        return centroids
    
    def compute_distance(self, X, centroids):
        distance = np.zeros((X.shape[0], self.n_clusters))
        for k in range(self.n_clusters):
            row_norm = np.linalg.norm(X - centroids[k, :], axis=1)
            distance[:, k] = np.square(row_norm)
        return distance
    
    def compute_loss(self, X, labels, centroids):
        distance = np.zeros(X.shape[0])
        for k in range(self.n_clusters):
            distance[labels == k] = np.linalg.norm(X[labels == k] - centroids[k], axis=1)
        return np.sum(np.square(distance))
    
    def fit(self, X):
        self.centroids = self.initial_centre_points()
        error_list = []
        for i in range(self.max_iter):
            if i%10 == 0:
                print(i)
            distance = self.compute_distance(X, self.centroids)
            self.labels = self.find_closest_cluster(distance)
#             print(self.labels)
            self.new_centroids = self.compute_centroids(X, self.labels)
            if np.all(self.new_centroids == self.centroids):
                break
            else:
                self.centroids = self.new_centroids
            error = self.compute_loss(X,self.labels,self.new_centroids)
            error_list.append(error)
        self.error_list = error_list
        self.iteration = i
        
    def loss(self):
        return self.error_list,self.iteration
    
    def accuracy(self,y):
        count = 0
        for i in range(len(y)):
            if int(y[i]) == int(self.labels[i]):
                count += 1
        return count

In [None]:
k_object = KMeans(train_features,train_label,max_iter = 100)
k_object.fit(train_features)
loss_list,iteration = k_object.loss()

In [None]:
y_list = [i for i in loss_list]
x_list = [i for i in range(len(loss_list))]
plt.figure()
plt.title('loss curve of task 2')
plt.xlabel('iterations')
plt.ylabel('loss')
plt.plot(x_list,y_list)
plt.show()

# task 4 -random create centroids

In [None]:
random_idx = np.random.permutation(train_features.shape[0])
centroids = np.zeros((10,train_features.shape[1]))
templist = [i for i in range(10)]
tempindex = []
j_index = []
for i in range(10):
    for j in random_idx:
        if train_label[j] in templist:
            index = templist.pop(templist.index(train_label[j]))
#             print(templist)
            tempindex.append(index)
            j_index.append(j)
            centroids[i,:] = train_features[j]
            break
        else:
            continue

In [None]:
new_cent = centroids[np.argsort(tempindex)]

# task4 create k means

In [None]:
from sklearn.metrics import classification_report
class KMeans_task4:
    def __init__(self,X,y,centroids,max_iter = 40,n_clusters = 10):
        self.X = X
        self.y = y
        self.max_iter = max_iter
        self.n_clusters = n_clusters
        self.centroids = centroids
                 
    def find_closest_cluster(self, distance):
        return np.argmin(distance, axis=1)
    
    def compute_centroids(self, X, labels):
        centroids = np.zeros((self.n_clusters, X.shape[1]))
#         print('>>>>begin')
        for k in range(self.n_clusters):
#             print(X[labels == k, :])
            if len(labels[labels == k]) == 0:
                continue
            else:
                centroids[k, :] = np.mean(X[labels == k, :], axis=0)
#         print('>>>end')
        return centroids
    
    def compute_distance(self, X, centroids):
        distance = np.zeros((X.shape[0], self.n_clusters))
        for k in range(self.n_clusters):
            row_norm = np.linalg.norm(X - centroids[k, :], axis=1)
            distance[:, k] = np.square(row_norm)
        return distance
    
    def compute_loss(self, X, labels, centroids):
        distance = np.zeros(X.shape[0])
        for k in range(self.n_clusters):
            distance[labels == k] = np.linalg.norm(X[labels == k] - centroids[k], axis=1)
        return np.sum(np.square(distance))
    
    def fit(self, X):
        error_list = []
        for i in range(1,self.max_iter):
            if i%20 == 0:
                print(i)
            distance = self.compute_distance(X, self.centroids)
#             print('distance',distance.shape,distance)
            self.labels = self.find_closest_cluster(distance)
#             print('selflabels',self.labels.shape,self.labels)
            self.new_centroids = self.compute_centroids(X, self.labels)
#             print('>',self.new_centroids.shape,self.new_centroids)
            if np.all(self.new_centroids == self.centroids):
                break
            else:
                self.centroids = self.new_centroids
#             print('----------------')
#             error = self.compute_loss(X,self.labels,self.new_centroids)
#             error_list.append(error)
#         self.error_list = error_list
#         self.iteration = i
        
    def calculate_percentage(self,y):
        originallist = []
        totallist = []
        truelist = np.zeros(10)
#         print(self.labels[0])
#         print(y)
#         print(self.labels[:20],self.labels)
#         print(y[:20],y)
#         print(self.labels[2],y[2],self.labels[2]==y[2],int(self.labels[2])==int(y[2]))
        for i in range(10):
            number = len(y[y==i])
            originallist.append(number)
        for i in range(10):
            number = len(self.labels[self.labels==i])
            totallist.append(number)
        for i in range(len(y)):
            if int(self.labels[i]) == int(y[i]):
                truelist[self.labels[i]] += 1
            else:
                continue
        self.originallist = originallist
        self.totallist = totallist
        self.truelist = truelist
        self.percentage_list = [self.truelist[i]/self.totallist[i] for i in range(10)]
    
    def a(self):
        return self.labels
    
    def report(self):
        print('TP: True Positive')
        print('FP: False Positive')
        print('Precision = TP/(TP+FP)')
        print('class      original     predict      true      TP      FP       precision')
        for m in range(10):
            a,b,c = self.originallist[m],self.totallist[m],self.truelist[m]
            print(str(m),'\t',a,'\t',b,'\t',int(c),'\t',\
                 int(c),'\t',b-int(c),'\t',\
                 int(c)/(b))
        print('\n')
        
    def average_precision(self):
        return np.mean(np.array(self.percentage_list))

In [None]:
k_object2 = KMeans_task4(train_features,train_label,new_cent)
k_object2.fit(train_features)
k_object2.calculate_percentage(train_label)
k_object2.report()
k_object2.average_precision()

In [None]:
error_plot_list_kmeans =[] 
for i in [10*(m+1) for m in range(25)]:
    if i%50 == 0:
        print(i)   
        
    new_temp_cent = PCA(new_cent.T,i)
    new_train_x = new_temp_cent.fit(new_cent.T)
    new_cent2 = new_train_x.T
    print(new_cent2.shape)
    temp = PCA(train_features.T,i)
    train_x = temp.fit(train_features.T)
    test_x = temp.fit(test_features.T)
    print(train_x.shape)
#     print(train_x.T.shape)ii
#     print(new_cent2.shape)
#     print(new_cent2)ii
    k_object3 = KMeans_task4(train_x.T,train_label,new_cent2)
    k_object3.fit(train_x.T)
    k_object3.calculate_percentage(train_label)
    error = k_object3.average_precision()
    print(error)
    error_plot_list_kmeans.append(error)

In [None]:
y_list = [i for i in error_plot_list_kmeans]
x_list = [10*(m+1) for m in range(25)]
plt.figure()
plt.title('curve of task 4')
plt.xlabel('reduced dimension')
plt.ylabel('percentage')
plt.plot(x_list,y_list)
plt.show()

# task 5 add noise and doing scaler

In [None]:
train_features = scaler.inverse_transform(train_features)
test_features = scaler.inverse_transform(test_features)

new_train_features = np.zeros((train_features.shape[0],1040))
new_test_features = np.zeros((test_features.shape[0],1040))

for i in range(train_features.shape[0]):
    b = np.random.randint(2, size=256)
    new_train_features[i,:] = np.concatenate((train_features[i], b), axis=None)
for i in range(test_features.shape[0]):
    c = np.random.randint(2, size=256)
    new_test_features[i,:] = np.concatenate((test_features[i], c), axis=None)

new_train_features = scaler.fit_transform(new_train_features)
new_test_features = scaler.fit_transform(new_test_features)



In [None]:
train_features = scaler.fit_transform(train_features)
test_features =  scaler.fit_transform(test_features)

In [None]:
error_plot_list =[]
l = [10,30,50,70,90,110,130,150,170,190,210,230,250]
for i in l:
    temp = PCA(new_train_features.T,i)
    train_x = temp.fit(new_train_features.T)
    test_x = temp.fit(new_test_features.T)
    error = KNN(train_x,train_label,test_x).error(test_label)
    error_plot_list.append((error,i))

In [None]:
y_list = [i[0] for i in error_plot_list]
x_list = [i[1] for i in error_plot_list]
plt.figure()
plt.title('Error curve of task 5')
plt.xlabel('reduced dimentions')
plt.ylabel('error rate')
plt.plot(x_list,y_list)
plt.show()

# svm

In [None]:
from sklearn.svm import SVC
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
# build a svc without scaling the data

clf = SVC(C=1.0, 
          kernel='rbf',
          gamma='scale', 
          tol=0.01, 
          class_weight=None, 
          max_iter=-1)


X_train, X_test, y_train, y_test =\
                     train_features, test_features, train_label.ravel(), test_label.ravel()

# First scale the data, then build svc

# training svc using training dataset
clf.fit(X_train, y_train)

# predict test dataset

y_pred_with_scale = clf.predict(X_test)

print("Prediction report with scale:")
print(classification_report(y_test, y_pred_with_scale))