**IWLS Code**

In [6]:
import numpy as np
import time

def check_data(X,y):
  if len(X) != len(y):
    return False, "Length of the answer vector doesn't fit the number of data points.\n"
  for i in range(1,len(X)):
    if len(X[i]) != len(X[0]):
      return False, "The row at the index "+str(i)+" seems to be missing an observation.\n"
  for i in range(len(y)):
    if y[i] < 0 or y[i] > 1:
      return False, "The answer at the index "+str(i)+" doesn't indicate a binary class nor does it indicate a probability of belonging to one.\n"
  return True, ""

def calc_loglike(X, Y, beta):
  """
  Calculates the value of the log-likelihood for a given training data,
  answer vector and model parameters
  :param X: a 2d numpy array containing the experiment matrix.
  :param Y: a 1d numpy array containing the answer vector.
  :param beta: a 1d numpy array containing the parameters of the model.
  :return: a single float with the value of the log-likelihood function.
  """
  Xbeta = X @ beta
  return Y @ Xbeta - np.sum(np.log(1+np.exp(Xbeta)))

def fit_with_IWLS(data, answers, intercept: bool = True,
                  relevant_variables = None, additional_interactions = None,
                  l2_reg: float = 1.0, beta0_gen = None,
                  max_iterations: int = 500, min_step_norm: float = 1e-4,
                  max_time: float = 3600.0, check_data: bool = False):
  """
  Calculates the coefficients of a Logistic Regression model using Iterative
  Weighted Least Squares method.
  :param data: The data on which the model will be fit.
  :param answers: The vector with answers (numbers belonging to the
    [0,1] interval).
  :param intercept: If True, the model will fit an intercept (meaning beta' @ x
    shall be replaced with beta' @ x + beta0 in all calculations).
  :param relevant_variables: A collection of indices, indicating on which
    columns of the data should the model be built.
    If None, all columns will be used.
  :param additional_interactions: A collection of pairs of indices, indicating
    which column element-wise products should be using for building the model.
    If None, no such variables will be considered.
  :param l2_reg: The strength of ridge regularization (the coefficient of
    the ridge penalty). 0 means no regularization. 1 is the default, same as
    in the scikit-learn implementation.
  :param beta0_gen: A generator used to determine the starting values of
    coefficients. Should include .generate(n: int) method, returning a numpy
    array of length n filled with floats.
    If None, all coefficients will be initialized to zeros.
  :param max_iterations: The maximum number of iterations the algorhithm will
    perform before stopping and proposing a solution.
    By default, 100 in accordance to scikit-learn implementation.
  :param min_step_norm: The minimum value for the euclidian norm of the change
    of a parameter vector in a single step. If the difference between
    iterations falls below that number, the algorhithm will stop and propose
    a solution.
  :param max_time: The maximum time the procedure can run in seconds. Once
    exceeded, the iterating will stop and the solution will be proposed.
  :param check_data: If True, the format of data and answers will be examined
    prior to running the algorhithm.

  :return: A numpy array containing the proposed coefficients and a dictionary,
    labeling said coefficients, a list containing the coefficients after each
    iteration and a list containing the values of the log-likelihood function
    after each iteration.
  """
  ### Ensuring the correct dimensionality of the data.
  if check_data:
    status, message = check_data(data, answers)
    assert status, message
  assert len(data) == len(answers), "For every data point, there has to be a correct class specified.\n"
  n = len(data)

  ### Filling up the default values of parameters.
  if additional_interactions is None:
    additional_interactions = []
  if relevant_variables is None:
    relevant_variables = np.arange(len(data[0]))

  ### Constructing the experiment matrix and labels for it.
  Y = np.array(answers)
  X = []
  labels = []
  for index in relevant_variables:
    X.append(np.array(data[:,index]).astype(float))
    labels.append("X"+str(index))

  for index1, index2 in additional_interactions:
    X.append(np.array(data[:,index1]).astype(float) * np.array(data[:,index2]).astype(float))
    labels.append("X"+str(index1)+"X"+str(index2))

  if intercept:
    X.append(np.ones(n))
    labels.append("intercept")

  X = np.column_stack(X)
  p = len(labels)
  # If the penalty is the l2_reg times the sum of squares of coefficients, this
  # is the matrix of the second order derivatives with respect to the coefs.
  penalty_hessian = 2 * l2_reg * np.eye(p)

  ### Initializing coefficients.
  if beta0_gen is None:
    beta = np.zeros(p)
  else:
    beta = beta0_gen.generate(p)

  start = time.time()
  beta_hist = []
  loglike_hist = []
  beta_hist.append(beta)
  loglike_hist.append(calc_loglike(X, Y, beta))
  ### Iterating the main algorhithm.
  for _ in range(max_iterations):
    P = X @ beta
    P = np.exp(P) / (np.exp(P) + 1)
    W = np.diag(P * (1 - P))

    # deriv is the derivative of the minus log-like + penalty with respect to beta
    deriv =  X.transpose() @ (P-Y) + 2 * l2_reg * beta
    # hessian is the matrix of second order derivatives of the previously mentioned function
    hessian = X.transpose() @ W @ X + penalty_hessian

    # In order to avoid numerical complexity of the matrix inversion,
    # new beta is defined as a solution to a linear equation.
    beta_new = np.linalg.solve(hessian, hessian @ beta - deriv)

    diff = beta - beta_new
    diff_norm = np.sqrt(diff @ diff)
    beta = beta_new

    beta_hist.append(beta)
    loglike_hist.append(calc_loglike(X, Y, beta))

    if diff_norm < min_step_norm:
      break
    curr = time.time()
    if curr - start > max_time:
      break

  ### Creating the coefficient dictionary.
  beta_dict = {}
  for i in range(p):
    beta_dict[labels[i]] = beta[i]

  return beta, beta_dict, beta_hist, loglike_hist

