# Welcome to Full Stack Machine Learning's Week 4 Project!

In the final week, you will return to the workflow you built last week on the [taxi dataset](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page). 

## Task 1: Deploy the champion
Use what you have learned in the last two weeks to make necessary modifications and to deploy your latest version of the `TaxiFarePrediction` flow to Argo. Use `--branch champion` to denote this deployment as the champion model.

In [26]:
%%writefile ./taxi_fare_predictor_champ.py
from metaflow import FlowSpec, step, card, conda_base, current, Parameter, Flow, trigger, retry, catch, timeout, project
from metaflow.cards import Markdown, Table, Image, Artifact

URL = "https://outerbounds-datasets.s3.us-west-2.amazonaws.com/taxi/latest.parquet"
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

@project(name="taxi_fare_prediction")
@trigger(events=["s3"])
@conda_base(
    libraries={
        "pandas": "1.4.2",
        "pyarrow": "11.0.0",
        "scikit-learn": "1.1.2",
    }
)
class TaxiFarePrediction(FlowSpec):
    data_url = Parameter("data_url", default=URL)

    def transform_features(self, df):
        # TODO:
        # Try to complete tasks 2 and 3 with this function doing nothing like it currently is.
        # Understand what is happening.
        # Revisit task 1 and think about what might go in this function.
        obviously_bad_data_filters = [
            "fare_amount > 0",  # fare_amount in US Dollars
            "trip_distance <= 100",  # trip_distance in miles
            "trip_distance > 0",
            "passenger_count > 0",
            "tpep_pickup_datetime <= tpep_dropoff_datetime",
            "total_amount > 0",
        ]
        df = df.query(" & ".join(obviously_bad_data_filters))
        
        print(df.shape)
        return df

    @retry(times=3)
    @card(type="corise")
    @step
    def start(self):
        import pandas as pd
        from sklearn.model_selection import train_test_split

        df_ = pd.read_parquet(self.data_url)
        missing = df_.isna().sum(axis=0).to_frame()

        self.df = self.transform_features(df_)
        current.card.append(Markdown("# Taxi Fare Incoming Data"))
        current.card.append(Markdown(f" DataFrame incoming shape:{df_.shape}"))
        current.card.append(
            Table.from_dataframe(missing)
        )
        current.card.append(Markdown(f" Clean DataFrame shape:{self.df.shape}"))
        # NOTE: we are split into training and validation set in the validation step which uses cross_val_score.
        # This is a simple/naive way to do this, and is meant to keep this example simple, to focus learning on deploying Metaflow flows.
        # In practice, you want split time series data in more sophisticated ways and run backtests.
        self.X = self.df["trip_distance"].values.reshape(-1, 1)
        self.y = self.df["total_amount"].values
        self.next(self.linear_model)

    @step
    def linear_model(self):
        "Fit a single variable, linear model to the data."
        from sklearn.linear_model import LinearRegression
        from sklearn.pipeline import make_pipeline
        from sklearn.preprocessing import StandardScaler

        # TODO: Play around with the model if you are feeling it.
        reg = LinearRegression()
        self.model_name = type(reg).__name__
        
        self.model = make_pipeline(
            StandardScaler(),
            reg
        )

        self.next(self.validate)

    def gather_sibling_flow_run_results(self):
        # storage to populate and feed to a Table in a Metaflow card
        rows = []

        # loop through runs of this flow
        for run in Flow(self.__class__.__name__):
            if run.id != current.run_id:
                if run.successful:
                    icon = "✅"
                    msg = "OK"
                    score = str(run.data.scores.mean())
                else:
                    icon = "❌"
                    msg = "Error"
                    score = "NA"
                    for step in run:
                        for task in step:
                            if not task.successful:
                                msg = task.stderr
                row = [
                    Markdown(icon),
                    Artifact(run.id),
                    Artifact(run.created_at.strftime(DATETIME_FORMAT)),
                    Artifact(score),
                    Markdown(msg),
                ]
                rows.append(row)
            else:
                rows.append(
                    [
                        Markdown("✅"),
                        Artifact(run.id),
                        Artifact(run.created_at.strftime(DATETIME_FORMAT)),
                        Artifact(str(self.scores.mean())),
                        Markdown("This run..."),
                    ]
                )
        return rows

    @timeout(minutes=10)
    @card(type="corise")
    @step
    def validate(self):
        from sklearn.model_selection import cross_val_score

        self.scores = cross_val_score(self.model, self.X, self.y, cv=5)
        current.card.append(Markdown("# Taxi Fare Prediction Results"))
        current.card.append(
            Table(
                self.gather_sibling_flow_run_results(),
                headers=["Pass/fail", "Run ID", "Created At", "R^2 score", "Stderr"],
            )
        )
        self.next(self.end)

    @step
    def end(self):
        print("Success!")


