# Apple's Stock Price Prediction based on Weather Forecasts

> Stock price prediction for Apple Inc. based on New York's current weather conditions and forecasts. Inspired by https://www.relataly.com/stock-market-prediction-using-multivariate-time-series-in-python/1815/. Demonstrates how to use Trino to query several databases (MongoDB, PostgreSQL and Kafka) through a single SQL interface.

## Authors

Natalie Jann [natalie.jann@ibm.com](mailto:natalie.jann@ibm.com)

Sebastian Lehrig [sebastian.lehrig1@ibm.com](mailto:sebastian.lehrig1@ibm.com)

Marvin Giessing [MARVING@de.ibm.com](mailto:MARVING@de.ibm.com)

## License
Apache-2.0 License

## 0.) Imports and Constants

In [15]:
import os

import kfp
from kfp.components import InputPath, OutputPath
import kfp.dsl as dsl
from kfp.dsl import PipelineConf, data_passing_methods
from kubernetes.client.models import V1Volume, V1PersistentVolumeClaimVolumeSource
from pydoc import importfile

%load_ext lab_black

The lab_black extension is already loaded. To reload it, use:
  %reload_ext lab_black


In [16]:
COMPONENT_CATALOG_FOLDER = f"{os.getenv('HOME')}/components"
COMPONENT_CATALOG_GIT = "https://github.com/lehrig/kubeflow-ppc64le-components.git"
COMPONENT_CATALOG_RELEASE = "main"

# Paths to individual component specs inside the cloned catalog.
# NOTE(review): the pipeline below loads these components via CATALOG, so these
# constants are not referenced in the cells shown here.
CONVERT_MODEL_TO_ONNX_COMPONENT = (
    f"{COMPONENT_CATALOG_FOLDER}/model-building/convert-to-onnx/component.yaml"
)
UPLOAD_MODEL_COMPONENT = (
    f"{COMPONENT_CATALOG_FOLDER}/model-building/upload-model/component.yaml"
)
DEPLOY_MODEL_WITH_KSERVE_COMPONENT = (
    f"{COMPONENT_CATALOG_FOLDER}/model-deployment/"
    "deploy-model-with-kserve/component.yaml"
)

# Container image used by every Python function component in this notebook.
BASE_IMAGE = "quay.io/ibm/kubeflow-notebook-image-ppc64le:latest"

# Pipeline run arguments: `blackboard` names the volume used for passing data
# between steps, `model_name` is used for training, upload and deployment.
ARGUMENTS = {"blackboard": "artefacts", "model_name": "stock-price-prediction"}
MODEL_NAME = ARGUMENTS["model_name"]

# Namespace of this notebook server, read from the mounted service account.
# Stripped because the value is spliced into the KServe "Host" header during
# inference, where stray whitespace or a newline would produce an invalid host.
with open(
    "/var/run/secrets/kubernetes.io/serviceaccount/namespace", encoding="utf-8"
) as f:
    NAMESPACE = f.read().strip()

ARGUMENTS

{'blackboard': 'artefacts', 'model_name': 'stock-price-prediction'}

In [17]:
# Kubeflow Pipelines API client; used at the end of the notebook to submit the
# pipeline run (`client.create_run_from_pipeline_func`).
client = kfp.Client()

In [18]:
# Clone the component catalog at COMPONENT_CATALOG_RELEASE. If the target folder
# already exists (e.g. when the notebook is re-run), git refuses with "fatal:
# destination path ... already exists" and the existing checkout is used as-is,
# i.e. it is NOT updated to the latest release.
!git clone --branch $COMPONENT_CATALOG_RELEASE $COMPONENT_CATALOG_GIT $COMPONENT_CATALOG_FOLDER

fatal: destination path '/home/jovyan/components' already exists and is not an empty directory.


In [19]:
# Import catalog.py from the cloned repository as a module. The pipeline below
# uses its pre-built components via CATALOG.<name> (Trino loader, ONNX
# conversion, model upload, KServe deployment).
CATALOG = importfile(f"{COMPONENT_CATALOG_FOLDER}/catalog.py")

