# Impport Data from CSV into Delta format

## Payments

In [0]:
# Read payment csv file into a dataframe
df = spark.read.format("csv")\
    .option("header", "false")\
    .load("/Volumes/research_databricks_workspace/default/bikeshare/payments.csv")\
    .toDF("payment_id", "date", "amount", "rider_id")

# create a delta table from the dataframe
df.write.format("delta").mode("overwrite").saveAsTable("payment_table")

In [0]:
%sql
-- Step 1: Create a new payment table with the desired schema and partitioning
CREATE OR REPLACE TABLE payment_table_new (
    payment_id INT NOT NULL,
    date DATE,
    amount DECIMAL(10,2),
    rider_id INT
)
USING delta
PARTITIONED BY (date);

-- Step 2: Insert data from the old table into the new table
INSERT INTO payment_table_new
SELECT
    cast(payment_id as INT) as payment_id,
    cast(date as date) as date,
    cast(amount as decimal(10,2)) as amount,
    cast(rider_id as INT) as rider_id
FROM payment_table;

-- Step 3: Drop the old table
DROP TABLE payment_table;

-- Step 4: Drop the existing payment table if it exists
DROP TABLE IF EXISTS payment;

-- Step 5: Rename the new table to the old table name
ALTER TABLE payment_table_new RENAME TO payment;

-- Step 6 (Optional): Optimize the table
OPTIMIZE payment ZORDER BY (rider_id);

path,metrics
abfss://unity-catalog-storage@dbstorageu5dkpujmn7ulo.dfs.core.windows.net/4006262863301269/__unitystorage/catalogs/ffab9ea6-89fc-42e2-aa27-831965571766/tables/7a7829a5-0545-4dcc-b4f4-175bf86c0a3e,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 109, List(minCubeSize(107374182400), List(0, 0), List(109, 17780577), 0, List(0, 0), 0, null), null, 0, 0, 109, 109, false, 0, 0, 1732438471056, 1732438473759, 16, 0, null, List(0, 0), 4, 4, 0, 0, null)"


In [0]:
%sql
use catalog `research_databricks_workspace`; select * from `default`.`payment` limit 5;

payment_id,date,amount,rider_id
960,2015-04-01,9.0,1040
1172,2015-04-01,9.0,1043
1374,2015-04-01,9.38,1052
1544,2015-04-01,4.5,1056
1633,2015-04-01,12.84,1057


## Riders

In [0]:

# Import data by using the schema defenition
from pyspark.sql.types import *
from pyspark.sql.functions import col

# Define schema
rider_schema = StructType([
    StructField("rider_id", IntegerType(), False),
    StructField("first", StringType(), True),
    StructField("last", StringType(), True),
    StructField("address", StringType(), True),
    StructField("birthday", DateType(), True),
    StructField("account_start_date", DateType(), True),
    StructField("account_end_date", DateType(), True),
    StructField("is_member", BooleanType(), True)
])

# Load the riders CSV without a header
df = spark.read.format("csv").option("sep", ",").schema(rider_schema).load("/Volumes/research_databricks_workspace/default/bikeshare/riders.csv")

# Create temporary view
df.createOrReplaceTempView("temp_riders")

# Create stations table table without constraints
spark.sql("""
CREATE OR REPLACE TABLE rider (
    rider_id INT NOT NULL,
    first VARCHAR(50),
    last VARCHAR(50),
    address VARCHAR(150),
    birthday DATE,
    account_start_date DATE,
    account_end_date DATE,
    is_member BOOLEAN
) USING DELTA
""")

# INSERT INTO statement
spark.sql("""
INSERT INTO rider 
SELECT
    rider_id,
    CAST(first AS VARCHAR(50)),
    CAST(last as VARCHAR(50)),
    CAST(address AS VARCHAR(150)),
    birthday,
    account_start_date,
    account_end_date,
    is_member
FROM temp_riders
""")

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql
use catalog `research_databricks_workspace`; select * from `default`.`rider` limit 5;

rider_id,first,last,address,birthday,account_start_date,account_end_date,is_member
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True


## Stations

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import col

# Define schema
schema = StructType([
    StructField("station_id", StringType(), False),
    StructField("name", StringType(), True),
    StructField("latitude", FloatType(), True),
    StructField("longitude", FloatType(), True)
])

# Load the stations CSV without a header
df = spark.read.format("csv").option("sep", ",").schema(schema).load("/Volumes/research_databricks_workspace/default/bikeshare/stations.csv")

