<a href="https://colab.research.google.com/github/shah-zeb-naveed/machine-learning-system-design/blob/main/ml_algos_from_scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# prompt: Implement cosine similarity between two vectors.

import numpy as np

def cosine_similarity(vec1, vec2):
  """Return the cosine of the angle between two vectors.

  Args:
    vec1: First vector (any sequence accepted by ``np.dot`` / ``np.linalg.norm``).
    vec2: Second vector, same length as ``vec1``.

  Returns:
    ``dot(vec1, vec2) / (||vec1|| * ||vec2||)``, or ``0`` when either
    vector has zero norm (the ratio is undefined in that case).
  """
  numerator = np.dot(vec1, vec2)
  norm_a, norm_b = (np.linalg.norm(v) for v in (vec1, vec2))
  # Guard against dividing by zero for an all-zero vector.
  if not (norm_a != 0 and norm_b != 0):
    return 0
  return numerator / (norm_a * norm_b)

# Example: cosine similarity of two 2-D vectors given as plain Python lists.
vec1 = [1, 5]
vec2 = [3, 6]
# Last expression of the cell, so the notebook displays the returned value.
cosine_similarity(vec1, vec2)

np.float64(0.9647638212377322)

In [None]:
def euclidean(vec1, vec2):
  """Calculates the Euclidean distance between two vectors.

  Args:
    vec1: The first vector (list, tuple or NumPy array).
    vec2: The second vector, same shape as ``vec1``.

  Returns:
    The Euclidean (L2) distance between the two vectors.
  """
  # Convert up front so plain Python lists work (list - list is a TypeError)
  # and so integer dtypes cannot wrap around when subtracted or squared.
  diff = np.asarray(vec1, dtype=float) - np.asarray(vec2, dtype=float)
  return np.sqrt(np.sum(diff ** 2))

# Example: same two points as the cosine-similarity cell, this time as NumPy
# arrays (this rebinds the notebook-level names vec1 / vec2).
vec1 = np.array([1, 5])
vec2 = np.array([3, 6])
# Last expression of the cell, so the notebook displays the distance.
euclidean(vec1, vec2)

np.float64(2.23606797749979)

In [None]:
# prompt: create function that calculates softmax

import numpy as np

def softmax(x, axis=None):
    """Calculates the softmax of a vector.

    Args:
      x: A NumPy array or list representing the input scores.
      axis: Axis along which to normalise. ``None`` (the default) normalises
        over all elements, which is the plain vector case.

    Returns:
      A NumPy array of the same shape as ``x`` whose entries are positive and
      sum to 1 along ``axis``.
    """
    x = np.asarray(x, dtype=float)
    if x.size == 0:
        # Nothing to normalise; np.max would raise on an empty array.
        return x.copy()
    # Subtracting the max leaves the result mathematically unchanged but keeps
    # np.exp from overflowing to inf (which would turn the ratio into nan).
    shifted = x - np.max(x, axis=axis, keepdims=True)
    e_x = np.exp(shifted)
    return e_x / np.sum(e_x, axis=axis, keepdims=True)

# Example: softmax over three scores; the displayed values sum to 1.
x = np.array([1, 2, 3])
softmax(x)

array([0.09003057, 0.24472847, 0.66524096])

In [None]:
# prompt: create code snippet for a simple softmax classifier

import numpy as np

# Example usage:
scores = np.array([1.0, 2.0, 3.0])  # Example scores for three classes
# NOTE: despite its name, `probabilities` holds the index of the largest
# softmax output (the predicted class), not the probability vector itself.
probabilities = np.argmax(softmax(scores))
probabilities


np.int64(2)

In [None]:
#Gini impurity and entropy calculators.

def gini_at_node(proportions):
  """Return the Gini impurity of a node from its class proportions.

  Args:
    proportions: Per-class fractions of the samples at the node; they are
                 expected to sum to 1.

  Returns:
    One minus the sum of the squared proportions.
  """
  sum_of_squares = np.square(proportions).sum()
  return 1 - sum_of_squares