## 1.) Interaction with a Trino Server

In [20]:
# Shared dtype for every numeric column, and the column the model predicts.
gen_dtype = "float32"
y = "Close"

# Column name -> dtype of the joined stock/weather data. Insertion order is
# significant: `list(df_structure.keys())` is used to name the raw query result
# columns positionally. "Date" is a timestamp; every other column is numeric.
df_structure = {"Date": "timestamp[ns]"}
df_structure.update(
    (column, gen_dtype)
    for column in (
        "Close",
        "Avg Wind Speed",
        "Precipitation",
        "Snow",
        "Snow Depth",
        "Temp AVG",
        "Temp Max",
        "Temp Min",
    )
)
df_structure.keys()

dict_keys(['Date', 'Close', 'Avg Wind Speed', 'Precipitation', 'Snow', 'Snow Depth', 'Temp AVG', 'Temp Max', 'Temp Min'])

## 2.) Preprocess Training Data

### 2.1.) Conversion from Pandas Dataframe to HuggingFace Dataset

In [21]:
def convert_to_HF_dataset(
    dataset_dir: OutputPath(str),
    features: dict,
    columns: list,
    raw_data_dir: InputPath(str),
    split: float = 0.15,
):
    """Create a HuggingFace Dataset with a train/test split from a Feather file.

    The Dataframe in `raw_data_dir` is cleaned (the string "NaN" and missing
    values become 0, duplicate rows are dropped), indexed by its "Date" column,
    cast to the dtypes in `features`, split in order (no shuffling) into
    "train" and "test", and saved to `dataset_dir`.

    Args:
        dataset_dir: Output directory for the saved DatasetDict.
        features: Mapping of column name to dtype string (e.g. "float32").
        columns: Column names assigned, in order, to the raw Dataframe.
        raw_data_dir: Feather file with the raw query result.
        split: Fraction of rows, taken from the end, used as the test split.
    """

    import os
    import pandas as pd
    from datasets import Dataset, Features, Value

    df = pd.read_feather(raw_data_dir)
    df.columns = columns
    # Missing values may arrive either as the string "NaN" or as real NaNs.
    df.replace("NaN", 0, inplace=True)
    df.fillna(0, inplace=True)
    df.drop_duplicates(inplace=True)

    # Keep the HF schema separate from `features`: pandas must be given the
    # plain dtype strings, not `datasets.Value` objects (which only worked via
    # NumPy's deprecated `.dtype`-attribute fallback).
    hf_features = {col: Value(dtype=features[col], id=None) for col in df.columns}

    df.index = pd.to_datetime(df["Date"], format="%Y-%m-%d")
    df.drop("Date", inplace=True, axis=1)
    for col in df.columns:
        df[col] = df[col].astype(features[col])

    # The schema still lists "Date"; the downstream scaling step reads it back
    # as a column. shuffle=False keeps the split chronological.
    dataset = Dataset.from_pandas(df, features=Features(hf_features))
    dataset = dataset.train_test_split(test_size=split, shuffle=False)

    os.makedirs(dataset_dir, exist_ok=True)
    dataset.save_to_disk(dataset_dir)


convert_dataset_to_HF_comp = kfp.components.create_component_from_func(
    func=convert_to_HF_dataset, base_image=BASE_IMAGE
)

### 2.2.) Scaling Dataset Values

