<h1>Transform data into Bikeshare star model</h1>

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType


<h3>Create payment_fact table</h3>

<!-- Goal: use Spark and Databricks to run ELT processes that create the fact tables.
The fact table Python scripts should **contain the appropriate keys from the dimensions.**
In addition, the fact table scripts should **generate the correct facts based on the diagrams** provided in the first step. -->


In [None]:
# Read the staged payments table from the default database into the `payments` DataFrame.
payments = spark.read.table("DEFAULT.staging_payments")

In [None]:
# Sanity-check the load: render the first 10 staged payment rows.
display(payments.limit(num=10))

payment_id,date,amount,rider_id
1,2019-05-01,9.0,1000
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000
6,2019-10-01,9.0,1000
7,2019-11-01,9.0,1000
8,2019-12-01,9.0,1000
9,2020-01-01,9.0,1000
10,2020-02-01,9.0,1000


In [None]:
# Write data into the fact table payment_fact.

# Drop any previous version so the table is recreated with the current schema.
spark.sql("DROP TABLE IF EXISTS DEFAULT.payment_fact")

# The chain is wrapped in parentheses so it can span several lines and carry
# per-step comments (a bare line break before `.write` is a Python syntax error).
(
    payments
    .dropDuplicates(["payment_id"])  # payment_id is the primary key: keep one row per id
    .write.format("delta")           # store in Delta Lake format
    .mode("overwrite")               # replace any existing table contents
    .saveAsTable("default.payment_fact")  # managed table default.payment_fact
)

<h3> Create rider_dim table </h3>


In [None]:
# Read the staged riders table from the default database into the `riders` DataFrame.
riders = spark.read.table("DEFAULT.staging_riders")

In [None]:
# Sanity-check the load: render the first 10 staged rider rows.
display(riders.limit(num=10))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False
1006,Alicia,Taylor,1137 Angela Locks,2004-01-30,2020-11-27,2021-12-01,True
1007,Benjamin,Fernandez,979 Phillips Ways,1988-01-11,2016-12-11,,False
1008,John,Crawford,7691 Evans Court,1987-02-21,2021-03-28,2021-07-01,True
1009,Victoria,Ritter,9922 Jim Crest Apt. 319,1981-02-07,2020-06-12,2021-11-01,True


In [None]:
# Write data into the dimension table rider_dim.

# Drop any previous version so the table is recreated with the current schema.
spark.sql("DROP TABLE IF EXISTS DEFAULT.rider_dim")

# Parenthesized so the chain can span lines (a bare line break is a syntax error).
(
    riders
    .dropDuplicates(["rider_id"])  # rider_id is the dimension key: one row per rider
    .write.format("delta")
    .mode("overwrite")
    .saveAsTable("default.rider_dim")
)

<h3>Creating station_dim table </h3>

In [None]:
# Read the staged stations table from the default database into the `stations` DataFrame.
stations = spark.read.table("default.staging_stations")


In [None]:
# Sanity-check the load: render the first 10 staged station rows.
display(stations.limit(num=10))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001


In [None]:
##################################################
# Write data into the dimension table station_dim
##################################################

# Drop any previous version so the table is recreated with the current schema.
spark.sql("drop table if exists default.station_dim")

# Parenthesized so the chain can span lines (a bare line break is a syntax error).
(
    stations
    .dropDuplicates(["station_id"])  # station_id is the dimension key: one row per station
    .write.format("delta")
    .mode("overwrite")
    .saveAsTable("default.station_dim")
)

<h3>Creating trip_fact table</h3>


In [None]:
# Read the staged trips table from the default database into the `trips` DataFrame.
trips = spark.read.table("default.staging_trips")


In [None]:
# Sanity-check the load: render the first 10 staged trip rows.
display(trips.limit(num=10))

trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
89E7AA6C29227EFF,classic_bike,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,660,71934
0FEFDE2603568365,classic_bike,2021-02-14T17:52:38.000+0000,2021-02-14T18:12:09.000+0000,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09T19:10:18.000+0000,2021-02-09T19:19:10.000+0000,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02T17:49:41.000+0000,2021-02-02T17:54:06.000+0000,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23T15:07:23.000+0000,2021-02-23T15:22:37.000+0000,13216,TA1309000055,39608
BDAA7E3494E8D545,electric_bike,2021-02-24T15:43:33.000+0000,2021-02-24T15:49:05.000+0000,18003,KP1705001026,36267
A772742351171257,classic_bike,2021-02-01T17:47:42.000+0000,2021-02-01T17:48:33.000+0000,KP1705001026,KP1705001026,50104
295476889D9B79F8,classic_bike,2021-02-11T18:33:53.000+0000,2021-02-11T18:35:09.000+0000,18003,18003,19618
362087194BA4CC9A,classic_bike,2021-02-27T15:13:39.000+0000,2021-02-27T15:36:36.000+0000,KP1705001026,KP1705001026,16732
21630F715038CCB0,classic_bike,2021-02-20T08:59:42.000+0000,2021-02-20T09:17:04.000+0000,KP1705001026,KP1705001026,57068


<h3>Create the DataFrame fact_trips (written to trip_fact) by joining the trips and riders DataFrames.</h3>


In [None]:

# Create the DataFrame fact_trips from the trips and riders DataFrames.
#
# Trips ("df1") are left-joined to riders ("df2") on rider_id, so every trip is
# kept even when its rider is missing (rider columns are then null). riders is
# de-duplicated on rider_id first so a duplicated rider row cannot multiply trips.
#
# The chain is wrapped in parentheses so it can span lines with comments in between.
# `round`, `floor`, `col`, `months_between`, ... resolve to pyspark.sql.functions
# (star import in the first cell), not to the Python builtins.
fact_trips = (
    trips.alias("df1")
    .join(
        riders.dropDuplicates(["rider_id"]).alias("df2"),
        col("df1.rider_id") == col("df2.rider_id"),
        "left",
    )
    # Trip duration in minutes, rounded to the nearest whole minute
    # (unix_timestamp yields seconds since the epoch).
    .withColumn(
        "duration",
        round((unix_timestamp("ended_at") - unix_timestamp("start_at")) / 60),
    )
    # Rider age in completed years when the trip started. months_between / 12
    # accounts for month lengths and leap years; floor keeps whole years.
    .withColumn(
        "rider_age",
        floor(months_between(col("df1.start_at"), to_date(col("df2.birthday"))) / 12),
    )
    # Rename the trip timestamps to *_time_id to mark them as time-dimension keys.
    .withColumnRenamed("start_at", "start_time_id")
    .withColumnRenamed("ended_at", "end_time_id")
    .select(
        "trip_id", col("df1.rider_id"), "rideable_type", "start_station_id", "end_station_id",
        "start_time_id", "end_time_id", "duration", "rider_age",
    )
)

display(fact_trips.limit(10))

trip_id,rider_id,rideable_type,start_station_id,end_station_id,start_time_id,end_time_id,duration,rider_age
89E7AA6C29227EFF,71934,classic_bike,525,660,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,7.0,13638.0
0FEFDE2603568365,47854,classic_bike,525,16806,2021-02-14T17:52:38.000+0000,2021-02-14T18:12:09.000+0000,20.0,13523.0
E6159D746B2DBB91,70870,electric_bike,KA1503000012,TA1305000029,2021-02-09T19:10:18.000+0000,2021-02-09T19:19:10.000+0000,9.0,12051.0
B32D3199F1C2E75B,58974,classic_bike,637,TA1305000034,2021-02-02T17:49:41.000+0000,2021-02-02T17:54:06.000+0000,4.0,7107.0
83E463F23575F4BF,39608,electric_bike,13216,TA1309000055,2021-02-23T15:07:23.000+0000,2021-02-23T15:22:37.000+0000,15.0,25961.0
BDAA7E3494E8D545,36267,electric_bike,18003,KP1705001026,2021-02-24T15:43:33.000+0000,2021-02-24T15:49:05.000+0000,6.0,10176.0
A772742351171257,50104,classic_bike,KP1705001026,KP1705001026,2021-02-01T17:47:42.000+0000,2021-02-01T17:48:33.000+0000,1.0,10697.0
295476889D9B79F8,19618,classic_bike,18003,18003,2021-02-11T18:33:53.000+0000,2021-02-11T18:35:09.000+0000,1.0,7767.0
362087194BA4CC9A,16732,classic_bike,KP1705001026,KP1705001026,2021-02-27T15:13:39.000+0000,2021-02-27T15:36:36.000+0000,23.0,5626.0
21630F715038CCB0,57068,classic_bike,KP1705001026,KP1705001026,2021-02-20T08:59:42.000+0000,2021-02-20T09:17:04.000+0000,17.0,16461.0