# Create temporary view
df.createOrReplaceTempView("temp_stations")

# Create stations table without constraints
spark.sql("""
CREATE OR REPLACE TABLE station (
    station_id VARCHAR(50) NOT NULL,
    name VARCHAR(150),
    latitude FLOAT,
    longitude FLOAT
) USING DELTA
""")

# Insert data into the stations table
spark.sql("""
INSERT INTO station 
SELECT
    CAST(station_id AS VARCHAR(50)),
    CAST(name AS VARCHAR(150)),
    latitude,
    longitude
FROM temp_stations
""")

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql
use catalog `research_databricks_workspace`; select * from `default`.`station` limit 5;

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.0127,-87.66606
KA1503000012,Clark St & Lake St,41.885796,-87.6311
637,Wood St & Chicago Ave,41.895634,-87.672066
13216,State St & 33rd St,41.834732,-87.625824
18003,Fairbanks St & Superior St,41.89581,-87.620255


## Trips

In [0]:
# Import trips CSV file 
df = spark.read.format("csv")\
    .option("header", "false")\
    .load("/Volumes/research_databricks_workspace/default/bikeshare/trips.csv")\
    .toDF("trip_id", "rideable_type", "started_at", "ended_at", "start_station_id", "end_station_id", "rider_id")

# create a delta table from the dataframe
df.write.format("delta").mode("overwrite").saveAsTable("trip_table")

In [0]:
%sql
-- Step 0: Drop tables if they exist
DROP TABLE IF EXISTS trip;
DROP TABLE IF EXISTS trip_table_new;

-- Step 1: Create a new table with the desired schema
CREATE TABLE trip_table_new (
    trip_id VARCHAR(50),
    rideable_type VARCHAR(20),
    started_at TIMESTAMP,
    ended_at TIMESTAMP,
    start_station_id VARCHAR(50),
    end_station_id VARCHAR(50),
    rider_id INT
);

-- Step 2: Insert data from the old table into the new table
INSERT INTO trip_table_new
SELECT 
    trip_id,
    rideable_type,
    CAST(started_at AS TIMESTAMP) AS started_at,
    CAST(ended_at AS TIMESTAMP) AS ended_at,
    CAST(start_station_id AS varchar(50)) AS start_station_id,
    CAST(end_station_id AS varchar(50)) AS end_station_id,
    CAST(rider_id AS INT) AS rider_id
FROM trip_table;

-- Step 3: Drop the old table
DROP TABLE trip_table;

-- Step 4: Rename the new table to the old table name
ALTER TABLE trip_table_new RENAME TO trip;

In [0]:
%sql
use catalog `research_databricks_workspace`; select * from `default`.`trip` limit 5;

trip_id,rideable_type,started_at,ended_at,start_station_id,end_station_id,rider_id
6EFDB32C10BB3796,docked_bike,2021-07-09T15:20:42Z,2021-07-09T15:23:16Z,13146,13146,2603
DD8ADD31D6960CD3,classic_bike,2021-07-24T12:05:48Z,2021-07-24T12:27:51Z,13146,13146,26447
591169F08465CA9F,docked_bike,2021-07-17T04:42:40Z,2021-07-17T04:57:16Z,13146,13146,10516
24C424E55DB4B85D,classic_bike,2021-07-09T20:09:14Z,2021-07-09T20:11:03Z,13146,13146,41018
7133D3F95D071A5A,classic_bike,2021-07-01T16:45:43Z,2021-07-01T16:57:38Z,13296,13146,21093


# Create Dimensions and Fact Tables

## Rider Dimension Table

In [0]:
%sql
-- Create Rider Dimension Table without primary key constraint
drop table if exists dim_rider;
create table dim_rider (
 rider_sk bigint generated always as identity, -- Surrogate key
 rider_id int, -- Natural/business key
 first_name varchar(50),
 last_name varchar(50),
 address varchar(255),
 birthday date,
 account_start_date date,
 account_end_date date,
 rider_age_account_start int, -- New field
 is_member boolean,
 effective_start_date date, -- SCD Type 2 fields
 effective_end_date date,
 is_current boolean
) using delta;

