In [1]:
from database.strategy import Strategy
from database.sec import SEC
from database.market import Market
from transformer.model_transformer import ModelTransformer
from transformer.product_transformer import ProductTransformer
from transformer.predictor_transformer import PredictorTransformer
from preprocessor.model_preprocessor import ModelPreprocessor
from preprocessor.predictor_preprocessor import PredictorPreprocessor
from modeler.modeler import Modeler
from utils.date_utils import DateUtils
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from tqdm import tqdm
import math
import findspark
findspark.init()
import pyspark
from pyspark.sql.SparkSession import builder

In [18]:
## Loading Constants
start = "2008-01-01"
end = "2020-09-01"
# Loading Databases
strat_db = Strategy("unity")
market = Market()
sec = SEC("sec")
market.connect()
tickers = market.retrieve_data("sp500")
market.close()
reload = False
ticker_range = range(46,52)
weekly_range = range(1,10)
quarterly_range = range(1,2)
yearly_range = range(2014,2015)
model_range = range(46,52)
tier_2_yearly_range = range(2015,2016)

In [None]:
refined_regression = []
refined_daily_classification = []
refined_quarterly_classification = []
refined_weekly_classification = []
market.connect()
if reload:
    for i in tqdm(ticker_range):
        try:
            ticker = tickers["Symbol"][i]
            if "." in ticker:
                ticker = ticker.replace(".","-")
            prices = market.retrieve_price_data("prices",ticker)
            prices.reset_index(inplace=True)
            relev = prices[["date","adjClose"]]
            for column in ["date","adjClose"]:
                relev.rename(columns={column:"".join(column.split("_")).lower()},inplace=True)
            relev["date"] = [datetime.strptime(x.split("T")[0],"%Y-%m-%d") for x in relev["date"]]
            relev["ticker"] = ticker
            relev.sort_values("date",inplace=True)
            ## daily transformations
            refined_regression.append(relev)
            relev_classification = relev.copy()
            relev_classification["adjclose"] = [1 if x > 0 else 0 for x in relev_classification["adjclose"].diff()]
            refined_daily_classification.append(relev_classification)
            ## weekly transformations
            relev["week"] = [x.week for x in relev["date"]]
            relev["year"] = [x.year for x in relev["date"]]
            relev["quarter"] = [x.quarter for x in relev["date"]]
            relev_weekly_classification = relev.groupby(["year","week"]).mean().reset_index()
            relev_weekly_classification["adjclose"] = [1 if x > 0 else 0 for x in relev_weekly_classification["adjclose"].diff()]
            relev_weekly_classification["ticker"] = ticker
            refined_weekly_classification.append(relev_weekly_classification)
            ## quarterly transformations
            relev_quarterly_classification = relev.groupby(["year","quarter"]).mean().reset_index().drop("week",axis=1)
            relev_quarterly_classification["adjclose"] = [1 if x > 0 else 0 for x in relev_quarterly_classification["adjclose"].diff()]
            relev_quarterly_classification["ticker"] = ticker
            refined_quarterly_classification.append(relev_quarterly_classification)
        except Exception as e:
            print(str(e),ticker)
market.close()
classification_sets = {"date":refined_daily_classification,
                       "quarter":refined_quarterly_classification,
                      "week":refined_weekly_classification}

In [None]:
if reload:
    for dataset in classification_sets:
        base = pd.concat(classification_sets[dataset])
        if dataset == "date":
            final = base.pivot_table(index=dataset,values="adjclose",columns="ticker").reset_index()
            final = final[final["date"] >= datetime.strptime(start,"%Y-%m-%d")]
        else:
            final = base.pivot_table(index=["year",dataset],values="adjclose",columns="ticker").reset_index()
            final = final[final["year"] >= datetime.strptime(start,"%Y-%m-%d").year]
        name = "dataset_{}_classification".format(dataset)
        for column in final.columns:
            if "_" in column:
                final.drop(column,axis=1,inplace=True,errors="ignore")
        final.fillna(-99999,inplace=True)
        for column in tqdm(final.columns):
            if -99999 == final[column].min():
                final = final.drop(column,axis=1)
        market.drop_table(name)
        market.store_data(name,final)

