In [1]:
import duckdb
import os
import time

In [2]:
# Output database file and the folders holding the raw CSV exports.
DB_FILE = "cycling.duckdb"
DATA_DIR = "data"
DATA_DIR_NYC = "data_nyc"

# Rebuild from scratch on every run: delete the previous database together with
# its write-ahead log ("<db>.wal"), which DuckDB can leave beside the database
# after an unclean shutdown and which could otherwise be picked up by the new file.
for stale_path in (DB_FILE, f"{DB_FILE}.wal"):
    if os.path.exists(stale_path):
        os.remove(stale_path)


In [3]:
# Open the on-disk DuckDB database that every later cell reads from and writes to.
con = duckdb.connect(database=DB_FILE)

In [4]:
# Load every London CSV into one staging table. union_by_name=True aligns files
# whose headers differ, and filename=True records the source file of each row.
start = time.perf_counter()

print("Ingesting data with schema union...")
# CREATE OR REPLACE keeps this cell re-runnable even when the table already exists.
con.sql(f"""
    CREATE OR REPLACE TABLE raw_london_bike_data AS
    SELECT * FROM read_csv('{DATA_DIR}/*.csv', union_by_name=True, filename=True)
""")

print(f"Done in {time.perf_counter() - start:.2f}s")
try:
    count = con.sql('SELECT count(*) FROM raw_london_bike_data').fetchone()[0]
    print(f"Rows: {count:,}")
except duckdb.Error as exc:
    # The count is only a sanity check: report database errors instead of
    # aborting, but let unrelated exceptions (e.g. KeyboardInterrupt) propagate.
    print(f"Could not count rows: {exc}")

Ingesting data with schema union...
Done in 10.10s
Rows: 61,838,623


In [5]:
# Display the raw London staging table.
raw_london_preview = con.sql("select * from raw_london_bike_data")
raw_london_preview

┌───────────┬──────────┬─────────┬──────────────────┬───────────────┬─────────────────────────────────────────────┬──────────────────┬─────────────────┬────────────────────────────────────────┬────────┬──────────────────────┬───────────────┬────────────────────┬─────────────┬─────────────┬────────────┬────────────────┬─────────────────────┬───────────────────────────────────────────────────────────────┐
│ Rental Id │ Duration │ Bike Id │     End Date     │ EndStation Id │               EndStation Name               │    Start Date    │ StartStation Id │           StartStation Name            │ Number │ Start station number │ Start station │ End station number │ End station │ Bike number │ Bike model │ Total duration │ Total duration (ms) │                           filename                            │
│   int64   │  int64   │  int64  │     varchar      │     int64     │                   varchar                   │     varchar      │      int64      │                varchar           

In [6]:
# Collect the staging table's column names (first field of each DESCRIBE row);
# safe_col() consults this list when the transform SQL is built.
london_describe_rows = con.sql("DESCRIBE raw_london_bike_data").fetchall()
columns = [name for name, *_ in london_describe_rows]
columns

['Rental Id',
 'Duration',
 'Bike Id',
 'End Date',
 'EndStation Id',
 'EndStation Name',
 'Start Date',
 'StartStation Id',
 'StartStation Name',
 'Number',
 'Start station number',
 'Start station',
 'End station number',
 'End station',
 'Bike number',
 'Bike model',
 'Total duration',
 'Total duration (ms)',
 'filename']

In [7]:
def safe_col(col_name, available_columns=None):
    """Return a SQL expression for a column that may be absent from the staging table.

    Args:
        col_name: Exact (case-sensitive) column name to look up.
        available_columns: Optional collection of existing column names. When
            omitted, the notebook-level ``columns`` list is read at call time.

    Returns:
        The name as a double-quoted SQL identifier (embedded double quotes are
        doubled) when it is present, otherwise the literal string ``"NULL"``.
    """
    if available_columns is None:
        # The global list is rebound by the NYC section later in the notebook,
        # so pass the column list explicitly when cells may run out of order.
        available_columns = columns
    if col_name not in available_columns:
        return "NULL"
    # Double any embedded quote so the result stays one valid quoted identifier.
    escaped_name = col_name.replace('"', '""')
    return f'"{escaped_name}"'


