In [0]:
from pyspark.sql.types import *

In [0]:
# Register the bronze riders Delta files as the `riders` source table that the
# gold rider dimension query reads from.
# IF NOT EXISTS keeps the cell re-runnable (a plain CREATE TABLE fails the
# second time "Run All" is executed).
# NOTE: this must not reuse the gold output name `dim_rider`. Spark table names
# are case-insensitive, so a bronze table registered as `Dim_Rider` would be the
# very table that saveAsTable('dim_rider', mode="overwrite") later replaces,
# putting the data at /delta/bronze_riders at risk.
spark.sql("""
        CREATE TABLE IF NOT EXISTS riders
        USING DELTA LOCATION '/delta/bronze_riders'
        """)

In [0]:
# Gold rider dimension: one row per row of the `riders` source table, carrying
# the rider attributes plus a readable `member_status` label
# ('Member' when is_member matches 'True', otherwise 'Casual Rider').
dim_rider_sql = """
        SELECT rider_id,
               first, 
               last, 
               address,
               birthday,
               account_start_date,
               account_end_date,
               CASE is_member WHEN 'True' THEN 'Member'
                     ELSE 'Casual Rider'
                     END AS member_status
               FROM riders
        """
gold_dim_rider = spark.sql(dim_rider_sql)

In [0]:
# Preview the first 20 rows of the rider dimension (long values truncated).
# NOTE(review): this output includes personal data (names, addresses,
# birthdays) — clear the cell output before sharing or committing.
gold_dim_rider.show(n=20, truncate=True)

In [0]:
# Persist the rider dimension as the Delta table `dim_rider`, replacing any
# previous contents.
(
    gold_dim_rider.write
    .format("DELTA")
    .mode("overwrite")
    .saveAsTable('dim_rider')
)

In [0]:
# Register the bronze stations Delta files as the `stations` source table that
# the gold station dimension query reads from.
# IF NOT EXISTS keeps the cell re-runnable.
# NOTE: this must not be named `dim_stations`. That is the gold output table
# written with saveAsTable('dim_stations', mode="overwrite"); sharing the name
# would make the gold write target the table that points at
# /delta/bronze_stations.
spark.sql("""
        CREATE TABLE IF NOT EXISTS stations
        USING DELTA LOCATION '/delta/bronze_stations'
""")

In [0]:
# Gold station dimension: station id, name and coordinates taken straight from
# the `stations` source table.
dim_station_sql = """
                          SELECT station_id,
                          name,
                          latitude,
                          longitude
                          FROM stations
                        """
gold_dim_station = spark.sql(dim_station_sql)

In [0]:
# Persist the station dimension as the Delta table `dim_stations`, replacing
# any previous contents.
(
    gold_dim_station.write
    .format("DELTA")
    .mode("overwrite")
    .saveAsTable('dim_stations')
)

In [0]:

# Register the bronze trips Delta files as the staging table `fact_trips`.
# The gold output is written separately as `fact_trip` (singular), so the two
# names must stay distinct.
# IF NOT EXISTS keeps the cell re-runnable.
spark.sql("""
        CREATE TABLE IF NOT EXISTS fact_trips
        USING DELTA LOCATION '/delta/bronze_trips'
        """)

In [0]:
# Register the bronze payments Delta files as the staging table
# `fact_payments`. The gold output is written separately as `fact_payment`
# (singular), so the two names must stay distinct.
# IF NOT EXISTS keeps the cell re-runnable.
spark.sql("""
        CREATE TABLE IF NOT EXISTS fact_payments
        USING DELTA LOCATION '/delta/bronze_payments'
         """)

In [0]:
# Gold payment fact: one row per bronze payment.
#  - date: BIGINT key yyyyMMddHH derived from fact_payments.date.
#  - rider_id: taken from dim_rider; because of the LEFT JOIN it is NULL for
#    payments whose rider_id has no match in dim_rider.
# The statement has no trailing ';' — spark.sql() takes a single statement, and
# older Spark parsers reject a trailing semicolon.
gold_fact_payment = spark.sql("""
         SELECT 
            fact_payments.payment_id,
            bigint(date_format(fact_payments.date, 'yyyyMMddHH')) AS date,
            dim_rider.rider_id AS rider_id,
            fact_payments.amount AS amount
            FROM fact_payments
            LEFT JOIN dim_rider
            ON fact_payments.rider_id = dim_rider.rider_id
         """)

