# Transform delta tables and create gold tables for star schema

In [0]:
from pyspark.sql.functions import explode, sequence, to_date

beginDate = '2010-01-01'
endDate = '2023-12-31'


spark.sql(f"select explode(sequence(to_date('{beginDate}'), to_date('{endDate}'), interval 1 day)) as date").createOrReplaceTempView('dates_tmp')


In [0]:
%sql 

create or replace table gold_dates using delta location '/delta/dates' 
as select 
date, day(date) as day, date_format(date, 'EEEE') as day_name, weekday(date) + 1 as day_of_week, month(date) as month, year(date) as year 
from dates_tmp

num_affected_rows,num_inserted_rows


In [0]:
riders_df = spark.table("silver_riders")
dim_riders_df = riders_df \
    .withColumnRenamed('account_start_date', "member_start_date_id") \
    .withColumnRenamed('account_end_date', "member_end_date_id") \
    .select("rider_id", "first_name", "last_name", "address", "birthday", "is_member", "member_start_date_id", "member_end_date_id")
dim_riders_df.dropDuplicates(["rider_id"]).write.format("delta").mode("overwrite").saveAsTable("gold_riders")

In [0]:
stations_df = spark.table("silver_stations")
dim_stations_df = stations_df.select("station_id","name","latitude", "longitude")
dim_stations_df.dropDuplicates(["station_id"]).write.format("delta").mode("overwrite").saveAsTable("gold_stations")


In [0]:
payments_df = spark.table("silver_payments")
fact_payments_df = payments_df \
    .withColumn("date_id", payments_df.date) \
    .select("payment_id","rider_id","date_id","amount")
fact_payments_df.dropDuplicates(["payment_id"]).write.format("delta").mode("overwrite").saveAsTable("gold_payments")


In [0]:
from pyspark.sql.functions import datediff,col, unix_timestamp, round
from pyspark.sql.types import IntegerType

trips_df = spark.table("silver_trips")
fact_trips_df = trips_df.join(riders_df, trips_df.rider_id == riders_df.rider_id, "left") \
    .withColumn("trip_duration_minutes",round((unix_timestamp("ended_at") - unix_timestamp('start_at'))/60)) \
    .withColumn('rider_age', round(datediff(col("account_start_date"),col("birthday"))/365.25).cast('int')) \
    .withColumnRenamed("start_at","start_date_id") \
    .withColumnRenamed("ended_at","end_date_id") \
    .select("trip_id", "trip_duration_minutes", "start_date_id", "end_date_id", trips_df.rider_id, col("rider_age"), "rideable_type", "start_station_id", "end_station_id")

display(fact_trips_df)

fact_trips_df.dropDuplicates(["trip_id"]).write.format("delta").mode("overwrite").saveAsTable("gold_trips")

trip_id,trip_duration_minutes,start_date_id,end_date_id,rider_id,rider_age,rideable_type,start_station_id,end_station_id
89E7AA6C29227EFF,7.0,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,71934,37,classic_bike,525,660
0FEFDE2603568365,20.0,2021-02-14T17:52:38.000+0000,2021-02-14T18:12:09.000+0000,47854,37,classic_bike,525,16806
E6159D746B2DBB91,9.0,2021-02-09T19:10:18.000+0000,2021-02-09T19:19:10.000+0000,70870,33,electric_bike,KA1503000012,TA1305000029
B32D3199F1C2E75B,4.0,2021-02-02T17:49:41.000+0000,2021-02-02T17:54:06.000+0000,58974,19,classic_bike,637,TA1305000034
83E463F23575F4BF,15.0,2021-02-23T15:07:23.000+0000,2021-02-23T15:22:37.000+0000,39608,71,electric_bike,13216,TA1309000055
BDAA7E3494E8D545,6.0,2021-02-24T15:43:33.000+0000,2021-02-24T15:49:05.000+0000,36267,28,electric_bike,18003,KP1705001026
A772742351171257,1.0,2021-02-01T17:47:42.000+0000,2021-02-01T17:48:33.000+0000,50104,29,classic_bike,KP1705001026,KP1705001026
295476889D9B79F8,1.0,2021-02-11T18:33:53.000+0000,2021-02-11T18:35:09.000+0000,19618,21,classic_bike,18003,18003
362087194BA4CC9A,23.0,2021-02-27T15:13:39.000+0000,2021-02-27T15:36:36.000+0000,16732,15,classic_bike,KP1705001026,KP1705001026
21630F715038CCB0,17.0,2021-02-20T08:59:42.000+0000,2021-02-20T09:17:04.000+0000,57068,45,classic_bike,KP1705001026,KP1705001026
