# Baseline model for batch monitoring

### Prepare Dataset

In [2]:
import requests
import datetime
import pandas as pd
import psycopg

from evidently import ColumnMapping
from evidently.report import Report
from evidently.metrics import ColumnQuantileMetric, DatasetMissingValuesMetric

from joblib import load, dump
from tqdm import tqdm

from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error
import time

In [3]:
files = [('green_tripdata_2024-03.parquet', './data')]

print("Download files:")
for file, path in files:
    url=f"https://d37ci6vzurychx.cloudfront.net/trip-data/{file}"
    resp=requests.get(url, stream=True)
    save_path=f"{path}/{file}"
    with open(save_path, "wb") as handle:
        for data in tqdm(resp.iter_content(),
                        desc=f"{file}",
                        postfix=f"save to {save_path}",
                        total=int(resp.headers["Content-Length"])):
            handle.write(data)

Download files:


green_tripdata_2024-03.parquet: 100%|██████████| 1372372/1372372 [00:05<00:00, 247349.73it/s, save to ./data/green_tripdata_2024-03.parquet]


In [4]:
march_data = pd.read_parquet('data/green_tripdata_2024-03.parquet')

In [5]:
march_data.describe()

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
count,57457.0,57457,57457,55360.0,57457.0,57457.0,55360.0,57457.0,57457.0,57457.0,57457.0,57457.0,57457.0,0.0,57457.0,57457.0,55360.0,55353.0,55360.0
mean,1.877334,2024-03-16 04:02:52.405399,2024-03-16 04:21:00.076039,1.179986,95.524688,138.629149,1.309538,13.522828,17.313474,0.904472,0.57741,2.386255,0.192537,,0.979378,22.904832,1.321062,1.038047,0.73773
min,1.0,2008-12-31 23:02:24,2008-12-31 23:02:30,1.0,1.0,1.0,0.0,0.0,-295.08,-2.5,-0.5,-1.56,0.0,,-1.0,-296.08,1.0,1.0,-2.75
25%,2.0,2024-03-08 13:53:56,2024-03-08 14:13:49,1.0,74.0,74.0,1.0,1.1,9.3,0.0,0.5,0.0,0.0,,1.0,13.44,1.0,1.0,0.0
50%,2.0,2024-03-15 22:49:01,2024-03-15 23:09:52,1.0,75.0,138.0,1.0,1.79,13.5,0.0,0.5,2.0,0.0,,1.0,18.5,1.0,1.0,0.0
75%,2.0,2024-03-23 20:11:25,2024-03-23 20:34:48,1.0,97.0,220.0,1.0,3.1,19.8,1.0,0.5,3.61,0.0,,1.0,27.05,2.0,1.0,2.75
max,2.0,2024-04-01 00:01:45,2024-04-01 16:11:00,99.0,265.0,265.0,9.0,125112.2,841.6,10.0,4.25,150.0,26.76,,1.0,856.98,5.0,2.0,2.75
std,0.328056,,,1.356719,57.285088,76.295346,0.967749,770.416255,14.958249,1.382446,0.366916,3.159273,1.184551,,0.154253,17.013735,0.497858,0.191311,1.218039


In [6]:
march_data.shape

(57457, 20)

In [7]:
num_features = ["passenger_count", "trip_distance", "fare_amount", "total_amount"]
cat_features = ["PULocationID", "DOLocationID"]

In [8]:
reference_data = pd.read_parquet("data/reference.parquet")

### Metric

In [9]:
column_mapping = ColumnMapping(
    numerical_features=num_features,
    categorical_features=cat_features,
    target=None,
    prediction=None,
)

In [10]:
report = Report(metrics=[
    DatasetMissingValuesMetric(),
    ColumnQuantileMetric(column_name="fare_amount", quantile=0.5)
])


In [11]:
report.run(reference_data=reference_data, current_data=march_data, column_mapping=column_mapping)



In [12]:
result = report.as_dict()
result

