In [0]:
dim_rider = spark.sql('''
select rider_id, first_name, last_name, address, birthday, start_date, end_date, is_member from riders''')

dim_rider.dropDuplicates(["rider_id"]).write.format("delta").mode("overwrite").saveAsTable("dim_rider")

In [0]:
dim_station = spark.sql('''
select station_id, name, latitude, longtitude from stations''')

dim_station.dropDuplicates(["station_id"]).write.format("delta").mode("overwrite").saveAsTable("dim_station")

In [0]:
from pyspark.sql import functions as f
from pyspark.sql.types import StringType

min_date_query = spark.sql('''select MIN(started_at) as started_at from trips''')
min_date = min_date_query.first().asDict()['started_at']

max_date_query = spark.sql('''select DATEADD(year, 5, MAX(started_at)) as started_at from trips''')
max_date = max_date_query.first().asDict()['started_at']

expression = f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)"

In [0]:
dim_time = spark.createDataFrame([(1,)], ["time_id"])

dim_time = dim_time.withColumn("dateinit", f.explode(f.expr(expression)))
dim_time = dim_time.withColumn("date", f.to_timestamp(dim_time.dateinit, "yyyy-MM-dd"))
dim_time = dim_time \
            .withColumn("dayofweek", f.dayofweek(dim_time.date)) \
            .withColumn("dayofmonth", f.dayofmonth(dim_time.date)) \
            .withColumn("weekofyear", f.weekofyear(dim_time.date)) \
            .withColumn("year", f.year(dim_time.date)) \
            .withColumn("quarter", f.quarter(dim_time.date)) \
            .withColumn("month", f.month(dim_time.date)) \
            .withColumn("time_id", dim_time.date.cast(StringType())) \
            .drop(f.col("dateinit"))
                  
display(dim_time)

time_id,date,dayofweek,dayofmonth,weekofyear,year,quarter,month
2021-02-01 00:00:00,2021-02-01T00:00:00.000+0000,2,1,5,2021,1,2
2021-02-02 00:00:00,2021-02-02T00:00:00.000+0000,3,2,5,2021,1,2
2021-02-03 00:00:00,2021-02-03T00:00:00.000+0000,4,3,5,2021,1,2
2021-02-04 00:00:00,2021-02-04T00:00:00.000+0000,5,4,5,2021,1,2
2021-02-05 00:00:00,2021-02-05T00:00:00.000+0000,6,5,5,2021,1,2
2021-02-06 00:00:00,2021-02-06T00:00:00.000+0000,7,6,5,2021,1,2
2021-02-07 00:00:00,2021-02-07T00:00:00.000+0000,1,7,5,2021,1,2
2021-02-08 00:00:00,2021-02-08T00:00:00.000+0000,2,8,6,2021,1,2
2021-02-09 00:00:00,2021-02-09T00:00:00.000+0000,3,9,6,2021,1,2
2021-02-10 00:00:00,2021-02-10T00:00:00.000+0000,4,10,6,2021,1,2


In [0]:
dim_time.write \
.format("delta") \
.mode("overwrite") \
.saveAsTable("dim_time")

In [0]:
#from pyspark.sql.functions import col
trips = spark.sql('''select * from trips''')

fact_trip = spark.sql('''
    SELECT 
        trips.trip_id,
        riders.rider_id,
        trips.start_station_id, 
        trips.end_station_id, 
        trips.rideable_type, 
        trips.started_at, 
        trips.ended_at, 
        DATEDIFF(hour, trips.started_at, trips.ended_at) AS duration,
        DATEDIFF(year, riders.birthday, trips.started_at) AS rider_age
    FROM trips JOIN riders ON riders.rider_id = trips.rider_id
''')
display(fact_trip)

trip_id,rider_id,start_station_id,end_station_id,rideable_type,started_at,ended_at,duration,rider_age
222BB8E5059252D7,34062,KA1503000064,13021,classic_bike,2021-06-13 09:48:47,2021-06-13 10:07:23,0,30
1826E16CB5486018,5342,TA1306000010,13021,classic_bike,2021-06-21 22:59:13,2021-06-21 23:04:29,0,26
3D9B6A0A5330B04D,3714,TA1305000030,13021,classic_bike,2021-06-18 16:06:42,2021-06-18 16:12:02,0,26
07E82F5E9C9E490F,18793,TA1305000034,13021,classic_bike,2021-06-17 16:46:23,2021-06-17 17:02:45,0,18
A8E94BAECBF0C2DD,43342,TA1308000009,TA1308000009,docked_bike,2021-06-13 17:36:29,2021-06-13 18:30:39,0,28
378F4AB323AA1D14,6693,TA1308000009,TA1308000009,docked_bike,2021-06-13 13:20:10,2021-06-13 14:06:14,0,28
38AD311DC2EB1FBE,71480,KA1503000019,KA1503000019,docked_bike,2021-06-16 17:14:30,2021-06-16 17:28:34,0,56
1D466737F0B18097,50846,TA1308000009,TA1308000009,docked_bike,2021-06-27 14:51:52,2021-06-27 15:26:39,0,40
27E1142E1ACFAEFB,18951,13257,13257,electric_bike,2021-06-21 13:58:26,2021-06-21 13:58:53,0,21
67F2A115DAE77924,63987,TA1308000009,TA1308000009,classic_bike,2021-06-22 00:51:43,2021-06-22 01:08:25,0,37


In [0]:
fact_trip.write \
.format("delta") \
.mode("overwrite") \
.saveAsTable("fact_trip")

In [0]:
fact_payment = spark.sql('''
    SELECT
        payment_id,
        payments.amount,
        payments.rider_id,
        dim_time.time_id
    FROM payments
    JOIN dim_time ON dim_time.date = payments.date
''')

display(fact_payment)

payment_id,amount,rider_id,time_id
22,9.0,1000,2021-02-01 00:00:00
23,9.0,1000,2021-03-01 00:00:00
24,9.0,1000,2021-04-01 00:00:00
25,9.0,1000,2021-05-01 00:00:00
26,9.0,1000,2021-06-01 00:00:00
27,9.0,1000,2021-07-01 00:00:00
28,9.0,1000,2021-08-01 00:00:00
29,9.0,1000,2021-09-01 00:00:00
30,9.0,1000,2021-10-01 00:00:00
31,9.0,1000,2021-11-01 00:00:00


In [0]:
fact_payment.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("fact_payment")