def predict_proba(data, beta, intercept: bool = True, relevant_variables = None,
                  additional_interactions = None):
  """
  Predicts the probabilities of a given data points belonging to class 1,
  assuming that the parameters of the model are known.
  :param data: the datapoints the class of which you want to predict.
  :param beta: a numpy array containing the model parameters in order: column
    coefficients, product coefficients (if any additional interactions were
    involved), intercept (if it was included).
  :param intercept: choose True if model has been built with intercept in mind.
  :param relevant_variables: A list of indices of columns on which the model
    was built. If None, all columns are considered.
  :param additional_interactions: A collection of pairs of indices, indicating
    which column element-wise products were used for building the model.
    If None, no such variables will be considered.
  :return: a numpy array containing the probability of belonging to class 1
    for each observation.
  """
  ### Filling up the default values of parameters.
  if additional_interactions is None:
    additional_interactions = []
  if relevant_variables is None:
    relevant_variables = np.arange(len(data[0]))

  ### Constructing the data matrix.
  n = len(data)
  X = []
  for index in relevant_variables:
    X.append(np.array(data[:,index]).astype(float))

  for index1, index2 in additional_interactions:
    X.append(np.array(data[:,index1]).astype(float) * np.array(data[:,index2]).astype(float))

  if intercept:
    X.append(np.ones(n))

  X = np.column_stack(X)

  exp = np.exp(X @ beta)
  return exp/(1+exp)

def predict(data, beta, intercept: bool = True, relevant_variables = None,
            additional_interactions = None, threshold: float = 0.5):
  """
  A classification based on the 'predict_proba' method output.
  """
  probas = predict_proba(data, beta, intercept=intercept,
                         relevant_variables=relevant_variables,
                         additional_interactions=additional_interactions)
  return (probas > threshold).astype(int)

**Datasets**

In [None]:
!pip install ucimlrepo

Collecting ucimlrepo
  Downloading ucimlrepo-0.0.6-py3-none-any.whl (8.0 kB)
Installing collected packages: ucimlrepo
Successfully installed ucimlrepo-0.0.6


In [7]:
from ucimlrepo import fetch_ucirepo
import pandas as pd
import heapq
from sklearn.linear_model import LinearRegression

