This Jupyter notebook contains my implementations of coursework from COMP 551: Applied Machine Learning at McGill University. It contains a decision tree, a multilayer perceptron, and textual data analysis.


# Decision Tree


In [None]:
# calculate the gini index/impurity classification cost
def gini_index(classes):
    """Return the Gini impurity of an array of non-negative integer labels.

    The impurity is ``1 - sum(p_k ** 2)`` where ``p_k`` is the fraction of
    entries in ``classes`` carrying label ``k``.
    """
    label_counts = np.bincount(classes)
    proportions = label_counts / classes.shape[0]
    squared = np.square(proportions)
    return 1 - np.sum(squared)


# calculate the entropy classification cost
def entropy(classes):
    """Return the Shannon entropy (in bits) of an array of non-negative integer labels.

    Labels that never occur contribute nothing; they are dropped before the
    logarithm so that ``log2(0)`` is never evaluated.
    """
    proportions = np.bincount(classes) / classes.shape[0]
    present = proportions[np.nonzero(proportions)]
    weighted_logs = present * np.log2(present)
    return -np.sum(weighted_logs)


def misclassification(classes):
    """Return the misclassification rate of always predicting the majority label.

    This is ``1 - max_k p_k`` where ``p_k`` is the fraction of entries in
    ``classes`` (non-negative integers) carrying label ``k``.
    """
    proportions = np.bincount(classes) / classes.shape[0]
    majority_share = np.max(proportions)
    return 1 - majority_share


class Node:
    """A single node of the decision tree.

    A node starts life as a leaf (no children, no split); the tree builder
    fills in the split and the children when it decides to split the node.

    Args:
        data_indices: Indices of the training instances that reached this node.
        parent: The parent ``Node``, or None for a root node.
    """

    def __init__(self, data_indices, parent):
        self.left_node = None  # child receiving instances with feature <= threshold
        self.right_node = None  # child receiving instances with feature > threshold
        self.data_indices = data_indices
        self.split_feature = None  # feature index used to split this node
        self.split_threshold = None  # threshold applied to ``split_feature``
        self.class_probabilities = None  # filled in by the tree builder
        if parent is not None:
            self.depth = parent.depth + 1
            self.num_classes = parent.num_classes
        else:
            # A root node has no parent to inherit from; give it well-defined
            # defaults so the attributes always exist (callers may overwrite).
            self.depth = 0
            self.num_classes = None


class DecisionTree:
    """Greedy binary decision-tree classifier using single-feature threshold splits.

    An internal node sends a sample to its left child when
    ``x[split_feature] <= split_threshold`` and to its right child otherwise.
    Every node stores the class distribution of the training samples that
    reached it; ``predict`` returns the distribution of the leaf a sample
    ends up in.

    Args:
        num_classes: Number of classes. When None it is inferred in ``fit``
            as ``max(y) + 1``.
        max_depth: Nodes at this depth are never split.
        min_leaf_instances: A node holding this many samples or fewer is not
            split further.
        cost_function: Callable mapping an integer label array to an impurity
            value (lower is better).
    """

    def __init__(
        self,
        num_classes=None,
        max_depth=10,
        min_leaf_instances=1,
        cost_function=gini_index,
    ):
        self.root = None
        # ``int(None)`` raises TypeError, so the default is kept as None here
        # and resolved from the labels when ``fit`` is called.
        self.num_classes = None if num_classes is None else int(num_classes)
        self.max_depth = max_depth
        self.cost_function = cost_function
        self.min_leaf_instances = min_leaf_instances

    def fit(self, X, y):
        """Grow the tree on ``X`` (2-D, one row per sample) and labels ``y``.

        ``y`` is cast to ``int`` and must contain non-negative labels because
        it is fed to ``np.bincount``. Returns ``self``.
        """
        self.X = X
        self.y = y.astype(int)
        if self.num_classes is None:
            self.num_classes = int(self.y.max()) + 1
        self.root = Node(data_indices=np.arange(X.shape[0]), parent=None)
        self.root.depth = 0
        self.root.num_classes = self.num_classes
        self.root.class_probabilities = self.__class_probabilities(
            self.root.data_indices
        )
        self.__fit_tree(self.root)
        return self

    def __class_probabilities(self, data_indices):
        """Return the class distribution of the training labels at ``data_indices``."""
        counts = np.bincount(self.y[data_indices], minlength=self.num_classes)
        return counts / np.sum(counts)

    # greedy test algorithm derived from the class slides and Decision Tree CoLab example
    def __greedy_test(self, node):
        """Search every feature/threshold pair for the lowest weighted child cost.

        Returns ``(best_cost, best_feature, best_threshold)``; ``best_cost``
        stays ``np.inf`` when no split separates the node's samples.
        """
        best_cost = np.inf
        best_feature = None
        best_threshold = None
        num_instances = len(node.data_indices)
        num_features = self.X.shape[1]

        for feature in range(num_features):
            feature_values = self.X[node.data_indices, feature]
            unique_values = np.unique(feature_values)
            if len(unique_values) < 2:
                # A constant feature cannot separate the samples.
                continue
            # Candidate thresholds are the midpoints between consecutive
            # distinct values: they induce the same training partitions as
            # the values themselves but sit between them for unseen data.
            test_thresholds = (unique_values[:-1] + unique_values[1:]) / 2

            for test_threshold in test_thresholds:
                left_indices = node.data_indices[feature_values <= test_threshold]
                right_indices = node.data_indices[feature_values > test_threshold]

                num_left = len(left_indices)
                num_right = len(right_indices)
                if num_left == 0 or num_right == 0:
                    # Not a real split: one side would be empty.
                    continue

                cost_left = self.cost_function(self.y[left_indices])
                cost_right = self.cost_function(self.y[right_indices])
                # Weight each child's cost by its share of the samples.
                total_cost = (
                    (num_left * cost_left) + (num_right * cost_right)
                ) / num_instances

                if total_cost < best_cost:
                    best_cost = total_cost
                    best_feature = feature
                    best_threshold = test_threshold

        return best_cost, best_feature, best_threshold

    def __fit_tree(self, node):
        """Recursively split ``node`` until a stopping criterion is met."""
        if (
            node.depth == self.max_depth
            or len(node.data_indices) <= self.min_leaf_instances
        ):
            return

        best_cost, best_feature, best_threshold = self.__greedy_test(node)
        if np.isinf(best_cost):
            # No split separates the samples; keep the node as a leaf.
            return

        split_values = self.X[node.data_indices, best_feature]
        goes_left = split_values <= best_threshold
        goes_right = split_values > best_threshold

        node.split_feature = best_feature
        node.split_threshold = best_threshold

        left_node = Node(data_indices=node.data_indices[goes_left], parent=node)
        left_node.class_probabilities = self.__class_probabilities(
            left_node.data_indices
        )

        right_node = Node(data_indices=node.data_indices[goes_right], parent=node)
        right_node.class_probabilities = self.__class_probabilities(
            right_node.data_indices
        )

        self.__fit_tree(left_node)
        self.__fit_tree(right_node)

        # Children are attached only after their subtrees are complete.
        node.left_node = left_node
        node.right_node = right_node

    def predict(self, new_data):
        """Return an array of shape ``(len(new_data), num_classes)`` of class probabilities.

        Each row is the stored class distribution of the leaf reached by the
        corresponding sample.
        """
        class_probabilities = np.zeros((new_data.shape[0], self.num_classes))
        for row, datum in enumerate(new_data):
            node = self.root
            # Internal nodes always have both children, so checking the left
            # child is enough to detect a leaf.
            while node.left_node is not None:
                if datum[node.split_feature] <= node.split_threshold:
                    node = node.left_node
                else:
                    node = node.right_node
            class_probabilities[row, :] = node.class_probabilities
        return class_probabilities

    # evaluate the accuracy of the predicted values
    def evaluate_accuracy(self, y_test, y_real):
        """Return the fraction of entries in ``y_test`` equal to ``y_real``."""
        return np.sum(y_test == y_real) / y_real.shape[0]

