# Transform the data into the star schema for a Gold data store

In [0]:
import pyspark.sql.functions as f
from pyspark.sql.functions import *

from pyspark.sql.types import *

nrows_display = 10

## 1. Fact table - Payment

### Fetch data from corresponding staging table

In [0]:
payments = spark.table("stagingpayment")
display(payments.limit(nrows_display))

payment_id,date,amount,rider
1064462,2020-06-01,9.0,42106
1064463,2020-07-01,9.0,42106
1064464,2020-08-01,9.0,42106
1064465,2020-09-01,9.0,42106
1064466,2020-10-01,9.0,42106
1064467,2020-11-01,9.0,42106
1064468,2020-12-01,9.0,42106
1064469,2021-01-01,9.0,42106
1064470,2021-02-01,9.0,42106
1064471,2021-03-01,9.0,42106


### Data wrangling and then save as Gold fact/dim table

Since the table schema here is exactly the same with our star schema design, we just need to drop duplicates in surrogate column and then save the Gold fact table.

In [0]:
# Drop if exist
spark.sql("DROP TABLE IF EXISTS default.fact_payment")

# Data wrangling (basically dropping duplicates in surrogate column), and then save as Gold table in `default` directory
payments.dropDuplicates(["payment_id"])     \
    .write.format("delta")                  \
    .mode("overwrite")                      \
    .saveAsTable("default.fact_payment")

# Final check
review_fact_payment = spark.table("fact_payment")
display(review_fact_payment.limit(nrows_display))

payment_id,date,amount,rider
5,2019-09-01,9.0,1000
273,2019-06-01,9.0,1011
401,2020-01-01,9.0,1015
421,2021-09-01,9.0,1015
1152,2020-10-01,9.0,1042
1505,2016-03-01,9.0,1054
1592,2019-04-01,12.82,1056
1861,2020-12-01,9.0,1070
1981,2021-03-01,8.93,1072
2048,2020-07-01,9.0,1078


Next, just do the same for the rest of 4 tables, be sure to add/edit additional columns to match star schema design.

## 2. Dimension table - Rider

In [0]:
riders = spark.table("stagingrider")
display(riders.limit(nrows_display))

rider_id,first,last,address,birthday,account_start_date,account_end_date,is_member
57257,Mark,Mcfarland,9928 Hunter Ranch,1982-02-01,2020-12-05,,False
57258,Mark,Davis,20036 Barrett Summit Apt. 714,1963-07-28,2017-07-12,,True
57259,Bryan,Manning,089 Sarah Square,1984-11-05,2018-08-10,,True
57260,Michele,Rowe,3157 Nicole Ferry Apt. 826,1997-09-21,2016-06-03,,True
57261,John,Mckenzie,312 Jessica Wells,2002-10-13,2016-02-01,2018-07-01,True
57262,Tami,Rivera,910 Lopez Pass Apt. 426,2001-06-29,2020-11-12,,True
57263,Joseph,Hodge,411 Mccoy Haven,1993-04-30,2020-05-29,,False
57264,Lauren,Brown,667 Rodriguez Ramp,2002-10-20,2020-05-10,,False
57265,Stephanie,Reed,90789 Fowler Circle,1993-03-20,2018-09-05,2019-09-01,True
57266,Brittney,Lamb,55635 Valerie Falls,1993-12-09,2020-10-10,,True


In [0]:
# Drop if exist
spark.sql("DROP TABLE IF EXISTS default.dim_rider")

# Data wrangling (basically dropping duplicates in surrogate column), and then save as Gold table in `default` directory
riders.dropDuplicates(["rider_id"])         \
    .write.format("delta")                  \
    .mode("overwrite")                      \
    .saveAsTable("default.dim_rider")

# Final check
review_dim_rider = spark.table("dim_rider")
display(review_dim_rider.limit(nrows_display))

