
# Extract Raw Data

In the first step we extract the raw csv data from DBFS and also provide a schema, so that the datatypes are represented correctly.

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType, BooleanType, TimestampType

schema = StructType([
    StructField("rider_id", IntegerType(), False),
    StructField("first", StringType(), True),
    StructField("last", StringType(), True),
    StructField("address", StringType(), True),
    StructField("birthday", DateType(), True),
    StructField("account_start_date", DateType(), True),
    StructField("account_end_date", DateType(), True),
    StructField("is_member", BooleanType(), True)
])
riders_csv = spark.read.csv("/FileStore/raw_data/riders.csv", schema=schema)
display(riders_csv.limit(10))


rider_id,first,last,address,birthday,account_start_date,account_end_date,is_member
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False
1006,Alicia,Taylor,1137 Angela Locks,2004-01-30,2020-11-27,2021-12-01,True
1007,Benjamin,Fernandez,979 Phillips Ways,1988-01-11,2016-12-11,,False
1008,John,Crawford,7691 Evans Court,1987-02-21,2021-03-28,2021-07-01,True
1009,Victoria,Ritter,9922 Jim Crest Apt. 319,1981-02-07,2020-06-12,2021-11-01,True


In [0]:
schema = StructType([
    StructField("payment_id", IntegerType(), False),
    StructField("date", DateType(), True),
    StructField("amount", DoubleType(), True),
    StructField("rider_id", IntegerType(), True)
])
payments_csv = spark.read.csv("/FileStore/raw_data/payments.csv", schema=schema)
display(payments_csv.limit(10))

payment_id,date,amount,rider_id
1,2019-05-01,9.0,1000
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000
6,2019-10-01,9.0,1000
7,2019-11-01,9.0,1000
8,2019-12-01,9.0,1000
9,2020-01-01,9.0,1000
10,2020-02-01,9.0,1000


In [0]:
schema = StructType([
    StructField("station_id", StringType(), False),
    StructField("name", StringType(), False),
    StructField("latitude", DoubleType(), False),
    StructField("longitude", DoubleType(), False)
])
stations_csv = spark.read.csv("/FileStore/raw_data/stations.csv", schema=schema)
display(stations_csv.limit(10))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001


In [0]:
schema = StructType([
    StructField("trip_id", StringType(), False),
    StructField("rideable_type", StringType(), True),
    StructField("start_at", TimestampType(), True),
    StructField("ended_at", TimestampType(), True),
    StructField("start_station_id", StringType(), True),
    StructField("end_station_id", StringType(), True),
    StructField("rider_id", IntegerType(), True)
])
trips_csv = spark.read.csv("/FileStore/raw_data/trips.csv", schema=schema)
display(trips_csv.limit(10))

trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
89E7AA6C29227EFF,classic_bike,2021-02-12T16:14:56Z,2021-02-12T16:21:43Z,525,660,71934
0FEFDE2603568365,classic_bike,2021-02-14T17:52:38Z,2021-02-14T18:12:09Z,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09T19:10:18Z,2021-02-09T19:19:10Z,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02T17:49:41Z,2021-02-02T17:54:06Z,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23T15:07:23Z,2021-02-23T15:22:37Z,13216,TA1309000055,39608
BDAA7E3494E8D545,electric_bike,2021-02-24T15:43:33Z,2021-02-24T15:49:05Z,18003,KP1705001026,36267
A772742351171257,classic_bike,2021-02-01T17:47:42Z,2021-02-01T17:48:33Z,KP1705001026,KP1705001026,50104
295476889D9B79F8,classic_bike,2021-02-11T18:33:53Z,2021-02-11T18:35:09Z,18003,18003,19618
362087194BA4CC9A,classic_bike,2021-02-27T15:13:39Z,2021-02-27T15:36:36Z,KP1705001026,KP1705001026,16732
21630F715038CCB0,classic_bike,2021-02-20T08:59:42Z,2021-02-20T09:17:04Z,KP1705001026,KP1705001026,57068



#### Creating Bronze Data Stores in Delta Lake

In [0]:
trips_csv.write.mode("overwrite").format("delta").save("/FileStore/delta/bronze_trips")
payments_csv.write.mode("overwrite").format("delta").save("/FileStore/delta/bronze_payments")
stations_csv.write.mode("overwrite").format("delta").save("/FileStore/delta/bronze_stations")
riders_csv.write.mode("overwrite").format("delta").save("/FileStore/delta/bronze_riders")


