# Your info

Full name: Yasmin Madani

Student ID: 97532265

# Q1. Kohonen

In [33]:
# Q1_graded
# Do not change the above line.
import numpy as np
from copy import copy
import tensorflow as tf
import keras
import matplotlib.pyplot as plt
import matplotlib.animation
import matplotlib as mpl
from collections import Counter
from operator import itemgetter
from scipy import spatial
from sklearn.metrics import pairwise_distances_argmin_min
from IPython.display import HTML
mpl.rcParams['animation.embed_limit'] = 500
np.random.seed(42)
import random

SOM

In [34]:
# Q1_graded
class SOM_2D():
    def __init__(self, init_lr, init_radius, radius_floor=0.6, weights_size=(20,20),  num_frames=10, lr_decay = 1, rad_decay=5, num_train=4500, num_test=500, image_dims="square", epochs=10, problem="mnist",):
        x = np.load(open("/content/mnist_x.npy", "rb"))
        y = np.load(open("/content/mnist_y.npy", "rb"))
        num_train_cols = x.shape[1]
        tot = np.concatenate((x, y), axis=1)
        np.random.shuffle(tot)
        x = tot[:,:num_train_cols]
        y = tot[:,num_train_cols:]
        sep1 = num_train 
        self.data_x = x[0:sep1]
        self.data_y = y[0:sep1]
        self.test_data_x = x[sep1:sep1+num_test]
        self.test_data_y = y[sep1:sep1+num_test]
        self.train_ind, self.test_ind = 0,0
        self.num_train = len(self.data_x)
        self.num_test = len(self.test_data_x)
        
        self.weight_dims = (weights_size[0], weights_size[1], self.data_x.shape[1])
        self.weights = self.init_weights(self.weight_dims)
        self.image_dims = (int(np.sqrt(self.data_x.shape[1])), int(np.sqrt(self.data_x.shape[1]))) if image_dims=="square" else image_dims
        self.it_limit = epochs*num_train
        self.num_frames = num_frames
        self.plot_interval = self.it_limit // self.num_frames
        self.init_lr = init_lr
        self.lr_func = lambda t: self.init_lr*np.exp(-lr_decay*t/self.it_limit)
        self.init_radius = init_radius
        self.radius_func = lambda t: max([self.init_radius*np.exp(-rad_decay*t/self.it_limit), radius_floor])
        self.neigh_func = lambda dist, rad: np.exp((-(dist**2)/(2*rad**2)))
        self.node_winners = {i*self.weight_dims[0]+j: [] for i in range(self.weight_dims[0]) for j in range(self.weight_dims[1])}
        self.top_nodes = {n:(0,0) for n in range(self.data_y.shape[1])}
        
    def get_next_train(self):
        x, y = self.data_x[self.train_ind], self.data_y[self.train_ind]
        self.train_ind += 1
        if self.train_ind >= self.num_train:
            self.train_ind = 0
        return x, y
    
    def get_next_test(self):
        x, y = self.test_data_x[self.test_ind], self.test_data_y[self.test_ind]
        self.test_ind += 1
        if self.test_ind >= self.num_test:
            self.test_ind = 0
        return x, y
    
    def init_weights(self, dims):
        weights = np.random.rand(*dims)
        return weights

    def get_ord_dist(self, x, y):
        # Calculate euclidean distance in topology space between two (x,y)-coordinates.
        return np.linalg.norm(np.array(x)-np.array(y))
        
    def get_dist(self, x, y):
        # Calculate euclidean distance between two vectors.
        return np.linalg.norm(x-y)
    
    def int_to_xy(self, i):
        # Convert an integer representation of weight index to (x,y)-coordinates.
        return np.unravel_index(i, (self.weight_dims[0], self.weight_dims[1]))
    
    def update_weights(self, input_vector, bmu_index, t):
        # Update the weights for the winning neuron and its neighborhood according to neighborhood function.
        for i in range(self.weight_dims[0]):
            for j in range(self.weight_dims[1]):
                lr = self.lr_func(t)
                radius = self.radius_func(t)
                dist = self.get_ord_dist(bmu_index, (i,j))
                lamb  =  self.neigh_func(dist, radius) 
                self.weights[i,j] = self.weights[i,j] + lr*lamb*(input_vector-self.weights[i,j])
    
    def get_winner_OLD(self, node, weights):
        return min([((i,j), self.get_dist(node, weights[i][j])) for i in range(self.weight_dims[0]) for j in range(self.weight_dims[1])], key=lambda x: x[1])

    def get_winner(self, node):
        ind, dist = pairwise_distances_argmin_min(node.reshape(1,-1), self.weights.reshape(self.weight_dims[0]*self.weight_dims[1],-1), metric="cosine")
        return self.int_to_xy(ind[0]), dist[0] #(ind[0]//self.weight_dims[0], ind[0]-(ind[0]//self.weight_dims[0])*self.weight_dims[1]))
    
    def get_win_class(self, node):
        top_labels = [x[0] for x in self.top_nodes.values()]
        ind, dist = pairwise_distances_argmin_min(node.reshape(1,-1), self.weights.reshape(self.weight_dims[0]*self.weight_dims[1],-1)[top_labels], metric="cosine")
        return (self.int_to_xy(top_labels[ind[0]])), dist[0]
        
    def one_hot_to_int(self, onehot):
        # Convert a one-hot-vector to integer.
        return np.where(onehot==1)[0][0]
    
    def get_most_common(self, l):
        # Get the most common value from a list.
        try:
            res = max((Counter(l).most_common(1)[0] for e in l), key=itemgetter(1))[0]
        except ValueError:
            res = "NaN"
        return res
    
    def run(self):
        # Initialize the plot that will be updated in animation.
        fig, axmat = plt.subplots(self.weight_dims[0], self.weight_dims[1], figsize=(20,20), squeeze=True)
        self.ims = []
        axs = axmat.flatten()
        for i in range(self.weight_dims[0]):
            for j in range(self.weight_dims[1]):
                axs[i*self.weight_dims[0]+j].set_yticklabels([])
                axs[i*self.weight_dims[0]+j].set_xticklabels([])
                im = axs[i*self.weight_dims[0]+j].imshow(self.weights[i,j].reshape(self.image_dims), cmap="gray", animated=True)
                self.ims.append(im)
        fig.suptitle("Iteration: 0 Lr:  Radius:  ", size=30)
        
        def update(frame, som):
            for iteration in range(som.plot_interval):
                # Select a random sample for training
                n, label = som.get_next_train()
                # Find the neuron closest to the sample.
                bmu_index, bmu_dist = som.get_winner(n)
                if frame==0:
                    break
                # Update weights for the winning neuron and its neighborhood
                som.update_weights(n, bmu_index, (som.plot_interval*frame)+iteration)
                # Update dict to keep track of which cases a neuron has won.
                som.node_winners[bmu_index[0]*som.weight_dims[0]+bmu_index[1]].append(som.one_hot_to_int(label))
            # Calculate the majority label for a neuron.
            som.node_labels = {k: som.get_most_common(v) for k, v in som.node_winners.items()}
            # Get the learning rate and radius for the current iteration.
            lr = som.lr_func(som.plot_interval*frame)
            radius = som.radius_func(som.plot_interval*frame)
            # Calculate train and test accuracy.
            train_acc = som.get_train_acc()
            test_acc = som.get_test_acc()
            # Update plot title
            fig.suptitle("Iteration: "+str(frame*som.plot_interval)+ " Lr: {:0.2f}".format(lr) +
                         " Radius: {:0.2f}".format(radius)+"\n Train acc: {:0.2f}".format(train_acc)+" Test acc: {:0.2f}".format(test_acc) , size=50)
            # Update images from weight arrays and set title according to majority label.
            for i in range(som.weight_dims[0]):
                for j in range(som.weight_dims[1]):
                    som.ims[i*som.weight_dims[0]+j].set_array(som.weights[i,j].reshape(som.image_dims))
                    axs[i*som.weight_dims[0]+j].set_title(str(som.node_labels[i*som.weight_dims[0]+j]), size=12, fontweight="bold")
            return som.ims
        
        animation = mpl.animation.FuncAnimation(fig, func=update, blit=True, interval=100, frames=self.num_frames+1, fargs=[self]);
        plt.close(fig)
        return animation
    
    def get_train_acc(self):
        # Calculation fraction correct predictions on the training data given the majority label as a classifier.
        num_correct = 0
        for iteration in range(self.num_train):
            n, label = self.get_next_train()
            bmu_index, bmu_dist = self.get_winner(n)
            if self.one_hot_to_int(label) == self.node_labels[bmu_index[0]*self.weight_dims[0]+bmu_index[1]]:
                num_correct += 1
        acc = num_correct/self.num_train
        return acc
    
    def get_test_acc(self):
        # Calculation fraction correct predictions on the test data given the majority label as a classifier.
        num_correct = 0
        for iteration in range(self.num_test):
            n, label = self.get_next_test()
            bmu_index, bmu_dist = self.get_winner(n)
            # Check if the input vectors label matches the winning neurons majority label.
            if self.one_hot_to_int(label) == self.node_labels[bmu_index[0]*self.weight_dims[0]+bmu_index[1]]:
                num_correct += 1
        acc = num_correct/self.num_test
        return acc
    
    def test(self):
        # Plot a random sample from the test set and display the input image and weight of the winning neuron for that input.
        test, label = self.get_test_sample()
        fig, (inp_ax, win_ax) = plt.subplots(2, figsize=(10,10))
        inp_ax.imshow(test.reshape((self.image_dims[0], self.image_dims[1])), cmap="gray")
        true_label = self.one_hot_to_int(label)
        inp_ax.set_title(str(true_label))
        winner, dist = self.get_winner(test)
        win_ax.imshow(self.weights[winner].reshape((self.image_dims[0], self.image_dims[1])), cmap="gray")
        winners_label = self.node_labels[winner[0]*self.weight_dims[0]+winner[1]]
        win_ax.set_title(str(winners_label))
        plt.show()


