# Homework 3

The goal of this homework is to familiarize users with workflow orchestration. We start from the solution of homework 1. The notebook can be found below:

https://github.com/DataTalksClub/mlops-zoomcamp/blob/main/01-intro/homework.ipynb

This has already been converted to a script called `homework.py` in the `03-orchestration` folder of this repo. 

You will use the FHV dataset like in homework 1.

In [1]:
!python --version

Python 3.10.4


In [2]:
!pip list | grep prefect

prefect                       2.0b5


## Q1. Converting the script to a Prefect flow

We want to bring this script under workflow orchestration to add observability around it. The `main` function will be converted to a `flow` and the other functions will be `tasks`. After adding all of the decorators, there is one task whose `.result()` you will need to call inside the `flow` to get it to work. Which task is this?

* `read_data`
* `prepare_features`
* `train_model`
* `run_model`

Important: change all `print` statements to use the Prefect logger. Output written with `print` will not appear in the Prefect UI. To use the logger, call `get_run_logger` at the start of the task.

In [3]:
!tail homework_olegtaratuhin.py

    df_train_processed = prepare_features(df_train, categorical)

    df_val = read_data(val_path)
    df_val_processed = prepare_features(df_val, categorical, False)

    # train the model
    lr, dv = train_model(df_train_processed, categorical).result()
    run_model(df_val_processed, categorical, dv, lr)

main()


## Q2. Parameterizing the flow

Right now there are two parameters for `main()` called `train_path` and `val_path`. We want to change the flow function to accept `date` instead. `date` should then be passed to a task that gives both the `train_path` and `val_path` to use.

It should look like this:

```python
@flow
def main(date=None):
    train_path, val_path = get_paths(date).result()
    # rest of flow below

main(date="2021-03-15")
```

The flow will take in a parameter called `date` which will be a datetime.

* `date` should default to None.
* If `date` is None, use the current day. Use the data from 2 months back as the training data and the data from the previous month as validation data.
* If a `date` value is supplied, get 2 months before the `date` as the training data, and the previous month as validation data.
* As a concrete example, if the date passed is "2021-03-15", the training data should be "fhv_tripdata_2021-01.parquet" and the validation file will be "fhv_tripdata_2021-02.parquet".
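The date rules above can be sketched in plain Python as follows. This is a minimal sketch, not the reference solution: the `data_dir` argument and the `shift_month` helper are hypothetical, the parameter is named `run_date` only to avoid shadowing `datetime.date`, and in the actual flow the function would also carry the `@task` decorator.

```python
from datetime import date, datetime


def shift_month(year, month, months_back):
    """Return the (year, month) that lies `months_back` months earlier."""
    # Count months from year 0 so that year boundaries are handled by division.
    index = year * 12 + (month - 1) - months_back
    return index // 12, index % 12 + 1


def get_paths(run_date=None, data_dir="./data"):
    """Build the training and validation file paths for a run date.

    `data_dir` is a hypothetical folder holding the downloaded parquet files.
    """
    if run_date is None:
        run_date = date.today()
    elif isinstance(run_date, str):
        run_date = datetime.strptime(run_date, "%Y-%m-%d").date()

    # Training data: 2 months back. Validation data: previous month.
    train_year, train_month = shift_month(run_date.year, run_date.month, 2)
    val_year, val_month = shift_month(run_date.year, run_date.month, 1)

    train_path = f"{data_dir}/fhv_tripdata_{train_year}-{train_month:02d}.parquet"
    val_path = f"{data_dir}/fhv_tripdata_{val_year}-{val_month:02d}.parquet"
    return train_path, val_path
```

Doing the arithmetic on a month index rather than subtracting days keeps the result correct across year boundaries, for example a January date maps to November and December of the previous year.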

What is the validation MSE when running the flow with this date?

Note you need to download the relevant files to run. Part of this question is understanding which files the flow should be looking for.

The validation MSE is:

* 11.637
* 11.837
* 12.037
* 12.237

In [7]:
!cd .. && python 03-orchestration/homework_olegtaratuhin.py 2>&1 | grep MSE

17:47:14.622 | INFO    | prefect - The MSE of training is: 11.789353642873099
17:47:17.081 | INFO    | prefect - The MSE of validation is: 11.637028658288816


## Q3. Saving the model and artifacts

* Save the model as "model-{date}.pkl" where date is in `YYYY-MM-DD`. Note that `date` here is the value of the flow `parameter`. In practice, this setup makes it very easy to get the latest model to run predictions because you just need to get the most recent one.
* In this example we use a DictVectorizer. That is needed to run future data through our model. Save that as "dv-{date}.pkl". Similar to above, if the date is `2021-03-15`, the files output should be `model-2021-03-15.bin` and `dv-2021-03-15.b`.
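One way to write both artifacts is with the standard `pickle` module. The sketch below is an assumption about how this could look, not the course's reference code: the `save_artifacts` name and the `model_dir` default are hypothetical, and it uses the `.pkl` extension from the naming pattern above.

```python
import pickle
from pathlib import Path


def save_artifacts(model, dv, date, model_dir="./models"):
    """Pickle the model and the DictVectorizer under date-stamped names.

    `date` is expected as a YYYY-MM-DD string; `model_dir` is a hypothetical
    output folder that is created if it does not exist yet.
    """
    out_dir = Path(model_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    model_path = out_dir / f"model-{date}.pkl"
    dv_path = out_dir / f"dv-{date}.pkl"

    with open(model_path, "wb") as f_out:
        pickle.dump(model, f_out)
    with open(dv_path, "wb") as f_out:
        pickle.dump(dv, f_out)

    return model_path, dv_path
```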

By using this file name, during inference, we can just pull the latest model from our model directory and apply it. Assuming we already had a list of filenames:

```python
['model-2021-03-15.bin', 'model-2021-04-15.bin', 'model-2021-05-15.bin']
```

We could do something like `sorted(model_list, reverse=True)[0]` to get the filename of the latest file. This works because dates in `YYYY-MM-DD` form sort chronologically when sorted as strings. This is the simplest way to consistently use the latest trained model for inference. Tools like MLflow give us more control logic to use flows.
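As a small illustration of picking the latest model by filename, the sketch below sorts in descending order and takes the first entry. The `latest_model` helper is a hypothetical name, and the approach assumes every filename shares the same prefix and a zero-padded `YYYY-MM-DD` date.

```python
def latest_model(model_list):
    """Return the filename carrying the most recent YYYY-MM-DD date stamp."""
    if not model_list:
        raise ValueError("model_list is empty")
    # ISO-style dates sort chronologically as plain strings,
    # so the first item of a descending sort is the newest file.
    return sorted(model_list, reverse=True)[0]


model_list = ['model-2021-03-15.bin', 'model-2021-05-15.bin', 'model-2021-04-15.bin']
print(latest_model(model_list))  # model-2021-05-15.bin
```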

What is the file size of the `DictVectorizer` that we trained when the `date` is 2021-08-15?

* 13,000 bytes 
* 23,000 bytes 
* 33,000 bytes 
* 43,000 bytes 


In [24]:
import os
os.path.getsize("../models/dv-2021-08-15.pkl")

13191

## Q4. Creating a deployment with a CronSchedule

For this exercise, use a `CronSchedule` when creating a Prefect deployment.

What is the Cron expression to run a flow at 9 AM every 15th of the month?

* `* * 15 9 0`
* `9 15 * * *`
* `0 9 15 * *`
* `0 15 9 1 *`

Hint: there are many Cron to English tools. Try looking for one to help you.
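To read a Cron expression by hand, it helps to label its five fields, which in standard cron are ordered minute, hour, day of month, month, day of week. The helper below is a hypothetical illustration that only labels the fields; it does not validate ranges or expand step syntax.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def label_cron_fields(expression):
    """Map each of the five cron fields to its name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


# "30 6 1 * *" reads as: minute 30, hour 6, day 1 of every month, any weekday.
print(label_cron_fields("30 6 1 * *"))
```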

After you write your `DeploymentSpec`, create the deployment with `prefect deployment create`.


In [27]:
def main():
    pass


import prefect
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner
from prefect.orion.schemas.schedules import CronSchedule

DeploymentSpec(
    flow=prefect.flow(main),
    name="hw-03",
    flow_runner=SubprocessFlowRunner(),
    schedule=CronSchedule(cron="0 9 15 * *"),
    tags=["ml"],
)

DeploymentSpec(name='hw-03', flow=<prefect.flows.Flow object at 0x105159cf0>, flow_name=None, flow_location=None, flow_storage=None, parameters=None, schedule=CronSchedule(cron='0 9 15 * *', timezone=None, day_or=True), tags=['ml'], flow_runner=SubprocessFlowRunner(typename='subprocess', env={}, stream_output=True, condaenv=None, virtualenv=None))

## Q5. Viewing the Deployment 

View the deployment in the UI. When first loading, we may not see that many flows because the default filter is 1 day back and 1 day forward. Remove the filter for 1 day forward to see the scheduled runs. 

How many flow runs are scheduled by Prefect in advance? You should not be counting manually. The number of upcoming runs is shown at the top right of the dashboard.

* 0
* 3
* 10
* 25

In [29]:
!echo 3

3


## Q6. Creating a work-queue

In order to run this flow, you will need an agent and a work queue. Because we scheduled our flow for every month, it won't really get picked up by an agent. For this exercise, create a work-queue from the UI and view it using the CLI.

For all CLI commands with Prefect, you can use `--help` to get more information. 

For example,
* `prefect --help`
* `prefect work-queue --help`

What is the command to view the available work-queues?

* `prefect work-queue inspect`
* `prefect work-queue ls`
* `prefect work-queue preview`
* `prefect work-queue list`

In [30]:
!prefect work-queue ls

                            Work Queues                             
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓
┃                                   ID ┃ Name  ┃ Concurrency Limit ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━━━━━━━━━━━━━┩
│ 522f6e1c-e0b8-4d2f-bc1b-6cd0ff3cc07d │ local │ None              │
└──────────────────────────────────────┴───────┴───────────────────┘
                    (**) denotes a paused queue                     