In [0]:
# Persist the payment fact as the Delta table `fact_payment`, replacing any
# previous contents.
(
    gold_fact_payment.write
    .format("DELTA")
    .mode("overwrite")
    .saveAsTable('fact_payment')
)

In [0]:
# Gold trip fact: one row per bronze trip. The LEFT JOIN keeps trips without a
# matching rider; their dim_rider-derived columns are NULL.
#  - trip_duration: BIGINT(ended_at) - BIGINT(start_at); this is seconds if
#    those columns are timestamps (TODO confirm their type in bronze_trips).
#  - rider_age: whole years between birthday and account_start_date, i.e. the
#    rider's age when the account was opened, not at the time of the trip.
# FIXME(review): the first column concatenates trip_id and rider_id but is
#   aliased `rideable_type`. It looks like a surrogate trip key, so confirm the
#   intended column name. CONCAT returns NULL when any argument is NULL, so this
#   value is NULL for trips with no matching rider.
# The statement has no trailing ';' — older Spark parsers reject it in spark.sql().
gold_fact_trips = spark.sql("""
          SELECT 
            CONCAT(fact_trips.trip_id, CAST(dim_rider.rider_id AS varchar(10))) AS rideable_type,
            fact_trips.start_at as starts_at,
            fact_trips.ended_at as ends_at,
            fact_trips.start_station_id AS station_id,
            fact_trips.end_station_id AS end_station_id,
            dim_rider.rider_id AS rider_id,
            INT(months_between(dim_rider.account_start_date, dim_rider.birthday) / 12) AS rider_age,
            BIGINT(fact_trips.ended_at) - BIGINT(fact_trips.start_at)  AS trip_duration
            FROM fact_trips
            LEFT JOIN dim_rider
            ON fact_trips.rider_id = dim_rider.rider_id
         """)

In [0]:
# Persist the trip fact as the Delta table `fact_trip`, replacing any previous
# contents.
(
    gold_fact_trips.write
    .format("DELTA")
    .mode("overwrite")
    .saveAsTable('fact_trip')
)

In [0]:
# Hourly calendar spine: one timestamp per hour from beginDate to endDate
# (endDate's midnight included), exposed as the temp view `datetimes` for the
# date dimension.
# NOTE(review): timestamps are interpreted/formatted in the Spark session time
# zone; if that zone observes DST, the yyyyMMddHH keys built from them may
# repeat an hour. Confirm the session time zone (e.g. UTC).
beginDate = '2000-01-31'
endDate = '2022-09-01'

hourly_sequence_sql = (
    "select explode(sequence("
    f"to_timestamp('{beginDate}'), "
    f"to_timestamp('{endDate}'), "
    "interval 1 hour)) as calendarDateTime"
)
spark.sql(hourly_sequence_sql).createOrReplaceTempView('datetimes')


In [0]:
# Gold date dimension: one row per hour in the `datetimes` view, keyed by
# `date` = yyyyMMddHH as BIGINT (the same key format used for fact_payment.date).
# quarter() is used instead of date_format(..., 'Q'), which yields a string, so
# that every calendar part in this dimension is an integer.
gold_dim_dates = spark.sql("""
     SELECT bigint(date_format(calendarDateTime, 'yyyyMMddHH')) AS date,
     hour(calendarDateTime) AS hour,
     dayofweek(calendarDateTime) AS day_of_week,
     dayofmonth(calendarDateTime) AS day_of_month,
     extract(week FROM calendarDateTime) AS week_of_year,
     quarter(calendarDateTime) AS quarter,
     extract(month FROM calendarDateTime) AS month,
     extract(year FROM calendarDateTime) AS year
    FROM datetimes
""")

In [0]:
# Preview the first 20 hourly rows of the date dimension (long values truncated).
gold_dim_dates.show(n=20, truncate=True)

In [0]:
# Persist the date dimension as the Delta table `dim_dates`, replacing any
# previous contents. The DataFrame built by the date-dimension query is
# `gold_dim_dates`; the previous reference to the undefined `gold_dates` raised
# a NameError.
(
    gold_dim_dates.write
    .format("DELTA")
    .mode("overwrite")
    .saveAsTable('dim_dates')
)