-- Populate Rider Dimension Table from existing source table
insert into dim_rider (
    rider_id, 
    first_name, 
    last_name, 
    address, 
    birthday, 
    account_start_date, 
    account_end_date, 
    rider_age_account_start,
    is_member, 
    effective_start_date, 
    effective_end_date, 
    is_current
)
select 
    rider_id, 
    first as first_name, 
    last as last_name, 
    address, 
    birthday, 
    account_start_date, 
    account_end_date,
    datediff(year, birthday, account_start_date) as rider_age_account_start,
    is_member,
    account_start_date, 
    coalesce(account_end_date, '9999-12-31'),
    case when account_end_date is null then true else false end
from rider;

num_affected_rows,num_inserted_rows
75000,75000


In [0]:
%sql
use catalog `research_databricks_workspace`; select * from `default`.`dim_rider` limit 5;

rider_sk,rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,rider_age_account_start,is_member,effective_start_date,effective_end_date,is_current
1,1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,30,True,2019-04-23,9999-12-31,True
2,1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,43,True,2019-11-01,2020-09-01,False
3,1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,23,True,2022-02-04,9999-12-31,True
4,1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,20,False,2019-08-26,9999-12-31,True
5,1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,50,True,2019-09-14,9999-12-31,True


## Station Dimension Table

In [0]:
%sql
-- Create Station Dimension Table
drop table if exists dim_station;
create table dim_station (
    station_sk bigint generated always as identity,  -- Surrogate key
    station_id varchar(50),                          -- Natural/business key
    station_name varchar(100),
    latitude float,
    longitude float,
    effective_start_date date,                       -- SCD Type 2 fields
    effective_end_date date,
    is_current boolean
) using delta;

-- Populate Station Dimension Table from existing source table
insert into dim_station (station_id, station_name, latitude, longitude, effective_start_date, effective_end_date, is_current)
select station_id, name as station_name, latitude, longitude, current_date(), '9999-12-31', true
from station;

num_affected_rows,num_inserted_rows
838,838


In [0]:
%sql
use catalog `research_databricks_workspace`; select * from `default`.`dim_station` limit 5;

station_sk,station_id,station_name,latitude,longitude,effective_start_date,effective_end_date,is_current
1,525,Glenwood Ave & Touhy Ave,42.0127,-87.66606,2024-11-24,9999-12-31,True
2,KA1503000012,Clark St & Lake St,41.885796,-87.6311,2024-11-24,9999-12-31,True
3,637,Wood St & Chicago Ave,41.895634,-87.672066,2024-11-24,9999-12-31,True
4,13216,State St & 33rd St,41.834732,-87.625824,2024-11-24,9999-12-31,True
5,18003,Fairbanks St & Superior St,41.89581,-87.620255,2024-11-24,9999-12-31,True


## Dim Date

In [0]:
%sql
-- Create DateTime Dimension Table for trips
CREATE OR REPLACE TABLE dim_date (
    date_sk BIGINT GENERATED ALWAYS AS IDENTITY, 
    datetime_id TIMESTAMP,
    date_id DATE,
    time_id STRING,                     -- Format: HH:mm
    
    -- Time Components
    hour_24 INT,                        -- 0-23
    time_of_day_name STRING,            -- Early Morning, Morning, Afternoon, Evening, Night
    is_peak_time STRING,                -- Y/N (customizable peak hours)
    
    -- Calendar Components
    calendar_year INT,
    calendar_quarter INT,
    calendar_quarter_name STRING,       -- Q1, Q2, Q3, Q4
    
    -- Month
    calendar_month INT,                 -- 1 to 12
    calendar_month_name STRING,         -- January, February, etc.
    
    -- Day
    calendar_day INT,                   -- 1 to 31
    calendar_day_name STRING,           -- Sunday, Monday, etc.

    -- Flags
    week_day_flag STRING,               -- Y/N
    weekend_flag STRING,                -- Y/N
    holiday_flag STRING,                -- Y/N
    holiday_name STRING,
    
    -- Business Period
    fiscal_year INT,
    fiscal_quarter INT,
    fiscal_quarter_name STRING,
    fiscal_month INT,
    
    -- Additional Flags
    end_of_quarter_flag STRING,         -- Y/N
    end_of_year_flag STRING             -- Y/N
) USING DELTA;

