## Background

We are going to build a model that classifies customer reviews as having positive or negative sentiment, using the [Women's E-Commerce Clothing Reviews Dataset](https://www.kaggle.com/datasets/nicapotato/womens-ecommerce-clothing-reviews). Here is what the data looks like:

```python
import pandas as pd
df = pd.read_parquet('train.parquet')
print(f'num of rows: {df.shape[0]}')
```

```
num of rows: 20377
```


```python
df.head()
```

|   | labels | review |
|---|--------|--------|
| 0 | 0 | Odd fit: I wanted to love this sweater but the... |
| 1 | 1 | Very comfy dress: The quality and material of ... |
| 2 | 0 | Fits nicely but fabric a bit thin: I ordered t... |
| 3 | 1 | Great fit: Love these jeans, fit and style... ... |
| 4 | 0 | Stretches out, washes poorly. wish i could ret... |
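Before modeling, it is worth checking the class balance, because it determines how well a trivial "always predict positive" baseline scores on accuracy. With pandas you would typically call `df['labels'].value_counts(normalize=True)`; the dependency-free sketch below computes the same quantities on a made-up toy label list (not the real dataset).

```python
from collections import Counter

def class_balance(labels):
    """Return the fraction of examples in each class."""
    counts = Counter(labels)
    total = sum(counts.values())
    return {label: n / total for label, n in counts.items()}

def always_positive_accuracy(labels, positive=1):
    """Accuracy of a baseline that predicts `positive` for every example."""
    return sum(1 for y in labels if y == positive) / len(labels)

# Toy labels for illustration only (1 = positive, 0 = negative).
toy_labels = [1, 1, 1, 0, 1, 0, 1, 1]
print(class_balance(toy_labels))             # {1: 0.75, 0: 0.25}
print(always_positive_accuracy(toy_labels))  # 0.75
```

If the positive class dominates, a high baseline accuracy says little about model quality, which is one reason to also look at ROC AUC.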


We will walk you through how we would organize this task in Metaflow. Concretely, we will demonstrate the following steps:

1. **Read data from a parquet file** in the `start` step.
2. **Record a baseline and train a model in parallel** using a branching workflow, in the `baseline` and `train` steps.
3. **Evaluate the model** in the `join` step:
    - on a holdout set, comparing it against the baseline
    - with a smoke test
4. **Tag the model as a `deployment_candidate`** in the `end` step if it passes both checks.
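Steps 2 and 3 follow a fan-out/fan-in pattern: two independent branches start from the same input, and a single join step sees the results of both. Metaflow schedules each branch as its own task and passes artifacts for you; the dependency-free sketch below only mimics the shape of that graph with `concurrent.futures`. The `baseline`, `train`, and `join` functions here are hypothetical stand-ins, not Metaflow APIs.

```python
from concurrent.futures import ThreadPoolExecutor

def baseline(labels):
    # Hypothetical stand-in: accuracy of always predicting the positive class.
    return sum(labels) / len(labels)

def train(labels):
    # Hypothetical stand-in for training: "learns" the majority class.
    return 1 if sum(labels) * 2 >= len(labels) else 0

def join(results):
    # Fan-in: the join sees the output of every branch, keyed by branch name.
    return {"base_acc": results["baseline"], "model": results["train"]}

def run_flow(labels):
    # Fan-out: both branches start from the same input and run concurrently.
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = {"baseline": pool.submit(baseline, labels),
                   "train": pool.submit(train, labels)}
        results = {name: f.result() for name, f in futures.items()}
    return join(results)

print(run_flow([1, 0, 1, 1]))  # {'base_acc': 0.75, 'model': 1}
```

Keying results by branch name loosely mirrors how a Metaflow join step reads each branch's artifacts through `inputs.<step_name>`.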

## Constructing The Metaflow Flow

```python
%%writefile flow.py

from metaflow import FlowSpec, step, Flow, current

class NLPFlow(FlowSpec):

    @step
    def start(self):
        "Read the data"
        import pandas as pd
        self.df = pd.read_parquet('train.parquet')
        print(f'num of rows: {self.df.shape[0]}')
        # Fan out: run the baseline and training branches in parallel
        self.next(self.baseline, self.train)

    @step
    def baseline(self):
        "Compute the baseline"
        from sklearn.metrics import accuracy_score, roc_auc_score
        # Baseline: always predict the positive class
        baseline_predictions = [1] * self.df.shape[0]
        self.base_acc = accuracy_score(self.df.labels, baseline_predictions)
        self.base_rocauc = roc_auc_score(self.df.labels, baseline_predictions)
        self.next(self.join)

    @step
    def train(self):
        "Train the model"
        from model import Nbow_Model
        model = Nbow_Model(vocab_sz=750)
        model.fit(X=self.df['review'], y=self.df['labels'])
        self.model_dict = model.model_dict  # save model
        self.next(self.join)

    @step
    def join(self, inputs):
        "Compare the model results with the baseline."
        import pandas as pd
        from model import Nbow_Model
        # In a join, inputs.<step_name> exposes the artifacts of each branch
        self.model_dict = inputs.train.model_dict
        self.train_df = inputs.train.df
        self.holdout_df = pd.read_parquet('holdout.parquet')
        model = Nbow_Model.from_dict(self.model_dict)

        self.model_acc = model.eval_acc(X=self.holdout_df['review'], labels=self.holdout_df['labels'])
        self.model_rocauc = model.eval_rocauc(X=self.holdout_df['review'], labels=self.holdout_df['labels'])

        print(f'Baseline Accuracy: {inputs.baseline.base_acc:.2%}')
        print(f'Baseline AUC: {inputs.baseline.base_rocauc:.2}')
        print(f'Model Accuracy: {self.model_acc:.2%}')
        print(f'Model AUC: {self.model_rocauc:.2}')
        self.beats_baseline = self.model_rocauc > inputs.baseline.base_rocauc
        print(f'Model beats baseline (T/F): {self.beats_baseline}')

        # Smoke test to make sure the model does the right thing on obvious examples.
        _tst_reviews = ["poor fit its baggy in places where it isn't supposed to be.",
                        "love it, very high quality and great value"]
        _tst_preds = model.predict(_tst_reviews)
        self.passed_smoke_test = _tst_preds[0][0] < .5 and _tst_preds[1][0] > .5
        print(f'Model passed smoke test (T/F): {self.passed_smoke_test}')
        self.next(self.end)

    @step
    def end(self):
        "Tags model as a deployment candidate if it beats the baseline and passes smoke tests."
        if self.beats_baseline and self.passed_smoke_test:
            # Look up this run through the Client API and tag it
            run = Flow(current.flow_name)[current.run_id]
            run.add_tag('deployment_candidate')


if __name__ == '__main__':
    NLPFlow()
```

```
Overwriting flow.py
```
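Note that `beats_baseline` is decided on ROC AUC. A constant prediction such as `[1] * n` cannot rank any positive above any negative, so its ROC AUC is 0.5 regardless of class balance, while its accuracy equals the positive-class fraction. The sketch below computes ROC AUC from scratch as the probability that a randomly chosen positive is scored above a randomly chosen negative (ties count one half), the same quantity `sklearn.metrics.roc_auc_score` reports for binary labels. It is quadratic in the number of examples and meant only to illustrate the point.

```python
def roc_auc(labels, scores):
    """ROC AUC as P(score of a random positive > score of a random negative),
    counting ties as 1/2. O(n_pos * n_neg); for illustration only."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

labels = [1, 1, 1, 0, 1, 0]
constant = [1] * len(labels)  # the baseline step's predictions
print(roc_auc(labels, constant))                        # 0.5
print(roc_auc(labels, [0.9, 0.8, 0.7, 0.2, 0.6, 0.1]))  # 1.0
```

In effect, the model only needs to rank reviews better than chance to pass the baseline check, so the smoke test provides a useful second guardrail.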


```python
#|eval: false
!python flow.py --no-pylint run
```

```
Metaflow 2.7.1 executing NLPFlow for user:hamel
Validating your flow...
    The graph looks good!
2022-07-20 13:19:07.534 Workflow starting (run-id 1658348347529456):
2022-07-20 13:19:07.544 [1658348347529456/start/1 (pid 39061)] Task is starting.
2022-07-20 13:19:08.462 [1658348347529456/start/1 (pid 39061)] num of rows: 20377
2022-07-20 13:19:08.569 [1658348347529456/start/1 (pid 39061)] Task finished successfully.
2022-07-20 13:19:08.579 [1658348347529456/baseline/2 (pid 39066)] Task is starting.
2022-07-20 13:19:08.588 [1658348347529456/train/3 (pid 39067)] Task is starting.
2022-07-20 13:19:09.945 [1658348347529456/baseline/2 (pid 39066)] Task f
```

## Using The Model In Production

After you have trained a model in Metaflow, you may want to use it to make predictions or for further testing. There are two common patterns for this: (1) retrieve the model from Metaflow in an external system, or (2) make predictions in another flow. We illustrate both below:

### 1. Retrieve Model From Metaflow To Use In External Systems

You can now retrieve the model tagged as a `deployment_candidate` outside Metaflow and use it in whatever downstream application you want, or just for ad-hoc testing:

```python
from metaflow import Flow
import pandas as pd
from model import Nbow_Model
```

```python
predict_df = pd.read_parquet('predict.parquet')
```

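```python
def get_latest_successful_run(flow_nm, tag):
    "Gets the latest successful run for a flow with a specific tag."
    # Flow(...).runs(tag) only yields runs that carry `tag`
    for r in Flow(flow_nm).runs(tag):
        if r.successful: return r
    # Falls through and returns None if no successful run has the tag
```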
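`get_latest_successful_run` assumes that `Flow(...).runs(tag)` yields runs newest first, so the first successful match is the most recent one. If no run qualifies, the loop finishes and the function returns `None`, and the next cell would then fail with an `AttributeError` on `run.data`. The sketch below reproduces the same selection logic against a hypothetical in-memory list of runs (the `FakeRun` class is invented for illustration and is not part of Metaflow) and raises a clear error instead of returning `None`.

```python
from dataclasses import dataclass, field

@dataclass
class FakeRun:
    # Hypothetical stand-in for metaflow.Run, for illustration only.
    run_id: str
    successful: bool
    tags: set = field(default_factory=set)

def latest_successful_run(runs, tag):
    """Return the first successful run carrying `tag`.

    Assumes `runs` is ordered newest first, as the helper above assumes
    of Flow(...).runs(tag)."""
    for r in runs:
        if tag in r.tags and r.successful:
            return r
    raise LookupError(f"no successful run tagged {tag!r}")

runs = [  # newest first
    FakeRun("3", successful=False, tags={"deployment_candidate"}),
    FakeRun("2", successful=True, tags={"deployment_candidate"}),
    FakeRun("1", successful=True),
]
print(latest_successful_run(runs, "deployment_candidate").run_id)  # 2
```

Failing loudly is a design choice: a downstream job that silently picks up nothing is usually harder to debug than one that stops with a descriptive error.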

```python
run = get_latest_successful_run('NLPFlow', 'deployment_candidate')
model = Nbow_Model.from_dict(run.data.model_dict)
```



```python
preds = model.predict(predict_df['review'])
preds
```

```
array([[0.9995303 ],
       [0.9741467 ],
       [0.9999521 ],
       ...,
       [0.9997336 ],
       [0.99905205],
       [0.70124   ]], dtype=float32)
```
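`model.predict` appears to return an `(n, 1)` array of positive-class probabilities. To turn these into hard labels you need a cutoff; the flow's smoke test treats scores above 0.5 as positive, so a minimal sketch might reuse that threshold. The `to_labels` helper below is hypothetical, and the 0.5 default is an assumption you may want to tune on the holdout set.

```python
def to_labels(probs, threshold=0.5):
    """Map an (n, 1) array-like of positive-class probabilities to 0/1 labels.

    Scores strictly greater than `threshold` count as positive, matching the
    flow's smoke test. Works on nested lists or a 2-D NumPy array."""
    return [1 if row[0] > threshold else 0 for row in probs]

# Toy values shaped like the output above (not real predictions).
toy_preds = [[0.9995], [0.2], [0.5], [0.70124]]
print(to_labels(toy_preds))  # [1, 0, 0, 1]
```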

You can write these predictions to a parquet file like so:

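```python
import pyarrow as pa
import pyarrow.parquet as pq  # the parquet submodule must be imported explicitly

# Flatten the (n, 1) predictions into a single column named "data"
pa_tbl = pa.table({"data": preds.squeeze()})
pq.write_table(pa_tbl, "sentiment_predictions.parquet")
```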

### 2. Use a Flow To Make Predictions

You may want to do batch predictions in a Flow as well:

```python
%%writefile predflow.py

from metaflow import FlowSpec, step, Flow, current

class NLPredictionFlow(FlowSpec):

    def get_latest_successful_run(self, flow_nm, tag):
        "Gets the latest successful run for a flow with a specific tag."
        for r in Flow(flow_nm).runs(tag):
            if r.successful: return r

    @step
    def start(self):
        "Get the latest deployment candidate that is from a successful run"
        self.deploy_run = self.get_latest_successful_run('NLPFlow', 'deployment_candidate')
        self.next(self.end)

    @step
    def end(self):
        "Make predictions"
        from model import Nbow_Model
        import pandas as pd
        import pyarrow as pa
        import pyarrow.parquet as pq
        new_reviews = pd.read_parquet('predict.parquet')['review']

        # Make predictions with the model stored in the tagged run
        model = Nbow_Model.from_dict(self.deploy_run.data.model_dict)
        predictions = model.predict(new_reviews)
        print(f'Writing predictions to parquet: {predictions.shape[0]:,} rows')
        pa_tbl = pa.table({"data": predictions.squeeze()})
        pq.write_table(pa_tbl, "sentiment_predictions.parquet")

if __name__ == '__main__':
    NLPredictionFlow()
```

```
Overwriting predflow.py
```
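The `end` step above passes every review to `model.predict` in a single call, which is fine at this scale. If the prediction set grows, one option is to score it in fixed-size chunks to bound memory. The sketch below shows a generic chunking helper; `predict_in_batches`, `predict_fn`, and `batch_size` are hypothetical names, not part of `Nbow_Model` or Metaflow.

```python
from itertools import islice

def predict_in_batches(predict_fn, items, batch_size=1024):
    """Yield predictions for `items`, calling `predict_fn` on at most
    `batch_size` items at a time. `predict_fn` is any callable mapping a
    list of inputs to a sequence of outputs (a stand-in for model.predict)."""
    it = iter(items)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield from predict_fn(batch)

# Toy predictor: length of each review, just to show the plumbing.
reviews = ["love it", "poor fit", "great value", "too small", "ok"]
print(list(predict_in_batches(lambda b: [len(r) for r in b], reviews, batch_size=2)))
# [7, 8, 11, 9, 2]
```

Because the helper is a generator, results can be written out incrementally instead of held in memory all at once.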


```python
!python predflow.py run
```

```
Metaflow 2.7.1 executing NLPredictionFlow for user:hamel
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
2022-07-20 13:19:26.426 Workflow starting (run-id 1658348366422276):
2022-07-20 13:19:26.436 [1658348366422276/start/1 (pid 39093)] Task is starting.
2022-07-20 13:19:27.239 [1658348366422276/start/1 (pid 39093)] Task finished successfully.
2022-07-20 13:19:27.249 [1658348366422276/end/2 (pid 39097)] Task is starting.
2022-07-20 13:19:29.761 [1658348366422276/end/2 (pid 39097)] 2022-07-20 13:19:29.761103: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with o
```

### Further Discussion

This is a simple example that runs on your laptop. For production use cases, however, you may want to use [@conda](https://docs.metaflow.org/metaflow/dependencies#managing-dependencies-with-conda-decorator) for dependency management, [@batch](https://docs.metaflow.org/v/r/metaflow/scaling#using-aws-batch) or [@kubernetes](https://docs.metaflow.org/metaflow/scaling-out-and-up/effortless-scaling-with-kubernetes) for remote execution, and [@schedule](https://docs.metaflow.org/going-to-production-with-metaflow/scheduling-metaflow-flows/scheduling-with-aws-step-functions#scheduling-a-flow) to run jobs periodically.