In [8]:
def test_collinearity(columns, indices, threshold = 1e-10):
  """
  Helper function, testing whether a certain subset of columns is collinear.
  :param columns: the whole set of columns.
  :param indices: indices belonging to the subset.
  :param threshold: the value of determinant, going below which will be
    considered being numerically collinear.
  :return: True if collinear, False otherwise.
  """
  used_columns = []
  for index in indices:
    used_columns.append(columns[index])

  X = np.column_stack(used_columns)
  XX = X.transpose() @ X
  if np.linalg.det(XX) < threshold:
    return True
  else:
    return False

def remove_collinear(X):
  """
  Removes the minimum number of columns to ensure the result matrix will be
  full rank.
  :param X: a numpy matrix one needs a non-collinear version of.
  :return: a numpy matrix with collinearities removed and a set containing
    indices of removed columns.
  """
  columns = []
  p = len(X[0])
  for i in range(p):
    columns.append(X[:,i])

  columns_used = []
  columns_stashed = set()
  columns_removed = set()
  for i in range(p):
    columns_used.append(i)

  heapq.heapify(columns_used)

  last_removed = -1
  while(True):
    if len(columns_used) == 0:
      break

    if test_collinearity(columns, columns_used):
      last_removed = heapq.heappop(columns_used)
      columns_stashed.add(last_removed)
    else:
      if last_removed == -1:
        # If the whole remaining subset is non-collinear, it's time to stop.

        break
      else:
        # If removing a certain column made the subset non-collinear, it means
        # that this column is a good candidate for removal.

        columns_stashed.remove(last_removed)
        columns_removed.add(last_removed)

        # Returning stashed away columns back to the subset.
        for index in columns_stashed:
          columns_used.append(index)
        heapq.heapify(columns_used)
        columns_stashed.clear()
        last_removed = -1

  # Recreating the matrix
  is_used = [False for i in range(p)]
  for index in columns_used:
    is_used[index] = True

  columns_used = []
  for i in range(p):
    if is_used[i]:
      columns_used.append(columns[i])

  X_clean = np.column_stack(columns_used)
  return X_clean, columns_removed


In [None]:
def fetch_heart_disease():
  # fetch dataset
  data_heart_disease = fetch_ucirepo(id=45)

  # data (as pandas dataframes)
  X = data_heart_disease.data.features
  y = data_heart_disease.data.targets

  # removing missing variables
  X = X.dropna()
  y = y.loc[X.index]

  # one-hot-encoding the detected multi-valued categorical variables
  X = pd.get_dummies(X, columns=['cp', 'restecg', 'slope', 'thal'],
                     drop_first=True, dtype=int)

  # changing the format to numpy arrays
  # (flattening is necessary for y as .values isn't smart enough to notice that
  # y had only one column)
  X = X.values
  y = y.values.flatten()

  # mapping the answers to {0,1}
  y = (y==0)
  y = y.astype(int)

  # removing collinearities
  X, _ = remove_collinear(X)

  return X, y

def fetch_parkinsons():
  # fetch dataset
  data_parkinsons = fetch_ucirepo(id=174)

  # data (as pandas dataframes)
  X = data_parkinsons.data.features
  y = data_parkinsons.data.targets

  # changing the format to numpy arrays
  # (flattening is necessary for y as .values isn't smart enough to notice that
  # y had only one column)
  X = X.values
  y = y.values.flatten()

  # removing collinearities
  X, _ = remove_collinear(X)

  return X, y

