In [58]:
import numpy as np
from scipy.optimize import linprog
from sklearn.linear_model import QuantileRegressor

In [62]:
X = np.c_[np.ones(100), np.random.normal(0, 5, size=(100, 5))]
beta = np.array([10, 1, 2, 3, 4, 5])
y = X @ beta + np.random.normal(0, 1, 100)

In [110]:
class QuantileRegression:
    def __init__(self, theta, verbose=False):
        self.theta = theta # theta-quantile
        self.verbose = verbose # to print the loss
        
    def fit(self, X, y, learning_rate=0.001, tol=1e-10):
        self.n, self.p = X.shape
        self.params = np.random.uniform(0, 1, self.p)
        
        # initialize the params for gradient descent
        self.tol = tol
        self.learning_rate = learning_rate
        
        self.gradient_descent(X, y)
    
    def loss(self, y, y_hat):
        # sum of absolute error
        dev = y - y_hat
        return np.sum(np.maximum(self.theta * dev, (self.theta - 1) * dev))
    
    def gradient_descent(self, X, y):
        y_hat = X @ self.params
        loss = self.loss(y, y_hat)

        while True:
            # update params
            grads = self.gradient(X, y, self.params)
            self.params -= grads * self.learning_rate
            
            # print loss
            y_hat = X @ self.params
            current_loss = self.loss(y, y_hat)
            if loss - current_loss < self.tol:
                break
            else:
                loss = current_loss
                
    def gradient(self, X, y, params):
        y_hat = X @ self.params
        dev = y - y_hat
        grads = np.zeros(self.p)
        
        # this gradient for sum(|y - xb|) only
        for i in range(len(grads)):
            xbeta = X[:, i] * self.params[i]
            xbeta = np.where(abs(xbeta) < 1, xbeta, np.sign(xbeta))
            grads[i] = np.sum(np.where(dev > 0,
                                       self.theta * -xbeta, 
                                       (self.theta-1) * -xbeta))
        return grads
    
    def recursively_fit(self, returns, learning_rate=0.001, tol=1e-10):
        # initialize the params for gradient descent
        self.tol = tol
        self.learning_rate = learning_rate
        self.p = 4
        self.params = np.random.uniform(0, 1, 4)
        loss = np.inf
        while True:
            sigmas = self.asymmetric_slope(returns, self.params)
            constant_term = np.ones_like(returns)
            X = np.c_[constant_term, sigmas[:-1], np.maximum(returns, 0), np.minimum(returns, 0)]
            y = sigmas[1:]
            
            # update params
            grads = self.gradient(X, y, self.params)
            self.params -= grads * self.learning_rate
            
            # print loss
            y_hat = X @ self.params
            current_loss = self.loss(y, y_hat)
            if loss - current_loss < self.tol:
                break
            else:
                loss = current_loss
        

    def asymmetric_slope(self, returns, params):
        b1, b2, b3, b4 = params
        sigmas = np.zeros(returns.shape[0]+1)
        sigmas[0] = np.std(returns) # im not sure
        for t in range(1, len(sigmas)):
            sigmas[t] = b1 + b2 * sigmas[t-1] + max(b3 * returns[t-1], 0) + b4 * min(b3 * returns[t-1], 0)
        return sigmas

In [111]:
qr = QuantileRegression(0.5)

In [112]:
qr.recursively_fit(np.array(range(10)))

In [113]:
qr.params

array([0.56951569, 0.60021941, 0.36394334, 0.51556232])