Transforming staging table data to star schema

In [0]:
# Importing all necessary things
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql.window import Window

Creating fact_payment table

In [0]:
payments = spark.table("default.staging_payments")
display(payments.limit(5))

payment_id,date,amount,rider_id
802962,2018-03-01,9.0,32004
802963,2018-04-01,9.0,32004
802964,2018-05-01,9.0,32004
802965,2018-06-01,9.0,32004
802966,2014-08-01,9.0,32005


In [0]:
spark.sql("DROP TABLE IF EXISTS default.fact_payment")
payments.dropDuplicates(["payment_id"]) \
    .write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("default.fact_payment")

Creating dim_rider table

In [0]:
riders = spark.table("default.staging_riders")
display(riders.limit(5))

rider_id,first,last,address,birthday,account_start_date,account_end_date,is_member
57257,Mark,Mcfarland,9928 Hunter Ranch,1982-02-01,2020-12-05,,False
57258,Mark,Davis,20036 Barrett Summit Apt. 714,1963-07-28,2017-07-12,,True
57259,Bryan,Manning,089 Sarah Square,1984-11-05,2018-08-10,,True
57260,Michele,Rowe,3157 Nicole Ferry Apt. 826,1997-09-21,2016-06-03,,True
57261,John,Mckenzie,312 Jessica Wells,2002-10-13,2016-02-01,2018-07-01,True


In [0]:
spark.sql("DROP TABLE IF EXISTS default.dim_rider")
riders.dropDuplicates(["rider_id"]) \
    .write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("default.dim_rider")

Creating dim_station table

In [0]:
stations = spark.table("default.staging_stations")
display(stations.limit(5))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669


In [0]:
spark.sql("DROP TABLE IF EXISTS default.dim_station")
stations.dropDuplicates(["station_id"]) \
    .write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("default.dim_station")

Creating fact_trip table

In [0]:
trips = spark.table("default.staging_trips")
display(trips.limit(5))

trip_id,rideable_type,started_at,ended_at,start_station_id,end_station_id,rider_id
89E7AA6C29227EFF,classic_bike,2021-02-12 16:14:56,2021-02-12 16:21:43,525,660,71934
0FEFDE2603568365,classic_bike,2021-02-14 17:52:38,2021-02-14 18:12:09,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09 19:10:18,2021-02-09 19:19:10,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02 17:49:41,2021-02-02 17:54:06,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23 15:07:23,2021-02-23 15:22:37,13216,TA1309000055,39608


In [0]:
# duration is in minutes
fact_trip = trips.join(riders, trips.rider_id == riders.rider_id, "inner") \
    .withColumnRenamed("started_at", "start_at") \
    .withColumn('duration', round((unix_timestamp("ended_at") - unix_timestamp("start_at")).cast("bigint")/60)) \
    .withColumn('rider_age', round(datediff(to_date("start_at"), to_date("birthday"))/365.25)) \
    .select("trip_id", riders.rider_id, "rideable_type", "start_at", "ended_at", "duration", "start_station_id", "end_station_id", "rider_age")

display(fact_trip.limit(5))

trip_id,rider_id,rideable_type,start_at,ended_at,duration,start_station_id,end_station_id,rider_age
222BB8E5059252D7,34062,classic_bike,2021-06-13 09:48:47,2021-06-13 10:07:23,19.0,KA1503000064,13021,30.0
1826E16CB5486018,5342,classic_bike,2021-06-21 22:59:13,2021-06-21 23:04:29,5.0,TA1306000010,13021,26.0
3D9B6A0A5330B04D,3714,classic_bike,2021-06-18 16:06:42,2021-06-18 16:12:02,5.0,TA1305000030,13021,26.0
07E82F5E9C9E490F,18793,classic_bike,2021-06-17 16:46:23,2021-06-17 17:02:45,16.0,TA1305000034,13021,19.0
A8E94BAECBF0C2DD,43342,docked_bike,2021-06-13 17:36:29,2021-06-13 18:30:39,54.0,TA1308000009,TA1308000009,28.0


In [0]:
spark.sql("DROP TABLE IF EXISTS default.fact_trip")
fact_trip.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("default.fact_trip")

Creating dim_date table

In [0]:
window_spec = Window.orderBy("date")
unique_dates = payments.dropDuplicates(["date"])
dim_date = unique_dates.withColumn("date_id", f.row_number().over(window_spec))
dim_date = dim_date.select(
    "date_id",
    "date",
    f.dayofweek("date").alias("day_of_week"),
    f.dayofmonth("date").alias("day_of_month"),
    f.weekofyear("date").alias("week_of_year"),
    f.quarter("date").alias("quarter"),
    f.month("date").alias("month"),
    f.year("date").alias("year")
)

display(dim_date.limit(5))

date_id,date,day_of_week,day_of_month,week_of_year,quarter,month,year
1,2013-02-01,6,1,5,1,2,2013
2,2013-03-01,6,1,9,1,3,2013
3,2013-04-01,2,1,14,2,4,2013
4,2013-05-01,4,1,18,2,5,2013
5,2013-06-01,7,1,22,2,6,2013


In [0]:
spark.sql("DROP TABLE IF EXISTS default.dim_date")
dim_date.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("default.dim_date")