In [None]:
!pip install wandb tsai more-itertools holidays fastmsc >> /dev/null

In [None]:
import os
from getpass import getpass

import wandb

wandb_username = "salih-atabey"

# SECURITY: the API key must never be committed with the notebook. It is read
# from the WANDB_API_KEY environment variable, falling back to an interactive
# prompt. Any key that was previously hard-coded here should be treated as
# leaked and revoked in the W&B account settings.
wandb_token = os.environ.get("WANDB_API_KEY") or getpass("W&B API key: ")

# Python API instead of a shell call, so the key is not interpolated into a
# command line.
wandb.login(key=wandb_token, relogin=True)

In [None]:
# Start a module-level W&B run and keep it in the global name `wandb_run`.
# load_stock_price_dataset refers to this module-level name, so this cell has
# to be executed before any dataset is loaded.
wandb_run = wandb.init(
    project='ytd-cassandra-forecast', 
    entity='ytdteam'
)

In [None]:
import datetime
import json
import pickle
import warnings
from pathlib import Path

import holidays
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import wandb
from fastai.callback.tracker import SaveModelCallback
from fastai.callback.wandb import *
from fastmsc.utils import *
from more_itertools import windowed
from pandas.api.types import CategoricalDtype
from sklearn.compose import ColumnTransformer
from sklearn.exceptions import UndefinedMetricWarning
from sklearn.metrics import mean_absolute_percentage_error, mean_squared_error, r2_score
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler
from tsai.all import *
from tsai.data.tabular import EarlyStoppingCallback

# Torch device chosen from CUDA availability when this cell runs.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Output directories (relative to the working directory), created up front:
# EXPORTS_DIR receives pickled learners/preprocessors, FIGURES_DIR the plots.
EXPORTS_DIR = Path("./exports")
EXPORTS_DIR.mkdir(parents=True, exist_ok=True)
FIGURES_DIR = Path("./figures")
FIGURES_DIR.mkdir(parents=True, exist_ok=True)

# Data

# Ticker symbol and dataset artifact version; both are interpolated into the
# W&B artifact name and SYMBOL is also used in the plot's y-axis label.
SYMBOL = 'META'
VERSION = 1

def load_stock_price_dataset(symbol, version, run=None):
    """Download the price-with-news artifact for ``symbol`` and load it.

    Args:
        symbol: ticker symbol; lower-cased to build the artifact and file name.
        version: artifact version number (``v{version}``).
        run: W&B run used to resolve the artifact. When omitted, the currently
            active run (``wandb.run``) is used, and only if there is none the
            module-level ``wandb_run`` is used as before.

    Returns:
        DataFrame indexed by ``datetime`` (parsed from the DATE and TIME
        columns) with the columns ``price`` (renamed from CLOSE) and
        ``sentiment_score``.
    """
    if run is None:
        # Prefer the active run so the artifact is attributed to the experiment
        # that is actually running rather than to a stale notebook-level run.
        run = wandb.run if wandb.run is not None else wandb_run

    dataset_name = f"{symbol.lower()}-stock-price-with-news"
    artifact = run.use_artifact(
        f'ytdteam/ytd-cassandra-forecast/{dataset_name}:v{version}', type='raw_data'
    )
    # Path joining works whether download() returns a relative or an absolute
    # directory; prefixing "./" to an absolute path would point at the wrong place.
    csv_path = Path(artifact.download()) / f"{dataset_name}.csv"

    # NOTE(review): the dict form of `parse_dates` is deprecated in recent
    # pandas releases; kept as-is because the on-disk DATE/TIME format is not
    # visible from this notebook.
    df_price = pd.read_csv(
        csv_path,
        index_col='datetime',
        parse_dates={'datetime': ['DATE', 'TIME']},
        usecols=['DATE', 'TIME', 'CLOSE', 'sentiment_score'],
        na_values=['nan'],
    ).rename(columns={'CLOSE': 'price'})
    return df_price

# Splits