-- Generate datetime sequence and populate the dimension table
WITH datetime_range AS (
    SELECT DISTINCT full_datetime
    FROM (
        SELECT DISTINCT started_at AS full_datetime FROM trip
        UNION ALL
        SELECT DISTINCT ended_at AS full_datetime FROM trip
        UNION ALL
        SELECT DISTINCT CAST(date AS TIMESTAMP) AS full_datetime FROM payment
    )
)
INSERT INTO dim_date (
    datetime_id,
    date_id,
    time_id,
    hour_24,
    time_of_day_name,
    is_peak_time,
    calendar_year,
    calendar_quarter,
    calendar_quarter_name,
    calendar_month,
    calendar_month_name,
    calendar_day,
    calendar_day_name,
    week_day_flag,
    weekend_flag,
    holiday_flag,
    holiday_name,
    fiscal_year,
    fiscal_quarter,
    fiscal_quarter_name,
    fiscal_month,
    end_of_quarter_flag,
    end_of_year_flag
)
SELECT
    full_datetime AS datetime_id,
    TO_DATE(full_datetime) AS date_id,
    DATE_FORMAT(full_datetime, 'HH:mm') AS time_id,
    HOUR(full_datetime) AS hour_24,
    CASE 
        WHEN HOUR(full_datetime) BETWEEN 0 AND 4 THEN 'Late Night'
        WHEN HOUR(full_datetime) BETWEEN 5 AND 8 THEN 'Early Morning'
        WHEN HOUR(full_datetime) BETWEEN 9 AND 11 THEN 'Morning'
        WHEN HOUR(full_datetime) BETWEEN 12 AND 16 THEN 'Afternoon'
        WHEN HOUR(full_datetime) BETWEEN 17 AND 20 THEN 'Evening'
        ELSE 'Night'
    END AS time_of_day_name,
    CASE 
        WHEN HOUR(full_datetime) BETWEEN 7 AND 9 
          OR HOUR(full_datetime) BETWEEN 16 AND 18 THEN 'Y'
        ELSE 'N'
    END AS is_peak_time,
    YEAR(full_datetime) AS calendar_year,
    QUARTER(full_datetime) AS calendar_quarter,
    CONCAT('Q', QUARTER(full_datetime)) AS calendar_quarter_name,
    MONTH(full_datetime) AS calendar_month,
    DATE_FORMAT(full_datetime, 'MMMM') AS calendar_month_name,
    DAYOFMONTH(full_datetime) AS calendar_day,
    DATE_FORMAT(full_datetime, 'EEEE') AS calendar_day_name,
    CASE 
        WHEN DAYOFWEEK(full_datetime) BETWEEN 2 AND 6 THEN 'Y'
        ELSE 'N'
    END AS week_day_flag,
    CASE 
        WHEN DAYOFWEEK(full_datetime) IN (1, 7) THEN 'Y'
        ELSE 'N'
    END AS weekend_flag,
    CASE 
        WHEN DATE_FORMAT(full_datetime, 'MMdd') IN ('0101', '0704', '1225', '1231') THEN 'Y'
        ELSE 'N'
    END AS holiday_flag,
    CASE 
        WHEN DATE_FORMAT(full_datetime, 'MMdd') = '0101' THEN 'New Year''s Day'
        WHEN DATE_FORMAT(full_datetime, 'MMdd') = '0704' THEN 'Independence Day'
        WHEN DATE_FORMAT(full_datetime, 'MMdd') = '1225' THEN 'Christmas Day'
        WHEN DATE_FORMAT(full_datetime, 'MMdd') = '1231' THEN 'New Year''s Eve'
        ELSE NULL
    END AS holiday_name,
    CASE 
        WHEN MONTH(full_datetime) >= 4 THEN YEAR(full_datetime)
        ELSE YEAR(full_datetime) - 1
    END AS fiscal_year,
    CASE 
        WHEN MONTH(full_datetime) BETWEEN 4 AND 6 THEN 1
        WHEN MONTH(full_datetime) BETWEEN 7 AND 9 THEN 2
        WHEN MONTH(full_datetime) BETWEEN 10 AND 12 THEN 3
        ELSE 4
    END AS fiscal_quarter,
    CASE 
        WHEN MONTH(full_datetime) BETWEEN 4 AND 6 THEN 'Q1'
        WHEN MONTH(full_datetime) BETWEEN 7 AND 9 THEN 'Q2'
        WHEN MONTH(full_datetime) BETWEEN 10 AND 12 THEN 'Q3'
        ELSE 'Q4'
    END AS fiscal_quarter_name,
    CASE 
        WHEN MONTH(full_datetime) >= 4 THEN MONTH(full_datetime) - 3
        ELSE MONTH(full_datetime) + 9
    END AS fiscal_month,
    CASE 
        WHEN MONTH(full_datetime) IN (3, 6, 9, 12) AND 
             DAYOFMONTH(full_datetime) = DAY(LAST_DAY(full_datetime)) THEN 'Y'
        ELSE 'N'
    END AS end_of_quarter_flag,
    CASE 
        WHEN MONTH(full_datetime) = 12 AND 
             DAYOFMONTH(full_datetime) = 31 THEN 'Y'
        ELSE 'N'
    END AS end_of_year_flag