def entropy_at_node(proportions):
  """Calculates the entropy at a node given class proportions.

  Args:
    proportions: A list of proportions of each class at the node.
                 The proportions should sum to 1.

  Returns:
    The Shannon entropy of the node in bits (log base 2).
  """
  p = np.asarray(proportions, dtype=float)
  # A class with proportion 0 contributes nothing (lim p->0 of p*log2(p) = 0).
  # Evaluating it directly gives 0 * -inf = nan, so drop those entries first.
  present = p[p != 0]
  entropy = -np.sum(present * np.log2(present))
  return entropy

# Example: a perfectly mixed two-class node. The cell displays the tuple
# (gini, entropy) for these proportions.
proportions = [0.5, 0.5]
gini_at_node(proportions), entropy_at_node(proportions)


(np.float64(0.5), np.float64(1.0))

In [None]:
import numpy as np

# this is the gini of children. subtract from gini of parent to get gain.
def gini_of_children(left_child, right_child):
    """Return the size-weighted Gini impurity of the two children of a split.

    Args:
        left_child: Class labels of the samples sent to the left child.
        right_child: Class labels of the samples sent to the right child.

    Returns:
        ``(n_left / n) * gini(left) + (n_right / n) * gini(right)``. An empty
        child counts as impurity 0. Subtract this from the parent's impurity
        to obtain the gain of the split.
    """
    children = (left_child, right_child)
    sizes = [len(labels) for labels in children]
    total = sizes[0] + sizes[1]

    weighted_terms = []
    for labels, size in zip(children, sizes):
        if size == 0:
            impurity = 0.0
        else:
            class_counts = np.unique(labels, return_counts=True)[1]
            class_freqs = class_counts / class_counts.sum()
            impurity = 1.0 - np.sum(class_freqs**2)
        weighted_terms.append((size / total) * impurity)

    return weighted_terms[0] + weighted_terms[1]

# Example: two children of 7 labels each, with mirrored class mixes
# (5 zeros + 2 ones on the left, 5 ones + 2 zeros on the right).
left_child = np.array([0, 0, 0, 0, 0, 1, 1])
right_child = np.array([1, 1, 1, 1, 1, 0, 0])

# Weighted impurity of the split; displayed as the cell result.
gini_impurity_value = gini_of_children(left_child, right_child)
gini_impurity_value

np.float64(0.40816326530612246)

In [None]:
# prompt: generate code for # KNN (including distance computation)

import numpy as np


class KNN:
    """k-nearest-neighbours classifier using Euclidean distance.

    Args:
        k: Number of nearest training samples that vote on each prediction.
    """

    def __init__(self, k=3):
        self.k = k

    def fit(self, X, y):
        """Store the training data (lazy learner: nothing is computed here).

        Args:
            X: Training samples, one row per sample.
            y: Training labels, one per row of ``X``. Any label type that
               ``np.unique`` can sort is accepted (ints, strings, ...).
        """
        self.X_train = np.asarray(X, dtype=float)
        self.y_train = np.asarray(y)

    def predict(self, X):
        """Predict a label for every row of ``X``.

        Returns:
            A NumPy array with one predicted label per input row.
        """
        predictions = [self._predict(x) for x in X]
        return np.array(predictions)

    def _predict(self, x):
        """Predict the label of a single sample ``x`` by majority vote."""
        # Euclidean distance from x to every training sample in one shot.
        train = self.X_train.reshape(len(self.X_train), -1)
        diffs = train - np.ravel(np.asarray(x, dtype=float))
        distances = np.sqrt(np.sum(diffs ** 2, axis=1))

        # get k nearest samples, labels
        k_indices = np.argsort(distances)[:self.k]
        k_nearest_labels = self.y_train[k_indices]

        # Majority vote. np.unique works for any sortable label type, whereas
        # np.bincount only accepts non-negative integers. Ties resolve to the
        # smallest label, as argmax-of-bincount did.
        labels, counts = np.unique(k_nearest_labels, return_counts=True)
        most_common = labels[np.argmax(counts)]

        # if regression, return np.mean(k_nearest_labels)
        return most_common

# Example: four labelled 2-D points; the two points near the origin are class 1.
example_X = np.array([[6, 11], [25, 24], [3, 4], [4, 5]])
example_y = np.array([0, 0, 1, 1])