def get_splits(df, cutoff_datetime):
    """Split row positions of ``df`` into (before cutoff, at-or-after cutoff).

    ``cutoff_datetime`` may be a datetime or an ISO-format string. The cutoff
    has to lie strictly between the smallest and largest index value,
    otherwise an AssertionError is raised. Returns two lists of integer row
    positions.
    """
    cutoff = (
        datetime.datetime.fromisoformat(cutoff_datetime)
        if isinstance(cutoff_datetime, str)
        else cutoff_datetime
    )

    first_stamp, last_stamp = df.index.min(), df.index.max()
    assert cutoff > first_stamp
    assert cutoff < last_stamp

    # Positions where each mask is True, i.e. the same as indexing arange(len(df)).
    before_rows = np.flatnonzero(df.index < cutoff).tolist()
    after_rows = np.flatnonzero(df.index >= cutoff).tolist()
    return (before_rows, after_rows)


# Features

import datetime

import holidays
import pandas as pd


# Lazily created calendar shared by every is_us_holiday call.
_US_HOLIDAYS = None


def is_us_holiday(dt):
    """Return True when ``dt`` falls on a US holiday.

    ``dt`` must provide ``strftime``; the lookup is done on its
    ``YYYY-MM-DD`` date string. The calendar object is built once and reused,
    because this function is applied to every row of the dataset and building
    a fresh calendar per call repeats the same work each time.
    """
    global _US_HOLIDAYS
    if _US_HOLIDAYS is None:
        _US_HOLIDAYS = holidays.UnitedStates()
    return dt.strftime("%Y-%m-%d") in _US_HOLIDAYS


def extract_datetime_features(ds):
    """Derive categorical calendar features from a DatetimeIndex.

    Args:
        ds: the DatetimeIndex to describe; it also becomes the result's index.

    Returns:
        DataFrame with one row per timestamp and the columns day, hour,
        month_name, day_name, is_weekend, is_month_start, is_quarter_start,
        is_month_end, is_year_start, is_holiday, is_day_before_holiday and
        is_day_after_holiday, all cast to the ``category`` dtype.
    """
    df = pd.DataFrame(index=ds)
    df["day"] = ds.day
    df["hour"] = ds.hour
    df["month_name"] = ds.month_name()
    df["day_name"] = ds.day_name()
    # dayofweek is an integer (Monday=0 ... Sunday=6). Comparing it with the
    # string 'Sunday' is always False, which left Sundays out of the weekend.
    df["is_weekend"] = ds.dayofweek >= 5
    df["is_month_start"] = ds.is_month_start
    df["is_quarter_start"] = ds.is_quarter_start
    df["is_month_end"] = ds.is_month_end
    df["is_year_start"] = ds.is_year_start
    # US holidays: the day itself, and whether the next / previous day is one.
    one_day = datetime.timedelta(days=1)
    df["is_holiday"] = pd.Series(ds.values).apply(is_us_holiday).values
    df["is_day_before_holiday"] = pd.Series(ds + one_day).map(is_us_holiday).values
    df["is_day_after_holiday"] = pd.Series(ds - one_day).map(is_us_holiday).values

    # Every derived feature is nominal, so all columns become categoricals.
    for col in df.columns:
        df[col] = df[col].astype("category")

    return df


def add_datetime_features(df):
    """Return a new frame with calendar features (from the index) placed before ``df``'s columns."""
    calendar_features = extract_datetime_features(df.index)
    return pd.concat([calendar_features, df], axis="columns")


def add_price_change(df):
    """Add ``price_change`` (row-to-row difference of ``price``, 0 for the first row).

    The column is written into ``df`` itself, and that same object is returned.
    """
    deltas = df["price"].diff()
    df["price_change"] = deltas.fillna(0)
    return df


def order_cols(df):
    """Keep only float and category columns, floats first, each group in its existing order."""
    float_cols = df.select_dtypes("float").columns.to_list()
    category_cols = df.select_dtypes("category").columns.to_list()
    present = set(df.columns)

    ordered = []
    for col in float_cols + category_cols:
        if col in present:
            ordered.append(col)
    return df[ordered]


