## Question 1. Select the Tool

I chose Prefect for its flexibility and scalability.

## Question 2. Version

In [1]:
!prefect version

Version:             3.4.4
API version:         0.8.4
Python version:      3.9.21
Git commit:          0367d7aa
Built:               Thu, May 29, 2025 09:37 PM
OS/Arch:             linux/x86_64
Profile:             ephemeral
Server type:         ephemeral
Pydantic version:    2.11.5
Server:
  Database:          sqlite
  SQLite version:    3.45.3


In [2]:
!prefect server start

Switched to profile 'local'

 ___ ___ ___ ___ ___ ___ _____
| _ \ _ \ __| __| __/ __|_   _|
|  _/   / _|| _|| _| (__  | |
|_| |_|_\___|_| |___\___| |_|

Configure Prefect to communicate with the server with:

    prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api

View the API reference documentation at http://127.0.0.1:4200/docs

Check out the dashboard at http://127.0.0.1:4200



^C


Run the sample code from Prefect's "getting started" guide to check that everything is working.

In [3]:
from prefect import flow, task
import random

@task
def get_customer_ids() -> list[str]:
    # Fetch customer IDs from a database or API
    return [f"customer{n}" for n in random.choices(range(100), k=10)]

@task
def process_customer(customer_id: str) -> str:
    # Process a single customer
    return f"Processed {customer_id}"

@flow
def main() -> list[str]:
    customer_ids = get_customer_ids()
    # Map the process_customer task across all customer IDs
    results = process_customer.map(customer_ids)
    return results


if __name__ == "__main__":
    main()

In [5]:
from prefect import flow, task
import random

@task
def get_customer_ids() -> list[str]:
    # Fetch customer IDs from a database or API
    return [f"customer{n}" for n in random.choices(range(100), k=10)]

@task
def process_customer(customer_id: str) -> str:
    # Process a single customer
    return f"Processed {customer_id}"

@flow
def main() -> list[str]:
    customer_ids = get_customer_ids()
    # Map the process_customer task across all customer IDs
    results = process_customer.map(customer_ids)
    return results


if __name__ == "__main__":
    main.serve(name="my-first-deployment")

RuntimeError: This event loop is already running
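
This error most likely comes from running inside a notebook: a Jupyter kernel already has an asyncio event loop running, and `serve` appears to need to drive a loop of its own, so the call is usually better placed in a plain `.py` script. The sketch below reproduces the same error message with the standard library only. It does not use Prefect, and `inner` and `outer` are illustrative names.

```python
import asyncio


async def inner() -> int:
    return 1


async def outer() -> str:
    # Inside a coroutine the loop is already running, as it is inside a Jupyter cell.
    loop = asyncio.get_running_loop()
    coro = inner()
    try:
        # Asking a running loop to run something to completion is not allowed.
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "never awaited" warning for the unused coroutine
        return str(exc)
    return "no error"


message = asyncio.run(outer())
print(message)
```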

In [7]:
!prefect deployment run 'main/my-first-deployment'

23:15:58.449 | INFO    | prefect - Starting temporary server on http://127.0.0.1:8859
See https://docs.prefect.io/3.0/manage/self-host#self-host-a-prefect-server for more information on running a dedicated Prefect server.
Deployment 'main/my-first-deployment' not found!
23:16:01.885 | INFO    | prefect - Stopping temporary server on http://127.0.0.1:8859


## Question 3. Creating a pipeline

In [4]:
import pandas as pd

In [5]:
def read_data(month='03'):
    filename = f'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-{month}.parquet'
    df = pd.read_parquet(filename)
    print(f"We loaded {len(df)} records for month {month} of 2023")
    return filename

In [8]:
filename = read_data()

We loaded 3403766 records for month 03 of 2023


## Question 4. Data preparation

In [12]:
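def read_dataframe(filename):
    df = pd.read_parquet(filename)

    # Trip duration in minutes
    df['duration'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.total_seconds() / 60
    # Keep trips lasting between 1 and 60 minutes (inclusive); copy to avoid SettingWithCopyWarning
    df = df[(df['duration'] >= 1) & (df['duration'] <= 60)].copy()
    # Cast location IDs to strings so that DictVectorizer one-hot encodes them
    df[['PULocationID', 'DOLocationID']] = df[['PULocationID', 'DOLocationID']].astype(str)

    return df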

In [13]:
df = read_dataframe(filename)
print(f"The size of the dataframe after initial preprocessing is {len(df)}")

The size of the dataframe after initial preprocessing is 3316216
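
As a quick sanity check on the duration filter, the two row counts printed above can be compared directly. This is plain arithmetic on those numbers; the variable names are illustrative.

```python
# Row counts copied from the outputs of Questions 3 and 4.
loaded = 3_403_766
kept = 3_316_216

dropped = loaded - kept
kept_share = kept / loaded

print(f"Dropped {dropped} rows; kept {kept_share:.2%} of the data")
```

So the 1 to 60 minute filter removes roughly 2.6% of the March 2023 trips.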


## Question 5. Train a model

In [15]:
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression

In [22]:
import mlflow
from mlflow.entities import ViewType
from mlflow.tracking import MlflowClient

import os
from pathlib import Path

In [23]:
def transform_data(df, v=None):
    df_dict_list = df.loc[:,['PULocationID', 'DOLocationID']].to_dict('records')
    if v is None:
        v = DictVectorizer()
        X = v.fit_transform(df_dict_list)
    else:
        X = v.transform(df_dict_list)
    y = df['duration']

    # models_folder = Path('models')
    # models_folder.mkdir(exist_ok=True)
    
    # # Save DictVectorizer and datasets
    # dump_pickle(v, os.path.join(models_folder, "dv.pkl"))
    # dump_pickle((X, y), os.path.join(models_folder, "train.pkl"))
    return X, y, v
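
To make the encoding step concrete: for string-valued features, `DictVectorizer` creates one column per `feature=value` pair and sorts the column names by default. The following is a minimal pure-Python sketch of that idea, not scikit-learn's implementation. It builds dense lists rather than a sparse matrix, and the `one_hot` helper and the location IDs are made up for illustration.

```python
def one_hot(records: list[dict[str, str]]) -> tuple[list[str], list[list[int]]]:
    # Column names follow the "feature=value" pattern, sorted alphabetically.
    columns = sorted({f"{key}={value}" for rec in records for key, value in rec.items()})
    index = {name: i for i, name in enumerate(columns)}

    rows = []
    for rec in records:
        row = [0] * len(columns)
        for key, value in rec.items():
            row[index[f"{key}={value}"]] = 1
        rows.append(row)
    return columns, rows


# Made-up location IDs, for illustration only.
trips = [
    {"PULocationID": "161", "DOLocationID": "236"},
    {"PULocationID": "161", "DOLocationID": "43"},
]
columns, rows = one_hot(trips)
print(columns)
print(rows)
```

Each trip ends up with exactly two ones in its row, one for the pickup zone and one for the drop-off zone, which is why the casts to `str` in the preprocessing step matter.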

In [37]:
def train_data(X, y, v):

    mlflow.set_tracking_uri("http://127.0.0.1:5000")
    mlflow.set_experiment("nyc-taxi-experiment")
    with mlflow.start_run():
        mlflow.set_tag("model", "LinearRegressor")

        reg = LinearRegression().fit(X, y)
        
        print(f"The intercept of the model is {reg.intercept_:.2f}")
        
        mlflow.log_param("intercept", reg.intercept_)
        mlflow.sklearn.log_model(reg, artifact_path="artifacts_local")
    return v, reg

In [38]:
X_train, y_train, v = transform_data(df)
v, reg = train_data(X_train, y_train, v)



The intercept of the model is 24.77




🏃 View run dazzling-goose-518 at: http://127.0.0.1:5000/#/experiments/1/runs/5295b0e991cf412ead3270f96daa9b5a
🧪 View experiment at: http://127.0.0.1:5000/#/experiments/1


## Question 6. Register the model

In [41]:
def register_model():
    # Use the most recent run in this session (the training run above)
    last_run = mlflow.last_active_run().info.run_id

    # The model was logged with artifact_path="artifacts_local",
    # so the model URI has to point at that path within the run
    model_uri = f"runs:/{last_run}/artifacts_local"

    # Register the logged model as a new version of "linear-regressor"
    mlflow.register_model(model_uri=model_uri, name="linear-regressor")
    return model_uri

In [42]:
model_uri = register_model()

Registered model 'linear-regressor' already exists. Creating a new version of this model...
2025/06/09 18:10:54 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: linear-regressor, version 3
Created version '3' of model 'linear-regressor'.


From the artifacts of the registered model (the MLmodel file): model_size_bytes is 4501.

Next, I convert this notebook into a Python script to run with Prefect.