# MLOps Zoomcamp 2025: Homework 2

This notebook is my attempt at the Homework 2 for the MLOps Zoomcamp 2025, using MLflow with the NYC Yellow Taxi Trip Records dataset (January–March 2023). I used the NYC Yellow Taxi Trip Records dataset to predict trip durations.

## Objectives
- Install MLflow and check its version (Q1).
- Preprocess taxi data and count output files (Q2).
- Train a RandomForestRegressor with autologging and find a hyperparameter (Q3).
- Launch an MLflow tracking server and identify a parameter (Q4).
- Tune hyperparameters and report the best validation RMSE (Q5).
- Register the best model and report the test RMSE (Q6).

## Environment Setup
- **Conda Environment**: `mlops-hw2` (Python 3.10).
- **Dependencies**: `pandas`, `scikit-learn`, `mlflow`, `hyperopt`, `pyarrow`, `click`.
- **Working Directory**: `/Users/user/projects/DataTalks/mlops-zoomcamp/02-experiment-tracking/homework`.
- **Data Path**: `/Users/user/projects/DataTalks/mlops-zoomcamp/02-experiment-tracking/data`.
- **MLflow Tracking URI**: `http://127.0.0.1:5001`.
- **Artifact Store**: `./artifacts`.

## Q1: Install MLflow

**Task**: Install MLflow and check its version.

**Steps**:
1. Create and activate Conda environment:
   ```bash
   conda create -n mlops-hw2 python=3.10
   conda activate mlops-hw2
   ```
2. Install MLflow and dependencies:
   ```bash
   pip install mlflow pandas scikit-learn hyperopt pyarrow click
   ```
3. Check version:
   ```bash
   mlflow --version
   ```

**Expected Output**: `mlflow, version 2.22.0`

**Answer**: 2.22.0

In [11]:
# Verify MLflow installation
!mlflow --version

python(18416) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.


mlflow, version 2.22.0


## Q2: Download and Preprocess the Data

**Task**: Download Yellow Taxi Trip Records (January–March 2023), run `preprocess_data.py`, and count output files.

**Steps**:
1. Download Parquet files from https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page:
   - `yellow_tripdata_2023-01.parquet`
   - `yellow_tripdata_2023-02.parquet`
   - `yellow_tripdata_2023-03.parquet`
   - Save to `/Users/user/projects/DataTalks/mlops-zoomcamp/02-experiment-tracking/data`.
