In [1]:
pip show mage-ai

Name: mage-ai
Version: 0.9.76
Summary: Mage is a tool for building and deploying data pipelines.
Home-page: https://github.com/mage-ai/mage-ai
Author: Mage
Author-email: eng@mage.ai
License: 
Location: c:\users\lenovo\practicals\3_ml_ops\.venv\lib\site-packages
Requires: aiofiles, aiohttp, alembic, bcrypt, cachetools, croniter, cryptography, dask, datadog, Faker, freezegun, GitPython, great-expectations, httpx, inflection, ipykernel, ipython, itsdangerous, Jinja2, joblib, jupyter-server, jupyter_client, ldap3, memory_profiler, newrelic, numpy, pandas, Pillow, polars, protobuf, psutil, pyairtable, pyarrow, PyGithub, PyJWT, python-dateutil, pytz, pyyaml, redis, requests, ruamel.yaml, scikit-learn, sentry-sdk, setuptools, simplejson, six, sqlalchemy, sqlglot, terminado, thefuzz, tornado, typer, typing_extensions, watchdog, Werkzeug
Required-by: 
Note: you may need to restart the kernel to use updated packages.


In [None]:
# load_data.py

import pandas as pd
from mage_ai.io.file import FileIO
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_file(*args, **kwargs):
    """
    Load Yellow Taxi trip data from a parquet file.

    By default this reads the March 2023 file from the location this block
    has always used. The location can be overridden, without editing the
    code, by passing ``parquet_path`` as a keyword argument.

    Args:
        *args: Unused.
        **kwargs: May contain ``parquet_path`` (str or path-like), the
            parquet file to read instead of the default.

    Returns:
        pandas.DataFrame: The contents of the parquet file.
    """
    default_path = 'C:/Users/3_ML_OPS/03-orchestration/data/yellow_tripdata_2023-03.parquet'
    # `or` also falls back to the default when the key is present but empty/None.
    parquet_path = kwargs.get('parquet_path') or default_path

    df = pd.read_parquet(parquet_path)
    print(f'✅ Loaded data with shape: {df.shape}')
    return df


@test
def test_output(output, *args) -> None:
    assert output.shape[0] == 3403766, 'Row count mismatch!'


In [None]:
# prepare_data.py

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

import pandas as pd