# Build the cleaned London table. Each output column takes the first non-NULL
# value among its candidate source columns; safe_col() substitutes NULL for any
# candidate that is absent from the staging table.
# CREATE OR REPLACE rebuilds the table in one statement (as the NYC and combined
# cells do) instead of dropping it first and creating it afterwards.
sql_transform = f"""
    CREATE OR REPLACE TABLE london_bike_data AS
    SELECT
        COALESCE(CAST({safe_col('Number')} AS VARCHAR), CAST({safe_col('Rental Id')} AS VARCHAR)) AS rental_id,

        COALESCE(
            -- Try UK format (DD/MM/YYYY HH:MM)
            try_strptime({safe_col('Start date')}, '%d/%m/%Y %H:%M'),
            try_strptime({safe_col('Start Date')}, '%d/%m/%Y %H:%M'),

            -- Try ISO format (YYYY-MM-DD HH:MM)
            try_strptime({safe_col('Start date')}, '%Y-%m-%d %H:%M'),
            try_strptime({safe_col('Start Date')}, '%Y-%m-%d %H:%M'),

            -- Fallback to generic timestamp cast (handles strict ISO with seconds)
            TRY_CAST({safe_col('Start date')} AS TIMESTAMP),
            TRY_CAST({safe_col('Start Date')} AS TIMESTAMP)
        ) AS start_date,

        COALESCE(
            -- Try UK format
            try_strptime({safe_col('End date')}, '%d/%m/%Y %H:%M'),
            try_strptime({safe_col('End Date')}, '%d/%m/%Y %H:%M'),

            -- Try ISO format
            try_strptime({safe_col('End date')}, '%Y-%m-%d %H:%M'),
            try_strptime({safe_col('End Date')}, '%Y-%m-%d %H:%M'),

            -- Fallback
            TRY_CAST({safe_col('End date')} AS TIMESTAMP),
            TRY_CAST({safe_col('End Date')} AS TIMESTAMP)
        ) AS end_date,

        -- Milliseconds are converted to seconds; the other two candidates are used as-is
        COALESCE(
            CAST({safe_col('Total duration (ms)')} AS DOUBLE)/1000.0,
            CAST({safe_col('Total duration')} AS DOUBLE),
            CAST({safe_col('Duration')} AS DOUBLE)
        ) AS duration_seconds,

        COALESCE(CAST({safe_col('Bike number')} AS VARCHAR), CAST({safe_col('Bike Id')} AS VARCHAR)) AS bike_id,

        -- Station IDs are cast to VARCHAR so both candidates share one type
        COALESCE(CAST({safe_col('Start station number')} AS VARCHAR), CAST({safe_col('StartStation Id')} AS VARCHAR)) AS start_station_id,
        COALESCE({safe_col('Start station')}, {safe_col('StartStation Name')}) AS start_station_name,

        COALESCE(CAST({safe_col('End station number')} AS VARCHAR), CAST({safe_col('EndStation Id')} AS VARCHAR)) AS end_station_id,
        COALESCE({safe_col('End station')}, {safe_col('EndStation Name')}) AS end_station_name,

        {safe_col('Bike model')} AS bike_model,
        filename
    FROM raw_london_bike_data
    WHERE start_date IS NOT NULL
        AND end_date IS NOT NULL
        AND duration_seconds > 0
        AND duration_seconds <= 1000 * 60  -- keep rides <= 1000 minutes
"""

con.sql(sql_transform)

In [8]:
# Display the cleaned London rides, most recent start first.
london_latest_first = con.sql("SELECT * FROM london_bike_data ORDER BY start_date DESC")
london_latest_first