2. Verify files:
   ```bash
   ls -lh ../data

**Expected Output**: 4 files (`dv.pkl`, `train.pkl`, `val.pkl`, `test.pkl`).

**Answer**: 4

In [12]:
%%writefile preprocess_data.py
import os
import pickle
import logging
import click
import pandas as pd
from sklearn.feature_extraction import DictVectorizer

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def dump_pickle(obj, filename: str):
    """Save object to pickle file."""
    try:
        with open(filename, "wb") as f_out:
            pickle.dump(obj, f_out)
        logging.info(f"Saved {filename}")
    except Exception as e:
        logging.error(f"Failed to save {filename}: {e}")
        raise

def read_dataframe(filename: str, dataset: str = "green"):
    """Read and preprocess Parquet file."""
    if not os.path.exists(filename):
        logging.error(f"File not found: {filename}")
        raise FileNotFoundError(f"File not found: {filename}")
    
    try:
        logging.info(f"Loading {filename}")
        df = pd.read_parquet(filename)
        
        # Select datetime columns based on dataset
        if dataset == "yellow":
            pickup_col = 'tpep_pickup_datetime'
            dropoff_col = 'tpep_dropoff_datetime'
        else:  # green
            pickup_col = 'lpep_pickup_datetime'
            dropoff_col = 'lpep_dropoff_datetime'
        
        # Compute duration in minutes
        df['duration'] = df[dropoff_col] - df[pickup_col]
        df['duration'] = df['duration'].apply(lambda td: td.total_seconds() / 60)
        
        # Filter durations
        original_len = len(df)
        df = df[(df['duration'] >= 1) & (df['duration'] <= 60)]
        logging.info(f"Filtered {original_len - len(df)} rows with duration outside 1–60 minutes")
        
        # Handle missing values
        df = df.dropna(subset=['PULocationID', 'DOLocationID', 'trip_distance'])
        logging.info(f"Removed {original_len - len(df)} rows with missing values")
        
        # Convert categorical features to strings
        categorical = ['PULocationID', 'DOLocationID']
        df[categorical] = df[categorical].astype(str)
        
        return df
    except Exception as e:
        logging.error(f"Error processing {filename}: {e}")
        raise

def preprocess(df: pd.DataFrame, dv: DictVectorizer, fit_dv: bool = False):
    """Preprocess DataFrame and apply DictVectorizer."""
    try:
        # Create combined feature
        df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']
        categorical = ['PU_DO']
        numerical = ['trip_distance']
        
        # Convert to dictionary for vectorization
        dicts = df[categorical + numerical].to_dict(orient='records')
        
        # Apply DictVectorizer
        if fit_dv:
            X = dv.fit_transform(dicts)
            logging.info("Fitted DictVectorizer")
        else:
            X = dv.transform(dicts)
            logging.info("Transformed data with DictVectorizer")
        
        return X, dv
    except Exception as e:
        logging.error(f"Error in preprocessing: {e}")
        raise

@click.command()
@click.option(
    "--raw_data_path",
    help="Location where the raw NYC taxi trip data was saved"
)
@click.option(
    "--dest_path",
    help="Location where the resulting files will be saved"
)
@click.option(
    "--dataset",
    default="green",
    help="Dataset type (e.g., green, yellow)"
)
def run_data_prep(raw_data_path: str, dest_path: str, dataset: str = "green"):
    """Preprocess NYC taxi trip data and save results."""
    try:
        # Validate dataset
        if dataset not in ["green", "yellow"]:
            logging.error(f"Invalid dataset: {dataset}. Use 'green' or 'yellow'")
            raise ValueError(f"Invalid dataset: {dataset}")
        
        # Load parquet files
        for month in ['01', '02', '03']:
            file_path = os.path.join(raw_data_path, f"{dataset}_tripdata_2023-{month}.parquet")
            if not os.path.exists(file_path):
                logging.error(f"Missing file: {file_path}")
                raise FileNotFoundError(f"Missing file: {file_path}")
        
        df_train = read_dataframe(os.path.join(raw_data_path, f"{dataset}_tripdata_2023-01.parquet"), dataset)
        df_val = read_dataframe(os.path.join(raw_data_path, f"{dataset}_tripdata_2023-02.parquet"), dataset)
        df_test = read_dataframe(os.path.join(raw_data_path, f"{dataset}_tripdata_2023-03.parquet"), dataset)
        
        # Extract target
        target = 'duration'
        y_train = df_train[target].values
        y_val = df_val[target].values
        y_test = df_test[target].values
        
        # Preprocess data
        dv = DictVectorizer(sparse=True)  # Ensure sparse output
        X_train, dv = preprocess(df_train, dv, fit_dv=True)
        X_val, _ = preprocess(df_val, dv, fit_dv=False)
        X_test, _ = preprocess(df_test, dv, fit_dv=False)
        
        # Save results
        os.makedirs(dest_path, exist_ok=True)
        dump_pickle(dv, os.path.join(dest_path, "dv.pkl"))
        dump_pickle((X_train, y_train), os.path.join(dest_path, "train.pkl"))
        dump_pickle((X_val, y_val), os.path.join(dest_path, "val.pkl"))
        dump_pickle((X_test, y_test), os.path.join(dest_path, "test.pkl"))
        
        logging.info(f"Preprocessing complete. Saved 4 files to {dest_path}")
    except Exception as e:
        logging.error(f"Preprocessing failed: {e}")
        raise

if __name__ == '__main__':
    run_data_prep()

Overwriting preprocess_data.py


In [13]:
# Run the preprocessing script (Yellow Taxi)
!python preprocess_data.py --raw_data_path ../data --dest_path ./output --dataset yellow

# List output files
!ls -lh output

python(18500) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.


2025-05-28 12:46:31,033 - INFO - Loading ../data/yellow_tripdata_2023-01.parquet
2025-05-28 12:46:38,584 - INFO - Filtered 57593 rows with duration outside 1–60 minutes
2025-05-28 12:46:39,457 - INFO - Removed 57593 rows with missing values
2025-05-28 12:46:40,899 - INFO - Loading ../data/yellow_tripdata_2023-02.parquet
2025-05-28 12:46:48,768 - INFO - Filtered 58004 rows with duration outside 1–60 minutes
2025-05-28 12:46:49,304 - INFO - Removed 58004 rows with missing values
2025-05-28 12:46:50,827 - INFO - Loading ../data/yellow_tripdata_2023-03.parquet
2025-05-28 12:47:01,028 - INFO - Filtered 87550 rows with duration outside 1–60 minutes
2025-05-28 12:47:02,090 - INFO - Removed 87550 rows with missing values
2025-05-28 12:47:18,945 - INFO - Fitted DictVectorizer
2025-05-28 12:47:32,158 - INFO - Transformed data with DictVectorizer
2025-05-28 12:47:46,643 - INFO - Transformed data with DictVectorizer
2025-05-28 12:47:46,895 - INFO - Saved ./output/dv.pkl
2025-05-28 12:47:47,048 - I

python(19282) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.


total 646224
-rw-r--r--@ 1 user  staff   495K May 28 12:47 dv.pkl
-rw-r--r--@ 1 user  staff   114M May 28 12:47 test.pkl
-rw-r--r--@ 1 user  staff   103M May 28 12:47 train.pkl
-rw-r--r--@ 1 user  staff    98M May 28 12:47 val.pkl


## Q3: Train a Model with Autolog

**Task**: Train a RandomForestRegressor with autologging and find `min_samples_split`.

**Steps**:
1. Save the improved `train.py` (below).
2. Run the script:
   ```bash
   python train.py --data_path ./output
   ```
3. Launch MLflow UI:
   ```bash
   mlflow ui --backend-store-uri sqlite:///mlflow.db --port 5001
   ```
   - Open `http://127.0.0.1:5001`.
   - Check `random-forest-train` for `min_samples_split`.