def fetch_hcv():
  # fetch dataset
  data_hcv = fetch_ucirepo(id=571)

  # data (as pandas dataframes)
  X = data_hcv.data.features
  y = data_hcv.data.targets

  # mapping the 'Sex' column to numeric values
  X.loc[:,'Sex'] = X['Sex'].map({'m': 0, 'f': 1})
  # mapping the target variable to {0,1}
  y.loc[:,'Category'] = y['Category'].map({'0=Blood Donor': 0,
                                           '0s=suspect Blood Donor': 0,
                                           '1=Hepatitis': 1, '2=Fibrosis': 1,
                                           '3=Cirrhosis': 1})

  # removing rare missing values
  X = X.dropna(subset=['ALB','PROT','ALT'])
  y = y.loc[X.index]

  # Regressing for the remaining missing values
  X_for_lr = X.dropna()
  y_for_lr1 = X_for_lr['ALP'].values
  y_for_lr2 = X_for_lr['CHOL'].values
  X_for_lr = X_for_lr.drop(columns=['ALP','CHOL']).values

  lr = LinearRegression()

  lr.fit(X_for_lr, y_for_lr1)
  ALP_missing = X[X['ALP'].isna()]
  data_for_ALP_predicting = ALP_missing.drop(columns=['ALP', 'CHOL']).values
  ALP_predictions = lr.predict(data_for_ALP_predicting)
  X.loc[X['ALP'].isna(), 'ALP'] = ALP_predictions

  lr.fit(X_for_lr, y_for_lr2)
  CHOL_missing = X[X['CHOL'].isna()]
  data_for_CHOL_predicting = CHOL_missing.drop(columns=['ALP', 'CHOL']).values
  CHOL_predictions = lr.predict(data_for_CHOL_predicting)
  X.loc[X['CHOL'].isna(), 'CHOL'] = CHOL_predictions

  # changing the format to numpy arrays
  # (flattening is necessary for y as .values isn't smart enough to notice that
  # y had only one column)
  X = X.values
  y = y.values.flatten()

  # removing collinearities
  X, _ = remove_collinear(X)

  return X, y

In [None]:
from sklearn.datasets import load_breast_cancer

def fetch_sonar():
  # fetch dataset
  connectionist_bench_sonar_mines_vs_rocks = fetch_ucirepo(id=151)

  # data (as pandas dataframes)
  X = connectionist_bench_sonar_mines_vs_rocks.data.features
  y = connectionist_bench_sonar_mines_vs_rocks.data.targets

  #Mapping target classes R - rock to 0, M - mine to 1
  y['class'] = y['class'].replace({'R': 0, 'M': 1})
  target_counts = y['class'].value_counts()

  X = X.values
  y = np.asarray(y)
  y = y.flatten()
  return X, y

def fetch_breast_cancer():
  data = load_breast_cancer()
  X = data['data']
  y = data.target

  X, _ = remove_collinear(X)
  return X, y

def fetch_ionsphere():
  # fetch dataset
  ionosphere = fetch_ucirepo(id=52)

  # data (as pandas dataframes)
  X = ionosphere.data.features
  y = ionosphere.data.targets

  #Mapping target classes b - bad to 0, g - good to 1
  y['Class'] = y['Class'].replace({'b': 0, 'g': 1})

  X = X.values
  y = np.asarray(y)
  y = y.flatten()
  return X, y

In [5]:
def fetch_diabetes():
  df = pd.read_csv("diabetes.csv")
  X = df.values
  y = X[:,-1]
  X = X[:,:-1]

  X, _ = remove_collinear(X)
  return X, y

def fetch_fraud_detection():
  df = pd.read_csv("fraud_detection_dataset.csv")
  X = df[['amount', 'age', 'income', 'debt', 'credit_score']]
  y = df['is_fraud']

  X = X.values
  y = y.values.flatten()

  X, _ = remove_collinear(X)
  return X, y

def fetch_banknote_authentication():
  df = pd.read_csv("data_banknote_authentication.txt", header=None)

  X = df.values
  y = X[:,-1]
  X = X[:,:-1]

  X, _ = remove_collinear(X)
  return X, y

**Testing functions**

In [3]:
from sklearn.metrics import balanced_accuracy_score
import matplotlib.pyplot as plt

In [4]:
def generate_interactions(n: int):
  result = []
  for i in range(n):
    for j in range(i+1,n):
      result.append((i,j))
  return result