In [35]:
# Q1_graded
mnistparams = {}
mnistsom = SOM_2D(0.2, 2.5, 0.50,(20,20),10,1.1,5,4500,500,"square",10)
anim = mnistsom.run()
#HTML(anim.to_jshtml())

# Q4. RBF



In [36]:
# Q4_graded
# Do not change the above line.
from re import X
from keras.models import Sequential
from keras.layers import Dense
import numpy as np
import random
import matplotlib.pylab as plt
from sklearn.model_selection import train_test_split

In [37]:
# Q4_graded
# Do not change the above line.
n=200
X_lst=[]
Y_lst=[]
def make_dataset(n):
  for i in range(0,n):
      x=random.uniform(0,1)
      X_lst.append(x)
      mu=random.uniform(-0.2,0.2)
      y=1/3 + 0.5*np.sin(3*x*np.pi)+mu
      Y_lst.append(y)

  X_train, X_test, y_train, y_test = train_test_split(X_lst, Y_lst,test_size=0.2)
  return (X_train, X_test, y_train, y_test)

X_train, X_test, y_train, y_test= make_dataset(n)
X= np.array(X_train)
Y = np.array(y_train)
X_Test= np.array(X_test)
Y_Test= np.array(y_test)

In [None]:
# Q4_graded
model = Sequential()
model.add(Dense(128, input_shape=(1,), activation='tanh'))
model.add(Dense(64, activation='tanh'))
model.add(Dense(32, activation='tanh'))
model.add(Dense(16, activation='tanh'))
model.add(Dense(1, activation='tanh'))
model.compile(loss='mean_squared_error', optimizer='Adam')
result=model.fit(X, Y, epochs=1000, batch_size=30,verbose=0)
y_predict = model.predict(X)
plt.plot(X, Y, 'bo', X, y_predict, 'go')
plt.ylabel('Y / Predicted Value')
plt.xlabel('X Value')
plt.show()
y_Test_predict = model.predict(X_Test)
plt.plot(X_Test, Y_Test, 'bo', X_Test, y_Test_predict, 'ro')
plt.ylabel('Y / Predicted Value')
plt.xlabel('X Test Value')
plt.show()

