In [39]:
# use alpaca as data api 
import os 
import pandas as pd
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt 
from io import StringIO

import QuantLib as ql 

import plotly.graph_objects as go

import redis

import sys 
sys.path.append('..')

from src.kata_alpaca_engine.engine_utilities import (
    BackTestData,
download_data)

from src.kata_alpaca_engine.ingestion_engine import RedisTableUtility, Redis

In [87]:
# Notebook-wide configuration.
# STOCK_TICKER is passed to download_data and embedded in the Redis table names below.
STOCK_TICKER = "NVDA"
# Passed to download_data as its first argument; presumably the API credentials file — confirm.
# NOTE(review): this is relative to the kernel's working directory, while the import
# cell adds '..' to sys.path — check that both resolve from the same place.
SECRETS_PATH = "./src/.secrets"
# Passed to download_data as its second argument.
START_DATE = "2024-04-01"

In [88]:
# Download the raw price table for STOCK_TICKER through the project helper
# (presumably from START_DATE onward — confirm against download_data).
# Later cells read its `timestamp` and `price` columns.
df = download_data(SECRETS_PATH, START_DATE, symbols=STOCK_TICKER)

In [89]:
# Redis client handed to RedisTableUtility below.
# NOTE(review): host 'red' and port 6379 are hard-coded; 'red' is presumably a
# container/service alias — confirm it resolves in the target environment.
connection =  redis.Redis(host='red', port=6379)

In [90]:
# Echo the client repr (shows the configured host, port and db).
connection

Redis<ConnectionPool<Connection<host=red,port=6379,db=0>>>

In [91]:
# Project helper wrapping the Redis client; the cells below use its get(...) and
# set(...) methods to load and store DataFrames under a (table name, key) pair.
red_table = RedisTableUtility(connection)


In [92]:
# Calendar-day column: each timestamp reduced to its '%Y-%m-%d' date and parsed
# back into a (time-zone naive) datetime at midnight. The strings are parsed in a
# single vectorised call, and a missing timestamp becomes NaT instead of raising.
df['date'] = pd.to_datetime(df['timestamp'].dt.strftime('%Y-%m-%d'), format='%Y-%m-%d')


In [94]:
# Load the stock-split table stored under '_NVDA_STOCK_SPLIT' / '2024-07-26'.
# adjust_price_on_stock_split (below) reads its `dates` and `split` columns.
# NOTE(review): the table name hard-codes NVDA instead of using STOCK_TICKER.
stock_splits = red_table.get('_NVDA_STOCK_SPLIT', '2024-07-26')

def adjust_price_on_stock_split(df, splits=None):
    """
    Back-adjust prices for stock splits, modifying ``df`` in place.

    For every split, each row dated strictly before the split date has its
    ``price`` divided by the split ratio. A row that precedes several splits
    is therefore divided by each of their ratios.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs a datetime-like ``timestamp`` column and a numeric ``price``
        column. The ``price`` column is overwritten in place.
    splits : pandas.DataFrame, optional
        Table with a ``dates`` column ('%Y-%m-%d' strings, or any value
        ``pandas.Timestamp`` accepts) and a ``split`` column holding the ratio.
        When omitted, the notebook-level ``stock_splits`` table is used.

    Returns
    -------
    None

    Notes
    -----
    Not idempotent: calling it twice on the same frame applies every split twice.
    """
    if splits is None:
        # Fall back to the table loaded at notebook level (original behaviour).
        splits = stock_splits

    # Row dates do not change between splits, so derive them once.
    row_dates = df['timestamp'].dt.date

    for split_date, ratio in zip(splits['dates'], splits['split']):
        effective = pd.Timestamp(split_date).date()
        before_split = row_dates < effective
        df.loc[before_split, 'price'] = df.loc[before_split, 'price'] / ratio

# Apply the split adjustment to the downloaded prices; `df` is modified in place.
# NOTE(review): re-running this cell without re-creating `df` divides the
# pre-split prices a second time.
adjust_price_on_stock_split(df)

In [95]:

# Marker-only scatter of every price observation against its calendar day.
price_points = go.Scatter(x=df['date'], y=df['price'], mode='markers')
fig = go.Figure([price_points])
fig.show()


