# Ridesharing Demo

This notebook provides the required SingleStore setup for a sample ridesharing application. It consists of the following parts:
1. Ingesting historic trip information from a Snowflake Iceberg table stored on S3
2. Creating Kafka pipelines to stream real-time trip information + rider/driver locations
3. Some sample queries featuring trend analysis on the trip data

For more information on running this demo, check out https://github.com/singlestore-labs/demo-ridesharing-sim.

In [4]:
%%sql
-- Start from a clean slate. Dropping the database discards everything that a
-- previous run stored in rideshare_demo.
DROP DATABASE IF EXISTS rideshare_demo;
CREATE DATABASE rideshare_demo;
-- Make rideshare_demo the default database for the statements that follow.
USE rideshare_demo;

Drop all existing pipelines if recreating tables.

In [7]:
%%sql
-- Remove the four ingest pipelines (one Iceberg, three Kafka) when present.
-- IF EXISTS keeps this cell safe to run against a fresh database.
DROP PIPELINE IF EXISTS rideshare_ice_trips;
DROP PIPELINE IF EXISTS rideshare_kafka_trips;
DROP PIPELINE IF EXISTS rideshare_kafka_riders;
DROP PIPELINE IF EXISTS rideshare_kafka_drivers;

Create the trips table.

In [8]:
%%sql
-- One row per trip, keyed by trip id. Both the Iceberg pipeline and the Kafka
-- trips pipeline in this notebook write to this table with REPLACE INTO.
DROP TABLE IF EXISTS trips;
CREATE TABLE trips (
    id VARCHAR(255) NOT NULL,
    driver_id VARCHAR(255),
    rider_id VARCHAR(255),
    status VARCHAR(20),
    -- Trip lifecycle timestamps, stored with microsecond precision. They are
    -- nullable, unlike the fare, distance and coordinate columns below.
    request_time DATETIME(6),
    accept_time DATETIME(6),
    pickup_time DATETIME(6),
    dropoff_time DATETIME(6),
    -- NOTE(review) fare is an integer, presumably a minor currency unit - confirm.
    fare INT NOT NULL,
    -- NOTE(review) the unit of distance is not visible in this notebook - confirm.
    distance DOUBLE NOT NULL,
    pickup_lat DOUBLE NOT NULL,
    pickup_long DOUBLE NOT NULL,
    dropoff_lat DOUBLE NOT NULL,
    dropoff_long DOUBLE NOT NULL,
    city VARCHAR(255) NOT NULL,
    PRIMARY KEY (id),
    -- Sorted on the two columns that the queries in this notebook filter and
    -- group on (status and city).
    SORT KEY (status, city)
);

Set up a pipeline to ingest trip data from an Iceberg catalog. This assumes that the catalog is a Snowflake catalog stored on S3.

In [4]:
# Turn on the named_parameters option of the SQL magic before running the SQL cells below.
%config SqlMagic.named_parameters=True

In [5]:
%%sql
-- Engine settings used by the Iceberg pipeline below. The first variable
-- switches Iceberg ingest on. The second one sets the extractor get-offsets
-- timeout to 90000 (milliseconds, going by the _ms suffix of the variable).
SET GLOBAL enable_iceberg_ingest = ON;
SET GLOBAL pipelines_extractor_get_offsets_timeout_ms = 90000;

-- Pipeline that loads the Iceberg table RIDESHARE_DEMO.public.trips_ice into
-- the trips table, using a Snowflake catalog and S3 storage in us-west-2.
-- The account identifier, USER, PASSWORD, KEY_ID and SECRET_KEY values are
-- placeholders and have to be replaced before this cell is run.
-- REPLACE INTO means an incoming row replaces an existing row with the same id.
-- The four timestamp columns are staged in @variables and converted in the
-- SET clause. Dividing by 1000000 before FROM_UNIXTIME implies that the source
-- values are epoch microseconds - TODO confirm against the Iceberg schema.
CREATE OR REPLACE PIPELINE rideshare_ice_trips AS
LOAD DATA S3 ''
CONFIG '{"region" : "us-west-2",
        "catalog_type": "SNOWFLAKE",
        "table_id": "RIDESHARE_DEMO.public.trips_ice",
        "catalog.uri": "jdbc:snowflake://account-identifier.snowflakecomputing.com",
        "catalog.jdbc.user":"USER",
        "catalog.jdbc.password":"PASSWORD",
        "catalog.jdbc.role":"ACCOUNTADMIN"}'