rider_id,first,last,address,birthday,account_start_date,account_end_date,is_member
1231,Norman,Patel,0957 Elizabeth Pike Suite 566,1987-12-26,2015-09-21,,True
1397,Lacey,Smith,9260 Brendan Roads,1962-09-27,2019-05-05,,True
1461,Bill,Craig,078 Richards Shoal,2000-10-27,2019-02-05,,True
1627,Crystal,Moses,122 Gomez Springs Suite 303,1992-02-22,2017-08-20,2019-03-01,True
1802,Courtney,Roberts,154 Krueger Islands Suite 247,2002-04-03,2019-06-05,,True
1817,Sandra,Harper,90302 Brent Forks,2004-02-25,2016-03-13,2018-10-01,True
2152,Tiffany,Lewis,052 Alex Bridge Apt. 834,1978-03-13,2021-02-25,2021-04-01,True
2328,Lisa,Green,9928 Smith Cliffs Apt. 094,1978-01-29,2015-04-02,2021-08-01,True
2435,Lisa,Walker,82195 Richard Squares Suite 932,1992-01-16,2021-10-08,,True
2507,Jerome,Evans,02699 Johnathan Corner,1967-06-17,2022-01-23,,True


## 3. Dimension table - Station

In [0]:
stations = spark.table("stagingstation")
display(stations.limit(nrows_display))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001


In [0]:
# Drop if exist
spark.sql("DROP TABLE IF EXISTS default.dim_station")

# Data wrangling (basically dropping duplicates in surrogate column), and then save as Gold table in `default` directory
stations.dropDuplicates(["station_id"])     \
    .write.format("delta")                  \
    .mode("overwrite")                      \
    .saveAsTable("default.dim_station")

# Final check
review_dim_station = spark.table("dim_station")
display(review_dim_station.limit(nrows_display))

station_id,name,latitude,longitude
13192,Halsted St & Dickens Ave,41.919936,-87.64883
KA1504000162,Clark St & Lunt Ave,42.009074,-87.67419
15634,Western Ave & Roscoe St,41.943093,-87.6873335
15539,Desplaines St & Jackson Blvd,41.878161166666665,-87.64428766666668
TA1305000022,Orleans St & Merchandise Mart Plaza,41.888243,-87.63639
KA1503000075,DuSable Museum,41.79156801058,-87.60785217739999
13133,Damen Ave & Cortland St,41.915983,-87.677335
TA1305000009,Clark St & Ida B Wells Dr,41.8759326655,-87.63058453549999
KA1504000135,Wells St & Elm St,41.90320733333333,-87.63461616666666
17660,California Ave & Cortez St,41.900363,-87.696704


## 4. Fact table - Trip

In [0]:
trips = spark.table("stagingtrip")
display(trips.limit(nrows_display))

trip_id,rideable_type,started_at,ended_at,start_station_id,end_station_id,rider_id
7E1E50AC37E2DAD3,classic_bike,2021-08-14 14:01:36,2021-08-14 14:34:49,TA1309000007,13089,2644
ADFF32195521E952,classic_bike,2021-08-29 16:16:36,2021-08-29 16:24:43,13288,TA1308000031,37747
7C59843DB8D13CC7,electric_bike,2021-08-27 11:06:34,2021-08-27 11:12:52,TA1307000062,TA1305000020,63224
5B788004F8A5204C,classic_bike,2021-08-27 07:35:33,2021-08-27 07:59:35,13353,13242,45050
078629DD14B634AE,classic_bike,2021-08-08 15:00:30,2021-08-08 15:22:57,13353,13242,33762
5E98DA99CB0B52E4,classic_bike,2021-08-15 18:01:33,2021-08-15 18:26:52,13353,13242,33902
6A3F6243C9164889,classic_bike,2021-08-14 02:22:42,2021-08-14 02:26:10,13033,TA1305000020,47737
F034B9F0C7194317,classic_bike,2021-08-20 14:28:33,2021-08-20 14:55:42,TA1307000130,13089,28123
74EE09157161558A,classic_bike,2021-08-21 18:17:28,2021-08-21 19:03:19,15578,TA1308000031,60078
7EF8ED3865996053,electric_bike,2021-08-16 14:54:06,2021-08-16 15:09:13,13109,519,34360


