In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType, StringType, IntegerType

## 1. Creating **fact_payments** table

In [0]:
payments = spark.table("default.staging_payments")

display(payments)

payment_id,date,amount,rider_id
1,2019-05-01,9.0,1000
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000
6,2019-10-01,9.0,1000
7,2019-11-01,9.0,1000
8,2019-12-01,9.0,1000
9,2020-01-01,9.0,1000
10,2020-02-01,9.0,1000


In [0]:
payments.printSchema

<bound method DataFrame.printSchema of DataFrame[payment_id: string, date: string, amount: string, rider_id: string]>

In [0]:
payments = payments.dropDuplicates(payments.columns)
spark.sql("DROP TABLE IF EXISTS fact_payments")

DataFrame[]

In [0]:
payments.write.format("delta") \
              .mode("overwrite") \
              .saveAsTable("fact_payments")

## 2. Creating dim_stations table

In [0]:
stations = spark.table("default.staging_stations")

display(stations)

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001


In [0]:
stations.printSchema

<bound method DataFrame.printSchema of DataFrame[station_id: string, name: string, latitude: string, longitude: string]>

In [0]:
stations = stations.dropDuplicates(stations.columns)
spark.sql("DROP TABLE IF EXISTS fact_stations")

DataFrame[]

In [0]:
stations.write.format("delta") \
              .mode("overwrite") \
              .saveAsTable("dim_stations")

## 3. Creating **dim_riders** table

In [0]:
riders = spark.table("default.staging_riders")

display(riders.limit(10))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False
1006,Alicia,Taylor,1137 Angela Locks,2004-01-30,2020-11-27,2021-12-01,True
1007,Benjamin,Fernandez,979 Phillips Ways,1988-01-11,2016-12-11,,False
1008,John,Crawford,7691 Evans Court,1987-02-21,2021-03-28,2021-07-01,True
1009,Victoria,Ritter,9922 Jim Crest Apt. 319,1981-02-07,2020-06-12,2021-11-01,True


In [0]:
riders.printSchema

<bound method DataFrame.printSchema of DataFrame[rider_id: string, first_name: string, last_name: string, address: string, birthday: string, account_start_date: string, account_end_date: string, is_member: string]>

In [0]:
riders = riders.dropDuplicates(riders.columns)
spark.sql("DROP TABLE IF EXISTS dim_riders")

DataFrame[]

In [0]:
riders.write.format("delta") \
              .mode("overwrite") \
              .saveAsTable("dim_riders")

## 4. Creating fact_trips table

In [0]:
trips = spark.table("default.staging_trips")

display(trips.limit(10))

trip_id,rideable_type,started_at,ended_at,start_station_id,end_station_id,rider_id
7E1E50AC37E2DAD3,classic_bike,2021-08-14 14:01:36,2021-08-14 14:34:49,TA1309000007,13089,2644
ADFF32195521E952,classic_bike,2021-08-29 16:16:36,2021-08-29 16:24:43,13288,TA1308000031,37747
7C59843DB8D13CC7,electric_bike,2021-08-27 11:06:34,2021-08-27 11:12:52,TA1307000062,TA1305000020,63224
5B788004F8A5204C,classic_bike,2021-08-27 07:35:33,2021-08-27 07:59:35,13353,13242,45050
078629DD14B634AE,classic_bike,2021-08-08 15:00:30,2021-08-08 15:22:57,13353,13242,33762
5E98DA99CB0B52E4,classic_bike,2021-08-15 18:01:33,2021-08-15 18:26:52,13353,13242,33902
6A3F6243C9164889,classic_bike,2021-08-14 02:22:42,2021-08-14 02:26:10,13033,TA1305000020,47737
F034B9F0C7194317,classic_bike,2021-08-20 14:28:33,2021-08-20 14:55:42,TA1307000130,13089,28123
74EE09157161558A,classic_bike,2021-08-21 18:17:28,2021-08-21 19:03:19,15578,TA1308000031,60078
7EF8ED3865996053,electric_bike,2021-08-16 14:54:06,2021-08-16 15:09:13,13109,519,34360


In [0]:
trips = trips.dropDuplicates(trips.columns)

In [0]:
fact_trips = trips.join(riders, trips.rider_id == riders.rider_id, "inner") \
      .withColumn("rider_age", round(floor(months_between(col("started_at"), col("birthday")) / 12))) \
      .withColumn("duration", round((unix_timestamp(col("ended_at")) - unix_timestamp(col("started_at"))) / 3600, 2)) \
      .select("trip_id", riders.rider_id, "rideable_type", "started_at", "ended_at", "start_station_id", "end_station_id", "rider_age", "duration")

display(fact_trips.limit(10))

