## Transform data into Bikeshare star schema

In [0]:
# Import necessary packages
from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType
from pyspark.sql.types import StringType
import pyspark.sql.functions as f

### fact_payment

In [0]:
# Load staging_payments table
payments = spark.table("default.staging_payments")
display(payments.limit(10))

payment_id,date,amount,rider_id
1574726,2021-02-01,9.0,61831
1574727,2021-03-01,9.0,61831
1574728,2021-04-01,9.0,61831
1574729,2021-05-01,9.0,61831
1574730,2021-06-01,9.0,61831
1574731,2021-07-01,9.0,61831
1574732,2021-08-01,9.0,61831
1574733,2021-09-01,9.0,61831
1574734,2021-10-01,9.0,61831
1574735,2021-11-01,9.0,61831


In [0]:
# Write payments data to fact_payment
spark.sql("DROP TABLE IF EXISTS default.fact_payment")
payments.dropDuplicates(["payment_id"]).write.format("delta").mode("overwrite").saveAsTable("default.fact_payment")

### dim_rider

In [0]:
# Load staging_riders table
riders = spark.table("default.staging_riders")
display(riders.limit(10))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
57257,Mark,Mcfarland,9928 Hunter Ranch,1982-02-01,2020-12-05,,False
57258,Mark,Davis,20036 Barrett Summit Apt. 714,1963-07-28,2017-07-12,,True
57259,Bryan,Manning,089 Sarah Square,1984-11-05,2018-08-10,,True
57260,Michele,Rowe,3157 Nicole Ferry Apt. 826,1997-09-21,2016-06-03,,True
57261,John,Mckenzie,312 Jessica Wells,2002-10-13,2016-02-01,2018-07-01,True
57262,Tami,Rivera,910 Lopez Pass Apt. 426,2001-06-29,2020-11-12,,True
57263,Joseph,Hodge,411 Mccoy Haven,1993-04-30,2020-05-29,,False
57264,Lauren,Brown,667 Rodriguez Ramp,2002-10-20,2020-05-10,,False
57265,Stephanie,Reed,90789 Fowler Circle,1993-03-20,2018-09-05,2019-09-01,True
57266,Brittney,Lamb,55635 Valerie Falls,1993-12-09,2020-10-10,,True


In [0]:
# Write data to dim_rider
spark.sql("DROP TABLE IF EXISTS default.dim_rider")
riders.dropDuplicates(["rider_id"]).write.format("delta").mode("overwrite").saveAsTable("default.dim_rider")

### dim_station

In [0]:
# Load staging_stations table
stations = spark.table("default.staging_stations")
display(stations.limit(10))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001


In [0]:
# Write data to dim_station
spark.sql("DROP TABLE IF EXISTS default.dim_station")
stations.dropDuplicates(["station_id"]).write.format("delta").mode("overwrite").saveAsTable("default.dim_station")

### fact_trips

In [0]:
# Load staging_trips table
trips = spark.table("default.staging_trips")
display(trips.limit(10))

trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
89E7AA6C29227EFF,classic_bike,2021-02-12 16:14:56,2021-02-12 16:21:43,525,660,71934
0FEFDE2603568365,classic_bike,2021-02-14 17:52:38,2021-02-14 18:12:09,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09 19:10:18,2021-02-09 19:19:10,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02 17:49:41,2021-02-02 17:54:06,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23 15:07:23,2021-02-23 15:22:37,13216,TA1309000055,39608
BDAA7E3494E8D545,electric_bike,2021-02-24 15:43:33,2021-02-24 15:49:05,18003,KP1705001026,36267
A772742351171257,classic_bike,2021-02-01 17:47:42,2021-02-01 17:48:33,KP1705001026,KP1705001026,50104
295476889D9B79F8,classic_bike,2021-02-11 18:33:53,2021-02-11 18:35:09,18003,18003,19618
362087194BA4CC9A,classic_bike,2021-02-27 15:13:39,2021-02-27 15:36:36,KP1705001026,KP1705001026,16732
21630F715038CCB0,classic_bike,2021-02-20 08:59:42,2021-02-20 09:17:04,KP1705001026,KP1705001026,57068


In [0]:
fact_trip = trips.join(riders, trips.rider_id == riders.rider_id,"inner") \
            .withColumn('duration_min', round((unix_timestamp("ended_at") - unix_timestamp('start_at'))/60)) \
            .withColumn('rider_age', round(datediff( to_date("start_at"), to_date("birthday"))/365.25)) \
            .select("trip_id", riders.rider_id, "rideable_type", "start_station_id", "end_station_id", "start_at", "ended_at", "duration_min", "rider_age")
            
display(fact_trip.limit(10))