We have to add these columns to match current table schema with our star schema design:
- `trip_date` : date of the start of the trip, date format of `yyyy-mm-dd`.
- `trip_time` : time of the start of the trip, time format of `hh:mm:ss`.
- `rider_age` : age of the rider, int format.
- `duration` : total time length of the trip, in SECONDS, bigint format.

In [0]:
# Drop if exist
spark.sql("DROP TABLE IF EXISTS default.fact_trip")

# Data wrangling
fact_trip = trips.alias("T").join(                                                                      \
    riders.alias("R"),                                                                                  \
    col("T.rider_id") == col("R.rider_id"),                                                             \
    "left"                                                                                              \
)                                                                                                       \
    .withColumn(                                                                                        \
        "trip_date",                                                                                    \
        f.to_date(substring(col("started_at").cast("string"), 1, 10), "yyyy-MM-dd")                     \
    )                                                                                                   \
    .withColumn(                                                                                        \
        "trip_time",                                                                                    \
        substring(col("started_at").cast("string"), 12, 8)                                              \
    )                                                                                                   \
    .withColumn(                                                                                        \
        "rider_age",                                                                                    \
        round((unix_timestamp("started_at") - unix_timestamp("birthday", "yyyy-MM-dd")) /3600/24/365)   \
    )                                                                                                   \
    .withColumn(                                                                                        \
        "duration",                                                                                     \
        round((unix_timestamp("ended_at") - unix_timestamp("started_at")))                              \
    )                                                                                                   \
    .select(                                                                                            \
        "trip_id", "trip_date", "trip_time", "started_at", "ended_at", col("T.rider_id"),               \
        "rideable_type", "rider_age", "start_station_id", "end_station_id", "duration"                  \
    )                                                                                                   \

display(fact_trip.limit(nrows_display))

trip_id,trip_date,trip_time,started_at,ended_at,rider_id,rideable_type,rider_age,start_station_id,end_station_id,duration
7E1E50AC37E2DAD3,2021-08-14,14:01:36,2021-08-14 14:01:36,2021-08-14 14:34:49,2644,classic_bike,46.0,TA1309000007,13089,1993
ADFF32195521E952,2021-08-29,16:16:36,2021-08-29 16:16:36,2021-08-29 16:24:43,37747,classic_bike,20.0,13288,TA1308000031,487
7C59843DB8D13CC7,2021-08-27,11:06:34,2021-08-27 11:06:34,2021-08-27 11:12:52,63224,electric_bike,35.0,TA1307000062,TA1305000020,378
5B788004F8A5204C,2021-08-27,07:35:33,2021-08-27 07:35:33,2021-08-27 07:59:35,45050,classic_bike,28.0,13353,13242,1442
078629DD14B634AE,2021-08-08,15:00:30,2021-08-08 15:00:30,2021-08-08 15:22:57,33762,classic_bike,45.0,13353,13242,1347
5E98DA99CB0B52E4,2021-08-15,18:01:33,2021-08-15 18:01:33,2021-08-15 18:26:52,33902,classic_bike,37.0,13353,13242,1519
6A3F6243C9164889,2021-08-14,02:22:42,2021-08-14 02:22:42,2021-08-14 02:26:10,47737,classic_bike,25.0,13033,TA1305000020,208
F034B9F0C7194317,2021-08-20,14:28:33,2021-08-20 14:28:33,2021-08-20 14:55:42,28123,classic_bike,44.0,TA1307000130,13089,1629
74EE09157161558A,2021-08-21,18:17:28,2021-08-21 18:17:28,2021-08-21 19:03:19,60078,classic_bike,29.0,15578,TA1308000031,2751
7EF8ED3865996053,2021-08-16,14:54:06,2021-08-16 14:54:06,2021-08-16 15:09:13,34360,electric_bike,51.0,13109,519,907


In [0]:
# Drop if exist
spark.sql("DROP TABLE IF EXISTS default.fact_trip")

# A lil bit more wrangling and then save as Gold table in `default` directory
fact_trip.dropDuplicates(["trip_id"])       \
    .write.format("delta")                  \
    .mode("overwrite")                      \
    .saveAsTable("default.fact_trip")

