In [1]:
# import module
import m as a

In [1]:
# import library
import numpy as np
import matplotlib.pyplot as plt

import pandas as pd
import pandas_datareader as web

from datetime import datetime

# sklearn libraries
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils import shuffle
from sklearn.utils.multiclass import unique_labels
from sklearn.svm import SVR
from sklearn.model_selection import (train_test_split,
                                     cross_validate,
                                     GridSearchCV
                                    )
from sklearn.metrics import (r2_score, 
                             mean_squared_error
                            )
from sklearn.utils.estimator_checks import check_estimator
from sklearn.utils.validation import (check_array, 
                                      check_is_fitted, 
                                      check_X_y,
                                      _check_sample_weight
                                     )

from sklearn.base import BaseEstimator, TransformerMixin

In [None]:
class SupportVectorReg(BaseEstimator, TransformerMixin):
    """
    Support vector regression wrapper around a configurable base regressor.

    The class bundles the usual modelling steps (building X/y, scaling,
    train/test split, fit, predict, evaluation) around ``base_regressor``
    (``sklearn.svm.SVR`` by default). Kernels are supplied to the base
    regressor as Python callables built by :meth:`fkernel`.

    Class attributes:
        test_size:    fraction of samples held out by :meth:`train_test_split`.
        random_state: seed used by :meth:`train_test_split`.
        missing_vals: a column with at least this many missing entries is
                      dropped by :meth:`missing_val`.

    Attributes set by :meth:`fit`:
        regressor_:     the fitted base regressor instance.
        gamma_:         kernel coefficient actually used.
        n_features_in_: number of feature columns seen during fit.
        intercept_:     intercept reported by the base regressor (or None).
        coef_:          primal weights for the linear kernel, otherwise None.
    """
    test_size = 0.3
    random_state = 42
    missing_vals = 10

    # Constructor argument names, in signature order; used by get_params/set_params.
    _param_names = ("X", "y", "base_regressor", "kernel", "degree", "gamma",
                    "coef0", "C", "epsilon", "verbose", "max_iter")

    def __init__(self,
                 X:pd.DataFrame = None,
                 y:list = None,
                 base_regressor = SVR,
                 kernel:str = None,
                 degree:int = 3,
                 gamma:float = 1.0,
                 coef0:float = 0.0,
                 C:float = 1.0,
                 epsilon:float = 0.1,
                 verbose:bool = False,
                 max_iter:int = 1000):
        """
        Store the hyper-parameters; no validation or fitting happens here.

        Parameters:
        - X:              optional feature data kept for create_Xy/feature_scaling/check
        - y:              optional target data kept for create_Xy/feature_scaling
        - base_regressor: regressor *class* accepting SVR-style keyword arguments
        - kernel:         "linear", "poly" or "rbf"; None means "rbf"
        - degree:         exponent of the polynomial kernel
        - gamma:          kernel coefficient; a falsy value (None/0) makes fit
                          derive it as 1 / (n_features * X.var())
        - coef0:          additive bias of the polynomial kernel
        - C:              regularisation strength passed to the base regressor
        - epsilon:        width of the tolerance tube passed to the base regressor
        - verbose:        passed to the base regressor
        - max_iter:       iteration limit passed to the base regressor (-1 for no limit)
        """
        # Every argument is stored unmodified under its own name so that
        # get_params() can hand back exactly what the constructor received.
        self.X = X
        self.y = y
        self.base_regressor = base_regressor
        self.kernel = kernel
        self.degree = degree
        self.gamma = gamma  # resolved into gamma_ by fit
        self.coef0 = coef0
        self.C = C
        self.epsilon = epsilon
        self.verbose = verbose
        self.max_iter = max_iter
        # Trailing-underscore attributes (coef_, intercept_, ...) are deliberately
        # NOT created here: their presence is what marks the estimator as fitted.

    @staticmethod
    def _to_2d(values):
        """Return ``values`` as an ndarray, turning a 1-D input into a single column."""
        arr = np.asarray(values)
        return arr.reshape(-1, 1) if arr.ndim == 1 else arr

    def fkernel(self, kernel:str = None, **kwargs):
        """
        Build the kernel function for the given kernel name.

        Parameters:
        - kernel: "linear", "poly" or "rbf"; None selects "rbf"
        - gamma (keyword): kernel coefficient, defaults to self.gamma
                           (1.0 when that is falsy)
        - bias (keyword):  additive term of the poly kernel, defaults to self.coef0
        - power (keyword): exponent of the poly kernel, defaults to self.degree

        Returns:
            A callable ``k(A, B)`` that maps two sample matrices of shape
            (n_a, n_features) and (n_b, n_features) to the (n_a, n_b) Gram matrix.

        Raises:
            ValueError: if ``kernel`` is not one of the supported names.
        """
        name = "rbf" if kernel is None else kernel

        gamma = kwargs.get("gamma", self.gamma)
        if not gamma:
            gamma = 1.0
        bias = kwargs.get("bias", self.coef0)
        power = kwargs.get("power", self.degree)

        def _linear_kernel(A, B):
            return np.dot(A, B.T)

        def _polynomial_kernel(A, B):
            return (gamma * np.dot(A, B.T) + bias) ** power

        def _rbf_kernel(A, B):
            A = np.atleast_2d(A)
            B = np.atleast_2d(B)
            # ||a - b||^2 = ||a||^2 + ||b||^2 - 2 a.b, computed for all pairs at once
            sq_dist = (
                np.sum(A ** 2, axis=1)[:, np.newaxis]
                + np.sum(B ** 2, axis=1)[np.newaxis, :]
                - 2.0 * np.dot(A, B.T)
            )
            # rounding can push tiny distances slightly below zero
            return np.exp(-gamma * np.maximum(sq_dist, 0.0))

        kernel_mapping = {
            "linear": _linear_kernel,
            "poly": _polynomial_kernel,
            "rbf": _rbf_kernel
        }

        if name not in kernel_mapping:
            raise ValueError(
                "The provided kernel {} is invalid. Choose from the following {}.".format(
                    kernel, list(kernel_mapping.keys())
                )
            )

        return kernel_mapping[name]

    def decorator_function(original_function):
        """
        Decorator that prints a message before every call of the wrapped function.

        It is applied inside the class body, so it receives the undecorated
        function (not ``self``) as its only argument.
        """
        from functools import wraps  # local import: functools is not imported at file level

        @wraps(original_function)  # keep the wrapped method's name and docstring
        def wrapper_function(*args, **kwargs):
            print("wrapper executed this before {}".format(original_function))
            return original_function(*args, **kwargs)
        return wrapper_function

    @decorator_function
    def create_Xy(self, X = None, y = None):
        """
        The below function sets the dependent and independent variables.

        Parameters:
        - X: feature data; defaults to the X given to the constructor
        - y: target data; defaults to the y given to the constructor

        Returns:
            (X, y) as float arrays: X with shape (n_samples, n_features) and
            y with shape (n_samples, 1). Both are also stored on the instance.

        Raises:
            ValueError: if X or y is unavailable or their lengths differ.
        """
        X = self.X if X is None else X
        y = self.y if y is None else y
        if X is None or y is None:
            raise ValueError("X and y must be given here or in the constructor")

        X = self._to_2d(np.asarray(X, dtype=float))
        y = np.asarray(y, dtype=float).reshape(-1, 1)
        if X.shape[0] != y.shape[0]:
            raise ValueError("{} != {}".format(X.shape[0], y.shape[0]))

        self.X, self.y = X, y
        return X, y

    @decorator_function
    def feature_scaling(self, X_train = None, y_train = None):
        """
        The following function scales the data. Step 2: Data pre-processing.

        Parameters:
        - X_train: training features; defaults to the stored X
        - y_train: training target; defaults to the stored y

        Returns:
            (X_scaled, y_scaled), each mapped to the range [0, 1]. The fitted
            scalers are kept as ``x_scaler`` and ``y_scaler`` so the same
            transformation can later be applied to test data or inverted.

        Raises:
            ValueError: if no training data is available.
        """
        X_train = self.X if X_train is None else X_train
        y_train = self.y if y_train is None else y_train
        if X_train is None or y_train is None:
            raise ValueError("X_train and y_train must be given here or in the constructor")

        # One scaler per variable: refitting a shared scaler on y would discard
        # the feature ranges learnt from X.
        self.x_scaler = MinMaxScaler(feature_range=(0, 1))
        self.y_scaler = MinMaxScaler(feature_range=(0, 1))

        X_scaled = self.x_scaler.fit_transform(self._to_2d(X_train))
        y_scaled = self.y_scaler.fit_transform(np.asarray(y_train).reshape(-1, 1))
        return X_scaled, y_scaled

    def train_test_split(self, X:np.ndarray, y:np.ndarray):
        """
        The following function splits the data into train and test set. Step 3: feature engineering.

        Parameters:
        - X: features, array-like or DataFrame; a 1-D input becomes one column
        - y: target, array-like or Series

        Returns:
            X_train, X_test, y_train, y_test, with y as column vectors.
        """
        X = self._to_2d(X)  # keep multi-feature layouts intact, only lift 1-D input
        y = np.asarray(y).reshape(-1, 1)

        # Inside a method body this name resolves to the module-level sklearn
        # function, not to this method. No `stratify`: the target is continuous.
        X_train, X_test, y_train, y_test = train_test_split(X,
                                                            y,
                                                            test_size = self.test_size,
                                                            random_state = self.random_state
                                                           )

        return X_train, X_test, y_train, y_test

    def fit(self, X_train:np.ndarray, y_train:np.ndarray):
        """
        The below function fits the model on the training data. Step 4: fit the model.

        Parameters:
        - X_train: training features, shape (n_samples, n_features)
        - y_train: training target, shape (n_samples,) or (n_samples, 1)

        Returns:
            self

        Raises:
            ValueError: if y_train is None, the data fail validation, or the
                        kernel name is unsupported.
        """
        if y_train is None:
            raise ValueError("target variable not defined")

        X, y = check_X_y(X_train, np.ravel(y_train), y_numeric = True)
        n_samples, n_features = X.shape

        # if gamma is not specified in init, it is specified as below
        if self.gamma:
            self.gamma_ = float(self.gamma)
        else:
            variance = X.var()
            self.gamma_ = 1.0 / (n_features * variance) if variance != 0 else 1.0

        self.n_features_in_ = n_features

        kernel_name = "rbf" if self.kernel is None else self.kernel
        self.regressor_ = self.base_regressor(kernel = self.fkernel(kernel_name, gamma = self.gamma_),
                                              degree = self.degree,
                                              gamma = self.gamma_,
                                              coef0 = self.coef0,
                                              tol = 0.001,
                                              C = self.C,
                                              epsilon = self.epsilon,
                                              verbose = self.verbose,
                                              max_iter = self.max_iter
                                             )
        self.regressor_.fit(X, y)

        self.intercept_ = getattr(self.regressor_, "intercept_", None)
        self.coef_ = None
        if (kernel_name == "linear"
                and hasattr(self.regressor_, "dual_coef_")
                and hasattr(self.regressor_, "support_")):
            # linear kernel: f(x) = sum_i dual_coef_i * <sv_i, x> + b, hence w = dual_coef @ SV
            self.coef_ = np.dot(self.regressor_.dual_coef_, X[self.regressor_.support_])

        return self

    def predict(self, X_test:np.array, y = None):
        """
        Predict targets for X_test with the fitted regressor. Step 5: prediction.

        Parameters:
        - X_test: features, shape (n_samples, n_features)
        - y:      ignored; kept for call compatibility

        Returns:
            Array of predictions, one per row of X_test.

        Raises:
            sklearn.exceptions.NotFittedError: if fit has not been called.
            ValueError: if the feature count differs from the training data.
        """
        X = self.check(X_test)
        return self.regressor_.predict(X)

    def evaluation(self, X_test = None, y_test = None):
        """
        The following function evaluates how well the model performs on the test data. Step 6: model evaluation.

        Parameters:
        - X_test: test features
        - y_test: true target values for X_test

        Returns:
            Root mean squared error of the predictions on X_test.

        Raises:
            ValueError: if X_test or y_test is missing.
        """
        if X_test is None or y_test is None:
            raise ValueError("X_test and y_test are required to evaluate the model")

        y_pred = self.predict(X_test)
        mse = mean_squared_error(np.ravel(y_test), y_pred)
        rmse = np.sqrt(mse)
        return rmse

    def check(self, X = None):
        """
        Validate X and confirm it has the feature count seen during fit.

        Parameters:
        - X: features to validate; defaults to the stored X

        Returns:
            The validated 2-D array.

        Raises:
            sklearn.exceptions.NotFittedError: if fit has not been called.
            ValueError: if the number of columns differs from n_features_in_.
        """
        check_is_fitted(self, "n_features_in_")
        X = check_array(self.X if X is None else X)
        if X.shape[1] != self.n_features_in_:
            raise ValueError("{} != {}".format(X.shape[1], self.n_features_in_))
        return X

    def f(self):
        """
        Placeholder for a custom iterative solver bounded by max_iter.

        TODO: not implemented yet; currently does nothing and returns None.
        """
        pass

    def missing_val(self, X, verbose = True):
        """
        Drop the columns of X that contain too many missing values.

        Parameters:
        - X:       2-D array-like (a 1-D input is treated as one column)
        - verbose: print the per-column missing counts and the dropped indices

        Returns:
            X as an array without the columns whose number of missing entries
            is at least ``missing_vals``.
        """
        X = self._to_2d(X)

        missing_val = [int(pd.isnull(X[:, i]).sum()) for i in range(X.shape[1])]
        if verbose:
            print("Missing vals %s" % missing_val)

        delete = [i for i,v in enumerate(missing_val) if v >= self.missing_vals]

        if verbose:
            print("Deleting %s" % delete)

        if len(delete) == 0:
            return X

        # select all columns except the ones that have missing values
        notMisVals = [i for i in range(X.shape[1]) if i not in delete]
        return X[:, notMisVals]

    def get_params(self, deep = True):
        """
        The below function returns parameter values.

        Every constructor argument is included so that the estimator can be
        rebuilt from the returned mapping. ``deep`` is accepted for API
        compatibility; nested estimators are not expanded.
        """
        return {name: getattr(self, name) for name in self._param_names}

    def set_params(self, **params):
        """
        Set constructor parameters by name and return self.

        Raises:
            ValueError: if a name is not a constructor parameter.
        """
        for param, val in params.items():
            if param not in self._param_names:
                raise ValueError(
                    "Invalid parameter {} for estimator {}. Valid parameters are: {}.".format(
                        param, type(self).__name__, list(self._param_names)
                    )
                )
            setattr(self, param, val)
        return self