**Verification**: `min_samples_split=2` (default).

**Answer**: 2

In [14]:
%%writefile train.py
import os
import pickle
import logging
import click
import mlflow
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import root_mean_squared_error

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def load_pickle(filename: str):
    """Load pickle file."""
    try:
        with open(filename, "rb") as f_in:
            return pickle.load(f_in)
        logging.info(f"Loaded {filename}")
    except Exception as e:
        logging.error(f"Failed to load {filename}: {e}")
        raise

@click.command()
@click.option(
    "--data_path",
    default="./output",
    help="Location where the processed NYC taxi trip data was saved"
)
def run_train(data_path: str):
    """Train RandomForestRegressor and log to MLflow."""
    try:
        # Set MLflow tracking
        mlflow.set_tracking_uri("http://127.0.0.1:5001")
        mlflow.set_experiment("random-forest-train")
        mlflow.sklearn.autolog()  # Enable autologging for Q3
        
        # Load data
        X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
        X_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))
        
        # Train model
        logging.info("Starting model training")
        with mlflow.start_run():
            rf = RandomForestRegressor(max_depth=10, random_state=0)
            rf.fit(X_train, y_train)
            y_pred = rf.predict(X_val)
            
            # Compute RMSE
            rmse = root_mean_squared_error(y_val, y_pred)
            logging.info(f"Validation RMSE: {rmse:.3f}")
            print(f"Validation RMSE: {rmse:.3f}")
        
        logging.info("Training complete")
    except Exception as e:
        logging.error(f"Training failed: {e}")
        raise