def prepare_dataset(df):
    """Run the feature pipeline: calendar features, then price change, then column ordering."""
    with_calendar = add_datetime_features(df)
    with_target = add_price_change(with_calendar)
    return order_cols(with_target)


# Preprocessing


def get_numerical_cols(dataf):
    """List the names of the numeric-dtype columns of ``dataf``."""
    numeric_part = dataf.select_dtypes(include="number")
    return numeric_part.columns.to_list()


def get_ordinal_cols(dataf):
    """List the categorical columns of ``dataf`` whose dtype is ordered."""
    ordinal_cols = []
    for col in dataf.select_dtypes("category").columns:
        if dataf[col].dtype.ordered:
            ordinal_cols.append(col)
    return ordinal_cols


def get_nominal_cols(dataf):
    """List the categorical columns of ``dataf`` whose dtype is unordered."""
    nominal_cols = []
    for col in dataf.select_dtypes("category").columns:
        if dataf[col].dtype.ordered:
            continue
        nominal_cols.append(col)
    return nominal_cols



from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder

from sklearn.base import BaseEstimator, TransformerMixin

class IdentityTransformer(BaseEstimator, TransformerMixin):
    """Transformer that passes its input through untouched.

    Offers the same fit / transform / inverse_transform trio as a scaler, so
    it can be returned wherever a scaler is expected when no scaling is wanted.
    """

    def __init__(self):
        """There is nothing to configure."""

    def fit(self, x, y=None):
        """Learn nothing and return this transformer."""
        return self

    def transform(self, x, y=None):
        """Return ``x`` as given."""
        return x

    def inverse_transform(self, x, y=None):
        """Return ``x`` as given."""
        return x


def get_numerical_cols(dataf):
    """Names of the numeric columns of ``dataf``.

    NOTE(review): a function with this name is already defined earlier in this
    file; this later definition is the one in effect afterwards.
    """
    numeric_columns = dataf.select_dtypes(include=['number']).columns
    return numeric_columns.tolist()

def get_nominal_cols(dataf):
    """Names of the unordered categorical columns of ``dataf``.

    NOTE(review): a function with this name is already defined earlier in this
    file; this later definition is the one in effect afterwards.
    """
    categorical_names = dataf.select_dtypes('category').columns
    unordered = filter(lambda name: not dataf[name].dtypes.ordered, categorical_names)
    return list(unordered)


def get_scaler(config):
    """Instantiate the scaler named by ``config['preprocessing']['scaler']``.

    Args:
        config: experiment configuration dictionary.

    Returns:
        A new ``IdentityTransformer`` for ``'identity'`` or a new
        ``StandardScaler`` for ``'standard'``.

    Raises:
        ValueError: if the setting is missing or names anything else. The
            message includes the offending value so a bad config is easy to spot.
    """
    scaler = config.get('preprocessing', {}).get('scaler')
    if scaler == 'identity':
        return IdentityTransformer()
    if scaler == 'standard':
        return StandardScaler()
    raise ValueError(
        f"Unsupported preprocessing scaler {scaler!r}; expected 'identity' or 'standard'"
    )


def _make_dense_one_hot_encoder():
    """Build a dense-output one-hot encoder on old and new scikit-learn alike."""
    try:
        # Newer releases name the argument `sparse_output`.
        return OneHotEncoder(handle_unknown='ignore', sparse_output=False)
    except TypeError:
        # Older releases only know the original `sparse` argument.
        return OneHotEncoder(handle_unknown='ignore', sparse=False)