┌───────────┬─────────────────────┬─────────────────────┬──────────────────┬─────────┬──────────────────┬────────────────────────────────────────────┬────────────────┬────────────────────────────────────────┬────────────┬───────────────────────────────────────────────────────────────┐
│ rental_id │     start_date      │      end_date       │ duration_seconds │ bike_id │ start_station_id │             start_station_name             │ end_station_id │            end_station_name            │ bike_model │                           filename                            │
│  varchar  │      timestamp      │      timestamp      │      double      │ varchar │     varchar      │                  varchar                   │    varchar     │                varchar                 │  varchar   │                            varchar                            │
├───────────┼─────────────────────┼─────────────────────┼──────────────────┼─────────┼──────────────────┼─────────────────────────────────────

# NYC Data Import

In [9]:
# Load every NYC CSV (searched recursively under DATA_DIR_NYC) into one staging
# table. union_by_name=True aligns files whose headers differ, and filename=True
# records the source file of each row.
# CREATE OR REPLACE keeps this cell re-runnable even when the table already exists.
con.sql(f"""
    CREATE OR REPLACE TABLE raw_nyc_bike_data AS
    SELECT * FROM read_csv('{DATA_DIR_NYC}/**/*.csv', union_by_name=True, filename=True)
""")

In [10]:
# Display the raw NYC staging table.
raw_nyc_preview = con.sql("SELECT * FROM raw_nyc_bike_data")
raw_nyc_preview

┌──────────────┬─────────────────────────┬─────────────────────────┬──────────────────┬─────────────────────┬────────────────────────┬─────────────────────────┬────────────────┬────────────────────────────┬──────────────────────┬───────────────────────┬────────┬────────────┬────────────┬────────┬─────────┬───────────────┬────────────┬───────────┬────────────────────┬──────────────────┬──────────────────┬────────────────┬───────────┬───────────┬─────────┬─────────┬───────────────┬──────────────────────────────────────────────────────────────┐
│ tripduration │        starttime        │        stoptime         │ start station id │ start station name  │ start station latitude │ start station longitude │ end station id │      end station name      │ end station latitude │ end station longitude │ bikeid │  usertype  │ birth year │ gender │ ride_id │ rideable_type │ started_at │ ended_at  │ start_station_name │ start_station_id │ end_station_name │ end_station_id │ start_lat │ start_lng │ end

In [11]:
# Rebind `columns` to the NYC staging table's column names (first field of each
# DESCRIBE row) so safe_col() resolves against the NYC schema from here on.
nyc_describe_rows = con.sql("DESCRIBE raw_nyc_bike_data").fetchall()
columns = [name for name, *_ in nyc_describe_rows]
print(f"Staging columns detected: {len(columns)}")

def safe_col(col_name, available_columns=None):
    """Return a SQL expression for a column that may be absent from the staging table.

    Args:
        col_name: Exact (case-sensitive) column name to look up.
        available_columns: Optional collection of existing column names. When
            omitted, the notebook-level ``columns`` list is read at call time.

    Returns:
        The name as a double-quoted SQL identifier (embedded double quotes are
        doubled) when it is present, otherwise the literal string ``"NULL"``.
    """
    if available_columns is None:
        # Default to the global list built from DESCRIBE in this cell.
        available_columns = columns
    if col_name not in available_columns:
        return "NULL"
    # Double any embedded quote so the result stays one valid quoted identifier.
    escaped_name = col_name.replace('"', '""')
    return f'"{escaped_name}"'

# 2. Transform & Unify
# Map the old trip schema (e.g. 'starttime', 'start station id') and the new one
# (e.g. 'started_at', 'start_station_id') onto a single set of output columns.
# safe_col() substitutes NULL for any source column missing from the staging table.
print("Step 2: Creating unified 'nyc_biking_data' table...")