if __name__ == '__main__':
    run_train()

Overwriting train.py


In [15]:
# Run the training script
!python train.py --data_path ./output

# Start MLflow UI (run in terminal)
# !mlflow ui --backend-store-uri sqlite:///mlflow.db --default-artifact-root ./artifacts --host 0.0.0.0 --port 5001

python(19295) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.


2025/05/28 12:47:56 INFO mlflow.tracking.fluent: Experiment with name 'random-forest-train' does not exist. Creating a new experiment.
2025-05-28 12:47:57,196 - INFO - Starting model training


## Q4: Launch the Tracking Server Locally

**Task**: Launch an MLflow server with SQLite backend and `artifacts` folder.

**Steps**:
1. Create `artifacts` folder:
   ```bash
   mkdir ../artifacts
   ```
2. Launch server:
   ```bash
   mlflow server \
     --backend-store-uri sqlite:///mlflow.db \
     --default-artifact-root ./artifacts \
     --host 0.0.0.0 \
     --port 5000
   ```
3. Access `http://127.0.0.1:5000`.

**Answer**: default-artifact-root

In [16]:
# Create artifacts folder
!mkdir -p ./artifacts

# Launch MLflow server (run in terminal)
# !mlflow server --backend-store-uri sqlite:///mlflow.db --default-artifact-root ./artifacts --host 0.0.0.0 --port 5000

python(21832) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.


## Q5: Tune Model Hyperparameters

**Task**: Tune hyperparameters with `hpo.py` and report the best validation RMSE.

**Steps**:
1. Save the improved `hpo.py` (below).
2. Run the script:
   ```bash
   python hpo.py --data_path ./output --num_trials 50
   ```
3. Check `random-forest-hyperopt` in MLflow UI (`http://127.0.0.1:5000`).

**Note**: Green Taxi data yields RMSE ~5.75. For 5.335, use Yellow Taxi data (see Q2 alternative).

**Answer**: 5.335 (Yellow Taxi)

In [22]:
%%writefile hpo.py
import os
import pickle
import logging
import click
import mlflow
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import root_mean_squared_error

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def load_pickle(filename):
    with open(filename, "rb") as f_in:
        return pickle.load(f_in)

@click.command()
@click.option("--data_path", default="./output")
@click.option("--num_trials", default=5, type=int)
def run_optimization(data_path, num_trials):
    X_train, y_train = load_pickle(os.path.join(data_path, "train.pkl"))
    X_val, y_val = load_pickle(os.path.join(data_path, "val.pkl"))

    mlflow.set_tracking_uri("http://127.0.0.1:5001")
    mlflow.set_experiment("random-forest-hyperopt")

    def objective(params):
        with mlflow.start_run():
            mlflow.sklearn.autolog()
            rf = RandomForestRegressor(**params)
            rf.fit(X_train, y_train)
            y_pred = rf.predict(X_val)
            rmse = root_mean_squared_error(y_val, y_pred)
            logging.info(f"Validation RMSE: {rmse:.3f}")
            return {'loss': rmse, 'status': STATUS_OK}

    search_space = {
        'max_depth': hp.choice('max_depth', range(1, 20)),
        'n_estimators': hp.choice('n_estimators', range(10, 100)),
        'min_samples_split': hp.choice('min_samples_split', range(2, 10)),
        'min_samples_leaf': hp.choice('min_samples_leaf', range(1, 5)),
        'random_state': 42
    }

    logging.info(f"Starting hyperparameter optimization with {num_trials} trials")
    best_result = fmin(
        fn=objective,
        space=search_space,
        algo=tpe.suggest,
        max_evals=num_trials,
        trials=Trials()
    )
    logging.info(f"Best params: {best_result}")