In [None]:
# Q4_graded
np.random.seed(1)
def rbf(x, center, radius):
  return np.exp(-1 / (2 * radius ** 2) * (x - center) ** 2)
def kmeans(X, k):
  clusters = np.random.choice(np.squeeze(X), size=k)
  prevClusters = clusters.copy()
  stds = np.zeros(k)
  converged = False

  while not converged:
    distances = np.squeeze(np.abs(X[:, np.newaxis] - clusters[np.newaxis, :]))
    closestCluster = np.argmin(distances, axis=1)

    for i in range(k):
      pointsForCluster = X[closestCluster == i]
      if len(pointsForCluster) > 0:
        clusters[i] = np.mean(pointsForCluster, axis=0)

    converged = np.linalg.norm(clusters - prevClusters) < 1e-6
    prevClusters = clusters.copy()

  distances = np.squeeze(np.abs(X[:, np.newaxis] - clusters[np.newaxis, :]))
  closestCluster = np.argmin(distances, axis=1)

  clustersWithNoPoints = []
  for i in range(k):
    pointsForCluster = X[closestCluster == i]
    if len(pointsForCluster) < 2:
      clustersWithNoPoints.append(i)
      continue
    else:
      stds[i] = np.std(X[closestCluster == i])

  if len(clustersWithNoPoints) > 0:
    pointsToAverage = []
    for i in range(k):
      if i not in clustersWithNoPoints:
        pointsToAverage.append(X[closestCluster == i])

    pointsToAverage = np.concatenate(pointsToAverage).ravel()
    stds[clustersWithNoPoints] = np.mean(np.std(pointsToAverage))

  return clusters, stds