In [96]:
# Persist the price frame in Redis under the ticker's DAILY_HISTORICAL table.
# NOTE(review): the key '2024-08-02' is hard-coded — presumably the snapshot date; confirm.
red_table.set(f'_{STOCK_TICKER}_DAILY_HISTORICAL','2024-08-02',df)
#df = red_table.get(f'_{STOCK_TICKER}_HISTORICAL', '2024-07-30')

True

In [97]:
# Read back the table/key written in the previous cell, so the cells below work
# on the copy that came out of Redis.
df = red_table.get(f'_{STOCK_TICKER}_DAILY_HISTORICAL', '2024-08-02')

In [98]:
# Overwrite `timestamp` with the calendar-day column; from here on both columns hold the same values.
df['timestamp'] = df['date']

In [99]:
# One row per calendar day with the highest, lowest and median price of that day.
daily_stats = df.groupby('date')['price'].agg(max_price='max', min_price='min', median_price='median').reset_index()

In [100]:

# Stack the three per-day statistics into one long frame with a single `price`
# column (three rows per day), then mirror `date` into `timestamp`, the column
# name handed to BackTestData further down.
# .assign returns a new frame, so no column is written into a column-subset
# result — the pattern that can raise SettingWithCopyWarning.
stacked_df = (
    daily_stats
    .melt(id_vars=['date'], value_vars=['max_price', 'min_price', 'median_price'], value_name='price')
    .loc[:, ['date', 'price']]
    .assign(timestamp=lambda frame: frame['date'])
)

In [101]:
# Order the rows chronologically. Each day has three rows with the same `date`;
# a stable sort keeps them in their incoming order, whereas the default
# quicksort leaves the order of such ties arbitrary.
stacked_df = stacked_df.sort_values(by='date', kind='mergesort').reset_index(drop=True)


In [102]:
# Inspect the stacked frame (several price rows per day; the rendered output is truncated).
stacked_df

Unnamed: 0,date,price,timestamp
0,2024-04-01,91.97300,2024-04-01
1,2024-04-01,90.20110,2024-04-01
2,2024-04-01,89.25300,2024-04-01
3,2024-04-02,90.30900,2024-04-02
4,2024-04-02,89.32499,2024-04-02
...,...,...,...
256,2024-08-01,121.03000,2024-08-01
257,2024-08-01,106.72000,2024-08-01
258,2024-08-02,108.60000,2024-08-02
259,2024-08-02,101.45500,2024-08-02


In [107]:
# Backtest container built from the (timestamp, price) columns of the stacked frame.
btest = BackTestData(stacked_df[['timestamp','price']])

# NOTE(review): these imports sit mid-notebook and RegressionModel is imported
# again in a later cell; the import cell at the top is the conventional home.
from src.kata_models.GaussianProcessModel import RegressionModel
from src.kata_models.GaussianProcessModel import K

# Create a backtest dataset from two datetime bounds and a QuantLib date.
# NOTE(review): presumably (window start, window end, projection date) — a later
# cell passes variables named from_, to_ and project_ in these positions; confirm
# against BackTestData. store_prices=True presumably fills
# btest.prediction_data_price, which is read further down — confirm.
btest.create_backtest_data(
    datetime(year=2024, month=1, day=1),
    datetime(year=2024, month=8, day=2), 
    ql.Date(9,8,2024),store_prices=True
)

def regression_fit(x,y): 
    """
    Build the regression model handed to ``btest.fit_model``.

    The kernel is the sum of three terms, each scaled by 1.0: a constant
    kernel, a white-noise kernel with ``noise_level=0.5`` and a Matern kernel
    with ``nu=0.75``.

    Parameters
    ----------
    x, y
        Forwarded unchanged as the first two arguments of ``RegressionModel``.

    Returns
    -------
    Whatever ``RegressionModel(x, y, kernels=..., random_state=2)`` returns.
    """
    constant_term = 1.0 * K.ConstantKernel()
    noise_term = 1.0 * K.WhiteKernel(noise_level=0.5)
    matern_term = 1.0 * K.Matern(nu=0.75)

    # Summed in the same left-to-right order: constant + noise + Matern.
    composite_kernel = constant_term + noise_term + matern_term

    return RegressionModel(x, y, kernels=composite_kernel, random_state=2)


# Fit backtest dataset 0, using regression_fit as the model factory.
btest.fit_model(0, regression_fit)