In [22]:
def initalize_scaling(
    dataset_dir: InputPath(str),
    features: dict,
    prep_dataset_dir: OutputPath(str),
    y_col: str,
):
    """Scale every split of the HuggingFace DatasetDict in `dataset_dir`.

    Two RobustScalers are fitted on the "train" split only: `scaler` over all
    columns (with "Date" as index) and `scaler_pred` over the target column
    `y_col` alone. Every split is transformed with them; the target column
    takes its values from `scaler_pred`. The scaled DatasetDict is saved to
    `prep_dataset_dir` together with `scaler.pickle` and `scaler_pred.pickle`.
    """

    import os

    import joblib
    import numpy as np
    import pandas as pd
    from datasets import Dataset, DatasetDict, Features, Value, load_from_disk
    from sklearn.preprocessing import RobustScaler

    source = load_from_disk(dataset_dir)
    train_frame = source["train"].to_pandas()
    # Built before "Date" moves into the index, so the schema still contains it.
    value_types = {
        name: Value(dtype=features[name], id=None) for name in train_frame.columns
    }
    train_frame = train_frame.set_index("Date")

    scaler = RobustScaler().fit(train_frame)
    scaler_pred = RobustScaler().fit(np.array(train_frame[y_col]).reshape(-1, 1))

    def scale(frame):
        # Return (all columns scaled, target column scaled on its own).
        target = np.array(frame[y_col]).reshape(-1, 1)
        return scaler.transform(frame), scaler_pred.transform(target)

    def to_dataset(frame, x_scaled, y_scaled, split_name):
        # Rebuild a Date-indexed frame from the scaled arrays and wrap it.
        scaled = pd.DataFrame(x_scaled, columns=frame.columns)
        scaled.index = frame.index
        scaled[y_col] = y_scaled[:, 0]
        return Dataset.from_pandas(
            scaled, features=Features(value_types), split=split_name
        )

    train_x, train_y = scale(train_frame)
    print("Training data has been scaled")
    scaled_splits = {"train": to_dataset(train_frame, train_x, train_y, "train")}

    for split_name in [name for name in source.keys() if name != "train"]:
        frame = source[split_name].to_pandas().set_index("Date")
        split_x, split_y = scale(frame)
        scaled_splits[split_name] = to_dataset(frame, split_x, split_y, split_name)
        print(f"{split_name} dataset split scaled.")

    if not os.path.exists(prep_dataset_dir):
        os.makedirs(prep_dataset_dir)

    DatasetDict(scaled_splits).save_to_disk(prep_dataset_dir)

    for file_name, fitted in (
        ("scaler.pickle", scaler),
        ("scaler_pred.pickle", scaler_pred),
    ):
        with open(f"{prep_dataset_dir}/{file_name}", "wb") as handle:
            joblib.dump(fitted, handle)

    print(f"Scaled dataset and scalers saved. Contents of '{prep_dataset_dir}':")
    print(os.listdir(prep_dataset_dir))


scale_dataset_comp = kfp.components.create_component_from_func(
    base_image=BASE_IMAGE, func=initalize_scaling
)

### 2.3.) Cutting the Dataset into Sequences of *X* Days

In [23]:
def partition_dataset(
    gen_dtype: str,
    part_dataset_dir: OutputPath(str),
    prep_dataset_dir: InputPath(str),
    y_col: str,
    sequence_length: int = 10,
):
    """Cut every split of the HuggingFace Dataset in `prep_dataset_dir` into sequences.

    For each split, sample k pairs the `sequence_length` rows preceding row
    i = k + `sequence_length` (all columns except the first one) with the
    `y_col` value of row i. Results are saved to `part_dataset_dir` as
    `x-<split>-partition.npy` (samples x sequence_length x columns) and
    `y-<split>-partition.npy` (samples,), both cast to `gen_dtype`.

    NOTE(review): the first column is skipped by position and is expected to
    be "Date" -- confirm against the column order written by the scaling step.
    """

    import os

    import numpy as np
    from datasets import load_from_disk

    dataset = load_from_disk(prep_dataset_dir)

    if not os.path.exists(part_dataset_dir):
        os.makedirs(part_dataset_dir)

    for split in dataset.keys():
        data = dataset[split]
        num_rows = data.num_rows

        # Materialize the feature columns once as a (rows, columns) matrix
        # instead of re-reading `sequence_length` rows for every window.
        feature_columns = data.column_names[1:]
        if feature_columns:
            matrix = np.column_stack(
                [np.asarray(list(data[name])) for name in feature_columns]
            )
        else:
            matrix = np.empty((num_rows, 0))
        targets = np.asarray(list(data[y_col]))

        x = np.array(
            [matrix[i - sequence_length : i] for i in range(sequence_length, num_rows)]
        ).astype(gen_dtype)
        # Target of each window is the value right after it.
        y = targets[sequence_length:].astype(gen_dtype)

        np.save(f"{part_dataset_dir}/x-{split}-partition.npy", x)
        np.save(f"{part_dataset_dir}/y-{split}-partition.npy", y)
        print(f"{split} sequences created")

    print(
        f"Pre-processed Dataset saved. Contents of '{part_dataset_dir}':\n",
        os.listdir(part_dataset_dir),
    )