sql_transform = f"""
    CREATE OR REPLACE TABLE nyc_biking_data AS
    SELECT
        -- ID: New data has ride_id, Old data doesn't (stays NULL for old rows)
        {safe_col('ride_id')} AS ride_id,

        -- Dates: 'started_at' (New) vs 'starttime' (Old)
        COALESCE(
            TRY_CAST({safe_col('started_at')} AS TIMESTAMP),
            TRY_CAST({safe_col('starttime')} AS TIMESTAMP)
        ) AS start_time,

        COALESCE(
            TRY_CAST({safe_col('ended_at')} AS TIMESTAMP),
            TRY_CAST({safe_col('stoptime')} AS TIMESTAMP)
        ) AS end_time,

        -- Stations: 'start_station_id' (New) vs 'start station id' (Old)
        COALESCE(CAST({safe_col('start_station_id')} AS VARCHAR), CAST({safe_col('start station id')} AS VARCHAR)) AS start_station_id,
        COALESCE({safe_col('start_station_name')}, {safe_col('start station name')}) AS start_station_name,

        COALESCE(CAST({safe_col('end_station_id')} AS VARCHAR), CAST({safe_col('end station id')} AS VARCHAR)) AS end_station_id,
        COALESCE({safe_col('end_station_name')}, {safe_col('end station name')}) AS end_station_name,

        -- Lat/Lng: 'start_lat' etc. (New) vs 'start station latitude' etc. (Old)
        COALESCE(CAST({safe_col('start_lat')} AS DOUBLE), CAST({safe_col('start station latitude')} AS DOUBLE)) AS start_lat,
        COALESCE(CAST({safe_col('start_lng')} AS DOUBLE), CAST({safe_col('start station longitude')} AS DOUBLE)) AS start_lng,

        COALESCE(CAST({safe_col('end_lat')} AS DOUBLE), CAST({safe_col('end station latitude')} AS DOUBLE)) AS end_lat,
        COALESCE(CAST({safe_col('end_lng')} AS DOUBLE), CAST({safe_col('end station longitude')} AS DOUBLE)) AS end_lng,

        -- Member Type: 'member_casual' (New: member/casual) vs 'usertype' (Old: Subscriber/Customer).
        -- Only the two known old values are mapped; a missing or unexpected usertype
        -- stays NULL instead of being silently counted as 'casual'.
        COALESCE(
            {safe_col('member_casual')},
            CASE
                WHEN {safe_col('usertype')} = 'Subscriber' THEN 'member'
                WHEN {safe_col('usertype')} = 'Customer' THEN 'casual'
            END
        ) AS user_type,

        filename
    FROM raw_nyc_bike_data
    WHERE start_time IS NOT NULL
        AND end_time IS NOT NULL
        AND date_diff('second', start_time, end_time) > 0
        AND date_diff('second', start_time, end_time) <= 1000 * 60  -- keep rides <= 1000 minutes
"""

con.sql(sql_transform)

# 3. Validation
# count(*) yields a single one-column row; unpack it and report the total.
(count,) = con.sql("SELECT count(*) FROM nyc_biking_data").fetchone()
print(f"Step 3: Done. Total NYC Rows: {count:,}")

Staging columns detected: 29
Step 2: Creating unified 'nyc_biking_data' table...
Step 3: Done. Total NYC Rows: 150,892,056


In [12]:
con.sql("SELECT * FROM nyc_biking_data")

┌─────────┬─────────────────────────┬─────────────────────────┬──────────────────┬─────────────────────┬────────────────┬──────────────────────────────┬─────────────┬──────────────┬───────────────────┬───────────────────┬───────────┬──────────────────────────────────────────────────────────────┐
│ ride_id │       start_time        │        end_time         │ start_station_id │ start_station_name  │ end_station_id │       end_station_name       │  start_lat  │  start_lng   │      end_lat      │      end_lng      │ user_type │                           filename                           │
│ varchar │        timestamp        │        timestamp        │     varchar      │       varchar       │    varchar     │           varchar            │   double    │    double    │      double       │      double       │  varchar  │                           varchar                            │
├─────────┼─────────────────────────┼─────────────────────────┼──────────────────┼─────────────────────┼─────

# Combined Dataset