# Multilayer Perceptron


In [None]:
# base classes
class NeuralNetLayer:
    """Interface shared by every layer of the network.

    Subclasses implement ``forward`` and ``backward``. ``parameters`` stays
    None for layers without trainable values; ``gradient`` holds whatever the
    subclass caches for the backward pass / optimizer.
    """

    def __init__(self):
        self.gradient, self.parameters = None, None

    def forward(self, x):
        """Compute the layer output for ``x``; must be overridden."""
        raise NotImplementedError

    def backward(self, gradient):
        """Propagate ``gradient`` back through the layer; must be overridden."""
        raise NotImplementedError


# Linear Calculations x.T dot w + b
class LinearLayer(NeuralNetLayer):
    """Fully connected layer computing ``w @ x + b`` for every row of a batch.

    Args:
        input_size: Number of input features per sample.
        output_size: Number of output units per sample.

    Weights and biases are drawn from a standard normal distribution.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        self.ni = input_size
        self.no = output_size
        self.w = np.random.randn(output_size, input_size)
        self.b = np.random.randn(output_size)
        self.cur_input = None
        # The optimizer updates these arrays in place, so the list must hold
        # the very same objects as ``self.w`` / ``self.b``.
        self.parameters = [self.w, self.b]

    def forward(self, x):
        """Return the ``(batch, output_size)`` output for ``x`` of shape ``(batch, input_size)``."""
        self.cur_input = x
        # Squeeze only the trailing matmul axis. A bare ``squeeze()`` would
        # also drop the batch axis (batch of 1) or the output axis
        # (``output_size == 1``) and break the layers that follow.
        return (self.w[None, :, :] @ x[:, :, None]).squeeze(axis=-1) + self.b

    def backward(self, gradient):
        """Cache per-sample parameter gradients and return the gradient w.r.t. the input.

        ``gradient`` has shape ``(batch, output_size)``. ``self.gradient`` is
        set to ``[dw, db]`` with one entry per sample along axis 0.
        """
        assert self.cur_input is not None, "Must call forward before backward"
        # Per-sample outer product: (batch, no, 1) @ (batch, 1, ni).
        dw = gradient[:, :, None] @ self.cur_input[:, None, :]
        db = gradient
        self.gradient = [dw, db]
        return gradient.dot(self.w)


# ReLU activation function
class ReLULayer(NeuralNetLayer):
    """Element-wise rectified linear unit, ``max(0, x)``."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Return ``max(0, x)`` and cache the local derivative (1 where x > 0, else 0)."""
        is_active = x > 0
        self.gradient = np.where(is_active, 1.0, 0.0)
        rectified = np.maximum(0, x)
        return rectified

    def backward(self, gradient):
        """Scale the incoming gradient by the derivative cached in ``forward``."""
        assert self.gradient is not None, "Must call forward before backward"
        local_derivative = self.gradient
        return gradient * local_derivative


# Tanh activation function
class TanhLayer(NeuralNetLayer):
    """Element-wise hyperbolic-tangent activation."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Return ``tanh(x)`` and cache its derivative ``1 - tanh(x) ** 2``."""
        # ``np.tanh`` stays finite for large |x|; the explicit
        # (e^x - e^-x) / (e^x + e^-x) form overflows to inf / inf = nan.
        activation = np.tanh(x)
        self.gradient = 1 - np.square(activation)
        return activation

    def backward(self, gradient):
        """Scale the incoming gradient by the derivative cached in ``forward``."""
        assert self.gradient is not None, "Must call forward before backward"
        return gradient * self.gradient