partition_ds_comp = kfp.components.create_component_from_func(
    func=partition_dataset, base_image=BASE_IMAGE
)

## 3.) Model Definition

### 3.1.) Training

In [24]:
def train_model(
    model_dir: OutputPath(str),
    part_dataset_dir: InputPath(str),
    batch_size: int = 16,
    epochs: int = 50,
    model_name: str = "model.h5",
):
    """Train a stacked LSTM regressor and persist it to `model_dir`.

    Loads `x-/y-train-partition.npy` and `x-/y-test-partition.npy` from
    `part_dataset_dir`, trains with the test split as validation data, prints
    the training history and saves the Keras model to `model_dir`.

    Args:
        model_dir: Output directory the trained model is saved to.
        part_dataset_dir: Directory containing the partitioned `.npy` arrays.
        batch_size: Mini-batch size passed to `model.fit`.
        epochs: Number of training epochs.
        model_name: Currently unused; the model is saved directly to
            `model_dir`. Kept for interface compatibility.
    """

    import os
    import time

    from numpy import load
    from tensorflow.keras.layers import LSTM, Dense
    from tensorflow.keras.metrics import (
        MeanAbsoluteError,
        MeanAbsolutePercentageError,
        MeanSquaredError,
        RootMeanSquaredError,
    )
    from tensorflow.keras.models import Sequential

    x_train = load(f"{part_dataset_dir}/x-train-partition.npy", allow_pickle=False)
    y_train = load(f"{part_dataset_dir}/y-train-partition.npy", allow_pickle=False)

    x_test = load(f"{part_dataset_dir}/x-test-partition.npy", allow_pickle=False)
    y_test = load(f"{part_dataset_dir}/y-test-partition.npy", allow_pickle=False)
    print(
        f"Data shapes: Train - {x_train.shape}/{y_train.shape}, Test - {x_test.shape}/{y_test.shape}"
    )

    # Input is (timesteps, variables) = (x_train.shape[1], x_train.shape[2]);
    # use one LSTM unit per input value of a window.
    n_neurons = x_train.shape[1] * x_train.shape[2]

    # NOTE(review): layers keep their default Keras names; run_inference sends
    # its payload to an input called "lstm_input", presumably derived from the
    # first LSTM layer's default name -- keep both in sync.
    model = Sequential()
    model.add(
        LSTM(
            n_neurons,
            return_sequences=True,
            input_shape=(x_train.shape[1], x_train.shape[2]),
        )
    )
    model.add(LSTM(n_neurons, return_sequences=False))
    model.add(Dense(6))
    model.add(Dense(1))

    model.compile(
        optimizer="adam",
        loss="mse",
        metrics=[
            MeanAbsoluteError(),
            MeanAbsolutePercentageError(),
            MeanSquaredError(),
            RootMeanSquaredError(),
        ],
    )
    # summary() prints itself and returns None; wrapping it in print() only
    # added a stray "None" line to the log.
    model.summary()

    start = time.perf_counter()
    history = model.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(x_test, y_test),
    )
    print(f"\nTraining took {time.perf_counter()-start} seconds.\nModel train history:")
    print(history.history)

    os.makedirs(model_dir, exist_ok=True)
    model.save(model_dir)
    print(f"Model saved. Contents of '{model_dir}':\n", os.listdir(model_dir))


train_model_comp = kfp.components.create_component_from_func(
    func=train_model, base_image=BASE_IMAGE
)

## 4.) Inference

### 4.1.) Preparation of a Sequence for the first Prediction + Forecast

