### Import dependencies

In [2]:
import pandas as pd
from pandas.plotting import lag_plot

import numpy as np
# import keys

# from sklearn.model_selection import train_test_split
# from sklearn.preprocessing import StandardScaler
# from sklearn.ensemble import RandomForestRegressor
# from sklearn.metrics import mean_squared_error

from sklearn import linear_model as lm
from sklearn.preprocessing import PolynomialFeatures
from sklearn.preprocessing import SplineTransformer
from sklearn.metrics import mean_squared_error

from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.graphics.tsaplots import plot_acf
from statsmodels.graphics.tsaplots import plot_pacf
from statsmodels.tsa.stattools import adfuller

from textblob import TextBlob

import os
import matplotlib.pyplot as plt
import cProfile

from tensorflow import keras


In [3]:
# OPEN AI GPT API TEST
# REQUIRES PAYMENT :(
# import os
# import openai

# openai.api_key = keys.OPEN_AI_KEY

# response = openai.Completion.create(
#   model="gpt-3.5-turbo",
#   messages=[
#     {
#       "role": "user",
#       "content": "I will provide you with a few paragraphs from news articles. Identify the subject (usually a company) and the sentiment. If possible, return the company symbol instead of the company name (for example, instead of Apple, return AAPL; instead of Alphabet, return GOOG). Sentiment should be a number between -1.0 (indicating a very negative sentiment) and 1.0 (indicating a very positive sentiment). The response format should look like this:\n{'AAPL', -0.3}\n\nIf there are multiple companies with identifiable sentiments in the article, return multiple lines. For example:\n{'AAPL', -0.3}\n{'GOOG', 0.7}\n\n Do not provide further information."
#     }
#   ],
#   temperature=1,
#   max_tokens=256,
#   top_p=1,
#   frequency_penalty=0,
#   presence_penalty=0
# )

### Setup

Helpers

In [64]:
def path_to_symbols(base_dir="data"):
    """Return the ticker symbols found in ``base_dir``.

    A symbol is the name of a ``.csv`` entry with that extension removed, e.g.
    ``SPY.csv`` -> ``"SPY"``. Only the final extension is stripped, so
    ``BRK.B.csv`` yields ``"BRK.B"``. Entries without a ``.csv`` extension
    (including names that contain no dot at all) are skipped.

    Args:
        base_dir: Directory to scan. Sub-directories are not descended into.

    Returns:
        A list of symbol strings in the order ``os.listdir`` reports them.
    """
    symbols = []
    for file_name in os.listdir(base_dir):
        # splitext never fails on dot-less names (indexing the result of
        # ``split('.')`` did) and keeps dots that are part of the symbol.
        stem, extension = os.path.splitext(file_name)
        if extension == ".csv":
            symbols.append(stem)
    return symbols
        
def symbol_to_path(symbol, base_dir= "data"):
    """Build the location of the CSV file that holds ``symbol``'s data.

    The file name is the string form of ``symbol`` followed by ``.csv``,
    joined onto ``base_dir``.
    """
    csv_name = str(symbol) + ".csv"
    return os.path.join(base_dir, csv_name)

def get_data(symbols, dates, base_dir="data"):
    """Read the adjusted close of each symbol into one date-indexed frame.

    ``SPY`` is always part of the result; when it is missing from ``symbols``
    it is placed first. The caller's ``symbols`` sequence is never modified.

    Args:
        symbols: Iterable of ticker symbols. Each one is read from
            ``<base_dir>/<symbol>.csv``, which must provide ``Date`` and
            ``Adj Close`` columns.
        dates: Index of the returned frame (e.g. a ``pd.date_range``).
        base_dir: Directory that contains the CSV files.

    Returns:
        A DataFrame indexed by ``dates`` with one column per symbol, named
        after the symbol. When more than one symbol is loaded, rows that
        contain NaN at the moment SPY is joined are dropped.
    """
    # Work on a copy so that adding the SPY reference does not leak into the
    # list owned by the caller.
    symbols = list(symbols)
    if 'SPY' not in symbols:  # add SPY for reference, if absent
        symbols.insert(0, 'SPY')

    # Kept from the original implementation: rows are only dropped when SPY
    # is combined with at least one other symbol; a lone SPY keeps every date.
    drop_on_spy = len(symbols) > 1

    df_container = pd.DataFrame(index=dates)
    for symbol in symbols:
        df_temp = pd.read_csv(
            os.path.join(base_dir, "{}.csv".format(symbol)),
            index_col="Date",
            parse_dates=True,
            usecols=['Date', 'Adj Close'],
            na_values="nan",
        ).rename(columns={'Adj Close': symbol})
        df_container = df_container.join(df_temp)
        if drop_on_spy and symbol == 'SPY':
            df_container = df_container.dropna()

    return df_container