In [None]:
# load data and test abstract class
if __name__ == "__main__":
    symbol = "ES=F"  # S&P 500 futures
    end = pd.Timestamp.today()
    # DateOffset lands on a valid calendar day, so running on 29 February does not raise
    start = end - pd.DateOffset(years=1)
    # `ticker` stays the name of the price DataFrame because later cells index it by column
    ticker = web.DataReader(symbol, "yahoo", start, end)

    # Features: the three Bollinger Band series; target: the closing price.
    # NOTE(review): assumes bollinger_bands() returns one row per row of `ticker`
    # and can be indexed with these column labels — confirm against module `m`.
    t = a.TechnicalIndicators(ticker["Close"], ticker["High"], ticker["Low"], ticker["Volume"])
    X = np.asarray(
        t.bollinger_bands()[
            ["Bollinger Bands Lower", "Bollinger Bands Middle", "Bollinger Bands Upper"]
        ],
        dtype=float,
    ).reshape(-1, 3)
    y = np.asarray(ticker["Close"], dtype=float)

    # The estimator rejects NaN, so rows with a missing feature or target are dropped.
    valid_rows = ~(np.isnan(X).any(axis=1) | np.isnan(y))
    X, y = X[valid_rows], y[valid_rows]

    X_train, X_test, y_train, y_test = train_test_split(
        X,
        y,
        test_size=SupportVectorReg.test_size,
        random_state=SupportVectorReg.random_state,
    )

    model = SupportVectorReg(X_train, y_train, kernel="rbf")
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    print("RMSE: {:.4f}".format(np.sqrt(mean_squared_error(y_test, y_pred))))