In [25]:
def merge_data(
    columns: list,
    dataset_dir_1: InputPath(str),
    dataset_dir_2: InputPath(str),
    features: dict,
    gen_dtype: str,
    headers: list,
    order_by: str,
    prep_dataset_dir: OutputPath(str),
    scaler_p: InputPath(str),
    today: str,
    sequence_length: int = 10,
):
    """Merge two Trino query results into a scaled dataset for inference.

    `dataset_dir_1` holds rows with a stock price and `dataset_dir_2` holds
    weather-only rows without a price. The last `sequence_length` rows of the
    first are followed by all rows of the second, whose "Close" value is seeded
    with the last known close. Everything is scaled with `scaler.pickle` from
    `scaler_p` and saved as a HuggingFace Dataset (with a "Date" column) to
    `prep_dataset_dir`.

    Args:
        columns: Names for the columns of `dataset_dir_1` (after `order_by`).
        dataset_dir_1: Feather file with the price + weather rows.
        dataset_dir_2: Feather file with the weather-only rows.
        features: Mapping of column name to dtype string for the HF schema.
        gen_dtype: dtype all non-date columns are cast to.
        headers: Names for the columns of `dataset_dir_2` (after `order_by`).
        order_by: Column used to keep only the latest row per "Date".
        prep_dataset_dir: Output directory for the inference dataset.
        scaler_p: Directory containing `scaler.pickle`.
        today: Not used by this function.
        sequence_length: Number of priced rows kept as the seed window.
    """

    import pandas as pd
    import numpy as np
    import joblib
    from datasets import Dataset, Features, Value

    def set_index_columns(df, cols, dedup=False):
        # Clean `df`, rename its columns to `cols`, keep only the last row per
        # "Date" after sorting by `order_by`, drop `order_by`, index by "Date"
        # and cast the remaining columns to `gen_dtype`.
        # NOTE(review): `dedup` is never read; duplicates are always dropped.
        df.replace("NaN", 0, inplace=True)
        df.fillna(0, inplace=True)
        df.columns = cols
        df.drop_duplicates(inplace=True)
        # Sorting by `order_by` first makes keep="last" retain the row with the
        # largest `order_by` value for each date.
        df.sort_values(by=order_by, inplace=True)
        df.drop_duplicates(subset=["Date"], keep="last", inplace=True)
        df.drop(order_by, inplace=True, axis=1)

        df.sort_values(by="Date", inplace=True)
        df.index = pd.to_datetime(df["Date"], format="%Y-%m-%d")
        df.drop(columns="Date", inplace=True, axis=1)
        for col in df.columns:
            df[col] = df[col].astype(gen_dtype)
        return df

    # Feature scaler fitted on the training split by the scaling step.
    with open(f"{scaler_p}/scaler.pickle", "rb") as f:
        scaler = joblib.load(f)

    df1 = pd.read_feather(dataset_dir_1)
    df2 = pd.read_feather(dataset_dir_2)

    # Both query results start with the `order_by` column, named "timestamp"
    # here so that set_index_columns can sort on it.
    # NOTE(review): this assumes order_by == "timestamp", as the pipeline passes.
    df1 = set_index_columns(df1, ["timestamp"] + columns, dedup=True)
    df2 = set_index_columns(df2, ["timestamp"] + headers)

    # HF schema, including the "Date" column that is inserted before saving.
    features = {f: Value(dtype=features[f], id=None) for f in features.keys()}

    # Seed window: only the last `sequence_length` priced rows are needed.
    df1 = df1[-sequence_length:]
    # Weather-only rows have no price yet; seed "Close" with the last known
    # value (column 0 of df1, which is "Close" given the `columns` order the
    # pipeline passes). Raises IndexError if df1 is empty.
    df2.insert(0, "Close", df1.iloc[-1, 0])

    df2predict = pd.concat([df1, df2])

    # Scaled as a plain array, so the column order must match the order the
    # scaler was fitted on.
    df2predict_unscaled = np.array(df2predict)

    df2predict_scaled = scaler.transform(df2predict_unscaled)
    # df2.columns equals the concatenated column order (Close first).
    df2predict_scaled = pd.DataFrame(df2predict_scaled, columns=df2.columns)
    df2predict_scaled.insert(0, "Date", df2predict.index)

    future_dataset = Dataset.from_pandas(df2predict_scaled, features=Features(features))
    future_dataset.save_to_disk(prep_dataset_dir)
    print(f"Inference dataset scaled and saved to '{prep_dataset_dir}'")


