# Create Fact and Dimension tables for Analytics

### Create Date Dimension table

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

In [None]:
df_trips = spark.table("trips")
df_payments = spark.table("payments")

# The date dimension has to cover every date_key the fact tables can produce.
# fact_trip keys on trips.started_at and fact_payment keys on payments.date, so the
# calendar bounds are taken from both tables; bounding on trips alone leaves any
# payment dated before the first trip without a matching dim_date row.
trip_bounds = df_trips.agg(
    F.to_date(F.min("started_at")).alias("min_date"),
    F.to_date(F.max("ended_at")).alias("max_date"),
)
payment_bounds = df_payments.agg(
    F.to_date(F.min("date")).alias("min_date"),
    F.to_date(F.max("date")).alias("max_date"),
)
bounds = (
    trip_bounds.unionByName(payment_bounds)
    .agg(
        F.min("min_date").alias("min_date"),
        # Extend the calendar 5 years (60 months) past the latest known date for future facts
        F.add_months(F.max("max_date"), 60).alias("max_date"),
    )
    .first()
)
min_date = bounds["min_date"]
max_date = bounds["max_date"]

# One row per calendar day between the two bounds (inclusive)
date_interval = f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)"
dim_date = (
    spark.range(1)
    .select(F.explode(F.expr(date_interval)).alias("date"))
    .select(
        # Smart key in yyyyMMdd form, e.g. 2021-02-01 -> 20210201
        F.date_format("date", "yyyyMMdd").cast(IntegerType()).alias("date_key"),
        "date",
        F.year("date").alias("year"),
        F.quarter("date").alias("quarter"),
        F.month("date").alias("month"),
        F.dayofweek("date").alias("day_of_week"),
    )
)
dim_date.show(5)


+--------+----------+----+-------+-----+-----------+
|date_key|      date|year|quarter|month|day_of_week|
+--------+----------+----+-------+-----+-----------+
|20210201|2021-02-01|2021|      1|    2|          2|
|20210202|2021-02-02|2021|      1|    2|          3|
|20210203|2021-02-03|2021|      1|    2|          4|
|20210204|2021-02-04|2021|      1|    2|          5|
|20210205|2021-02-05|2021|      1|    2|          6|
+--------+----------+----+-------+-----+-----------+
only showing top 5 rows



In [None]:
# Persist the date dimension as a Delta table, replacing any previous contents
(
    dim_date.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_dim_date")
)

### Create Station Dimension table

In [None]:
# Station dimension: the source station attributes plus a generated surrogate key
dim_station = (
    spark.table("stations")
    # Generate the surrogate key
    .withColumn("station_key", F.monotonically_increasing_id())
    # Reorder column positions so the surrogate key comes first
    .select("station_key", "station_id", "name", "latitude", "longitude")
)
dim_station.show(5)

+-----------+------------+--------------------+---------+----------+
|station_key|  station_id|                name| latitude| longitude|
+-----------+------------+--------------------+---------+----------+
|          0|         525|Glenwood Ave & To...|  42.0127| -87.66606|
|          1|KA1503000012|  Clark St & Lake St|41.885796|  -87.6311|
|          2|         637|Wood St & Chicago...|41.895634|-87.672066|
|          3|       13216|  State St & 33rd St|41.834732|-87.625824|
|          4|       18003|Fairbanks St & Su...| 41.89581|-87.620255|
+-----------+------------+--------------------+---------+----------+
only showing top 5 rows



In [None]:
# Persist the station dimension as a Delta table, replacing any previous contents
(
    dim_station.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_dim_station")
)

### Create Rider Dimension table

In [None]:
# Rider dimension: the source rider attributes plus a generated surrogate key
dim_rider = (
    spark.table("riders")
    # Generate the surrogate key
    .withColumn("rider_key", F.monotonically_increasing_id())
    # Reorder column positions and give the account/membership columns explicit names
    .select(
        "rider_key",
        "rider_id",
        "first",
        "last",
        "address",
        "birthday",
        F.col("start_date").alias("account_start_date"),
        F.col("end_date").alias("account_end_date"),
        F.col("member").alias("member_status"),
    )
)
dim_rider.show(5)


+---------+--------+--------+---------+--------------------+-------------------+-------------------+-------------------+-------------+
|rider_key|rider_id|   first|     last|             address|           birthday| account_start_date|   account_end_date|member_status|
+---------+--------+--------+---------+--------------------+-------------------+-------------------+-------------------+-------------+
|        0|    1000|   Diana|    Clark| 1200 Alyssa Squares|1989-02-13 00:00:00|2019-04-23 00:00:00|               NULL|         true|
|        1|    1001|Jennifer|    Smith|     397 Diana Ferry|1976-08-10 00:00:00|2019-11-01 00:00:00|2020-09-01 00:00:00|         true|
|        2|    1002|   Karen|    Smith|644 Brittany Row ...|1998-08-10 00:00:00|2022-02-04 00:00:00|               NULL|         true|
|        3|    1003|   Bryan|  Roberts|996 Dickerson Tur...|1999-03-29 00:00:00|2019-08-26 00:00:00|               NULL|        false|
|        4|    1004|   Jesse|Middleton|7009 Nathan Expr