CREDENTIALS '{"aws_access_key_id" : "KEY_ID",
             "aws_secret_access_key": "SECRET_KEY"
}'
REPLACE INTO TABLE trips (
    id <- ID,
    driver_id <- DRIVER_ID,
    rider_id <- RIDER_ID,
    status <- STATUS,
    @request_time <- REQUEST_TIME,
    @accept_time <- ACCEPT_TIME,
    @pickup_time <- PICKUP_TIME,
    @dropoff_time <- DROPOFF_TIME,
    fare <- FARE,
    distance <- DISTANCE,
    pickup_lat <- PICKUP_LAT,
    pickup_long <- PICKUP_LONG,
    dropoff_lat <- DROPOFF_LAT,
    dropoff_long <- DROPOFF_LONG,
    city <- CITY
)
FORMAT ICEBERG
SET request_time = FROM_UNIXTIME(@request_time/1000000),
    accept_time = FROM_UNIXTIME(@accept_time/1000000),
    pickup_time = FROM_UNIXTIME(@pickup_time/1000000),
    dropoff_time = FROM_UNIXTIME(@dropoff_time/1000000);

In [6]:
%%sql
-- Run the Iceberg pipeline in the foreground instead of as a background job.
-- NOTE(review) this presumably blocks the cell until the historic load is
-- done - confirm against the START PIPELINE documentation.
START PIPELINE rideshare_ice_trips FOREGROUND;

Create a Kafka pipeline to ingest trip data in real time. It consumes the `ridesharing-sim-trips` topic and upserts into the trips table.

In [12]:
%%sql
-- Real-time trip ingest. Consumes JSON messages from the ridesharing-sim-trips
-- topic on a Confluent Cloud broker, authenticating with SASL_SSL and the
-- PLAIN mechanism. The username and password values are placeholders.
-- REPLACE INTO turns every message into an upsert on the trips primary key.
-- The four timestamp fields are staged in @variables and parsed in the SET
-- clause from text with a T separator and fractional seconds.
-- NOTE(review) the DROP in front of CREATE OR REPLACE looks redundant - confirm
-- whether it is there on purpose to discard the state of the old pipeline.
DROP PIPELINE IF EXISTS rideshare_kafka_trips;
CREATE OR REPLACE PIPELINE rideshare_kafka_trips AS
    LOAD DATA KAFKA 'pkc-rgm37.us-west-2.aws.confluent.cloud:9092/ridesharing-sim-trips'
    CONFIG '{"sasl.username": "username",
         "sasl.mechanism": "PLAIN",
         "security.protocol": "SASL_SSL",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
    CREDENTIALS '{"sasl.password": "password"}'
    DISABLE OUT_OF_ORDER OPTIMIZATION
    REPLACE INTO TABLE trips
    FORMAT JSON
    (
        id <- id,
        rider_id <- rider_id,
        driver_id <- driver_id,
        status <- status,
        @request_time <- request_time,
        @accept_time <- accept_time,
        @pickup_time <- pickup_time,
        @dropoff_time <- dropoff_time,
        fare <- fare,
        distance <- distance,
        pickup_lat <- pickup_lat,
        pickup_long <- pickup_long,
        dropoff_lat <- dropoff_lat,
        dropoff_long <- dropoff_long,
        city <- city
    )
    SET request_time = STR_TO_DATE(@request_time, '%Y-%m-%dT%H:%i:%s.%f'),
        accept_time = STR_TO_DATE(@accept_time, '%Y-%m-%dT%H:%i:%s.%f'),
        pickup_time = STR_TO_DATE(@pickup_time, '%Y-%m-%dT%H:%i:%s.%f'),
        dropoff_time = STR_TO_DATE(@dropoff_time, '%Y-%m-%dT%H:%i:%s.%f');