if __name__ == "__main__":
    TaxiFarePrediction()

Overwriting ./taxi_fare_predictor_champ.py


In [27]:
! python taxi_fare_predictor_champ.py --environment=conda --production --branch champion --production argo-workflows create

[35m[1mMetaflow 2.10.6+ob(v1)[0m[35m[22m executing [0m[31m[1mTaxiFarePrediction[0m[35m[22m[0m[35m[22m for [0m[31m[1muser:sandbox[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mProject: [0m[32m[1mtaxi_fare_prediction[0m[35m[22m, Branch: [0m[32m[1mprod.champion[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mValidating your flow...[K[0m[35m[22m[0m
[32m[1m    The graph looks good![K[0m[32m[1m[0m
[35m[22mRunning pylint...[K[0m[35m[22m[0m
[32m[1m    Pylint is happy![K[0m[32m[1m[0m
[1mDeploying [0m[31m[1mtaxifareprediction.prod.champion.taxifareprediction[0m[1m to Argo Workflows...[K[0m[1m[0m
[22m[K[0m[22m[0m
[22mThe namespace of this production flow is[K[0m[22m[0m
[32m[22m    production:mfprj-ovzw7jjg7psagpyw-0-pcke[K[0m[32m[22m[0m
[22mTo analyze results of this production flow add this line in your notebooks:[K[0m[22m[0m
[32m[22m    namespace("production:mfprj-ovzw7jjg7psagpyw-0-pcke")[K[0m[32m[22m[0m
[2

In [28]:
! python taxi_fare_predictor_champ.py --environment=conda --production --branch champion --production argo-workflows trigger

[35m[1mMetaflow 2.10.6+ob(v1)[0m[35m[22m executing [0m[31m[1mTaxiFarePrediction[0m[35m[22m[0m[35m[22m for [0m[31m[1muser:sandbox[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mProject: [0m[32m[1mtaxi_fare_prediction[0m[35m[22m, Branch: [0m[32m[1mprod.champion[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mValidating your flow...[K[0m[35m[22m[0m
[32m[1m    The graph looks good![K[0m[32m[1m[0m
[35m[22mRunning pylint...[K[0m[35m[22m[0m
[32m[1m    Pylint is happy![K[0m[32m[1m[0m
[1mWorkflow [0m[31m[1mtaxifareprediction.prod.champion.taxifareprediction[0m[1m triggered on Argo Workflows (run-id [0m[31m[1margo-taxifareprediction.prod.champion.taxifareprediction-bc9t5[0m[1m).[K[0m[1m[0m
[1mSee the run in the UI at https://ui-pw-2049723668.outerbounds.dev/TaxiFarePrediction/argo-taxifareprediction.prod.champion.taxifareprediction-bc9t5[K[0m[1m[0m


## Task 2: Build the challenger
Develop a second model, by using the same `TaxiFarePrediction` architecture. Then, deploy the flow to Argo as the `--branch challenger`. 
<br>
<br>
Hint: Modify the `linear_model` step. 
<br>
Bonus: Write a paragraph summary of how you developed the second model and tested it before deploying the challenger flow. Let us know in Slack what you found challenging about the task? 

In [29]:
%%writefile ./taxi_fare_predictor_challenger.py
from metaflow import FlowSpec, step, card, conda_base, current, Parameter, Flow, trigger, retry, catch, timeout, project
from metaflow.cards import Markdown, Table, Image, Artifact

URL = "https://outerbounds-datasets.s3.us-west-2.amazonaws.com/taxi/latest.parquet"
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

@project(name="taxi_fare_prediction")
@trigger(events=["s3"])
@conda_base(
    libraries={
        "pandas": "1.4.2",
        "pyarrow": "11.0.0",
        "scikit-learn": "1.1.2",
    }
)
class TaxiFarePrediction(FlowSpec):
    data_url = Parameter("data_url", default=URL)

    def transform_features(self, df):
        # TODO:
        # Try to complete tasks 2 and 3 with this function doing nothing like it currently is.
        # Understand what is happening.
        # Revisit task 1 and think about what might go in this function.
        obviously_bad_data_filters = [
            "fare_amount > 0",  # fare_amount in US Dollars
            "trip_distance <= 100",  # trip_distance in miles
            "trip_distance > 0",
            "passenger_count > 0",
            "tpep_pickup_datetime <= tpep_dropoff_datetime",
            "total_amount > 0",
        ]
        df = df.query(" & ".join(obviously_bad_data_filters))
        
        print(df.shape)
        return df

    @retry(times=3)
    @card(type="corise")
    @step
    def start(self):
        import pandas as pd
        from sklearn.model_selection import train_test_split

        df_ = pd.read_parquet(self.data_url)
        missing = df_.isna().sum(axis=0).to_frame()

        self.df = self.transform_features(df_)
        current.card.append(Markdown("# Taxi Fare Incoming Data"))
        current.card.append(Markdown(f" DataFrame incoming shape:{df_.shape}"))
        current.card.append(
            Table.from_dataframe(missing)
        )
        current.card.append(Markdown(f" Clean DataFrame shape:{self.df.shape}"))
        # NOTE: we are split into training and validation set in the validation step which uses cross_val_score.
        # This is a simple/naive way to do this, and is meant to keep this example simple, to focus learning on deploying Metaflow flows.
        # In practice, you want split time series data in more sophisticated ways and run backtests.
        self.X = self.df["trip_distance"].values.reshape(-1, 1)
        self.y = self.df["total_amount"].values
        self.next(self.linear_model)

    @step
    def linear_model(self):
        "Fit a single variable, linear model to the data."
        from sklearn.ensemble import RandomForestRegressor

        # TODO: Play around with the model if you are feeling it.
        self.model = RandomForestRegressor(n_estimators=150, max_depth=7)
        self.model_name = type(self.model).__name__
        self.next(self.validate)

    def gather_sibling_flow_run_results(self):
        # storage to populate and feed to a Table in a Metaflow card
        rows = []

        # loop through runs of this flow
        for run in Flow(self.__class__.__name__):
            if run.id != current.run_id:
                if run.successful:
                    icon = "✅"
                    msg = "OK"
                    score = str(run.data.scores.mean())
                else:
                    icon = "❌"
                    msg = "Error"
                    score = "NA"
                    for step in run:
                        for task in step:
                            if not task.successful:
                                msg = task.stderr
                row = [
                    Markdown(icon),
                    Artifact(run.id),
                    Artifact(run.created_at.strftime(DATETIME_FORMAT)),
                    Artifact(score),
                    Markdown(msg),
                ]
                rows.append(row)
            else:
                rows.append(
                    [
                        Markdown("✅"),
                        Artifact(run.id),
                        Artifact(run.created_at.strftime(DATETIME_FORMAT)),
                        Artifact(str(self.scores.mean())),
                        Markdown("This run..."),
                    ]
                )
        return rows

    @timeout(minutes=10)
    @card(type="corise")
    @step
    def validate(self):
        from sklearn.model_selection import cross_val_score

        self.scores = cross_val_score(self.model, self.X, self.y, cv=5)
        current.card.append(Markdown("# Taxi Fare Prediction Results"))
        current.card.append(
            Table(
                self.gather_sibling_flow_run_results(),
                headers=["Pass/fail", "Run ID", "Created At", "R^2 score", "Stderr"],
            )
        )
        self.next(self.end)

    @step
    def end(self):
        print("Success!")


if __name__ == "__main__":
    TaxiFarePrediction()

Overwriting ./taxi_fare_predictor_challenger.py


In [30]:
! python taxi_fare_predictor_challenger.py --environment=conda --production --branch challenger --production argo-workflows create

[35m[1mMetaflow 2.10.6+ob(v1)[0m[35m[22m executing [0m[31m[1mTaxiFarePrediction[0m[35m[22m[0m[35m[22m for [0m[31m[1muser:sandbox[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mProject: [0m[32m[1mtaxi_fare_prediction[0m[35m[22m, Branch: [0m[32m[1mprod.challenger[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mValidating your flow...[K[0m[35m[22m[0m
[32m[1m    The graph looks good![K[0m[32m[1m[0m
[35m[22mRunning pylint...[K[0m[35m[22m[0m
[32m[1m    Pylint is happy![K[0m[32m[1m[0m
[1mDeploying [0m[31m[1mtaxifareprediction.prod.challenger.taxifareprediction[0m[1m to Argo Workflows...[K[0m[1m[0m
[22m[K[0m[22m[0m
[22mThe namespace of this production flow is[K[0m[22m[0m
[32m[22m    production:mfprj-6iffsxtybx6fkjku-0-crtx[K[0m[32m[22m[0m
[22mTo analyze results of this production flow add this line in your notebooks:[K[0m[22m[0m
[32m[22m    namespace("production:mfprj-6iffsxtybx6fkjku-0-crtx")[K[0m[32m[22m[0m

In [31]:
! python taxi_fare_predictor_challenger.py --environment=conda --production --branch challenger --production argo-workflows trigger

[35m[1mMetaflow 2.10.6+ob(v1)[0m[35m[22m executing [0m[31m[1mTaxiFarePrediction[0m[35m[22m[0m[35m[22m for [0m[31m[1muser:sandbox[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mProject: [0m[32m[1mtaxi_fare_prediction[0m[35m[22m, Branch: [0m[32m[1mprod.challenger[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mValidating your flow...[K[0m[35m[22m[0m
[32m[1m    The graph looks good![K[0m[32m[1m[0m
[35m[22mRunning pylint...[K[0m[35m[22m[0m
[32m[1m    Pylint is happy![K[0m[32m[1m[0m
[1mWorkflow [0m[31m[1mtaxifareprediction.prod.challenger.taxifareprediction[0m[1m triggered on Argo Workflows (run-id [0m[31m[1margo-taxifareprediction.prod.challenger.taxifareprediction-xttk2[0m[1m).[K[0m[1m[0m
[1mSee the run in the UI at https://ui-pw-2049723668.outerbounds.dev/TaxiFarePrediction/argo-taxifareprediction.prod.challenger.taxifareprediction-xttk2[K[0m[1m[0m


## Task 3: Analyze the results
Return to this notebook, and read in the results of the challenger and champion flow using the Metaflow Client API.
<br><br>

#### Questions
- Does your model perform better on the metrics you selected? 
- Think about your day job, how would you go about assessing whether to roll forward the production "champion" to your new model? 
    - What gives you confidence one model is better than another?
    - What kinds of information do you need to monitor to get buy-in from stakeholders that model A is preferable to model B?  

Response 

- The challnger model did outperform the champion by an additional improvement of 1% on the champion's score.  
- We can evaluate and make a decision based on the scores collected from cross validation, in case of complex models or deployments we could also perform an A/B test on "live" data to make sure the gains transfer to the actual business. Furthermore, in certain type of applications the size and type of model used might matter as that could impact how quickly predictions can be served; this could impact bounce rates and conversion rates so it is important to consider this factors.  
- we need performance on static train/test sets, performance on live data from an A/B test, and most importantly an evaluation of the impact of the models on the most important business metrics and customer satisfaction if available and/or relevant.

In [33]:
import pandas as pd
from metaflow import Flow, namespace

flow_name = "TaxiFarePrediction"
champ_ns = "production:mfprj-ovzw7jjg7psagpyw-0-pcke" 
chall_ns = "production:mfprj-6iffsxtybx6fkjku-0-crtx"

scores = pd.DataFrame(columns=['model', 'r2', 'namespace'])
for n in [champ_ns, chall_ns]:
    
    run = Flow(flow_name).latest_successful_run
    scores = pd.concat([scores, 
                        pd.DataFrame({"model": run.data.model_name,
                        "r2": run.data.scores,
                        "namespace": n})],
                        ignore_index=True)



In [34]:
scores

Unnamed: 0,model,r2,namespace,ns_id
0,RandomForestRegressor,0.905729,production:mfprj-ovzw7jjg7psagpyw-0-pcke,production:mfprj-ovzw7jjg7psagpyw-0-pcke
1,RandomForestRegressor,0.907567,production:mfprj-ovzw7jjg7psagpyw-0-pcke,production:mfprj-ovzw7jjg7psagpyw-0-pcke
2,RandomForestRegressor,0.906306,production:mfprj-ovzw7jjg7psagpyw-0-pcke,production:mfprj-ovzw7jjg7psagpyw-0-pcke
3,RandomForestRegressor,0.912338,production:mfprj-ovzw7jjg7psagpyw-0-pcke,production:mfprj-ovzw7jjg7psagpyw-0-pcke
4,RandomForestRegressor,0.902354,production:mfprj-ovzw7jjg7psagpyw-0-pcke,production:mfprj-ovzw7jjg7psagpyw-0-pcke
5,LinearRegression,0.895114,production:mfprj-6iffsxtybx6fkjku-0-crtx,production:mfprj-6iffsxtybx6fkjku-0-crtx
6,LinearRegression,0.89703,production:mfprj-6iffsxtybx6fkjku-0-crtx,production:mfprj-6iffsxtybx6fkjku-0-crtx
7,LinearRegression,0.897572,production:mfprj-6iffsxtybx6fkjku-0-crtx,production:mfprj-6iffsxtybx6fkjku-0-crtx
8,LinearRegression,0.89921,production:mfprj-6iffsxtybx6fkjku-0-crtx,production:mfprj-6iffsxtybx6fkjku-0-crtx
9,LinearRegression,0.889593,production:mfprj-6iffsxtybx6fkjku-0-crtx,production:mfprj-6iffsxtybx6fkjku-0-crtx


In [41]:
scores.groupby(['namespace', 'model']).agg(mean_score = ('r2', 'mean'), std_score = ('r2', 'std'))

Unnamed: 0_level_0,Unnamed: 1_level_0,mean_score,std_score
namespace,model,Unnamed: 2_level_1,Unnamed: 3_level_1
production:mfprj-6iffsxtybx6fkjku-0-crtx,LinearRegression,0.895704,0.003716
production:mfprj-ovzw7jjg7psagpyw-0-pcke,RandomForestRegressor,0.906859,0.003619


## CONGRATULATIONS! 🎉✨🍾
If you made it this far, you have completed the Full Stack Machine Learning Corise course. 
We are so glad that you chose to learn with us, and hope to see you again in future courses. Stay tuned for more content and come join us in [Slack](http://slack.outerbounds.co/) to keep learning about Metaflow!