In [None]:
# Persist the rider dimension as a Delta table, replacing any previous contents
(
    dim_rider.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_dim_rider")
)

### Create Payment Fact table

In [None]:
# Look the surrogate key up in the persisted gold_dim_rider table rather than in the
# in-memory dim_rider DataFrame. dim_rider is a lazy plan built on
# monotonically_increasing_id(), which is non-deterministic (it depends on partition
# IDs), so re-evaluating it here is not guaranteed to reproduce the keys that were
# already written to gold_dim_rider.
rider_keys = spark.table("gold_dim_rider").select("rider_key", "rider_id")

fact_payment = (
    spark.table("payments").alias("p")
    # Left join keeps payments whose account has no matching rider (rider_key is NULL)
    .join(rider_keys.alias("r"), F.col("p.account_number") == F.col("r.rider_id"), "left")
    # Same yyyyMMdd integer key format as gold_dim_date.date_key
    .withColumn("date_key", F.expr("date_format(date, 'yyyyMMdd')").cast(IntegerType()))
    .selectExpr("payment_id AS payment_key", "date_key", "r.rider_key", "amount")
)
fact_payment.show(5)

+-----------+--------+---------+------+
|payment_key|date_key|rider_key|amount|
+-----------+--------+---------+------+
|     539256|20200801|    20826|   9.0|
|     539257|20200901|    20826|   9.0|
|     539258|20201001|    20826|   9.0|
|     539259|20201101|    20826|   9.0|
|     539260|20201201|    20826|   9.0|
+-----------+--------+---------+------+
only showing top 5 rows



In [None]:
# Persist the payment fact as a Delta table, replacing any previous contents
(
    fact_payment.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_fact_payment")
)

### Create Trip Fact table

In [None]:
# Look the surrogate keys up in the persisted gold dimension tables rather than in the
# in-memory dim_station / dim_rider DataFrames. Those are lazy plans built on
# monotonically_increasing_id(), which is non-deterministic (it depends on partition
# IDs), so re-evaluating them here is not guaranteed to reproduce the keys that were
# already written to gold_dim_station / gold_dim_rider.
station_keys = spark.table("gold_dim_station").select("station_key", "station_id")
rider_keys = spark.table("gold_dim_rider").select("rider_key", "rider_id", "birthday")

fact_trip = (
    spark.table("trips").alias("t")
    # Left joins keep trips even when a station or rider has no dimension row (key is NULL)
    .join(station_keys.alias("s1"), F.col("t.start_station_id") == F.col("s1.station_id"), "left")
    .join(station_keys.alias("s2"), F.col("t.end_station_id") == F.col("s2.station_id"), "left")
    .join(rider_keys.alias("r"), F.col("t.member_id") == F.col("r.rider_id"), "left")
    # Same yyyyMMdd integer key format as gold_dim_date.date_key, based on the trip start
    .withColumn("date_key", F.expr("date_format(t.started_at, 'yyyyMMdd')").cast(IntegerType()))
    # Trip length in whole minutes (seconds / 60, truncated by the int cast)
    .withColumn("duration", ((F.col("t.ended_at") - F.col("t.started_at")).cast('long')/60).cast('int'))
    # Rider age in whole years at the time the trip started
    .withColumn("rider_age", (F.months_between(F.col("t.started_at"), F.col("r.birthday")) / 12).cast('int'))
    .selectExpr(
        "t.trip_id AS trip_key",
        "s1.station_key AS start_station_key",
        "s2.station_key AS end_station_key",
        "r.rider_key",
        "date_key",
        "rideable_type",
        "started_at",
        "ended_at",
        "duration",
        "rider_age",
    )
)
fact_trip.show(5)

+----------------+-----------------+---------------+----------+--------+-------------+-------------------+-------------------+--------+---------+
|        trip_key|start_station_key|end_station_key| rider_key|date_key|rideable_type|         started_at|           ended_at|duration|rider_age|
+----------------+-----------------+---------------+----------+--------+-------------+-------------------+-------------------+--------+---------+
|7E1E50AC37E2DAD3|              224|            487|      1644|20210814| classic_bike|2021-08-14 14:01:36|2021-08-14 14:34:49|      33|       45|
|ADFF32195521E952|               74|            153|     36747|20210829| classic_bike|2021-08-29 16:16:36|2021-08-29 16:24:43|       8|       19|
|7C59843DB8D13CC7|              305|            365|8589940559|20210827|electric_bike|2021-08-27 11:06:34|2021-08-27 11:12:52|       6|       34|
|5B788004F8A5204C|              279|            457|     44050|20210827| classic_bike|2021-08-27 07:35:33|2021-08-27 07:59:3

In [None]:
# Persist the trip fact as a Delta table, replacing any previous contents
(
    fact_trip.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_fact_trip")
)