class RBF():
  def __init__(self, k):
    self.k = k
    self.w = np.random.randn(k)
    self.b = np.random.randn(1)

    self.centers = None
    self.stds = None

    self.iterations = None
    self.learning_rate = None

  def predict(self, x):
    y_predict = []
    for i in range(x.shape[0]):
      z = np.array([rbf(x[i], center, sigma) for center, sigma, in zip(self.centers, self.stds)])
      a = z.T.dot(self.w) + self.b
      y_predict.append(a)
    return np.array(y_predict)

  def update_parameters(self, a, error):
    self.w = self.w - self.learning_rate * a * error
    self.b = self.b - self.learning_rate * error

  def fit(self, x, y, iterations, learning_rate):
    self.iterations = iterations
    self.learning_rate = learning_rate
    self.centers, self.stds = kmeans(x, self.k)

    # SGD
    for i in range(self.iterations):
      for j in range(x.shape[0]):
        z = np.array([rbf(x[j], center, sigma) for center, sigma, in zip(self.centers, self.stds)])
        a = z.T.dot(self.w) + self.b
        difference = y[j] - a
        loss = (difference.flatten() ** 2) / 2
        error = (-1) * difference.flatten()
        self.update_parameters(z, error)

ITERATIONS = 500
LEARNING_RATE = 0.01

my_rbf = RBF(k=3)
my_rbf.fit(X, Y, ITERATIONS, LEARNING_RATE) 
y_predict = my_rbf.predict(X) 
plt.plot(X, Y, 'bo', X, y_predict, 'ro')
plt.ylabel('Y / Predicted Value')
plt.xlabel('X Value')
plt.show()

y_predict = my_rbf.predict(X_Test) 
plt.plot(X_Test, Y_Test, 'bo', X_Test, y_predict, 'go')
plt.ylabel('Y / Predicted Value')
plt.xlabel('X_Test Value')
plt.show()

# <font color='red'>Submission</font>

1. Sign up in [Gradescope](https://www.gradescope.com) with proper name and student ID.
2. Fill in your full name (seperated by single spaces) and student ID in the beginning of this notebook.
3. After you're done with this notebook, you should do the following:
  - Clear all outputs of the notebook.
  ![clear all outputs](https://i.ibb.co/y6FrttB/Screen-Shot-2021-03-21-at-01-51-42.png)
  - Run all of the cells (if you skipped a question just leave the cell unchanged), and make sure all of your outputs are correct.
  ![run all](https://i.ibb.co/cgRcBZ0/Screen-Shot-2021-03-21-at-01-54-58.png)
  - Save your notebook.
  
  - If you're using Colab, download your notebook.
  ![download ipynb](https://i.ibb.co/2KxYM6K/Screen-Shot-2021-03-21-at-02-03-50.png)
  
  - Put the notebook file you just downloaded and `convert.py` in the same folder run the following command:
  ```bash
  python convert.py
  ```
  This will export your code for each question into a `.py` file.
    - **Note**: if you want to add more cells, add this to the **first** line of the cell:
  ```python
  # Q1_graded
  ```
  according to the question number.
  - There are 2 assignments in Gradescope:

    ![assignments](https://i.ibb.co/bdpVdvr/image.png)
  
    You should upload your **codes** and your **notebook** in `HW2` section and your final report for all of the questions as a **single pdf** file in `HW2 - Report`. Autograder will automatically check for:
    - `CI002_HW2.ipynb`
    - `Q1.py`
    - `Q4.py`
    - Your name and ID in the beginning of `.ipynb` file.

    It is important that you <font color='red'>**don't**</font> change the names of these files before submission.

4. If you pass the autograder, you're good to go.