FROM datetime_range;


num_affected_rows,num_inserted_rows
6880277,6880277


In [0]:
%sql
-- Create partitioned and optimized dim_date table
CREATE OR REPLACE TABLE dim_date
USING DELTA
PARTITIONED BY (date_id)
AS
SELECT 
    datetime_id,
    date_id,
    time_id,
    hour_24,
    time_of_day_name,
    is_peak_time,
    calendar_year,
    calendar_quarter,
    calendar_quarter_name,
    calendar_month,
    calendar_month_name,
    calendar_day,
    calendar_day_name,
    week_day_flag,
    weekend_flag,
    holiday_flag,
    holiday_name,
    fiscal_year,
    fiscal_quarter,
    fiscal_quarter_name,
    fiscal_month,
    end_of_quarter_flag,
    end_of_year_flag
FROM dim_date;

-- Z-ORDER the table by datetime_id
OPTIMIZE dim_date
ZORDER BY (datetime_id);



path,metrics
abfss://unity-catalog-storage@dbstorageu5dkpujmn7ulo.dfs.core.windows.net/4006262863301269/__unitystorage/catalogs/ffab9ea6-89fc-42e2-aa27-831965571766/tables/d141b830-57da-4921-ab3f-4cb710b2900c,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 462, List(minCubeSize(107374182400), List(0, 0), List(462, 70749316), 0, List(0, 0), 0, null), null, 0, 0, 462, 462, false, 0, 0, 1732441138631, 1732441140510, 8, 0, null, List(0, 0), 23, 23, 0, 0, null)"


## Payment Fact Table

In [0]:
%sql
-- Create Payment Fact Table
drop table if exists fact_payment;
create table fact_payment (
    payment_sk bigint generated always as identity,  -- Surrogate key
    payment_id int,                                 -- Natural/business key
    payment_date date,
    rider_id bigint,
    payment_amount decimal(10, 2),                  -- Facts/measures
    created_timestamp timestamp,
    created_by varchar(50)
) using delta;

-- Populate Payment Fact Table with optimized joins
INSERT INTO fact_payment (payment_id, payment_date, rider_id, payment_amount, created_timestamp, created_by)
SELECT 
    p.payment_id,
    p.`date` AS payment_date,
    r.rider_id,
    p.amount AS payment_amount,
    current_timestamp() AS created_timestamp,
    'SYSTEM' AS created_by
FROM payment p
INNER JOIN dim_rider r ON p.rider_id = r.rider_id


num_affected_rows,num_inserted_rows
1946607,1946607


In [0]:
%sql
use catalog `research_databricks_workspace`; select * from `default`.`fact_payment` limit 5;

payment_sk,payment_id,payment_date,rider_id,payment_amount,created_timestamp,created_by
1,303070,2013-02-01,12687,12.9,2024-11-24T09:45:44.289Z,SYSTEM
2,1019993,2013-03-01,40408,9.0,2024-11-24T09:45:44.289Z,SYSTEM
3,1023993,2013-03-01,40565,9.0,2024-11-24T09:45:44.289Z,SYSTEM
4,1077572,2013-03-01,42593,9.0,2024-11-24T09:45:44.289Z,SYSTEM
5,1083874,2013-03-01,42824,9.0,2024-11-24T09:45:44.289Z,SYSTEM


In [0]:
%sql
select count(*) as records_count, count(distinct payment_id) as distinct_payments 
from fact_payment;

records_count,distinct_payments
1946607,1946607


## Trip Fact Using PySpark

In [0]:
from pyspark.sql.functions import col, unix_timestamp, round