def normalise_data(df:pd.DataFrame, frame_of_reference= 0):
    """Scale ``df`` so that the reference row becomes 1.0 in every column.

    ``frame_of_reference`` is the positional index of the row every other row
    is divided by.
    """
    reference_row = df.iloc[frame_of_reference]
    return df.div(reference_row)

def get_rolling_mean(df:pd.DataFrame, window = 20):
    """Return the mean of ``df`` over a sliding window of ``window`` rows."""
    sliding = df.rolling(window=window)
    return sliding.mean()

def get_rolling_std(df:pd.DataFrame, window = 20):
    """Return the standard deviation of ``df`` over a sliding window of ``window`` rows."""
    sliding = df.rolling(window=window)
    return sliding.std()

def get_bollinger_bands(df:pd.DataFrame, window = 20, num_std = 2):
    """Compute Bollinger Bands for ``df``.

    The middle band is the rolling mean over ``window`` rows; the outer bands
    sit ``num_std`` rolling standard deviations above and below it.

    Returns:
        The tuple ``(upper band, rolling mean, lower band)``.
    """
    middle_band = get_rolling_mean(df, window)
    band_offset = num_std * get_rolling_std(df, window)

    return (middle_band + band_offset, middle_band, middle_band - band_offset)

def get_daily_returns(df:pd.DataFrame):
    """Return the row-over-row change of ``df`` in percent.

    Each value is ``(current / previous - 1) * 100``. Entries that cannot be
    computed (such as the first row, which has no predecessor) are set to 0.
    """
    previous_rows = df.shift(1)
    percent_change = df.div(previous_rows).sub(1).mul(100)
    return percent_change.fillna(0)

def fill_missing_values(df):
    """Fill gaps in ``df`` in place: forward fill first, then backward fill.

    The forward pass carries the last known value downwards; the backward
    pass then covers any gap left at the very top. Nothing is returned.
    """
    for fill_in_place in (df.ffill, df.bfill):
        fill_in_place(axis=0, inplace=True)


def plot_dfs(dfs:list):
    """Draw every frame in ``dfs`` onto one shared set of axes.

    The first frame creates the axes through its own ``plot()`` call and each
    remaining frame is drawn onto those axes exactly once.

    Args:
        dfs: Sequence of objects exposing a pandas-style ``plot(ax=...)``.

    Returns:
        The shared axes object, or ``None`` when ``dfs`` is empty.
    """
    if not dfs:
        return None

    ax = dfs[0].plot()
    # Start at the second frame: the first one is already on the axes, and
    # drawing it again would duplicate its lines and legend entries.
    for df in dfs[1:]:
        df.plot(ax=ax)
    return ax

Regressors/Predictors

In [97]:
def overlay_with_input_data(df, window, y,col_name):
    """Append an empty future date range to the tail of ``df`` and attach ``y``.

    The result holds the last ``window`` rows of ``df`` followed by one empty
    row per day from the last index label of ``df`` up to ``window`` days
    later (both ends included, so that last label appears twice). ``y`` is
    then inserted as the first column under the name ``col_name``.
    """
    anchor = df.index[-1]
    future_index = pd.date_range(
        start=anchor,
        end=anchor + pd.DateOffset(days=window),
        freq='D',
    )

    overlay = pd.concat([df.tail(window), pd.DataFrame(index=future_index)])
    overlay.insert(0, col_name, y)
    return overlay

# linear
def linear_regression(df:pd.DataFrame, window=100):
    """Extrapolate a straight-line fit of the first column of ``df``.

    The line is fitted to the last ``window`` rows, using the row position
    (0, 1, 2, ...) as the explanatory variable, and is then evaluated on the
    ``window`` positions that immediately follow the fitted rows.

    Args:
        df: Frame whose first column holds the values to fit.
        window: Number of trailing rows to fit on, and also the number of
            future steps to predict.

    Returns:
        A NumPy array with ``window`` predicted values.

    Side effects:
        Prints the fitted intercept.
    """
    recent = df.tail(window).reset_index(drop=True)
    n_obs = len(recent)  # may be smaller than `window` for short frames

    x_train = np.arange(n_obs, dtype=float).reshape(-1, 1)
    y_train = recent.iloc[:, 0].to_numpy(dtype=float)

    reg = lm.LinearRegression()
    reg.fit(x_train, y_train)

    # Forecast on whole steps right after the training rows. The former
    # ``np.linspace(window, 2*window, window)`` spaced the points
    # window/(window-1) apart, stretching the trend over too many days.
    x = np.arange(n_obs, n_obs + window)
    b = reg.intercept_
    m = reg.coef_
    y = m * x + b

    print("intercept = {}".format(b))

    return y