@transformer
def prepare_data(df_raw: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
    """
    Transformer block to clean and filter NYC Yellow Taxi data.

    Adds a ``duration`` column (trip length in minutes), keeps only trips
    lasting between 1 and 60 minutes inclusive, and converts the pickup and
    drop-off location IDs to strings. ``df_raw`` itself is not modified.

    Args:
        df_raw: Frame with ``tpep_pickup_datetime`` and
            ``tpep_dropoff_datetime`` datetime columns plus
            ``PULocationID`` and ``DOLocationID``.
        *args: Unused.
        **kwargs: Unused.

    Returns:
        pandas.DataFrame: The filtered rows (original index labels kept)
        with the extra ``duration`` column.
    """
    categorical = ['PULocationID', 'DOLocationID']

    # Calculate trip duration in minutes on a standalone Series, so the
    # full raw frame never has to be duplicated just to hold one new column.
    elapsed = df_raw['tpep_dropoff_datetime'] - df_raw['tpep_pickup_datetime']
    duration_min = elapsed.dt.total_seconds() / 60

    # Keep trips lasting 1 to 60 minutes (both bounds inclusive).
    keep = duration_min.between(1, 60)

    # Copy only the surviving rows; the explicit copy gives an independent
    # frame, so the column assignments below never touch df_raw.
    df = df_raw.loc[keep].copy()
    df['duration'] = duration_min.loc[keep]

    # Convert location IDs to strings (categorical)
    df[categorical] = df[categorical].astype(str)

    print(f" ✅ Filtered data shape: {df.shape}")  # Print number of rows and columns

    return df

@test
def test_output(output, *args) -> None:
    """
    Basic tests for filtered DataFrame
    """
    assert output is not None, 'Output is None'
    assert 'duration' in output.columns, 'Missing duration column'
    assert output['duration'].between(1, 60).all(), 'Duration filter failed'


In [None]:
# train_data.py

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression

import pandas as pd


@transformer
def train_model(df: pd.DataFrame, *args, **kwargs):
    """
    Fit a linear regression that predicts ``duration`` from the pickup and
    drop-off location IDs.

    The location IDs are cast to strings on a derived frame before being
    turned into feature dicts, so the caller's ``df`` is left unmodified.

    Args:
        df: Frame with ``PULocationID``, ``DOLocationID`` and ``duration``
            columns.
        *args: Unused.
        **kwargs: Unused.

    Returns:
        tuple: ``(model, dv)`` - the fitted ``LinearRegression`` and the
        fitted ``DictVectorizer`` used to encode the features.
    """
    categorical = ['PULocationID', 'DOLocationID']

    # Dicts for DictVectorizer; astype() returns a new frame, so the input
    # DataFrame is not mutated as a side effect of training.
    dicts = df[categorical].astype(str).to_dict(orient='records')

    dv = DictVectorizer()
    X_train = dv.fit_transform(dicts)

    y_train = df['duration'].values

    model = LinearRegression()
    model.fit(X_train, y_train)

    # ✅ Print intercept for homework
    print(f"✅ Intercept: {model.intercept_:.2f}")

    return model, dv

@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'


In [None]:
# mlflow logging

import mlflow
import os

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split

# Tracking server and experiment can be overridden through the environment;
# when the variables are unset, the original local defaults are used.
mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://127.0.0.1:5000"))
mlflow.set_experiment(os.getenv("MLFLOW_EXPERIMENT_NAME", "homework_module-3"))

@transformer
def train_model(df, *args, **kwargs):
    """
    Fit a linear regression on the pickup/drop-off location IDs and log the
    run to MLflow.

    The data is split 80/20 (``random_state=42``); the model is fitted on
    the 80% part and its RMSE on the held-out 20% is logged as ``val_rmse``.
    Both the model and the fitted ``DictVectorizer`` are logged, since the
    model cannot encode new inputs without the vectorizer.

    Args:
        df: Frame with ``PULocationID``, ``DOLocationID`` and ``duration``
            columns. It is not modified.
        *args: Unused.
        **kwargs: Unused.

    Returns:
        tuple: ``(model, dv)`` - the fitted ``LinearRegression`` and the
        fitted ``DictVectorizer``.
    """
    import pickle
    import tempfile

    # Prepare the data
    categorical = ['PULocationID', 'DOLocationID']

    # Cast the IDs to str on a derived frame, matching the non-MLflow
    # training block, so they are encoded as categories even when the
    # incoming columns are numeric.
    train_dicts = df[categorical].astype(str).to_dict(orient='records')
    dv = DictVectorizer()
    X = dv.fit_transform(train_dicts)
    y = df['duration'].values

    # Train/validation split; the validation part is used for val_rmse below
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

    model = LinearRegression()
    model.fit(X_train, y_train)

    print(f"✅ Intercept: {model.intercept_:.2f}")

    # Root-mean-squared error on the held-out rows
    residuals = model.predict(X_val) - y_val
    val_rmse = float((residuals ** 2).mean() ** 0.5)

    # Log model to MLflow
    with mlflow.start_run():
        mlflow.log_param("model_type", "LinearRegression")
        mlflow.log_param("features", categorical)
        mlflow.log_metric("intercept", float(model.intercept_))
        mlflow.log_metric("val_rmse", val_rmse)

        # Save model and vectorizer
        mlflow.sklearn.log_model(model, artifact_path="model")

        # The vectorizer is pickled into a throwaway directory; the file is
        # closed before it is handed to MLflow, and the directory is removed
        # once the artifact has been logged.
        with tempfile.TemporaryDirectory() as tmp_dir:
            dv_path = os.path.join(tmp_dir, "dict_vectorizer.pkl")
            with open(dv_path, "wb") as f_out:
                pickle.dump(dv, f_out)
            mlflow.log_artifact(dv_path)

    return model, dv