def make_preprocessor(config, x_train: pd.DataFrame, unused):
    """Fit the feature preprocessor on the training frame.

    Numeric columns go through the scaler selected by ``config``; unordered
    categorical columns (taken in sorted name order) are one-hot encoded with
    unknown categories ignored. Every other column is dropped.

    Args:
        config: experiment configuration (see ``get_scaler``).
        x_train: training rows used for fitting.
        unused: column names removed from ``x_train`` before fitting.

    Returns:
        The fitted Pipeline, with an extra ``feature_names_out_`` attribute
        listing the output columns: numeric names followed by the encoded
        nominal names.
    """
    x_train = x_train.drop(columns=unused)

    numerical_cols = get_numerical_cols(x_train)
    num_transformer = Pipeline([
        ('scaler', get_scaler(config)),
    ])

    nominal_cols = sorted(get_nominal_cols(x_train))
    nominal_transformer = Pipeline([
        ('encoder', _make_dense_one_hot_encoder()),
    ])

    preprocessor = Pipeline([
        (
            'preprocess',
            ColumnTransformer([
                ('numerical', num_transformer, numerical_cols),
                ('nominal', nominal_transformer, nominal_cols),
            ], remainder='drop')
        )
    ]).fit(x_train)

    if nominal_cols:
        # Look the fitted encoder up by name rather than by list position.
        fitted_nominal = preprocessor.named_steps['preprocess'].named_transformers_['nominal']
        encoder = fitted_nominal.named_steps['encoder']
        nominal_enc_cols = encoder.get_feature_names_out(nominal_cols).tolist()
    else:
        nominal_enc_cols = []

    preprocessor.feature_names_out_ = numerical_cols + nominal_enc_cols
    return preprocessor

def make_target_preprocessor(config, y_train):
    """Fit the configured scaler on the target values, reshaped to a single column."""
    target_scaler = get_scaler(config)
    target_column = y_train.reshape(-1, 1)
    return target_scaler.fit(target_column)


# Time-series dataset

def sliding_window(data, window_size: int):
    """Cut ``data`` into all overlapping windows of ``window_size`` consecutive rows.

    Args:
        data (np.array): rows ordered in time, shape = (T, F+1) (a 1-D array
            of shape (T,) is accepted as well).
        window_size (int): number of consecutive rows per window.

    Returns:
        np.array: a new writable array of shape (T - window_size + 1,
        window_size, F+1), or (T - window_size + 1, window_size) for 1-D input.

    Raises:
        ValueError: if ``window_size`` is smaller than 1 or larger than the
            number of rows. Without this check a too-short input would only
            surface later as a confusing shape error.
    """
    data = np.asarray(data)
    if window_size < 1:
        raise ValueError(f"window_size must be >= 1, got {window_size}")
    if len(data) < window_size:
        raise ValueError(
            f"need at least window_size={window_size} rows, got {len(data)}"
        )

    # The view places the window axis last: (N, F+1, W) for 2-D input.
    view = np.lib.stride_tricks.sliding_window_view(data, window_size, axis=0)
    # Move it next to the sample axis -> (N, W, F+1), then copy so the result
    # owns its memory and is writable (the view itself is read-only).
    return np.array(np.moveaxis(view, -1, 1), order="C")

def make_ts_samples(data, look_back):
    """Turn a (T, F+1) array into model inputs and targets.

    Each window of ``look_back`` rows yields one sample: the inputs are all
    columns except the last over every row but the final one, and the target
    is the last column of the final row.
    """
    windows = sliding_window(data, look_back)  # (N, W, F+1)
    history = windows[:, :-1, :-1]  # (N, W-1, F)
    x = history.transpose(0, 2, 1)  # (N, F, W-1)
    y = windows[:, -1, -1]  # (N, )
    return x, y


def make_ts_dataset_split(train_x, train_y, val_x, val_y):
    """Stack train and validation samples and return them with their index splits.

    Returns ``(x, y, splits)`` where ``splits`` is a tuple of two lists: the
    positions of the training samples followed by those of the validation
    samples inside the stacked arrays.
    """
    n_train = len(train_x)
    x = np.concatenate((train_x, val_x), axis=0)
    y = np.concatenate((train_y, val_y), axis=0)
    train_positions = list(range(n_train))
    val_positions = list(range(n_train, len(x)))
    return x, y, (train_positions, val_positions)


# Evaluate
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error, r2_score