# poly
def polynomial_regression(df:pd.DataFrame, window=100, degree=3):
    """Extrapolate a polynomial fit of the first column of ``df``.

    A polynomial of the given ``degree`` is fitted to the last ``window``
    rows, using the row position (0, 1, 2, ...) as the explanatory variable,
    and is then evaluated on the ``window`` positions that immediately follow
    the fitted rows.

    Args:
        df: Frame whose first column holds the values to fit.
        window: Number of trailing rows to fit on, and also the number of
            future steps to predict.
        degree: Degree of the polynomial features.

    Returns:
        A NumPy array with ``window`` predicted values.
    """
    recent = df.tail(window).reset_index(drop=True)
    n_obs = len(recent)  # may be smaller than `window` for short frames

    x_train = np.arange(n_obs, dtype=float).reshape(-1, 1)
    y_train = recent.iloc[:, 0].to_numpy(dtype=float)

    # Expand the positions into polynomial features and fit a linear model.
    poly_features = PolynomialFeatures(degree=degree)
    x_poly = poly_features.fit_transform(x_train)

    model = lm.LinearRegression()
    model.fit(x_poly, y_train)

    # Predict on whole steps right after the training rows. The former
    # ``np.linspace(window, 2*window, window)`` spaced the points
    # window/(window-1) apart instead of one step apart.
    x_test = np.arange(n_obs, n_obs + window, dtype=float).reshape(-1, 1)
    y = model.predict(poly_features.transform(x_test))

    return y

# ARIMA    
def arima_pred(df:pd.DataFrame, p=10,d=2,q=5, days_to_predict = 30, column="SPY"):
    """Forecast one column of ``df`` with an ARIMA(p, d, q) model.

    Args:
        df: Frame that contains the series to model.
        p: Autoregressive order.
        d: Degree of differencing.
        q: Moving-average order.
        days_to_predict: Number of steps to forecast past the end of the
            series.
        column: Name of the column to model. Defaults to ``"SPY"``, the
            column this function has always used.

    Returns:
        The out-of-sample forecast produced by the fitted model's
        ``forecast(steps=days_to_predict)``.
    """
    model = ARIMA(df[column], order=(p, d, q))
    model_fit = model.fit()

    return model_fit.forecast(steps=days_to_predict)

def sentiment_prediction(df:pd.DataFrame, input_text="I fucking hate my life!", days_to_predict=30):
    """Project the first column of ``df`` along a line whose slope is a sentiment score.

    The polarity of every sentence in ``input_text`` (as scored by TextBlob)
    is summed, and that total is used as the per-step slope of a straight
    line that starts from the last value in the first column of ``df``.

    Args:
        df: Frame whose first column supplies the starting value.
        input_text: Text to score.
        days_to_predict: Number of steps to project.

    Returns:
        A NumPy array with ``days_to_predict`` values.

    Side effects:
        Prints the text together with its cumulative sentiment.
    """
    blob = TextBlob(input_text)
    cumulative_sentiment = sum(
        sentence.sentiment.polarity for sentence in blob.sentences
    )

    print("sentiment prediction for '{input_text}' is {cumulative_sentiment}".format(input_text=input_text,cumulative_sentiment=cumulative_sentiment ))

    # make linear interp using sentiment as slope
    x = np.linspace(1, days_to_predict, days_to_predict)
    # Use only the first column as the starting value (the same column the
    # regression helpers fit). Taking the whole last row broke broadcasting
    # against `x` as soon as the frame carried more than one column.
    b = df.iloc[-1, 0]
    m = cumulative_sentiment
    y = m * x + b

    return y

def generate_data(df, prediction_time, periods):
    """Run every predictor on ``df`` and collect the forecasts in one list.

    The list holds one ARIMA forecast per entry of ``periods`` (the entry is
    used as the autoregressive order, with d=1 and q=2), followed by the
    linear regression, the degree-2 polynomial regression and the sentiment
    projection, in that order. ``prediction_time`` is both the forecast
    length and the regression window.
    """
    # ARIMA forecasts come first, one per requested order.
    forecasts = [
        arima_pred(df, p=ar_order, d=1, q=2, days_to_predict=prediction_time)
        for ar_order in periods
    ]

    # Then the three single-shot predictors, in a fixed order.
    forecasts.append(linear_regression(df, prediction_time))
    forecasts.append(polynomial_regression(df, prediction_time, 2))
    forecasts.append(
        sentiment_prediction(df, days_to_predict=prediction_time, input_text="medium")
    )

    return forecasts