merge_data_comp = kfp.components.create_component_from_func(
    func=merge_data, base_image=BASE_IMAGE
)

### 4.2.) 14-Day Forecast

In [26]:
def run_inference(
    prep_dataset_dir: InputPath(str),
    features: dict,
    gen_dtype: str,
    model_name: str,
    namespace: str,
    scaler_p: InputPath(str),
    y: str,
    sequence_length: int = 10,
):
    """Forecast `y` step by step with the model served by KServe.

    Slides a `sequence_length`-row window over the scaled dataset in
    `prep_dataset_dir` and posts each window to the model's v2 inference
    endpoint. Each prediction is un-scaled with `scaler_pred.pickle` from
    `scaler_p` and also written back (scaled) into row i of the dataset, so
    later windows build on earlier predictions. Finally prints the un-scaled
    seed values and predictions of `y`, indexed by date.

    Args:
        prep_dataset_dir: Dataset written by the merge step.
        features: Not used by this function.
        gen_dtype: dtype the model inputs and written-back values are cast to.
        model_name: Name of the KServe InferenceService.
        namespace: Namespace of the InferenceService (used in the Host header).
        scaler_p: Directory containing `scaler_pred.pickle`.
        y: Name of the predicted column.
        sequence_length: Number of rows per input window.

    Raises:
        requests.HTTPError: If the inference endpoint returns an error status.
    """

    import json

    import joblib
    import numpy as np
    import requests
    from datasets import load_from_disk

    def update_y(row, cond, target, y, new):
        # Overwrite column `y` of the row whose `cond` column equals `target`.
        if row[cond] == target:
            row[y] = new.astype(gen_dtype)[0]
        return row

    future_dataset = load_from_disk(prep_dataset_dir)
    predictions = []

    with open(f"{scaler_p}/scaler_pred.pickle", "rb") as f:
        scaler_pred = joblib.load(f)

    HEADERS = {"Host": model_name + "-predictor-default." + namespace}
    PREDICT_ENDPOINT = (
        "http://" + model_name + "-predictor-default/v2/models/model/infer"
    )
    print(f"Running inference against {PREDICT_ENDPOINT} on {HEADERS}")

    for i in range(sequence_length, future_dataset.num_rows):
        # Window of the `sequence_length` scaled rows preceding row i.
        period = range(i - sequence_length, i)
        x_values = list(future_dataset.__getitem__(period).values())
        # NOTE(review): index 2 is assumed to be the date column; confirm
        # against the column order of the dataset written by merge_data.
        del x_values[2]  # date
        x_values = np.reshape(
            np.array(x_values).flatten(), (len(x_values), len(period))
        ).T
        # NOTE(review): swaps the first two columns, presumably to match the
        # training column order -- confirm.
        x_values[:, [1, 0]] = x_values[:, [0, 1]]

        payload = {
            "inputs": [
                {
                    "name": "lstm_input",
                    # (batch, timesteps, variables) taken from the actual window
                    # instead of a hard-coded [1, 10, 8].
                    "shape": [1, *x_values.shape],
                    "datatype": "FP32",
                    "data": x_values.astype(gen_dtype).tolist(),
                }
            ]
        }

        res = requests.post(PREDICT_ENDPOINT, headers=HEADERS, data=json.dumps(payload))
        # Surface HTTP failures directly rather than as a KeyError below.
        res.raise_for_status()
        response = json.loads(res.text)
        y_pred_scaled = response["outputs"][0]["data"]

        # Un-scale the prediction with the target-only scaler.
        y_pred = scaler_pred.inverse_transform(
            np.array([y_pred_scaled]).reshape(-1, 1)
        )[0]
        predictions.append(y_pred[0])

        # Write the (still scaled) prediction into row i so the next window
        # includes it.
        target = future_dataset.__getitem__(i)["Date"]
        future_dataset = future_dataset.map(
            lambda x: update_y(x, "Date", target, y, np.array(y_pred_scaled)),
            keep_in_memory=True,
        )

    # Convert to a Date-indexed frame and replace the scaled `y` values with
    # un-scaled ones: the seed window first, then the predictions.
    predictions_df = future_dataset.to_pandas()
    predictions_df.set_index("Date", inplace=True)
    y_loc = predictions_df.columns.get_loc(y)
    seed_scaled = np.array(list(future_dataset[y])[:sequence_length])
    # Reshape to (n, 1): the layout `scaler_pred` was fitted on.
    predictions_df.iloc[:sequence_length, y_loc] = scaler_pred.inverse_transform(
        seed_scaled.reshape(-1, 1)
    ).ravel()
    predictions_df.iloc[sequence_length:, y_loc] = predictions
    predictions_df = predictions_df[[y]]
    print("Predictions:", predictions_df)


