# Welcome to Full Stack Machine Learning's Week 4 Project!

In the final week, you will return to the workflow you built last week on the [taxi dataset](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page). 

## Task 1: Deploy the champion
Use what you have learned in the last two weeks to make necessary modifications and to deploy your latest version of the `TaxiFarePrediction` flow to Argo. Use `--branch champion` to denote this deployment as the champion model.

In [16]:
%%writefile ../flows/cloud/event_triggered_linear_regression.py
from metaflow import FlowSpec, step, card, conda_base, project, current, Parameter, Flow, trigger
from metaflow.cards import Markdown, Table, Image, Artifact

URL = "https://outerbounds-datasets.s3.us-west-2.amazonaws.com/taxi/latest.parquet"
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"


@trigger(events=["s3"])
@conda_base(
    libraries={
        "pandas": "1.4.2",
        "pyarrow": "11.0.0",
        "numpy": "1.22.0",
        "scikit-learn": "1.1.2",
    }
)
@project(name="taxi_fare_prediction")
class TaxiFarePrediction(FlowSpec):
    data_url = Parameter("data_url", default=URL)

    def transform_features(self, df):
        # TODO:
        # Try to complete tasks 2 and 3 with this function doing nothing like it currently is.
        # Understand what is happening.
        # Revisit task 1 and think about what might go in this function.
        obviously_bad_data_filters = [
        df.fare_amount > 0,  # fare_amount in US Dollars
        df.trip_distance <= 100,  # trip_distance in miles
        df.trip_distance > 0,
        # TODO: add some logic to filter out what you decide is bad data!
        # TIP: Don't spend too much time on this step for this project though, it practice it is a never-ending process.
        df.airport_fee > 0,
        df.congestion_surcharge > 0,
        df.total_amount > 0,
        df.tolls_amount > 0,
        df.tip_amount > 0,
        df.mta_tax > 0,
        df.extra > 0
        ]

        for f in obviously_bad_data_filters:
            df = df[f]

        df.dropna()

        Q1 = df['trip_distance'].quantile(0.25)
        Q3 = df['trip_distance'].quantile(0.75)
        IQR = Q3 - Q1

        # Define lower and upper bounds
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR

        # Filter the DataFrame to keep rows within the IQR
        df = df[(df['trip_distance'] >= lower_bound) & (df['trip_distance'] <= upper_bound)]
        
        return df

    @step
    def start(self):
        import pandas as pd
        from sklearn.model_selection import train_test_split

        self.df = self.transform_features(pd.read_parquet(self.data_url))

        # NOTE: we are split into training and validation set in the validation step which uses cross_val_score.
        # This is a simple/naive way to do this, and is meant to keep this example simple, to focus learning on deploying Metaflow flows.
        # In practice, you want split time series data in more sophisticated ways and run backtests.
        self.X = self.df["trip_distance"].values.reshape(-1, 1)
        self.y = self.df["total_amount"].values
        self.next(self.linear_model)

    @step
    def linear_model(self):
        "Fit a single variable, linear model to the data."
        from sklearn.linear_model import LinearRegression

        # TODO: Play around with the model if you are feeling it.
        self.model = LinearRegression()

        self.next(self.validate)

    def gather_sibling_flow_run_results(self):
        # storage to populate and feed to a Table in a Metaflow card
        rows = []

        # loop through runs of this flow
        for run in Flow(self.__class__.__name__):
            if run.id != current.run_id:
                if run.successful:
                    icon = "✅"
                    msg = "OK"
                    score = str(run.data.scores.mean())
                else:
                    icon = "❌"
                    msg = "Error"
                    score = "NA"
                    for step in run:
                        for task in step:
                            if not task.successful:
                                msg = task.stderr
                row = [
                    Markdown(icon),
                    Artifact(run.id),
                    Artifact(run.created_at.strftime(DATETIME_FORMAT)),
                    Artifact(score),
                    Markdown(msg),
                ]
                rows.append(row)
            else:
                rows.append(
                    [
                        Markdown("✅"),
                        Artifact(run.id),
                        Artifact(run.created_at.strftime(DATETIME_FORMAT)),
                        Artifact(str(self.scores.mean())),
                        Markdown("This run..."),
                    ]
                )
        return rows

    @card(type="corise")
    @step
    def validate(self):
        from sklearn.model_selection import cross_val_score

        self.scores = cross_val_score(self.model, self.X, self.y, cv=5)
        current.card.append(Markdown("# Taxi Fare Prediction Results"))
        current.card.append(
            Table(
                self.gather_sibling_flow_run_results(),
                headers=["Pass/fail", "Run ID", "Created At", "R^2 score", "Stderr"],
            )
        )
        self.next(self.end)

    @step
    def end(self):
        print("Success!")