trip_id,rider_id,rideable_type,started_at,ended_at,start_station_id,end_station_id,rider_age,duration
8A4A6F580B69FD4B,39075,classic_bike,2021-06-09 14:49:55,2021-06-09 14:57:59,13056,13021,50,0.13
78D26D69DB707B75,27587,classic_bike,2021-06-27 07:33:20,2021-06-27 07:44:23,KA1503000052,TA1308000009,47,0.18
12AE8B14675B028E,48258,electric_bike,2021-06-06 18:05:54,2021-06-06 18:40:38,15544,TA1308000009,38,0.58
E8F7BC00C6CDDB8C,36657,classic_bike,2021-06-25 15:20:33,2021-06-25 15:34:23,TA1309000064,13021,21,0.23
8174BF8FAF5BBDA8,24733,electric_bike,2021-06-11 23:41:23,2021-06-11 23:48:01,TA1308000005,13257,39,0.11
821C12BD0E83B595,43929,classic_bike,2021-06-10 15:41:22,2021-06-10 15:54:51,18067,13257,25,0.22
A971BD12F0BA5EE6,26557,electric_bike,2021-06-08 17:38:25,2021-06-08 17:41:42,WL-011,13021,48,0.05
3DE9F8FBA3E35279,30829,classic_bike,2021-06-22 15:34:58,2021-06-22 16:47:29,KA1503000074,TA1308000009,31,1.21
DDAF7BDAB39753E5,63089,electric_bike,2021-06-17 09:47:55,2021-06-17 09:53:14,13011,13021,45,0.09
9B5CEAE2575DFC13,48730,classic_bike,2021-06-01 22:04:20,2021-06-01 22:14:36,632,13059,25,0.17


In [0]:
spark.sql("DROP TABLE IF EXISTS fact_trips")

fact_trips.write.format("delta") \
              .mode("overwrite") \
              .saveAsTable("fact_trips")

## 5. Creating dim_date table

In [0]:
min_max_dates = (
    payments.select(min("date").alias("min_payment_date"), max("date").alias("max_payment_date"))
    .crossJoin(trips.select(min("started_at").alias("min_start_date"), max("ended_at").alias("max_end_date")))
    .select(
        least(to_timestamp(col("min_payment_date"), 'yyyy-MM-dd HH:mm:ss'), to_timestamp(col("min_start_date"), 'yyyy-MM-dd HH:mm:ss')).alias("min_date"),
        greatest(to_timestamp(col("max_payment_date"), 'yyyy-MM-dd HH:mm:ss'), to_timestamp(col("max_end_date"), 'yyyy-MM-dd HH:mm:ss')).alias("max_date")
    )
).collect()[0]

min_date = min_max_dates["min_date"]
max_date = min_max_dates["max_date"]

In [0]:
date_df = spark.createDataFrame([(min_date, max_date)], ["min_date", "max_date"]) \
    .withColumn("date", explode(sequence(col("min_date"), col("max_date")))) \
    .drop("min_date", "max_date")

display(date_df)

date
2021-02-01T01:07:04Z
2021-02-02T01:07:04Z
2021-02-03T01:07:04Z
2021-02-04T01:07:04Z
2021-02-05T01:07:04Z
2021-02-06T01:07:04Z
2021-02-07T01:07:04Z
2021-02-08T01:07:04Z
2021-02-09T01:07:04Z
2021-02-10T01:07:04Z


In [0]:
dim_date = date_df.withColumn("date_key", date_format(col("date"), "yyyyMMdd").cast(IntegerType())) \
                  .withColumn("year", year(date_df.date)) \
                  .withColumn("quarter", quarter(date_df.date)) \
                  .withColumn("month", month(date_df.date)) \
                  .withColumn("month_name", date_format(col("date"), "MMM").cast(StringType())) \
                  .withColumn("day", dayofmonth(date_df.date)) \
                  .withColumn("day_of_week", dayofweek(date_df.date)) \
                  .withColumn("day_name", date_format(col("date"), "EE").cast(StringType()))

display(dim_date.limit(10))

date,date_key,year,quarter,month,month_name,day,day_of_week,day_name
2021-02-01T01:07:04Z,20210201,2021,1,2,Feb,1,2,Mon
2021-02-02T01:07:04Z,20210202,2021,1,2,Feb,2,3,Tue
2021-02-03T01:07:04Z,20210203,2021,1,2,Feb,3,4,Wed
2021-02-04T01:07:04Z,20210204,2021,1,2,Feb,4,5,Thu
2021-02-05T01:07:04Z,20210205,2021,1,2,Feb,5,6,Fri
2021-02-06T01:07:04Z,20210206,2021,1,2,Feb,6,7,Sat
2021-02-07T01:07:04Z,20210207,2021,1,2,Feb,7,1,Sun
2021-02-08T01:07:04Z,20210208,2021,1,2,Feb,8,2,Mon
2021-02-09T01:07:04Z,20210209,2021,1,2,Feb,9,3,Tue
2021-02-10T01:07:04Z,20210210,2021,1,2,Feb,10,4,Wed


In [0]:
spark.sql("DROP TABLE IF EXISTS dim_date")

dim_date.write.format("delta") \
              .mode("overwrite") \
              .saveAsTable("dim_date")