# First join trips with rider, and then join the result with stations
fact_trips = trips.alias("t") \
    .join(rider.alias("r"), col("t.rider_id") == col("r.rider_id"), "left") \
    .join(stations.alias("s_start"), col("t.start_station_id") == col("s_start.station_id"), "left") \
    .join(stations.alias("s_end"), col("t.end_station_id") == col("s_end.station_id"), "left") \
    .withColumn('duration_in_minutes', round((unix_timestamp(col("t.ended_at")) - unix_timestamp(col('t.started_at'))) / 60)) \
    .withColumn('rider_age', round((unix_timestamp(col("r.account_start_date")) - unix_timestamp(col('r.birthday'))) / 3600 / 24 / 365.25)) \
    .withColumn('start_time_id', col('t.started_at')) \
    .withColumn('end_time_id', col('t.ended_at')) \
    .select("t.trip_id", "t.rider_id", "rideable_type", 
            "start_station_id", col("s_start.name").alias("start_station_name"), 
            "end_station_id", col("s_end.name").alias("end_station_name"),
            "start_time_id", "end_time_id", "duration_in_minutes", "rider_age")

display(fact_trips.limit(10))


trip_id,rider_id,rideable_type,start_station_id,start_station_name,end_station_id,end_station_name,start_time_id,end_time_id,duration_in_minutes,rider_age
6EFDB32C10BB3796,2603,docked_bike,13146,Clark St & Armitage Ave,13146,Clark St & Armitage Ave,2021-07-09T15:20:42Z,2021-07-09T15:23:16Z,3.0,39.0
DD8ADD31D6960CD3,26447,classic_bike,13146,Clark St & Armitage Ave,13146,Clark St & Armitage Ave,2021-07-24T12:05:48Z,2021-07-24T12:27:51Z,22.0,36.0
591169F08465CA9F,10516,docked_bike,13146,Clark St & Armitage Ave,13146,Clark St & Armitage Ave,2021-07-17T04:42:40Z,2021-07-17T04:57:16Z,15.0,37.0
24C424E55DB4B85D,41018,classic_bike,13146,Clark St & Armitage Ave,13146,Clark St & Armitage Ave,2021-07-09T20:09:14Z,2021-07-09T20:11:03Z,2.0,16.0
7133D3F95D071A5A,21093,classic_bike,13296,Ashland Ave & Wrightwood Ave,13146,Clark St & Armitage Ave,2021-07-01T16:45:43Z,2021-07-01T16:57:38Z,12.0,31.0
45A945E964718AA0,49400,classic_bike,13296,Ashland Ave & Wrightwood Ave,13146,Clark St & Armitage Ave,2021-07-15T18:08:53Z,2021-07-15T18:23:12Z,14.0,23.0
B5C228AFC20BD3F0,37485,classic_bike,13196,Carpenter St & Huron St,13146,Clark St & Armitage Ave,2021-07-22T18:12:12Z,2021-07-22T18:31:03Z,19.0,53.0
FB03057FE1489422,59834,classic_bike,13146,Clark St & Armitage Ave,13146,Clark St & Armitage Ave,2021-07-17T11:54:50Z,2021-07-17T11:56:37Z,2.0,33.0
BDE673E69DB2CCDF,70883,docked_bike,605,University Library (NU),605,University Library (NU),2021-07-20T18:40:16Z,2021-07-20T19:35:39Z,55.0,26.0
DDD461A1EBFDA30E,72862,classic_bike,13146,Clark St & Armitage Ave,13146,Clark St & Armitage Ave,2021-07-10T14:23:05Z,2021-07-10T14:24:57Z,2.0,31.0


In [0]:
# Write data to fact_trip
spark.sql("drop table if exists default.fact_trip")
fact_trips.write.format("delta").mode("overwrite").saveAsTable("default.fact_trip")

In [0]:
%sql
select * --count(*) as total_records, count(distinct trip_id) as unique_trips
from fact_trip;