def visualize_predictions(dates, prices, preds):
    """Plot real against predicted prices over time, show the plot and return it.

    Args:
        dates: x-axis values, one per price.
        prices: array of real prices.
        preds: array of predicted prices.

    Returns:
        The matplotlib Figure. Callers that want to save the plot should use
        the returned object (``figure.savefig(...)``): once ``plt.show()`` has
        run, pyplot's "current figure" may no longer be this one.
    """
    prices = prices.reshape(-1, 1)
    preds = preds.reshape(-1, 1)

    figure, axes = plt.subplots(figsize=(15, 6))
    axes.xaxis_date()
    axes.plot(dates, prices, color='red', label='Real Stock Price')
    axes.plot(dates, preds, color='blue', label='Predicted Stock Price')
    # Axes-level setters so only this figure is affected.
    axes.set_title('Stock Price Prediction')
    axes.set_xlabel('Time')
    axes.set_ylabel(f'{SYMBOL} Stock Price')
    axes.legend()
    plt.show()
    return figure

def log_scores(wandb_run, prices, preds):
    """Print RMSE and MAPE of ``preds`` against ``prices`` and log them to W&B.

    Args:
        wandb_run: run to log to, or None to only print. The metrics are
            logged under the keys ``val_rmse`` and ``val_mape``.
        prices: true values.
        preds: predicted values.
    """
    rmse_value = np.sqrt(mean_squared_error(prices, preds))
    mape_value = mean_absolute_percentage_error(prices, preds)
    print(f"RMSE: {rmse_value:.4f}")
    print(f"MAPE: {mape_value:.2%}")
    # Callers may run without W&B, in which case there is nothing to log to.
    if wandb_run is not None:
        wandb_run.log({"val_rmse": rmse_value, "val_mape": mape_value})

# Train

def make_arch(architecture):
    """Map an architecture name from the config to its model class.

    ``None`` is passed through as ``None``. Known names are "LSTMPlus",
    "InceptionTime" and "InceptionTimePlus"; anything else raises
    ``ValueError`` carrying the given value.
    """
    if architecture is None:
        return None

    # Each class is only looked up when its name matches.
    known_architectures = (
        ("LSTMPlus", lambda: LSTMPlus),
        ("InceptionTime", lambda: InceptionTime),
        ("InceptionTimePlus", lambda: InceptionTimePlus),
    )
    for arch_name, resolve in known_architectures:
        if architecture == arch_name:
            return resolve()
    raise ValueError(architecture)

# Column the model is trained to predict; train_eval_infer adds the predicted
# change to the previous price to reconstruct price predictions.
TARGET_VAR = 'price_change'

