#  Great Expectations in the Forecasting Team of Maersk

* Forecasting Team 🥑 at Maersk
* Data and machine learning pipelines in k8s
* Implementation of Great Expectations
* Pain & Gain

___

Micha B A Kunze - michabenachim.kunze@maersk.com | [@mbakunze](https://github.com/mbakunze)

# Forecasting Team 🥑 of Maersk


# Maersk

* largest ocean container shipping company (~20-25% of global volume)
* ~ 700 vessels (owned + chartered)


## Team 🥑
We build forecasts of future demand - together:

Andreas, Edward, Lasse, Hans, Karin, Julija, Henrik, Ricko, Luca, Marco, Andreas, Micha, Søren and Julia.

## Building Forecasts

 * daily batch processing of __full__ historical datasets
 * crucial that data is historically accurate (what did we know, and when?)
 * computationally heavy jobs
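To make "what did we know, and when?" concrete, here is a minimal point-in-time lookup over dated snapshots. The snapshot index, its identifiers and the `snapshot_as_of` helper are hypothetical illustrations, not part of our actual tooling.

```python
from __future__ import annotations

from bisect import bisect_right
from datetime import date

# Hypothetical snapshot index: (date the snapshot was taken, snapshot id),
# kept sorted by date.
SNAPSHOTS: list[tuple[date, str]] = [
    (date(2021, 3, 1), "orders/2021-03-01"),
    (date(2021, 3, 2), "orders/2021-03-02"),
    (date(2021, 3, 5), "orders/2021-03-05"),
]


def snapshot_as_of(snapshots: list[tuple[date, str]], as_of: date) -> str:
    """Return the latest snapshot taken on or before `as_of`.

    This answers "what did we know at `as_of`?" and never leaks data from
    later snapshots into a historical view.
    """
    dates = [taken for taken, _ in snapshots]
    idx = bisect_right(dates, as_of)  # number of snapshots taken <= as_of
    if idx == 0:
        raise LookupError(f"no snapshot exists on or before {as_of}")
    return snapshots[idx - 1][1]
```

For example, asking for 2021-03-04 returns the 2021-03-02 snapshot, because the 2021-03-05 data did not exist yet on that day.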
 

 ### Tech Stack:
 * Git
 * Docker
 * Kubernetes
 * Python, R, Spark
 * Azure Blob Storage
 * Datadog

# How we work

 * everything is code, and code lives in git
 * DevOps / GitOps
 * all running code is containerized
 * highly collaborative: software engineers | data scientists | data engineers pair to solve challenges
 * multiple forecasting products in production -> millions of forecasts a day




# Data and Machine Learning Pipelines in Kubernetes

# How We Build Pipelines

 * use pippi, our in-house orchestrator, to run pipelines
 * separate functions (-> code) and data (-> config)
 * use git branching to build new or modify existing pipeline code
 * datasets are saved as __immutable__ snapshots for each run
 * compute jobs are implemented as k8s __jobs__ that get triggered when upstream data changes
 
 This allows us to isolate concerns and develop in parallel.
 
![pippi_r2l](assets/pippi_r2l.png)
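A minimal sketch of the "trigger when upstream data changes" rule. It assumes each upstream dataset exposes the id of its newest snapshot and that a job remembers which snapshot ids it consumed last time. Reading the `SCHEDULER_STRATEGY=Any` setting (from the configuration example further down) as "run when any upstream has a new snapshot", the `"All"` alternative, and the name `should_run` are illustrative assumptions, not pippi's actual implementation.

```python
from __future__ import annotations


def should_run(
    latest_upstream: dict[str, str],
    last_consumed: dict[str, str] | None,
    strategy: str = "Any",
) -> bool:
    """Decide whether a downstream k8s job should be triggered.

    latest_upstream: upstream dataset name -> newest snapshot id
    last_consumed:   upstream dataset name -> snapshot id used in the last run
    strategy:        "Any" triggers when at least one upstream has a new
                     snapshot; "All" waits until every upstream has one.
    """
    if last_consumed is None:
        return True  # no previous run: trigger immediately
    changed = [
        name
        for name, snapshot in latest_upstream.items()
        if last_consumed.get(name) != snapshot
    ]
    if strategy == "Any":
        return len(changed) > 0
    if strategy == "All":
        return len(changed) == len(latest_upstream)
    raise ValueError(f"unknown scheduler strategy: {strategy!r}")
```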

```python
# this might be the code for your data pipeline;
# it runs locally exactly the same as on k8s

import pandas as pd


def most_important_transform(source01_path: str, source02_path: str, destination_path: str) -> None:
    """Enrich source01 with source02 via a left join on `order_id`."""
    source01_df = pd.read_parquet(source01_path)
    source02_df = pd.read_parquet(source02_path)

    # left join: keep every row of source01, even without a match in source02
    transformed_df = source01_df.merge(source02_df, on="order_id", how="left")

    transformed_df.to_parquet(destination_path)
```


## Your Configuration Could Then Look Like This:

___

```bash
SOURCE01_STORAGE_ACCOUNT=prod
SOURCE01_DATASET=orders

SOURCE02_STORAGE_ACCOUNT=prod
SOURCE02_DATASET=contracts

DESTINATION_STORAGE_ACCOUNT=prod
DESTINATION_DATASET=enriched_orders

JOB_EXECUTION_TARGET=ge_entrypoint python -m most_important_transform

SCHEDULER_STRATEGY=Any
```
___
_Note: `ge_entrypoint` is a simple Python wrapper that runs the Great Expectations validation around the actual job_
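To illustrate how code and config stay separate, here is a hypothetical sketch of how a wrapper such as `ge_entrypoint` could group the `SOURCEnn_*` and `DESTINATION_*` variables into dataset references. Only the variable names come from the example above; the parsing rules, the `snapshot_path` layout and both function names are assumptions.

```python
from __future__ import annotations

import re

# Matches e.g. SOURCE01_DATASET or DESTINATION_STORAGE_ACCOUNT.
_DATASET_VAR = re.compile(r"^(SOURCE\d+|DESTINATION)_(STORAGE_ACCOUNT|DATASET)$")


def parse_pipeline_config(env: dict[str, str]) -> dict[str, dict[str, str]]:
    """Group dataset variables by role, e.g.
    {"source01": {"storage_account": "prod", "dataset": "orders"}, ...}."""
    datasets: dict[str, dict[str, str]] = {}
    for key, value in env.items():
        match = _DATASET_VAR.match(key)
        if match is None:
            continue  # e.g. JOB_EXECUTION_TARGET, SCHEDULER_STRATEGY
        role, field = match.groups()
        datasets.setdefault(role.lower(), {})[field.lower()] = value
    return datasets


def snapshot_path(ref: dict[str, str], run_id: str) -> str:
    # Hypothetical layout: one immutable file per dataset and run.
    return f"{ref['storage_account']}/{ref['dataset']}/{run_id}.parquet"
```

The wrapper would then pass the resolved paths to `most_important_transform`, which never needs to know where its data lives.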

# Functional Data Engineering

Functional as in _functional programming_. Many of its principles lend themselves well to data engineering.

 * __immutable data__ - snapshot all the data!
 * __idempotent functions__ - pipeline tasks are pure functions: no side effects, and re-running them on the same input gives the same output
 * __reproducibility__ - foundation of the scientific method (and sanity)
 
 
___ 
 _See:_ Maxime Beauchemin, creator of Apache Airflow
  * [medium post on functional data engineering](https://medium.com/@maximebeauchemin/functional-data-engineering-a-modern-paradigm-for-batch-data-processing-2327ec32c42a)
  * [youtube video on functional data engineering](https://youtu.be/4Spo2QRTz1k)
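A minimal sketch of a pure, idempotent task in the sense above, written in plain Python so it runs without pandas. `enrich_orders` is a hypothetical stand-in for the left join in `most_important_transform` and assumes at most one contract per order.

```python
from __future__ import annotations


def enrich_orders(orders: list[dict], contracts: list[dict]) -> list[dict]:
    """Pure task: the result depends only on the arguments.

    No I/O, no globals, and the inputs are never mutated, so running it again
    on the same input snapshots always yields the same output.
    """
    # Index contracts by order_id (assumes at most one contract per order).
    contracts_by_order = {c["order_id"]: c for c in contracts}
    # Left join: keep every order and build new dicts instead of updating `o`.
    return [{**o, **contracts_by_order.get(o["order_id"], {})} for o in orders]
```

All I/O (reading snapshots, writing the output) stays in a thin wrapper around such functions, which keeps the interesting logic trivially testable.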

## Immutability

Data snapshotting:

![pippi-snapshotting](assets/pippi-pipeline.png)

## Immutability

Data & code dependencies:

![pippi-snapshots](assets/pippi-snaphots.png)
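Snapshotting and dependency tracking can be sketched as a write-once store that records lineage (input snapshots plus code version) for every snapshot. This in-memory `SnapshotStore` is a hypothetical illustration, not pippi's storage layer; it tolerates an identical re-write so that idempotent re-runs do not fail.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class SnapshotStore:
    """Hypothetical write-once store for dataset snapshots.

    Every (dataset, run_id) snapshot is immutable and carries its lineage:
    the input snapshots and the code version that produced it.
    """

    data: dict[tuple[str, str], bytes] = field(default_factory=dict)
    lineage: dict[tuple[str, str], dict] = field(default_factory=dict)

    def write(
        self,
        dataset: str,
        run_id: str,
        payload: bytes,
        inputs: list[tuple[str, str]],
        code_version: str,
    ) -> None:
        key = (dataset, run_id)
        if key in self.data:
            if self.data[key] == payload:
                return  # identical re-run: nothing to do (idempotent)
            raise FileExistsError(f"snapshot {dataset}@{run_id} is immutable")
        self.data[key] = payload
        self.lineage[key] = {"inputs": list(inputs), "code_version": code_version}

    def read(self, dataset: str, run_id: str) -> bytes:
        return self.data[(dataset, run_id)]
```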



## Reproducibility

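Reproducibility is the foundation of any scientific or analytics work.

Immutability is a key enabler of reproducibility, and so is idempotency: re-running an idempotent job on the same immutable snapshots yields the same result.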
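One way to make this tangible: derive a deterministic fingerprint for each run from its immutable input snapshot ids and the code version (e.g. a git commit). Identical fingerprints then mean identical inputs and code, so the outputs should be identical too. `run_fingerprint` is a hypothetical helper, sketched with SHA-256 over a canonical JSON encoding.

```python
from __future__ import annotations

import hashlib
import json


def run_fingerprint(input_snapshots: dict[str, str], code_version: str) -> str:
    """Deterministic id for a run: same inputs + same code -> same id.

    input_snapshots: dataset name -> immutable snapshot id
    code_version:    e.g. the git commit of the pipeline code
    """
    # sort_keys makes the encoding independent of dict insertion order
    payload = json.dumps({"inputs": input_snapshots, "code": code_version}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```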


# Implementation of Great Expectations


## Running Great Expectations


* we simply run Great Expectations inside our Docker containers
* each forecasting product has its own GE project, using the default GE setup
* we follow a simple convention: `expectation suite name` = `dataset name`
    - our `ge_entrypoint` script relies only on this convention to pick the right suite for each dataset
* when a GE validation fails, we break the pipeline (with readable logging) and get alerted via our standard Datadog/PagerDuty setup
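Thanks to the naming convention, the entrypoint needs no per-dataset mapping. A minimal sketch of the lookup, assuming suites are stored as `<project>/expectations/<suite name>.json` (the default filesystem layout of a GE project at the time of writing; check your GE version). `suite_path_for` is a hypothetical name.

```python
from __future__ import annotations

from pathlib import Path


def suite_path_for(dataset_name: str, ge_root: Path) -> Path:
    """Resolve the expectation suite for a dataset by naming convention.

    Convention: expectation suite name == dataset name. Assumes suites are
    stored as <ge_root>/expectations/<suite name>.json.
    """
    path = ge_root / "expectations" / f"{dataset_name}.json"
    if not path.is_file():
        raise FileNotFoundError(f"no expectation suite named {dataset_name!r}")
    return path
```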

# How We Use Data Validation

* validate source AND destination data
* break the pipeline when validation fails
* if the output check fails, save the data to a "failed" location for inspection/debugging
* data checks run **decoupled** from the pipeline code

![ge_flow](assets/ge_entrypoint_logic.png)
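The flow above can be sketched with a pluggable `validate` callable standing in for running the GE suite named after each dataset. Everything here (`run_validated_job`, the dict-based `store`, the `ValidationError` class) is a hypothetical illustration of the logic, not the actual `ge_entrypoint` code. In a container, the uncaught error would surface as a non-zero exit code, failing the k8s job and breaking the pipeline.

```python
from __future__ import annotations

from typing import Callable

# Stand-in for "run the expectation suite named after this dataset".
Validator = Callable[[str, list], bool]


class ValidationError(RuntimeError):
    """Raised when a source or destination dataset fails validation."""


def run_validated_job(
    job: Callable[..., list],
    sources: dict[str, list],
    destination: str,
    validate: Validator,
    store: dict[str, list],
) -> list:
    # 1. Validate every input before doing any work.
    for name, data in sources.items():
        if not validate(name, data):
            raise ValidationError(f"source {name!r} failed validation")

    # 2. Run the pipeline code, which knows nothing about validation.
    output = job(*sources.values())

    # 3. Validate the output; on failure, park it in a "failed" location.
    if not validate(destination, output):
        store[f"failed/{destination}"] = output
        raise ValidationError(f"destination {destination!r} failed validation")

    store[destination] = output
    return output
```

Because validation lives entirely in the wrapper, expectations can change without touching the pipeline code, and vice versa.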


## Data Test Coverage

* we wrote some simple scripts that report data validation coverage across all our forecasting products
* we use a data CLI in our GE notebooks to easily profile data and edit expectations
    - _we only work with files and do not use GE's datasource configurations_
    
       
```bash
❯ poetry run ge_coverage
*** Data Validation Coverage Report ***
=================================================================
Dataset Name                                           | covered?
-----------------------------------------------------------------
dataset1                                               | ❌
dataset1-final_v1                                      | ✅
dataset1-final_v2                                      | ✅
dataset1-draft_v1                                      | ❌
dataset2-final                                         | ✅
dataset1-intermediate_v1                               | ✅
all-the-data                                           | ✅
features-new                                           | ✅
feature_v2                                             | ✅
awesome-forecast-for-real-this-time                    | ❌
=================================================================
Summary:
-----------------------------------------------------------------
Datasets covered: 7/10
Coverage percentage: 70 %  😐
```
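A report like the one above only needs two lists: the datasets a product produces and the suites that exist. A minimal sketch, assuming suites live as JSON files in the project's `expectations/` directory and that the dataset list comes from the pipeline configuration; `suite_names` and `coverage` are hypothetical helpers, not the actual `ge_coverage` script.

```python
from __future__ import annotations

from pathlib import Path


def suite_names(ge_root: Path) -> set[str]:
    # Assumes suites are stored as <ge_root>/expectations/<name>.json.
    return {p.stem for p in (ge_root / "expectations").glob("*.json")}


def coverage(datasets: list[str], suites: set[str]) -> tuple[list[str], float]:
    """Return the uncovered datasets and the coverage percentage.

    A dataset is covered when a suite with exactly its name exists,
    following the convention suite name == dataset name.
    """
    missing = [name for name in datasets if name not in suites]
    percent = 100.0 * (len(datasets) - len(missing)) / len(datasets) if datasets else 100.0
    return missing, percent
```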

# Pain & Gain

##  💥 Things Break Constantly 💥

 * not a question of IF, but only of WHEN
 * when we first put Great Expectations into production, we broke many data pipelines whose data turned out to actually be OK 😱
    - we started to learn about our data quality
    - an iterative process
 * data quality is part of our continuous improvement

## Data Validation in Team 🥑

* data validation has been in production for all our forecasting products for ~3 months
* it has already prevented many incidents 💪
    - it kept us from publishing bad-quality forecasts
    - it broke data pipelines that had incomplete data due to an upstream race condition (which had previously gone unnoticed)
* it led to investigations of data quality issues and to improvements



## Data Validation in Team 🥑

* the whole team collaborates on data quality and testing issues 🤝
* GE's extensibility and notebook templating allowed us to roll out GE fast ⚡️
* automatic data and expectation documentation generated by GE and served through MkDocs 📖