In [13]:
# Stack both cities into one narrow table with a shared schema. London already
# stores a duration column; for NYC it is derived from the two timestamps.
# The statement has no placeholders, so a plain string yields the same SQL text.
sql_transform = """
CREATE OR REPLACE TABLE joint_bike_data AS
SELECT
    start_date    AS start_time,
    end_date      AS end_time,
    duration_seconds AS duration_seconds,
    'London'      AS city
FROM london_bike_data

UNION ALL

SELECT
    start_time,
    end_time,
    date_diff('second', start_time, end_time) AS duration_seconds,
    'NYC' AS city
FROM nyc_biking_data
"""

con.sql(sql_transform)

In [29]:
# Display the combined London + NYC table.
joint_preview = con.sql("SELECT * FROM joint_bike_data")
joint_preview

┌─────────────────────┬─────────────────────┬──────────────────┬─────────┐
│     start_time      │      end_time       │ duration_seconds │  city   │
│      timestamp      │      timestamp      │      double      │ varchar │
├─────────────────────┼─────────────────────┼──────────────────┼─────────┤
│ 2018-03-10 12:14:00 │ 2018-03-10 12:41:00 │           1620.0 │ London  │
│ 2018-03-07 15:11:00 │ 2018-03-07 15:14:00 │            180.0 │ London  │
│ 2018-03-08 13:40:00 │ 2018-03-08 13:57:00 │           1020.0 │ London  │
│ 2018-03-09 08:53:00 │ 2018-03-09 08:57:00 │            240.0 │ London  │
│ 2018-03-13 06:56:00 │ 2018-03-13 07:00:00 │            240.0 │ London  │
│ 2018-03-13 16:34:00 │ 2018-03-13 16:38:00 │            240.0 │ London  │
│ 2018-03-13 12:17:00 │ 2018-03-13 12:20:00 │            180.0 │ London  │
│ 2018-03-13 09:07:00 │ 2018-03-13 09:11:00 │            240.0 │ London  │
│ 2018-03-13 20:29:00 │ 2018-03-13 20:38:00 │            540.0 │ London  │
│ 2018-03-10 15:08:00 │ 2

In [14]:
# Close the Python connection now that all tables are written.
# NOTE(review): presumably this releases the database file before the %sql magic
# reconnects to the same file in the next section — confirm.
con.close()

# Build Data Layer

In [None]:
# Load the `sql` IPython extension; the %sql / %%sql magics below rely on it.
%load_ext sql

# Connect the magic to the existing database file (same file name as DB_FILE,
# hard-coded here as a connection URL).
%sql duckdb:///cycling.duckdb

In [None]:
%%sql
-- List the tables available through the SQL-magic connection.
SHOW TABLES


In [None]:
%%sql
-- Preview only: this table is built from ~61.8M raw rows, so cap what is fetched.
SELECT * FROM london_bike_data LIMIT 10

In [None]:
%%sql
DROP TABLE IF EXISTS london_bike_data_daily;
-- Ride counts and duration statistics per start/end month, station pair and bike model.
-- NOTE: despite the "_daily" table name, rides are bucketed by calendar month.
CREATE TABLE london_bike_data_daily AS (
SELECT 
date_trunc('month', start_date) AS start_month,
date_trunc('month', end_date) AS end_month,
-- bike_id,  (excluded from the grouping; re-adding it requires extending GROUP BY)
start_station_id,
start_station_name,
end_station_id,
end_station_name,
bike_model,
COUNT(*) AS count,
SUM(duration_seconds) AS total_duration_seconds,
AVG(duration_seconds) AS avg_duration_seconds,
MAX(duration_seconds) AS max_duration_seconds,
MIN(duration_seconds) AS min_duration_seconds,
STDDEV(duration_seconds) AS std_duration_seconds
FROM london_bike_data
GROUP BY 1,2,3,4,5,6,7);

-- Preview only; the aggregate can still be large.
SELECT * FROM london_bike_data_daily LIMIT 10;

In [None]:
%%sql
-- Row count of the aggregate table created in the previous cell.
SELECT COUNT(*) FROM london_bike_data_daily;

In [None]:
# Close the SQL-magic connection to the database file opened above.
%sql --close duckdb:///cycling.duckdb