In [108]:
# Band multiplier shared by the test and training bands.
# NOTE(review): 2.32 is close to the 99th-percentile z-score of a standard normal
# (about 2.326) — presumably the intended confidence level; confirm.
BAND_FACTOR = 2.32

# Predict on dataset 0, then derive the centre line and lower/upper bands for
# the prediction window and for the training window.
yfit, ysigma = btest.predict(0)
test_yfit, lower, upper = btest.create_bands(yfit, ysigma, band_factor=BAND_FACTOR)
training_f, lower_f, upper_f = btest.training_bands(0, band_factor=BAND_FACTOR)


In [109]:
# Show the prices stored for the prediction window (rendered as a list holding one array).
btest.prediction_data_price

[array([108.6  , 101.455, 105.48 ])]

In [110]:

# Plot dataset 0: observed test/training points plus the fitted centre line and
# bands for both the prediction window and the training window.
# NOTE(review): the same trace-building code is repeated in two later cells with
# dataset_index in place of 0; a shared helper would remove the duplication.
edge_line = dict(color='black', width=2)
shade_line = dict(color='rgba(0,100,80,0.2)')

xt = btest.prediction_data[0]
x = btest.data[0][0]

prediction_markers = go.Scatter(
    x=xt,
    y=btest.prediction_data_price[0],
    mode='markers', marker=dict(color='red'), name='test_data',
)
training_markers = go.Scatter(
    x=x,
    y=btest.data[0][1],
    mode='markers', marker=dict(color='darkgreen'), name='training_data',
)

# Per window: centre line, two black edge lines, then the same two edges again
# as a translucent pair where the lower one fills up to the upper ('tonexty').
bands = [
    go.Scatter(x=xt, y=test_yfit, line=edge_line, name='test_fit'),
    *[go.Scatter(x=xt, y=edge, line=edge_line, name=label)
      for edge, label in ((upper, 'Upper Band'), (lower, 'Lower Band'))],
    *[go.Scatter(x=xt, y=edge, fill=fill, mode='lines', line=shade_line, showlegend=False)
      for edge, fill in ((upper, None), (lower, 'tonexty'))],
]

training_fit_bands = [
    go.Scatter(x=x, y=training_f, name='training_fit'),
    *[go.Scatter(x=x, y=edge, line=edge_line, name=label)
      for edge, label in ((upper_f, 'Upper Band'), (lower_f, 'Lower Band'))],
    *[go.Scatter(x=x, y=edge, fill=fill, mode='lines', line=shade_line, showlegend=False)
      for edge, fill in ((upper_f, None), (lower_f, 'tonexty'))],
]

fig = go.Figure([prediction_markers, training_markers, *bands, *training_fit_bands])

fig.show()

In [33]:
# Second backtest: window 2024-01-01 .. 2024-07-31 with QuantLib date 2 Aug 2024,
# fitted with RegressionModel's default configuration.
# dataset_index = -1 presumably selects the dataset created by the call below — confirm.
from src.kata_models.GaussianProcessModel import RegressionModel

dataset_index = -1

from_ = datetime(2024, 1, 1)
to_ = datetime(2024, 7, 31)
project_ = ql.Date(2, 8, 2024)
btest.create_backtest_data(from_, to_, project_, store_prices=True)

btest.fit_model(dataset_index, lambda x, y: RegressionModel(x, y))

# Prediction plus centre line / bands for the prediction and training windows
# (default band factor).
yfit, ysigma = btest.predict(dataset_index)
test_yfit, lower, upper = btest.create_bands(yfit, ysigma)
training_f, lower_f, upper_f = btest.training_bands(dataset_index)



# Plot the backtest selected by dataset_index: observed test/training points plus
# the fitted centre line and bands for the prediction and training windows.
# NOTE(review): this trace-building code is repeated across three cells of the
# notebook; a shared helper taking dataset_index would remove the duplication.
edge_line = dict(color='black', width=2)
shade_line = dict(color='rgba(0,100,80,0.2)')

xt = btest.prediction_data[dataset_index]
x = btest.data[dataset_index][0]

prediction_markers = go.Scatter(
    x=xt,
    y=btest.prediction_data_price[dataset_index],
    mode='markers', marker=dict(color='red'), name='test_data',
)
training_markers = go.Scatter(
    x=x,
    y=btest.data[dataset_index][1],
    mode='markers', marker=dict(color='darkgreen'), name='training_data',
)