def train_eval_infer(
    config,
    df,
    row_splits,
    wandb_run=None,
):
    """Preprocess the data, train a TSRegressor and back-test it on both splits.

    Args:
        config: experiment configuration; reads ``config["data"]["look_back"]``
            and the ``config["model"]`` entries ``batch_size``,
            ``architecture``, ``epochs`` plus the optional ``hidden_size``,
            ``fc_dropout``, ``lr`` and ``early_stop_patience``.
        df: prepared dataset containing ``price`` and ``TARGET_VAR``.
        row_splits: pair of row-position lists (train rows, validation rows).
        wandb_run: optional W&B run; enables the W&B training callback and is
            handed to ``log_scores``.

    Returns:
        ``(xpp, ypp, learn)``: fitted feature preprocessor, fitted target
        scaler and the trained learner.

    Raises:
        ValueError: if ``look_back`` is smaller than 2, because each sample
            needs at least one input step before the predicted step.
    """
    look_back = config["data"]["look_back"]  # choose sequence length
    if look_back < 2:
        raise ValueError(f"look_back must be >= 2, got {look_back}")

    # preprocessing (fitted on the training rows only, applied to all rows)
    train_df = df.iloc[row_splits[0]]
    xpp = make_preprocessor(config, train_df, ['price'])
    ypp = make_target_preprocessor(config, train_df[TARGET_VAR].values)
    x_data_pp = xpp.transform(df)
    y_data_pp = ypp.transform(df[TARGET_VAR].values.reshape(-1, 1))
    # target goes last; make_ts_samples reads it from the final column
    data_pp = np.concatenate([x_data_pp, y_data_pp], axis=1)

    # split (windows are built per split, so none spans the cutoff)
    train_x, train_y = make_ts_samples(data_pp[row_splits[0]], look_back)
    val_x, val_y = make_ts_samples(data_pp[row_splits[1]], look_back)
    x, y, splits = make_ts_dataset_split(train_x, train_y, val_x, val_y)

    # callbacks
    cbs = [SaveModelCallback()]
    early_stop_patience = config["model"].get("early_stop_patience")
    if early_stop_patience:
        cbs.append(EarlyStoppingCallback(patience=early_stop_patience))
    if wandb_run:
        cbs.append(WandbCallback())

    # learn
    learn = TSRegressor(
        x,
        y,
        splits=splits,
        bs=config["model"]["batch_size"],
        arch=make_arch(config["model"]["architecture"]),
        arch_config={
            "hidden_size": config["model"].get("hidden_size"),
            "fc_dropout": config["model"].get("fc_dropout"),
        },
        metrics=[rmse, mape],
        train_metrics=True,
        cbs=cbs,
    )

    # learning rate: use the configured value, otherwise run the LR finder
    lr = config["model"].get("lr")
    if lr is None:
        lr_res = learn.lr_find(start_lr=1e-6, end_lr=1e-1, num_it=200)
        lr = lr_res.valley

    # fit
    with warnings.catch_warnings():
        warnings.filterwarnings(
            action="ignore", category=UndefinedMetricWarning, module=r".*"
        )
        epochs = config["model"]["epochs"]
        learn.fit_one_cycle(epochs, lr)

    # training-only callbacks are detached before inference
    learn.remove_cb(SaveModelCallback)
    learn.remove_cb(WandbCallback)
    learn.remove_cb(EarlyStoppingCallback)

    # evaluate

    def inverse_transform_target(y):
        return ypp.inverse_transform(np.array(y).reshape(-1, 1))

    def evaluate(split_idx):
        split_name = ['train', 'validation'][split_idx]
        split = splits[split_idx]
        split_df = df.iloc[row_splits[split_idx]]
        # The first predicted row of a split is its (look_back - 1)-th row;
        # prev_prices holds the price one row before each predicted row.
        dates = split_df.index[look_back - 1:]
        prices = split_df.price[look_back - 1:].values
        prev_prices = split_df.price[look_back - 2:-1].values
        _, _, y_pred = learn.get_X_preds(x[split])
        price_change_preds = inverse_transform_target(y_pred).ravel()
        price_preds = prev_prices + price_change_preds
        print("=" * 80)
        # NOTE(review): log_scores uses the same metric keys for every call,
        # so the train split is reported under validation-named keys as well.
        log_scores(wandb_run, prices, price_preds)
        print("=" * 80)
        fig = visualize_predictions(dates, prices, price_preds)
        figure_path = FIGURES_DIR / f"{split_name}-backtest.png"
        if fig is not None:
            # Save through the Figure object: after the plot has been shown,
            # pyplot's current figure may be a new, empty one.
            fig.savefig(figure_path, dpi=400)
            plt.close(fig)  # avoid piling up open figures across runs
        else:
            plt.savefig(figure_path, dpi=400)

    evaluate(0)
    evaluate(1)
    return xpp, ypp, learn


# Export


def log_file_artifact(wandb_run, path, name, type):
    """Wrap the file at ``path`` in a W&B artifact and log it on ``wandb_run``.

    ``name`` and ``type`` become the artifact's name and type. Returns
    whatever ``wandb_run.log_artifact`` returns.
    """
    file_artifact = wandb.Artifact(name, type=type)
    file_artifact.add_file(path)
    logged_artifact = wandb_run.log_artifact(file_artifact)
    return logged_artifact