# With k=3 the query (2, 2) gets two class-1 neighbours and one class-0
# neighbour, so the vote returns class 1.
knn = KNN(k=3)
knn.fit(example_X, example_y)
knn.predict(np.array([[2, 2]]))

array([1])

In [None]:
import numpy as np

class LogisticRegression:
    """Binary logistic regression trained with full-batch gradient descent.

    Args:
        learning_rate: Step size of each gradient update.
        epochs: Number of full passes (gradient updates) over the data.
    """

    def __init__(self, learning_rate=0.01, epochs=1000):
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.weights = None
        self.bias = None

    def sigmoid(self, z):
        """Logistic function 1 / (1 + exp(-z))."""
        # exp(-z) overflows float64 for very negative z. Clipping to +/-500
        # leaves results inside that range untouched and saturates outside it.
        z = np.clip(z, -500, 500)
        return 1 / (1 + np.exp(-z))

    def forward(self, X):
        """Return the predicted probability of class 1 for each row of ``X``."""
        # Linear transformation
        z = np.dot(X, self.weights) + self.bias
        # Sigmoid activation (probabilities)
        predictions = self.sigmoid(z)
        return predictions

    def predict(self, X, threshold=0.5):
        """Return hard 0/1 labels: 1 where the probability is >= ``threshold``."""
        return (self.forward(X) >= threshold).astype(int)

    def fit(self, X, y):
        """Fit weights and bias by minimising the mean log loss.

        Args:
            X: Training samples of shape (num_samples, num_features).
            y: Binary targets (0 or 1), one per sample.
        """
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float)
        num_samples, num_features = X.shape
        self.weights = np.zeros(num_features)
        self.bias = 0.0

        for _ in range(self.epochs):
            # Forward pass
            predictions = self.forward(X)

            # For sigmoid + log loss the chain rule collapses to
            # (prediction - target) times the input.
            error = predictions - y
            dw = (1 / num_samples) * np.dot(X.T, error)
            db = (1 / num_samples) * np.sum(error)

            # Gradient descent step
            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db

# Example input
# NOTE: this rebinds the notebook-level names X_train / y_train.
X_train = np.array([[1, 2], [2, 3], [3, 4]])  # 3 samples, 2 features
y_train = np.array([0, 1, 0])  # Target labels

# Create and fit the model
model = LogisticRegression(learning_rate=0.01, epochs=1000)
model.fit(X_train, y_train)

# Get predictions for new data
# forward() returns the class-1 probability, not a hard 0/1 label.
predictions = model.forward(np.array([[1, 1]]))  # Forward pass with new sample
print("Predictions:", predictions)


Predictions: [0.42013584]


In [None]:
# prompt: create a function for cross-entropy loss

import numpy as np

def cross_entropy_loss(y_true, y_pred):
    """Return the mean cross-entropy between one-hot targets and predictions.

    Args:
        y_true: NumPy array of one-hot encoded true labels.
        y_pred: NumPy array of predicted probabilities, same shape as y_true.

    Returns:
        The summed ``-y_true * log(y_pred)`` divided by ``len(y_true)``.
    """
    tiny = 1e-15
    # Keep predictions strictly inside (0, 1) so the log is always finite.
    bounded_pred = np.clip(y_pred, tiny, 1 - tiny)
    log_likelihood = y_true * np.log(bounded_pred)
    return -np.sum(log_likelihood) / len(y_true)


In [None]:
# prompt: create func for MSE loss

import numpy as np

def mse_loss(y_true, y_pred):
    """Return the mean squared error between targets and predictions.

    Args:
        y_true: NumPy array of true values.
        y_pred: NumPy array of predicted values, same shape as y_true.

    Returns:
        The average of the squared element-wise differences.
    """
    residuals = y_true - y_pred
    return np.mean(residuals * residuals)


In [None]:
# prompt: Decision tree node splitting (Gini impurity or entropy).

In [None]:
# prompt: A function to find best split in DT