# Load Bronze Data from Delta Lake

Next the data will be loaded as bronze layer data.

In [0]:
spark.sql("DROP TABLE IF EXISTS Bronze_Trips")
spark.sql("DROP TABLE IF EXISTS Bronze_Payments")
spark.sql("DROP TABLE IF EXISTS Bronze_Stations")
spark.sql("DROP TABLE IF EXISTS Bronze_Riders")

spark.sql("CREATE TABLE Bronze_Trips USING DELTA LOCATION '/FileStore/delta/bronze_trips'")
spark.sql("CREATE TABLE Bronze_Payments USING DELTA LOCATION '/FileStore/delta/bronze_payments'")
spark.sql("CREATE TABLE Bronze_Stations USING DELTA LOCATION '/FileStore/delta/bronze_stations'")
spark.sql("CREATE TABLE Bronze_Riders USING DELTA LOCATION '/FileStore/delta/bronze_riders'")


DataFrame[]


# Transform Bronze Data into Gold Data as Star Schema

Subsequently the bronze data are being transformed into gold layer data by denormalzing it and reorganizing it as a star schema.

 
### Dimension: Station

In [0]:
%%file sql/dim_station.sql

SELECT 
    station_id,
    name,
    latitude,
    longitude
FROM Bronze_Stations 

Overwriting sql/dim_station.sql


In [0]:
with open('sql/dim_station.sql', 'r') as f:
    gold_dim_station = spark.sql(f.read())

display(gold_dim_station.limit(10))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001



### Dimension Rider

In [0]:
%%file sql/dim_rider.sql

SELECT 
    rider_id, 
    address,
    first as first_name,
    last as last_name,
    birthday,
    is_member,
    account_start_date,
    account_end_date
FROM Bronze_Riders 

Overwriting sql/dim_rider.sql


In [0]:
with open('sql/dim_rider.sql', 'r') as f:
    gold_dim_rider = spark.sql(f.read())

display(gold_dim_rider.limit(10))

rider_id,address,first_name,last_name,birthday,is_member,account_start_date,account_end_date
57257,9928 Hunter Ranch,Mark,Mcfarland,1982-02-01,False,2020-12-05,
57258,20036 Barrett Summit Apt. 714,Mark,Davis,1963-07-28,True,2017-07-12,
57259,089 Sarah Square,Bryan,Manning,1984-11-05,True,2018-08-10,
57260,3157 Nicole Ferry Apt. 826,Michele,Rowe,1997-09-21,True,2016-06-03,
57261,312 Jessica Wells,John,Mckenzie,2002-10-13,True,2016-02-01,2018-07-01
57262,910 Lopez Pass Apt. 426,Tami,Rivera,2001-06-29,True,2020-11-12,
57263,411 Mccoy Haven,Joseph,Hodge,1993-04-30,False,2020-05-29,
57264,667 Rodriguez Ramp,Lauren,Brown,2002-10-20,False,2020-05-10,
57265,90789 Fowler Circle,Stephanie,Reed,1993-03-20,True,2018-09-05,2019-09-01
57266,55635 Valerie Falls,Brittney,Lamb,1993-12-09,True,2020-10-10,



### Dimension Time

In [0]:
%%file sql/dim_time.sql

SELECT DISTINCT
    date_format(a_timestamp, 'HHmmss') as time_string,
    CAST(a_timestamp AS timestamp) as time,
    hour(a_timestamp) as hour,
    minute(a_timestamp) as minute,
    second(a_timestamp) as second
FROM (
    SELECT start_at as a_timestamp FROM Bronze_Trips
    UNION ALL
    SELECT ended_at as a_timestamp FROM Bronze_Trips
) tmp

Overwriting sql/dim_time.sql


In [0]:
with open('sql/dim_time.sql', 'r') as f:
    gold_dim_time = spark.sql(f.read())

display(gold_dim_time.limit(10))

time_string,time,hour,minute,second
151519,2021-08-22T15:15:19Z,15,15,19
222851,2021-08-26T22:28:51Z,22,28,51
173420,2021-08-23T17:34:20Z,17,34,20
131207,2021-08-28T13:12:07Z,13,12,7
154510,2021-08-06T15:45:10Z,15,45,10
126,2021-08-22T00:01:26Z,0,1,26
160853,2021-08-23T16:08:53Z,16,8,53
221327,2021-08-09T22:13:27Z,22,13,27
205051,2021-08-18T20:50:51Z,20,50,51
184756,2021-08-02T18:47:56Z,18,47,56