run_inference_comp = kfp.components.create_component_from_func(
    func=run_inference, base_image=BASE_IMAGE
)

## 5.) Run Everything within a Kubeflow Pipeline

### 5.1.) Definition

In [27]:
from datetime import datetime

# Column names for the weather-only query result (every df_structure column
# except the target `y`).
headers = list(df_structure.keys())
headers.remove(y)
# Evaluated when this cell runs (pipeline compilation/submission), not when a
# pipeline step executes.
today = datetime.now().strftime("%Y-%m-%d")
# Number of consecutive dates (rows) in each model input window.
seq_len = 10


@dsl.pipeline(
    name="Apple Stock Price Prediction",
    description="An example pipeline that predicts Apple's stock (close) price based on weather forecasts",
)
def stock_price_prediction_pipeline(blackboard: str, model_name: str):
    """Train, deploy and query an LSTM that predicts `y` from weather data.

    Args:
        blackboard: Resource name of the volume created for passing data
            between steps.
        model_name: Name used for the trained, uploaded and deployed model.
    """

    create_blackboard = dsl.VolumeOp(
        name="Create Artefacts Blackboard",
        resource_name=blackboard,
        modes=dsl.VOLUME_MODE_RWO,
        size="4Gi",
        set_owner_reference=True,
    )

    # Training data: historical prices joined with weather via Trino.
    load_dataframe_via_trino_task = CATALOG.load_dataframe_via_trino_comp(
        query="SELECT a.Date, a.Close, w.AWND, w.PRCP, w.SNOW, w.SNWD, w.TAVG, w.TMAX, w.TMIN \
            FROM mongodb.weather.weatherny w JOIN postgresql.public.applehistory a \
            ON w._id = a.Date WHERE a.Date < date '2022-08-05' ORDER BY date ASC",
        columns=[
            "Date",
            "Close",
            "AWND",
            "PRCP",
            "SNOW",
            "SNWD",
            "TAVG",
            "TMAX",
            "TMIN",
        ],
    )  # weather
    load_dataframe_via_trino_task.after(create_blackboard)

    conversion_task = convert_dataset_to_HF_comp(
        raw_data_dir=load_dataframe_via_trino_task.outputs["dataframe"],
        split=0.15,
        features=df_structure,
        columns=list(df_structure.keys()),
    )

    scaling_task = scale_dataset_comp(
        dataset_dir=conversion_task.outputs["dataset_dir"],
        features=df_structure,
        y_col=y,
    )

    partition_task = partition_ds_comp(
        gen_dtype=gen_dtype,
        prep_dataset_dir=scaling_task.outputs["prep_dataset_dir"],
        y_col=y,
        sequence_length=seq_len,
    )

    train_model_task = train_model_comp(
        part_dataset_dir=partition_task.outputs["part_dataset_dir"],
        model_name=model_name,
        epochs=5,
    )

    convert_model_to_onnx_task = CATALOG.convert_model_to_onnx_comp(
        train_model_task.outputs["model_dir"]
    )
    upload_model_task = CATALOG.upload_model_comp(
        convert_model_to_onnx_task.outputs["onnx_model_dir"], project_name=model_name
    )

    deploy_model_with_kserve_task = CATALOG.deploy_model_with_kserve_comp(
        project_name=model_name,
        model_version=upload_model_task.outputs["model_version"],
    )

    deploy_model_with_kserve_task.after(upload_model_task)

    # Inference: recent prices + weather (seed window) and future weather-only
    # rows, both read from the Kafka catalog.
    query_presence_task = CATALOG.load_dataframe_via_trino_comp(
        query=f"SELECT DISTINCT w._timestamp, a.date, a.apple_price, w.AWND, w.PRCP, \
            w.SNOW, w.SNWD, w.TAVG, w.TMAX, w.TMIN FROM trinoweather w LEFT OUTER JOIN trinostock a \
            ON w.DATE = a.date WHERE a.date > date '2022-08-04' AND w.date < date '{today}' ORDER BY date ASC",
        columns=[
            "_timestamp",
            "date",
            "apple_price",
            "AWND",
            "PRCP",
            "SNOW",
            "SNWD",
            "TAVG",
            "TMAX",
            "TMIN",
        ],
        catalog="kafka",
        schema="default",
    )

    query_presence_task.after(create_blackboard)

    query_future_task = CATALOG.load_dataframe_via_trino_comp(
        query=f"SELECT DISTINCT _timestamp, date, AWND, PRCP, SNOW, SNWD, TAVG, TMAX, TMIN FROM trinoweather \
            WHERE date > date '{today}' ORDER BY date ASC",
        columns=[
            "_timestamp",
            "date",
            "AWND",
            "PRCP",
            "SNOW",
            "SNWD",
            "TAVG",
            "TMAX",
            "TMIN",
        ],
        catalog="kafka",
        schema="default",
    )

    query_future_task.after(create_blackboard)

    merge_data_task = merge_data_comp(
        columns=list(df_structure.keys()),
        dataset_dir_1=query_presence_task.outputs["dataframe"],
        dataset_dir_2=query_future_task.outputs["dataframe"],
        today=today,
        features=df_structure,
        headers=headers,
        order_by="timestamp",
        scaler_p=scaling_task.outputs["prep_dataset_dir"],
        sequence_length=seq_len,
        gen_dtype=gen_dtype,
    )

    inference_task = run_inference_comp(
        prep_dataset_dir=merge_data_task.outputs["prep_dataset_dir"],
        scaler_p=scaling_task.outputs["prep_dataset_dir"],
        sequence_length=seq_len,
        model_name=model_name,
        namespace=NAMESPACE,
        gen_dtype=gen_dtype,
        features=df_structure,
        y=y,
    )
    # The model must be served before inference requests are sent.
    inference_task.after(deploy_model_with_kserve_task)

