<a href="https://colab.research.google.com/github/vedant75/AlgosFromScratch/blob/main/LogisticRegression.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
from typing import Optional, Callable, Tuple

In [2]:
def _add_intercept(X: np.ndarray) -> np.ndarray:
    """Return X with a leading column of ones.
    Input: X: (n, p) or (n,) -> output (n, p+1)
    """
    X = np.asarray(X)
    if X.ndim == 1:
        X = X.reshape(-1, 1)
    ones = np.ones((X.shape[0], 1), dtype=float)
    return np.hstack([ones, X])

class LogisticRegression:
  def __init__(self, lr=0.01, n_iter=100_000, solver='batch', verbose=False, tol=1e-5, shuffle=True, batch_size=32):
    self.bias =None
    self.weight =None
    self.lr =lr
    self.n_iter = n_iter
    self.cost_history = []
    self.verbose = verbose
    self.tol = tol
    self.solver = solver
    self.shuffle = shuffle
    self.batch_size = batch_size

  @staticmethod
  def sigmoid(z):
    z = np.asarray(z)
    out = np.empty_like(z, dtype='float')
    pos = z>=0
    out[pos] = 1.0/(1.0 + np.exp(-z[pos]))
    neg = ~pos
    e = np.exp(z[neg])
    out[neg] = e/(1+e)
    return out

  def _loss(self, a, y):
    y = np.asarray(y)
    m = y.shape[0]
    epsilon = 1e-15
    a = np.clip(a, epsilon, 1-epsilon)
    loss = (-1/m)*np.sum(y*np.log(a) + (1-y)*np.log(1-a))
    return loss

  def fit(self, X, y):
    if self.solver == 'batch':
      return self.batch_gd(X,y)
    elif self.solver == 'sgd':
      return self.sgd_gd(X,y)
    elif self.solver == 'mini_batch_gd':
      return self.mini_batch_gd(X,y)
    elif self.solver == 'newton':
      return self.newton(X,y)


  def batch_gd(self, X, y):
    X=np.asarray(X, dtype='float')
    y=np.asarray(y, dtype='float')
    assert X.shape[0] == y.shape[0]
    m, n = X.shape
    if self.weight is None:
      self.weight = np.zeros(n, dtype='float')
    self.bias = 0.0

    prev_loss = 0.0
    for epoch in range(self.n_iter):
      # z = wx + b
      z = np.dot(X, self.weight) + self.bias
      a = LogisticRegression.sigmoid(z)

      dw = np.dot(X.T, (a-y))/m
      db = np.sum(a-y)/m

      self.weight -= self.lr*dw
      self.bias -= self.lr*db

      cur_loss = self._loss(a, y)

      # 2. Check for convergence for early stopping
      if abs(cur_loss - prev_loss) < self.tol and epoch > 0:
          print(f"Converged at iteration {epoch}. Stopping early.")
          break # Exit the loop

      prev_loss = cur_loss
      self.cost_history.append(cur_loss)
    return self

  def sgd_gd(self, X, y):
    X = np.asarray(X, dtype='float')
    y = np.asarray(y, dtype='float')
    assert X.shape[0] == y.shape[0]
    m, n = X.shape
    if self.weight is None:
      self.weight = np.zeros(n, dtype='float')
    if self.bias is None:
      self.bias =0.0
    prev_loss = float('inf')
    for epoch in range(self.n_iter):
      if self.shuffle:
        indices = np.arange(m)
        np.random.shuffle(indices)

      for i in indices:
        # z[i] = w*X[i] + b
        z = np.dot(X[i], self.weight) + self.bias
        a = LogisticRegression.sigmoid(z)
        error = (a-y[i])

        dw = X[i]*error
        db = error

        self.weight -= self.lr*dw
        self.bias -= self.lr*db

      full_pred = LogisticRegression.sigmoid(np.dot(X, self.weight)+self.bias)
      cur_loss = self._loss(full_pred, y)
      if self.verbose and (epoch % max(1, self.n_iter//10) ==0):
        print(f"Epoch {epoch:6d}  Loss={cur_loss:.6f}")
      if abs(cur_loss - prev_loss) < self.tol and epoch>0:
        print(f'Early stopping at epoch {epoch}')
        break

      prev_loss = cur_loss
      self.cost_history.append(cur_loss)
    return self

  def mini_batch_gd(self, X, y):
    X = np.asarray(X, dtype='float')
    y = np.asarray(y, dtype='float')
    assert X.shape[0] == y.shape[0]
    m,n = X.shape
    if self.weight is None:
      self.weight = np.zeros(n, dtype='float')
    if self.bias is None:
      self.bias = 0.0

    prev_loss = float('inf')
    for epoch in range(self.n_iter):
      if self.shuffle:
        indices= np.arange(m)
        np.random.shuffle(indices)
        X_shuffled = X[indices]
        y_shuffled = y[indices]
      else:
          X_shuffled = X
          y_shuffled = y

      for i in range(0, m, self.batch_size):
        # z[0:batch_size] = X_shuffled[0:batch_size] + b
        z = np.dot(X_shuffled[i:i+self.batch_size], self.weight) + self.bias
        a = LogisticRegression.sigmoid(z)
        error = (a-y_shuffled[i:i+self.batch_size])

        dw = np.dot(X_shuffled[i:i+self.batch_size].T, error)/len(y_shuffled[i:i+self.batch_size])
        db = np.sum(error)/len(y_shuffled[i:i+self.batch_size])

        self.weight -= self.lr*dw
        self.bias -= self.lr*db

      full_pred = LogisticRegression.sigmoid(np.dot(X_shuffled, self.weight)+self.bias)
      cur_loss = self._loss(full_pred, y)
      if self.verbose and (epoch % max(1, self.n_iter//10)==0):
         print(f"Epoch {epoch:6d}  Loss={cur_loss:.6f}")
      if abs(cur_loss - prev_loss) < self.tol and epoch>0:
        print(f'Early stopping at epoch {epoch}')
        break

      prev_loss = cur_loss
      self.cost_history.append(cur_loss)
    return self

  def newton(self, X, y):
    X = np.asarray(X, dtype='float')
    y = np.asarray(y, dtype='float')
    assert X.shape[0] == y.shape[0]
    X = _add_intercept(X)
    m,n = X.shape
    if self.weight is None:
      self.weight = np.zeros(n, dtype='float')
    prev_loss = float('inf')
    for epoch in range(self.n_iter):
      z = np.dot(X, self.weight)
      a = LogisticRegression.sigmoid(z)
      grad = np.dot(X.T, (a-y))/m

      W = a*(1-a)
      X_b = W[:, None] * X
      hessian = np.dot(X.T, X_b) / m

      try:
        step = np.linalg.solve(hessian + 1e-6 * np.eye(n), grad)
        self.weight -= step
      except np.linalg.LinAlgError:
        # Fallback to gradient descent if Hessian is singular
        self.weight -= self.lr * grad


      cur_loss = self._loss(a, y)

      # 2. Check for convergence for early stopping
      if abs(cur_loss - prev_loss) < self.tol and epoch > 0:
          print(f"Converged at iteration {epoch}. Stopping early.")
          break # Exit the loop

      prev_loss = cur_loss
      self.cost_history.append(cur_loss)
    return self

  def predict_proba(self, X):
    X = np.asarray(X)

    # Check which logic to use based on the solver
    if self.solver == 'newton':
        # Newton's method uses an intercept, no separate bias
        X_aug = _add_intercept(X)
        z = np.dot(X_aug, self.weight)
    else:
        # Other solvers use a separate weight and bias
        z = np.dot(X, self.weight) + self.bias

    return self.sigmoid(z)

  def predict(self, X, threshold=0.5):
    y_prob = self.predict_proba(X)
    return (y_prob >= threshold).astype(int)