
The goal of this homework is to create a simple training pipeline that uses MLflow to track experiments and register the best model, orchestrated with Mage.

We'll use [the same NYC taxi dataset](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page), the **Yellow** taxi data for March 2023.
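The monthly files follow the naming pattern visible in the download URL used in Question 3. The helper below is a sketch that assumes the `{color}_tripdata_{YYYY}-{MM}.parquet` pattern holds for other colors and months; `trip_data_url` is a hypothetical name, not part of any library.

```python
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"


def trip_data_url(color: str, year: int, month: int) -> str:
    """Build the download URL for one month of NYC TLC trip data.

    Assumes the naming pattern of the March 2023 Yellow file
    generalizes to other colors and months.
    """
    if not 1 <= month <= 12:
        raise ValueError(f"month must be in 1..12, got {month}")
    # Zero-pad the month so March becomes "03"
    return f"{BASE_URL}/{color}_tripdata_{year:04d}-{month:02d}.parquet"


print(trip_data_url("yellow", 2023, 3))
```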

## Question 1. Select the Tool

You can use the same tool you used when completing the module,
or choose a different one for your homework.

What's the name of the orchestrator you chose? 

Airflow

## Question 2. Version

What's the version of the orchestrator? 

Airflow **3.0.0**, the default tag of the image set in the Docker Compose file:

```yaml
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:3.0.0}
```
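The `${AIRFLOW_IMAGE_NAME:-apache/airflow:3.0.0}` expression uses Compose's default-value interpolation: if `AIRFLOW_IMAGE_NAME` is unset or empty, `apache/airflow:3.0.0` is used, otherwise the variable's value wins. So the version above assumes the variable was not overridden. A minimal sketch of that rule, handling only the `:-` form (not `-`, `:?` or nested expressions); `resolve_default` is a hypothetical helper:

```python
import re

# Matches ${NAME:-default}: fall back to `default` when NAME is unset or empty
_PATTERN = re.compile(r"\$\{(?P<name>[A-Za-z_][A-Za-z0-9_]*):-(?P<default>[^}]*)\}")


def resolve_default(expr: str, env: dict) -> str:
    """Substitute every ${NAME:-default} occurrence in expr using env."""
    def _sub(match: re.Match) -> str:
        value = env.get(match.group("name"))
        # Empty strings count as "unset" for the :- form
        return value if value else match.group("default")

    return _PATTERN.sub(_sub, expr)


print(resolve_default("${AIRFLOW_IMAGE_NAME:-apache/airflow:3.0.0}", {}))
```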

## Question 3. Creating a pipeline

Let's read the March 2023 Yellow taxi trips data.

How many records did we load? 

- 3,003,766
- 3,203,766
- 3,403,766
- 3,603,766

```python
from datetime import datetime
from io import BytesIO

import pandas as pd
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator


def ingest_files(**context):
    """Download the March 2023 Yellow taxi file and save a local Parquet copy."""
    url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-03.parquet'

    # Fail the task on HTTP errors and don't wait forever on a stalled connection
    response = requests.get(url, timeout=300)
    response.raise_for_status()

    df = pd.read_parquet(BytesIO(response.content))
    df.to_parquet('/tmp/yellow_taxi_2023_03.parquet')
    print(f"Saved to /tmp/yellow_taxi_2023_03.parquet, total records: {len(df)}")


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

with DAG(
    dag_id='download_yellow_taxi_data',
    default_args=default_args,
    schedule=None,  # Use 'schedule' instead of 'schedule_interval'
    catchup=False,
    tags=['example'],
) as dag:

    download_task = PythonOperator(
        task_id='download_and_save_yellow_taxi',
        python_callable=ingest_files,
    )
```

The task log reports **3,403,766** records:

```
[2025-05-27, 15:05:33] INFO - Saved to /tmp/yellow_taxi_2023_03.parquet, total records: 3403766: chan="stdout": source="task"
```


## Question 4. Data preparation

Let's continue with pipeline creation.

We will use the same logic for preparing the data we used previously. 

This is what we used (adjusted for yellow dataset):

```python
import pandas as pd


def read_dataframe(filename):
    df = pd.read_parquet(filename)

    df['duration'] = df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    df.duration = df.duration.dt.total_seconds() / 60

    # .copy() avoids pandas' SettingWithCopyWarning on the assignment below
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)

    return df
```
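A tiny synthetic example shows what the filter keeps: trips of exactly 1 and 60 minutes survive, while 0.5- and 61-minute trips are dropped. The rows are made up for illustration, and `prepare` is a hypothetical in-memory variant of `read_dataframe` that takes a DataFrame instead of a file name:

```python
import pandas as pd


def prepare(df: pd.DataFrame) -> pd.DataFrame:
    """Same steps as read_dataframe, applied to an in-memory DataFrame."""
    df = df.copy()
    df["duration"] = (
        df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    ).dt.total_seconds() / 60
    # Keep trips between 1 and 60 minutes, inclusive on both ends
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()
    categorical = ["PULocationID", "DOLocationID"]
    df[categorical] = df[categorical].astype(str)
    return df


# Four made-up trips lasting 0.5, 1, 60 and 61 minutes
pickup = pd.to_datetime(["2023-03-01 10:00"] * 4)
toy = pd.DataFrame({
    "tpep_pickup_datetime": pickup,
    "tpep_dropoff_datetime": pickup + pd.to_timedelta([0.5, 1, 60, 61], unit="min"),
    "PULocationID": [1, 2, 3, 4],
    "DOLocationID": [5, 6, 7, 8],
})
result = prepare(toy)
print(result[["PULocationID", "duration"]])
```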

Let's apply it to the data we loaded in Question 3.

What's the size of the result? 

- 2,903,766
- 3,103,766
- 3,316,216 
- 3,503,766

```python
# read_dataframe (above), ingest_files and default_args are defined in the same DAG file
def process_and_print_size(**context):
    df = read_dataframe('/tmp/yellow_taxi_2023_03.parquet')
    print(f"Filtered DataFrame size: {len(df)}")

with DAG(
    dag_id='download_yellow_taxi_data',
    default_args=default_args,
    schedule=None,  # Use 'schedule' instead of 'schedule_interval'
    catchup=False,
    tags=['example'],
) as dag:

    download_task = PythonOperator(
        task_id='download_and_save_yellow_taxi',
        python_callable=ingest_files,
    )

    process_task = PythonOperator(
        task_id='process_and_print_size',
        python_callable=process_and_print_size,
    )

    download_task >> process_task
```


The filtered DataFrame has **3,316,216** rows:

```
[2025-05-27, 15:13:18] INFO - Filtered DataFrame size: 3316216: chan="stdout": source="task"
```
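Putting the Question 3 and Question 4 counts side by side shows how much the 1 to 60 minute duration filter removes; the two numbers below are copied from the task logs:

```python
loaded = 3_403_766  # records in the raw March 2023 file (Question 3)
kept = 3_316_216    # records left after the duration filter (Question 4)

dropped = loaded - kept
print(f"dropped {dropped:,} records ({dropped / loaded:.2%}); kept {kept / loaded:.2%}")
# prints: dropped 87,550 records (2.57%); kept 97.43%
```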

## Question 5. Train a model

We will now train a linear regression model using the same code as in homework 1.

* Fit a dict vectorizer.
* Train a linear regression with default parameters.
* Use pickup and drop-off locations separately; don't create a combination feature.

Let's now use it in the pipeline. We will need to create another transformation block and return both the dict vectorizer and the model.
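In Airflow, "returning" objects from a task means pushing them to XCom, which is designed for small, serializable values rather than fitted models. One workaround is to pickle both objects to a file and pass only the path. This is a sketch that assumes all tasks share a filesystem, the same assumption the DAG already makes by passing data through `/tmp`; `save_artifacts` and `load_artifacts` are hypothetical names:

```python
import os
import pickle
import tempfile


def save_artifacts(dv, model, path: str) -> str:
    """Pickle the vectorizer and model together and return the file path.

    The returned path is a plain string, so it can travel through XCom
    while the objects themselves stay on disk.
    """
    with open(path, "wb") as f_out:
        pickle.dump((dv, model), f_out)
    return path


def load_artifacts(path: str):
    """Load the (dv, model) tuple written by save_artifacts."""
    with open(path, "rb") as f_in:
        return pickle.load(f_in)


# Stand-in objects; in the DAG these would be the fitted DictVectorizer and model
demo_path = save_artifacts({"vectorizer": True}, ["model"],
                           os.path.join(tempfile.mkdtemp(), "models.pkl"))
dv_loaded, model_loaded = load_artifacts(demo_path)
```

A downstream task would call `ti.xcom_pull(...)` to get the path and then `load_artifacts` on it.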

What's the intercept of the model? 

Hint: print the `intercept_` field in the code block

- 21.77
- 24.77
- 27.77
- 31.77

```python
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression


def train_model(**context):
    df = read_dataframe('/tmp/yellow_taxi_2023_03.parquet')

    categorical = ['PULocationID', 'DOLocationID']
    # read_dataframe already casts these to str; repeated here as a safeguard
    df[categorical] = df[categorical].astype(str)
    train_dicts = df[categorical].to_dict(orient='records')

    dv = DictVectorizer()
    X_train = dv.fit_transform(train_dicts)

    target = 'duration'
    y_train = df[target].values

    lr = LinearRegression()
    lr.fit(X_train, y_train)

    print(f"Model intercept: {lr.intercept_}")

    # PythonOperator pushes the callable's return value to XCom, but the default
    # XCom backend may not be able to serialize sklearn objects, so they are not returned:
    # return dv, lr


# This task is created outside the `with DAG` block, so attach it to the DAG explicitly
train_model_task = PythonOperator(
    task_id='train_linear_regression_model',
    python_callable=train_model,
    dag=dag,
)

download_task >> process_task >> train_model_task
```

The printed intercept, 24.7790, matches the **24.77** option:

```
[2025-05-27, 16:15:28] INFO - Model intercept: 24.778964270944773: chan="stdout": source="task"
```
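`register_latest_model` in Question 6 pulls an MLflow run ID from this task's XCom and expects the model under `artifact_path='model'`, so the training callable has to log the model and return the run ID; the `train_model` above does neither. A sketch of a variant that does, assuming the `mlflow` service from Question 6 is reachable at `http://mlflow:5000` and that the Airflow image has `mlflow` and `scikit-learn` installed. The experiment name and the `preprocessor` artifact path are hypothetical, and heavy imports live inside the callable to keep DAG parsing cheap:

```python
def train_and_log_model(**context):
    """Train on the prepared data, log the model to MLflow, and return the run ID."""
    import os
    import pickle
    import tempfile

    import mlflow
    import mlflow.sklearn
    import pandas as pd
    from sklearn.feature_extraction import DictVectorizer
    from sklearn.linear_model import LinearRegression

    mlflow.set_tracking_uri("http://mlflow:5000")
    mlflow.set_experiment("nyc-taxi-experiment")  # hypothetical experiment name

    # Same preparation steps as read_dataframe
    df = pd.read_parquet("/tmp/yellow_taxi_2023_03.parquet")
    df["duration"] = (
        df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    ).dt.total_seconds() / 60
    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()
    categorical = ["PULocationID", "DOLocationID"]
    df[categorical] = df[categorical].astype(str)

    dv = DictVectorizer()
    X_train = dv.fit_transform(df[categorical].to_dict(orient="records"))
    y_train = df["duration"].values

    with mlflow.start_run() as run:
        lr = LinearRegression()
        lr.fit(X_train, y_train)
        print(f"Model intercept: {lr.intercept_}")

        # artifact_path="model" is what register_latest_model expects
        mlflow.sklearn.log_model(lr, artifact_path="model")

        # Store the fitted vectorizer next to the model as a plain artifact
        with tempfile.TemporaryDirectory() as tmp_dir:
            dv_path = os.path.join(tmp_dir, "dv.pkl")
            with open(dv_path, "wb") as f_out:
                pickle.dump(dv, f_out)
            mlflow.log_artifact(dv_path, artifact_path="preprocessor")

    # A plain string, which the default XCom backend can carry
    return run.info.run_id
```

To use it, point the `train_linear_regression_model` task's `python_callable` at `train_and_log_model`.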

## Question 6. Register the model 

The model is trained, so let's save it with MLflow.

Find the logged model and its `MLmodel` file. What's the size of the model (the `model_size_bytes` field)?

* 14,534
* 9,534
* 4,534
* 1,534

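```dockerfile
FROM python:3.10-slim

RUN pip install mlflow==2.12.1

EXPOSE 5000

# sqlite:/// followed by the absolute path /home/mlflow_data/mlflow.db (hence four slashes).
# Artifacts also go under the mounted volume so they survive container restarts.
CMD [ \
    "mlflow", "server", \
    "--backend-store-uri", "sqlite:////home/mlflow_data/mlflow.db", \
    "--artifacts-destination", "/home/mlflow_data/artifacts", \
    "--host", "0.0.0.0", \
    "--port", "5000" \
]
```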

```yaml
  mlflow:
    build:
      context: .
      dockerfile: mlflow.dockerfile
    ports:
      - "5001:5000"
    volumes:
      - "${PWD}/mlflow_data:/home/mlflow_data/"
    networks:
      - app-network
```
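The `"5001:5000"` mapping publishes container port 5000 on host port 5001, so a browser on the host uses `http://localhost:5001`, while containers on `app-network` reach the server by its service name at `http://mlflow:5000`, the URI used in the DAG code. A quick reachability check is sketched below, assuming the Airflow services are attached to the same network and that the MLflow tracking server answers `GET /health` (it does in recent MLflow releases); `mlflow_is_up` is a hypothetical helper using only the standard library:

```python
import http.client
import urllib.request


def mlflow_is_up(base_url: str, timeout: float = 5.0) -> bool:
    """Return True if GET {base_url}/health answers with HTTP 200."""
    try:
        with urllib.request.urlopen(f"{base_url.rstrip('/')}/health", timeout=timeout) as resp:
            return resp.status == 200
    except (OSError, http.client.HTTPException):
        # URLError, HTTPError and socket timeouts are all OSError subclasses
        return False
```

Inside an Airflow task this would be `mlflow_is_up("http://mlflow:5000")`; from the host, `mlflow_is_up("http://localhost:5001")`.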

```python
import mlflow
from mlflow.tracking import MlflowClient


def register_latest_model(**context):
    # Get the run_id returned (and pushed to XCom) by the training task
    ti = context['ti']
    run_id = ti.xcom_pull(task_ids='train_linear_regression_model')

    if not run_id:
        raise ValueError("Could not retrieve MLflow run_id from previous task.")

    # Ensure the tracking URI is set for this task
    mlflow.set_tracking_uri("http://mlflow:5000")
    client = MlflowClient()

    # The model was logged with artifact_path='model' in the train_model task
    artifact_path = "model"
    model_uri = f"runs:/{run_id}/{artifact_path}"

    # Register the model. If the model name doesn't exist, it will be created.
    model_name = "nyc-taxi-lr-model"
    try:
        model_details = mlflow.register_model(model_uri, name=model_name)
    except Exception as e:
        print(f"Error registering model: {e}")
        raise  # fail the task instead of reporting success
    print(f"Successfully registered model '{model_name}' version {model_details.version}")

    # Sum the sizes of all files under the model's artifact directory
    total_size_bytes = 0
    try:
        for artifact in client.list_artifacts(model_details.run_id, artifact_path):
            if artifact.is_dir:
                # artifact.path is already relative to the run root (e.g. "model/sub"),
                # so it must not be prefixed with artifact_path again
                for sub_artifact in client.list_artifacts(model_details.run_id, artifact.path):
                    if sub_artifact.file_size is not None:
                        total_size_bytes += sub_artifact.file_size
            elif artifact.file_size is not None:
                total_size_bytes += artifact.file_size

        print(f"Registered model size: {total_size_bytes} bytes")

    except Exception as size_e:
        print(f"Could not retrieve model size: {size_e}")

with DAG(
    dag_id='download_yellow_taxi_data',
    default_args=default_args,
    schedule=None,  # Use 'schedule' instead of 'schedule_interval'
    catchup=False,
    tags=['example'],
) as dag:

    download_task = PythonOperator(
        task_id='download_and_save_yellow_taxi',
        python_callable=ingest_files,
    )

    process_task = PythonOperator(
        task_id='process_and_print_size',
        python_callable=process_and_print_size,
    )

    train_model_task = PythonOperator(
        task_id='train_linear_regression_model',
        python_callable=train_model,
    )

    register_model_task = PythonOperator(
        task_id='register_latest_model',
        python_callable=register_latest_model,
    )

    download_task >> process_task >> train_model_task >> register_model_task
```

```
[2025-05-28, 11:53:23] INFO - Successfully registered model 'nyc-taxi-lr-model' version 1: chan="stdout": source="task"
[2025-05-28, 11:53:23] INFO - Registered model size: 18692 bytes: chan="stdout": source="task"
```
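The `Registered model size: 18692 bytes` line sums the sizes of every file under the run's `model/` directory (the pickled model plus `MLmodel` and the environment files), which is not the same quantity as the `model_size_bytes` field the question asks for; that field is written inside the `MLmodel` file itself. A sketch for reading it, assuming the model was logged under `artifact_path='model'` and that its `MLmodel` file contains a top-level `model_size_bytes` key. The function names are hypothetical, and the parse is line-based rather than a full YAML load:

```python
import re
from pathlib import Path
from typing import Optional

# Top-level key only: nested (indented) keys with the same name are ignored
_SIZE_LINE = re.compile(r"^model_size_bytes:\s*(\d+)\s*$", re.MULTILINE)


def model_size_bytes(mlmodel_text: str) -> Optional[int]:
    """Extract the model_size_bytes value from MLmodel file text, or None if absent."""
    match = _SIZE_LINE.search(mlmodel_text)
    return int(match.group(1)) if match else None


def model_size_bytes_for_run(run_id: str, tracking_uri: str = "http://mlflow:5000") -> Optional[int]:
    """Download <run>/model/MLmodel from the tracking server and read the field."""
    import mlflow.artifacts  # lazy import: the parser above does not need MLflow

    mlflow.set_tracking_uri(tracking_uri)
    local_path = mlflow.artifacts.download_artifacts(run_id=run_id, artifact_path="model/MLmodel")
    return model_size_bytes(Path(local_path).read_text())
```

In the DAG, `model_size_bytes_for_run(model_details.run_id)` could be printed right after registration.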