### 5.2.) Configuration & Execution

In [28]:
def disable_cache_transformer(op):
    """Disable Kubeflow Pipelines step caching for `op` and return it.

    Container ops get a zero max cache staleness in their execution options;
    any other op type gets the equivalent pod annotation instead.
    """
    if not isinstance(op, dsl.ContainerOp):
        op.add_pod_annotation(
            name="pipelines.kubeflow.org/max_cache_staleness", value="P0D"
        )
        return op
    op.execution_options.caching_strategy.max_cache_staleness = "P0D"
    return op


# Run configuration: no caching, and data passed between steps through a PVC
# named "<workflow name>-<blackboard>" (presumably the claim created by the
# pipeline's VolumeOp for the same resource name).
pipeline_conf = PipelineConf()
pipeline_conf.add_op_transformer(disable_cache_transformer)
pipeline_conf.data_passing_method = data_passing_methods.KubernetesVolume(
    volume=V1Volume(
        name=ARGUMENTS["blackboard"],
        persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
            claim_name="{{workflow.name}}-" + ARGUMENTS["blackboard"]
        ),
    ),
    path_prefix=ARGUMENTS["blackboard"] + "/",
)

client.create_run_from_pipeline_func(
    stock_price_prediction_pipeline,
    namespace=NAMESPACE,
    arguments=ARGUMENTS,
    pipeline_conf=pipeline_conf,
)

RunPipelineResult(run_id=64d8656d-f373-445a-b793-66c9fc31515c)