In [2]:
def test_with_cv(X, y, count=5, include_interactions=False):
  batches_X = [[] for i in range(count)]
  batches_y = [[] for i in range(count)]
  for i in range(len(y)):
    batches_X[i % count].append(X[i])
    batches_y[i % count].append(y[i])

  interactions = None
  if include_interactions:
    interactions = generate_interactions(len(X[0]))

  results = {}
  results['acc'] = []
  results['beta_hists'] = []
  results['loglike_hists'] = []
  for k in range(count):
    X_train = []
    y_train = []
    for j in range(k+1, k+count):
      index = j % count
      for i in range(len(batches_y[index])):
        X_train.append(batches_X[index][i])
        y_train.append(batches_y[index][i])

    X_train = np.row_stack(X_train)
    y_train = np.array(y_train)

    X_test = np.row_stack(batches_X[k])
    y_test = np.array(batches_y[k])


    beta, _, beta_hist, loglike_hist = fit_with_IWLS(X_train, y_train, intercept=True, additional_interactions=interactions)

    results['beta_hists'].append(beta_hist)
    results['loglike_hists'].append(loglike_hist)

    y_preds = predict(X_test, beta, intercept=True, additional_interactions=interactions)
    results['acc'].append(balanced_accuracy_score(y_test, y_preds))

  return results

In [1]:
def calc_avg_loglike(results):
  min_length = 500
  for history in results['loglike_hists']:
    if len(history) < min_length:
      min_length = len(history)

  result = [0 for i in range(min_length)]
  for i in range(min_length):
    for history in results['loglike_hists']:
      result[i] += history[i]
    result[i] /= len(results['loglike_hists'])
  return result

**Testing**

In [59]:
X, y = fetch_heart_disease()
results = test_with_cv(X, y)

In [60]:
print("acc: ", results['acc'])
print("avg acc: ", np.mean(results['acc']))
print("avg log-like: ", calc_avg_loglike(results))

acc:  [0.8665183537263625, 0.7767857142857143, 0.846551724137931, 0.9288194444444444, 0.7827380952380952]
avg acc:  0.8402826663665095
avg-log-like:  [-164.691770101043, -90.87104779347564, -80.85068442189352, -78.90350565105332, -78.70219934604549, -78.6973602395561, -78.69735526662458]


In [61]:
X, y = fetch_parkinsons()
results = test_with_cv(X, y)

In [63]:
print("acc: ", results['acc'])
print("avg acc: ", np.mean(results['acc']))
print("avg log-like: ", calc_avg_loglike(results))

acc:  [0.7272727272727273, 0.6888888888888889, 0.8333333333333333, 0.6827586206896552, 0.7611111111111111]
avg acc:  0.7386729362591431
avg log-like:  [-108.13096016735149, -61.364635149873436, -54.433155656471875, -52.92259525667531, -52.81262995626339, -52.811891772692114]


