In [None]:
import os
import urllib.request
from io import BytesIO
from typing import List

import pandas as pd
import requests

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


def load_data(list_file_name: List[str]) -> None:
    """Download each named trip-data file into ``./data/`` unless it is already there.

    Args:
        list_file_name: File names (e.g. ``"yellow_tripdata_2023-03.parquet"``)
            that are appended to the CloudFront trip-data base URL and stored
            under ``./data/`` with the same name.

    Returns:
        None. Progress is reported with ``print``.

    Raises:
        Any exception raised by ``urllib.request.urlretrieve`` propagates to the
        caller; in that case no file is left at the final destination path.
    """
    for file_name in list_file_name:
        local_path = f"./data/{file_name}"

        # os.path.exists is true for files and folders alike.
        if os.path.exists(local_path):
            print(f'file {file_name} already exist')
            continue

        # exist_ok avoids a crash if the folder appears between check and creation.
        os.makedirs("./data/", exist_ok=True)

        site_url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{file_name}"

        # Download to a temporary name first: if the transfer is interrupted, a
        # truncated file must not sit at `local_path`, otherwise the next call
        # would treat it as "already exist" and never re-download it.
        partial_path = f"{local_path}.part"
        try:
            urllib.request.urlretrieve(site_url, filename=partial_path)
            os.replace(partial_path, local_path)
        finally:
            # Only present here when the download or the rename failed.
            if os.path.exists(partial_path):
                os.remove(partial_path)

        print(f"load file: {local_path}")


@data_loader
def read_dataframe(**kwargs):
    """Fetch the yellow-taxi trip files and load the March 2023 file as a DataFrame.

    Both the January and March 2023 files are downloaded via ``load_data`` (if
    missing), but only the March file is read and returned.

    Args:
        **kwargs: Accepted for the decorator's calling convention; not used here.

    Returns:
        pandas.DataFrame with the contents of
        ``./data/yellow_tripdata_2023-03.parquet``.

    Raises:
        ValueError: If the configured file name is neither ``.csv`` nor ``.parquet``.
    """
    list_of_file = ["yellow_tripdata_2023-01.parquet", "yellow_tripdata_2023-03.parquet"]
    load_data(list_of_file)

    filename = "./data/yellow_tripdata_2023-03.parquet"
    if filename.endswith('.csv'):
        df = pd.read_csv(filename)

        # CSV has no typed timestamps, so parse them here. The column prefix is
        # `tpep_` to match what the downstream transform block reads.
        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    elif filename.endswith('.parquet'):
        df = pd.read_parquet(filename)
    else:
        # Without this branch `df` would be unbound and fail with a confusing
        # UnboundLocalError further down.
        raise ValueError(f"unsupported file type: {filename}")

    print(len(df))
    return df

In [None]:
from typing import Tuple

import pandas as pd

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test



@transformer
def transform(
    df: pd.DataFrame, **kwargs
) -> pd.DataFrame:
    """Add a trip ``duration`` (minutes) column and keep trips lasting 1-60 minutes.

    The input frame is left untouched; a new frame is returned.

    Args:
        df: Trip records with ``tpep_pickup_datetime``, ``tpep_dropoff_datetime``,
            ``PULocationID`` and ``DOLocationID`` columns.
        **kwargs: Accepted for the decorator's calling convention; not used here.

    Returns:
        A new DataFrame containing only rows whose duration is between 1 and 60
        minutes inclusive, with both timestamp columns converted by
        ``pd.to_datetime``, a ``duration`` column appended, and the two location
        ID columns cast to ``str``.
    """
    dropoff = pd.to_datetime(df.tpep_dropoff_datetime)
    pickup = pd.to_datetime(df.tpep_pickup_datetime)

    # Duration in minutes.
    duration = (dropoff - pickup).dt.total_seconds() / 60
    keep = (duration >= 1) & (duration <= 60)

    # Explicit copy: assigning columns on a filtered slice of `df` triggers
    # SettingWithCopyWarning and may not write where intended.
    result = df.loc[keep].copy()
    result['tpep_dropoff_datetime'] = dropoff[keep]
    result['tpep_pickup_datetime'] = pickup[keep]
    result['duration'] = duration[keep]

    categorical = ['PULocationID', 'DOLocationID']
    result[categorical] = result[categorical].astype(str)
    print(len(result))

    return result

In [None]:
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom

@custom
def preprocess(df):
    """Fit a linear regression predicting ``duration`` and print its training RMSE.

    Features are the ``PULocationID`` and ``DOLocationID`` columns plus
    ``trip_distance``, encoded with a ``DictVectorizer``.

    Args:
        df: DataFrame providing the three feature columns and a ``duration`` column.

    Returns:
        The fitted ``LinearRegression``. The fitted ``DictVectorizer`` is local
        to this function and is not returned.
    """
    categorical = ['PULocationID', 'DOLocationID']
    numerical = ['trip_distance']
    train_dicts = df[categorical + numerical].to_dict(orient='records')

    dv = DictVectorizer()
    X_train = dv.fit_transform(train_dicts)
    target = 'duration'
    y_train = df[target].values

    lr = LinearRegression()
    lr.fit(X_train, y_train)

    y_pred = lr.predict(X_train)

    # `squared=False` was deprecated in scikit-learn 1.4 and removed in 1.6;
    # taking the square root of the MSE works on every version.
    rmse = mean_squared_error(y_train, y_pred) ** 0.5
    print(rmse)

    return lr


In [None]:
if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@custom
def model_result(lr):
    """Return the ``intercept_`` attribute of the given model object."""
    fitted_intercept = lr.intercept_
    return fitted_intercept



In [None]:
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import mlflow

if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom

@custom
def preprocess(df):
    """Fit a linear regression predicting ``duration`` with MLflow autologging on.

    Features are the ``PULocationID`` and ``DOLocationID`` columns plus
    ``trip_distance``, encoded with a ``DictVectorizer``. Before fitting, MLflow
    is pointed at ``sqlite:///mlflow.db``, the ``homework-experiment`` experiment
    is selected and ``mlflow.autolog()`` is enabled.

    Args:
        df: DataFrame providing the three feature columns and a ``duration`` column.

    Returns:
        The fitted ``LinearRegression``. The fitted ``DictVectorizer`` is local
        to this function and is not returned.
    """
    categorical = ['PULocationID', 'DOLocationID']
    numerical = ['trip_distance']
    train_dicts = df[categorical + numerical].to_dict(orient='records')

    dv = DictVectorizer()
    X_train = dv.fit_transform(train_dicts)
    target = 'duration'
    y_train = df[target].values

    # Tracking must be configured before `fit` so autologging captures it.
    mlflow.set_tracking_uri("sqlite:///mlflow.db")
    mlflow.set_experiment("homework-experiment")
    mlflow.autolog()

    lr = LinearRegression()
    lr.fit(X_train, y_train)

    y_pred = lr.predict(X_train)

    # `squared=False` was deprecated in scikit-learn 1.4 and removed in 1.6;
    # taking the square root of the MSE works on every version.
    rmse = mean_squared_error(y_train, y_pred) ** 0.5
    print(rmse)

    return lr
