# 02 A Framework Approach

> "Automation is not the enemy of jobs. It frees up human beings to do higher-value work." ~ Andy Stern.

## Table of Contents

1. Overview
2. Learning Outcomes
3. Tools
4. The (Local) Cloud
5. Classy ETLs
6. Add-Ons
7. Data Observability
8. Resources

## 1. Overview

In this section, we want to supercharge the minimum requirements we established for ourselves in section 1 (i.e. storage, compute, and version control) while expanding into better portability and orchestration.

To accomplish this, we'll hand the driver's seat to `metaflow`, a tool originally created and open-sourced by Netflix a few years back.

The tool's main contributor is now a company, [Outerbounds]().

## 2. Learning Outcomes

By the end of this session you will be able to:
- create workflows using metaflow
- schedule workflows with different time intervals
- understand how to visually inspect workflows

## 3. Tools

The tools that we will use in this section of the workshop are the following.

- [localstack](https://localstack.cloud/) --> "LocalStack is a cloud service emulator that runs in a single container on your laptop or in your CI environment. With LocalStack, you can run your AWS applications or Lambdas entirely on your local machine without connecting to a remote cloud provider!" ~ [localstack](https://localstack.cloud/)
- [Metaflow]() --> "Metaflow is a human-friendly Python library that makes it straightforward to develop, deploy, and operate various kinds of data-intensive applications, in particular those involving data science and ML." ~ [Metaflow docs](https://docs.metaflow.org/introduction/what-is-metaflow)
- [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) --> "You use the AWS SDK for Python (Boto3) to create, configure, and manage AWS services, such as Amazon Elastic Compute Cloud (Amazon EC2) and Amazon Simple Storage Service (Amazon S3). The SDK provides an object-oriented API as well as low-level access to AWS services." ~ [boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html)

Let's get started by evaluating the (local) cloud. :)

## 4. The (Local) Cloud

LocalStack is a tool that lets us emulate the cloud services provided by AWS on our local machines. Its free tier is a great way to get started learning about the cloud, which is why we will be using it here.

Note that `localstack` needs Docker to be installed on your machine, so if Docker is not available you won't be able to follow a few of the steps below.

That said, let's get started.


Open up a new terminal and start localstack with the following command.

```sh
localstack start
```
You should be able to see the following image.

![lss](../images/localstack_start.png)

Now that we have a "Cloud" instance running on our machines, let's start by creating a bucket in S3 using boto3.

In [None]:
import boto3

In [None]:
s3_client = boto3.client("s3", endpoint_url="http://localhost.localstack.cloud:4566")

Note that because we are not actually interacting with a cloud provider like AWS, GCP or Azure, we need to point boto3 towards our local cloud using the parameter `endpoint_url`.

In [None]:
s3_client.create_bucket(Bucket="datalake")

In [None]:
s3_client.list_buckets()["Buckets"]

To have less verbose output, and more or less the same functionality, we can create resources instead of clients.

In [None]:
s3_resource = boto3.resource("s3", endpoint_url="http://localhost.localstack.cloud:4566")

In [None]:
s3_resource.create_bucket(Bucket="datalake2")

In [None]:
for bk in s3_resource.buckets.all():
    print(bk.name)

To finish setting up localstack for our workshop, we'll need to set up the AWS and Metaflow configs.
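The exact configuration is not spelled out here, so the following is only a sketch of one possible setup, expressed as environment variables set from Python. It assumes LocalStack accepts placeholder credentials (the value `test` and the region are made up), that Metaflow should keep its artifacts in the `datalake` bucket under a hypothetical `metaflow` prefix, and that the endpoint is the same one passed to boto3 above. `local_cloud_env` is a hypothetical helper name.

```python
import os

# Same endpoint that was passed to boto3 via endpoint_url.
LOCALSTACK_ENDPOINT = "http://localhost.localstack.cloud:4566"


def local_cloud_env(bucket="datalake", prefix="metaflow"):
    """Build environment variables for the AWS tooling and Metaflow (assumed values)."""
    return {
        # Placeholder credentials and region for the emulator (assumptions).
        "AWS_ACCESS_KEY_ID": "test",
        "AWS_SECRET_ACCESS_KEY": "test",
        "AWS_DEFAULT_REGION": "us-east-1",
        # Ask Metaflow to store artifacts in S3 and to reach S3 through LocalStack.
        "METAFLOW_DEFAULT_DATASTORE": "s3",
        "METAFLOW_DATASTORE_SYSROOT_S3": f"s3://{bucket}/{prefix}",
        "METAFLOW_S3_ENDPOINT_URL": LOCALSTACK_ENDPOINT,
    }


# Apply the settings to the current process before importing metaflow.
os.environ.update(local_cloud_env())
```

Keeping the values in one function makes it easy to point the same notebook at a different bucket or prefix later on.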

## 5. Classy ETLs

Metaflow works by having the user define classes that inherit from Metaflow's `FlowSpec` class. Each class represents a flow of whatever you'd like to do, e.g., a data pipeline, a dashboard, or training one or many machine learning models, among many other tasks.

The beauty of metaflow is its simplicity and customizable nature. Its downside is the lack of an easy-to-install and easy-to-set-up user interface where one could visually inspect one's flows.

Let's get our hands dirty with our first example.

In [None]:
%%writefile ../src/ml_eng/fireflow.py

from metaflow import FlowSpec, step

class FireFlow(FlowSpec):
    
    @step
    def start(self):
        print("Hi, this is your first flow!")
        from pathlib import Path
        self.data_path = Path().cwd().parent/"data"/"example"
        self.data_in = self.data_path.joinpath("federal_firefighting_costs.csv")
        self.data_out = self.data_path.joinpath("fire_flow_output.parquet")
        self.next(self.extract)

    @step
    def extract(self):
        import pandas as pd
        self.data = pd.read_csv(self.data_in)
        self.next(self.transform)

    @step
    def transform(self):
        import pandas as pd
        for col in self.data.iloc[:, 1:].columns:
            self.data[col] = self.data[col].str.replace(r'[^0-9]+', '', regex=True).astype(int)
        self.next(self.load)

    @step
    def load(self):
        import pandas as pd
        self.data.to_parquet(self.data_out)
        self.next(self.end)
        
    @step
    def end(self):
        print("Your first flow finished!")

if __name__ == "__main__":
    FireFlow()

Let's run our file using the following command from our notebook.

Note that the same command won't work from the parent directory, because the flow builds its data paths relative to the directory this notebook runs in.

In [None]:
!python ../src/ml_eng/fireflow.py run
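One way to remove that dependence on the working directory is to anchor paths to the location of the flow file instead. The sketch below assumes the layout implied by the paths in this notebook (a project root containing `src/ml_eng/` and `data/example/`); `project_data_dir` is a hypothetical helper, and the path passed to it is made up for illustration.

```python
from pathlib import Path


def project_data_dir(flow_file):
    """Return <project root>/data/example for a flow stored in <project root>/src/ml_eng/."""
    # parents[0] is ml_eng, parents[1] is src, parents[2] is the project root.
    project_root = Path(flow_file).resolve().parents[2]
    return project_root / "data" / "example"


# Inside fireflow.py one would call project_data_dir(__file__) instead.
example_dir = project_data_dir("/tmp/project/src/ml_eng/fireflow.py")
print(example_dir)
```

With a helper like this, the flow would find its data no matter which directory the `run` command is launched from.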

Let's go over what just happened.

Metaflow keeps track of a lot of things when we run a flow, and the output above gives us context about what is happening inside of it. Here's what each piece of a log line means.

- `2023-02-26 10:57:24.693` --> Timestamp for the step
- `1677380240122156` --> Run ID
- `load` --> Step Name
- `4` --> Task ID
- `(pid 49081)` --> Process ID
- `Task is starting.` --> Log Message
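To make that anatomy concrete, the pieces listed above can be pulled out of a log line with a regular expression. The example line is assembled from those pieces, and its exact layout (brackets, slashes, spacing) is an assumption about how Metaflow prints task logs rather than a documented format. `parse_log_line` is a hypothetical helper.

```python
import re

# Assumed layout: "<timestamp> [<run id>/<step>/<task id> (pid <pid>)] <message>"
LOG_LINE = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) "
    r"\[(?P<run_id>[^/\]]+)/(?P<step>[^/\]]+)/(?P<task_id>[^ \]]+) "
    r"\(pid (?P<pid>\d+)\)\] (?P<message>.*)$"
)


def parse_log_line(line):
    """Return the named pieces of a task log line, or None if the line does not match."""
    match = LOG_LINE.match(line)
    return match.groupdict() if match else None


example = "2023-02-26 10:57:24.693 [1677380240122156/load/4 (pid 49081)] Task is starting."
parsed = parse_log_line(example)
print(parsed)
```

Lines that do not describe a task, such as a banner printed before the first step, simply return `None`.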

Metaflow passes data from one step to the next, and keeps track of everything it touches, via the `self` argument: anything assigned to `self` in one step is saved as an artifact and is available in the steps that follow. Each step runs as its own isolated task (a separate process when running locally, as the `pid` in the logs shows), but it knows where to go next once it finishes.
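The idea of isolated steps that still share state through `self` can be illustrated with a toy model. This is not how Metaflow is implemented; it is a minimal sketch that assumes state is snapshotted with `pickle` after each step and restored into a fresh object before the next one. `ToyFlow` and `run_toy_flow` are hypothetical names.

```python
import pickle


class ToyFlow:
    """Each method plays the role of a step and returns the name of the next one."""

    def start(self):
        self.numbers = [1, 2, 3]
        return "double"

    def double(self):
        self.numbers = [n * 2 for n in self.numbers]
        return "end"

    def end(self):
        self.total = sum(self.numbers)
        return None  # no further step


def run_toy_flow(flow_cls):
    """Run every step on a fresh object, carrying state over only via a snapshot."""
    snapshot = pickle.dumps({})
    step_name = "start"
    while step_name is not None:
        flow = flow_cls()                             # nothing shared in memory
        flow.__dict__.update(pickle.loads(snapshot))  # restore earlier attributes
        step_name = getattr(flow, step_name)()        # run the step, learn the next
        snapshot = pickle.dumps(flow.__dict__)        # persist what the step left on self
    return pickle.loads(snapshot)


artifacts = run_toy_flow(ToyFlow)
print(artifacts)
```

Because every step starts from a restored snapshot, a local variable that is never assigned to `self` would not survive into the next step.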

## Exercise

Pick one function from each of the files from the last section (`extract.py`, `transform.py`, and `load.py`) and create a flow with metaflow. Name it `BikesFlow`.

Now we'll write a proper flow for our earlier pipeline and we'll keep improving it in the next session.

In [None]:
%%writefile ../src/ml_eng/data_flow.py

import urllib.request
from pathlib import Path
import pandas as pd, re

from metaflow import FlowSpec, step, Parameter

class MainDataFlow(FlowSpec):

    @step
    def start(self):
        import boto3
        from os.path import join
        import urllib.request
        
        url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00560/SeoulBikeData.csv'
        self.bucket = "datalake"
        self.raw_dir = "raw"
        self.raw_data_name = 'SeoulBikeData_test.csv'
        self.tmp_file, _ = urllib.request.urlretrieve(url, self.raw_data_name)

        s3_resource = boto3.resource("s3", endpoint_url="http://localhost.localstack.cloud:4566")         
        s3_resource.Object(bucket_name=self.bucket, key=join(self.raw_dir, self.raw_data_name)).upload_file(self.tmp_file)

        self.next(self.extract)

    @step
    def extract(self):
        import pandas as pd
        
        # get the data with its peculiar encoding; dates are assumed to be day-first (dd/mm/yyyy)
        self.data = pd.read_csv(self.tmp_file, encoding='iso-8859-1', parse_dates=['Date'], dayfirst=True)
        self.next(self.transform)


    @step
    def transform(self):
        self.data.columns = [re.sub(r'[^a-zA-Z0-9\s_]', '', col).lower().replace(r" ", "_") for col in self.data.columns]

        self.data.sort_values(['date', 'hour'], inplace=True)

        self.data["year"]           = self.data['date'].dt.year
        self.data["month"]          = self.data['date'].dt.month
        self.data["week"]           = self.data['date'].dt.isocalendar().week
        self.data["day"]            = self.data['date'].dt.day
        self.data["day_of_week"]    = self.data['date'].dt.dayofweek
        self.data["is_month_start"] = self.data['date'].dt.is_month_start
        
        self.data.drop('date', axis=1, inplace=True)

        self.data = pd.get_dummies(data=self.data, columns=['holiday', 'seasons', 'functioning_day'])

        self.next(self.load)

    @step
    def load(self):
        import boto3
        from os.path import join
        from tempfile import TemporaryDirectory

        self.interim = "interim"
        self.clean_data_name = "clean22.parquet"

        with TemporaryDirectory("hello_temp") as tmp:
            tmp_file = join(tmp, self.clean_data_name)
            self.data.to_parquet(tmp_file)
            s3_resource = boto3.resource("s3", endpoint_url="http://localhost.localstack.cloud:4566")         
            s3_resource.Object(bucket_name=self.bucket, key=join(self.interim, self.clean_data_name)).upload_file(tmp_file)
        
        self.next(self.end)

    @step
    def end(self):
        print("MainDataFlow has finished successfully!")

if __name__ == "__main__":
    MainDataFlow()

Let's run our flow and then go over what just happened.

In [None]:
!python ../src/ml_eng/data_flow.py run
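The `transform` step does two things: it normalizes the column names and it derives calendar features from the date. The sketch below reproduces both ideas with the standard library only, so it can be tried without pandas. The header names are illustrative and not necessarily the exact ones in the CSV, and `clean_column_name` and `calendar_features` are hypothetical helpers.

```python
import re
from datetime import date


def clean_column_name(name):
    """Keep letters, digits, whitespace and underscores, then convert to snake_case."""
    return re.sub(r"[^a-zA-Z0-9\s_]", "", name).lower().replace(" ", "_")


def calendar_features(day):
    """Mirror the date-derived columns of the flow for a single date."""
    return {
        "year": day.year,
        "month": day.month,
        "week": day.isocalendar()[1],  # ISO week number
        "day": day.day,
        "day_of_week": day.weekday(),  # Monday is 0, as in pandas
        "is_month_start": day.day == 1,
    }


# Illustrative header names only.
for raw in ["Rented Bike Count", "Wind speed (m/s)", "Functioning Day"]:
    print(raw, "->", clean_column_name(raw))

print(calendar_features(date(2017, 12, 1)))
```

Stripping punctuation before replacing spaces is what keeps units such as `(m/s)` from leaving stray characters in the final column names.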

Everything in Metaflow follows this hierarchy.

> Metaflow > Flow > Run > Step > Task > Artifact
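Below the top-level `Metaflow` object, each level of this hierarchy can be addressed with a slash-separated path, often called a pathspec. The sketch below assumes the layout `FlowName/RunID/StepName/TaskID/ArtifactName` and splits such a string into its levels; `split_pathspec` is a hypothetical helper, and the example values are taken from the log line discussed earlier.

```python
LEVELS = ("flow", "run", "step", "task", "artifact")


def split_pathspec(pathspec):
    """Map each component of a pathspec to its level in the hierarchy."""
    parts = pathspec.strip("/").split("/")
    if not all(parts) or len(parts) > len(LEVELS):
        raise ValueError(f"unexpected pathspec: {pathspec!r}")
    # zip stops at the shorter sequence, so partial pathspecs are fine.
    return dict(zip(LEVELS, parts))


task = split_pathspec("FireFlow/1677380240122156/load/4")
print(task)
```

A shorter path such as `FireFlow` or `FireFlow/1677380240122156` resolves to a flow or a run in the same way.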

We can inspect what just happened in our flows via the following commands.

In [None]:
from metaflow import Run

In [None]:
from metaflow import Metaflow
mf = Metaflow()

In [None]:
print(mf.flows)

In [None]:
from metaflow import Flow
flow = Flow('FireFlow')
runs = list(flow)
runs

In [None]:
runs[2]  # assumes FireFlow has been run at least three times

In [None]:
runs[2].data

In [None]:
import pandas as pd
pd.read_parquet(runs[2].data.data_out).head()

We can also inspect our bucket to see where the files we just worked with went.

In [None]:
!awslocal s3 ls datalake/

In [None]:
!awslocal s3 ls datalake/raw/

We can also examine more of the characteristics of a run from inside the flow itself with the `current` object.

In [None]:
%%writefile ../src/ml_eng/current_flow.py

from metaflow import FlowSpec, step, current

class CurrentFlow(FlowSpec):

    @step
    def start(self):
        print("flow name: %s" % current.flow_name)
        print("run id: %s" % current.run_id)
        print("origin run id: %s" % current.origin_run_id)
        print("step name: %s" % current.step_name)
        print("task id: %s" % current.task_id)
        print("pathspec: %s" % current.pathspec)
        print("namespace: %s" % current.namespace)
        print("username: %s" % current.username)
        print("flow parameters: %s" % str(current.parameter_names))
        self.next(self.end)

    @step
    def end(self):
        print("end has a different step name: %s" % current.step_name)
        print("end has a different task id: %s" % current.task_id)
        print("end has a different pathspec: %s" % current.pathspec)

if __name__ == '__main__':
    CurrentFlow()

In [None]:
!python ../src/ml_eng/current_flow.py run

## 6. Add-Ons

One of the aspects that makes metaflow so powerful is the additional utilities it comes with.

- @resources --> what resources our workflows need to use
- @conda --> the environment we would like to use provided by conda
- @schedule --> when to run our flows
- Parameter --> which parameters we can share across steps or set via the command line
- cards --> a way to visualize our flows

In [None]:
%%writefile ../src/ml_eng/better_flow.py

import urllib.request
from pathlib import Path
import pandas as pd, re

from metaflow import FlowSpec, step, Parameter, schedule, conda, card


@schedule(daily=True)
class BetterDataFlow(FlowSpec):

    
    @step
    def start(self):
        import boto3
        from os.path import join
        import urllib.request
        
        url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00560/SeoulBikeData.csv'
        self.bucket = "datalake"
        self.raw_dir = "raw"
        self.raw_data_name = 'SeoulBikeData_test.csv'
        self.tmp_file, _ = urllib.request.urlretrieve(url, self.raw_data_name)

        s3_resource = boto3.resource("s3", endpoint_url="http://localhost.localstack.cloud:4566")         
        s3_resource.Object(bucket_name=self.bucket, key=join(self.raw_dir, self.raw_data_name)).upload_file(self.tmp_file)

        self.next(self.extract)


    @conda(python="3.10", libraries={"pandas": "1.5.3"})
    @step
    def extract(self):
        import pandas as pd
        
        # get the data with its peculiar encoding; dates are assumed to be day-first (dd/mm/yyyy)
        self.data = pd.read_csv(self.tmp_file, encoding='iso-8859-1', parse_dates=['Date'], dayfirst=True)
        self.next(self.transform)

    
    @card
    @step
    def transform(self):
        self.data.columns = [re.sub(r'[^a-zA-Z0-9\s_]', '', col).lower().replace(r" ", "_") for col in self.data.columns]

        self.data.sort_values(['date', 'hour'], inplace=True)

        self.data["year"]           = self.data['date'].dt.year
        self.data["month"]          = self.data['date'].dt.month
        self.data["week"]           = self.data['date'].dt.isocalendar().week
        self.data["day"]            = self.data['date'].dt.day
        self.data["day_of_week"]    = self.data['date'].dt.dayofweek
        self.data["is_month_start"] = self.data['date'].dt.is_month_start
        
        self.data.drop('date', axis=1, inplace=True)

        self.data = pd.get_dummies(data=self.data, columns=['holiday', 'seasons', 'functioning_day'])

        self.next(self.load)

    @step
    def load(self):
        import boto3
        from os.path import join
        from tempfile import TemporaryDirectory

        self.interim = "interim"
        self.clean_data_name = "clean22.parquet"

        with TemporaryDirectory("hello_temp") as tmp:
            tmp_file = join(tmp, self.clean_data_name)
            self.data.to_parquet(tmp_file)
            s3_resource = boto3.resource("s3", endpoint_url="http://localhost.localstack.cloud:4566")         
            s3_resource.Object(bucket_name=self.bucket, key=join(self.interim, self.clean_data_name)).upload_file(tmp_file)
        
        self.next(self.end)

    @step
    def end(self):
        print("BetterDataFlow has finished successfully!")

if __name__ == "__main__":
    BetterDataFlow()

## 7. Data Observability

In [None]:
import pandas as pd
import whylogs as why

df = pd.read_parquet("../data/porto/porto.parquet")
results = why.log(df)

In [None]:
prof_view = results.view()

In [None]:
from whylogs.viz import NotebookProfileVisualizer

visualization = NotebookProfileVisualizer()
visualization.set_profiles(target_profile_view=prof_view)
visualization.profile_summary()