In [None]:
# t = a.TechnicalIndicators(ticker["Close"], ticker["High"], ticker["Low"], ticker["Volume"])

# t = a.TechnicalIndicators(ticker["Close"], ticker["High"], ticker["Low"], ticker["Volume"])
# df = pd.DataFrame(t)

# t.set_technical_indicators()
# t.rsi()
# t.rate_change()
# t.stochastic_oscillator()
# t.WilliamsR()

# t.macd()
# t.ema()
# t.wma()
# t.moving_average()

# t.bollinger_bands()
# t.average_true_range()

# t.on_balance_volume()
# t.money_flow_index()

In [None]:
# Closing prices as a column vector: one row per observation, a single column.
close_prices = ticker["Close"]
np.array(close_prices).reshape(len(close_prices), 1)

In [None]:
# print("C: {}".format(fitted_svr_model.C))
#     print("Epsilon: {}".format(fitted_svr_model.epsilon))
    
#     sc_X = StandardScaler()
# sc_y = StandardScaler()
# X_train = sc_X.fit_transform(X_train)
# y_train = sc_y.fit_transform(y_train)

# X_test = sc_X.fit_transform(X_test)
# y_test = sc_y.fit_transform(y_test)

# np.dot

# """
#         Parameters:
#         - kernel:   string indicating kernel "linear", "poly", "rbf", default "rbf" see above
#                     transforms train data so that data that cannot be linearly separated 
#                     are now linearly separated in a higher number of dimension spaces
#         - degree:
#         - gamma:
#         - coef0:
#         - C:
#         - epsilon: The value defines a margin of tolerance where no penalty is given to errors. 
#                    The larger it is, the larger errors you admit in your solution.
#         - verbose: output messages, useful for debugging or understanding how the training performs
#         - max_iter: -1 for no limit
#         """        

#     def param_list(self):
#         """
#         The following function returns the list of parameters and the respective values.
#         """
#         return {"C": self.C, 
#                 "kernel": self.kernel, 
#                 "gamma": self.gamma}

#     def param_grid(self):
        
#         param_grid = {'C': [ 1, 10, 100, 1000,10000], 
#               'gamma': [1,0.1,0.01,0.001,0.0001],
#               'kernel': ['rbf']}
        
#         pass
#  if (kernel is None): # set rbf to default
#             self.kernel = "rbf" 
#         else:
#             self.kernel = kernel

# Scratch notes, kept as comments like the rest of this cell: `clf`, `self`,
# `kernel`, `degree` and `sigma` do not exist at notebook top level, so these
# lines cannot run here. The comparison operator and the `self._linear_kernel`
# attribute access have been corrected.
# print("Accuracy: {}%".format(clf.score(X_test, y_test) * 100 ))
#         if kernel == "poly":
#             self.kernel = self._polynomial_kernel
#             self.degree = degree
#         elif kernel == "rbf":
#             self.kernel = self._rbf_kernel
#             self.sigma = sigma
#         elif kernel == "linear":
#             self.kernel = self._linear_kernel
#         else:
#             self.kernel = kernel