def log_training_dataset(df, wandb_run=None):
    """Dump the training frame to JSON and optionally log it to W&B.

    The index is turned into a regular column and the frame is written as
    JSON records to ``training_dataframe.json`` in the working directory.

    Args:
        df: frame to persist.
        wandb_run: optional run; when given, the file is logged as a
            ``dataset`` artifact and the frame as a table under the key ``df``.

    Returns:
        The path of the written JSON file.
    """
    df = df.reset_index()
    artifact_name = "training_dataframe"

    path = f"{artifact_name}.json"
    df.to_json(path, orient="records")

    if wandb_run:
        log_file_artifact(wandb_run, path, artifact_name, type="dataset")
        # Log the table on the run that was passed in, so table and artifact
        # always land on the same run.
        wandb_run.log({"df": wandb.Table(dataframe=df)})
    return path


def log_learner(learn, wandb_run=None):
    """Export the learner to ``EXPORTS_DIR / "learn.pkl"`` and return that path.

    When a run is given, the exported file is also logged as a ``model`` artifact.
    """
    export_path = EXPORTS_DIR / "learn.pkl"
    learn.export(export_path)
    if not wandb_run:
        return export_path
    log_file_artifact(wandb_run, export_path, "learn", type="model")
    return export_path


def log_preprocessor(pp, name, wandb_run=None):
    """Pickle ``pp`` to ``EXPORTS_DIR / "<name>.pkl"`` and return that path.

    When a run is given, the pickle is also logged as a ``preprocessor``
    artifact named ``name``.
    """
    pickle_path = EXPORTS_DIR / f"{name}.pkl"
    with pickle_path.open("wb") as handle:
        pickle.dump(pp, handle)
    if not wandb_run:
        return pickle_path
    log_file_artifact(wandb_run, pickle_path, name, type="preprocessor")
    return pickle_path


# Experiment


def run_experiment(config):
    """Run one full experiment: load data, train, evaluate and log artifacts.

    Args:
        config: experiment configuration with the sections ``wandb``
            (optional), ``data`` and ``model`` plus an optional ``seed``.
            When W&B is enabled, ``config['data']['features']`` is set to the
            final column list before the config is uploaded.

    Any exception raised along the way is re-raised after the W&B run has
    been finished with a non-zero exit code, so a failed experiment does not
    leave its run open.
    """
    seed = config.get("seed")
    if seed is not None:
        set_seed(seed)

    # wandb
    wandb_run = None
    if config.get("wandb", {}).get("wandb_enabled", False):
        wandb_run = wandb.init(
            project=config["wandb"]["wandb_project"],
            entity=config["wandb"]["wandb_username"],
        )

    try:
        # data
        dataset_path = config["data"]["path"]
        stock_id = config['data']['stock_id']
        if wandb_run:
            # Declares the dataset artifact as an input of this run and fetches
            # it; the frame itself is read by load_stock_price_dataset below.
            wandb_run.use_artifact(dataset_path, type="raw_data").download()

        df = prepare_dataset(load_stock_price_dataset(stock_id, VERSION))
        row_splits = get_splits(df, config["data"]["split_date"])
        df["is_validation"] = False
        df.iloc[row_splits[1], df.columns.get_loc("is_validation")] = True
        print("validation/train ratio", len(row_splits[1]) / len(row_splits[0]))

        # experiment
        xpp, ypp, learn = train_eval_infer(
            config,
            df,
            row_splits,
            wandb_run=wandb_run,
        )

        # log artifacts
        log_training_dataset(df, wandb_run)
        log_preprocessor(xpp, "xpp", wandb_run)
        log_preprocessor(ypp, "ypp", wandb_run)
        log_learner(learn, wandb_run)

        # record the final configuration on the run
        if wandb_run:
            config['data']['features'] = df.columns.tolist()
            wandb.config.update(flatten_dict(config))
    except BaseException:
        # Close the run as failed (also on a manual interrupt), then re-raise.
        if wandb_run:
            wandb.finish(exit_code=1)
        raise

    # wrap up
    if wandb_run:
        wandb.finish()


