In [0]:
from pyspark.sql.functions import *
from pyspark.sql import functions as f
from pyspark.sql.types import TimestampType
from pyspark.sql.types import StringType

## Fact Payments

In [0]:
df_staging_payments = spark.table("staging_payments")

In [0]:
display(df_staging_payments.limit(5))

payment_id,date,amount,rider_id
1,2019-05-01,9.0,1000
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000


In [0]:
spark.sql("DROP TABLE IF EXISTS fact_payments")

df_staging_payments.dropDuplicates(["payment_id"]).write.format("delta").mode("overwrite").saveAsTable("fact_payments")

In [0]:
fact_payments = spark.sql("SELECT * FROM fact_payments")

display(fact_payments.limit(5))

payment_id,date,amount,rider_id
10000,2021-03-01,12.81,1393
1000001,2018-08-01,9.0,39647
1000011,2019-06-01,9.0,39647
1000012,2019-07-01,9.0,39647
1000015,2019-10-01,9.0,39647


## Dim Riders

In [0]:
df_staging_riders = spark.table("staging_riders")

In [0]:
display(df_staging_riders.limit(5))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True


In [0]:
spark.sql("DROP TABLE IF EXISTS dim_riders")

df_staging_riders.dropDuplicates(["rider_id"]).write.format("delta").mode("overwrite").saveAsTable("dim_riders")

In [0]:
dim_riders = spark.sql("SELECT * FROM dim_riders")

display(dim_riders.limit(5))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False
1008,John,Crawford,7691 Evans Court,1987-02-21,2021-03-28,2021-07-01,True
1016,Andrew,Jones,72226 Casey Square,1991-12-13,2022-02-02,,True
1019,Tina,Garcia,00348 Brandi Parks Suite 405,1997-05-03,2021-07-10,2022-01-01,False
1025,Bill,Gregory,5182 Michelle Place Apt. 142,2004-12-06,2020-05-01,,False


## Dim Stations

In [0]:
df_staging_stations = spark.table("staging_stations")

In [0]:
display(df_staging_stations.limit(5))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669


In [0]:
spark.sql("DROP TABLE IF EXISTS dim_stations")

df_staging_stations.dropDuplicates(["station_id"]).write.format("delta").mode("overwrite").saveAsTable("dim_stations")

In [0]:
dim_stations = spark.sql("SELECT * FROM dim_stations")

display(dim_stations.limit(5))

station_id,name,latitude,longitude
13001,Michigan Ave & Washington St,41.8839840647265,-87.6246839761734
13006,LaSalle St & Washington St,41.882664,-87.63253
13008,Millennium Park,41.8810317,-87.62408432
13011,Canal St & Adams St,41.879255,-87.639904
13016,St. Clair St & Erie St,41.89434513742426,-87.62279838323593


## Fact Trips

In [0]:
df_staging_trips = spark.table("staging_trips")

In [0]:
display(df_staging_trips.limit(5))

trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
89E7AA6C29227EFF,classic_bike,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,660,71934
0FEFDE2603568365,classic_bike,2021-02-14T17:52:38.000+0000,2021-02-14T18:12:09.000+0000,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09T19:10:18.000+0000,2021-02-09T19:19:10.000+0000,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02T17:49:41.000+0000,2021-02-02T17:54:06.000+0000,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23T15:07:23.000+0000,2021-02-23T15:22:37.000+0000,13216,TA1309000055,39608


In [0]:
fact_trips = spark.sql('SELECT * FROM staging_trips') \
                        .join(df_staging_riders,on='rider_id') \
                        .withColumn('duration', round((unix_timestamp("ended_at") - unix_timestamp('start_at'))/60)) \
                        .withColumn('rider_age', round((unix_timestamp("account_start_date") - unix_timestamp('birthday'))/3600/24)) \
                        .select("trip_id", "rider_id", "rideable_type", "start_station_id", "end_station_id", "start_at", "ended_at", "duration", "rider_age")
            


In [0]:
spark.sql("DROP TABLE IF EXISTS fact_trips")

df_staging_trips.write.format("delta").mode("overwrite").saveAsTable("fact_trips")

In [0]:
fact_trips = spark.sql("SELECT * FROM fact_trips")

display(fact_trips.limit(5))

trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
89E7AA6C29227EFF,classic_bike,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,660,71934
0FEFDE2603568365,classic_bike,2021-02-14T17:52:38.000+0000,2021-02-14T18:12:09.000+0000,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09T19:10:18.000+0000,2021-02-09T19:19:10.000+0000,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02T17:49:41.000+0000,2021-02-02T17:54:06.000+0000,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23T15:07:23.000+0000,2021-02-23T15:22:37.000+0000,13216,TA1309000055,39608


## Dim Time

In [0]:
min_date = df_staging_trips.selectExpr('MIN(start_at) AS started_at').first().asDict()['started_at']
max_date = df_staging_trips.selectExpr('MAX(start_at) AS started_at').first().asDict()['started_at']

expression = f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)"
dim_times = spark.createDataFrame([(1,)], ["time_id"])

dim_times = dim_times.withColumn("dateinit", f.explode(f.expr(expression)))
dim_times = dim_times.withColumn("date", f.to_timestamp(dim_times.dateinit, "yyyy-MM-dd"))

dim_times = dim_times \
            .withColumn("day_of_month", f.dayofmonth(dim_times.date)) \
            .withColumn("day_of_week", f.dayofweek(dim_times.date)) \
            .withColumn("year", f.year(dim_times.date)) \
            .withColumn("month", f.month(dim_times.date)) \
            .withColumn("quarter", f.quarter(dim_times.date)) \
            .withColumn("week_of_year", f.weekofyear(dim_times.date)) \
            .withColumn("time_id", dim_times.date.cast(StringType())) \
            .drop(f.col("dateinit"))

In [0]:
spark.sql("DROP TABLE IF EXISTS dim_times")

dim_times.write.format("delta").mode("overwrite").saveAsTable("dim_times")

In [0]:
dim_times = spark.sql("SELECT * FROM dim_times")

display(dim_times.limit(5))

time_id,date,day_of_month,day_of_week,year,month,quarter,week_of_year
2021-02-01 00:00:00,2021-02-01T00:00:00.000+0000,1,2,2021,2,1,5
2021-02-02 00:00:00,2021-02-02T00:00:00.000+0000,2,3,2021,2,1,5
2021-02-03 00:00:00,2021-02-03T00:00:00.000+0000,3,4,2021,2,1,5
2021-02-04 00:00:00,2021-02-04T00:00:00.000+0000,4,5,2021,2,1,5
2021-02-05 00:00:00,2021-02-05T00:00:00.000+0000,5,6,2021,2,1,5
