In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

In [0]:
#fact_payment

payments = spark.table("default.staging_payments")

In [0]:
display(payments.limit(5))

payment_id,date,amount,rider_id
1,2019-05-01,9.0,1000
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000


In [0]:
# Write data to fact_payments
spark.sql("drop table if exists default.fact_payments")
payments.dropDuplicates(["payment_id"]).write.format("delta").mode("overwrite").saveAsTable("default.fact_payments")

In [0]:
#dim_rider

riders = spark.table("default.staging_riders")

In [0]:
display(riders.limit(5))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True


In [0]:
# Write data to dim_rider
spark.sql("drop table if exists default.dim_rider")
riders.dropDuplicates(["rider_id"]).write.format("delta").mode("overwrite").saveAsTable("default.dim_rider")

In [0]:
#dim_stations

In [0]:
stations = spark.table("default.staging_stations")

In [0]:
display(stations.limit(5))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669


In [0]:
# Write data to dim_station
spark.sql("drop table if exists default.dim_station")
stations.dropDuplicates(["station_id"]).write.format("delta").mode("overwrite").saveAsTable("default.dim_station")

In [0]:
# fact_trips table
from pyspark.sql.functions import unix_timestamp

# Read data from tables
table1 = spark.read.format("delta").table("staging_trips")
table2 = spark.read.format("delta").table("dim_rider")

# Perform the join
joined_df = table1.join(table2, "rider_id")

# Calculate duration column
duration_df = joined_df.withColumn("duration", (unix_timestamp(col("ended_at")) - unix_timestamp(col("start_at")))/60)

#Calculate Rider Age column
rider_age_df = duration_df.withColumn("rider_age",(unix_timestamp(col("start_at")) - unix_timestamp(col("birthday")))/3600/24/365)

# Show the resulting DataFrame
rider_age_df.show()

+--------+----------------+-------------+-------------------+-------------------+----------------+--------------+----------+---------+--------------------+----------+------------------+----------------+---------+------------------+------------------+
|rider_id|         trip_id|rideable_type|           start_at|           ended_at|start_station_id|end_station_id|first_name|last_name|             address|  birthday|account_start_date|account_end_date|is_member|          duration|         rider_age|
+--------+----------------+-------------+-------------------+-------------------+----------------+--------------+----------+---------+--------------------+----------+------------------+----------------+---------+------------------+------------------+
|   71934|89E7AA6C29227EFF| classic_bike|2021-02-12 16:14:56|2021-02-12 16:21:43|             525|           660|   Ricardo|    Grant|34542 Smith Exten...|1983-08-26|        2020-12-27|            null|     true| 6.783333333333333| 37.495005580923

In [0]:
rider_age_df.write.format("delta").mode("overwrite").saveAsTable("default.fact_trip")

In [0]:
# date dimension table

from pyspark.sql.functions import min, max

min_max_dates = table1.select(min("start_at").alias("min_date"), max("ended_at").alias("max_date")).first()
min_date = min_max_dates["min_date"]
max_date = min_max_dates["max_date"]

In [0]:
from pyspark.sql.functions import sequence, to_date

date_range_df = spark.range(0, (max_date - min_date).days + 1).selectExpr(f"date_add('{min_date}', cast(id as int)) as date")

In [0]:
from pyspark.sql.functions import date_format
from pyspark.sql.functions import weekofyear

date_dimension_df = date_range_df.select(
    "date",
    date_format("date", "yyyy").alias("year"),
    date_format("date", "Q").alias("quarter"),
    date_format("date", "MM").alias("month"),
    date_format("date", "dd").alias("day_of_month"),
    date_format("date", "E").alias("day_of_week"),
    #date_format("date", "w-yyyy").alias("WeekOfYear")
    
)
date_dimension_df = date_dimension_df.withColumn("week_of_year", weekofyear("date"))
date_dimension_df.show()

+----------+----+-------+-----+------------+-----------+------------+
|      date|year|quarter|month|day_of_month|day_of_week|week_of_year|
+----------+----+-------+-----+------------+-----------+------------+
|2021-02-01|2021|      1|   02|          01|        Mon|           5|
|2021-02-02|2021|      1|   02|          02|        Tue|           5|
|2021-02-03|2021|      1|   02|          03|        Wed|           5|
|2021-02-04|2021|      1|   02|          04|        Thu|           5|
|2021-02-05|2021|      1|   02|          05|        Fri|           5|
|2021-02-06|2021|      1|   02|          06|        Sat|           5|
|2021-02-07|2021|      1|   02|          07|        Sun|           5|
|2021-02-08|2021|      1|   02|          08|        Mon|           6|
|2021-02-09|2021|      1|   02|          09|        Tue|           6|
|2021-02-10|2021|      1|   02|          10|        Wed|           6|
|2021-02-11|2021|      1|   02|          11|        Thu|           6|
|2021-02-12|2021|   

In [0]:
date_dimension_df.write.format("delta").mode("overwrite").saveAsTable("default.dim_date")