trip_id,rider_id,rideable_type,start_station_id,start_station_name,end_station_id,end_station_name,start_time_id,end_time_id,duration_in_minutes,rider_age
F6F309843C09CAAC,70456,classic_bike,TA1306000026,Racine Ave & Fullerton Ave,TA1307000041,Lake Shore Dr & Wellington Ave,2021-09-02T18:36:00Z,2021-09-02T18:48:43Z,13.0,35.0
BD496FA19316E89C,24732,classic_bike,TA1306000026,Racine Ave & Fullerton Ave,TA1309000019,Lakeview Ave & Fullerton Pkwy,2021-09-02T17:23:28Z,2021-09-02T17:32:43Z,9.0,23.0
657E606A35206CC1,11875,classic_bike,15642,California Ave & Fletcher St,TA1305000041,Damen Ave & Pierce Ave,2021-09-05T23:13:43Z,2021-09-05T23:30:26Z,17.0,28.0
B10DEEC4A5D90EB3,33064,electric_bike,TA1307000001,Greenview Ave & Fullerton Ave,TA1308000049,Wells St & Evergreen Ave,2021-09-09T18:26:00Z,2021-09-09T18:40:17Z,14.0,24.0
268A00298A05078B,71976,classic_bike,TA1306000026,Racine Ave & Fullerton Ave,TA1308000049,Wells St & Evergreen Ave,2021-09-06T18:49:27Z,2021-09-06T19:02:22Z,13.0,33.0
41E9E476004232FD,43348,electric_bike,13235,Southport Ave & Waveland Ave,TA1305000041,Damen Ave & Pierce Ave,2021-09-04T21:41:58Z,2021-09-04T22:03:02Z,21.0,56.0
3F89D8A2BEE07478,1159,classic_bike,13235,Southport Ave & Waveland Ave,TA1307000041,Lake Shore Dr & Wellington Ave,2021-09-11T16:10:38Z,2021-09-11T16:36:36Z,26.0,29.0
0BE315967E011514,75374,electric_bike,13235,Southport Ave & Waveland Ave,TA1309000019,Lakeview Ave & Fullerton Pkwy,2021-09-22T21:53:46Z,2021-09-22T22:11:58Z,18.0,34.0
AE58DD50675A5E4E,3290,classic_bike,TA1306000026,Racine Ave & Fullerton Ave,TA1308000049,Wells St & Evergreen Ave,2021-09-05T00:41:37Z,2021-09-05T00:54:15Z,13.0,22.0
B435E6AE404F547E,31610,classic_bike,TA1306000026,Racine Ave & Fullerton Ave,TA1307000041,Lake Shore Dr & Wellington Ave,2021-09-11T17:23:49Z,2021-09-11T17:37:19Z,14.0,33.0


# Data Analysis

### How much time is spent per ride?

In [0]:
%sql
-- On average ride lasts 22 munites
select round(avg(t.duration_in_minutes),1) as average_ride_duration
from fact_trip t;


average_ride_duration
21.8


In [0]:
%sql

-- On average ride by different starting station.  There is a big difference in average ride duration between stations with Throop St & 52nd St having the average duration of 9 hours
select t.start_station_name,round(avg(t.duration_in_minutes)/60,1) as average_ride_duration_hours
from fact_trip t
group by t.start_station_name
order by average_ride_duration_hours desc;




start_station_name,average_ride_duration_hours
Throop St & 52nd St,9.0
South Chicago Ave & Elliot Ave,8.9
Wabash Ave & 83rd St,6.8
Racine Ave & 65th St,5.9
Central Ave & Harrison St,5.3
Western Ave & 111th St,5.1
Clyde Ave & 87th St,4.8
Kenton Ave & Madison St,4.7
Latrobe Ave & Chicago Ave,4.7
Eberhart Ave & 131st St,4.4


In [0]:
%sql
-- Riders aged between 19-29 have the longest average ride duration
select case when t.rider_age < 19 then 'under 19' when t.rider_age < 30 then '19-29' when t.rider_age < 40 then '30-39' when t.rider_age < 50 then '40-49' else '50+' end as rider_age_group,
round(avg(t.duration_in_minutes),1) as average_ride_duration_munutes
from fact_trip t
group by rider_age_group
order by rider_age_group desc;

rider_age_group,average_ride_duration_munutes
under 19,21.7
50+,21.4
40-49,21.4
30-39,21.9
19-29,22.0


In [0]:
%sql
-- Members have longer average ride duration than non-members
select r.is_member, 
round(avg(t.duration_in_minutes),1) as average_ride_duration_munutes
from fact_trip t join dim_rider r on t.rider_id = r.rider_id
group by r.is_member 
order by average_ride_duration_munutes desc;

is_member,average_ride_duration_munutes
True,21.9
False,21.3


## How much money is spent?

In [0]:
%sql
-- Overall $19,457,105.25 in revenue was generated
SELECT FORMAT_NUMBER(SUM(payment_amount), 2) AS total_revenue
FROM fact_payment p
JOIN dim_date d
ON p.payment_date = d.datetime_id;


total_revenue
19457105.25