### Dimension Date

As Spark SQL does not support any native function to get the last day of a quarter we need to write our own *User Defined Function* (**UDF**) for that.

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DateType
import datetime

def last_day_of_quarter(dt):
    if dt is None:
        return None
    q_month = ((dt.month - 1) // 3 + 1) * 3
    if q_month == 12:
        return datetime.date(dt.year, 12, 31)
    next_month_first = datetime.date(dt.year, q_month + 1, 1)
    return next_month_first - datetime.timedelta(days=1)

spark.udf.register("last_day_of_quarter_udf", last_day_of_quarter, DateType())

<function __main__.last_day_of_quarter(dt)>

In [0]:
%%file sql/dim_date.sql

SELECT DISTINCT
    date_format(a_date, 'yyyyMMdd') AS date_string,
    CAST(a_date AS date) AS date,
    year(a_date) AS year,
    quarter(a_date) AS quarter,
    month(a_date) AS month,
    weekofyear(a_date) AS week,
    day(a_date) AS day,
    dayofweek(a_date) AS weekday,
    date_format(a_date, 'EEEE') AS weekday_name,
    date_format(a_date, 'MMMM') AS month_name,
    trunc(a_date, 'YEAR') AS first_of_year,
    date_add(trunc(a_date, 'YEAR'), CASE 
        WHEN ((year(a_date) % 4 = 0 AND year(a_date) % 100 <> 0) OR year(a_date) % 400 = 0) THEN 366
        ELSE 365
    END - 1) AS last_of_year,
    trunc(a_date, 'QUARTER') AS first_of_quarter,
    last_day_of_quarter_udf(a_date) AS last_of_quarter,
    trunc(a_date, 'MONTH') AS first_of_month,
    last_day(a_date) AS last_of_month,
    ((year(a_date) % 4 = 0 AND year(a_date) % 100 <> 0) OR year(a_date) % 400 = 0) AS is_leap_year,
    (dayofweek(a_date) IN (1,7)) AS is_weekend
FROM (
    SELECT start_at AS a_date FROM Bronze_Trips
    UNION ALL
    SELECT ended_at AS a_date FROM Bronze_Trips
    UNION ALL
    SELECT date AS a_date FROM Bronze_Payments
) tmp

Overwriting sql/dim_date.sql


In [0]:
with open('sql/dim_date.sql', 'r') as f:
    gold_dim_date = spark.sql(f.read())

display(gold_dim_date.limit(10))

date_string,date,year,quarter,month,week,day,weekday,weekday_name,month_name,first_of_year,last_of_year,first_of_quarter,last_of_quarter,first_of_month,last_of_month,is_leap_year,is_weekend
20210805,2021-08-05,2021,3,8,31,5,5,Thursday,August,2021-01-01,2021-12-31,2021-07-01,2021-09-30,2021-08-01,2021-08-31,False,False
20210817,2021-08-17,2021,3,8,33,17,3,Tuesday,August,2021-01-01,2021-12-31,2021-07-01,2021-09-30,2021-08-01,2021-08-31,False,False
20210802,2021-08-02,2021,3,8,31,2,2,Monday,August,2021-01-01,2021-12-31,2021-07-01,2021-09-30,2021-08-01,2021-08-31,False,False
20210816,2021-08-16,2021,3,8,33,16,2,Monday,August,2021-01-01,2021-12-31,2021-07-01,2021-09-30,2021-08-01,2021-08-31,False,False
20210829,2021-08-29,2021,3,8,34,29,1,Sunday,August,2021-01-01,2021-12-31,2021-07-01,2021-09-30,2021-08-01,2021-08-31,False,True
20210825,2021-08-25,2021,3,8,34,25,4,Wednesday,August,2021-01-01,2021-12-31,2021-07-01,2021-09-30,2021-08-01,2021-08-31,False,False
20210822,2021-08-22,2021,3,8,33,22,1,Sunday,August,2021-01-01,2021-12-31,2021-07-01,2021-09-30,2021-08-01,2021-08-31,False,True
20210814,2021-08-14,2021,3,8,32,14,7,Saturday,August,2021-01-01,2021-12-31,2021-07-01,2021-09-30,2021-08-01,2021-08-31,False,True
20210813,2021-08-13,2021,3,8,32,13,6,Friday,August,2021-01-01,2021-12-31,2021-07-01,2021-09-30,2021-08-01,2021-08-31,False,False
20210811,2021-08-11,2021,3,8,32,11,4,Wednesday,August,2021-01-01,2021-12-31,2021-07-01,2021-09-30,2021-08-01,2021-08-31,False,False


### Fact Payment

In [0]:
%%file sql/fact_payment.sql

SELECT 
    payment_id,
    to_char(date, 'yyyyMMdd') as payment_date,
    rider_id,
    amount
FROM Bronze_Payments

Overwriting sql/fact_payment.sql


In [0]:
with open('sql/fact_payment.sql', 'r') as f:
    gold_fact_payment = spark.sql(f.read())

display(gold_fact_payment.limit(10))

payment_id,payment_date,rider_id,amount
1,20190501,1000,9.0
2,20190601,1000,9.0
3,20190701,1000,9.0
4,20190801,1000,9.0
5,20190901,1000,9.0
6,20191001,1000,9.0
7,20191101,1000,9.0
8,20191201,1000,9.0
9,20200101,1000,9.0
10,20200201,1000,9.0


### Fact Trip

In [0]:
%%file sql/fact_trip.sql

SELECT
    bt.trip_id,
    br.rider_id,
    bt.start_station_id,
    bt.end_station_id,
    date_format(bt.start_at, 'yyyyMMdd') AS start_date,
    date_format(bt.ended_at, 'yyyyMMdd') AS end_date,
    date_format(bt.start_at, 'HHmmss') AS start_time,
    date_format(bt.ended_at, 'HHmmss') AS end_time,
    bt.rideable_type,
    (unix_timestamp(bt.ended_at) - unix_timestamp(bt.start_at)) AS trip_duration_seconds,
    floor(months_between(bt.ended_at, br.birthday) / 12) AS rider_age
FROM Bronze_Trips bt
JOIN Bronze_Riders br
    ON bt.rider_id = br.rider_id


Overwriting sql/fact_trip.sql


In [0]:
with open('sql/fact_trip.sql', 'r') as f:
    gold_fact_trip = spark.sql(f.read())

display(gold_fact_trip.limit(10))

trip_id,rider_id,start_station_id,end_station_id,start_date,end_date,start_time,end_time,rideable_type,trip_duration_seconds,rider_age
0A7C219369D5AC4E,2114,KA1504000135,13059,20210825,20210825,164701,170312,classic_bike,971,24
867139A6860A767C,12944,657,13059,20210811,20210811,171557,173717,classic_bike,1280,30
FBFD35388445808B,43921,KA1504000135,TA1309000033,20210829,20210829,142027,143608,classic_bike,941,31
CDBA93066763B385,66653,13258,15653,20210822,20210822,151519,151821,classic_bike,182,23
34DED391233DBA3F,57338,TA1305000022,13271,20210805,20210805,123410,130110,electric_bike,1620,33
FB6800A9EC72C76D,26184,TA1309000029,13059,20210816,20210816,180227,181658,classic_bike,871,30
0CD8B45F2E01F618,18172,13071,TA1308000022,20210813,20210813,233213,234046,electric_bike,513,19
25F1F10082E41B5D,22308,13241,15643,20210813,20210813,224305,230514,classic_bike,1329,40
F91AD8B87273DA38,38213,TA1307000128,13059,20210814,20210814,131748,134434,classic_bike,1606,48
970B5A8879A1481F,24490,TA1307000163,13059,20210802,20210802,222102,222502,classic_bike,240,25


## Save the Star Schema a Delta Lake Tables

In [0]:
gold_dim_station.write.mode("overwrite").format("delta").saveAsTable("Gold_Dim_Station")
gold_dim_rider.write.mode("overwrite").format("delta").saveAsTable("Gold_Dim_Rider")
gold_dim_time.write.mode("overwrite").format("delta").saveAsTable("Gold_Dim_Time")
gold_dim_date.write.mode("overwrite").format("delta").saveAsTable("Gold_Dim_Date")
gold_fact_payment.write.mode("overwrite").format("delta").saveAsTable("Gold_Fact_Payment")
gold_fact_trip.write.mode("overwrite").format("delta").saveAsTable("Gold_Fact_Trip")