class LeakyReLULayer(NeuralNetLayer):
    """Element-wise leaky ReLU: ``x`` for positive inputs, ``leak_coef * x`` otherwise.

    Args:
        leakage_coefficient: Slope applied to non-positive inputs.
    """

    def __init__(self, leakage_coefficient):
        super().__init__()
        self.leak_coef = leakage_coefficient

    def forward(self, x):
        """Return the activation and cache the local derivative (1 or ``leak_coef``)."""
        slope = self.leak_coef
        self.gradient = np.where(x > 0, 1.0, slope)
        positive_part = np.maximum(x, 0)
        negative_part = np.minimum(0, x)
        return positive_part + slope * negative_part

    def backward(self, gradient):
        """Scale the incoming gradient by the derivative cached in ``forward``."""
        assert self.gradient is not None, "Must call forward before backward"
        local_derivative = self.gradient
        return gradient * local_derivative


# Softmax output for multiclass predictions
class SoftmaxOutputLayer(NeuralNetLayer):
    """Softmax output layer producing one probability row per sample."""

    def __init__(self):
        super().__init__()
        self.cur_probs = None

    def forward(self, x):
        """Return row-wise softmax probabilities for a 2-D array of scores.

        The row maximum is subtracted before exponentiating so that ``exp``
        cannot overflow; this leaves the result unchanged.
        """
        shifted = x - np.max(x, axis=-1, keepdims=True)
        exponentials = np.exp(shifted)
        row_totals = np.sum(exponentials, axis=-1)
        self.cur_probs = exponentials / row_totals[:, None]
        return self.cur_probs

    def backward(self, target):
        """Return ``probabilities - target`` using the probabilities cached in ``forward``."""
        assert self.cur_probs is not None, "Must call forward before backward"
        difference = self.cur_probs - target
        return difference


