## Writing Local Machine Learning Flows


In this section, we take the machine learning scripts from the previous lesson and turn them into flows. For now, to avoid introducing more tools, we'll write our flows in notebook cells and execute them from the notebook as well (this may change).

### What is a (meta)flow?

A flow is a directed acyclic graph (DAG) of steps: each node is a step, and each edge says which step runs next. A simple flow looks like this:

![flow0](../img/flow_ex_0.png)

Flows and DAGs can often be more complicated:

![flow0](../img/flow_ex_1.png)

ML flows can typically be broken down into steps such as:

- importing data
- processing, wrangling, and/or transforming the data
- data validation
- model configuration
- model training, and
- model deployment.
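
As a rough illustration, the steps above can be written down as a small DAG and put into execution order with Python's standard library. This is only a sketch of the DAG idea, not how Metaflow represents flows internally; the step names are made up to mirror the list.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key is a step; its value is the set of steps that must finish first.
# The step names mirror the list above and are illustrative only.
dag = {
    "import_data": set(),
    "process_data": {"import_data"},
    "validate_data": {"process_data"},
    "configure_model": {"validate_data"},
    "train_model": {"configure_model"},
    "deploy_model": {"train_model"},
}

# static_order() yields the steps in an order that respects every dependency.
# It raises graphlib.CycleError if the graph contains a cycle.
execution_order = list(TopologicalSorter(dag).static_order())
print(" -> ".join(execution_order))
```

Because this DAG is a simple chain, only one ordering is valid. In a branching DAG, several steps may become ready at the same time.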

The first flow we write will be a template showing these steps:

In [1]:
%%writefile ../flows/flow_template.py

"""

Template for writing Metaflows

"""

import os
from metaflow import FlowSpec, step, batch, current, environment, S3, card


class Template_Flow(FlowSpec):
    """
    Template for Metaflows.
    You can choose which steps suit your workflow.
    We have included the following common steps:
    - Start
    - Process data
    - Data validation
    - Model configuration
    - Model training
    - Model deployment
    """
    
    @card
    @step
    def start(self):
        """
        Start Step for a Flow;
        """
        print("flow name: %s" % current.flow_name)
        print("run id: %s" % current.run_id)
        print("username: %s" % current.username)

        # Call next step in DAG with self.next(...)
        self.next(self.process_raw_data)

    @step
    def process_raw_data(self):
        """
        Read and process data
        """
        print("In this step, you'll read in and process your data")

        self.next(self.data_validation)

    @step
    def data_validation(self):
        """
        Perform data validation
        """
        print("In this step, you'll write your data validation code")

        self.next(self.get_model_config)

    @step
    def get_model_config(self):
        """
        Configure model + hyperparams
        """
        print("In this step, you'll configure your model + hyperparameters")
        self.next(self.train_model)

    @step
    def train_model(self):
        """
        Train your model
        """
        print("In this step, you'll train your model")

        self.next(self.deploy)

    @step
    def deploy(self):
        """
        Deploy model
        """
        print("In this step, you'll deploy your model")

        self.next(self.end)

    @step
    def end(self):
        """
        DAG is done! Congrats!
        """
        print('DAG ended! Woohoo!')


if __name__ == '__main__':
    Template_Flow()


Overwriting ../flows/flow_template.py


In [3]:
! python ../flows/flow_template.py run

Metaflow 2.5.0 executing Template_Flow for user:hba
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
2022-03-16 17:44:07.233 Workflow starting (run-id 7235):
2022-03-16 17:44:13.159 [7235/start/135762 (pid 19591)] Task is starting.
2022-03-16 17:44:20.281 [7235/start/135762 (pid 19591)] flow name: Template_Flow
2022-03-16 17:44:48.697 [7235/start/135762 (pid 19591)] run id: 7235
2022-03-16 17:44:48.697 [7235/start/135762 (pid 19591)] username: hba
2022-03-16 17:44:51.782 [7235/start/135762 (pid 19591)] Task finished successfully.
2022-03-16 17:44:55.909 [723

What are all these outputs? I'm glad that you asked!

![flow0](../img/mf_output.png)

- **Timestamp** denotes when the line was output.
- The information inside the square brackets identifies a **task**.
- Every Metaflow run gets a unique ID, a **run ID**.
- A run executes the steps in order. The step that is currently being executed is denoted by **step name**.
- A step may spawn multiple tasks which are identified by a **task ID**.
- The combination of a flow name, run ID, step name, and task ID uniquely identifies a task in your Metaflow environment, among all runs of all flows. Here, the flow name is omitted since it is the same for all lines. We call this globally unique identifier a **pathspec**.
- Each task is executed by a separate process in your operating system, identified by a **process ID**, aka _pid_. You can use any operating-system-level monitoring tool, such as `top`, to monitor the resource consumption of a task based on its process ID.
- After the square bracket comes a **log message** that may be a message output by Metaflow itself, like “Task is starting” in this example, or a line output by your code.

_Note_: The above bullets are almost verbatim from VT's book.
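
To make the anatomy of a log line concrete, here is a minimal sketch that splits one task line into the fields described above and rebuilds the pathspec. The regular expression and the `parse_log_line` helper are assumptions written for this tutorial, based only on the output shown here; they are not part of Metaflow.

```python
import re

# Pattern for one task-level log line:
# timestamp, [run_id/step_name/task_id (pid N)], then the log message.
LOG_LINE = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) "
    r"\[(?P<run_id>[^/]+)/(?P<step_name>[^/]+)/(?P<task_id>\S+) "
    r"\(pid (?P<pid>\d+)\)\] "
    r"(?P<message>.*)$"
)


def parse_log_line(line, flow_name):
    """Split a task log line into its fields and build the task's pathspec."""
    match = LOG_LINE.match(line)
    if match is None:
        # Lines without a task prefix, e.g. "Workflow starting", do not match.
        return None
    fields = match.groupdict()
    # The flow name is not printed on each line, so the caller supplies it.
    fields["pathspec"] = "/".join(
        [flow_name, fields["run_id"], fields["step_name"], fields["task_id"]]
    )
    return fields


line = "2022-03-16 17:44:13.159 [7235/start/135762 (pid 19591)] Task is starting."
parsed = parse_log_line(line, flow_name="Template_Flow")
print(parsed["pathspec"])  # Template_Flow/7235/start/135762
```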

### Metaflow cards

We can use MF cards to visualize aspects of our flow. In this case, there's not much to check out but we **can** see the DAG!

In [5]:
! python ../flows/flow_template.py card view start

Metaflow 2.5.0 executing Template_Flow for user:hba
Resolving card: Template_Flow/7235/start/135762


### Random Forests ---> Metaflows

In this section, we'll turn the random forest example from the previous lesson into a flow:

In [6]:
%%writefile ../flows/local/rf_flow.py

from metaflow import FlowSpec, step, Parameter, JSONType, IncludeFile, card
import json

class ClassificationFlow(FlowSpec):
    """
    train a random forest
    """
    @card 
    @step
    def start(self):
        """
        Load the data
        """
        #Import scikit-learn dataset library
        from sklearn import datasets

        #Load dataset
        self.iris = datasets.load_iris()
        self.X = self.iris['data']
        self.y = self.iris['target']
        self.next(self.rf_model)
        

    @step
    def rf_model(self):
        """
        build random forest model
        """
        from sklearn.ensemble import RandomForestClassifier
        
        self.clf = RandomForestClassifier(n_estimators=10, max_depth=None,
            min_samples_split=2, random_state=0)
        self.next(self.train)

        
        
    @step
    def train(self):
        """
        Train the model
        """
        from sklearn.model_selection import cross_val_score
        self.scores = cross_val_score(self.clf, self.X, self.y, cv=5)
        self.next(self.end)
        
        
    @step
    def end(self):
        """
        End of flow, yo!
        """
        print("ClassificationFlow is all done.")


if __name__ == "__main__":
    ClassificationFlow()

Overwriting ../flows/local/rf_flow.py


Execute the above from the command line with

```bash
python flows/local/rf_flow.py run
```

In [7]:
! python ../flows/local/rf_flow.py run

Metaflow 2.5.0 executing ClassificationFlow for user:hba
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
2022-03-16 17:47:32.165 Workflow starting (run-id 7237):
2022-03-16 17:47:38.219 [7237/start/135772 (pid 19674)] Task is starting.
2022-03-16 17:48:18.373 [7237/start/135772 (pid 19674)] Task finished successfully.
2022-03-16 17:48:22.219 [7237/rf_model/135773 (pid 19687)] Task is starting.
2022-03-16 17:48:39.410 [7237/rf_model/135773 (pid 19687)] Task finished successfully.
2022-03-16 17:48:43.308 [7237/train/135774 (pid 19692)] Task is starting.
2022-03-16 17:49:

We can check out the Metaflow card:

In [9]:
! python ../flows/local/rf_flow.py card view start

Metaflow 2.5.0 executing ClassificationFlow for user:hba
Resolving card: ClassificationFlow/7237/start/135772


Now we'll write a flow that trains random forest, decision tree, and extra trees classifiers in parallel branches, then chooses the one with the best mean cross-validation score. This uses the concept of branching, which is exemplified in this figure:


![flow0](../img/flow_ex_0.png)

In [10]:
%%writefile ../flows/local/tree_branch_flow.py

from metaflow import FlowSpec, step, Parameter, JSONType, IncludeFile, card
import json





class ClassificationFlow(FlowSpec):
    """
    train multiple tree based methods
    """
    @card 
    @step
    def start(self):
        """
        Load the data
        """
        #Import scikit-learn dataset library
        from sklearn import datasets

        #Load dataset
        self.iris = datasets.load_iris()
        self.X = self.iris['data']
        self.y = self.iris['target']
        self.next(self.rf_model, self.xt_model, self.dt_model)
    
                
    @step
    def rf_model(self):
        """
        build random forest model
        """
        from sklearn.ensemble import RandomForestClassifier
        from sklearn.model_selection import cross_val_score
        
        self.clf = RandomForestClassifier(n_estimators=10, max_depth=None,
            min_samples_split=2, random_state=0)
        self.scores = cross_val_score(self.clf, self.X, self.y, cv=5)
        self.next(self.choose_model)

    @step
    def xt_model(self):
        """
        build extra trees classifier
        """
        from sklearn.ensemble import ExtraTreesClassifier
        from sklearn.model_selection import cross_val_score
        

        self.clf = ExtraTreesClassifier(n_estimators=10, max_depth=None,
            min_samples_split=2, random_state=0)

        self.scores = cross_val_score(self.clf, self.X, self.y, cv=5)
        self.next(self.choose_model)

    @step
    def dt_model(self):
        """
        build decision tree classifier
        """
        from sklearn.tree import DecisionTreeClassifier
        from sklearn.model_selection import cross_val_score
        
        self.clf = DecisionTreeClassifier(max_depth=None, min_samples_split=2,
            random_state=0)

        self.scores = cross_val_score(self.clf, self.X, self.y, cv=5)

        self.next(self.choose_model)
                        
    @step
    def choose_model(self, inputs):
        """
        find 'best' model
        """
        import numpy as np

        def score(inp):
            return inp.clf,\
                   np.mean(inp.scores)

            
        self.results = sorted(map(score, inputs), key=lambda x: -x[1]) 
        self.model = self.results[0][0]
        self.next(self.end)
        
    @step
    def end(self):
        """
        End of flow, yo!
        """
        print('Scores:')
        print('\n'.join('%s %f' % res for res in self.results))


if __name__ == "__main__":
    ClassificationFlow()

Overwriting ../flows/local/tree_branch_flow.py


Execute the above from the command line with

```bash
python flows/local/tree_branch_flow.py run
```
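
The selection logic in `choose_model` can also be tried on its own, outside a flow. The sketch below uses `SimpleNamespace` objects as stand-ins for the per-branch `inputs` that the join step receives; the model names and scores are made-up placeholders, not results from a real run.

```python
from statistics import mean
from types import SimpleNamespace

# Stand-ins for the `inputs` passed to a join step: one object per branch.
# The scores are hypothetical placeholders, not real cross-validation results.
inputs = [
    SimpleNamespace(clf="random_forest", scores=[0.90, 0.95, 0.92]),
    SimpleNamespace(clf="extra_trees", scores=[0.91, 0.96, 0.95]),
    SimpleNamespace(clf="decision_tree", scores=[0.88, 0.93, 0.90]),
]


def score(inp):
    """Return (model, mean cross-validation score) for one branch."""
    return inp.clf, mean(inp.scores)


# Sort branches by mean score, highest first, and keep the top model.
results = sorted(map(score, inputs), key=lambda pair: pair[1], reverse=True)
best_model = results[0][0]
print(best_model)  # extra_trees
```

Sorting with `reverse=True` is equivalent to the `key=lambda x: -x[1]` used in the flow; both put the highest mean score first.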

In [11]:
! python ../flows/local/tree_branch_flow.py run

Metaflow 2.5.0 executing ClassificationFlow for user:hba
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
2022-03-16 17:51:38.796 Workflow starting (run-id 7238):
2022-03-16 17:51:44.758 [7238/start/135778 (pid 19734)] Task is starting.
2022-03-16 17:52:24.645 [7238/start/135778 (pid 19734)] Task finished successfully.
2022-03-16 17:52:28.484 [7238/rf_model/135780 (pid 19751)] Task is starting.
2022-03-16 17:52:31.303 [7238/xt_model/135781 (pid 19755)] Task is starting.
2022-03-16 17:52:34.573 [7238/dt_model/135782 (pid 19759)] Task is starting.
2022-03-16 17:52:47.416

We can also view the Metaflow card:

In [13]:
! python ../flows/local/tree_branch_flow.py card view start

Metaflow 2.5.0 executing ClassificationFlow for user:hba
Resolving card: ClassificationFlow/7238/start/135778


### Boosted Trees

In this section, we'll turn the xgboost example from the previous lesson into a flow.

_Note:_ this example is deliberately minimal; it mainly shows that xgboost works with Metaflow.

In [18]:
%%writefile ../flows/local/boosted_flow.py

from metaflow import FlowSpec, step, Parameter, JSONType, IncludeFile, card




class BSTFlow(FlowSpec):
    """
    train a boosted tree
    """
    @card
    @step
    def start(self):
        """
        Load the data & train model
        """
        import xgboost as xgb
        # from io import StringIO
        # read in data
        dtrain = xgb.DMatrix('../data/agaricus.txt.train')
        #dtest = xgb.DMatrix('data/agaricus.txt.test')

        # specify parameters
        param = {'max_depth':2, 'eta':1, 'objective':'binary:logistic' }
        num_round = 2
        bst = xgb.train(param, dtrain, num_round)
        bst.save_model("model.json")
        self.next(self.predict)
        

        
        
    @step
    def predict(self):
        """
        make predictions
        """
        import xgboost as xgb

        dtest = xgb.DMatrix('../data/agaricus.txt.test')
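        # load the model saved in the start step and make predictions
        bst = xgb.Booster()
        bst.load_model("model.json")
        # store the predictions on self so Metaflow persists them as an artifact
        self.preds = bst.predict(dtest)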
        self.next(self.end)
        
        
    @step
    def end(self):
        """
        End of flow, yo!
        """
        print("BSTFlow is all done.")


if __name__ == "__main__":
    BSTFlow()

Overwriting ../flows/local/boosted_flow.py


Execute the above from the command line with

```bash
python flows/local/boosted_flow.py run
```

In [19]:
! python ../flows/local/boosted_flow.py run

Metaflow 2.5.0 executing BSTFlow for user:hba
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
2022-03-16 17:57:34.426 Workflow starting (run-id 7241):
2022-03-16 17:57:40.317 [7241/start/135791 (pid 19867)] Task is starting.
2022-03-16 17:58:19.118 [7241/start/135791 (pid 19867)] Task finished successfully.
2022-03-16 17:58:23.216 [7241/predict/135792 (pid 19877)] Task is starting.
2022-03-16 17:58:39.779 [7241/predict/135792 (pid 19877)] Task finished successfully.
2022-03-16 17:58:43.670 [7241/end/135793 (pid 19883)] Task is starting.
2022-03-16 17:58:50.741

We can also view the Metaflow card:

In [20]:
! python ../flows/local/boosted_flow.py card view start

Metaflow 2.5.0 executing BSTFlow for user:hba
Resolving card: BSTFlow/7241/start/135791


### Deep Learning

In this section, we'll turn the deep learning example from the previous lesson into a flow.

In [22]:
%%writefile ../flows/local/NN_flow.py

from metaflow import FlowSpec, step, Parameter, JSONType, IncludeFile, card
import json


class NNFlow(FlowSpec):
    """
    train a NN
    """
    @card
    @step
    def start(self):
        """
        Load the data
        """
        from tensorflow import keras

        # the data, split between train and test sets
        (self.x_train, self.y_train), (self.x_test, self.y_test) = keras.datasets.mnist.load_data()
        self.next(self.wrangle)
        
    @step
    def wrangle(self):
        """
        massage data
        """
        import numpy as np
        from tensorflow import keras
        # Model / data parameters
        self.num_classes = 10
        self.input_shape = (28, 28, 1)

        # Scale images to the [0, 1] range
        self.x_train = self.x_train.astype("float32") / 255
        self.x_test = self.x_test.astype("float32") / 255
        # Make sure images have shape (28, 28, 1)
        self.x_train = np.expand_dims(self.x_train, -1)
        self.x_test = np.expand_dims(self.x_test, -1)

        # convert class vectors to binary class matrices
        self.y_train = keras.utils.to_categorical(self.y_train, self.num_classes)
        self.y_test = keras.utils.to_categorical(self.y_test, self.num_classes)
        
        self.next(self.build_model)


    @step
    def build_model(self):
        """
        build NN model
        """
        import tempfile
        import numpy as np
        import tensorflow as tf
        from tensorflow import keras
        from tensorflow.keras import layers

        model = keras.Sequential(
            [
                keras.Input(shape=self.input_shape),
                layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
                layers.MaxPooling2D(pool_size=(2, 2)),
                layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
                layers.MaxPooling2D(pool_size=(2, 2)),
                layers.Flatten(),
                layers.Dropout(0.5),
                layers.Dense(self.num_classes, activation="softmax"),
            ]
        )
        model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
        with tempfile.NamedTemporaryFile() as f:
            tf.keras.models.save_model(model, f.name, save_format='h5')
            self.model = f.read()
        self.next(self.train)

        
        
    @step
    def train(self):
        """
        Train the model
        """
        import tempfile
        import tensorflow as tf
        self.batch_size = 128
        self.epochs = 15
        
        with tempfile.NamedTemporaryFile() as f:
            f.write(self.model)
            f.flush()
            model =  tf.keras.models.load_model(f.name)
        model.fit(self.x_train, self.y_train, batch_size=self.batch_size, epochs=self.epochs, validation_split=0.1)
        
        self.next(self.end)
        
        
    @step
    def end(self):
        """
        End of flow, yo!
        """
        print("NNFlow is all done.")


if __name__ == "__main__":
    NNFlow()

Overwriting ../flows/local/NN_flow.py


Execute the above from the command line with

```bash
python flows/local/NN_flow.py run
```
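
The `build_model` and `train` steps pass the model between steps as raw bytes by round-tripping it through a temporary file. The sketch below shows the same pattern with only the standard library, using `json` as a stand-in for the Keras save and load calls. The `to_bytes` and `from_bytes` helpers and the `config` dictionary are hypothetical, and reopening a `NamedTemporaryFile` by name while it is still open is assumed to work as it does on POSIX systems (it may fail on Windows).

```python
import json
import tempfile


def to_bytes(obj):
    """Save `obj` to a temporary file by path, then read the file back as bytes."""
    with tempfile.NamedTemporaryFile() as f:
        # Stand-in for tf.keras.models.save_model(model, f.name, ...):
        # a library call that only knows how to write to a file path.
        with open(f.name, "w") as out:
            json.dump(obj, out)
        # f is still positioned at the start, so read() returns the whole file.
        return f.read()


def from_bytes(blob):
    """Write bytes to a temporary file, then load the object from its path."""
    with tempfile.NamedTemporaryFile() as f:
        f.write(blob)
        f.flush()  # make sure the bytes are on disk before reopening by name
        # Stand-in for tf.keras.models.load_model(f.name).
        with open(f.name) as src:
            return json.load(src)


config = {"layers": 8, "optimizer": "adam"}  # hypothetical object to store
blob = to_bytes(config)      # bytes like these can be assigned to self.model
restored = from_bytes(blob)  # a later step rebuilds the object from the bytes
print(restored == config)
```

The `flush()` call matters: without it, the bytes may still sit in the write buffer when the file is reopened by name.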

In [23]:
! python ../flows/local/NN_flow.py run

Metaflow 2.5.0 executing NNFlow for user:hba
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
2022-03-16 18:00:02.732 Workflow starting (run-id 7242):
2022-03-16 18:00:08.649 [7242/start/135795 (pid 19920)] Task is starting.
2022-03-16 18:01:00.526 [7242/start/135795 (pid 19920)] Task finished successfully.
2022-03-16 18:01:04.483 [7242/wrangle/135796 (pid 20002)] Task is starting.
2022-03-16 18:01:31.343 [7242/wrangle/135796 (pid 20002)] Task finished successfully.
2022-03-16 18:01:35.159 [7242/build_model/135800 (pid 20030)] Task is starting.
2022-03-16 18:01:44.362

We can also view the Metaflow card:

In [25]:
! python ../flows/local/NN_flow.py card view start

Metaflow 2.5.0 executing NNFlow for user:hba
Resolving card: NNFlow/7242/start/135795