# Per window: centre line, two black edge lines, then the same two edges again
# as a translucent pair where the lower one fills up to the upper ('tonexty').
bands = [
    go.Scatter(x=xt, y=test_yfit, line=edge_line, name='test_fit'),
    *[go.Scatter(x=xt, y=edge, line=edge_line, name=label)
      for edge, label in ((upper, 'Upper Band'), (lower, 'Lower Band'))],
    *[go.Scatter(x=xt, y=edge, fill=fill, mode='lines', line=shade_line, showlegend=False)
      for edge, fill in ((upper, None), (lower, 'tonexty'))],
]

training_fit_bands = [
    go.Scatter(x=x, y=training_f, name='training_fit'),
    *[go.Scatter(x=x, y=edge, line=edge_line, name=label)
      for edge, label in ((upper_f, 'Upper Band'), (lower_f, 'Lower Band'))],
    *[go.Scatter(x=x, y=edge, fill=fill, mode='lines', line=shade_line, showlegend=False)
      for edge, fill in ((upper_f, None), (lower_f, 'tonexty'))],
]

fig = go.Figure([prediction_markers, training_markers, *bands, *training_fit_bands])

fig.show()



The optimal value found for dimension 0 of parameter k1__k1__k2__k1__constant_value is close to the specified lower bound 1e-05. Decreasing the bound and calling fit again may find a better value.



In [44]:
# TODO: dump the historical earnings report (this cell contains no code yet)

In [46]:
# Third backtest: window 2024-01-01 .. 2024-06-10 with QuantLib date 15 Jun 2024.
# from_ and to_ are reused at the end of this cell to filter the event markers.
# dataset_index = -1 presumably selects the dataset created by the call below — confirm.
dataset_index = -1

from_ = datetime(2024, 1, 1)
to_ = datetime(2024, 6, 10)
project_ = ql.Date(15, 6, 2024)

btest.create_backtest_data(from_, to_, project_, store_prices=True)
btest.fit_model(dataset_index, lambda x, y: RegressionModel(x, y))

# Prediction plus centre line / bands for the prediction and training windows
# (default band factor).
yfit, ysigma = btest.predict(dataset_index)
test_yfit, lower, upper = btest.create_bands(yfit, ysigma)
training_f, lower_f, upper_f = btest.training_bands(dataset_index)



# Build the figure for the backtest selected by dataset_index: observed
# test/training points plus the fitted centre line and bands for both windows.
# The figure is shown at the end of the cell, after the event markers are added.
# NOTE(review): this trace-building code is repeated across three cells of the
# notebook; a shared helper taking dataset_index would remove the duplication.
edge_line = dict(color='black', width=2)
shade_line = dict(color='rgba(0,100,80,0.2)')

xt = btest.prediction_data[dataset_index]
x = btest.data[dataset_index][0]

prediction_markers = go.Scatter(
    x=xt,
    y=btest.prediction_data_price[dataset_index],
    mode='markers', marker=dict(color='red'), name='test_data',
)
training_markers = go.Scatter(
    x=x,
    y=btest.data[dataset_index][1],
    mode='markers', marker=dict(color='darkgreen'), name='training_data',
)

# Per window: centre line, two black edge lines, then the same two edges again
# as a translucent pair where the lower one fills up to the upper ('tonexty').
bands = [
    go.Scatter(x=xt, y=test_yfit, line=edge_line, name='test_fit'),
    *[go.Scatter(x=xt, y=edge, line=edge_line, name=label)
      for edge, label in ((upper, 'Upper Band'), (lower, 'Lower Band'))],
    *[go.Scatter(x=xt, y=edge, fill=fill, mode='lines', line=shade_line, showlegend=False)
      for edge, fill in ((upper, None), (lower, 'tonexty'))],
]

training_fit_bands = [
    go.Scatter(x=x, y=training_f, name='training_fit'),
    *[go.Scatter(x=x, y=edge, line=edge_line, name=label)
      for edge, label in ((upper_f, 'Upper Band'), (lower_f, 'Lower Band'))],
    *[go.Scatter(x=x, y=edge, fill=fill, mode='lines', line=shade_line, showlegend=False)
      for edge, fill in ((upper_f, None), (lower_f, 'tonexty'))],
]

