## Machine Learning Materials

These materials are provided as part of the course credit transfer request for CS 6140. For this file, the main purpose is to demonstrate the implementation of machine learning algorithms and theory-level concepts in our assignments.

These are not all the assignments from the course, as some assessments were completed in virtual labs that are now inaccessible. However, I have tried to compile the available examples where we implemented algorithms by ourselves, rather than using external libraries directly.

---


### 1. K-Nearest Neighbors Implementation

#### Original Instructions

Please access the [full assignment - Non-parametric Models here](https://github.com/ynmu/ml-assignment-references/blob/main/Non-parametric%20Models.ipynb).


#### Module

Non-parametric Models, Supervised Learning

#### Description

In this assignment, we implemented our own KNN model from scratch instead of using the `KNeighborsClassifier` toolkit from `sklearn`. This was explicitly stated in the assignment requirement:

> Do not use Scikit-Learn's KNeighborsClassifier in this problem. We're implementing this ourselves.

The implementation includes the model constructor, along with the `classify`, `majority`, and `predict` methods. This process enhanced our understanding of KNN by reinforcing core concepts, especially in the `majority` method, where we count the frequency of each label and determine the most frequent label among the neighbors, which is central to KNN's functionality.



In [None]:
class KNN:
    """
    Class to store data for regression problems 
    """
    def __init__(self, x_train, y_train, K=5):
        """
        Creates a kNN instance

        :param x_train: numpy array with shape (n_rows,1)- e.g. [[1,2],[3,4]]
        :param y_train: numpy array with shape (n_rows,)- e.g. [1,-1]
        :param K: The number of nearest points to consider in classification
        """
        
        # Import and build the BallTree on training features 
        from sklearn.neighbors import BallTree
        self.balltree = BallTree(x_train)
        
        # Cache training labels and parameter K 
        self.y_train = y_train
        self.K = K 
        
    def majority(self, neighbor_indices, neighbor_distances=None):
        """
        Given indices of nearest neighbors in training set, return the majority label. 
        Break ties by considering 1 fewer neighbor until a clear winner is found. 

        :param neighbor_indices: The indices of the K nearest neighbors in self.X_train 
        :param neighbor_distances: Corresponding distances from query point to K nearest neighbors. 
        """
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        if len(neighbor_indices) == 0:
            return

        yvals = self.y_train[neighbor_indices]
        frequency = {}

        if neighbor_distances is not None:
            weights = 1 / neighbor_distances
            for idx, val in enumerate(yvals[0]):
                frequency[val] = frequency.get(val, 0) + weights[idx]
        else:
            for val in yvals[0]:
                frequency[val] = frequency.get(val, 0) + 1

        max_val = max(frequency.values())
        max_labels = [key for key, val in frequency.items() if val == max_val]

        if len(max_labels) > 1:
            return max_labels[0]
        else:
            return max(frequency, key=frequency.get)

    def classify(self, x):
        """
        Given a query point, return the predicted label 
        
        :param x: a query point stored as an ndarray  
        """
        # =================================================================
        # TODO
        # =================================================================
        # Student code here 
        dist, ind = self.balltree.query(x.reshape(1, -1), k = self.K)
        
        return self.majority(ind, dist)

    def predict(self, X):
        """
        Given an ndarray of query points, return yhat, an ndarray of predictions 

        :param X: an (m x p) dimension ndarray of points to predict labels for 
        """
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        results = []

        for i in range(X.shape[0]):
            thispt = X[i, :].reshape(1, -1)
            results.append(self.classify(thispt))
        
        A = np.ndarray([])
        return np.append(results, A)[:-1]
        

---

### 2. AdaBoost Classifier Implementation

#### Original Instructions

Please access the [full assignment - Ensemble here](https://github.com/ynmu/ml-assignment-references/blob/main/Ensemble.ipynb).

#### Module

Ensemble methods, Supervised Learning

#### Description

In this assignment, we didn't use libraries like `sklearn.ensemble.AdaBoostClassifier`. Instead, we implemented our own AdaBoost classifier.

The implementation includes the model constructor, along with the `fit`, `error_rate`, `predict`, `score`, and `staged_score` methods. The requirement reviewed the process of the AdaBoost algorithm and key formulas thoroughly. For example, it was stated in the assignment:

> Recall that the model we attempt to learn in AdaBoost is given by 
>
> $$
> H({\bf x}) = \textrm{sign}\left[\displaystyle\sum_{k=1}^K\alpha_k h_k({\bf x}) \right]
> $$

This process helped us better understand ensemble learning by showing how to iteratively train weak learners, adjust their importance based on performance, and focus on the hardest-to-classify instances, ultimately combining weak learners to build a stronger classifier.



In [None]:
class AdaBoost:
    def __init__(self, n_learners=20, base=DecisionTreeClassifier(max_depth=3), random_state=1234):
        """
        Create a new adaboost classifier.
        
        Args:
            N (int, optional): Number of weak learners in classifier.
            base (BaseEstimator, optional): Your general weak learner 
            random_state (int, optional): set random generator.  needed for unit testing. 

        Attributes:
            base (estimator): Your general weak learner 
            n_learners (int): Number of weak learners in classifier.
            alpha (ndarray): Coefficients on weak learners. 
            learners (list): List of weak learner instances. 
        """
        
        np.random.seed(42)
        
        self.n_learners = n_learners 
        self.base = base
        self.alpha = np.zeros(self.n_learners)
        self.learners = []
        
    def fit(self, X_train, y_train):
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        w = np.ones(len(y_train), dtype=np.float128) / len(y_train)

        for k in range(self.n_learners):
            h = deepcopy(self.base)
            h.fit(X_train, y_train, sample_weight=w)
            
            y_pred = h.predict(X_train)
            errk = self.error_rate(y_train, y_pred, w)

            if errk == 0:
                alpha_k = np.inf
            elif errk == 1:
                alpha_k = -np.inf
            else:
                alpha_k = 0.5 * np.log((1 - errk) / errk)

            w *= np.exp(-alpha_k * y_train * y_pred)
            w /= np.sum(w)

            # store the alpha and the weak learner
            self.alpha[k] = alpha_k
            self.learners.append(h)
        
        return self  
            
    def error_rate(self, y_true, y_pred, weights):
        # =================================================================
        # TODO 

        # Implement the weighted error rate
        # =================================================================
        # Student code here
        
        numerator = copy.deepcopy(weights)
        for i in range(len(numerator)):
            numerator[i] = weights[i] * (y_true[i] != y_pred[i])
        
        error = numerator.sum() / weights.sum()
        
        return error
        
        
    def predict(self, X):
        """
        Adaboost prediction for new data X.
        
        Args:
            X (ndarray): [n_samples x n_features] ndarray of data 
            
        Returns: 
            yhat (ndarray): [n_samples] ndarray of predicted labels {-1,1}
        """

        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        yhat = np.zeros(X.shape[0])

        if len(self.learners) == 0:
            raise Exception("Training not started")

        for learner, alpha in zip(self.learners, self.alpha):
            yhat += alpha * learner.predict(X)

        yhat = np.sign(yhat)
        return yhat

    def score(self, X, y):
        """
        Computes prediction accuracy of classifier.  
        
        Args:
            X (ndarray): [n_samples x n_features] ndarray of data 
            y (ndarray): [n_samples] ndarray of true labels  
            
        Returns: 
            Prediction accuracy (between 0.0 and 1.0).
        """
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        y_pred = self.predict(X)
        correct = np.sum(y_pred == y)
        return correct / len(y)
        
    
    def staged_score(self, X, y):
        """
        Computes the ensemble score after each iteration of boosting 
        for monitoring purposes, such as to determine the score on a 
        test set after each boost.
        
        Args:
            X (ndarray): [n_samples x n_features] ndarray of data 
            y (ndarray): [n_samples] ndarray of true labels  
            
        Returns: 
            scores (ndarary): [n_learners] ndarray of scores 
        """
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        scores = []
        
        if len(self.learners) == 0:
            raise Exception("AdaBoost Classifier has not yet been trained! Please train first.")

        for i in range(len(self.learners)):
            yhat = np.zeros(X.shape[0])

            for j in range(i+1):
                yhat += self.alpha[j] * self.learners[j].predict(X)

            yhat = np.sign(yhat)

            correct = np.sum(yhat == y)
            scores.append(correct / len(y))

        return np.array(scores)

---

### 3. Random Forest Classifier Implementation

#### Original Instructions

Please access the [full assignment - Ensemble here](https://github.com/ynmu/ml-assignment-references/blob/main/Ensemble.ipynb).

#### Module

Ensemble methods, Supervised Learning

#### Description

In this assignment, we implemented our own Random Forest classifier, without using libraries like `sklearn.ensemble.RandomForestClassifier`. The algorithm involves training an ensemble of decision trees and aggregating their predictions through majority voting.

The implementation includes the model constructor, along with the `create_tree`, `predict`, and `score` methods. The requirement clearly outlined the process of training a Random Forest model and emphasized the following steps:

> Remember that training the random forest algorithms involves the following steps: 
> 
> `for k = 1 to K:`
> 
> $~~~~~~~~$ `a) Build the kth tree of depth d`
> 
> $~~~~~~~~$ `b) Train the kth tree on a subset of the dataset with random feature splits`
>
> Predicting the classification result on new data involves returning the majority vote by all the trees in the random forest.

#### Accompanying Written Exercises

Build a random forest classifier and train it on the MNIST data to classify 3s and 8s in the cell below. Then see how the classifier performs on the test data by computing the misclassification error. (Remember: error = 1-score)

What was the misclassification error for your random forest classifier? How did the misclassification error for the random forest classifier compare to the adaBoost classifier? 



In [None]:
class RandomForest():
    
    def __init__(self, x, y, sample_sz, n_trees=200, n_features='sqrt', max_depth=10, min_samples_leaf=5):
        """
        Create a new random forest classifier.
        
        Args:
            x : Input Feature vector
            y : Corresponding Labels
            sample_sz : Sample size
            n_trees : Number of trees to ensemble
            n_features : Method to select subset of features 
            max_depth : Maximum depth of the trees in the ensemble
            min_sample_leaf : Minimum number of samples per leaf 
        """
        np.random.seed(42)
        if n_features == 'sqrt':
            self.n_features = int(np.sqrt(x.shape[1]))
        elif n_features == 'log2':
            self.n_features = int(np.log2(x.shape[1]))
        else:
            self.n_features = n_features

        self.features_set = []
        self.x, self.y, self.sample_sz, self.max_depth, self.min_samples_leaf  = x, y, sample_sz, max_depth, min_samples_leaf
        self.trees = [self.create_tree(i) for i in range(n_trees)]

    def create_tree(self,i):
        """
        create a single decision tree classifier
        """
        
        idxs = np.random.permutation(len(self.y))[:self.sample_sz]
        idxs = np.asarray(idxs)

        f_idxs = np.random.permutation(self.x.shape[1])[:self.n_features]
        f_idxs = np.asarray(f_idxs)
        
        
        if i==0:
            self.features_set = np.array(f_idxs, ndmin=2)
        else:
            self.features_set = np.append(self.features_set, np.array(f_idxs,ndmin=2),axis=0)
        
        # =================================================================
        # TODO: build a decision tree classifier and train it with x and y that is a subset of data (use idxs and f_idxs)
        # =================================================================
        # Student code here
        
        clf = DecisionTreeClassifier(max_depth = self.max_depth, min_samples_leaf = self.min_samples_leaf)
        x, y = self.x[idxs, :][:, f_idxs], self.y[idxs]
        clf.fit(x, y)
        
        return clf
       
    def predict(self, x):
        
        # =================================================================
        # TODO: create a vector of predictions  and return
        # You will have to return the predictions of the final ensembles based on the individual trees' predicitons
        # =================================================================
        # Student code here

        res = np.zeros(x.shape[0])
        
        for i in range(x.shape[0]):
            predictions = {}
            for j in range(len(self.trees)):
                tree = self.trees[j]
                these_features = self.features_set[j]
                this_prediction = tree.predict(x[i, these_features].reshape(1, -1))[0]
                if this_prediction not in predictions.keys():
                    predictions.update({this_prediction : 1})
                else:
                    previous_count = predictions[this_prediction]
                    predictions.update({this_prediction : previous_count + 1})
                    
            current_best = (None, 0)
            for key, value in predictions.items():
                if value > current_best[1]:
                    current_best = (key, value)
                    
            res[i] = current_best[0]
        
        return res
    
    def score(self, X, y):
        # =================================================================
        # TODO: Compute the score using the predict function and true labels y
        # =================================================================
        # Student code here

        
        yhat = self.predict(X)
        correct_predictions = np.sum(yhat == y)
        total_predictions = len(y)
        acc = correct_predictions / total_predictions

        return acc
        

---

### 4. PCA Implementation

#### Original Instructions

Please access the [full assignment - PCA here](https://github.com/ynmu/ml-assignment-references/blob/main/PCA.ipynb).

#### Module

PCA, Unsupervised Learning

#### Description

In this assignment, we implemented our own PCA (Principal Component Analysis) algorithm instead of using existing libraries like `sklearn.decomposition.PCA`. This helped in gaining a deeper understanding of dimensionality reduction techniques and the mathematics behind PCA.

Before moving onto the implementation, the key points of the PCA algorithm were reviewed in the assignment requirement:

>
> The gist of PCA Algorithm to compute principal components is follows:
> - Calculate the covariance matrix X of data points.
> - Calculate eigenvectors and corresponding eigenvalues.
> - Sort the eigenvectors according to their eigenvalues in decreasing order.
> - Choose first k eigenvectors which satisfies target explained variance.
> - Transform the original data of shape m observations times n features into m observations times k selected features.
>

We also manually calculated the `mean` and `covariance` matrix, with the formulas we learnt from class:
$$
\mu_i = \frac{1}{m} \sum_{k=1}^{m} X_{k,i}
$$

$$
\text{Cov}(X_i, X_j) = \frac{1}{m-1} \sum_{k=1}^{m} (X_{k,i} - \mu_i)(X_{k,j} - \mu_j)
$$


This process helped us understand how PCA reduces the dimensionality of the dataset by projecting it onto the directions of maximum variance, while retaining the most important features based on the explained variance criterion.


In [None]:
class PCA:
    def __init__(self, target_explained_variance=None):
        """
        explained_variance: float, the target level of explained variance
        """
        self.target_explained_variance = target_explained_variance
        self.feature_size = -1

    def standardize(self, X):
        """
        standardize features using standard scaler
        :param X: input data with shape m (# of observations) X n (# of features)
        :return: standardized features (Hint: use skleanr's StandardScaler. Import any library as needed)
        """
        # =================================================================
        # TODO
        # =================================================================
        # Student code here

        return StandardScaler().fit_transform(X)

    def compute_mean_vector(self, X_std):
        """
        compute mean vector
        :param X_std: transformed data
        :return n X 1 matrix: mean vector
        """
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        m = X_std.shape[0]
        mean_vector = X_std.sum(axis=0) / m
        return mean_vector

    def compute_cov(self, X_std, mean_vec):
        """
        Covariance using mean, (don't use any numpy.cov)
        :param X_std:
        :param mean_vec:
        :return n X n matrix:: covariance matrix
        """
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        m, n = X_std.shape
        cov_matrix = np.zeros((n, n))

        for i in range(n):
            for j in range(n):
                cov_ij = 0
                for k in range(m):
                    cov_ij += (X_std[k, i] - mean_vec[i]) * (X_std[k, j] - mean_vec[j])
                cov_matrix[i, j] = cov_ij / (m - 1) # normalize by m-1
        
        return cov_matrix

    def compute_eigen_vector(self, cov_mat):
        """
        Eigenvector and eigen values using numpy. Uses numpy's eigenvalue function
        :param cov_mat:
        :return: (eigen_values, eigen_vector)
        """
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        return np.linalg.eig(cov_mat)

    def compute_explained_variance(self, eigen_vals):
        """
        sort eigen values and compute explained variance.
        explained variance informs the amount of information (variance)
        can be attributed to each of  the principal components.
        :param eigen_vals:
        :return: explained variance.
        """
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        total = sum(eigen_vals)
        var_exp = [(i / total) for i in sorted(eigen_vals, reverse=True)]
        return var_exp


    def cumulative_sum(self, var_exp):
        """
        return cumulative sum of explained variance.
        :param var_exp: explained variance
        :return: cumulative explained variance
        """
        return np.cumsum(var_exp)

    def compute_weight_matrix(self, eig_pairs, cum_var_exp):
        """
        compute weight matrix of top principal components conditioned on target
        explained variance.
        (Hint : use cumilative explained variance and target_explained_variance to find
        top components)
        
        :param eig_pairs: list of tuples containing eigenvalues and eigenvectors, 
        sorted by eigenvalues in descending order (the biggest eigenvalue and corresponding eigenvectors first).
        :param cum_var_exp: cumulative expalined variance by features
        :return: weight matrix (the shape of the weight matrix is n X k)
        """
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        matrix_w = []
        for i, (_, eig_vec) in enumerate(eig_pairs):
            if cum_var_exp[i] >= self.target_explained_variance:
                break
            matrix_w.append(eig_vec.reshape(-1, 1))
        return np.hstack(matrix_w)

    def transform_data(self, X_std, matrix_w):
        """
        transform data to subspace using weight matrix
        :param X_std: standardized data
        :param matrix_w: weight matrix
        :return: data in the subspace
        """
        return X_std.dot(matrix_w)

    def fit(self, X):
        """    
        entry point to the transform data to k dimensions
        standardize and compute weight matrix to transform data.
        The fit functioin returns the transformed features. k is the number of features which cumulative 
        explained variance ratio meets the target_explained_variance.
        :param   m X n dimension: train samples
        :return  m X k dimension: subspace data. 
        """
    
        self.feature_size = X.shape[1]
        
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        
        X_std = self.standardize(X)
        mean_vec = self.compute_mean_vector(X_std)
        cov_mat = self.compute_cov(X_std, mean_vec)
        
        eigen_vals, eigen_vecs = self.compute_eigen_vector(cov_mat)
        var_exp = self.compute_explained_variance(eigen_vals)
        cum_var_exp = self.cumulative_sum(var_exp)
        
        eig_pairs = [(np.abs(eigen_vals[i]), eigen_vecs[:, i]) for i in range(len(eigen_vals))]
        eig_pairs.sort(key=lambda x: x[0], reverse=True)
        
        matrix_w = self.compute_weight_matrix(eig_pairs, cum_var_exp)
        
        print(f"weight matrix dimensions: {len(matrix_w)} x {len(matrix_w[0])}")
        return self.transform_data(X_std=X_std, matrix_w=matrix_w)


After implementing our own PCA algorithm, we were also required to compare our implementation with the external library to validate the preformance. Below is quoted from the assignment instructions:

> **Result Comparison with Sklearn**
>
> The below cells should help you compare the output from your implementation against the sklearn implementation with a similar configuration. This is solely to help you validate your work. 

In [None]:
# Sklearn implementation to compare your results

from sklearn.decomposition import PCA as ExternalPCA
from sklearn.preprocessing import StandardScaler

# Scale data before applying PCA
scaling = StandardScaler()
 
# Use fit and transform method
# You may change the variable X if needed to verify against a different dataset
print("Sample data:", X)
scaling.fit(X)
Scaled_data=scaling.transform(X)
print("\nScaled data:", Scaled_data)
 
# Set the n_components=3
principal=ExternalPCA(n_components=2)
principal.fit(Scaled_data)
x=principal.transform(Scaled_data)
 
# Check the dimensions of data after PCA
print("\nTransformed Data", x)

---

### 5. Recommender Systems

#### Original Instructions

Please access the [full assignment - Recommender System here](https://github.com/ynmu/ml-assignment-references/blob/main/Recommender%20Systems.ipynb).


#### Module

Recommender Systems, Unsupervised Learning

#### Description

In this assignment, we implemented various recommender system algorithms without relying on pre-built libraries. In summary, we built the baseline `class RecSys`, and built `class ContentBased` and `class Collaborative` which inherit `class RecSys` and further add methods calculating item-item similarity matrix.

Due to the extensive amount of code in this module, I am not including the full code here. Notably, we also reviewed some fundamental concepts like Cosine Similarity and Jaccard Similarity, with the formulas given:

$$Cossim(A, B) = \frac{(A · B)}{(||A|| · ||B||)}$$
and
$$Jacsim(A, B) = \frac{|A ∩ B|}{|A ∪ B|}$$

#### Accompanying Written Exercises

Discuss which method(s) work better than others and why.

Write your response in the textbox below. 

In [None]:
class RecSys():
    def __init__(self,data):
        self.data=data
        self.allusers = list(self.data.users['uID'])
        self.allmovies = list(self.data.movies['mID'])
        self.genres = list(self.data.movies.columns.drop(['mID', 'title', 'year']))
        self.mid2idx = dict(zip(self.data.movies.mID,list(range(len(self.data.movies)))))
        self.uid2idx = dict(zip(self.data.users.uID,list(range(len(self.data.users)))))
        self.Mr=self.rating_matrix()
        self.Mm=None 
        self.sim=np.zeros((len(self.allmovies),len(self.allmovies)))
        
    def rating_matrix(self):
        """
        Convert the rating matrix to numpy array of shape (#allusers,#allmovies)
        """
        ind_movie = [self.mid2idx[x] for x in self.data.train.mID] 
        ind_user = [self.uid2idx[x] for x in self.data.train.uID]
        rating_train = list(self.data.train.rating)
        
        return np.array(coo_matrix((rating_train, (ind_user, ind_movie)), shape=(len(self.allusers), len(self.allmovies))).toarray())


    def predict_everything_to_3(self):
        """
        Predict everything to 3 for the test data
        """
        # Generate an array with 3s against all entries in test dataset
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        return np.full(len(self.data.test), 3)
        
        
    def predict_to_user_average(self):
        """
        Predict to average rating for the user.
        Returns numpy array of shape (#users,)
        """
        # Generate an array as follows:
        # 1. Calculate all avg user rating as sum of ratings of user across all movies/number of movies whose rating > 0
        # 2. Return the average rating of users in test data
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        user_avg = self.Mr.sum(axis=1) / (self.Mr > 0).sum(axis=1)

        test_users = self.data.test['uID'].map(self.uid2idx)
        return user_avg[test_users]
    
    
    def predict_from_sim(self,uid,mid):
        """
        Predict a user rating on a movie given userID and movieID
        """
        # Predict user rating as follows:
        # 1. Get entry of user id in rating matrix
        # 2. Get entry of movie id in sim matrix
        # 3. Employ 1 and 2 to predict user rating of the movie
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        user_idx = self.uid2idx[uid]
        movie_idx = self.mid2idx[mid]
        
        uf = self.Mr[user_idx]
        mf = self.sim[movie_idx]
        
        return np.dot(uf, mf) / np.dot(mf, uf > 0)
    
    
    def predict(self):
        """
        Predict ratings in the test data. Returns predicted rating in a numpy array of size (# of rows in testdata,)
        """
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        predictions = []

        for _, row in self.data.test.iterrows():
            uid, mid = row['uID'], row['mID']
            predictions.append(self.predict_from_sim(uid, mid))

        return np.array(predictions)
    
    
    def rmse(self,yp):
        yp[np.isnan(yp)] = 3 #In case there is nan values in prediction, it will impute to 3.
        yt = np.array(self.data.test.rating)
        return np.sqrt(((yt-yp)**2).mean())

    
class ContentBased(RecSys):
    def __init__(self,data):
        super().__init__(data)
        self.data=data
        self.Mm = self.calc_movie_feature_matrix()  
        
    def calc_movie_feature_matrix(self):
        """
        Create movie feature matrix in a numpy array of shape (#allmovies, #genres) 
        """
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        return self.data.movies[self.genres].values
    
    def calc_item_item_similarity(self):
        """
        Create item-item similarity using Jaccard similarity
        """
        # Update the sim matrix by calculating item-item similarity using Jaccard similarity
        # Jaccard Similarity: J(A, B) = |A∩B| / |A∪B| 
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        n = len(self.allmovies) 
        X = self.Mm.T
        csr0 = csr_matrix((X > 0).astype(int))
        intersec = np.array(csr0.T.dot(csr0).toarray()).astype(int)   
        rowsum = X.sum(axis=0)
        rsumtile = np.repeat(rowsum[:, np.newaxis], n, axis=1)
        union = rsumtile.T + rsumtile - intersec
        # avoid division by zero
        self.sim = np.divide(intersec, union, where=(union != 0))
        # self-similarity
        np.fill_diagonal(self.sim, 1)
        
                
class Collaborative(RecSys):    
    def __init__(self,data):
        super().__init__(data)
        
    def calc_item_item_similarity(self, simfunction, *X):  
        """
        Create item-item similarity using similarity function. 
        X is an optional transformed matrix of Mr
        """    
        # General function that calculates item-item similarity based on the sim function and data inputed
        if len(X)==0:
            self.sim = simfunction()            
        else:
            self.sim = simfunction(X[0]) # *X passes in a tuple format of (X,), to X[0] will be the actual transformed matrix

    def cossim(self):    
        """
        Calculates item-item similarity for all pairs of items using cosine similarity (values from 0 to 1) on utility matrix
        Returns a cosine similarity matrix of size (#all movies, #all movies)
        """
        # Return a sim matrix by calculating item-item similarity for all pairs of items using Jaccard similarity
        # Cosine Similarity: C(A, B) = (A.B) / (||A||.||B||) 
        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        t0 = time.perf_counter()

        mu = self.Mr.sum(axis=1) / (self.Mr>0).sum(axis=1)
        mu_array = np.repeat(np.expand_dims(mu, axis=1), self.Mr.shape[1], axis=1)
        X = self.Mr+(self.Mr==0) * mu_array-mu_array 
        
        norm_X = np.linalg.norm(X, axis=0, keepdims=True)
        Y = np.divide(X, norm_X, where=(norm_X != 0))

        cos_sim = np.dot(Y.T, Y)
        # fill the diagonal with 1
        for i in range(len(self.allmovies)):
            cos_sim[i, i]=1

        print('Total time:', time.perf_counter() - t0)
        return 0.5 + 0.5 * cos_sim
    
    def jacsim(self,Xr):
        """
        Calculates item-item similarity for all pairs of items using jaccard similarity (values from 0 to 1)
        Xr is the transformed rating matrix.
        """    
        # Return a sim matrix by calculating item-item similarity for all pairs of items using Jaccard similarity
        # Jaccard Similarity: J(A, B) = |A∩B| / |A∪B| 

        # =================================================================
        # TODO
        # =================================================================
        # Student code here
        n = Xr.shape[1]
        max_rating = int(Xr.max())

        if max_rating > 1:
            intersec = np.zeros((n, n)).astype(int)
            for i in range(1, max_rating + 1):
                csr = csr_matrix((Xr == i).astype(int))
                intersec = intersec + np.array(csr.T.dot(csr).toarray()).astype(int)

        csr_binary = csr_matrix((Xr > 0).astype(int))
        intersec_binary = np.array(csr_binary.T.dot(csr_binary).toarray()).astype(int)

        A = (Xr > 0).astype(bool)
        rowsum = A.sum(axis=0)
        rsumtile = np.repeat(rowsum.reshape((n, 1)), n, axis=1)
        union = rsumtile.T + rsumtile - intersec_binary

        if max_rating > 1:
            jac_sim = intersec / union
        else:
            jac_sim = intersec_binary / union

        if np.isnan(jac_sim).sum() > 0:
            jac_sim[np.isnan(jac_sim)] = 0
            for i in range(n):
                jac_sim[i, i] = 1

        return jac_sim

---

### 6. Feedforward Neural Network

#### Original Instructions

Please access the [full assignment - Neural Network here](https://github.com/ynmu/ml-assignment-references/blob/main/Neural%20Network.ipynb).

#### Module

Neural Networks, Deep Learning

#### Description


In this assignment, we implemented a general feed-forward neural network from scratch, without relying on deep learning libraries like TensorFlow or PyTorch. The network utilized sigmoid activation functions and included key methods for `forward propagation`, `back propagation`, and `train` using stochastic gradient descent (SGD). 

The assignment provided the signatures of the neural network class, which we were tasked with completing. We specifically focused on the implementation of forward propagation through layers of the network, calculating gradients during backpropagation, and updating the weights and biases using SGD.

The assignment instructions stated:

> 
> In this problem you'll implement a general feed-forward neural network class that utilizes sigmoid activation functions. Your tasks will be to implement `forward propagation`, `prediction`, `back propagation`, and a general train routine to learn the weights in your network via stochastic gradient descent.
> 
> The skeleton for the network class is below. Befor filling out the codes below, read the PART X instruction. The place you will complete the code is indicated as "TODO" in the code. Pleaes do not modify other parts of the code.


The completed model also included helper functions such as gradient checking (`gradient_check`) to ensure that backpropagation was implemented correctly, and visualization (`pretty_pictures`) to plot decision boundaries and show the network's progress during training.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import colorConverter, ListedColormap
%matplotlib inline

class Network:
    def __init__(self, sizes):
        """
        Initialize the neural network 
        
        :param sizes: a list of the number of neurons in each layer 
        """
        # save the number of layers in the network 
        self.L = len(sizes) 
        self.sizes = sizes  
        
        # bias vectors and matrices of weights
        self.b = [np.random.randn(n, 1) for n in self.sizes[1:]]
        self.W = [np.random.randn(n, m) for (m, n) in zip(self.sizes[:-1], self.sizes[1:])]
        
        # derivatives of biases and weights for backprop
        self.db = [np.zeros((n, 1)) for n in self.sizes[1:]]
        self.dW = [np.zeros((n, m)) for (m,n) in zip(self.sizes[:-1], self.sizes[1:])]
        
        # activities and activations
        self.z = [np.zeros((n, 1)) for n in self.sizes]
        self.a = [np.zeros((n, 1)) for n in self.sizes]

        # deltas
        self.delta = [np.zeros((n, 1)) for n in self.sizes]
        
    def g(self, z):
        """
        sigmoid activation function 
        
        :param z: vector of activities to apply activation to 
        """
        return 1.0 / (1.0 + np.exp(-z))
    
    def g_prime(self, z):
        """
        derivative of sigmoid activation function 
        
        :param z: vector of activities to apply derivative of activation to 
        """
        return self.g(z) * (1.0 - self.g(z))
    
    def grad_loss(self, a, y):
        """
        evaluate gradient of cost function for squared-loss C(a,y) = (a-y)^2/2 
        
        :param a: activations on output layer 
        :param y: vector-encoded label 
        """
        return (a - y)
    
    def forward_prop(self, x):
        """
        take an feature vector and propagate through network 
        
        :param x: input feature vector 
        """
        if len(x.shape) == 1:
            x = x.reshape(-1, 1)
        # =================================================================
        # TODO: step 1. Initialize activation on initial layer to x 
        # =================================================================
        # Student code here
        self.a[0] = x
        
        
        # =================================================================
        # TODO: step 2-4. Loop over layers and compute activities and activations 
        # Use Sigmoid activation function defined above
        # =================================================================
        # Student code here
        for ll in range(self.L - 1):
            self.z[ll + 1] = np.dot(self.W[ll], self.a[ll]) + self.b[ll]
            self.a[ll + 1] = self.g(self.z[ll + 1])
        
        
    def back_prop(self, x, y):
        """
        Back propagation to get derivatives of C wrt weights and biases for given training example
        
        :param x: training features  
        :param y: vector-encoded label 
        """
        
        if len(y.shape) == 1:
            y = y.reshape(-1, 1)

        # =================================================================
        # TODO: step 1. forward prop training example to fill in activities and activations 
        # =================================================================
        # Student code here
        self.forward_prop(x)

        # =================================================================
        # TODO: step 2. compute deltas on output layer (Hint: python index numbering starts from 0 ends at N-1)
        # Correction in Instructions: From the instructions mentioned below for backward propagation,
        # Use normal product instead of dot product in Step 2 and 6
        # The derivative and gradient functions have already been implemented for you
        # =================================================================
        # Student code here
        self.delta[self.L - 1] = self.g_prime(self.z[self.L - 1]) * self.grad_loss(self.a[self.L - 1], y)

        # =================================================================
        # TODO: step 3-6. loop backward through layers, backprop deltas, compute dWs and dbs
        # =================================================================
        # Student code here
        for ll in range(self.L - 2, -1, -1):
            self.dW[ll] = np.dot(self.delta[ll + 1], self.a[ll].T)
            self.db[ll] = self.delta[ll + 1]
            self.delta[ll] = np.dot(self.W[ll].T, self.delta[ll + 1]) * self.g_prime(self.z[ll])
        
            
    def train(self, X_train, y_train, X_valid=None, y_valid=None,
              eta=0.25, num_epochs=10, isPrint=True, isVis=False):
        """
        Train the network with SGD 
        
        :param X_train: matrix of training features 
        :param y_train: matrix of vector-encoded labels 
        """
        # initialize shuffled indices 
        shuffled_inds = list(range(X_train.shape[0]))
        
        # loop over training epochs (step 1.)
        for ep in range(num_epochs):
            
            # shuffle indices 
            np.random.shuffle(shuffled_inds)
            
            # loop over training examples (step 2.) 
            for ind in shuffled_inds: 
                # =================================================================
                # TODO
                # =================================================================
                # Student code here
                
                
                # TODO: step 3. back prop to get derivatives 
                # your code here
                x, y = X_train[ind], y_train[ind]
                
                
                # TODO: step 4. update all weights and biases for all layers
                # your code here
                for l in range(self.L-1):
                    self.W[l] -= eta * self.dW[l]
                    self.b[l] -= eta * self.db[l]
                
                
            # print mean loss every 10 epochs if requested 
            if isPrint and (ep % 10) == 0:
                print("epoch {:3d}/{:3d}: ".format(ep, num_epochs), end="")
                print("  train loss: {:8.3f}".format(self.compute_loss(X_train, y_train)), end="")
                if X_valid is not None:
                    print("  validation loss: {:8.3f}".format(self.compute_loss(X_valid, y_valid)))
                else:
                    print("")
                    
            if isVis and (ep % 20) == 0:
                self.pretty_pictures(X_train, y_train, decision_boundary=True, epoch=ep)


    # other methods in the starter code..

#### Accompanying Written Exercises

**Part A**

State whether a single-layer perceptron can learn the concepts. Briefly justify your response by providing weights and biases as applicable.

**Note:** Consider learning the following concepts with either a single-layer or multilayer perceptron where all hidden and output neurons utilize indicator activation functions.

- NOT $x_1$

- $x_1$ NOR $x_2$

- $x_1$ XNOR $x_2$ (output 1 when $x_1$ = $x_2$ and 0 otherwise.)


**Part B**

Determine an architecture and specific values of the weights and biases in a single-layer or multilayer perceptron with indicator activation functions that can learn $x_1$ XNOR $x_2$. Describe your architecture and state your weight matrices and bias vectors.

Use the text box below for your solution.