In [1]:
import numpy as np
import matplotlib.pyplot as plt
import torch
from typing import Callable

from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

np.random.seed(42)

In [2]:
def sample_next_state(index: int, P: np.ndarray) -> int:
    probs = P[index]

    next = np.random.choice(range(P.shape[1]), p=probs)

    return int(next)

In [3]:
def td_loop(
        n_iter: int, 
        X: np.ndarray, 
        y: np.ndarray, 
        P: np.ndarray,
        link: Callable[[np.ndarray], np.ndarray], 
        inv_link: Callable[[np.ndarray], np.ndarray], 
        gamma: float, 
        alpha: float,
        epsilon: float,
    ) -> np.ndarray:
    n_samples, n_features = X.shape

    w = np.zeros(n_features)

    curr_index = int(np.random.randint(n_samples))
    curr_x = X[curr_index]
    curr_y = y[curr_index]

    i = 0
    grad = np.ones_like(w) * np.inf

    while i < n_iter and np.linalg.norm(alpha * grad, 2) > epsilon:
        # Next state samples
        next_index = sample_next_state(index=curr_index, P=P)
        next_x = X[next_index]
        next_y = y[next_index]

        # Find predictions
        curr_z = np.dot(curr_x, w)
        next_z = np.dot(next_x, w)

        # Find rewards
        r = inv_link(curr_y) - gamma * inv_link(next_y)

        # TD target
        z_t = r + gamma * next_z

        # Find gradient
        grad = (link(curr_z) - link(z_t)) * curr_x

        # Update weights
        w -= alpha * grad
        
        # Update state and index
        curr_index, curr_x, curr_y = next_index, next_x, next_y
        i += 1

    return w

In [6]:
housedata = np.loadtxt('data\\readyhousedata.txt', delimiter=',')

X = housedata[:, :-1]
y = housedata[:, -1]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

reg = LinearRegression() 
reg.fit(X_train, y_train)

weights = reg.coef_
intercept = reg.intercept_

print(weights)
print("Intercept:", intercept)

score = reg.score(X_test, y_test)
print("Model R^2 score:", score)

[-10.99366542   5.00578355   0.63581661   2.44872684 -10.31186138
  17.51763825   1.03114679 -17.77574728   7.34087816  -6.42763345
  -8.79966234   3.03563353 -20.51935608]
Intercept: 29.04431567479631
Model R^2 score: 0.7730570034661727


In [7]:
num_samples = X_train.shape[0]
P = np.ones((num_samples, num_samples)) / num_samples # Equal probability to move to any state

alpha = 0.01  # Learning rate
gamma = 0   # Discount factor
num_iterations = 1e6  # Number of iterations
epsilon = 1e-9

w_hat_house = td_loop(
    n_iter=num_iterations,
    X=X_train,
    y=y_train,
    P=P,
    link=lambda x : x,
    inv_link=lambda x : x,
    gamma=gamma,
    alpha=alpha,
    epsilon=epsilon,
)

print(w_hat_house)

[ -8.53979376   5.54501021   4.91634298   2.22193504  -4.23974712
  36.36392932   3.46280445  -5.98104928   6.59621793  -5.85474065
  -3.89650886  10.11729015 -11.9291673 ]


In [24]:
def mini_batch_sgd(X: np.ndarray, y: np.ndarray, learning_rate: float, n_iter: int, epsilon: float, batch_size: int) -> np.ndarray:
    """
    Performs Mini-Batch Stochastic Gradient Descent (SGD) for linear regression with shuffling.
    
    Parameters:
    X (np.ndarray): Feature matrix (n_samples, n_features).
    y (np.ndarray): Target vector (n_samples,).
    learning_rate (float): Step size for updating weights.
    n_iter (int): Number of iterations (epochs).
    epsilon (float): Convergence threshold.
    batch_size (int): Size of the mini-batches.
    
    Returns:
    np.ndarray: The learned weights.
    """
    n_samples, n_features = X.shape
    # Add bias term to the feature matrix
    X_bias = np.c_[np.ones(n_samples), X]  # Adds a column of ones for the bias term

    # Initialize weights to zeros
    weights = np.zeros(n_features + 1)

    for epoch in range(int(n_iter)):
        # Step 1: Shuffle the data at the beginning of each epoch
        indices = np.arange(n_samples)
        np.random.shuffle(indices)
        X_bias_shuffled = X_bias[indices]
        y_shuffled = y[indices]

        for i in range(0, n_samples, batch_size):
            # Step 2: Select the mini-batch
            X_batch = X_bias_shuffled[i:i + batch_size]
            y_batch = y_shuffled[i:i + batch_size]
            
            # Initialize gradients to zero
            gradient = np.zeros_like(weights)
            
            # Step 3: Compute the gradient over the mini-batch
            for j in range(X_batch.shape[0]):
                # Prediction
                prediction = np.dot(X_batch[j], weights)
                error = y_batch[j] - prediction
                
                # Update gradient
                gradient += -2 * X_batch[j] * error

            # Step 4: Update the weights
            weights -= learning_rate * gradient / batch_size

        # Optional: Check for convergence (if gradient is small enough)
        if np.linalg.norm(gradient) < epsilon:
            print(f"Converged after {epoch + 1} epochs")
            break

    return weights

# Hyperparameters
learning_rate = 0.01
n_iter = 1e4  # Number of epochs
epsilon = 1e-6  # Convergence threshold
batch_size = 16  # Mini-batch size

# Run SGD manually
weights_sgd = mini_batch_sgd(X_train, y_train, learning_rate, n_iter, epsilon, batch_size)

# Print the learned weights
print("Weights learned using SGD:\n", weights_sgd[1:])


Weights learned using SGD:
 [-10.99123084   5.0108542    0.63526613   2.44703682 -10.31842922
  17.498233     1.03114665 -17.7785983    7.33932725  -6.43290669
  -8.8054347    3.03159724 -20.52845324]


In [26]:
pred_TD = np.dot(X_test, w_hat_house)
pred_L2 = reg.predict(X_test)
pred_sgd = np.dot(X_test, weights_sgd[1:]) + weights_sgd[0]

rmse_TD = np.sqrt(mean_squared_error(y_test, pred_TD))
rmse_L2 = np.sqrt(mean_squared_error(y_test, pred_L2))
rmse_sgd = np.sqrt(mean_squared_error(y_test, pred_sgd))

print("RMSE on the test set using TD:", rmse_TD)
print("RMSE on the test set using L2 Regression:", rmse_L2)
print("RMSE on the test set using SGD:", rmse_sgd)
print("---------------")
print("Norm of weights from TD:", np.linalg.norm(w_hat_house, 2))
print("Norm of weights from L2 Regression:", np.linalg.norm(weights, 2))
print("Norm of weights from SGD:", np.linalg.norm(weights_sgd[1:], 2))
print("Norm of difference in weights for L2:", np.linalg.norm(weights - w_hat_house, 2))
print("Norm of difference in weights for sgd:", np.linalg.norm(w_hat_house - weights_sgd[1:], 2))


RMSE on the test set using TD: 4.65389069063977
RMSE on the test set using L2 Regression: 4.154652986817322
RMSE on the test set using SGD: 4.153674732376023
---------------
Norm of weights from TD: 43.11024087417123
Norm of weights from L2 Regression: 38.54163631701667
Norm of weights from SGD: 38.54217165140593
Norm of difference in weights for L2: 26.65769219399023
Norm of difference in weights for sgd: 26.679054613373875