In [0]:
%sql
-- It appears that revenue trend is on the rise overtime.  The dip in 2022 is due to the fact that only two month of the year are included.
SELECT d.calendar_year, FORMAT_NUMBER(SUM(payment_amount), 2) AS total_revenue
FROM fact_payment p join dim_date d on p.payment_date = d.datetime_id
group by d.calendar_year
order by d.calendar_year;

calendar_year,total_revenue
2013,53693.34
2014,227402.95
2015,477233.22
2016,825120.81
2017,1308372.54
2018,2000105.5
2019,2978658.79
2020,4315449.4
2021,6081098.25
2022,1189970.45


In [0]:
%sql
-- in 2021 the highest revenue was generated in Q4
SELECT d.calendar_year, d.calendar_quarter, FORMAT_NUMBER(SUM(payment_amount), 2) AS total_revenue
FROM fact_payment p join dim_date d on p.payment_date = d.datetime_id
where d.calendar_year = 2021
group by d.calendar_year, d.calendar_quarter
order by d.calendar_quarter;

calendar_year,calendar_quarter,total_revenue
2021,1,1341105.6
2021,2,1457677.56
2021,3,1581287.96
2021,4,1701027.13


In [0]:
%sql
-- in 2021 the highest revenue was in December month
SELECT d.calendar_year,  d.calendar_month_name, 
      FORMAT_NUMBER(SUM(payment_amount), 2) AS total_revenue
FROM fact_payment p 
JOIN dim_date d 
ON p.payment_date = d.datetime_id
WHERE d.calendar_year = 2021
GROUP BY d.calendar_year,  d.calendar_month_name  
ORDER BY d.calendar_month_name;

calendar_year,calendar_month_name,total_revenue
2021,April,471751.66
2021,August,527280.76
2021,December,579229.92
2021,February,447430.37
2021,January,434891.06
2021,July,513013.67
2021,June,500206.65
2021,March,458784.17
2021,May,485719.25
2021,November,566140.61


In [0]:
%sql
-- Revenue by age at account start. It appears that the highest amount paid was by a rider who was 27 years of age at the time of account start
SELECT 
    p.rider_id, 
    FLOOR(DATEDIFF(r.account_start_date, r.birthday) / 365) AS rider_age_at_payment, 
    SUM(payment_amount) AS total_payment
FROM 
    research_databricks_workspace.default.fact_payment p 
JOIN 
    research_databricks_workspace.default.dim_rider r 
ON 
    p.rider_id = r.rider_id 
GROUP BY 
    p.rider_id, 
    FLOOR(DATEDIFF(r.account_start_date, r.birthday) / 365)
ORDER BY 
    total_payment DESC
LIMIT 5;

rider_id,rider_age_at_payment,total_payment
65089,27,1658.83
63569,14,1608.3
11368,10,1600.9
19270,30,1594.47
39136,11,1586.01


## Extra Credit

In [0]:
%sql
-- Revenue by rider and trip count by month
WITH trip_count AS (
    SELECT 
        t.rider_id, 
        ds.calendar_month_name AS month_name, 
        ds.calendar_year,
        COUNT(t.trip_id) AS trip_count
    FROM research_databricks_workspace.default.fact_trip t
    JOIN research_databricks_workspace.default.dim_date ds 
        ON t.start_time_id = ds.datetime_id
    GROUP BY t.rider_id, ds.calendar_month_name, ds.calendar_year
)
SELECT 
    t.rider_id, 
    t.trip_count as trip_count_per_month, 
    SUM(p.payment_amount) AS total_payment
FROM research_databricks_workspace.default.fact_payment p 
JOIN trip_count t 
    ON p.rider_id = t.rider_id
GROUP BY t.rider_id, t.trip_count
LIMIT 5;

rider_id,trip_count_per_month,total_payment
34484,13,567.0
14308,11,607.65
4694,4,162.0
33135,22,432.0
62350,17,252.0


In [0]:
%sql
-- Based on how many minutes the rider spends on a bike per month
select t.rider_id, dt.calendar_month, dt.calendar_year, sum(t.duration_in_minutes) as total_minutes_ride
from research_databricks_workspace.default.fact_trip t 
join research_databricks_workspace.default.dim_date dt 
on t.start_time_id = dt.datetime_id
group by 
t.rider_id, dt.calendar_month, dt.calendar_year
limit 5
;

rider_id,calendar_month,calendar_year,total_minutes_ride
58321,7,2021,253.0
49393,7,2021,222.0
29656,7,2021,736.0
18346,7,2021,1671.0
61581,7,2021,9931.0