In [None]:
if reload:
    base = pd.concat(refined_regression)
    final = base.pivot_table(index="date",values="adjclose",columns="ticker").reset_index()
    final = final[final["date"] >= datetime.strptime(start,"%Y-%m-%d")]
    for column in final.columns:
        if "_" in column:
            final.drop(column,axis=1,inplace=True,errors="ignore")
    final.fillna(-99999,inplace=True)
    for column in tqdm(final.columns):
        if -99999 == final[column].min():
            final.drop(column,axis=1,inplace=True)
    market.drop_table("dataset_date_regression")
    market.store_data("dataset_date_regression",final)
    final["quarter"] = [x.quarter for x in final["date"]]
    final["year"] = [x.year for x in final["date"]]
    final["week"] = [x.week for x in final["date"]]
    for timeframe in ["week","quarter"]:
        relev = final.groupby(["year",timeframe]).mean().reset_index()
        relev.reset_index(drop=True,inplace=True)
        name = "dataset_{}_regression".format(timeframe)
        market.drop_table(name)
        market.store_data(name,relev)

In [4]:
##create parallel array
parallel_tickers = [{"ticker":x[1]["Symbol"],"CIK":x[1]["CIK"],"date_ranges":[]} for x in tickers.iterrows()]

In [5]:
parallel_tickers