In [13]:
%%sql
-- Start the Kafka trips pipeline. No FOREGROUND keyword is used here, unlike
-- the start of the Iceberg pipeline.
START PIPELINE rideshare_kafka_trips;

Create a riders table and a Kafka pipeline that consumes the `ridesharing-sim-riders` topic and upserts the data.

In [15]:
%%sql
-- One row per rider, keyed by rider id. The rideshare_kafka_riders pipeline
-- writes to this table with REPLACE INTO. Same column layout as drivers.
DROP TABLE IF EXISTS riders;
CREATE TABLE riders (
    id VARCHAR(255) NOT NULL,
    first_name VARCHAR(255),
    last_name VARCHAR(255),
    email VARCHAR(255),
    phone_number VARCHAR(255),
    date_of_birth DATETIME(6),
    created_at DATETIME(6),
    -- Location of the rider as delivered by the Kafka messages.
    location_city VARCHAR(255),
    location_lat DOUBLE,
    location_long DOUBLE,
    status VARCHAR(20),
    PRIMARY KEY (id),
    SORT KEY (status, location_city)
);

In [16]:
%%sql
-- Rider ingest. Consumes JSON messages from the ridesharing-sim-riders topic
-- on a Confluent Cloud broker, authenticating with SASL_SSL and the PLAIN
-- mechanism. The username and password values are placeholders.
-- REPLACE INTO turns every message into an upsert on the riders primary key.
-- date_of_birth and created_at are staged in @variables and parsed in the SET
-- clause from text with a T separator and fractional seconds.
-- NOTE(review) the DROP in front of CREATE OR REPLACE looks redundant - confirm
-- whether it is there on purpose to discard the state of the old pipeline.
DROP PIPELINE IF EXISTS rideshare_kafka_riders;
CREATE OR REPLACE PIPELINE rideshare_kafka_riders AS
    LOAD DATA KAFKA 'pkc-rgm37.us-west-2.aws.confluent.cloud:9092/ridesharing-sim-riders'
    CONFIG '{"sasl.username": "username",
         "sasl.mechanism": "PLAIN",
         "security.protocol": "SASL_SSL",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
    CREDENTIALS '{"sasl.password": "password"}'
    DISABLE OUT_OF_ORDER OPTIMIZATION
    REPLACE INTO TABLE riders
    FORMAT JSON
    (
        id <- id,
        first_name <- first_name,
        last_name <- last_name,
        email <- email,
        phone_number <- phone_number,
        @date_of_birth <- date_of_birth,
        @created_at <- created_at,
        location_city <- location_city,
        location_lat <- location_lat,
        location_long <- location_long,
        status <- status
    )
    SET date_of_birth = STR_TO_DATE(@date_of_birth, '%Y-%m-%dT%H:%i:%s.%f'),
        created_at = STR_TO_DATE(@created_at, '%Y-%m-%dT%H:%i:%s.%f');

In [17]:
%%sql
-- Start the Kafka riders pipeline. No FOREGROUND keyword is used here, unlike
-- the start of the Iceberg pipeline.
START PIPELINE rideshare_kafka_riders;

Create a drivers table and a Kafka pipeline that consumes the `ridesharing-sim-drivers` topic and upserts the data.

In [18]:
%%sql
-- One row per driver, keyed by driver id. The rideshare_kafka_drivers pipeline
-- writes to this table with REPLACE INTO. Same column layout as riders.
DROP TABLE IF EXISTS drivers;
CREATE TABLE drivers (
    id VARCHAR(255) NOT NULL,
    first_name VARCHAR(255),
    last_name VARCHAR(255),
    email VARCHAR(255),
    phone_number VARCHAR(255),
    date_of_birth DATETIME(6),
    created_at DATETIME(6),
    -- Location of the driver as delivered by the Kafka messages.
    location_city VARCHAR(255),
    location_lat DOUBLE,
    location_long DOUBLE,
    status VARCHAR(20),
    PRIMARY KEY (id),
    SORT KEY (status, location_city)
);