In [None]:
# Write data into the fact table trip_fact.

# Drop any previous version so the table is recreated with the current schema.
spark.sql("drop table if exists default.trip_fact")

# Parenthesized so the chain can span lines (a bare line break is a syntax error).
(
    fact_trips.write.format("delta")
    .mode("overwrite")
    .saveAsTable("default.trip_fact")
)

<h3>Creating dates_dim table</h3>


In [None]:
from pyspark.sql import functions as f
from pyspark.sql.types import StringType

# How many years past the latest trip start the date dimension should extend.
YEARS_AHEAD = 5

# Earliest date to cover: the first trip start or the first payment date,
# whichever is earlier, so payment dates before the first trip also fall
# inside the dimension's range.
date_min = (
    trips.select(f.to_date("start_at").alias("d"))
    .unionByName(payments.select(f.to_date("date").alias("d")))
    .agg(f.min("d"))
    .first()[0]
)

# Latest date to cover: the last trip start plus YEARS_AHEAD years.
# add_months is used instead of DATEADD(year, ...) because it is available in all
# Spark 3.x versions.
date_max = trips.select(f.add_months(f.max("start_at"), 12 * YEARS_AHEAD)).first()[0]

# SQL expression producing one array element per calendar day in [date_min, date_max].
dates_range = f"sequence(to_date('{date_min}'), to_date('{date_max}'), interval 1 day)"

# Explode the day sequence into one row per date, then derive the calendar attributes.
# Column order matches the table layout: time_id, date, day, day_of_week,
# week_of_year, month, quarter, year.
dates_dim = (
    spark.range(1)  # single-row seed so the exploded sequence is produced exactly once
    .select(f.explode(f.expr(dates_range)).alias("dateinit"))
    # dateinit is already a date; a plain cast to timestamp needs no format pattern
    # (the former "YYYY-MM-DD" pattern meant week-based year / day-of-year).
    .withColumn("date", f.to_timestamp("dateinit"))
    .select(
        f.date_format("date", "yyyy-MM-dd").alias("time_id"),  # dimension key, "yyyy-MM-dd"
        "date",
        f.dayofmonth("date").alias("day"),            # 1-31
        f.dayofweek("date").alias("day_of_week"),     # Spark: 1 = Sunday ... 7 = Saturday
        f.weekofyear("date").alias("week_of_year"),   # ISO 8601 week number
        f.month("date").alias("month"),
        f.quarter("date").alias("quarter"),
        f.year("date").alias("year"),
    )
)

In [None]:
# Verify the newly created DataFrame: show the covered date range.
# display() renders the DataFrame itself; `.show()` returns None and must not be wrapped.
display(dates_dim.selectExpr("MIN(date)", "MAX(date)"))

# time_id is the dimension key, so each value must appear exactly once.
assert dates_dim.select("time_id").distinct().count() == dates_dim.count(), \
    "dates_dim.time_id is not unique"

+-------------------+
|          min(date)|
+-------------------+
|2021-02-01 00:00:00|
+-------------------+

+-------------------+
|          max(date)|
+-------------------+
|2027-01-31 00:00:00|
+-------------------+



In [None]:
# Write data into the dimension table dates_dim.

# Drop any previous version so the table is recreated with the current schema.
spark.sql("drop table if exists default.dates_dim")

# Parenthesized so the chain can span lines (a bare line break is a syntax error).
(
    dates_dim.write.format("delta")
    .mode("overwrite")
    .saveAsTable("default.dates_dim")
)