In [64]:
X, y = fetch_hcv()
results = test_with_cv(X, y)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X.loc[:,'Sex'] = X['Sex'].map({'m': 0, 'f': 1})
  X.loc[:,'Sex'] = X['Sex'].map({'m': 0, 'f': 1})
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  y.loc[:,'Category'] = y['Category'].map({'0=Blood Donor': 0,
  y.loc[:,'Category'] = y['Category'].map({'0=Blood Donor': 0,


In [65]:
print("acc: ", results['acc'])
print("avg acc: ", np.mean(results['acc']))
print("avg log-like: ", calc_avg_loglike(results))

acc:  [0.8287037037037037, 0.9287037037037037, 0.9239417989417988, 0.7764550264550265, 0.8121693121693122]
avg acc:  0.853994708994709
avg log-like:  [-339.36485960214924, -116.48502158269687, -78.5616628012908, -64.00660117348124, -58.802705602141394, -57.728210296601176, -57.64189004251489, -57.63915586902315, -57.6391402058686]


In [66]:
X, y = fetch_sonar()
results = test_with_cv(X, y)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  y['class'] = y['class'].replace({'R': 0, 'M': 1})


In [67]:
print("acc: ", results['acc'])
print("avg acc: ", np.mean(results['acc']))
print("avg log-like: ", calc_avg_loglike(results))

acc:  [0.8772727272727273, 0.8772727272727273, 0.7242562929061784, 0.7248803827751196, 0.6949760765550239]
avg acc:  0.7797316413563553
avg log-like:  [-115.33969084517491, -83.96401543701369, -81.89783083413846, -81.83692015062695, -81.8368518968259]


In [68]:
X, y = fetch_breast_cancer()
results = test_with_cv(X, y)

In [69]:
print("acc: ", results['acc'])
print("avg acc: ", np.mean(results['acc']))
print("avg log-like: ", calc_avg_loglike(results))

acc:  [0.922972972972973, 0.9144736842105263, 0.974375, 0.9315476190476191, 0.9404761904761905]
avg acc:  0.9367690933414619
avg log-like:  [-315.52059659088707, -129.92649976260924, -88.71729988525442, -66.78235058622298, -52.83391369037097, -46.70816255411982, -45.25572876124456, -45.06471234608607, -45.05538263220019, -45.05534573241543]


In [70]:
X, y = fetch_ionsphere()
results = test_with_cv(X, y)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  y['Class'] = y['Class'].replace({'b': 0, 'g': 1})


In [71]:
print("acc: ", results['acc'])
print("avg acc: ", np.mean(results['acc']))
print("avg log-like: ", calc_avg_loglike(results))

acc:  [0.7743589743589744, 0.8088888888888889, 0.88, 0.8347902097902098, 0.75]
avg acc:  0.8096076146076147
avg log-like:  [-194.63572830123266, -94.58296980448672, -79.53875600116562, -76.81387647497023, -76.64295466219465, -76.64151419978084, -76.64151390763423]


In [72]:
X, y = fetch_diabetes()
results = test_with_cv(X, y)

In [73]:
print("acc: ", results['acc'])
print("avg acc: ", np.mean(results['acc']))
print("avg log-like: ", calc_avg_loglike(results))

acc:  [0.7772988505747127, 0.7002551020408163, 0.6770833333333334, 0.7095392231530846, 0.646236559139785]
avg acc:  0.7020826136483465
avg log-like:  [-425.86962773603045, -312.21558454230745, -304.75721366088936, -304.42429951200586, -304.4225206774464]


In [76]:
X, y = fetch_diabetes()
results = test_with_cv(X, y, include_interactions=True)

In [77]:
print("acc: ", results['acc'])
print("avg acc: ", np.mean(results['acc']))
print("avg log-like: ", calc_avg_loglike(results))

acc:  [0.7564655172413793, 0.721938775510204, 0.7529761904761905, 0.6853579588728104, 0.6771505376344086]
avg acc:  0.7187777959469985
avg log-like:  [-425.86962773603045, -286.9877443055924, -272.70018214198075, -270.94175795302266, -270.87295644205676, -270.8721976942828]


In [None]:
# No RAM for this
X, y = fetch_fraud_detection()
results = test_with_cv(X, y)

In [9]:
X, y = fetch_banknote_authentication()
results = test_with_cv(X, y)

In [10]:
print("acc: ", results['acc'])
print("avg acc: ", np.mean(results['acc']))
print("avg log-like: ", calc_avg_loglike(results))

acc:  [0.9828297439194257, 0.9828297439194257, 0.9967105263157895, 0.9959016393442623, 0.9926121656600517]
avg acc:  0.990176763831791
avg log-like:  [-760.798345382596, -215.58025733631357, -111.44024578605612, -69.21652565770437, -49.60424019985176, -39.72051077098813, -35.374569015704495, -34.305654412693, -34.23834422174868, -34.238080498187266]


In [11]:
X, y = fetch_banknote_authentication()
results = test_with_cv(X, y, include_interactions=True)

In [12]:
print("acc: ", results['acc'])
print("avg acc: ", np.mean(results['acc']))
print("avg log-like: ", calc_avg_loglike(results))

acc:  [1.0, 1.0, 1.0, 1.0, 1.0]
avg acc:  1.0
avg log-like:  [-760.798345382596, -192.29705942063072, -89.68717565005963, -47.51610743630963, -26.366779412160575, -16.114835293085797, -11.98423249518537, -10.48226496232719, -10.016626682581864, -9.94321156603337, -9.937434670455422, -9.93738756385228]