import numpy as np
def best_split(X, y):
    """Finds the best split for a decision tree node.

    Every distinct value of every feature is tried as a threshold; samples
    with ``feature <= threshold`` go left and the rest go right. Candidates
    that leave either child empty are skipped because they do not actually
    split the node.

    Args:
        X: The feature matrix, shape (n_samples, n_features).
        y: The target vector, one label per row of ``X``.

    Returns:
        A tuple containing the best split feature index, the best split
        threshold, and the minimum weighted Gini impurity of the children.
        If no candidate separates the samples (for example every feature is
        constant), the tuple is ``(None, None, 1.0)``.
    """
    X = np.asarray(X)
    y = np.asarray(y)
    best_feature_index = None
    best_threshold = None
    num_features = X.shape[1]
    min_gini_impurity = 1.0  # Upper bound: Gini impurity is always below 1

    for feature_index in range(num_features):
        feature_values = X[:, feature_index]
        for threshold in np.unique(feature_values):
            left_child_indices = feature_values <= threshold
            right_child_indices = feature_values > threshold
            # e.g. threshold == max(feature) sends every sample left.
            if not left_child_indices.any() or not right_child_indices.any():
                continue

            # Weighted impurity of the two children (lower is better); this
            # is not the gain, which would be parent impurity minus it.
            children_impurity = gini_of_children(
                y[left_child_indices], y[right_child_indices]
            )

            if children_impurity < min_gini_impurity:
                min_gini_impurity = children_impurity
                best_feature_index = feature_index
                best_threshold = threshold

    return best_feature_index, best_threshold, min_gini_impurity

# example

# data for current split
# NOTE: this rebinds the notebook-level names X / y.
X = np.array([[1, 2], [2, 3], [3, 4], [4, 5]])
y = np.array([0, 0, 1, 1])

# Splitting feature 0 at <= 2 separates the two classes completely, so the
# reported impurity of the children is 0.
best_feature_index, best_threshold, min_gini_impurity = best_split(X, y)
print(best_feature_index, best_threshold, min_gini_impurity)

0 2 0.0


In [None]:
# prompt: write a code snippet for a very basic decision tree algorithm

import numpy as np
class Node:
    """A single node of the decision tree.

    Internal nodes carry a split (``feature_index``, ``threshold``) and two
    children; leaf nodes carry only ``value`` and leave the rest as ``None``.
    """

    def __init__(self, feature_index=None, threshold=None, left=None, right=None, value=None):
        # Split rule used at this node.
        self.feature_index, self.threshold = feature_index, threshold
        # Child subtrees.
        self.left, self.right = left, right
        # Leaf prediction; stays None for internal nodes.
        self.value = value