trip_id,rider_id,rideable_type,start_station_id,end_station_id,start_at,ended_at,duration_min,rider_age
222BB8E5059252D7,34062,classic_bike,KA1503000064,13021,2021-06-13 09:48:47,2021-06-13 10:07:23,19.0,30.0
1826E16CB5486018,5342,classic_bike,TA1306000010,13021,2021-06-21 22:59:13,2021-06-21 23:04:29,5.0,26.0
3D9B6A0A5330B04D,3714,classic_bike,TA1305000030,13021,2021-06-18 16:06:42,2021-06-18 16:12:02,5.0,26.0
07E82F5E9C9E490F,18793,classic_bike,TA1305000034,13021,2021-06-17 16:46:23,2021-06-17 17:02:45,16.0,19.0
A8E94BAECBF0C2DD,43342,docked_bike,TA1308000009,TA1308000009,2021-06-13 17:36:29,2021-06-13 18:30:39,54.0,28.0
378F4AB323AA1D14,6693,docked_bike,TA1308000009,TA1308000009,2021-06-13 13:20:10,2021-06-13 14:06:14,46.0,29.0
38AD311DC2EB1FBE,71480,docked_bike,KA1503000019,KA1503000019,2021-06-16 17:14:30,2021-06-16 17:28:34,14.0,56.0
1D466737F0B18097,50846,docked_bike,TA1308000009,TA1308000009,2021-06-27 14:51:52,2021-06-27 15:26:39,35.0,41.0
27E1142E1ACFAEFB,18951,electric_bike,13257,13257,2021-06-21 13:58:26,2021-06-21 13:58:53,0.0,22.0
67F2A115DAE77924,63987,classic_bike,TA1308000009,TA1308000009,2021-06-22 00:51:43,2021-06-22 01:08:25,17.0,37.0


In [0]:
# Write data to fact_trip
spark.sql("DROP TABLE IF EXISTS default.fact_trip")
fact_trips.write.format("delta").mode("overwrite").saveAsTable("default.fact_trip")

### dim_date

In [0]:
# Get min date from trips
min_date = trips.selectExpr('MIN(start_at) as started_at').first().asDict()['started_at']

# Add 5 years to max date from trips
max_date = trips.selectExpr('DATEADD(year, 5, MAX(start_at)) AS started_at').first().asDict()['started_at']

# Create date_id
expression = f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)"
dim_date = spark.createDataFrame([(1,)], ["date_id"])

# Add other columns
dim_date = dim_date.withColumn("dateinit", f.explode(f.expr(expression)))
dim_date = dim_date.withColumn("date", f.to_timestamp(dim_date.dateinit, "yyyy-MM-dd"))

dim_date = dim_date \
            .withColumn("day_of_month", f.dayofmonth(dim_date.date)) \
            .withColumn("day_of_year", f.dayofyear(dim_date.date)) \
            .withColumn("day_of_week", f.dayofweek(dim_date.date)) \
            .withColumn("month", f.month(dim_date.date)) \
            .withColumn("week_of_year", f.weekofyear(dim_date.date)) \
            .withColumn("quarter", f.quarter(dim_date.date)) \
            .withColumn("year", f.year(dim_date.date)) \
            .withColumn("date_id", dim_date.date.cast(StringType())) \
            .drop(f.col("dateinit"))

display(dim_date.limit(10))

date_id,date,day_of_month,day_of_year,day_of_week,month,week_of_year,quarter,year
2021-02-01 00:00:00,2021-02-01T00:00:00Z,1,32,2,2,5,1,2021
2021-02-02 00:00:00,2021-02-02T00:00:00Z,2,33,3,2,5,1,2021
2021-02-03 00:00:00,2021-02-03T00:00:00Z,3,34,4,2,5,1,2021
2021-02-04 00:00:00,2021-02-04T00:00:00Z,4,35,5,2,5,1,2021
2021-02-05 00:00:00,2021-02-05T00:00:00Z,5,36,6,2,5,1,2021
2021-02-06 00:00:00,2021-02-06T00:00:00Z,6,37,7,2,5,1,2021
2021-02-07 00:00:00,2021-02-07T00:00:00Z,7,38,1,2,5,1,2021
2021-02-08 00:00:00,2021-02-08T00:00:00Z,8,39,2,2,6,1,2021
2021-02-09 00:00:00,2021-02-09T00:00:00Z,9,40,3,2,6,1,2021
2021-02-10 00:00:00,2021-02-10T00:00:00Z,10,41,4,2,6,1,2021


In [0]:
# Write data to dim_date
spark.sql("DROP TABLE IF EXISTS default.dim_date")
dim_date.write.format("delta").mode("overwrite").saveAsTable("default.dim_date")