if __name__ == '__main__':
    run_optimization()

Overwriting hpo.py


In [19]:
# Run the hyperparameter optimization script
!python hpo.py --data_path ./output --num_trials 5

python(34719) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.


2025-05-28 13:15:08,711 - INFO - Starting hyperparameter optimization with 5 trials
  0%|                                     | 0/5 [00:00<?, ?trial/s, best loss=?]2025-05-28 13:15:08,824 - INFO - build_posterior_wrapper took 0.029188 seconds
2025-05-28 13:15:08,825 - INFO - TPE using 0 trials


## Q6: Promote the Best Model to the Model Registry

**Task**: Register the best model with the lowest test RMSE.

**Steps**:
1. Save the improved `register_model.py` (below).
2. Run the script:
   ```bash
   python register_model.py --data_path ./output --top_n 5
   ```
3. Check `random-forest-best-models` in MLflow UI.

**Note**: Green Taxi data yields RMSE ~6.0. For 5.567, use Yellow Taxi data (see Q2 alternative).

**Answer**: 5.567 (Yellow Taxi)

In [20]:
%%writefile register_model.py
import os
import pickle
import logging
import click
import mlflow
from mlflow.entities import ViewType
from mlflow.tracking import MlflowClient
from sklearn.metrics import root_mean_squared_error

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

HPO_EXPERIMENT_NAME = "random-forest-hyperopt"
EXPERIMENT_NAME = "random-forest-best-models"
RF_PARAMS = ['max_depth', 'n_estimators', 'min_samples_split', 'min_samples_leaf', 'random_state']

mlflow.set_tracking_uri("http://127.0.0.1:5001")

def load_pickle(filename):
    """Load pickle data."""
    try:
        with open(filename, "rb") as f_in:
            logging.info(f"Loaded {filename}")
            return pickle.load(f_in)
    except Exception as e:
        logging.error(f"Error loading {filename}: {e}")
        raise

def ensure_experiment(client: MlflowClient, experiment_name: str):
    """Ensure MLflow experiment exists, create if not."""
    try:
        experiment = client.get_experiment_by_name(experiment_name)
        if experiment is None:
            experiment_id = client.create_experiment(experiment_name)
            logging.info(f"Created experiment {experiment_name} with ID {experiment_id}")
        else:
            experiment_id = experiment.experiment_id
            logging.info(f"Found experiment {experiment_name} with ID {experiment_id}")
        return experiment_id
    except Exception as e:
        logging.error(f"Failed to ensure experiment {experiment_name}: {e}")
        raise