if __name__ == "__main__":
    TaxiFarePrediction()

Overwriting ../flows/cloud/event_triggered_linear_regression.py


In [13]:
! python ../flows/cloud/event_triggered_linear_regression.py --environment=conda --production --branch champion --production --with retry argo-workflows create 

[35m[1mMetaflow 2.10.6+ob(v1)[0m[35m[22m executing [0m[31m[1mTaxiFarePrediction[0m[35m[22m[0m[35m[22m for [0m[31m[1muser:sandbox[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mProject: [0m[32m[1mtaxi_fare_prediction[0m[35m[22m, Branch: [0m[32m[1mprod.champion[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mValidating your flow...[K[0m[35m[22m[0m
[32m[1m    The graph looks good![K[0m[32m[1m[0m
[35m[22mRunning pylint...[K[0m[35m[22m[0m
[32m[1m    Pylint is happy![K[0m[32m[1m[0m
[1mDeploying [0m[31m[1mtaxifareprediction.prod.champion.taxifareprediction[0m[1m to Argo Workflows...[K[0m[1m[0m
[22mIt seems this is the first time you are deploying [0m[31m[1mtaxifareprediction.prod.champion.taxifareprediction[0m[22m to Argo Workflows.[K[0m[22m[0m
[22m[K[0m[22m[0m
[22mA new production token generated.[K[0m[22m[0m
[22m[K[0m[22m[0m
[22mThe namespace of this production flow is[K[0m[22m[0m
[32m[22m    production

In [14]:
# ! python ../flows/cloud/event_triggered_linear_regression.py --environment=conda argo-workflows trigger
! python ../flows/cloud/event_triggered_linear_regression.py --environment=conda --production --branch champion --production argo-workflows trigger

[35m[1mMetaflow 2.10.6+ob(v1)[0m[35m[22m executing [0m[31m[1mTaxiFarePrediction[0m[35m[22m[0m[35m[22m for [0m[31m[1muser:sandbox[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mProject: [0m[32m[1mtaxi_fare_prediction[0m[35m[22m, Branch: [0m[32m[1mprod.champion[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mValidating your flow...[K[0m[35m[22m[0m
[32m[1m    The graph looks good![K[0m[32m[1m[0m
[35m[22mRunning pylint...[K[0m[35m[22m[0m
[32m[1m    Pylint is happy![K[0m[32m[1m[0m
[1mWorkflow [0m[31m[1mtaxifareprediction.prod.champion.taxifareprediction[0m[1m triggered on Argo Workflows (run-id [0m[31m[1margo-taxifareprediction.prod.champion.taxifareprediction-zsmzl[0m[1m).[K[0m[1m[0m
[1mSee the run in the UI at https://ui-pw-1309721101.outerbounds.dev/TaxiFarePrediction/argo-taxifareprediction.prod.champion.taxifareprediction-zsmzl[K[0m[1m[0m


## Task 2: Build the challenger
Develop a second model, by using the same `TaxiFarePrediction` architecture. Then, deploy the flow to Argo as the `--branch challenger`. 
<br>
<br>
Hint: Modify the `linear_model` step. 
<br>
Bonus: Write a paragraph summary of how you developed the second model and tested it before deploying the challenger flow. Let us know in Slack what you found challenging about the task? 

In [45]:
%%writefile ../flows/cloud/event_triggered_xgboost_regression.py
from metaflow import FlowSpec, step, card, conda_base, project, current, Parameter, Flow, trigger
from metaflow.cards import Markdown, Table, Image, Artifact

URL = "https://outerbounds-datasets.s3.us-west-2.amazonaws.com/taxi/latest.parquet"
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"


@trigger(events=["s3"])
@conda_base(libraries={"conda-forge::xgboost": '1.5.1', "conda-forge::scikit-learn": '1.1.2', "conda-forge::pandas": '1.4.2', 
"conda-forge::pyarrow": '14.0.1.'})
@project(name="taxi_fare_prediction")
class TaxiFarePrediction(FlowSpec):
    data_url = Parameter("data_url", default=URL)

    def transform_features(self, df):
        # TODO:
        # Try to complete tasks 2 and 3 with this function doing nothing like it currently is.
        # Understand what is happening.
        # Revisit task 1 and think about what might go in this function.
        obviously_bad_data_filters = [
        df.fare_amount > 0,  # fare_amount in US Dollars
        df.trip_distance <= 100,  # trip_distance in miles
        df.trip_distance > 0,
        # TODO: add some logic to filter out what you decide is bad data!
        # TIP: Don't spend too much time on this step for this project though, it practice it is a never-ending process.
        df.airport_fee > 0,
        df.congestion_surcharge > 0,
        df.total_amount > 0,
        df.tolls_amount > 0,
        df.tip_amount > 0,
        df.mta_tax > 0,
        df.extra > 0
        ]

        for f in obviously_bad_data_filters:
            df = df[f]

        df.dropna()

        Q1 = df['trip_distance'].quantile(0.25)
        Q3 = df['trip_distance'].quantile(0.75)
        IQR = Q3 - Q1

        # Define lower and upper bounds
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR

        # Filter the DataFrame to keep rows within the IQR
        df = df[(df['trip_distance'] >= lower_bound) & (df['trip_distance'] <= upper_bound)]
        
        return df

    @step
    def start(self):
        import pandas as pd
        from sklearn.model_selection import train_test_split

        self.df = self.transform_features(pd.read_parquet(self.data_url))

        # NOTE: we are split into training and validation set in the validation step which uses cross_val_score.
        # This is a simple/naive way to do this, and is meant to keep this example simple, to focus learning on deploying Metaflow flows.
        # In practice, you want split time series data in more sophisticated ways and run backtests.
        self.X = self.df["trip_distance"].values.reshape(-1, 1)
        self.y = self.df["total_amount"].values
        self.next(self.xgboost_model)

    @step
    def xgboost_model(self):
        "Fit a single variable, xgboost model to the data."
        import xgboost
        from sklearn.model_selection import cross_val_score
        from sklearn.model_selection import RepeatedKFold
        from xgboost import XGBRegressor

        # TODO: Play around with the model if you are feeling it.
        self.model = XGBRegressor()

        self.next(self.validate)

    def gather_sibling_flow_run_results(self):
        # storage to populate and feed to a Table in a Metaflow card
        rows = []

        # loop through runs of this flow
        for run in Flow(self.__class__.__name__):
            if run.id != current.run_id:
                if run.successful:
                    icon = "✅"
                    msg = "OK"
                    score = str(run.data.scores.mean())
                else:
                    icon = "❌"
                    msg = "Error"
                    score = "NA"
                    for step in run:
                        for task in step:
                            if not task.successful:
                                msg = task.stderr
                row = [
                    Markdown(icon),
                    Artifact(run.id),
                    Artifact(run.created_at.strftime(DATETIME_FORMAT)),
                    Artifact(score),
                    Markdown(msg),
                ]
                rows.append(row)
            else:
                rows.append(
                    [
                        Markdown("✅"),
                        Artifact(run.id),
                        Artifact(run.created_at.strftime(DATETIME_FORMAT)),
                        Artifact(str(self.scores.mean())),
                        Markdown("This run..."),
                    ]
                )
        return rows

    @card(type="corise")
    @step
    def validate(self):
        from numpy import absolute
        from sklearn.model_selection import cross_val_score
        from sklearn.model_selection import RepeatedKFold

        # define model evaluation method
        cv = RepeatedKFold(n_splits=10, n_repeats=3, random_state=1)
        # evaluate model
        self.scores = cross_val_score(self.model, self.X, self.y, scoring='r2', cv=cv, n_jobs=-1)
        
        current.card.append(Markdown("# Taxi Fare Prediction Results"))
        current.card.append(
            Table(
                self.gather_sibling_flow_run_results(),
                headers=["Pass/fail", "Run ID", "Created At", "R^2 score", "Stderr"],
            )
        )
        self.next(self.end)

    @step
    def end(self):
        print("Success!")


if __name__ == "__main__":
    TaxiFarePrediction()

Overwriting ../flows/cloud/event_triggered_xgboost_regression.py


In [46]:
! python ../flows/cloud/event_triggered_xgboost_regression.py --environment=conda --production --branch challenger --production --with retry argo-workflows create 

[35m[1mMetaflow 2.10.6+ob(v1)[0m[35m[22m executing [0m[31m[1mTaxiFarePrediction[0m[35m[22m[0m[35m[22m for [0m[31m[1muser:sandbox[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mProject: [0m[32m[1mtaxi_fare_prediction[0m[35m[22m, Branch: [0m[32m[1mprod.challenger[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mValidating your flow...[K[0m[35m[22m[0m
[32m[1m    The graph looks good![K[0m[32m[1m[0m
[35m[22mRunning pylint...[K[0m[35m[22m[0m
[32m[1m    Pylint is happy![K[0m[32m[1m[0m
[1mDeploying [0m[31m[1mtaxifareprediction.prod.challenger.taxifareprediction[0m[1m to Argo Workflows...[K[0m[1m[0m
[22m[K[0m[22m[0m
[22mThe namespace of this production flow is[K[0m[22m[0m
[32m[22m    production:mfprj-6iffsxtybx6fkjku-0-crtx[K[0m[32m[22m[0m
[22mTo analyze results of this production flow add this line in your notebooks:[K[0m[22m[0m
[32m[22m    namespace("production:mfprj-6iffsxtybx6fkjku-0-crtx")[K[0m[32m[22m[0m

In [47]:
# ! python ../flows/cloud/event_triggered_xgboost_regression.py --environment=conda argo-workflows trigger
! python ../flows/cloud/event_triggered_xgboost_regression.py --environment=conda --production --branch challenger --production argo-workflows trigger

[35m[1mMetaflow 2.10.6+ob(v1)[0m[35m[22m executing [0m[31m[1mTaxiFarePrediction[0m[35m[22m[0m[35m[22m for [0m[31m[1muser:sandbox[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mProject: [0m[32m[1mtaxi_fare_prediction[0m[35m[22m, Branch: [0m[32m[1mprod.challenger[0m[35m[22m[K[0m[35m[22m[0m
[35m[22mValidating your flow...[K[0m[35m[22m[0m
[32m[1m    The graph looks good![K[0m[32m[1m[0m
[35m[22mRunning pylint...[K[0m[35m[22m[0m
[32m[1m    Pylint is happy![K[0m[32m[1m[0m
[1mWorkflow [0m[31m[1mtaxifareprediction.prod.challenger.taxifareprediction[0m[1m triggered on Argo Workflows (run-id [0m[31m[1margo-taxifareprediction.prod.challenger.taxifareprediction-4nq67[0m[1m).[K[0m[1m[0m
[1mSee the run in the UI at https://ui-pw-1309721101.outerbounds.dev/TaxiFarePrediction/argo-taxifareprediction.prod.challenger.taxifareprediction-4nq67[K[0m[1m[0m


## Task 3: Analyze the results
Return to this notebook, and read in the results of the challenger and champion flow using the Metaflow Client API.
<br><br>

#### Questions
- Does your model perform better on the metrics you selected? Yes
- Think about your day job, how would you go about assessing whether to roll forward the production "champion" to your new model? 
    - What gives you confidence one model is better than another?

        **1. Performance Metrics:** Compare performance metrics such as mean squared error and r2 between the champion and the new model. A consistent improvement in these metrics suggests the superiority of the new model.

        **2. Validation on Holdout Data:** Evaluate both models on a holdout dataset that neither has seen during training. This helps ensure that the models generalize well to unseen data and are not overfitting.

        **3. Cross-Validation Results:** Perform cross-validation to assess the models' robustness and generalization across different subsets of the data. If the new model consistently outperforms the champion in various folds, it indicates a more stable performance.

        **4. Feature Importance Analysis:** Examine feature importance to understand which features contribute most to the model's predictions. If the new model captures more relevant patterns in the data, it may be a better choice.

        **5. Bias and Fairness Assessment:** Evaluate the models for biases and fairness, especially if the new model incorporates changes in features or data. Ensuring fairness is crucial, and improvements in this aspect can be a strong argument for deploying the new model.

    - What kinds of information do you need to monitor to get buy-in from stakeholders that model A is preferable to model B? 

        **1. Explanation of Model Changes:** Clearly articulate the changes made in the new model and how these changes address limitations or improve upon the champion model.

        **2. Business Impact:** Quantify and communicate the expected business impact of deploying the new model. This could include improvements in cost savings, revenue generation, or customer satisfaction.

        **3. Risk Analysis:** Assess and communicate the risks associated with deploying the new model, including potential downsides and mitigations. Transparency about uncertainties is key.

        **4. Comparison Visualizations:** Create visualizations that effectively communicate the performance differences between the two models. Graphs, charts, and comparison tables can make complex information more digestible for stakeholders.

        **5. User Feedback:** If applicable, gather feedback from end-users or domain experts who interact with the model. Positive feedback on the new model's predictions can be a compelling argument. 

In [55]:
from metaflow import Flow, namespace

# these values are unique to your deployment!
CHAMPION_MODEL_NAMESPACE = "production:mfprj-ovzw7jjg7psagpyw-0-pcke" # "production:mfprj-xsfdb3gtsiboqyrd-0-vqsy"
CHALLENGER_MODEL_NAMESPACE = "production:mfprj-6iffsxtybx6fkjku-0-crtx" # "production:mfprj-cfyzlfzievjlmsf4-0-tbgz"

best_score = -1; winner = None; winner_namespace = None
for n in [CHAMPION_MODEL_NAMESPACE, CHALLENGER_MODEL_NAMESPACE]:
    namespace(n)
    run = Flow('TaxiFarePrediction').latest_successful_run
    score = run.data.scores.mean()
    if score > best_score:
        best_score = score
        winner = run.data.model
        winner_namespace = n

In [49]:
run = Flow('TaxiFarePrediction').latest_successful_run
run.data

<MetaflowData: scores, model, X, df, y, data_url, name>

In [56]:
print("The winner is the {} model, with r2 of {}. You can find the model in the flow deployed to the {} namespace.".format(winner, round(best_score, 2), winner_namespace))

The winner is the XGBRegressor(base_score=None, booster=None, colsample_bylevel=None,
             colsample_bynode=None, colsample_bytree=None,
             enable_categorical=False, gamma=None, gpu_id=None,
             importance_type=None, interaction_constraints=None,
             learning_rate=None, max_delta_step=None, max_depth=None,
             min_child_weight=None, missing=nan, monotone_constraints=None,
             n_estimators=100, n_jobs=None, num_parallel_tree=None,
             predictor=None, random_state=None, reg_alpha=None, reg_lambda=None,
             scale_pos_weight=None, subsample=None, tree_method=None,
             validate_parameters=None, verbosity=None) model, with r2 of 0.86. You can find the model in the flow deployed to the production:mfprj-6iffsxtybx6fkjku-0-crtx namespace.


## CONGRATULATIONS! 🎉✨🍾
If you made it this far, you have completed the Full Stack Machine Learning Corise course. 
We are so glad that you chose to learn with us, and hope to see you again in future courses. Stay tuned for more content and come join us in [Slack](http://slack.outerbounds.co/) to keep learning about Metaflow!