class MLP:
    """Multilayer perceptron: alternating linear and activation layers ending in softmax.

    Args:
        num_features: Number of input features per sample.
        num_classes: Number of output classes.
        activation_function: One of ``'relu'``, ``'tanh'`` or ``'leaky'``.
        num_hidden_layers: Number of hidden layers (0 gives softmax regression).
        num_hidden_units: Sequence with the width of each hidden layer; its
            length must equal ``num_hidden_layers``.
    """

    def __init__(
        self,
        num_features,
        num_classes,
        activation_function,
        num_hidden_layers,
        num_hidden_units,
    ):  # option for activation function are ['relu', 'tanh', 'leaky']
        assert activation_function in [
            "relu",
            "tanh",
            "leaky",
        ], "options for activation function are ['relu', 'tanh', 'leaky']."
        assert num_hidden_layers == len(
            num_hidden_units
        ), "the length of the hidden units array must match the number of hidden layers"

        # Consecutive entries are the (input, output) sizes of each linear
        # layer, so every layer's input matches the previous layer's output.
        layer_sizes = [num_features, *num_hidden_units, num_classes]
        last_index = len(layer_sizes) - 2
        self.layers = []
        for index, (fan_in, fan_out) in enumerate(
            zip(layer_sizes[:-1], layer_sizes[1:])
        ):
            self.layers.append(LinearLayer(input_size=fan_in, output_size=fan_out))
            if index == last_index:
                self.layers.append(SoftmaxOutputLayer())
            else:
                self.layers.append(self._make_activation(activation_function))

    @staticmethod
    def _make_activation(activation_function):
        """Return a fresh activation layer for an already validated name."""
        if activation_function == "relu":
            return ReLULayer()
        if activation_function == "tanh":
            return TanhLayer()
        return LeakyReLULayer(leakage_coefficient=0.001)

    @staticmethod
    def _batch_slices(num_samples, mini_batch_size):
        """Return slices covering all samples; the last batch absorbs the remainder.

        Fewer samples than ``mini_batch_size`` yield a single batch holding
        all of them; zero samples yield no batch.
        """
        num_batches = max(1, num_samples // mini_batch_size) if num_samples else 0
        bounds = [k * mini_batch_size for k in range(num_batches)] + [num_samples]
        return [slice(lo, hi) for lo, hi in zip(bounds[:-1], bounds[1:])]

    def forward(self, x):
        """Feed ``x`` through every layer in order and return the final output."""
        for layer in self.layers:
            x = layer.forward(x)
        return x

    def backward(self, target):
        """Run the backward pass from the last layer to the first.

        The output layer receives ``target``; every earlier layer receives the
        value returned by the layer after it.
        """
        for layer in self.layers[::-1]:
            target = layer.backward(target)

    def fit(self, X, y, optimizer, epochs=10000, mini_batch_size=10, verbose=True):
        """Train with mini-batches and plot the mean loss of every epoch.

        Args:
            X: Training inputs, one row per sample.
            y: Training targets aligned with ``X``; expected to be one-hot
                encoded already (done in the dataset preprocessing).
            optimizer: Object whose ``step()`` applies the cached gradients.
            epochs: Number of passes over the data.
            mini_batch_size: Samples per batch; the final batch also takes
                the remainder.
            verbose: Accepted for interface compatibility; currently unused.

        The data is shuffled once before training, not once per epoch.
        """
        losses = []  # mean mini-batch loss of each epoch

        shuffler = np.random.permutation(X.shape[0])
        X = X[shuffler]
        y = y[shuffler]

        batch_slices = self._batch_slices(X.shape[0], mini_batch_size)
        epsilon = 1e-8  # keeps log() away from zero

        for _ in tqdm(range(epochs)):
            losses_batch = []
            for batch_slice in batch_slices:
                X_batch = X[batch_slice]
                y_batch = y[batch_slice]

                # Forward: probabilities are shifted by epsilon and capped at 1.
                y_hat_batch = np.minimum(self.forward(X_batch) + epsilon, 1)
                loss_per_batch = -(y_batch * np.log(y_hat_batch)).sum(axis=-1).mean()
                losses_batch.append(loss_per_batch)

                # Backward
                self.backward(y_batch)
                optimizer.step()
            losses.append(np.mean(losses_batch))

        plt.plot(losses)
        plt.xlabel("Epoch")
        plt.ylabel("Cross entropy loss")

    def predict(self, X):
        """Return the network output (class probabilities) for ``X``."""
        return self.forward(X)


class Optimizer:
    """Base optimizer: walks a network's layers and delegates the update rule.

    Args:
        net: The network whose ``layers`` are updated.
    """

    def __init__(self, net: MLP):
        self.net = net

    def step(self):
        """Call ``update`` for every layer that has parameters, last layer first."""
        for layer in reversed(self.net.layers):
            if layer.parameters is None:
                continue
            self.update(layer.parameters, layer.gradient)

    def update(self, params, gradient):
        """Apply one update to ``params`` given ``gradient``; must be overridden."""
        raise NotImplementedError


class GradientDescentOptimizer(Optimizer):
    """Plain gradient descent with a fixed learning rate.

    Args:
        net: The network whose layers are updated.
        lr: Learning rate multiplying the averaged gradient.
    """

    def __init__(self, net: MLP, lr: float):
        super().__init__(net)
        self.lr = lr

    def update(self, params, gradient):
        """Subtract ``lr`` times the gradient averaged over axis 0 from each parameter.

        The subtraction is done in place so the layer's own arrays change.
        """
        for parameter, per_sample_gradient in zip(params, gradient):
            averaged = per_sample_gradient.mean(axis=0)
            parameter -= self.lr * averaged

In [None]:
# MLP class with numerical gradient checking
class MLP_GradCheck:
    """MLP that verifies its analytic weight gradients numerically while training.

    After the backward pass of every mini-batch, each weight of every linear
    layer is perturbed by ``+/- eps`` and the central finite difference of the
    batch loss is compared with the batch-averaged analytic gradient.

    Args:
        num_features: Number of input features per sample.
        num_classes: Number of output classes.
        activation_function: One of ``'relu'``, ``'tanh'`` or ``'leaky'``.
        num_hidden_layers: Number of hidden layers (0 gives softmax regression).
        num_hidden_units: Sequence with the width of each hidden layer; its
            length must equal ``num_hidden_layers``.
    """

    def __init__(self, num_features, num_classes, activation_function, num_hidden_layers, num_hidden_units):  # option for activation function are ['relu', 'tanh', 'leaky']
        assert activation_function in ['relu', 'tanh', 'leaky'], 'options for activation function are [\'relu\', \'tanh\', \'leaky\'].'
        assert num_hidden_layers == len(num_hidden_units), 'the length of the hidden units array must match the number of hidden layers'

        # Consecutive entries are the (input, output) sizes of each linear
        # layer, so every layer's input matches the previous layer's output.
        layer_sizes = [num_features, *num_hidden_units, num_classes]
        last_index = len(layer_sizes) - 2
        self.layers = []
        for index, (fan_in, fan_out) in enumerate(zip(layer_sizes[:-1], layer_sizes[1:])):
            self.layers.append(LinearLayer(input_size=fan_in, output_size=fan_out))
            if index == last_index:
                self.layers.append(SoftmaxOutputLayer())
            else:
                self.layers.append(self._make_activation(activation_function))

    @staticmethod
    def _make_activation(activation_function):
        """Return a fresh activation layer for an already validated name."""
        if activation_function == 'relu':
            return ReLULayer()
        if activation_function == 'tanh':
            return TanhLayer()
        return LeakyReLULayer(leakage_coefficient=0.001)

    @staticmethod
    def _batch_slices(num_samples, mini_batch_size):
        """Return slices covering all samples; the last batch absorbs the remainder."""
        num_batches = max(1, num_samples // mini_batch_size) if num_samples else 0
        bounds = [k * mini_batch_size for k in range(num_batches)] + [num_samples]
        return [slice(lo, hi) for lo, hi in zip(bounds[:-1], bounds[1:])]

    def _batch_loss(self, X_batch, y_batch, epsilon=1e-8):
        """Return the mean cross-entropy of a batch (probabilities shifted by epsilon, capped at 1)."""
        y_hat_batch = np.minimum(self.forward(X_batch) + epsilon, 1)
        return -(y_batch * np.log(y_hat_batch)).sum(axis=-1).mean()

    def _check_gradients(self, X_batch, y_batch, eps=1e-4, atol=0.0001):
        """Compare analytic and finite-difference weight gradients for one batch.

        Raises:
            ValueError: If any weight's numerical gradient differs from the
                analytic one by more than ``np.isclose`` allows.
        """
        for layer in self.layers:
            if layer.parameters is None:
                continue
            # The loss is a batch mean, so the matching analytic gradient is
            # the mean of the per-sample gradients, not any single sample's.
            analytic = layer.gradient[0].mean(axis=0)
            for index in np.ndindex(*layer.w.shape):
                original_weight = layer.w[index]

                layer.w[index] = original_weight + eps
                loss_per_batch_plus = self._batch_loss(X_batch, y_batch)

                layer.w[index] = original_weight - eps
                loss_per_batch_minus = self._batch_loss(X_batch, y_batch)

                # Restore the exact original value before moving on.
                layer.w[index] = original_weight
                numerical_grad = (loss_per_batch_plus - loss_per_batch_minus) / (2 * eps)

                if not np.isclose(numerical_grad, analytic[index], atol=atol):
                    raise ValueError((f'Numerical gradient of {numerical_grad:.4f} is not close to the gradient of {analytic[index]:.4f}.'))
        print('No gradient errors.')

    def forward(self, x):
        """Feed ``x`` through every layer in order and return the final output."""
        for layer in self.layers:
            x = layer.forward(x)
        return x

    def backward(self, target):
        """Run the backward pass from the last layer to the first."""
        for layer in self.layers[::-1]:
            target = layer.backward(target)

    def fit(self, X, y, optimizer, epochs=10000, mini_batch_size=10, verbose=True):
        """Train with mini-batches, checking gradients on every batch, and plot the epoch losses.

        Args:
            X: Training inputs, one row per sample.
            y: Training targets aligned with ``X``; expected to be one-hot
                encoded already (done in the dataset preprocessing).
            optimizer: Object whose ``step()`` applies the cached gradients.
            epochs: Number of passes over the data.
            mini_batch_size: Samples per batch; the final batch also takes
                the remainder.
            verbose: Accepted for interface compatibility; currently unused.

        Raises:
            ValueError: When the gradient check fails for any weight.
        """
        losses = []  # mean mini-batch loss of each epoch

        # Shuffle the training dataset (once, before training)
        shuffler = np.random.permutation(X.shape[0])
        X = X[shuffler]
        y = y[shuffler]

        batch_slices = self._batch_slices(X.shape[0], mini_batch_size)

        for _ in tqdm(range(epochs)):
            losses_batch = []
            for batch_slice in batch_slices:
                X_batch = X[batch_slice]
                y_batch = y[batch_slice]

                # Forward
                losses_batch.append(self._batch_loss(X_batch, y_batch))

                # Backward
                self.backward(y_batch)

                # Gradient check. The extra forward passes do not touch the
                # [dw, db] cached by the linear layers, so the optimizer step
                # below still uses the gradients from the backward pass.
                self._check_gradients(X_batch, y_batch)

                optimizer.step()
            losses.append(np.mean(losses_batch))

        plt.plot(losses)
        plt.xlabel("Epoch")
        plt.ylabel("Cross entropy loss")

    def predict(self, X):
        """Return the network output (class probabilities) for ``X``."""
        return self.forward(X)

In [None]:
# Layers with L2 regularization implementation
class LinearLayer_L2(NeuralNetLayer):
    """Fully connected layer whose weight gradient includes an L2 penalty term.

    Args:
        input_size: Number of input features per sample.
        output_size: Number of output units per sample.
        lmbda: L2 regularization strength; ``lmbda * w`` is added to the
            weight gradient (the bias is not regularized).

    Weights and biases are drawn from a standard normal distribution.
    """

    def __init__(self, input_size, output_size, lmbda):
        super().__init__()
        self.ni = input_size
        self.no = output_size
        self.w = np.random.randn(output_size, input_size)
        self.b = np.random.randn(output_size)
        self.cur_input = None
        # The optimizer updates these arrays in place, so the list must hold
        # the very same objects as ``self.w`` / ``self.b``.
        self.parameters = [self.w, self.b]
        self.lmbda = lmbda

    def forward(self, x):
        """Return the ``(batch, output_size)`` output for ``x`` of shape ``(batch, input_size)``."""
        self.cur_input = x
        # Squeeze only the trailing matmul axis. A bare ``squeeze()`` would
        # also drop the batch axis (batch of 1) or the output axis
        # (``output_size == 1``) and break the layers that follow.
        return (self.w[None, :, :] @ x[:, :, None]).squeeze(axis=-1) + self.b

    def backward(self, gradient):
        """Cache per-sample parameter gradients and return the gradient w.r.t. the input.

        ``gradient`` has shape ``(batch, output_size)``. ``self.gradient`` is
        set to ``[dw, db]`` with one entry per sample along axis 0.
        """
        assert self.cur_input is not None, "Must call forward before backward"
        # Per-sample outer product: (batch, no, 1) @ (batch, 1, ni).
        dw = gradient[:, :, None] @ (self.cur_input[:, None, :])
        # Broadcast over the batch axis: each sample's gradient gets the same
        # penalty term, so the batch mean still contains exactly lmbda * w.
        dw += self.lmbda * self.w
        db = gradient
        self.gradient = [dw, db]
        return gradient.dot(self.w)


# Instantiation of MLP with L2 regularization
class MLP_L2:
    """Multilayer perceptron with L2-regularized linear layers and a softmax output.

    Args:
        num_features: Number of input features per sample.
        num_classes: Number of output classes.
        activation_function: One of ``'relu'``, ``'tanh'`` or ``'leaky'``.
        num_hidden_layers: Number of hidden layers (0 gives softmax regression).
        num_hidden_units: Sequence with the width of each hidden layer; its
            length must equal ``num_hidden_layers``.
        lmbda: L2 regularization strength passed to every linear layer.
    """

    def __init__(
        self,
        num_features,
        num_classes,
        activation_function,
        num_hidden_layers,
        num_hidden_units,
        lmbda,
    ):  # option for activation function are ['relu', 'tanh', 'leaky']
        assert activation_function in [
            "relu",
            "tanh",
            "leaky",
        ], "options for activation function are ['relu', 'tanh', 'leaky']."
        assert num_hidden_layers == len(
            num_hidden_units
        ), "the length of the hidden units array must match the number of hidden layers"

        # Consecutive entries are the (input, output) sizes of each linear
        # layer, so every layer's input matches the previous layer's output.
        layer_sizes = [num_features, *num_hidden_units, num_classes]
        last_index = len(layer_sizes) - 2
        self.layers = []
        for index, (fan_in, fan_out) in enumerate(
            zip(layer_sizes[:-1], layer_sizes[1:])
        ):
            self.layers.append(
                LinearLayer_L2(input_size=fan_in, output_size=fan_out, lmbda=lmbda)
            )
            if index == last_index:
                self.layers.append(SoftmaxOutputLayer())
            else:
                self.layers.append(self._make_activation(activation_function))
        self.lmbda = lmbda

    @staticmethod
    def _make_activation(activation_function):
        """Return a fresh activation layer for an already validated name."""
        if activation_function == "relu":
            return ReLULayer()
        if activation_function == "tanh":
            return TanhLayer()
        return LeakyReLULayer(leakage_coefficient=0.001)

    @staticmethod
    def _batch_slices(num_samples, mini_batch_size):
        """Return slices covering all samples; the last batch absorbs the remainder.

        Fewer samples than ``mini_batch_size`` yield a single batch holding
        all of them; zero samples yield no batch.
        """
        num_batches = max(1, num_samples // mini_batch_size) if num_samples else 0
        bounds = [k * mini_batch_size for k in range(num_batches)] + [num_samples]
        return [slice(lo, hi) for lo, hi in zip(bounds[:-1], bounds[1:])]

    def _l2_penalty(self):
        """Return ``(lmbda / 2) * sum of squared weights`` over all linear layers.

        The sum (not the mean) over layers is what the ``lmbda * w`` term in
        each layer's weight gradient corresponds to.
        """
        squared_norms = [
            np.square(layer.w).sum()
            for layer in self.layers
            if isinstance(layer, LinearLayer_L2)
        ]
        return (self.lmbda / 2) * np.sum(squared_norms)

    def forward(self, x):
        """Feed ``x`` through every layer in order and return the final output."""
        for layer in self.layers:
            x = layer.forward(x)
        return x

    def backward(self, target):
        """Run the backward pass from the last layer to the first."""
        for layer in self.layers[::-1]:
            target = layer.backward(target)

    def fit(self, X, y, optimizer, epochs=10000, mini_batch_size=10, verbose=True):
        """Train with mini-batches and plot the mean regularized loss of every epoch.

        Args:
            X: Training inputs, one row per sample.
            y: Training targets aligned with ``X``; expected to be one-hot
                encoded already (done in the dataset preprocessing).
            optimizer: Object whose ``step()`` applies the cached gradients.
            epochs: Number of passes over the data.
            mini_batch_size: Samples per batch; the final batch also takes
                the remainder.
            verbose: Accepted for interface compatibility; currently unused.

        The data is shuffled once before training, not once per epoch. The
        recorded loss is cross-entropy plus the L2 penalty.
        """
        losses = []  # mean mini-batch loss of each epoch

        shuffler = np.random.permutation(X.shape[0])
        X = X[shuffler]
        y = y[shuffler]

        batch_slices = self._batch_slices(X.shape[0], mini_batch_size)
        epsilon = 1e-8  # keeps log() away from zero

        for _ in tqdm(range(epochs)):
            losses_batch = []
            for batch_slice in batch_slices:
                X_batch = X[batch_slice]
                y_batch = y[batch_slice]

                # Forward: probabilities are shifted by epsilon and capped at 1.
                y_hat_batch = np.minimum(self.forward(X_batch) + epsilon, 1)
                cross_entropy = -(y_batch * np.log(y_hat_batch)).sum(axis=-1).mean()
                losses_batch.append(cross_entropy + self._l2_penalty())

                # Backward
                self.backward(y_batch)
                optimizer.step()
            losses.append(np.mean(losses_batch))

        plt.plot(losses)
        plt.xlabel("Epoch")
        plt.ylabel("Cross entropy loss")

    def predict(self, X):
        """Return the network output (class probabilities) for ``X``."""
        return self.forward(X)

# Textual Data Preprocessing & Analysis


In [None]:
# Aggregate the training bag-of-words file into per-word statistics.
# Each line describes one review: a leading rating followed by
# space-separated "word_index:occurrences" pairs.
train_labeled_bow = {}  # word -> total occurrences over all reviews
train_total_appear_in_reviews = {}  # word -> number of reviews containing it
# ``with`` guarantees the file handle is closed once the loop is done.
with open("/content/aclImdb/train/labeledBow.feat", "r") as train_labeled_bow_file:
    for line in train_labeled_bow_file:
        # The first token is the rating, so it is skipped.
        for feature_index_occurence in line.strip().split(" ")[1:]:
            tmp_arr = feature_index_occurence.split(":")
            word_index, word_occurence = int(tmp_arr[0]), int(tmp_arr[1])
            word = imdb_corpus[word_index]
            train_labeled_bow[word] = train_labeled_bow.get(word, 0) + word_occurence
            # A word is listed at most once per review line, so each pair
            # seen counts as one more review containing the word.
            train_total_appear_in_reviews[word] = (
                train_total_appear_in_reviews.get(word, 0) + 1
            )

# fraction of the 25000 reviews in which each word appears
train_labeled_rf = {
    k: v / 25000.0 for k, v in train_total_appear_in_reviews.items()
}
# inverse document frequency: log(1 + N / reviews_containing_word)
train_labeled_idf = {
    k: np.log(1 + (25000.0 / v)) for k, v in train_total_appear_in_reviews.items()
}
print(train_labeled_rf)

In [None]:
# Keep only the words whose review frequency lies strictly between 1% and 50%,
# i.e. drop words that are too rare or too common in the training reviews.
filter_train = {
    word: frequency
    for word, frequency in train_labeled_rf.items()
    if 0.01 < frequency < 0.5
}

print("Number of  features in the first train samples are:", len(train_labeled_rf))
print("Number of filtered features in train samples are:", len(filter_train))
# print(filter_train)

In [None]:
# Aggregate the test bag-of-words file into per-word statistics.
# Each line describes one review: a leading rating followed by
# space-separated "word_index:occurrences" pairs.
test_labeled_bow = {}  # word -> total occurrences over all reviews
test_total_appear_in_reviews = {}  # word -> number of reviews containing it
# ``with`` guarantees the file handle is closed once the loop is done.
with open("/content/aclImdb/test/labeledBow.feat", "r") as test_labeled_bow_file:
    for line in test_labeled_bow_file:
        # The first token is the rating, so it is skipped.
        for feature_index_occurence in line.strip().split(" ")[1:]:
            tmp_arr = feature_index_occurence.split(":")
            word_index, word_occurence = int(tmp_arr[0]), int(tmp_arr[1])
            word = imdb_corpus[word_index]
            test_labeled_bow[word] = test_labeled_bow.get(word, 0) + word_occurence
            # A word is listed at most once per review line, so each pair
            # seen counts as one more review containing the word.
            test_total_appear_in_reviews[word] = (
                test_total_appear_in_reviews.get(word, 0) + 1
            )

# fraction of the 25000 reviews in which each word appears
test_labeled_rf = {
    k: v / 25000.0 for k, v in test_total_appear_in_reviews.items()
}
# inverse document frequency: log(1 + N / reviews_containing_word)
test_labeled_idf = {
    k: np.log(1 + (25000.0 / v)) for k, v in test_total_appear_in_reviews.items()
}
# print(test_labeled_rf)

In [None]:
# Keep only the words whose review frequency lies strictly between 1% and 50%;
# words at or below 1% (too rare) or at or above 50% (too common) are dropped.
filter_test = {
    word: frequency
    for word, frequency in test_labeled_rf.items()
    if 0.01 < frequency < 0.5
}

print("Number of  features in the first test samples are:", len(test_labeled_rf))
print("Number of filtered features in test samples are:", len(filter_test))
# print(filter_test)

In [None]:
# Compute top features based on their absolute z-scores associated with continuous ratings (1-10) and build the training dataset
filtered_words_train = (
    filter_train.keys()
)  # isolate only the words from the filtered dict

# One row per review and one column per filtered word, initialised to zero
# occurrences.
train_review_words_occurence_df = pd.DataFrame(
    np.zeros((25000, len(filtered_words_train))), columns=filtered_words_train
)

review_index = 0
ratings = []
# The context manager closes the file once parsing is done.
with open("/content/aclImdb/train/labeledBow.feat", "r") as train_labeled_bow_file:
    for line in train_labeled_bow_file:
        tokens = line.strip().split(" ")
        rating = int(tokens[0])  # the rating is the first token of each line
        ratings.append(rating)
        # Every remaining token is a "word_index:occurrence" pair.
        for feature_index_occurence in tokens[1:]:
            tmp_arr = feature_index_occurence.split(":")
            word_index, word_occurence = int(tmp_arr[0]), int(tmp_arr[1])
            if imdb_corpus[word_index] in filtered_words_train:
                # Record how often this word occurs in this particular review.
                train_review_words_occurence_df.at[
                    review_index, imdb_corpus[word_index]
                ] = word_occurence
        review_index += 1
train_review_words_occurence = train_review_words_occurence_df.to_numpy()
display(train_review_words_occurence_df)

In [None]:
# Standardize every word column of the review/word occurrence matrix to zero
# mean and unit standard deviation. The whole matrix is processed in one
# vectorised expression instead of a Python loop over every cell.
feature_means = np.mean(train_review_words_occurence, axis=0)  # per-word mean
feature_sds = np.std(train_review_words_occurence, axis=0)  # per-word std
train_rating_words_standardized = (
    train_review_words_occurence - feature_means
) / feature_sds

train_rating_words_standardized_df = pd.DataFrame(
    train_rating_words_standardized, columns=filtered_words_train
)
# display(train_rating_words_standardized_df)

# Standardize the ratings as (rating - mean) / std. The previous expression
# had the mean and the standard deviation swapped, which rescaled every
# z-score computed from these values.
ratings_array = np.asarray(ratings)
ratings_standardized = (ratings_array - np.mean(ratings_array)) / np.std(
    ratings_array
)
# print(ratings_standardized)

In [None]:
z_scores = np.abs(
    np.dot(train_rating_words_standardized.T, ratings_standardized) / np.sqrt(25000)
)  # compute absolute z-score list from Hypothesis Testing slides

# Associate each word (column label of the standardized frame) with its
# absolute z-score; iterating a DataFrame yields its column labels in order.
words_zscores = dict(zip(train_rating_words_standardized_df, z_scores))

# Sort the words by z-score from largest to smallest and keep the top 1000.
words_zscores_sorted_reverse = sorted(
    words_zscores, key=words_zscores.get, reverse=True
)[:1000]

# word -> z-score for the selected words, in descending z-score order
important_features = {
    word: words_zscores[word] for word in words_zscores_sorted_reverse
}

# Record the corpus positions of every selected word in a single pass over the
# corpus, instead of rescanning the entire corpus once per selected word.
corpus_positions = {}
for i, word in enumerate(imdb_corpus):
    if word in important_features:
        corpus_positions.setdefault(word, []).append(i)

# corpus index -> z-score, keyed in the same descending z-score order as
# important_features
important_features_index = {}
for important_word in important_features:
    for i in corpus_positions.get(important_word, ()):
        important_features_index[i] = words_zscores[important_word]
print(important_features)
print(important_features_index)

In [None]:
# Build the data matrix and label vector for the training dataset,
# keeping only the important words from each review.

# One row per review and one column per important corpus index, initialised to
# zero occurrences.
X_train = pd.DataFrame(
    np.zeros((25000, len(important_features_index))), columns=important_features_index
)

review_index = 0
y_train = []
# The context manager closes the file once parsing is done.
with open("/content/aclImdb/train/labeledBow.feat", "r") as train_labeled_bow_file:
    for line in train_labeled_bow_file:
        tokens = line.strip().split(" ")
        # The rating is the first token; binarize it (>= 5 -> 1, otherwise 0).
        rating = 1 if int(tokens[0]) >= 5 else 0
        y_train.append(rating)
        # Every remaining token is a "word_index:occurrence" pair.
        for feature_index_occurence in tokens[1:]:
            tmp_arr = feature_index_occurence.split(":")
            word_index, word_occurence = int(tmp_arr[0]), int(tmp_arr[1])
            if word_index in important_features_index:
                # Record how often this word occurs in this particular review.
                X_train.at[review_index, word_index] = word_occurence
        review_index += 1
X_train = X_train.to_numpy()
y_train = np.asarray(y_train)

In [None]:
# Build the data matrix and label vector for the testing dataset,
# keeping only the important words from each review.

# One row per review and one column per important corpus index, initialised to
# zero occurrences.
X_test = pd.DataFrame(
    np.zeros((25000, len(important_features_index))), columns=important_features_index
)

review_index = 0
y_test = []
# The context manager closes the file once parsing is done.
with open("/content/aclImdb/test/labeledBow.feat", "r") as test_labeled_bow_file:
    for line in test_labeled_bow_file:
        tokens = line.strip().split(" ")
        # The rating is the first token; binarize it (>= 5 -> 1, otherwise 0).
        # Original author's note: no positive rating is less than 5 and no
        # negative rating is greater than 5.
        rating = 1 if int(tokens[0]) >= 5 else 0
        y_test.append(rating)
        # Every remaining token is a "word_index:occurrence" pair.
        for feature_index_occurence in tokens[1:]:
            tmp_arr = feature_index_occurence.split(":")
            word_index, word_occurence = int(tmp_arr[0]), int(tmp_arr[1])
            if word_index in important_features_index:
                # Record how often this word occurs in this particular review.
                X_test.at[review_index, word_index] = word_occurence
        review_index += 1
X_test = X_test.to_numpy()
y_test = np.asarray(y_test)

In [None]:
# Bar Plot of Top 10 Positive and Negative Words from Simple Linear Regression Hypothesis Testing

# Standardize each word column to zero mean and unit standard deviation.
# The z-scores below are kept signed (no absolute value) so that the question
# posed at the end of Task 1.1 can be answered.
feature_means = np.mean(train_review_words_occurence, axis=0)  # per-word mean
feature_sds = np.std(train_review_words_occurence, axis=0)  # per-word std
train_rating_words_standardized = (
    train_review_words_occurence - feature_means
) / feature_sds

train_rating_words_standardized_df = pd.DataFrame(
    train_rating_words_standardized, columns=filtered_words_train
)
# display(train_rating_words_standardized_df)

# Standardize the ratings as (rating - mean) / std, held in a NumPy array.
# The previous expression had the mean and the standard deviation swapped,
# which rescaled every z-score.
ratings_array = np.asarray(ratings)
ratings_standardized = (ratings_array - np.mean(ratings_array)) / np.std(
    ratings_array
)
# print(ratings_standardized)

z_scores = np.dot(train_rating_words_standardized.T, ratings_standardized) / np.sqrt(
    25000
)  # compute signed z-score list from Hypothesis Testing slides

# Associate each word (column label of the standardized frame) with its signed
# z-score; iterating a DataFrame yields its column labels in order.
words_zscores = dict(zip(train_rating_words_standardized_df, z_scores))

# Sort the words by z-score from largest (most positive) to smallest (most
# negative).
words_zscores_sorted_reversed_list = sorted(
    words_zscores, key=words_zscores.get, reverse=True
)
# word -> z-score, in descending z-score order
words_zscores_sorted_reversed = {
    word: words_zscores[word] for word in words_zscores_sorted_reversed_list
}
print(words_zscores_sorted_reversed)

In [None]:
# The dict is ordered by descending z-score, so its first ten entries are the
# highest-scoring words and its last ten the lowest-scoring ones.
ranked_words = list(words_zscores_sorted_reversed.items())
highest_ranked = ranked_words[:10]
# Starting no earlier than position 10 keeps any entry from landing in both
# groups when there are fewer than twenty words.
lowest_ranked = ranked_words[max(10, len(ranked_words) - 10):]

top_10_positive_x = [z_value for _, z_value in highest_ranked]
top_10_positive_y = [term for term, _ in highest_ranked]
top_10_negative_x = [z_value for _, z_value in lowest_ranked]
top_10_negative_y = [term for term, _ in lowest_ranked]

In [None]:
# Draw one horizontal bar series for the highest-scoring words and one for the
# lowest-scoring words on the same axes (word on the y-axis, z-score on the x-axis).
for bar_labels, bar_lengths in (
    (top_10_positive_y, top_10_positive_x),
    (top_10_negative_y, top_10_negative_x),
):
    plt.barh(bar_labels, bar_lengths)
plt.xlabel("z-score")
plt.ylabel("word")
plt.title("Top 10 Positive And Negative Words With z-scores")
plt.show()