# Training Data Pipeline

<img src="img/mlops_training_data_pipeline.png" width=1000/>

## Data Pipeline
Our data pipeline will run daily, collecting data and storing it in a form that machine learning jobs can consume directly.

We will organize the data into three folders, one per stage:
- **raw** -> Get data: retrieve data from the API and save it as a CSV file (data/raw/data.csv)
- **staging** -> Check and verify data: run quality checks and schema verification to confirm that the data can be used in production (data/staging/data.csv)
- **training** -> Generate feature sets: the final product of the data pipeline, containing the data processed into features that the training process can consume directly (data/training/data.csv)

With these three distinct phases, the training-data generation process is reproducible, each step is visible, and the steps are clearly separated from one another.
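The three-stage layout can be captured in a small helper so that every script resolves its input and output paths the same way. This is a minimal sketch: `STAGES`, `FILE_NAME`, `stage_csv` and `next_stage` are hypothetical names modelled on the folder layout above, not part of the project's `config` module.

```python
from pathlib import Path

# Hypothetical constants mirroring the raw -> staging -> training layout
STAGES = ("raw", "staging", "training")
FILE_NAME = "data.csv"


def stage_csv(root, stage):
    """Return the CSV path for one pipeline stage under `root`."""
    if stage not in STAGES:
        raise ValueError(f"unknown stage {stage!r}; expected one of {STAGES}")
    return Path(root) / stage / FILE_NAME


def next_stage(stage):
    """Return the stage that consumes `stage`'s output, or None for the last one."""
    i = STAGES.index(stage)
    return STAGES[i + 1] if i + 1 < len(STAGES) else None


if __name__ == "__main__":
    for s in STAGES:
        print(s, "->", stage_csv("data", s), "| feeds:", next_stage(s))
```

Because each stage only reads the previous stage's file and writes its own, any stage can be re-run in isolation without touching the others.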

In [None]:
import os

from data_utils import get_train_test_split_for_stock
from config import *

In [None]:
print(os.getcwd())
#_path_to_src = "/home/ksatola/work/src"

In [None]:
from pathlib import Path

# Create the folder if it does not exist
Path(PATH_TO_DATA_PIPELINE).mkdir(parents=True, exist_ok=True)

In [None]:
# Go to PATH_TO_DATA_PIPELINE
os.chdir(PATH_TO_DATA_PIPELINE) 
print(os.getcwd())

## MLflow Project
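MLflow Projects package code in a reusable, reproducible format: a project file (here `MLProject`) declares the environment and the entry points, so the same project can run locally or on remote backends such as `Kubernetes` and `Databricks`. Scaling your ML job seamlessly is one of the main selling points of a platform such as MLflow.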
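Each entry point declared in the `MLProject` file can also be launched on its own with `mlflow run . -e <entry_point>`, which is handy for re-running a single stage. The sketch below only builds those command lines; `ENTRY_POINTS` and `build_mlflow_command` are illustrative names, and the default experiment name is the one used at the end of this notebook.

```python
import shlex

# Entry points as declared in the MLProject file of this notebook
ENTRY_POINTS = ("load_raw_data", "clean_validate_data", "feature_set_generation", "main")


def build_mlflow_command(entry_point, experiment_name="SP_Training_Data_Pipeline"):
    """Build the argv list for running one MLproject entry point with the MLflow CLI."""
    if entry_point not in ENTRY_POINTS:
        raise ValueError(f"unknown entry point {entry_point!r}")
    return ["mlflow", "run", ".", "-e", entry_point, "--experiment-name", experiment_name]


if __name__ == "__main__":
    for ep in ENTRY_POINTS:
        # shlex.join (Python 3.8+) renders the argv as a copy-pasteable shell command
        print(shlex.join(build_mlflow_command(ep)))
```

The argv list can be handed to `subprocess.run(cmd, check=True)` from the project directory; keeping it as a list avoids shell-quoting issues.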

In [None]:
# Add the MLProject file
_mlproject = "MLProject"

In [None]:
%%writefile {_mlproject}

name: training_data_pipeline

conda_env: conda.yaml

entry_points:
    load_raw_data:
        command: "python load_raw_data.py"

    clean_validate_data:
        command: "python check_verify_data.py"

    feature_set_generation:
        command: "python generate_feature_set.py"

    main:
        command: "python main.py"

In [None]:
# Add the conda.yaml file
_conda = "conda.yaml"

In [None]:
# GitHub has disabled the unauthenticated git:// protocol (https://github.blog/2021-09-01-improving-git-protocol-security-github/),
# so git+git://github.com/mlflow/mlflow no longer works; install from git+https:// instead

In [None]:
%%writefile {_conda}

name: pystock-data-features
channels:
    - defaults
dependencies:
    - python=3.8
    - numpy
    - scipy
    - pandas
    - cloudpickle
    - pip
    - pip:
        - git+https://github.com/mlflow/mlflow
        - pandas_datareader
        - great-expectations
        - pandas-profiling

In [None]:
# Add the main.py file
_main = "main.py"

In [None]:
%%writefile {_main}

import mlflow


def _run(entrypoint, parameters=None):
    """Launch one MLproject entry point; mlflow.run blocks until it finishes."""
    parameters = parameters or {}
    print("---------------------")
    print(f"Launching new run for entrypoint={entrypoint} and parameters={parameters}")
    submitted_run = mlflow.run(".", entrypoint, parameters=parameters)
    return submitted_run


def workflow():

    with mlflow.start_run(run_name="training-data-pipeline") as active_run:

        # The steps run sequentially; a failing step raises and stops the pipeline
        _run("load_raw_data")
        _run("clean_validate_data")
        _run("feature_set_generation")


if __name__ == "__main__":

    workflow()

### Load raw data
Download three months of daily BTC-USD prices from Yahoo Finance (via `pandas_datareader`) and save them in the _raw_ folder.
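The loader requests a three-month window ending today, computed with `relativedelta(months=-3)`, which clamps the day to the end of a shorter month. A stdlib-only sketch of that arithmetic, with `months_back` as a hypothetical helper, makes the edge case explicit:

```python
import calendar
from datetime import date


def months_back(d, months):
    """Shift `d` back by `months`, clamping the day like relativedelta(months=-n)."""
    # Convert to a zero-based month count, subtract, and convert back
    total = d.year * 12 + (d.month - 1) - months
    year, month = divmod(total, 12)
    month += 1
    # Clamp, e.g. 31 May -> 28/29 Feb, when the target month is shorter
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, min(d.day, last_day))


if __name__ == "__main__":
    end = date.today()
    start = months_back(end, 3)
    print(f"Requesting BTC-USD from {start} to {end}")
```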

In [None]:
# Load the data and save it in the _raw_ folder
_load_raw_data = "load_raw_data.py"

In [None]:
%%writefile {_load_raw_data}

import sys
from pathlib import Path
PATH_TO_CONFIG = "/home/ksatola/work/src"
sys.path.insert(1, PATH_TO_CONFIG)

import os
import mlflow
from datetime import date
from dateutil.relativedelta import relativedelta
import requests
from config import *
import pandas_datareader.data as web


if __name__ == "__main__":
    
    # Workaround for https://github.com/pydata/pandas-datareader/issues/868:
    # send a browser-like User-Agent header with every request to Yahoo
    USER_AGENT = {
        'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
                       ' Chrome/91.0.4472.124 Safari/537.36')
    }
    sesh = requests.Session()
    sesh.headers.update(USER_AGENT)

    with mlflow.start_run(run_name="load_raw_data") as run:

        mlflow.set_tag("mlflow.runName", "load_raw_data")

        # Request the last three months of prices
        end = date.today()
        start = end + relativedelta(months=-3)

        df = web.DataReader("BTC-USD", 'yahoo', start, end, session=sesh)

        # Create the folder if it does not exist
        Path(PATH_TO_DATA_PIPELINE, "raw").mkdir(parents=True, exist_ok=True)

        df.to_csv(os.path.join(PATH_TO_DATA_PIPELINE, "raw", "data.csv"))


### Data quality check
Checking data quality as part of your machine learning system is critical to the integrity and correctness of both model training and inference.

From a data quality perspective, there are a few critical dimensions along which to assess and profile a dataset:
- **Schema compliance:** Ensuring the data has the expected types; for example, making sure that numeric columns don't contain any other types of data
- **Valid data:** Assessing whether the values are valid from a business perspective
- **Missing data:** Assessing whether all the data needed to run analytics and algorithms is available
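To make the *valid data* and *missing data* dimensions concrete, here is a small pandas sketch over an OHLC price frame like the one this pipeline downloads. The specific business rules (positive prices, `Low <= Open/Close <= High`, non-negative `Volume`) and the `quality_report` name are illustrative assumptions, not checks the pipeline scripts implement.

```python
import pandas as pd


def quality_report(df):
    """Summarise business-rule violations and missing values for an OHLC frame."""
    price_cols = ["Open", "High", "Low", "Close"]
    low, high = df["Low"], df["High"]
    return {
        # Missing data: number of nulls per column
        "missing": df.isna().sum().to_dict(),
        # Valid data: rows that break simple business rules
        "non_positive_price": int((df[price_cols] <= 0).any(axis=1).sum()),
        "low_above_high": int((low > high).sum()),
        # Rows with a missing Open/Close also count here, since NaN comparisons are False
        "open_close_outside_range": int(
            (~df["Open"].between(low, high) | ~df["Close"].between(low, high)).sum()
        ),
        "negative_volume": int((df["Volume"] < 0).sum()),
    }
```

Returning counts rather than raising lets the same function feed both a hard gate (fail if any count is non-zero) and a logged profile, e.g. via `mlflow.log_dict`.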

We use the `Great Expectations` Python package for data validation (reference: https://github.com/great-expectations/great_expectations).

#### In our project
We want the following rules/constraints to be verifiable:
- Date values should be valid dates and cannot be missing
- Numeric (float and long) values are correctly typed
- All expected columns are present in the dataset
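The three rules can also be cross-checked with plain pandas, independently of Great Expectations. In this sketch `EXPECTED_COLUMNS` assumes the Yahoo OHLCV columns asserted in `check_verify_data.py`, and `check_project_rules` is a hypothetical helper that returns the list of failed rules instead of raising.

```python
import pandas as pd
from pandas.api.types import is_float_dtype, is_integer_dtype

# Columns asserted in check_verify_data.py (assumed Yahoo OHLCV layout)
EXPECTED_COLUMNS = {"Date", "High", "Low", "Open", "Close", "Volume", "Adj Close"}
FLOAT_COLUMNS = ["High", "Low", "Open", "Close", "Adj Close"]


def check_project_rules(df):
    """Return a list of human-readable rule violations (an empty list means all passed)."""
    failures = []
    # Rule 3: all expected columns are present
    missing_cols = EXPECTED_COLUMNS - set(df.columns)
    if missing_cols:
        failures.append(f"missing columns: {sorted(missing_cols)}")
        return failures  # the remaining rules need these columns
    # Rule 1: dates are valid YYYY-MM-DD values and not missing
    parsed = pd.to_datetime(df["Date"], format="%Y-%m-%d", errors="coerce")
    if parsed.isna().any():
        failures.append("invalid or missing Date values")
    # Rule 2: numeric columns have the expected dtypes
    for col in FLOAT_COLUMNS:
        if not is_float_dtype(df[col]):
            failures.append(f"{col} is not float")
    if not is_integer_dtype(df["Volume"]):
        failures.append("Volume is not an integer (long) column")
    return failures
```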

In [None]:
_check_verify_data = "check_verify_data.py"

In [None]:
%%writefile {_check_verify_data}

import sys
from pathlib import Path
PATH_TO_CONFIG = "/home/ksatola/work/src"
sys.path.insert(1, PATH_TO_CONFIG)

import os
import mlflow
import json
import pandas as pd
import great_expectations as ge
from config import *


if __name__ == "__main__":
    
    with mlflow.start_run(run_name="check_verify_data") as run:

        mlflow.set_tag("mlflow.runName", "check_verify_data")

        df = pd.read_csv(os.path.join(PATH_TO_DATA_PIPELINE, "raw", "data.csv"), index_col="Date")
        
        # Convert Date index to column for data signature checking
        df.reset_index(inplace=True)

        describe_to_dict = df.describe().to_dict()
        mlflow.log_dict(describe_to_dict, "describe_data.json")
        
        print(json.dumps(
            describe_to_dict,
            sort_keys=True,
            indent=4,
            separators=(',', ': ')
        ))
        
        pd_df_ge = ge.from_pandas(df)

        # Data signature evaluation: dates must be present and in YYYY-MM-DD format,
        # prices must be floats and volume a long integer
        assert pd_df_ge.expect_column_values_to_not_be_null("Date").success
        assert pd_df_ge.expect_column_values_to_match_strftime_format("Date", "%Y-%m-%d").success
        for column in ["High", "Low", "Open", "Close", "Adj Close"]:
            assert pd_df_ge.expect_column_values_to_be_of_type(column, "float").success
        assert pd_df_ge.expect_column_values_to_be_of_type("Volume", "long").success

        # Basic cleaning: drop rows with missing values
        df.dropna(inplace=True)

        # Create the folder if it does not exist
        Path(PATH_TO_DATA_PIPELINE, "staging").mkdir(parents=True, exist_ok=True)

        df.to_csv(os.path.join(PATH_TO_DATA_PIPELINE, "staging", "data.csv"), index=False)


### Prepare data for modelling
The `generate_feature_set.py` script (entry point `feature_set_generation`) generates our features and saves them in the training folder, where all the data is valid and ready to be used for ML training.
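To see what the script produces, here is the same labelling and windowing on a toy series. Instead of the manual `as_strided` call in `rolling_window`, this sketch swaps in `numpy.lib.stride_tricks.sliding_window_view` (NumPy 1.20+), which builds the same windows as a read-only view without hand-computed strides. The prices, the window size of 4 and the `make_training_frame` name are made up for illustration.

```python
import numpy as np
import pandas as pd
from numpy.lib.stride_tricks import sliding_window_view


def make_training_frame(open_, close, window_size, threshold=0.00001):
    """Label each day as going up (1) or not (0), then slice the labels into windows.

    The first window_size - 1 labels of each window are the features (t-n ... t-1)
    and the last one is the target, mirroring generate_feature_set.py.
    """
    open_ = np.asarray(open_, dtype=float)
    close = np.asarray(close, dtype=float)
    delta_pct = (close - open_) / open_
    going_up = (delta_pct > threshold).astype(int)
    # Shape: (len(going_up) - window_size + 1, window_size)
    windows = sliding_window_view(going_up, window_size)
    cols = [f"t-{window_size - 1 - i}" for i in range(window_size - 1)] + ["target"]
    # Copy so the frame owns writable data rather than a read-only view
    return pd.DataFrame(windows.copy(), columns=cols)


if __name__ == "__main__":
    # Day 3 rises by only 0.0005%, below the threshold, so it is labelled 0
    print(make_training_frame([100] * 6, [101, 99, 100.0005, 102, 103, 98], 4))
```

A series of length N yields N - WINDOW_SIZE + 1 training rows, so the three-month raw download bounds how many examples the model sees.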

In [None]:
_generate_feature_set = "generate_feature_set.py"

In [None]:
%%writefile {_generate_feature_set}

import sys
from pathlib import Path
PATH_TO_CONFIG = "/home/ksatola/work/src"
sys.path.insert(1, PATH_TO_CONFIG)

import os
import mlflow
import pandas as pd
import numpy as np
from config import *


def rolling_window(a, window):
    """
    Takes np.array 'a' and size 'window' as parameters 
    Outputs an np.array with all the ordered sequences of values of 'a' of size 'window'
        e.g. Input: ( np.array([1, 2, 3, 4, 5, 6]), 4 )
             Output: 
                     array([[1, 2, 3, 4],
                           [2, 3, 4, 5],
                           [3, 4, 5, 6]])
    """
    shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
    strides = a.strides + (a.strides[-1],)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)


if __name__ == "__main__":
    
    with mlflow.start_run(run_name="generate_feature_set") as run:

        mlflow.set_tag("mlflow.runName", "generate_feature_set")

        df = pd.read_csv(os.path.join(PATH_TO_DATA_PIPELINE, "staging", "data.csv"))

        # Relative daily change; a day counts as "going up" if it rose by more than 0.001%
        df['Delta Pct'] = (df['Close'] - df['Open']) / df['Open']
        df['Going Up'] = df['Delta Pct'].apply(lambda d: 1 if d > 0.00001 else 0)

        # Each row: t-(WINDOW_SIZE-1) | ... | t-2 | t-1 | target (Going Up)
        training_dataset = rolling_window(df['Going Up'].to_numpy(), WINDOW_SIZE)

        # Derive the column names from WINDOW_SIZE so they always match the window width
        cols = [f"t-{WINDOW_SIZE - 1 - i}" for i in range(WINDOW_SIZE - 1)] + ["target"]
        df = pd.DataFrame(training_dataset, columns=cols)

        # Create the folder if it does not exist
        Path(PATH_TO_DATA_PIPELINE, "training").mkdir(parents=True, exist_ok=True)

        df.to_csv(os.path.join(PATH_TO_DATA_PIPELINE, "training", "data.csv"), index=False)


## Run data pipeline

In [None]:
# In the HOST terminal, make Git fetch GitHub repositories over HTTPS instead of git://
# git config --global url."https://github.com/".insteadOf git://github.com/
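Git's `url.<base>.insteadOf` setting rewrites any remote URL that starts with the given prefix, so with the command above a `git+git://github.com/...` requirement is cloned over HTTPS instead. A tiny Python model of that prefix rewrite (the function name and `RULES` mapping are hypothetical; Git uses the longest matching prefix when several rules match):

```python
def apply_instead_of(url, rules):
    """Rewrite `url` like Git's url.<base>.insteadOf; `rules` maps prefix -> base."""
    matches = [prefix for prefix in rules if url.startswith(prefix)]
    if not matches:
        return url  # no rule applies, URL is used unchanged
    # Git picks the longest matching prefix
    best = max(matches, key=len)
    return rules[best] + url[len(best):]


# Equivalent of: git config --global url."https://github.com/".insteadOf git://github.com/
RULES = {"git://github.com/": "https://github.com/"}
```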

In [None]:
# In the Jupyter Terminal
cd /home/ksatola/work/data/data_pipeline

# If creating the conda environment fails, Docker may not have enough RAM assigned; try increasing its memory limit
mlflow run . --experiment-name="SP_Training_Data_Pipeline"
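After `mlflow run` finishes, a quick way to confirm that every stage wrote its output is to check the three CSV files. A stdlib sketch, assuming the `raw`/`staging`/`training` layout with a `data.csv` in each folder under `PATH_TO_DATA_PIPELINE` (the `summarize_outputs` helper is hypothetical):

```python
import csv
import tempfile
from pathlib import Path

STAGES = ("raw", "staging", "training")


def summarize_outputs(root):
    """Return {stage: data row count} for each stage CSV, or None if the file is missing."""
    summary = {}
    for stage in STAGES:
        path = Path(root) / stage / "data.csv"
        if not path.is_file():
            summary[stage] = None
            continue
        with path.open(newline="") as f:
            # Subtract one for the header row written by DataFrame.to_csv
            summary[stage] = max(sum(1 for _ in csv.reader(f)) - 1, 0)
    return summary


if __name__ == "__main__":
    # Demo on a throw-away folder; point this at PATH_TO_DATA_PIPELINE in practice
    with tempfile.TemporaryDirectory() as tmp:
        (Path(tmp) / "raw").mkdir()
        (Path(tmp) / "raw" / "data.csv").write_text("Date,Close\n2021-01-01,1.0\n")
        print(summarize_outputs(tmp))  # {'raw': 1, 'staging': None, 'training': None}
```

A `None` pinpoints the first stage that did not complete, and a shrinking row count between `raw` and `staging` shows how many rows `dropna` removed.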