fig = go.Figure([prediction_markers, training_markers, *bands, *training_fit_bands])


# Overlay corporate events that fall strictly inside (from_, to_) as dashed
# vertical lines: ex-dividend dates in grey first, then earnings dates in black.
# Both date sets come from the `timestamp` column of tables stored in Redis.
ex_dividend_dates = red_table.get(f'_{STOCK_TICKER}', 'ex-dividend:2016/present')['timestamp'].tolist()
earnings_dates = red_table.get(f'_{STOCK_TICKER}', 'earnings:2018-08-16/present')['timestamp']

event_groups = (
    (ex_dividend_dates, "grey", 'ex-dividends'),
    (earnings_dates, "black", 'earnings'),
)

for event_dates, line_colour, label in event_groups:
    for date in event_dates:
        if from_ < date < to_:
            fig.add_vline(x=date, line_width=3, line_dash="dash", line_color=line_colour, name=label)

fig.show()


In [9]:
from datetime import datetime

# NVDA ex-dividend dates and per-share amounts as two parallel lists, newest
# first: 47 entries running from Jun 2024 back to Nov 2012.
# NOTE(review): the table is later stored under the key 'ex-dividend:2016/present',
# although the list reaches back further than 2016.
data = {
    "Ex-Dividend Date": [
        "Jun 11, 2024", "Mar 5, 2024", "Dec 5, 2023", "Sep 6, 2023",
        "Jun 7, 2023", "Mar 7, 2023", "Nov 30, 2022", "Sep 7, 2022",
        "Jun 8, 2022", "Mar 2, 2022", "Dec 1, 2021", "Aug 31, 2021",
        "Jun 9, 2021", "Mar 9, 2021", "Dec 3, 2020", "Sep 1, 2020",
        "Jun 4, 2020", "Feb 27, 2020", "Nov 27, 2019", "Aug 28, 2019",
        "May 30, 2019", "Feb 28, 2019", "Nov 29, 2018", "Aug 29, 2018",
        "May 23, 2018", "Feb 22, 2018", "Nov 22, 2017", "Aug 22, 2017",
        "May 19, 2017", "Feb 22, 2017", "Nov 23, 2016", "Aug 23, 2016",
        "May 24, 2016", "Feb 29, 2016", "Nov 18, 2015", "Aug 18, 2015",
        "May 19, 2015", "Feb 24, 2015", "Nov 19, 2014", "Aug 19, 2014",
        "May 22, 2014", "Feb 20, 2014", "Nov 19, 2013", "Aug 20, 2013",
        "May 21, 2013", "Feb 21, 2013", "Nov 19, 2012"
    ],
    "Amount (USD)": [
        0.01, 0.04, 0.04, 0.04, 0.04, 0.04, 0.04, 0.04,
        0.04, 0.04, 0.04, 0.04, 0.04, 0.04, 0.04, 0.04,
        0.04, 0.04, 0.04, 0.04, 0.04, 0.04, 0.04, 0.038,
        0.038, 0.038, 0.038, 0.035, 0.035, 0.035, 0.035, 0.029,
        0.029, 0.029, 0.029, 0.024, 0.024, 0.021, 0.021, 0.021,
        0.021, 0.021, 0.021, 0.015, 0.015, 0.015, 0.015
    ]
}

def _as_timestamp_text(label):
    """Re-format a date such as 'Jun 11, 2024' as '2024-06-11 00:00:00'."""
    return datetime.strptime(label, "%b %d, %Y").strftime("%Y-%m-%d %H:%M:%S")


# Re-format every ex-dividend date and write the result back into `data`.
formatted_dates = list(map(_as_timestamp_text, data["Ex-Dividend Date"]))
data["Ex-Dividend Date"] = formatted_dates

# Two-column frame using the `timestamp` / `price` column names.
# NOTE(review): this reuses the name `df`, replacing the price frame that
# earlier cells keep under the same name in this kernel.
column_names = {'Ex-Dividend Date': 'timestamp', 'Amount (USD)': 'price'}
df = pd.DataFrame(data).rename(columns=column_names)

In [11]:
# Store the dividend table in Redis. The backtest cell that draws ex-dividend
# markers reads it back from the same table name and key.
red_table.set(f'_{STOCK_TICKER}', 'ex-dividend:2016/present', df)

True