# Final check
review_fact_trip = spark.table("fact_trip")
display(review_fact_trip.limit(nrows_display))

trip_id,trip_date,trip_time,started_at,ended_at,rider_id,rideable_type,rider_age,start_station_id,end_station_id,duration
BB9370C505A989B3,2021-06-22,07:53:55,2021-06-22 07:53:55,2021-06-22 08:13:45,45612,electric_bike,44.0,TA1307000134,13021,1190
C91B7E06269B8EA4,2021-06-29,11:42:24,2021-06-29 11:42:24,2021-06-29 11:58:31,56083,electric_bike,39.0,13001,13021,967
05AD08DACC3C1292,2021-06-24,12:46:08,2021-06-24 12:46:08,2021-06-24 13:12:29,46659,classic_bike,16.0,TA1309000024,13434,1581
316BE0A30B040826,2021-06-16,17:27:40,2021-06-16 17:27:40,2021-06-16 19:27:57,15201,docked_bike,29.0,15545,TA1308000009,7217
94AC802A09EA31BE,2021-06-03,09:36:17,2021-06-03 09:36:17,2021-06-03 09:38:21,42409,classic_bike,49.0,TA1306000003,13021,124
D47DF29C94C08D58,2021-06-14,19:12:40,2021-06-14 19:12:40,2021-06-14 19:14:42,45834,electric_bike,55.0,TA1306000003,13021,122
A14E11622DEFD9CB,2021-06-06,12:47:16,2021-06-06 12:47:16,2021-06-06 13:02:03,63348,classic_bike,55.0,13224,13271,887
8CC18A43D3768E99,2021-06-05,12:41:03,2021-06-05 12:41:03,2021-06-05 12:47:03,11572,classic_bike,36.0,TA1307000164,TA1308000009,360
D995F46485664573,2021-06-18,19:52:30,2021-06-18 19:52:30,2021-06-18 20:35:52,52753,classic_bike,21.0,TA1306000012,KA1504000171,2602
24943516958EF917,2021-06-23,10:30:18,2021-06-23 10:30:18,2021-06-23 10:45:11,70159,electric_bike,47.0,TA1307000134,TA1309000033,893


## 5. Dimension table - Date

In [0]:
dates = spark.table("stagingdate")
display(dates.limit(nrows_display))

date_id,day,month,quarter,year,day_of_week,day_of_year
2013-01-01,1,1,1,2013,3,1
2013-01-02,2,1,1,2013,4,2
2013-01-03,3,1,1,2013,5,3
2013-01-04,4,1,1,2013,6,4
2013-01-05,5,1,1,2013,7,5
2013-01-06,6,1,1,2013,1,6
2013-01-07,7,1,1,2013,2,7
2013-01-08,8,1,1,2013,3,8
2013-01-09,9,1,1,2013,4,9
2013-01-10,10,1,1,2013,5,10


In [0]:
# Drop if exist
spark.sql("DROP TABLE IF EXISTS default.dim_date")

# Data wrangling (basically dropping duplicates in surrogate column), and then save as Gold table in `default` directory
dates.withColumn(                           \
    "date_id",                              \
    f.to_date(col("date_id"), "yyy-MM-dd")  \
)                                           \
    .dropDuplicates(["date_id"])            \
    .write.format("delta")                  \
    .mode("overwrite")                      \
    .saveAsTable("default.dim_date")

# Final check
review_dim_date = spark.table("dim_date")
display(review_dim_date.limit(nrows_display))

date_id,day,month,quarter,year,day_of_week,day_of_year
2013-01-22,22,1,1,2013,3,22
2013-03-26,26,3,1,2013,3,85
2013-05-21,21,5,2,2013,3,141
2013-09-09,9,9,3,2013,2,252
2014-09-26,26,9,3,2014,6,269
2014-11-12,12,11,4,2014,4,316
2015-03-09,9,3,1,2015,2,68
2015-05-19,19,5,2,2015,3,139
2016-03-01,1,3,1,2016,3,61
2017-08-11,11,8,3,2017,6,223