def make_experiment_dir(root=".", name=None):
    """Create (if needed) and return the directory ``root/name``.

    When ``name`` is empty or None, ``generate_time_id()`` supplies it.
    Missing parent directories are created as well.
    """
    if not name:
        name = generate_time_id()
    target_dir = Path(root).joinpath(name)
    target_dir.mkdir(parents=True, exist_ok=True)
    return target_dir


In [None]:
# Default experiment configuration consumed by run_experiment.
base_config = {
  # Passed to set_seed when not None.
  "seed": 42,
  "wandb": {
    "wandb_enabled": True,
    # Used as the `entity` argument of wandb.init.
    "wandb_username": "ytdteam",
    "wandb_project": "ytd-cassandra-forecast"
  },
  "data": {
    "path": f"ytdteam/ytd-cassandra-forecast/{SYMBOL.lower()}-stock-price-with-news:v{VERSION}",
    "stock_id": SYMBOL,
    # Rows before this date are used for training, the rest for validation.
    "split_date": "2022-10-01",
    # Window length in rows: look_back - 1 input steps plus the predicted step.
    "look_back": 60
  },
  "model": {
    "batch_size": 64,
    # One of the names understood by make_arch.
    "architecture": "LSTMPlus",
    "fc_dropout": 0.0,
    "hidden_size": [64],
    "epochs": 50,
    "early_stop_patience": 5,
    # No "lr" entry: train_eval_infer then picks one with the LR finder.
  },
  "preprocessing": {
      # "standard" or "identity" (see get_scaler).
      "scaler": "standard",
  }
}


In [None]:
# Single run with the default configuration defined above.
run_experiment(base_config)

In [None]:
def run_sweep_experiment(config=None):
    """Run one sweep trial: overlay the sweep's parameters on the base config.

    The values of ``look_back``, ``batch_size``, ``architecture``,
    ``hidden_size`` and ``fc_dropout`` are read from ``wandb.config`` and
    written into a private copy of ``base_config``, which is then handed to
    ``run_experiment``. ``base_config`` itself is left untouched, so trials do
    not leak settings into each other or into later manual runs.

    Args:
        config: optional configuration forwarded to ``wandb.init``.
    """
    import copy  # local import: only needed by this function

    EXPORTS_DIR.mkdir(parents=True, exist_ok=True)
    FIGURES_DIR.mkdir(parents=True, exist_ok=True)
    with wandb.init(config=config):
        sweep_params = wandb.config
        trial_config = copy.deepcopy(base_config)
        trial_config['data']['look_back'] = sweep_params['look_back']
        trial_config['model']['batch_size'] = sweep_params['batch_size']
        trial_config['model']['architecture'] = sweep_params['architecture']
        trial_config['model']['hidden_size'] = sweep_params['hidden_size']
        trial_config['model']['fc_dropout'] = sweep_params['fc_dropout']
        run_experiment(trial_config)

In [None]:
# Bayesian hyper-parameter sweep definition. The parameter names match the
# keys run_sweep_experiment reads from wandb.config.
# NOTE(review): "valid__rmse" (double underscore) is presumably the name under
# which the training callback logs the validation RMSE — confirm against the
# metrics of a finished run.
sweep_config = {
    "metric": {"name": "valid__rmse", "goal": "minimize"},
    "method": "bayes",
    "parameters": {
        "architecture": {"values": ["LSTMPlus"]},
        "look_back": {"values": [16, 32, 60, 128, 256]},
        "batch_size": {"values": [32, 64, 128]},
        "hidden_size": {"values": [[100], [32], [64], [64, 32], [128, 64, 32]]},
        "fc_dropout": {"values": [0.0, 0.1, 0.2, 0.3, 0.4, 0.5]},
    },
}

# Register the sweep in the same project the single runs use.
sweep_id = wandb.sweep(sweep_config, entity='ytdteam', project=base_config['wandb']['wandb_project'])

In [None]:
# Execute up to 40 sweep trials in this process, one run_sweep_experiment call each.
wandb.agent(sweep_id, run_sweep_experiment,  count=40)