# Taxi drips example

## Peeking at the data

Running `simulate.py` inserts data into two topics. The first topic, `taxi-departures`, contains all the taxi departures; the second, `taxi-arrivals`, contains the arrivals. Both topics live in the default Redpanda broker provided by Beaver. Beaver also provides a default Materialize instance, which lets us process the data in Redpanda with SQL. We'll do just that to take a look at the taxi departures.

```python
import psycopg

# Connect to the default Materialize instance provided by Beaver
conn = psycopg.connect("postgresql://materialize@localhost:6875/materialize?sslmode=disable")
conn.autocommit = True

with conn.cursor() as cur:
    # Drop the view first, since it depends on the source
    cur.execute("DROP VIEW IF EXISTS taxi_departures")
    cur.execute("DROP SOURCE IF EXISTS taxi_departures_src")

    # The message key is the trip number; the value is kept as raw bytes
    cur.execute("""
    CREATE MATERIALIZED SOURCE taxi_departures_src
    FROM KAFKA BROKER 'redpanda:29092' TOPIC 'taxi-departures'
        KEY FORMAT TEXT
        VALUE FORMAT BYTES
        INCLUDE KEY AS trip_no, TIMESTAMP AS received_at;
    """)

    # Decode the raw bytes (exposed as the `data` column) into JSONB
    cur.execute("""
    CREATE VIEW taxi_departures AS (
        SELECT
            trip_no,
            received_at,
            CAST(CONVERT_FROM(data, 'utf8') AS JSONB) AS trip
        FROM taxi_departures_src
    );
    """)
```

```python
import pandas as pd

pd.read_sql('SELECT * FROM taxi_departures ORDER BY received_at DESC LIMIT 10', conn)
```

| | trip_no | received_at | trip |
|---|---|---|---|
| 0 | 188 | 2022-11-22 21:38:38.152 | {'dropoff_latitude': 40.761478424072266, 'drop... |
| 1 | 187 | 2022-11-22 21:38:28.146 | {'dropoff_latitude': 40.72039031982422, 'dropo... |
| 2 | 186 | 2022-11-22 21:38:28.144 | {'dropoff_latitude': 40.70429992675781, 'dropo... |
| 3 | 185 | 2022-11-22 21:38:26.136 | {'dropoff_latitude': 40.879737854003906, 'drop... |
| 4 | 184 | 2022-11-22 21:38:25.128 | {'dropoff_latitude': 40.76662063598633, 'dropo... |
| 5 | 183 | 2022-11-22 21:38:20.124 | {'dropoff_latitude': 40.74039077758789, 'dropo... |
| 6 | 182 | 2022-11-22 21:38:11.109 | {'dropoff_latitude': 40.73457336425781, 'dropo... |
| 7 | 181 | 2022-11-22 21:38:10.100 | {'dropoff_latitude': 40.83300018310547, 'dropo... |
| 8 | 180 | 2022-11-22 21:38:09.096 | {'dropoff_latitude': 40.752925872802734, 'drop... |
| 9 | 179 | 2022-11-22 21:37:47.069 | {'dropoff_latitude': 40.74924087524415, 'dropo... |
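To make the decoding step concrete, here is a rough Python equivalent of what the `taxi_departures` view does to each message: the Kafka key is read as text, and the raw value bytes are decoded as UTF-8 and parsed as JSON, mirroring `CONVERT_FROM(data, 'utf8')` followed by the cast to `JSONB`. The `decode_departure` function and the sample message are hypothetical; only the field names come from this example.

```python
import json


def decode_departure(key: bytes, value: bytes) -> dict:
    """Hypothetical Python analogue of the taxi_departures view (illustrative only)."""
    return {
        # KEY FORMAT TEXT ... INCLUDE KEY AS trip_no: the key becomes the trip number
        "trip_no": key.decode("utf-8"),
        # CAST(CONVERT_FROM(data, 'utf8') AS JSONB): bytes -> text -> JSON
        "trip": json.loads(value.decode("utf-8")),
    }


# Hypothetical message; the values are made up for illustration
key = b"188"
value = json.dumps({
    "pickup_latitude": 40.75,
    "pickup_longitude": -73.99,
    "dropoff_latitude": 40.76,
    "dropoff_longitude": -73.98,
    "pickup_datetime": "2016-01-01 00:00:00",
}).encode("utf-8")

row = decode_departure(key, value)
print(row["trip_no"], row["trip"]["dropoff_latitude"])  # 188 40.76
```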


Let's do the same for taxi arrivals.

```python
with conn.cursor() as cur:
    cur.execute("DROP VIEW IF EXISTS taxi_arrivals")
    cur.execute("DROP SOURCE IF EXISTS taxi_arrivals_src")

    cur.execute("""
    CREATE MATERIALIZED SOURCE taxi_arrivals_src
    FROM KAFKA BROKER 'redpanda:29092' TOPIC 'taxi-arrivals'
        KEY FORMAT TEXT
        VALUE FORMAT BYTES
        INCLUDE KEY AS trip_no, TIMESTAMP AS received_at
    """)

    cur.execute("""
    CREATE VIEW taxi_arrivals AS (
        SELECT
            trip_no,
            received_at,
            CAST(CONVERT_FROM(data, 'utf8') AS JSONB) AS arrival
        FROM taxi_arrivals_src
    )
    """)

pd.read_sql('SELECT * FROM taxi_arrivals ORDER BY received_at DESC LIMIT 10', conn)
```

| | trip_no | received_at | arrival |
|---|---|---|---|


## Streaming features

Beaver encourages you to process your streaming data with SQL. We'll start by building some features, which we'll then feed to a machine learning model. Let's start simple: we'll calculate two distance features between the pick-up and drop-off locations, along with some basic temporal features (the pick-up hour and the day of the week).

```python
feature_set_query = """
DROP VIEW IF EXISTS taxi_features;

CREATE VIEW taxi_features AS (
    SELECT
        trip_no,
        -- Both distances are measured in degrees, not kilometers
        ABS(dropoff_lat - pickup_lat) + ABS(dropoff_lon - pickup_lon) AS manhattan_distance,
        SQRT(POWER(dropoff_lat - pickup_lat, 2) + POWER(dropoff_lon - pickup_lon, 2)) AS euclidean_distance,
        EXTRACT(HOUR FROM pickup_datetime) AS pickup_hour,
        -- DOW runs from 0 (Sunday) to 6 (Saturday)
        EXTRACT(DOW FROM pickup_datetime) = 1 AS is_monday,
        EXTRACT(DOW FROM pickup_datetime) = 2 AS is_tuesday,
        EXTRACT(DOW FROM pickup_datetime) = 3 AS is_wednesday,
        EXTRACT(DOW FROM pickup_datetime) = 4 AS is_thursday,
        EXTRACT(DOW FROM pickup_datetime) = 5 AS is_friday,
        EXTRACT(DOW FROM pickup_datetime) = 6 AS is_saturday,
        EXTRACT(DOW FROM pickup_datetime) = 0 AS is_sunday
    FROM (
        -- Pull typed fields out of the JSONB trip payload
        SELECT
            trip_no,
            CAST(trip ->> 'dropoff_latitude' AS FLOAT) AS dropoff_lat,
            CAST(trip ->> 'pickup_latitude' AS FLOAT) AS pickup_lat,
            CAST(trip ->> 'dropoff_longitude' AS FLOAT) AS dropoff_lon,
            CAST(trip ->> 'pickup_longitude' AS FLOAT) AS pickup_lon,
            CAST(trip ->> 'pickup_datetime' AS TIMESTAMP) AS pickup_datetime
        FROM taxi_departures
    ) AS trips
)
"""
```

Instead of running this query as we did above, we'll register it in Beaver so that it can be associated with a model. Feature sets are registered through Beaver's API.

```python
import requests

requests.post(
    "http://localhost:8000/api/features/",
    json={
        "name": "taxi_features",
        "query": feature_set_query,
        "key_field": "trip_no",
        "processor_id": 1,  # presumably the default Materialize instance
    },
)
```
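When debugging a feature set, it can help to have a reference implementation to compare against. The sketch below recomputes the same features for a single trip in plain Python. It assumes the trip is a dict with the field names used in the query and that `pickup_datetime` is formatted as `YYYY-MM-DD HH:MM:SS`; that format is an assumption, so check it against your data. Note that PostgreSQL-style `EXTRACT(DOW ...)`, which Materialize follows, numbers days from 0 (Sunday) to 6 (Saturday), whereas Python's `isoweekday()` runs from 1 (Monday) to 7 (Sunday); `isoweekday() % 7` converts between the two.

```python
import math
from datetime import datetime

# Index i corresponds to SQL's EXTRACT(DOW ...) = i
DAY_NAMES = ["sunday", "monday", "tuesday", "wednesday", "thursday", "friday", "saturday"]


def taxi_features(trip: dict) -> dict:
    """Plain-Python mirror of the taxi_features view, for a single trip."""
    d_lat = trip["dropoff_latitude"] - trip["pickup_latitude"]
    d_lon = trip["dropoff_longitude"] - trip["pickup_longitude"]
    # Assumed timestamp format; adjust if the simulator emits something else
    pickup = datetime.strptime(trip["pickup_datetime"], "%Y-%m-%d %H:%M:%S")
    # Convert Python's ISO weekday (Mon=1..Sun=7) to SQL's DOW (Sun=0..Sat=6)
    dow = pickup.isoweekday() % 7
    features = {
        # Both distances are measured in degrees of latitude/longitude
        "manhattan_distance": abs(d_lat) + abs(d_lon),
        "euclidean_distance": math.sqrt(d_lat ** 2 + d_lon ** 2),
        "pickup_hour": pickup.hour,
    }
    for i, name in enumerate(DAY_NAMES):
        features[f"is_{name}"] = dow == i
    return features


# Hypothetical trip, for illustration only
trip = {
    "pickup_latitude": 40.0,
    "pickup_longitude": -74.0,
    "dropoff_latitude": 40.75,
    "dropoff_longitude": -73.0,
    "pickup_datetime": "2016-01-01 09:30:00",  # a Friday
}
features = taxi_features(trip)
```

Exactly one of the `is_*` flags is true for any valid timestamp, which is a quick way to catch an off-by-one in the day-of-week mapping.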

## Streaming targets

The features we built can be used for inference. But to train a model, we also need a target. Beaver encourages you to define this target with SQL as well. In this example, we'll predict the duration of each trip in seconds, which is a regression task.

```python
import requests

target_query = """
DROP VIEW IF EXISTS taxi_targets;

CREATE VIEW taxi_targets AS (
    SELECT
        trip_no,
        -- Trip duration in seconds, read from the arrival message
        CAST(arrival ->> 'duration' AS INTEGER) AS duration
    FROM taxi_arrivals
)
"""

requests.post(
    "http://localhost:8000/api/targets/",
    json={
        "name": "taxi_targets",
        "query": target_query,
        "key_field": "trip_no",  # the key column exposed by the view above
        "target_field": "duration",
        "task": "REGRESSION",
        "processor_id": 1,
    },
)
```
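A misspelled `key_field` or `target_field` (for instance `taxi_no` instead of `trip_no`) is an easy mistake to make. A cheap safeguard is to check, before posting, that the fields you reference actually appear in the query. The `missing_fields` helper below is hypothetical and not part of Beaver; it uses a word-boundary regex, so it is a rough textual check rather than an SQL parser and can be fooled by comments or string literals.

```python
import re


def missing_fields(query: str, *fields: str) -> list[str]:
    """Return the fields that never appear as whole words in the query.

    Hypothetical helper: a rough textual check, not an SQL parser.
    """
    return [f for f in fields if not re.search(rf"\b{re.escape(f)}\b", query)]


target_query = """
CREATE VIEW taxi_targets AS (
    SELECT
        trip_no,
        CAST(arrival ->> 'duration' AS INTEGER) AS duration
    FROM taxi_arrivals
)
"""

print(missing_fields(target_query, "trip_no", "duration"))  # []
print(missing_fields(target_query, "taxi_no", "duration"))  # ['taxi_no']
```

Running this kind of check right before each `requests.post` call costs nothing and turns a silent misconfiguration into an obvious one.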

## Sending a first model

Now let's upload a first model. We'll start with a plain and simple linear regression.

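```python
import base64

import dill
import requests
from river import linear_model, preprocessing

# Standardize the features, then fit an online linear regression
model = preprocessing.StandardScaler() | linear_model.LinearRegression()
# Beaver is assumed here to call `learn` and `predict`, so alias
# River's per-sample methods under those names
model.learn = model.learn_one
model.predict = model.predict_one

requests.post(
    "http://localhost:8000/api/models/",
    json={
        "name": "taxis-linear-regression",
        "task": "REGRESSION",
        # Serialize with dill, then base64-encode so it fits in JSON
        "content": base64.b64encode(dill.dumps(model)).decode("ascii"),
    },
)
```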

In this example, the models are hosted in Beaver, which is why we serialize the model with `dill`, base64-encode it, and send it in the request payload.
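The encoding step is a plain serialize-then-base64 round trip. The sketch below shows both directions with the standard library's `pickle` so that it runs without extra dependencies; `dill` exposes the same `dumps`/`loads` interface and can additionally serialize objects such as lambdas that `pickle` rejects. How Beaver decodes the payload server-side isn't shown in this example, so `decode_model` is only an assumption about what the reverse step looks like, and `model_state` is a hypothetical stand-in for a River pipeline.

```python
import base64
import pickle


def encode_model(model) -> str:
    # Serialize the object to bytes, then base64-encode so it fits in a JSON string
    return base64.b64encode(pickle.dumps(model)).decode("ascii")


def decode_model(content: str):
    # Assumed reverse step on the server: base64 -> bytes -> Python object
    return pickle.loads(base64.b64decode(content))


# Hypothetical model state standing in for a River pipeline
model_state = {"weights": {"manhattan_distance": 0.5}, "intercept": 1.0}

content = encode_model(model_state)
restored = decode_model(content)
print(restored == model_state)  # True
```

Base64 is needed because JSON strings cannot carry arbitrary bytes; the trade-off is a payload about a third larger than the raw serialized model.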

## Creating an experiment

We now have everything we need to run a first experiment. An experiment boils down to training a model on a feature set to predict a target. Since the model has already been uploaded, Beaver takes care of training it in real time, and it also issues a prediction for each newly arriving sample.

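```python
requests.post(
    "http://localhost:8000/api/experiments/",
    json={
        "name": "Taxi trips lin reg experiment",
        # IDs of objects already registered in Beaver; adjust them to match
        # the feature set, target, and model created in your own instance
        "feature_set_id": 2,
        "target_id": 2,
        "model_id": 2,
        "runner_id": 1,
        "sink_id": 1,
    },
)
```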

## Sending a new model

TODO: send a random forest on the same dataset. Show how it compares to the existing model.

## Defining new features

TODO: create stateful features with Materialize, then create a new experiment with the random forest on these features.