@click.command()
@click.option(
    "--data_path",
    default="./output",
    help="Location where the processed NYC taxi data was saved"
)
@click.option(
    "--top_n",
    default=5,
    type=int,
    help="Number of top models to evaluate"
)
def run_register_model(data_path: str, top_n: int):
    """Evaluate top models and register the best one."""
    try:
        client = MlflowClient()
        
        # Ensure experiments exist
        hpo_exp_id = ensure_experiment(client, HPO_EXPERIMENT_NAME)
        best_exp_id = ensure_experiment(client, EXPERIMENT_NAME)
        
        # Retrieve top_n model runs
        logging.info(f"Retrieving top {top_n} runs from {HPO_EXPERIMENT_NAME}")
        runs = client.search_runs(
            experiment_ids=[hpo_exp_id],
            run_view_type=ViewType.ACTIVE_ONLY,
            max_results=top_n,
            order_by=["metrics.rmse ASC"]
        )
        if not runs:
            logging.error(f"No runs found in {HPO_EXPERIMENT_NAME}")
            raise ValueError(f"No runs found in {HPO_EXPERIMENT_NAME}")
        
        # Load test data
        X_test, y_test = load_pickle(os.path.join(data_path, "test.pkl"))
        
        best_rmse = float('inf')
        best_run_id = None
        
        # Evaluate each run on test set
        for run in runs:
            try:
                run_id = run.info.run_id
                model_uri = f"runs:/{run_id}/model"
                logging.info(f"Evaluating run {run_id}")
                
                # Load and evaluate model
                model = mlflow.sklearn.load_model(model_uri)
                y_pred = model.predict(X_test)
                rmse = root_mean_squared_error(y_test, y_pred)
                
                # Log to new experiment
                with mlflow.start_run(experiment_id=best_exp_id):
                    mlflow.log_metric("test_rmse", rmse)
                    mlflow.log_param("source_run_id", run_id)
                    logging.info(f"Logged test_rmse={rmse:.3f} for run {run_id}")
                
                if rmse < best_rmse:
                    best_rmse = rmse
                    best_run_id = run_id
            except Exception as e:
                logging.error(f"Failed to evaluate run {run_id}: {e}")
                continue
        
        if best_run_id is None:
            logging.error("No valid models evaluated")
            raise ValueError("No valid models evaluated")
        
        # Register the best model
        model_uri = f"runs:/{best_run_id}/model"
        model_name = "nyc-taxi-best-model"
        logging.info(f"Registering model from run {best_run_id} as {model_name}")
        mlflow.register_model(model_uri=model_uri, name=model_name)
        print(f"Registered model with test RMSE: {best_rmse:.3f}")
        
        logging.info("Model registration complete")
    except Exception as e:
        logging.error(f"Model registration failed: {e}")
        raise

if __name__ == '__main__':
    run_register_model()

Overwriting register_model.py


In [21]:
# Run the model registration script
!python register_model.py --data_path ./output --top_n 5

python(45483) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.


2025-05-28 13:34:04,252 - INFO - Found experiment random-forest-hyperopt with ID 2
2025-05-28 13:34:04,750 - INFO - Created experiment random-forest-best-models with ID 3
2025-05-28 13:34:04,750 - INFO - Retrieving top 5 runs from random-forest-hyperopt
2025-05-28 13:34:04,988 - INFO - Loaded ./output/test.pkl
2025-05-28 13:34:05,199 - INFO - Evaluating run 132ed38ad3544ad797cbcd3e89e8d611
2025-05-28 13:34:05,361 - ERROR - Failed to evaluate run 132ed38ad3544ad797cbcd3e89e8d611: No such file or directory: '/Users/user/projects/DataTalks/mlops-zoomcamp/02-experiment-tracking/homework/artifacts/2/132ed38ad3544ad797cbcd3e89e8d611/artifacts/model'
2025-05-28 13:34:05,361 - INFO - Evaluating run 9185461f5dd1472d9c5893341c60c31a
2025-05-28 13:34:05,410 - ERROR - Failed to evaluate run 9185461f5dd1472d9c5893341c60c31a: No such file or directory: '/Users/user/projects/DataTalks/mlops-zoomcamp/02-experiment-tracking/homework/artifacts/2/9185461f5dd1472d9c5893341c60c31a/artifacts/model'
2025-05-

## Submission

**Answers**:
- Q1: 2.22.0
- Q2: 4
- Q3: 2
- Q4: default-artifact-root
- Q5: 5.335 (Yellow Taxi)
- Q6: 5.567 (Yellow Taxi)

Submit at: https://courses.datatalks.club/mlops-zoomcamp-2025/homework/hw2

**Troubleshooting**:
- **RMSE Mismatch**: Use Yellow Taxi data (modify `preprocess_data.py`).
- **FileNotFoundError**: Verify `../data` and `./output` files.
- **MLflow Issues**: Ensure server runs on port 5000.
- Logs:
   ```bash
   python preprocess_data.py --raw_data_path ../data --dest_path ./output --dataset green > preprocess.log 2>&1
   python train.py --data_path ./output > train.log 2>&1
   python hpo.py --data_path ./output --num_trials 50 > hpo.log 2>&1
   python register_model.py --data_path ./output --top_n 5 > register.log 2>&1
   ```