{'metrics': [{'metric': 'DatasetMissingValuesMetric',
   'result': {'current': {'different_missing_values': {'': 0,
      -inf: 0,
      inf: 0,
      None: 70046},
     'number_of_different_missing_values': 1,
     'different_missing_values_by_column': {'VendorID': {'': 0,
       -inf: 0,
       inf: 0,
       None: 0},
      'lpep_pickup_datetime': {'': 0, -inf: 0, inf: 0, None: 0},
      'lpep_dropoff_datetime': {'': 0, -inf: 0, inf: 0, None: 0},
      'store_and_fwd_flag': {'': 0, -inf: 0, inf: 0, None: 2097},
      'RatecodeID': {'': 0, -inf: 0, inf: 0, None: 2097},
      'PULocationID': {'': 0, -inf: 0, inf: 0, None: 0},
      'DOLocationID': {'': 0, -inf: 0, inf: 0, None: 0},
      'passenger_count': {'': 0, -inf: 0, inf: 0, None: 2097},
      'trip_distance': {'': 0, -inf: 0, inf: 0, None: 0},
      'fare_amount': {'': 0, -inf: 0, inf: 0, None: 0},
      'extra': {'': 0, -inf: 0, inf: 0, None: 0},
      'mta_tax': {'': 0, -inf: 0, inf: 0, None: 0},
      'tip_amount': {'': 0, -

### Monitoring

In [13]:
create_table_statement = """
drop table if exists nyc_taxi_metrics;
create table nyc_taxi_metrics(
	timestamp timestamp,
	fare_amount_quantile_val float,
	share_missing_val float
)
"""

def prep_db():
    with psycopg.connect("host=localhost port=5432 user=postgres password=example123!", autocommit=True) as conn:
        res = conn.execute("SELECT 1 FROM pg_database WHERE datname='taxi_monitoring'")
        if len(res.fetchall()) == 0:
            conn.execute("create database taxi_monitoring;")

    with psycopg.connect("host=localhost port=5432 user=postgres password=example123! dbname=taxi_monitoring", autocommit=True) as conn:
        conn.execute(create_table_statement)

In [14]:
begin = datetime.datetime(2024, 3, 1, 0, 0)

def calculate_metrics_postgresql(curr, i):
    current_data = march_data[(march_data.lpep_pickup_datetime >= (begin + datetime.timedelta(i))) &
		(march_data.lpep_pickup_datetime < (begin + datetime.timedelta(i + 1)))]

    report.run(reference_data=reference_data,
                current_data=current_data,
                column_mapping=column_mapping)

    result = report.as_dict()
    share_missing_values = result["metrics"][0]["result"]["current"]["share_of_missing_values"]
    fare_amount_quantile_val = result["metrics"][1]["result"]["current"]["value"]

    curr.execute(
        "insert into nyc_taxi_metrics(timestamp, share_missing_val, fare_amount_quantile_val) values (%s, %s, %s)",
        (begin + datetime.timedelta(i), share_missing_values, fare_amount_quantile_val)
    )

In [1]:
SEND_TIMEOUT = 10

prep_db()
last_send = datetime.datetime.now() - datetime.timedelta(seconds=10)
with psycopg.connect("host=localhost port=5432 user=postgres password=example123! dbname=taxi_monitoring", autocommit=True) as conn:
    for i in range(0, 31):
        with conn.cursor() as curr:
            calculate_metrics_postgresql(curr, i)

            new_send = datetime.datetime.now()
            seconds_elapsed = (new_send - last_send).total_seconds()
            if seconds_elapsed < SEND_TIMEOUT:
                time.sleep(SEND_TIMEOUT - seconds_elapsed)
            while last_send < new_send:
                last_send = last_send + datetime.timedelta(seconds=10)

NameError: name 'prep_db' is not defined

In [42]:
with psycopg.connect("host=localhost port=5432 user=postgres password=example123! dbname=taxi_monitoring", autocommit=True) as conn:
    res = conn.execute("select max(fare_amount_quantile_val) from nyc_taxi_metrics")
    max_value = res.fetchone()
    print("Maximum value", max_value)

Maximum value (14.2,)
