# Q1. Converting the script to a Prefect flow

After adding all of the decorators, there is one task whose `.result()` you need to call inside the flow for it to work. Which task is this?

Answer: **train_model**

# Q2. Parameterizing the flow

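What is the validation MSE? (The script computes it with `mean_squared_error(..., squared=False)`, so the reported value is actually the root mean squared error.)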

Answer: **11.637**

# Q3. Saving the model and artifacts

What is the file size of the DictVectorizer that we trained when the date is 2021-08-15?

Answer: **13,000 bytes**

# Q4. Creating a deployment with a CronSchedule

What is the Cron expression to run a flow at 9 AM every 15th of the month?

Answer: **`0 9 15 * *`** (minute 0, hour 9, day-of-month 15, every month, any day of the week)

# Q5. Viewing the Deployment

How many flow runs are scheduled by Prefect in advance? 

Answer: **3**

# Q6. Creating a work-queue

What is the command to view the available work-queues?

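Answer: **`prefect work-queue ls`**

(By contrast, `prefect work-queue preview -h 3000 59472d4c-4703-48cb-9653-b8af9309c52b` does not list the work-queues. It previews the flow runs scheduled for that single work-queue over the next 3000 hours.)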

# Code

In [None]:
import pandas as pd

from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import joblib
from prefect import task, flow, get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import CronSchedule
from prefect.flow_runners import SubprocessFlowRunner
from datetime import timedelta
from prefect.task_runners import SequentialTaskRunner


@task
def read_data(path):
    """Load a trip-data parquet file into a DataFrame.

    Args:
        path: Location of the parquet file, passed to ``pd.read_parquet``.

    Returns:
        The loaded ``pandas.DataFrame``.
    """
    # get_run_logger() takes an optional run context, not a logger name, so it
    # is called without arguments inside the running task.
    logger = get_run_logger()
    logger.info("Reading data from %s", path)
    return pd.read_parquet(path)

@task
def prepare_features(df, categorical, train=True):
    """Compute trip durations, drop outliers and encode categorical columns.

    Args:
        df: Raw trip data with ``pickup_datetime`` and ``dropOff_datetime``
            columns. The caller's DataFrame is not modified.
        categorical: Names of the columns to use as categorical features.
        train: Only selects the wording of the logged mean duration
            ("training" vs "validation").

    Returns:
        A new DataFrame restricted to trips lasting 1-60 minutes, with a
        float ``duration`` column in minutes and the ``categorical`` columns
        converted to strings (missing values become ``"-1"``).
    """
    logger = get_run_logger()

    # assign() returns a new frame, so the task's input is left untouched
    # instead of gaining a ``duration`` column as a side effect.
    trip_time = df.dropOff_datetime - df.pickup_datetime
    df = df.assign(duration=trip_time.dt.total_seconds() / 60)
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    # Log through Prefect so the value shows up in the flow-run logs.
    split = "training" if train else "validation"
    logger.info("The mean duration of %s is %s", split, df.duration.mean())

    # Going through int first turns float-encoded IDs such as 132.0 into
    # "132" rather than "132.0" (presumably the IDs load as floats because
    # of missing values - verify against the data).
    df[categorical] = df[categorical].fillna(-1).astype('int').astype('str')
    return df

@task
def train_model(df, categorical):
    """Fit a DictVectorizer and a LinearRegression on the training data.

    Args:
        df: Prepared training data (as returned by ``prepare_features``)
            containing the ``categorical`` columns and a ``duration`` target.
        categorical: Feature columns to one-hot encode.

    Returns:
        ``(lr, dv)``: the fitted ``LinearRegression`` and ``DictVectorizer``.
    """
    # get_run_logger() takes an optional run context, not a logger name.
    logger = get_run_logger()

    train_dicts = df[categorical].to_dict(orient='records')
    dv = DictVectorizer()
    X_train = dv.fit_transform(train_dicts)
    y_train = df.duration.values

    logger.info("The shape of X_train is %s", X_train.shape)
    logger.info("The DictVectorizer has %d features", len(dv.feature_names_))

    lr = LinearRegression()
    lr.fit(X_train, y_train)
    y_pred = lr.predict(X_train)
    # squared=False makes mean_squared_error return the root of the MSE.
    rmse = mean_squared_error(y_train, y_pred, squared=False)
    logger.info("The RMSE of training is: %s", rmse)
    return lr, dv

@task
def run_model(df, categorical, dv, lr):
    """Evaluate a trained model on validation data.

    Args:
        df: Prepared validation data (as returned by ``prepare_features``)
            containing the ``categorical`` columns and a ``duration`` target.
        categorical: Feature columns to vectorize.
        dv: DictVectorizer fitted on the training data.
        lr: Regression model fitted on the training data.

    Returns:
        The validation root mean squared error.
    """
    # get_run_logger() takes an optional run context, not a logger name.
    logger = get_run_logger()

    val_dicts = df[categorical].to_dict(orient='records')
    # transform (not fit_transform): reuse the training feature space.
    X_val = dv.transform(val_dicts)
    y_pred = lr.predict(X_val)
    y_val = df.duration.values

    # squared=False makes mean_squared_error return the root of the MSE.
    rmse = mean_squared_error(y_val, y_pred, squared=False)
    logger.info("The RMSE of validation is: %s", rmse)
    return rmse

@task
def get_paths(date=None):
    """Build the training and validation parquet paths for a reference date.

    Training uses the month of ``date - 60 days`` and validation the month of
    ``date - 30 days``. For example, ``"2021-08-15"`` selects the 2021-06
    (training) and 2021-07 (validation) files.

    Args:
        date: Reference date (anything ``pd.Timestamp`` accepts); defaults to
            the current time.

    Returns:
        ``(train_path, val_path)`` under ``../data/``.
    """
    # get_run_logger() takes an optional run context, not a logger name.
    logger = get_run_logger()

    reference = pd.Timestamp.now() if date is None else pd.Timestamp(date)
    date_train = (reference - pd.Timedelta(days=60)).strftime('%Y-%m')
    date_val = (reference - pd.Timedelta(days=30)).strftime('%Y-%m')

    train_path = f"../data/fhv_tripdata_{date_train}.parquet"
    val_path = f"../data/fhv_tripdata_{date_val}.parquet"
    logger.info("Training data: %s, validation data: %s", train_path, val_path)
    return train_path, val_path



@flow(task_runner=SequentialTaskRunner())
def main(date=None):
    """Train and validate the trip-duration model for a reference date.

    The training and validation files are chosen by ``get_paths``. The fitted
    model and DictVectorizer are written to the working directory as
    ``model-<YYYY-MM-DD>.bin`` and ``dv-<YYYY-MM-DD>.b``.

    Args:
        date: Reference date (anything ``pd.Timestamp`` accepts); defaults to
            today.
    """
    # Resolve the default here so the data paths and the artifact names use
    # the same date. pd.Timestamp(None) is NaT, and NaT.strftime raises, so
    # leaving date as None would crash only after training had finished.
    if date is None:
        date = pd.Timestamp.now().strftime('%Y-%m-%d')
    date2 = pd.Timestamp(date).strftime('%Y-%m-%d')

    # Task calls return futures; .result() is needed to unpack the tuple.
    train_path, val_path = get_paths(date).result()
    categorical = ['PUlocationID', 'DOlocationID']

    df_train = read_data(train_path)
    df_train_processed = prepare_features(df_train, categorical)

    df_val = read_data(val_path)
    df_val_processed = prepare_features(df_val, categorical, False)

    # train the model; the concrete objects are needed for joblib.dump below
    lr, dv = train_model(df_train_processed, categorical).result()

    joblib.dump(lr, 'model-' + date2 + '.bin')
    joblib.dump(dv, 'dv-' + date2 + '.b')
    run_model(df_val_processed, categorical, dv, lr)


# Register a deployment of the `main` flow named "model_training" on the cron
# schedule "0 9 15 * *" (09:00 on the 15th of every month), with flow runs
# executed by a SubprocessFlowRunner.
DeploymentSpec(
    flow=main,
    name="model_training",
    schedule=CronSchedule(cron="0 9 15 * *"),
    flow_runner=SubprocessFlowRunner(),
)