# Simple and Robust ML Pipeline

In this notebook, we work out the fundamentals of a robust ML pipeline.

Most ML pipelines (or data pipelines) follow a similar structure. The following diagram shows a minimal ML pipeline. Its `preparation`, `training` and `validation` parts can be seen as `ML (related) code`.

![What a data pipeline looks like](https://cloud.google.com/architecture/images/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning-2-manual-ml.svg)

Research shows that the ML code is only a small part of a real-world Machine Learning system:

![ML Code is just a small part when doing Machine Learning](https://cloud.google.com/architecture/images/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning-1-elements-of-ml.png)

Original Source: *Sculley, David, et al. "Hidden technical debt in machine learning systems." Advances in neural information processing systems 28 (2015): 2503-2511.*


**In this notebook, we want to show by example how to transform prototype ML code which requires manual execution into a robust, scalable ML pipeline.**

It is NOT about tooling; it is about understanding the fundamental principles of a robust ML pipeline.

Below is the small prototype pipeline we start from.

## The Iris dataset

We build a classifier for Iris (German: "Schwertlilie").
As features, we use sepal length and width as well as petal length and width.

![Irises with sepal and petal](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/Machine+Learning+R/iris-machinelearning.png)

Source: [Machine Learning in R for beginners](https://www.datacamp.com/community/tutorials/machine-learning-in-r)

In [1]:
import joblib
from loguru import logger
import os
import pandas as pd
from sklearn.metrics import classification_report as clf_report
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier

output_dir = "first_try"
os.makedirs(output_dir, exist_ok=True)

# Extract data
raw_data = pd.read_csv('https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv')
raw_data_statistics = raw_data.describe()
logger.info(raw_data_statistics)

# prepare
features = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
standardize = False
X = raw_data[features]
y = raw_data["species"]

mean = X.mean()
std = X.std()

if standardize:
    logger.info("Standardize X.")
    X = (X - mean) / std

# train
clf_params = {"max_depth": 2}
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42, train_size=0.8)

clf = DecisionTreeClassifier(random_state=42, **clf_params)
clf.fit(X_train, y_train)
joblib.dump(clf, f"{output_dir}/model.joblib")
y_pred = clf.predict(X_test)
classification_report = clf_report(y_true=y_test, y_pred=y_pred, output_dict=True)
logger.info(classification_report)
pd.DataFrame(classification_report).to_csv(f"{output_dir}/classification_report.csv")

# validate
macro_avg_f1_score_min = 0.95
macro_avg_f1_score = classification_report["macro avg"]["f1-score"] 

if macro_avg_f1_score < macro_avg_f1_score_min:
    passed = False
else: 
    passed = True
print(passed)

2021-06-17 10:06:45.638 | INFO     | __main__:<module>:15 -        sepal_length  sepal_width  petal_length  petal_width
count    150.000000   150.000000    150.000000   150.000000
mean       5.843333     3.057333      3.758000     1.199333
std        0.828066     0.435866      1.765298     0.762238
min        4.300000     2.000000      1.000000     0.100000
25%        5.100000     2.800000      1.600000     0.300000
50%        5.800000     3.000000      4.350000     1.300000
75%        6.400000     3.300000      5.100000     1.800000
max        7.900000     4.400000      6.900000     2.500000
2021-06-17 10:06:45.657 | INFO     | __main__:<module>:39 - {'setosa': {'precision': 1.0, 'recall': 1.0, 'f1-score': 1.0, 'support': 10}, 'versicolor': {'precision': 1.0, 'recall': 0.8888888888888888, 'f1-score': 0.9411764705882353, 'support': 9}, 'virginica': {'precision': 0.9166666666666666, 'recall': 1.0, 'f1-score': 0.9565217391304348, 'support': 11}, 'accuracy': 0.9666666666666667, 'macro avg

True


### Resource considerations for the pipeline

- `extract`: May need to load gigabytes of data. **Time: at most a few tens of minutes**
- `prepare`: Can become quite complex, especially with feature engineering. **Time: up to tens of hours**
- `train`: Can become quite complex. **Time: up to tens of hours**
- `validate`: Should be kept simple. **Time: at most a few minutes**

### Automation Option 1: Lift the "monolithic" pipeline 
- Put the code above as-is into a docker container
- Run it on a machine that has as many resources as the most demanding step requires
- Any error during execution requires restarting the whole execution from scratch

Pro: 
- Quick to implement

Con:
- Not robust at all
- Testing it requires running the whole pipeline
- The temptation to "just add a small improvement here" is very strong
- No intermediate checks, so mistakes are easily overlooked
- Possibly expensive, because the machine must provide the maximum resources for the entire pipeline run, even during light steps
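To make the cost argument concrete, here is a back-of-the-envelope calculation. The runtimes and hourly prices are hypothetical placeholders loosely following the resource considerations above, not measurements, and `monolithic_cost` / `per_step_cost` are illustrative helpers rather than part of the original code.

```python
# Back-of-the-envelope cost comparison; all numbers are hypothetical placeholders.
# step name -> (runtime in hours, machine price in cost units per hour)
STEPS = {
    "extract": (0.5, 0.25),
    "prepare": (10.0, 2.0),
    "train": (10.0, 4.0),
    "validate": (0.25, 0.25),
}


def monolithic_cost(steps: dict) -> float:
    """One machine, sized for the most demanding step, runs the whole pipeline."""
    total_hours = sum(hours for hours, _ in steps.values())
    max_price = max(price for _, price in steps.values())
    return total_hours * max_price


def per_step_cost(steps: dict) -> float:
    """Each step runs on a machine sized for that step only."""
    return sum(hours * price for hours, price in steps.values())


if __name__ == "__main__":
    print("monolithic:", monolithic_cost(STEPS))
    print("per step:  ", per_step_cost(STEPS))
```

With these made-up numbers, the monolithic setup costs 83.0 units and the per-step setup 60.1875, because the expensive training machine is only billed while training runs. A failure late in the monolithic run would additionally require paying for the full 20.75 hours again.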

### Automation Option 2: Refactor to a robust pipeline by creating independent, well-defined components

- A pipeline is composed of independent components that are [pure functions](https://en.wikipedia.org/wiki/Pure_function)
- Independent components are thoroughly (unit-)tested (todo in this notebook)
- The pipeline should have a functional end-to-end test (todo in this notebook)
- All artifacts and metadata are persisted in a unique storage location (for example, a directory named after the pipeline run's UUID)
- Parameters are defined in config files instead of being hard-coded
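The notebook's pipeline later names its run directories `output/<uuid4>`. Below is a minimal sketch of a helper for the "unique storage location" principle; the helper name `create_run_dir` and the timestamp prefix are assumptions of this sketch, not part of the original code.

```python
import os
import tempfile
import uuid
from datetime import datetime, timezone


def create_run_dir(base_dir: str) -> str:
    """Create a fresh output directory for a single pipeline run.

    The name combines a sortable UTC timestamp with a random UUID4.
    exist_ok=False raises FileExistsError instead of silently reusing
    (and possibly overwriting) the outputs of an earlier run.
    """
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    run_dir = os.path.join(base_dir, f"{timestamp}_{uuid.uuid4()}")
    os.makedirs(run_dir, exist_ok=False)
    return run_dir


if __name__ == "__main__":
    base_dir = tempfile.mkdtemp()
    print(create_run_dir(base_dir))
    print(create_run_dir(base_dir))  # a second run never shares the first run's directory
```

The timestamp prefix is only a convenience: directories sort chronologically in a file browser, while the UUID keeps names unique even for runs started within the same second.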

Pro:
- Robust
- Transparent and easy to track
- Easy to port to an ML framework
- Scales with data
- Scales with team size
- Makes the team more productive in the medium term
- Possibly more cost-effective during execution, because resources can be assigned per step at a finer granularity

Con:
- Requires (software) engineering

In [2]:
pipeline_run_id = "first_pipeline_run"
output_dir = pipeline_run_id

### We create a directory for the pipeline run outputs

In [3]:
import os
os.makedirs(output_dir, exist_ok=True)

### First develop interactively

In [4]:
# extract
import os
import pandas as pd

raw_data = pd.read_csv('https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv')
raw_data_path = f"{output_dir}/raw_data.parquet"
raw_data.to_parquet(raw_data_path)

raw_data_statistics = raw_data.describe()
raw_data_statistics_path = f"{output_dir}/raw_data_statistics.csv"
raw_data_statistics.to_csv(raw_data_statistics_path)

In [5]:
# If it works, delete the output directory again
!rm -r $output_dir
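The `!rm -r $output_dir` shell escape only works inside Jupyter/IPython on systems that provide `rm`. A portable plain-Python alternative is sketched below; `remove_output_dir` is a hypothetical helper name.

```python
import os
import shutil
import tempfile


def remove_output_dir(output_dir: str) -> None:
    """Delete a scratch output directory together with everything inside it.

    Portable stand-in for the IPython shell escape `!rm -r $output_dir`.
    Does nothing if the directory does not exist.
    """
    if os.path.isdir(output_dir):
        shutil.rmtree(output_dir)


if __name__ == "__main__":
    scratch = tempfile.mkdtemp()
    open(os.path.join(scratch, "raw_data.parquet"), "w").close()  # dummy file
    remove_output_dir(scratch)
    print(os.path.exists(scratch))  # False
```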

### Then turn it into a pure, self-contained function

In [6]:
def extract(output_dir: str) -> dict:
    params = locals()
    from loguru import logger
    import os
    import pandas as pd
    import uuid
    # BEST PRACTICE: Make "started" and "finished" logs
    # BEST PRACTICE: Log the input params of a function
    logger.info(f"extract started with {params}.")
    os.makedirs(output_dir, exist_ok=True)
    
    raw_data = pd.read_csv('https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv')
    raw_data_path = f"{output_dir}/raw_data.parquet"
    raw_data.to_parquet(raw_data_path)
    
    raw_data_statistics = raw_data.describe()
    raw_data_statistics_path = f"{output_dir}/raw_data_statistics.csv"
    raw_data_statistics.to_csv(raw_data_statistics_path)
    logger.info("extract finished.")
    return {
        "raw_data_path": raw_data_path,
        "raw_data_statistics_path": raw_data_statistics_path,
    }
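Repeating the `# BEST PRACTICE` boilerplate (started/finished logs, logging the input params) in every component is easy to forget. One option is to factor it into a decorator. The sketch below uses the standard-library `logging` module instead of loguru so it runs without extra dependencies (an assumption; with loguru you would pass f-string messages to `logger.info`). `component` and the toy `scale` function are hypothetical names.

```python
import functools
import inspect
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline")


def component(func):
    """Decorator that applies the logging conventions to a pipeline component.

    - logs "<name> started with <params>." including default values,
    - logs "<name> finished." after a successful call,
    - enforces that the component returns a dict (e.g. paths to its outputs).
    """
    signature = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = signature.bind(*args, **kwargs)
        bound.apply_defaults()
        logger.info("%s started with %s.", func.__name__, dict(bound.arguments))
        result = func(*args, **kwargs)
        if not isinstance(result, dict):
            raise TypeError(f"{func.__name__} must return a dict, got {type(result).__name__}")
        logger.info("%s finished.", func.__name__)
        return result

    return wrapper


@component
def scale(values: list, factor: float = 2.0) -> dict:
    # Hypothetical toy component used only to demonstrate the decorator.
    return {"scaled": [v * factor for v in values]}


if __name__ == "__main__":
    print(scale([1, 2, 3]))
```

The component body then only contains its actual logic, and every component logs in exactly the same format, which follows the notebook's own tip to develop a template for ML pipeline components.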

### Test the function

In [7]:
extract_result = extract(output_dir=output_dir)
extract_result

2021-06-17 10:06:46.056 | INFO     | __main__:extract:9 - extract started with {'output_dir': 'first_pipeline_run'}.
2021-06-17 10:06:46.234 | INFO     | __main__:extract:19 - extract finished.


{'raw_data_path': 'first_pipeline_run/raw_data.parquet',
 'raw_data_statistics_path': 'first_pipeline_run/raw_data_statistics.csv'}

In [8]:
def prepare(raw_data_path: str, features: list, standardize: bool, output_dir: str) -> dict:
    """Prepare the selected features and standardize if wanted."""
    params = locals()
    from loguru import logger
    import os
    import pandas as pd
    logger.info(f"prepare started with {params}")

    os.makedirs(output_dir, exist_ok=True)

    raw_data = pd.read_parquet(raw_data_path)
    X = raw_data[features]
    y = raw_data["species"]

    mean = X.mean()
    std = X.std()

    if standardize:
        logger.info("Standardize X.")
        X = (X - mean) / std

    X_path = f"{output_dir}/X.parquet"
    y_path = f"{output_dir}/y.parquet"

    X.to_parquet(X_path)
    y.to_frame().to_parquet(y_path)
    logger.info("prepare finished.")
    return {
        "mean": mean.to_dict(),
        "std": std.to_dict(),
        "X_path": X_path,
        "y_path": y_path
    }

In [9]:
prepare_result = prepare(raw_data_path=extract_result["raw_data_path"], features=["sepal_length", "sepal_width", "petal_length", "petal_width"], standardize=True, output_dir=pipeline_run_id)
prepare_result

2021-06-17 10:06:46.256 | INFO     | __main__:prepare:7 - prepare started with {'raw_data_path': 'first_pipeline_run/raw_data.parquet', 'features': ['sepal_length', 'sepal_width', 'petal_length', 'petal_width'], 'standardize': True, 'output_dir': 'first_pipeline_run'}
2021-06-17 10:06:46.270 | INFO     | __main__:prepare:19 - Standardize X.
2021-06-17 10:06:46.277 | INFO     | __main__:prepare:27 - prepare finished.


{'mean': {'sepal_length': 5.843333333333334,
  'sepal_width': 3.0573333333333337,
  'petal_length': 3.7580000000000005,
  'petal_width': 1.1993333333333336},
 'std': {'sepal_length': 0.828066127977863,
  'sepal_width': 0.4358662849366982,
  'petal_length': 1.7652982332594662,
  'petal_width': 0.7622376689603465},
 'X_path': 'first_pipeline_run/X.parquet',
 'y_path': 'first_pipeline_run/y.parquet'}

In [10]:
def train(X_path: str, y_path: str, output_dir: str, clf_params: dict) -> dict:
    params = locals()
    from joblib import dump
    from loguru import logger
    import os  # needed for os.makedirs below; keeps the function self-contained
    import pandas as pd
    from sklearn.metrics import classification_report as clf_report
    from sklearn.model_selection import train_test_split
    from sklearn.tree import DecisionTreeClassifier

    logger.info(f"train started with {params}.")
    os.makedirs(output_dir, exist_ok=True)

    X = pd.read_parquet(X_path)
    y = pd.read_parquet(y_path)
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42, train_size=0.8)
    
    clf = DecisionTreeClassifier(random_state=42, **clf_params)
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)
    classification_report = clf_report(y_true=y_test, y_pred=y_pred, output_dict=True)
    model_path = f"{output_dir}/model.joblib"
    dump(clf, model_path)
    logger.info("train finished.")
    return {
        "model_path": model_path,
        "classification_report": classification_report
    }

In [11]:
train_result = train(X_path=prepare_result["X_path"], y_path=prepare_result["y_path"], output_dir=pipeline_run_id, clf_params={"max_depth": 2})
train_result

2021-06-17 10:06:46.293 | INFO     | __main__:train:10 - train started with {'X_path': 'first_pipeline_run/X.parquet', 'y_path': 'first_pipeline_run/y.parquet', 'output_dir': 'first_pipeline_run', 'clf_params': {'max_depth': 2}}.
2021-06-17 10:06:46.315 | INFO     | __main__:train:24 - train finished.


{'model_path': 'first_pipeline_run/model.joblib',
 'classification_report': {'setosa': {'precision': 1.0,
   'recall': 1.0,
   'f1-score': 1.0,
   'support': 10},
  'versicolor': {'precision': 1.0,
   'recall': 0.8888888888888888,
   'f1-score': 0.9411764705882353,
   'support': 9},
  'virginica': {'precision': 0.9166666666666666,
   'recall': 1.0,
   'f1-score': 0.9565217391304348,
   'support': 11},
  'accuracy': 0.9666666666666667,
  'macro avg': {'precision': 0.9722222222222222,
   'recall': 0.9629629629629629,
   'f1-score': 0.9658994032395567,
   'support': 30},
  'weighted avg': {'precision': 0.9694444444444444,
   'recall': 0.9666666666666667,
   'f1-score': 0.9664109121909632,
   'support': 30}}}

In [12]:
def validate(classification_report: dict, macro_avg_f1_score_min: float) -> dict:
    params = locals()
    from loguru import logger
    logger.info("validate started.")
    macro_avg_f1_score = classification_report["macro avg"]["f1-score"] 
    
    if macro_avg_f1_score < macro_avg_f1_score_min:
        passed = False
    else: 
        passed = True
    logger.info("validate finished.")
    return {
        "passed": passed
    }

In [13]:
validate_result = validate(classification_report=train_result["classification_report"], macro_avg_f1_score_min=0.95)
validate_result

2021-06-17 10:06:46.330 | INFO     | __main__:validate:4 - validate started.
2021-06-17 10:06:46.331 | INFO     | __main__:validate:11 - validate finished.


{'passed': True}

In [14]:
validate_result = validate(classification_report=train_result["classification_report"], macro_avg_f1_score_min=0.99)
validate_result

2021-06-17 10:06:46.336 | INFO     | __main__:validate:4 - validate started.
2021-06-17 10:06:46.337 | INFO     | __main__:validate:11 - validate finished.


{'passed': False}
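The notebook lists unit tests as a to-do. Because `validate` only turns a dict into a dict, it is the easiest component to unit-test. The sketch below uses the standard-library `unittest`. The `validate` here is a copy of the notebook's function with logging removed and the if/else collapsed into one equivalent comparison, so the block runs on its own; the artificial reports are illustrative.

```python
import unittest


def validate(classification_report: dict, macro_avg_f1_score_min: float) -> dict:
    # Copy of the notebook's validate() without logging; same pass/fail rule.
    macro_avg_f1_score = classification_report["macro avg"]["f1-score"]
    return {"passed": macro_avg_f1_score >= macro_avg_f1_score_min}


class ValidateTest(unittest.TestCase):
    @staticmethod
    def report(f1: float) -> dict:
        # Minimal artificial report: validate() only reads "macro avg" -> "f1-score".
        return {"macro avg": {"f1-score": f1}}

    def test_passes_above_threshold(self):
        self.assertTrue(validate(self.report(0.97), 0.95)["passed"])

    def test_passes_exactly_at_threshold(self):
        self.assertTrue(validate(self.report(0.95), 0.95)["passed"])

    def test_fails_below_threshold(self):
        self.assertFalse(validate(self.report(0.94), 0.95)["passed"])


if __name__ == "__main__":
    # argv/exit arguments let this also run inside a notebook cell.
    unittest.main(argv=["validate-test"], exit=False)
```

The boundary case (F1 score exactly equal to the threshold) is worth pinning down explicitly, since it is the case most likely to change silently during a refactoring.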

In [15]:
def dict_to_file(dirname: str, d: dict):
    import os
    import json
    from loguru import logger
    
    os.makedirs(dirname, exist_ok=True)
    filename = f"{dirname}/output.json"
    logger.info(f"Write {d} to {filename}.")
    with open(filename, "w", encoding="UTF-8") as f:
        f.write(json.dumps(d))
    logger.info("Write finished.")
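Writing outputs is only half of the story; a later step, a rerun, or an analysis must be able to read them back. Below is a minimal counterpart, assuming the `output.json` naming convention above; `file_to_dict` is a hypothetical helper, and the `dict_to_file` here is a logging-free copy so the block runs standalone.

```python
import json
import os
import tempfile


def dict_to_file(dirname: str, d: dict) -> str:
    # Logging-free copy of the notebook's dict_to_file(); also returns the file path.
    os.makedirs(dirname, exist_ok=True)
    filename = os.path.join(dirname, "output.json")
    with open(filename, "w", encoding="UTF-8") as f:
        json.dump(d, f)
    return filename


def file_to_dict(dirname: str) -> dict:
    """Read back the output.json that dict_to_file() wrote into dirname."""
    with open(os.path.join(dirname, "output.json"), encoding="UTF-8") as f:
        return json.load(f)


if __name__ == "__main__":
    step_dir = os.path.join(tempfile.mkdtemp(), "validate")
    dict_to_file(step_dir, {"passed": True})
    print(file_to_dict(step_dir))  # {'passed': True}
```

Note that JSON does not round-trip every Python value: tuples come back as lists and non-string dict keys come back as strings. Component outputs should therefore stick to `str`, `int`, `float`, `bool`, `None`, lists and dicts, as the notebook's components already do.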

### Define a pipeline that puts the steps together

In [16]:
import uuid
from typing import Optional
from loguru import logger
def pipeline(
    output_dir: Optional[str],
    prepare_features: list,
    prepare_standardize: bool,
    train_clf_params: dict,
    validate_macro_avg_f1_score_min: float
):
    params = locals()
    if not output_dir:
        output_dir = f"output/{str(uuid.uuid4())}"
    dict_to_file(output_dir, params)

    extract_dir = f"{output_dir}/extract"
    extract_result = extract(output_dir=extract_dir)
    dict_to_file(extract_dir, extract_result)
    
    prepare_dir = f"{output_dir}/prepare"
    prepare_result = prepare(raw_data_path=extract_result["raw_data_path"], features=prepare_features, standardize=prepare_standardize, output_dir=prepare_dir)
    dict_to_file(prepare_dir, prepare_result)

    train_dir = f"{output_dir}/train"
    train_result = train(X_path=prepare_result["X_path"], y_path=prepare_result["y_path"], output_dir=train_dir, clf_params=train_clf_params)
    dict_to_file(train_dir, train_result)

    validate_dir = f"{output_dir}/validate"
    validate_result = validate(classification_report=train_result["classification_report"], macro_avg_f1_score_min=validate_macro_avg_f1_score_min)
    dict_to_file(validate_dir, validate_result)
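Because every step's result is persisted to `<step_dir>/output.json`, a failed run could resume from the last completed step instead of starting over. A minimal sketch of such a step runner follows; `run_step` is a hypothetical helper, and reusing a cached result is only safe when the same run directory is resumed with the same config.

```python
import json
import os
import tempfile
from typing import Callable


def run_step(step_dir: str, step: Callable[..., dict], **kwargs) -> dict:
    """Run a pipeline step, or reuse its result if step_dir/output.json exists.

    output.json is written only after the step returned, so it doubles as a
    "this step completed" marker: a step that crashed midway is simply re-run.
    The cache ignores kwargs, so only resume a run with its original config.
    """
    output_file = os.path.join(step_dir, "output.json")
    if os.path.exists(output_file):
        with open(output_file, encoding="UTF-8") as f:
            return json.load(f)
    result = step(**kwargs)
    os.makedirs(step_dir, exist_ok=True)
    with open(output_file, "w", encoding="UTF-8") as f:
        json.dump(result, f)
    return result


if __name__ == "__main__":
    calls = []

    def toy_step(x: int) -> dict:
        # Hypothetical step that records how often it actually runs.
        calls.append(x)
        return {"y": 2 * x}

    step_dir = os.path.join(tempfile.mkdtemp(), "toy")
    print(run_step(step_dir, toy_step, x=3))  # runs toy_step
    print(run_step(step_dir, toy_step, x=3))  # reuses output.json
    print(len(calls))  # 1
```

Inside `pipeline`, a call such as `extract(output_dir=extract_dir)` would become `run_step(extract_dir, extract, output_dir=extract_dir)`; calling `pipeline` again with the failed run's `output_dir` then skips every step that already finished.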


In [17]:
import yaml

In [18]:
with open("config/run1.yaml") as f:
    config = yaml.safe_load(f)
config

{'output_dir': None,
 'prepare': {'features': ['sepal_length',
   'sepal_width',
   'petal_length',
   'petal_width'],
  'standardize': False},
 'train': {'clf_params': {'criterion': 'gini', 'max_depth': 2}},
 'validate': {'macro_avg_f1_score_min': 0.95}}

In [19]:
pipeline(
    output_dir=config["output_dir"],
    prepare_features=config["prepare"]["features"],
    prepare_standardize=config["prepare"]["standardize"],
    train_clf_params=config["train"]["clf_params"],
    validate_macro_avg_f1_score_min=config["validate"]["macro_avg_f1_score_min"],
)
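The call above maps every config entry to a `pipeline` parameter by hand, which is repetitive and easy to get wrong when a parameter is added. Because the parameter names follow a `<section>_<key>` pattern (e.g. `prepare_features`), the mapping can be derived from the config. This sketch assumes configs nest at most one level below each step section, as `run1.yaml` does; `pipeline_kwargs_from_config` is a hypothetical helper, and `run1_config` is copied from the output above.

```python
def pipeline_kwargs_from_config(config: dict) -> dict:
    """Turn the nested YAML config into the flat keyword arguments of pipeline().

    Top-level scalars (output_dir) are passed through unchanged; every entry of
    a section contributes "<section>_<key>". Values one level deeper (such as
    train.clf_params) are kept as dicts, not flattened further.
    """
    kwargs = {}
    for key, value in config.items():
        if isinstance(value, dict):
            for sub_key, sub_value in value.items():
                kwargs[f"{key}_{sub_key}"] = sub_value
        else:
            kwargs[key] = value
    return kwargs


run1_config = {
    "output_dir": None,
    "prepare": {
        "features": ["sepal_length", "sepal_width", "petal_length", "petal_width"],
        "standardize": False,
    },
    "train": {"clf_params": {"criterion": "gini", "max_depth": 2}},
    "validate": {"macro_avg_f1_score_min": 0.95},
}

if __name__ == "__main__":
    print(pipeline_kwargs_from_config(run1_config))
```

The notebook call would then shrink to `pipeline(**pipeline_kwargs_from_config(config))`. A typo or unknown key in the config surfaces as a `TypeError` from `pipeline()` instead of being silently ignored.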

2021-06-17 10:06:46.390 | INFO     | __main__:dict_to_file:8 - Write {'output_dir': None, 'prepare_features': ['sepal_length', 'sepal_width', 'petal_length', 'petal_width'], 'prepare_standardize': False, 'train_clf_params': {'criterion': 'gini', 'max_depth': 2}, 'validate_macro_avg_f1_score_min': 0.95} to output/9d2f962c-d02a-433b-ad2a-e396a2fcab5d/output.json.
2021-06-17 10:06:46.392 | INFO     | __main__:dict_to_file:11 - Write finished.
2021-06-17 10:06:46.393 | INFO     | __main__:extract:9 - extract started with {'output_dir': 'output/9d2f962c-d02a-433b-ad2a-e396a2fcab5d/extract'}.
2021-06-17 10:06:46.525 | INFO     | __main__:extract:19 - extract finished.
2021-06-17 10:06:46.526 | INFO     | __main__:dict_to_file:8 - Write {'raw_data_path': 'output/9d2f962c-d02a-433b-ad2a-e396a2fcab5d/extract/raw_data.parquet', 'raw_data_statistics_path': 'output/9d2f962c-d02a-433b-ad2a-e396a2fcab5d/extract/raw_data_statistics.csv'} to output/9d2f962c-d02a-433b-ad2a-e396a2fcab5d/extract/output.j

In [20]:
with open("config/run2.yaml") as f:
    config = yaml.safe_load(f)
config

{'output_dir': None,
 'prepare': {'features': ['sepal_length', 'sepal_width'], 'standardize': True},
 'train': {'clf_params': {'criterion': 'gini', 'max_depth': 10}},
 'validate': {'macro_avg_f1_score_min': 0.95}}
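One tip in this notebook is to make only one change at a time. Comparing the two configs above mechanically shows that `run2.yaml` changes three settings at once compared to `run1.yaml` (features, standardization and `max_depth`), so a difference in the results cannot be attributed to a single change. A small sketch; `flatten` and `config_diff` are hypothetical helpers, and the two dicts are copied from the outputs above.

```python
def flatten(d: dict, prefix: str = "") -> dict:
    """Flatten nested dicts into {"section.key": value} pairs."""
    flat = {}
    for key, value in d.items():
        path = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{path}."))
        else:
            flat[path] = value
    return flat


def config_diff(old: dict, new: dict) -> dict:
    """Return {setting: (old_value, new_value)} for every setting that differs.

    A setting missing on one side shows up as None on that side.
    """
    old_flat, new_flat = flatten(old), flatten(new)
    return {
        key: (old_flat.get(key), new_flat.get(key))
        for key in sorted(old_flat.keys() | new_flat.keys())
        if old_flat.get(key) != new_flat.get(key)
    }


run1 = {
    "output_dir": None,
    "prepare": {
        "features": ["sepal_length", "sepal_width", "petal_length", "petal_width"],
        "standardize": False,
    },
    "train": {"clf_params": {"criterion": "gini", "max_depth": 2}},
    "validate": {"macro_avg_f1_score_min": 0.95},
}
run2 = {
    "output_dir": None,
    "prepare": {"features": ["sepal_length", "sepal_width"], "standardize": True},
    "train": {"clf_params": {"criterion": "gini", "max_depth": 10}},
    "validate": {"macro_avg_f1_score_min": 0.95},
}

if __name__ == "__main__":
    for setting, (before, after) in config_diff(run1, run2).items():
        print(f"{setting}: {before!r} -> {after!r}")
```

Running this on the two configs reports exactly three changed settings: `prepare.features`, `prepare.standardize` and `train.clf_params.max_depth`.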

In [21]:
pipeline(
    output_dir=config["output_dir"],
    prepare_features=config["prepare"]["features"],
    prepare_standardize=config["prepare"]["standardize"],
    train_clf_params=config["train"]["clf_params"],
    validate_macro_avg_f1_score_min=config["validate"]["macro_avg_f1_score_min"],
)

2021-06-17 10:06:46.567 | INFO     | __main__:dict_to_file:8 - Write {'output_dir': None, 'prepare_features': ['sepal_length', 'sepal_width'], 'prepare_standardize': True, 'train_clf_params': {'criterion': 'gini', 'max_depth': 10}, 'validate_macro_avg_f1_score_min': 0.95} to output/5c8aea30-9055-4852-83e3-6e6029d71093/output.json.
2021-06-17 10:06:46.568 | INFO     | __main__:dict_to_file:11 - Write finished.
2021-06-17 10:06:46.568 | INFO     | __main__:extract:9 - extract started with {'output_dir': 'output/5c8aea30-9055-4852-83e3-6e6029d71093/extract'}.
2021-06-17 10:06:46.758 | INFO     | __main__:extract:19 - extract finished.
2021-06-17 10:06:46.759 | INFO     | __main__:dict_to_file:8 - Write {'raw_data_path': 'output/5c8aea30-9055-4852-83e3-6e6029d71093/extract/raw_data.parquet', 'raw_data_statistics_path': 'output/5c8aea30-9055-4852-83e3-6e6029d71093/extract/raw_data_statistics.csv'} to output/5c8aea30-9055-4852-83e3-6e6029d71093/extract/output.json.
2021-06-17 10:06:46.760 | 

## Automate and go on!

- The goal is **unattended execution**. This frees us up to focus on the next projects.
- Implement basic monitoring that **notifies us on pipeline failure only**. Notifications for successful pipeline runs are just noise.
- **Gain confidence in the pipeline** by writing end-to-end tests on artificial data with output verification (is the output as expected?)
- **Schedule** the pipeline to run periodically, so the model is retrained on the freshest data
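A minimal sketch of "notify on failure only": wrap the pipeline call, send a message only when it raises, and re-raise so that a scheduler still registers the run as failed. `notify` is a placeholder for whatever channel you use (e-mail, chat webhook, ...); nothing here is a specific tool's API.

```python
import logging
from typing import Callable, TypeVar

logger = logging.getLogger("pipeline")
T = TypeVar("T")


def run_with_failure_notification(run: Callable[[], T], notify: Callable[[str], None]) -> T:
    """Run the pipeline; call notify() only if it raises, then re-raise.

    A successful run only produces a log line, not a notification.
    """
    try:
        result = run()
    except Exception as exc:
        notify(f"Pipeline failed: {exc!r}")
        raise  # keep the failure visible to the caller/scheduler
    logger.info("Pipeline run succeeded.")
    return result
```

In the notebook this would wrap the pipeline call, for example `run_with_failure_notification(lambda: pipeline(...), notify=send_alert)`, where `send_alert` is a hypothetical function of your own.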


### Principles of a robust pipeline:
- All artifacts and metadata are persisted in a unique storage location (for example, a directory named after the pipeline run's UUID)
- Config files are crucial
- A pipeline is composed of independent components
- Independent components are thoroughly (unit-)tested (todo in this notebook)
- The pipeline should have a functional end-to-end test (todo in this notebook)

### Move away from your own machine

#### Option 1: Run "locally" on a remote server
- Start a remote server, clone the git repository there, and run the pipeline there
- For scheduling, you need to set up your own cron jobs
- File storage location: the file system of the remote machine

#### Option 2: Use ML Frameworks
- Package the Python code in a Docker container
- Write a wrapper around each component using the SDK of the orchestrator of your choice
- You get a nice UI to browse previous and scheduled executions
- Requires considerable engineering effort
- Artifacts and metadata are then persisted in a database or on remote storage
- List of orchestrators (not exhaustive):
    - [Luigi](https://github.com/spotify/luigi), very mature, maybe outdated, can run all components on a single machine
    - [MLflow](https://www.mlflow.org/docs/latest/projects.html#running-projects), actively developed, nice UI, can run a pipeline on a single machine or across different machines
    - [Airflow](https://airflow.apache.org/docs/apache-airflow/2.0.1/), very powerful, (too?) verbose UI, overkill for simple tasks, can run on Kubernetes
    - [Kubeflow Pipelines](https://www.kubeflow.org/docs/components/pipelines/overview/pipelines-overview/), powerful, reduced UI, Kubernetes only, overkill for simple tasks
- Azure, GCP, and other cloud providers offer managed orchestrators!

## Tips and Tricks

- Keep the overall goal in mind: Automate and go on with the next project
- We write code for humans, not for machines
- We write the code for our coworkers
- Each component should be independent
- Persist all outputs and metadata!
- After each pipeline step (= component execution), return where to find the outputs
- Develop a template for ML pipeline components
- Verbosity is your friend
- Use type hints
- Do NOT use CSV as a serialization format; use Parquet instead. CSV does not scale to bigger data and does not store column types.
- "Lego" variable naming, i.e. building names from consistent parts (`raw_data` → `raw_data_statistics` → `raw_data_statistics_path`), improves code maintainability
- Be consistent with variable and file naming; do not introduce unnecessary variable renames (like `m_d = params["max_depth"]`)
- Consistency is very important: things should always be done one way (for example: file path locations)
- Write unit tests for every component
- Write an end-to-end test with artificial data and verify the output
- Logging (NOT printing) is important
- Only one change at a time! Isolate changes so you can attribute differences in the pipeline output to a single change
- Create a new unique directory for the outputs of every pipeline run, so no data is ever overwritten
- What happens in the cloud? Each component gets its own container; files/artifacts are written to cloud storage, and metadata is written to a database
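To illustrate the CSV tip: CSV stores no schema, so every value comes back as text and its type has to be guessed again on read, whereas Parquet stores the column types together with the data. The stdlib-only sketch below (no pandas needed) shows the CSV round trip; the row is made-up example data.

```python
import csv
import io

rows = [{"sepal_length": 5.1, "species": "setosa", "is_train": True}]

# Write the rows to an in-memory CSV file ...
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(rows[0]))
writer.writeheader()
writer.writerows(rows)

# ... and read them back: the float and the bool come back as strings.
buffer.seek(0)
restored = list(csv.DictReader(buffer))
print(restored)
```

Every reader of such a file has to re-infer `sepal_length` as a float and `is_train` as a boolean; with Parquet, that information travels with the file.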

## References

- https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning, MLOps for advanced automation for big teams with big data and multiple models per week
- https://mlflow.org/, Framework with nice UI for model training and experimentation, can be executed locally
- https://dvc.org/, Data version control, “Git” for data
- https://dvc.org/doc/use-cases/versioning-data-and-model-files/tutorial, Tutorial for using “dvc” like the “git” command
- https://python-poetry.org/, in our experience, best Python virtual environment and packaging tool
- https://www.tensorflow.org/tfx, Opinionated, complex, TensorFlow-based pipeline framework
- https://docs.microsoft.com/en-us/azure/machine-learning/concept-ml-pipelines, Azure ML pipelines with a GUI designer
- https://medium.com/memory-leak/data-orchestration-a-primer-56f3ddbb1700, Data Orchestration Landscape
- https://github.com/Azure/MachineLearningNotebooks/tree/master/tutorials, Example notebooks for Azure ML
- https://cloud.google.com/composer, Google Cloud managed Airflow
- https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline, Google's brand-new (in preview!) pipelines tooling, pay-per-use for cloud resources, easy scheduling of pipelines for retraining