class DecisionTree:
    """A very small classification tree built with Gini-based splits.

    Args:
        min_samples_split: A node with fewer samples than this becomes a leaf.
        max_depth: Maximum depth of the tree (the root is at depth 0).
    """

    def __init__(self, min_samples_split=2, max_depth=2):
        self.root = None
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth

    def _build_tree(self, X, y, depth=0):
        """Recursively grow the subtree for the samples ``X`` / ``y``."""
        num_samples = X.shape[0]
        num_labels = len(np.unique(y))

        # Stopping criteria
        if depth >= self.max_depth or num_samples < self.min_samples_split or num_labels == 1:
            return Node(value=self._most_common_label(y))

        best_feature_index, best_threshold, _ = best_split(X, y)
        if best_feature_index is None:
            # No feature separates the samples: stop here.
            return Node(value=self._most_common_label(y))

        left_child_indices = X[:, best_feature_index] <= best_threshold
        right_child_indices = X[:, best_feature_index] > best_threshold
        if not left_child_indices.any() or not right_child_indices.any():
            # A split that keeps every sample on one side cannot make
            # progress, and the empty side would have no label to predict.
            return Node(value=self._most_common_label(y))

        left = self._build_tree(X[left_child_indices], y[left_child_indices], depth + 1)
        right = self._build_tree(X[right_child_indices], y[right_child_indices], depth + 1)
        return Node(best_feature_index, best_threshold, left, right)

    def _most_common_label(self, y):
        """Return the most frequent label in ``y`` (non-negative ints only)."""
        return np.argmax(np.bincount(y))

    def fit(self, X, y):
        """Build the tree from feature matrix ``X`` and integer labels ``y``.

        Raises:
            ValueError: If no training samples are given.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        if len(y) == 0:
            raise ValueError("Cannot fit a decision tree on an empty dataset.")
        self.root = self._build_tree(X, y)

    def _predict_one(self, x, node):
        """Walk from ``node`` down to a leaf for the single sample ``x``."""
        if node.value is not None:
            # this is a leaf
            return node.value
        if x[node.feature_index] <= node.threshold:
            return self._predict_one(x, node.left)
        return self._predict_one(x, node.right)

    def predict(self, X):
        """Return a list with the predicted label for every row of ``X``."""
        return [self._predict_one(x, self.root) for x in X]


In [None]:
# prompt: Linear Regression class with batch and mini-batch gradient descent, with regularization option

import numpy as np

class LinearRegression:
    """Linear regression trained by (mini-)batch gradient descent.

    Args:
        learning_rate: Step size of each gradient update.
        n_iters: Number of passes over the training data.
        regularization: ``None``, ``'l1'`` or ``'l2'``; the penalty applies to
            the weights only, never to the bias.
        lambda_param: Regularization strength.
    """

    def __init__(self, learning_rate=0.01, n_iters=1000, regularization=None, lambda_param=0.1):
        self.learning_rate = learning_rate
        self.n_iters = n_iters
        self.weights = None
        self.bias = None
        self.regularization = regularization
        self.lambda_param = lambda_param

    def _run_batch_gradient_descent(self, X, y):
        """Apply one gradient step computed on the batch ``X`` / ``y``."""
        n_samples = X.shape[0]
        error = self.predict(X) - y

        # chain rule
        dw = (1 / n_samples) * np.dot(X.T, error)
        db = (1 / n_samples) * np.sum(error)

        # Regularization
        if self.regularization == 'l1':
            dw += (self.lambda_param / n_samples) * np.sign(self.weights)
        elif self.regularization == 'l2':
            dw += (2 * self.lambda_param / n_samples) * self.weights

        self.weights -= self.learning_rate * dw
        self.bias -= self.learning_rate * db

    def fit(self, X, y, batch_size=None):
        """Fit weights and bias to the training data.

        Args:
            X: Training samples of shape (n_samples, n_features).
            y: Target values, one per sample.
            batch_size: ``None`` performs one full-batch update per iteration;
                an integer performs one update per consecutive slice of that
                many samples (mini-batch gradient descent, no shuffling).
        """
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float)
        n_samples, n_features = X.shape

        # init parameters
        self.weights = np.zeros(n_features)
        self.bias = 0.0

        # train model
        for _ in range(self.n_iters):
            if batch_size is None:
                # Exactly one full-batch step per iteration. A second loop
                # here would run n_iters**2 updates instead of n_iters.
                self._run_batch_gradient_descent(X, y)
            else:
                # Mini-batch gradient descent
                for start in range(0, n_samples, batch_size):
                    stop = start + batch_size
                    self._run_batch_gradient_descent(X[start:stop], y[start:stop])

    def predict(self, X):
        """Return ``X @ weights + bias`` for every row of ``X``."""
        y_approximated = np.dot(X, self.weights) + self.bias
        return y_approximated

# example

# NOTE: this rebinds the notebook-level names X / y.
X = np.array([[1, 2], [2, 3], [3, 4], [4, 5]])
y = np.array([1, 2, 3, 4])
# L1-regularised model trained with mini-batches of 2 samples.
reg = LinearRegression(learning_rate=0.01, n_iters=1000, regularization='l1', lambda_param=0.1)
reg.fit(X, y, batch_size=2)
# Predictions on the training inputs themselves.
print(reg.predict(X))


[1.06721502 2.02743833 2.98766165 3.94788496]


In [None]:
# prompt: implement a simple KFold class

import numpy as np

class KFoldMy:
    """Minimal K-fold splitter producing consecutive, unshuffled folds.

    Args:
        n_splits: Number of folds; each fold is used exactly once as the
            test set.
    """

    def __init__(self, n_splits=5):
        self.n_splits = n_splits

    def split(self, X, y=None):
        """Yield ``(train_indices, test_indices)`` for every fold.

        Every sample lands in exactly one test fold. When ``len(X)`` is not a
        multiple of ``n_splits``, the first ``len(X) % n_splits`` folds get
        one extra sample, so no trailing samples are left out of testing.

        Args:
            X: The data to split; only its length is used.
            y: Ignored; accepted for API compatibility.

        Raises:
            ValueError: If ``n_splits`` is below 1 or exceeds ``len(X)``
                (which would produce empty test folds).
        """
        n_samples = len(X)
        if self.n_splits < 1:
            raise ValueError(f"n_splits must be at least 1, got {self.n_splits}.")
        if self.n_splits > n_samples:
            raise ValueError(
                f"Cannot split {n_samples} samples into {self.n_splits} folds."
            )

        indices = np.arange(n_samples)
        base_size, n_larger_folds = divmod(n_samples, self.n_splits)

        start = 0
        for fold in range(self.n_splits):
            # The first `n_larger_folds` folds absorb the remainder.
            end = start + base_size + (1 if fold < n_larger_folds else 0)
            test_indices = indices[start:end]
            train_indices = np.concatenate([indices[:start], indices[end:]])
            yield train_indices, test_indices
            start = end

In [None]:
# prompt: K-fold cross validation

import numpy as np
from sklearn.model_selection import KFold

def k_fold_cross_validation(model, X, y, k=5):
    """Evaluate ``model`` with k-fold cross-validation using MSE.

    For every fold the model is re-fitted on the training part and scored on
    the held-out part.

    Args:
        model: Object exposing ``fit(X, y)`` and ``predict(X)``.
        X: Feature matrix (NumPy array, indexable by an index array).
        y: Target vector (NumPy array), one value per row of ``X``.
        k: Number of folds.

    Returns:
        A tuple ``(mean, std)`` of the per-fold MSE scores.
    """
    kf = KFoldMy(n_splits=k)  # consecutive folds, no shuffling
    scores = []
    for train_index, test_index in kf.split(X):
        X_train, X_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        # Train on the training folds only and score on unseen data. Scoring
        # y_test against itself would always report a loss of 0.
        model.fit(X_train, y_train)
        predictions = model.predict(X_test)
        scores.append(mse_loss(y_test, predictions))

    return np.mean(scores), np.std(scores)


# Example usage:
# 11 samples, so the data does not divide evenly into k=3 folds.
# NOTE: this rebinds the notebook-level names X / y / reg.
X = np.array([[1, 2], [2, 3], [3, 4], [4, 5], [5,6],[6,7], [2, 3], [3, 4], [4, 5], [5,6],[6,7]])
y = np.array([1, 2, 3, 4, 5, 6, 2, 3, 4, 5, 6])

print('input:', len(X))
reg = LinearRegression()
mean_score, std_score = k_fold_cross_validation(reg, X, y, k=3)
print(f"Mean loss: {mean_score:.4f}, Standard deviation: {std_score:.4f}")


input: 11
fold_size 3
n_samples 11
8 3
[ 3  4  5  6  7  8  9 10] [0 1 2]
8 3
[ 0  1  2  6  7  8  9 10] [3 4 5]
8 3
[ 0  1  2  3  4  5  9 10] [6 7 8]
Mean loss: 0.0000, Standard deviation: 0.0000


In [None]:
# prompt: generate func for stratified sampling

import pandas as pd
import numpy as np

def stratified_sampling(df, column, num_samples, random_state=42):
    """
    Performs stratified sampling on a Pandas DataFrame.

    Each stratum receives a share of ``num_samples`` proportional to its size.
    Rounding leftovers are handed out one at a time to the strata with the
    largest fractional remainder, so no stratum is ever asked for more rows
    than it contains.

    Args:
      df: The input DataFrame.
      column: The column to stratify by.
      num_samples: The total number of samples to draw.
      random_state: Seed passed to ``DataFrame.sample`` for reproducibility.

    Returns:
      A new DataFrame with the stratified sample (``num_samples`` rows).

    Raises:
      ValueError: If ``num_samples`` is negative or larger than the number of
        rows that have a non-null value in ``column``.
    """
    # Stratum sizes (rows with a missing stratum value are not counted).
    counts = df[column].value_counts()
    total = int(counts.sum())
    if num_samples < 0 or num_samples > total:
        raise ValueError(
            f"num_samples must be between 0 and {total}, got {num_samples}."
        )
    if total == 0:
        return df.iloc[0:0]

    # Integer arithmetic avoids float truncation errors such as
    # int(0.57 * 100) == 56.
    scaled = counts * num_samples
    samples_per_stratum = scaled // total
    remainders = scaled % total

    # Largest-remainder allocation of the samples lost to rounding down.
    shortfall = int(num_samples - samples_per_stratum.sum())
    if shortfall > 0:
        # Stable sort keeps value_counts' order (largest stratum first) on ties.
        ranked = (-remainders).sort_values(kind="mergesort").index
        samples_per_stratum.loc[ranked[:shortfall]] += 1

    # Sample from each stratum
    stratified_sample = [
        df[df[column] == stratum].sample(n=int(n_stratum), random_state=random_state)
        for stratum, n_stratum in samples_per_stratum.items()
    ]

    # Concatenate the samples
    return pd.concat(stratified_sample)


In [None]:
# prompt: create code snippet to implement simple kmeans from scratch

import numpy as np

class KMeans:
    """Plain k-means clustering (Lloyd's algorithm) with Euclidean distance.

    Args:
        n_clusters: Number of clusters / centroids.
        max_iters: Maximum number of assign-and-update iterations.
        random_state: Optional seed for the initial centroid choice. ``None``
            uses NumPy's global random state.
    """

    def __init__(self, n_clusters=8, max_iters=300, random_state=None):
        self.n_clusters = n_clusters
        self.max_iters = max_iters
        self.random_state = random_state
        self.centroids = None

    def _closest_centroid(self, X):
        """Return, for every row of ``X``, the index of its nearest centroid."""
        # Pairwise differences of shape (n_samples, n_clusters, n_features).
        diffs = X[:, np.newaxis, :] - self.centroids[np.newaxis, :, :]
        distances = np.sqrt(np.sum(diffs ** 2, axis=2))
        return np.argmin(distances, axis=1)

    def fit(self, X):
        """Learn the centroids from the samples in ``X``.

        Centroids start as ``n_clusters`` distinct samples drawn from ``X``;
        after the first update they are cluster means and no longer need to
        coincide with any sample.

        Args:
            X: Data of shape (n_samples, n_features), n_samples >= n_clusters.
        """
        X = np.asarray(X, dtype=float)
        n_samples = X.shape[0]

        if self.random_state is None:
            rng = np.random
        else:
            rng = np.random.RandomState(self.random_state)
        random_sample_idxs = rng.choice(n_samples, self.n_clusters, replace=False)
        self.centroids = X[random_sample_idxs]

        for _ in range(self.max_iters):
            # Assign each data point to the nearest centroid
            labels = self._closest_centroid(X)

            # Update centroids based on the mean of assigned data points
            new_centroids = self.centroids.copy()
            for cluster_idx in range(self.n_clusters):
                members = X[labels == cluster_idx]
                # An empty cluster keeps its previous centroid.
                if len(members) > 0:
                    new_centroids[cluster_idx] = members.mean(axis=0)

            # Converged once an update leaves every centroid unchanged.
            converged = np.array_equal(new_centroids, self.centroids)
            self.centroids = new_centroids
            if converged:
                break

    def predict(self, X):
        """Return the index of the nearest centroid for every row of ``X``.

        Raises:
            RuntimeError: If called before ``fit``.
        """
        if self.centroids is None:
            raise RuntimeError("KMeans.predict called before fit.")
        X = np.asarray(X, dtype=float)
        if X.size == 0:
            return np.array([], dtype=int)
        return self._closest_centroid(X)


In [None]:
# prompt: simulate classification data and calculate recall, precision, f1 score and ROC from scratch

import numpy as np
from sklearn.metrics import recall_score, precision_score, f1_score, roc_auc_score, roc_curve
import matplotlib.pyplot as plt

# Simulate classification data
# NOTE: this rebinds the notebook-level names X / y, and defines y_pred /
# y_prob, which the metric cells below read.
np.random.seed(0)
n_samples = 100
X = np.random.rand(n_samples, 2)
y = (X[:, 0] + X[:, 1] > 1).astype(int)  # Example decision boundary

# Example predictions (replace with your model's predictions)
y_pred = (X[:, 0] + X[:, 1] > 0.8).astype(int)  # Another decision boundary
# Score used for ranking: the raw feature sum, not a calibrated probability
# (it can exceed 1). The true label thresholds this same sum, so ranking by
# it separates the classes perfectly.
y_prob = X[:, 0] + X[:, 1]

In [None]:
# prompt: create code snippet that calculates recall, precision, f1 and roc from scratch (do not use libraries like sklearn)

import numpy as np
def _roc_auc_from_scores(is_positive, is_negative, scores):
    """Area under the ROC curve via cumulative counts and the trapezoidal rule."""
    if scores.size == 0:
        return 0.0

    # Rank samples from highest to lowest score (stable, so ties keep order).
    order = np.argsort(-scores, kind="mergesort")
    ranked_scores = scores[order]
    cum_tp = np.cumsum(is_positive[order])
    cum_fp = np.cumsum(is_negative[order])

    # Tied scores cannot be separated by any threshold, so emit one ROC point
    # per distinct score, taken at the last sample of each tie group.
    group_ends = np.append(np.nonzero(np.diff(ranked_scores))[0], ranked_scores.size - 1)

    # Prepend the origin: a threshold above every score predicts no positives.
    tp_counts = np.concatenate(([0], cum_tp[group_ends]))
    fp_counts = np.concatenate(([0], cum_fp[group_ends]))

    n_pos, n_neg = cum_tp[-1], cum_fp[-1]
    tpr = tp_counts / n_pos if n_pos > 0 else np.zeros(len(tp_counts))
    fpr = fp_counts / n_neg if n_neg > 0 else np.zeros(len(fp_counts))

    # Trapezoidal rule: width times average height of each segment.
    return np.sum(np.diff(fpr) * (tpr[1:] + tpr[:-1]) / 2)


def calculate_metrics(y_true, y_pred, y_prob):
    """Calculates recall, precision, F1-score, and ROC AUC from scratch.

    Args:
        y_true: True binary labels (0 or 1).
        y_pred: Predicted binary labels (0 or 1).
        y_prob: Predicted scores for the positive class; only their ranking
            matters for the ROC AUC.

    Returns:
        A dictionary with the keys "recall", "precision", "f1" and "roc_auc".
        A metric whose denominator is zero is reported as 0.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    y_prob = np.asarray(y_prob, dtype=float)

    is_positive = y_true == 1
    is_negative = y_true == 0

    tp = np.sum(is_positive & (y_pred == 1))
    fp = np.sum(is_negative & (y_pred == 1))
    fn = np.sum(is_positive & (y_pred == 0))

    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

    auc = _roc_auc_from_scores(is_positive, is_negative, y_prob)

    return {"recall": recall, "precision": precision, "f1": f1, "roc_auc": auc}

# Example usage (replace with your actual data):
# Reads y, y_pred and y_prob defined in the simulation cell above.
metrics = calculate_metrics(y, y_pred, y_prob)
metrics


{'recall': np.float64(1.0),
 'precision': np.float64(0.7647058823529411),
 'f1': np.float64(0.8666666666666666),
 'roc_auc': np.float64(1.0)}

In [None]:
# Reference values from scikit-learn for the same y / y_pred / y_prob, for
# comparison with the from-scratch implementation. Relies on the sklearn and
# matplotlib imports made in the simulation cell.
# Calculate metrics
recall = recall_score(y, y_pred)
precision = precision_score(y, y_pred)
f1 = f1_score(y, y_pred)
roc_auc = roc_auc_score(y, y_prob)  # Use probabilities for ROC AUC

# Calculate ROC curve
fpr, tpr, thresholds = roc_curve(y, y_prob)


# Print the metrics
print(f"Recall: {recall:.2f}")
print(f"Precision: {precision:.2f}")
print(f"F1-score: {f1:.2f}")
print(f"ROC AUC: {roc_auc:.2f}")

# Plot the ROC curve
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f'ROC curve (area = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], 'k--')  # Diagonal line
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc='lower right')
plt.grid(True)
plt.show()