In [83]:
%%sql
-- Driver ingest. Consumes JSON messages from the ridesharing-sim-drivers topic
-- on a Confluent Cloud broker, authenticating with SASL_SSL and the PLAIN
-- mechanism. The username and password values are placeholders.
-- REPLACE INTO turns every message into an upsert on the drivers primary key.
-- date_of_birth and created_at are staged in @variables and parsed in the SET
-- clause from text with a T separator and fractional seconds.
-- NOTE(review) the DROP in front of CREATE OR REPLACE looks redundant - confirm
-- whether it is there on purpose to discard the state of the old pipeline.
DROP PIPELINE IF EXISTS rideshare_kafka_drivers;
CREATE OR REPLACE PIPELINE rideshare_kafka_drivers AS
    LOAD DATA KAFKA 'pkc-rgm37.us-west-2.aws.confluent.cloud:9092/ridesharing-sim-drivers'
    CONFIG '{"sasl.username": "username",
         "sasl.mechanism": "PLAIN",
         "security.protocol": "SASL_SSL",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
    CREDENTIALS '{"sasl.password": "password"}'
    DISABLE OUT_OF_ORDER OPTIMIZATION
    REPLACE INTO TABLE drivers
    FORMAT JSON
    (
        id <- id,
        first_name <- first_name,
        last_name <- last_name,
        email <- email,
        phone_number <- phone_number,
        @date_of_birth <- date_of_birth,
        @created_at <- created_at,
        location_city <- location_city,
        location_lat <- location_lat,
        location_long <- location_long,
        status <- status
    )
    SET date_of_birth = STR_TO_DATE(@date_of_birth, '%Y-%m-%dT%H:%i:%s.%f'),
        created_at = STR_TO_DATE(@created_at, '%Y-%m-%dT%H:%i:%s.%f');

In [84]:
%%sql
-- Start the Kafka drivers pipeline. No FOREGROUND keyword is used here, unlike
-- the start of the Iceberg pipeline.
START PIPELINE rideshare_kafka_drivers;

Debug query to see the current number of trips, riders, and drivers grouped by their status.

In [14]:
%%sql
-- Row counts per status for trips, riders and drivers, stacked into a single
-- result set. The entity column tells which table a row came from.
SELECT
    'trips' AS entity,
    status,
    COUNT(*) AS count
FROM trips
GROUP BY status
UNION ALL
SELECT
    'riders' AS entity,
    status,
    COUNT(*) AS count
FROM riders
GROUP BY status
UNION ALL
SELECT
    'drivers' AS entity,
    status,
    COUNT(*) AS count
FROM drivers
GROUP BY status
-- The trailing ORDER BY sorts the combined result, not just the last branch.
ORDER BY entity, status;

entity,status,count
trips,completed,21624322


Cleanup query to remove any orphaned trips and reset the riders and drivers tables. Run before restarting the simulator.

In [13]:
%%sql
-- Remove every trip whose status is something other than completed. Rows with
-- a NULL status are not matched by this comparison and are therefore kept.
DELETE FROM trips
WHERE status <> 'completed';
-- Intentionally unfiltered. The riders and drivers tables are emptied entirely.
DELETE FROM riders;
DELETE FROM drivers;

# Queries

Minute by minute ride request breakdown in a given city, with percent_change for the last hour.

In [14]:
%%sql

-- San Francisco trips requested during the last hour, bucketed per minute.
WITH per_minute AS (
    SELECT
        DATE_FORMAT(request_time, '%Y-%m-%d %H:%i:00') AS minute_interval,
        COUNT(*) AS trip_count
    FROM trips
    WHERE city = 'San Francisco'
      AND request_time >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
    GROUP BY minute_interval
),
-- Attach the count of the preceding bucket once, so LAG is not written twice.
with_previous AS (
    SELECT
        minute_interval,
        trip_count,
        LAG(trip_count) OVER (ORDER BY minute_interval) AS previous_count
    FROM per_minute
)
-- percent_change is relative to the preceding bucket. The first bucket has no
-- predecessor, and COALESCE turns that NULL into 0.
SELECT
    minute_interval,
    trip_count,
    COALESCE(
        ROUND((trip_count - previous_count) / NULLIF(previous_count, 0) * 100, 2),
        0
    ) AS percent_change