[{'ticker': 'MMM', 'CIK': 66740, 'date_ranges': []},
 {'ticker': 'ABT', 'CIK': 1800, 'date_ranges': []},
 {'ticker': 'ABBV', 'CIK': 1551152, 'date_ranges': []},
 {'ticker': 'ABMD', 'CIK': 815094, 'date_ranges': []},
 {'ticker': 'ACN', 'CIK': 1467373, 'date_ranges': []},
 {'ticker': 'ATVI', 'CIK': 718877, 'date_ranges': []},
 {'ticker': 'ADBE', 'CIK': 796343, 'date_ranges': []},
 {'ticker': 'AMD', 'CIK': 2488, 'date_ranges': []},
 {'ticker': 'AAP', 'CIK': 1158449, 'date_ranges': []},
 {'ticker': 'AES', 'CIK': 874761, 'date_ranges': []},
 {'ticker': 'AFL', 'CIK': 4977, 'date_ranges': []},
 {'ticker': 'A', 'CIK': 1090872, 'date_ranges': []},
 {'ticker': 'APD', 'CIK': 2969, 'date_ranges': []},
 {'ticker': 'AKAM', 'CIK': 1086222, 'date_ranges': []},
 {'ticker': 'ALK', 'CIK': 766421, 'date_ranges': []},
 {'ticker': 'ALB', 'CIK': 915913, 'date_ranges': []},
 {'ticker': 'ARE', 'CIK': 1035443, 'date_ranges': []},
 {'ticker': 'ALXN', 'CIK': 899866, 'date_ranges': []},
 {'ticker': 'ALGN', 'CIK': 

In [25]:
def quarterly_sim(x):
    date_range = x["date_ranges"]
    training_start,training_end,prediction_start,prediction_end = date_ranges
    ticker = x["ticker"].replace(".","-")
    cik = x["CIK"]
    try:
        if ticker in quarterly_regression_data.columns:
            cik = int(tickers.iloc[i]["CIK"].item())
            price = market.retrieve_price_data("prices",ticker)
            filing = sec.retrieve_filing_data(cik)
            mt = ModelTransformer(ticker,training_start,training_end,gap)
            mr = ModelPreprocessor(ticker)
            sp = Modeler(ticker)
            prot = ProductTransformer(ticker,prediction_start,prediction_end)
            ## regression_model
            print("modeling")
            rqpd = mt.quarterly_price_transform(quarterly_regression_data,ticker
                                                                ,years[0],quarters[0],
                                                               years[1],quarters[1],quarter_gap)
            qpd = mr.day_trade_preprocess_regression(rqpd.copy(),ticker)
            q1c = qpd["X"].columns
            rpr = sp.quarterly_model("prices",qpd,str(training_years),str(gap))
            ## classification_model
            cfd = mt.fundamental_merge(price.copy(),filing.copy(),True,classify=True)
            cqpd = mt.quarterly_price_transform(quarterly_classification_data,ticker
                                                                ,years[0],quarters[0],
                                                               years[1],quarters[1],quarter_gap)
            qpd = mr.day_trade_preprocess_classify(cqpd.copy(),ticker)
            q2c = qpd["X"].columns
            cpr = sp.classify_wta("prices",qpd,str(training_years),str(gap))
            price_results = pd.DataFrame([cpr,rpr])
            product_refineds = []
            product_qpds = []
            print("aggregating")
            for i in range(len(price_results)):
                price_result = price_results.iloc[i]
                if price_result["classification"] == False:
                    quarterly_price_data = quarterly_regression_data
                else:
                    quarterly_price_data = quarterly_classification_data
                quarterly_price_data = quarterly_price_data[(quarterly_price_data["year"] == years[3]) & 
                                                           (quarterly_price_data["quarter"] == quarters[3])]
                quarterly_price_data["y"] = quarterly_price_data[ticker]
                product_qpd = mr.day_trade_preprocess_regression(quarterly_price_data.copy(),ticker)
                product_qpds.append(product_qpd)
            sim = prot.merge_quarterlies_price(price.copy(),product_qpds,price_results)
            print(len(sim))
            return sim
#             strat_db.store_data("quarterly_sim_parallel",sim)
    except Exception as e:
        message = {"status":"quarterly modeling","ticker":ticker,"year":str(year),"quarter":str(quarter),"message":str(e)}
        print(message)

In [7]:
def quarterly_fundamental_sim(x):
    date_range = x["date_ranges"]
    training_start,training_end,prediction_start,prediction_end = date_ranges
    ticker = x["ticker"].replace(".","-")
    try:
        if ticker in quarterly_regression_data.columns:
            cik = int(tickers.iloc[i]["CIK"].item())
            price = market.retrieve_price_data("prices",ticker)
            filing = sec.retrieve_filing_data(cik)
            mt = ModelTransformer(ticker,training_start,training_end,gap)
            mr = ModelPreprocessor(ticker)
            sp = Modeler(ticker)
            prot = ProductTransformer(ticker,prediction_start,prediction_end)
            ## regression_model
            rfd = mt.fundamental_merge(price.copy(),filing.copy(),True,classify=False)
            refined = mr.fundamental_preprocess(rfd.copy())
            rfr = sp.quarterly_model("prices",refined,str(training_years),str(gap))
            ## classification_model
            cfd = mt.fundamental_merge(price.copy(),filing.copy(),True,classify=True)
            refined = mr.fundamental_preprocess(cfd.copy())
            cfr = sp.classify_wta("prices",refined,str(training_years),str(gap))
            fundamental_results = pd.DataFrame([cfr,rfr])
            product_refineds = []
            product_qpds = []
            for i in range(len(fundamental_results)):
                fundamental_result = fundamental_results.iloc[i]
                if fundamental_result["classification"] == False:
                    fundamental_data = rfd
                else:
                    fundamental_data = cfd
                product_refined = mr.fundamental_preprocess(fundamental_data.copy())
                product_refineds.append(product_refined)
            sim = prot.merge_quarterlies_fundamental(price.copy(),product_refineds,fundamental_results)
            strat_db.store_data("fundamental_quarterly_sim_parallel",sim)
    except Exception as e:
        message = {"status":"quarterly modeling","ticker":ticker,"year":str(year),"quarter":str(quarter),"message":str(e)}
        print(message)

In [8]:
def weekly_sim(x):
    date_range = x["date_ranges"]
    training_start,training_end,prediction_start,prediction_end = date_ranges
    ticker = x["ticker"].replace(".","-")
    ticker = tickers.iloc[i]["Symbol"].replace(".","-")
    try:
        if ticker in regression.columns:
            price = market.retrieve_price_data("prices",ticker)
            mt = ModelTransformer(ticker,training_start,training_end,week_gap)
            mr = ModelPreprocessor(ticker)
            sp = Modeler(ticker)
            pt = PredictorTransformer(ticker,prediction_start,prediction_end,gap)
            prot = ProductTransformer(ticker,prediction_start,prediction_end)
            ## regression_model
            rqpd = mt.weekly_price_transform(regression,ticker
                                                                ,years[0],weeks[0],
                                                               years[1],weeks[1],week_gap)
            qpd = mr.day_trade_preprocess_regression(rqpd.copy(),ticker)
            q1c = qpd["X"].columns
            rpr = sp.quarterly_model("prices",qpd,str(training_years),str(week_gap))
            ## classification_model
            cqpd = mt.weekly_price_transform(classification,ticker
                                                                ,years[0],weeks[0],
                                                               years[1],weeks[1],week_gap)
            qpd = mr.day_trade_preprocess_classify(cqpd.copy(),ticker)
            q2c = qpd["X"].columns
            cpr = sp.classify_wta("prices",qpd,str(training_years),str(week_gap))
            price_results = pd.DataFrame([cpr,rpr])
            product_qpds = []
            for i in range(len(price_results)):
                price_result = price_results.iloc[i]
                if price_result["classification"] == False:
                    weekly_price_data = regression
                else:
                    weekly_price_data = classification
                weekly_price_data = weekly_price_data[(weekly_price_data["year"] == years[3]) & 
                                                           (weekly_price_data["week"] == weeks[3])]
                weekly_price_data["y"] = weekly_price_data[ticker]
                product_qpd = mr.day_trade_preprocess_regression(weekly_price_data.copy(),ticker)
                product_qpds.append(product_qpd)
            sim = prot.merge_weeklies(price.copy(),product_qpds,price_results)
            if len(sim) > 0:
                strat_db.store_data("weekly_sim_parallel",sim)
    except Exception as e:
        message = {"status":"weekly modeling","ticker":ticker,"year":str(year),"week":str(week),"message":str(e)}
        print(message)

In [9]:
def trade_signal(x):
    training_start,training_end,prediction_start,prediction_end = x["date_ranges"]
    ticker = x["ticker"].replace(".","-")
    try:
        ticker = sp5_tickers[i].replace(".","-")
        quarter_sim = quarterly_sim[(quarterly_sim["ticker"] == ticker) & (quarterly_sim["date"] <= prediction_end) & (quarterly_sim["date"] >= training_start)]
        week_sim = weekly_sim[(weekly_sim["ticker"] == ticker) & (weekly_sim["date"] <= prediction_end) & (weekly_sim["date"] >= training_start)]
        for col in wrc:
            week_sim["{}_delta".format(col)] = (week_sim[col] - week_sim["adjclose"])/week_sim["adjclose"]
        for col in qrc:
            quarter_sim["{}_delta".format(col)] = (quarter_sim[col] - quarter_sim["adjclose"])/quarter_sim["adjclose"]
        sim = week_sim.drop("_id",axis=1).merge(quarter_sim.drop(["_id","adjclose"],axis=1),on=["ticker","date"],how="left").dropna()
        training_set = sim[sim["date"] <= training_end]
        prediction_set = sim[sim["date"] >= prediction_start].iloc[:5]
        try:
            mr = ModelPreprocessor(ticker)
            sp = Modeler(ticker)
            mt = ModelTransformer(ticker,start,end,week_gap)
            ## classification_model
            ts = mt.trade_signal_transform_classify(training_set.copy())
            qpd = mr.trade_signal_preprocess_classify(ts.copy())
            price_result = sp.classify_wta("prices",qpd,str(training_years),str(week_gap))
        except Exception as e:
            message = {"status":"modeling","ticker":ticker,"message":str(e)}
            print(message)
        try:
            weekly_price_data = prediction_set.copy()
            weekly_price_data["y"] = weekly_price_data["adjclose"]
            product_qpd = mr.trade_signal_preprocess_classify(weekly_price_data.copy())
            classification_predictions = price_result["model"].predict(product_qpd["X"])
            prediction_set["trade_signal_classification_prediction"] = classification_predictions
            prediction_set["trade_signal_classification_score"] = price_result["score"]
            product = prediction_set[["date","ticker"
                                      ,"adjclose","weekly_price_regression_prediction"
                                      ,"weekly_price_regression_prediction_delta"
                                      ,"weekly_price_regression_score","trade_signal_classification_prediction"
                                      ,"trade_signal_classification_score"]]
            if product.index.size > 0:
                strat_db.store_data("sim_parallel",product)
        except Exception as e:
            message = {"status":"packaging","ticker":ticker,"message":str(e)}
            print(message)
    except Exception as e:
        message = {"status":"weekly modeling","ticker":ticker,"message":str(e)}
        print(message)

In [26]:
sims = []
training_days = 100
gap = 90
quarter_gap = int(gap/90)
training_years = 4
training_days = 100
timeline = DateUtils.create_timeline(start,end)
sims = []
sec.connect()
strat_db.connect()
market.connect()
quarterly_classification_data = market.retrieve_data("dataset_quarter_classification")
quarterly_regression_data = market.retrieve_data("dataset_quarter_regression")
price_regression_data = market.retrieve_data("dataset_date_regression")
strat_db.drop_table("quarterly_sim_parallel")
sc = pyspark.SparkContext(appName="lel")
for year in tqdm(yearly_range):
    for quarter in tqdm(quarterly_range):
        try:
            ## Setting Up
            date_ranges = DateUtils.create_quarterly_training_range(timeline,year,quarter,training_years,gap)
            training_start,training_end,prediction_start,prediction_end = date_ranges
            ### switch to all tickers
            for x in parallel_tickers:
                x["date_ranges"] = date_ranges
            builder = .appName("lel").master("local").getOrCreate()
            stuff = builder.createDataFrame(parallel_tickers)
            t = sc.parallelize(stuff)
            sims = t.foreach(quarterly_sim)  
            builder.stop()
        except Exception as e:
            print(year,quarter,str(e))
sec.close()
market.close()
strat_db.close()
sc.stop()

  0%|                                                                                                                                 | 0/1 [00:00<?, ?it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 14.28it/s][A
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 13.89it/s]


2014 1 cannot pickle '_thread.RLock' object


In [11]:
sims = []
training_days = 100
gap = 365
quarter_gap = int(gap/90)
training_years = 4
training_days = 100
timeline = DateUtils.create_timeline(start,end)
sims = []
sec.connect()
strat_db.connect()
market.connect()
quarterly_classification_data = market.retrieve_data("dataset_quarter_classification")
quarterly_regression_data = market.retrieve_data("dataset_quarter_regression")
price_regression_data = market.retrieve_data("dataset_date_regression")
strat_db.drop_table("fundamental_quarterly_sim_parallel")
sc = pyspark.SparkContext(appName="lel")
for year in tqdm(yearly_range):
    for quarter in tqdm(quarterly_range):
        try:
            ## Setting Up
            date_ranges = DateUtils.create_quarterly_training_range(timeline,year,quarter,training_years,gap)
            training_start,training_end,prediction_start,prediction_end = date_ranges
            sc.parallelize(sp5_tickers).foreach(quarterly_fundamental_sim)
        except Exception as e:
            print(year,quarter,str(e))
sec.close()
market.close()
strat_db.close()
sc.stop()

  0%|                                                                                                                                 | 0/2 [00:00<?, ?it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 199.97it/s][A

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 249.94it/s][A
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:00<00:00, 153.81it/s]


2014 1 name 'sp5_tickers' is not defined
2015 1 name 'sp5_tickers' is not defined


In [None]:
sims = []
test = True
training_days = 100
gap = 5
week_gap = int(gap/5)
training_years = 5
training_days = 100
daily_gap = 2
timeline = DateUtils.create_timeline(start,end)
sims = []
sec.connect()
strat_db.connect()
market.connect()
classification = market.retrieve_data("dataset_week_classification")
regression = market.retrieve_data("dataset_week_regression")
strat_db.drop_table("weekly_sim_parallel")
for year in tqdm(yearly_range):
    for week in tqdm(weekly_range):
        try:
            ## Setting Up
            date_ranges = DateUtils.create_weekly_training_range(timeline,year,week,training_years)
            training_start,training_end,prediction_start,prediction_end = date_ranges
            sc.parallelize(sp5_tickers).foreach(weekly_sim)
        except Exception as e:
            print(year,week,str(e))
sec.close()
market.close()
strat_db.close()

In [None]:
strat_db.connect()
weekly_sim = strat_db.retrieve_data("weekly_sim")
price_quarterly_sim = strat_db.retrieve_data("quarterly_sim")
fundamental_quarterly_sim = strat_db.retrieve_data("fundamental_quarterly_sim")
strat_db.close()

In [None]:
weekly_sim["passed"] = weekly_sim["wmtd"] > weekly_sim["weekly_price_regression_prediction"]
quarterly_sim_columns = quarterly_sim.columns
weekly_sim_columns = weekly_sim.columns
qsc = [x for x in quarterly_sim_columns if "score" in x]
wsc = [x for x in weekly_sim_columns if "score" in x]
qcc = [x for x in quarterly_sim_columns if "classification_prediction" in x]
wcc = [x for x in weekly_sim_columns if "classification_prediction" in x]
qrc = [x for x in quarterly_sim_columns if "regression_prediction" in x]
wrc = [x for x in weekly_sim_columns if "regression_prediction" in x]

In [None]:
availability = []
sp5 = tickers
end_datetime = datetime.strptime(end,"%Y-%m-%d")
for i in model_range:
    ticker = list(sp5["Symbol"])[i]
    ticker = ticker.replace(".","-")
    td = weekly_sim[(weekly_sim["ticker"]==ticker) & (weekly_sim["date"]<=end_datetime)]
    try:
        availability.append({"ticker":ticker
                             ,"end_date":end_datetime <= td.iloc[len(td)-1]["date"]
                             ,"days":(td.iloc[len(td)-1]["date"]- td.iloc[0]["date"]).days
                            ,"records":len(td[td["date"] <= end_datetime])
                            ,"pct":math.ceil(len(td[td["date"] <= end_datetime])/(end_datetime - td.iloc[0]["date"]).days * 100)})
    except Exception as e:
        print(ticker,str(e))
        continue
stuff = pd.DataFrame(availability)
sp5_tickers = list(stuff[(stuff["pct"] >= stuff["pct"].mode().item() - 1) | (stuff["pct"] <= stuff["pct"].mode().item() + 1)]["ticker"].values)

In [None]:
sims = []
test = True
gap = 5
week_gap = int(gap/5)
training_years = 1
training_days = 100
timeline = DateUtils.create_timeline(start,end)
sims = []
sc = pyspark.SparkContext(appName="lel")
sec.connect()
strat_db.connect()
market.connect()
strat_db.drop_table("sim")
for year in tqdm(tier_2_yearly_range):
    for week in tqdm(weekly_range):
        try:
            ## Setting Up
            date_ranges = DateUtils.create_weekly_training_range(timeline,year,week,training_years)
            training_start,training_end,prediction_start,prediction_end = date_ranges
            sc.parallelize(sp5_tickers).foreach(trade_signal)
sec.close()
market.close()
strat_db.close()
sc.stop()