def combine_results(df, results, weights=(1, 1, 2, 5, 2, 2, 3)):
    """Blend several forecasts into a single weighted average.

    Args:
        df: Unused; kept so existing callers keep working.
        results: Sequence of forecasts. The first entry must expose an
            ``index`` (e.g. a pandas Series); it defines the index of the
            output. Every other entry must be assignable as a column on that
            index.
        weights: One weight per forecast. The weights are normalised by
            their sum, so only their ratios matter.

    Returns:
        A Series named ``"avg"`` holding the weighted average for each row.

    Raises:
        ValueError: If ``results`` and ``weights`` differ in length.
    """
    weights = list(weights)
    if len(results) != len(weights):
        raise ValueError("length of results and weights  is not equal: results {res} != weights {wei}.".format(res=len(results), wei=len(weights)))

    normalisation_factor = 1 / sum(weights)

    # One column per forecast, all aligned on the first forecast's index.
    df_ensemble = pd.DataFrame(index=results[0].index)
    for i, result in enumerate(results):
        df_ensemble["result {}".format(i)] = result

    # Weight each column, scale so the weights sum to one, then add up the
    # columns row by row.
    weighted = df_ensemble.multiply(weights, axis=1).multiply(normalisation_factor)
    return weighted.sum(axis=1).rename("avg")

def monte_carlo_sim(df:pd.DataFrame, num_sims=100, days_to_predict = 60):
    """Simulate random-walk price paths that continue from the end of ``df``.

    Every step multiplies the previous price by ``1 + shock``, where the
    shock is drawn from a normal distribution with mean 0 and a standard
    deviation equal to the standard deviation of the historical percentage
    changes of ``df``. Only the first column of ``df`` ends up in the result.

    Args:
        df: Date-indexed price frame.
        num_sims: Number of independent paths to simulate.
        days_to_predict: Number of days the paths extend past the last date
            of ``df``.

    Returns:
        A tuple ``(simulations, upper, lower)``. ``simulations`` has one
        column per path and ``days_to_predict + 1`` daily rows starting at
        the last date of ``df`` (that first row already includes one random
        shock). ``upper`` and ``lower`` are the row-wise maximum and minimum
        across all paths.
    """
    # The volatility does not depend on the simulation, so compute it once.
    daily_volatility = np.atleast_1d(df.pct_change().std())
    last_price = np.atleast_1d(df.values[-1])

    n_points = days_to_predict + 1
    paths = np.empty((n_points, num_sims))

    for sim in range(num_sims):
        price = last_price
        for step in range(n_points):
            # Shocks are still drawn for every column so the sequence of
            # random numbers is the same as before; only the first column
            # is stored.
            price = price * (1 + np.random.normal(0, daily_volatility))
            paths[step, sim] = price[0]

    # move results to dataframe
    last_date = df.index[-1]
    new_end_date = last_date + pd.DateOffset(days=days_to_predict)
    new_date_range = pd.date_range(start=last_date, end=new_end_date, freq='D')

    df_simulation = pd.DataFrame(paths, index=new_date_range)

    df_simulation_upper = df_simulation.max(axis=1)
    df_simulation_lower = df_simulation.min(axis=1)

    return df_simulation, df_simulation_upper, df_simulation_lower



## Experimental Setup

In [99]:
def experiment():
    """Run the forecasting pipeline end to end for one symbol.

    Picks the fifth symbol discovered in the data directory, loads its
    adjusted close for a fixed 2010 date range, fills gaps, runs every
    predictor and blends the forecasts with the default ensemble weights.
    Nothing is returned.
    """
    forecast_horizon = 30
    # Four ARIMA orders plus the three other predictors give the seven
    # forecasts that the default ensemble weights expect.
    arima_orders = [1 , 7 , 30 , 90]

    # Date range under study. Other ranges that have been tried:
    # 2010-01-01..2010-7-29, 2010-01-01..2010-12-30, 2006-01-01..2013-12-30.
    study_dates = pd.date_range('2010-03-01', '2010-10-29')

    # Choose the stock symbol to read
    discovered = path_to_symbols()
    chosen = [discovered[4]]

    # data collection / clean-up
    prices = get_data(chosen, study_dates)
    fill_missing_values(prices)

    forecasts = generate_data(prices, forecast_horizon, periods=arima_orders)
    df_ens = combine_results(prices, forecasts)

    # Optional visual check (disabled): plot `prices`, overlay the upper and
    # lower envelopes from monte_carlo_sim(prices, days_to_predict=forecast_horizon)
    # and finally `df_ens` on the same axes.



    

    
# Run the experiment only when this file is executed directly, not on import.
if __name__ == '__main__':
    experiment()