FROM with_previous
ORDER BY minute_interval;

minute_interval,trip_count,percent_change
2024-09-04 00:05:00,153,0.0
2024-09-04 00:06:00,179,16.99
2024-09-04 00:07:00,168,-6.15
2024-09-04 00:08:00,176,4.76
2024-09-04 00:09:00,156,-11.36
2024-09-04 00:10:00,168,7.69
2024-09-04 00:11:00,170,1.19
2024-09-04 00:12:00,179,5.29
2024-09-04 00:13:00,162,-9.5
2024-09-04 00:14:00,175,8.02


Hourly ride request breakdown in a given city, with percent_change for the last 24 hours.

In [81]:
%%sql

-- Fremont trips requested during the last 24 hours, bucketed per hour.
WITH hourly_counts AS (
    SELECT
        DATE_FORMAT(request_time, '%Y-%m-%d %H:00:00') AS hourly_interval,
        COUNT(*) AS trip_count
    FROM trips
    WHERE city = 'Fremont'
      AND request_time >= DATE_SUB(NOW(), INTERVAL 24 HOUR)
    GROUP BY hourly_interval
),
-- Attach the count of the preceding bucket once, so LAG is not written twice.
-- An hour without any trips produces no bucket, so the comparison is made
-- against the previous returned row, which may be more than one hour back.
with_previous AS (
    SELECT
        hourly_interval,
        trip_count,
        LAG(trip_count) OVER (ORDER BY hourly_interval) AS previous_count
    FROM hourly_counts
)
-- percent_change is relative to the preceding bucket. NULLIF guards the
-- division in the same way as the minute-level query in this notebook. The
-- first bucket has no predecessor, and COALESCE turns that NULL into 0.
SELECT
    hourly_interval,
    trip_count,
    COALESCE(
        ROUND((trip_count - previous_count) / NULLIF(previous_count, 0) * 100, 2),
        0
    ) AS percent_change
FROM with_previous
ORDER BY hourly_interval;

hourly_interval,trip_count,percent_change
2024-09-03 22:00:00,1821,0.0
2024-09-03 23:00:00,2330,27.95
2024-09-04 00:00:00,1807,-22.45
2024-09-04 02:00:00,2036,12.67
2024-09-04 03:00:00,2357,15.77
2024-09-04 04:00:00,2152,-8.7
2024-09-04 05:00:00,2035,-5.44
2024-09-04 06:00:00,1976,-2.9
2024-09-04 07:00:00,1929,-2.38
2024-09-04 08:00:00,1926,-0.16


Daily ride request breakdown in a given city, with percent_change for the last 7 days.

In [82]:
%%sql

-- San Francisco trips bucketed per calendar day, starting at midnight seven
-- days before the current date and running through today.
WITH daily_counts AS (
    SELECT
        DATE(request_time) AS daily_interval,
        COUNT(*) AS trip_count
    FROM trips
    WHERE city = 'San Francisco'
      AND request_time >= DATE_SUB(CURDATE(), INTERVAL 7 DAY)
    GROUP BY daily_interval
),
-- Attach the count of the preceding bucket once, so LAG is not written twice.
-- A day without any trips produces no bucket, so the comparison is made
-- against the previous returned row, which may be more than one day back.
with_previous AS (
    SELECT
        daily_interval,
        trip_count,
        LAG(trip_count) OVER (ORDER BY daily_interval) AS previous_count
    FROM daily_counts
)
-- percent_change is relative to the preceding bucket. NULLIF guards the
-- division in the same way as the minute-level query in this notebook. The
-- first bucket has no predecessor, and COALESCE turns that NULL into 0.
SELECT
    daily_interval,
    trip_count,
    COALESCE(
        ROUND((trip_count - previous_count) / NULLIF(previous_count, 0) * 100, 2),
        0
    ) AS percent_change
FROM with_previous
ORDER BY daily_interval;

daily_interval,trip_count,percent_change
2024-09-03,18988,0.0
2024-09-04,47802,151.75
