In [0]:
# Create dim_date
# Builds the dim_dates Delta table: one row per calendar day from beginDate to
# endDate (inclusive), with calendar attributes derived from each date.

from pyspark.sql.functions import explode, sequence, to_date

beginDate = '1900-01-01'
endDate = '2050-12-31'

# explode(sequence(start, end, interval 1 day)) yields one row per day in the
# inclusive range; the result is exposed to SQL as the temp view tmp_dates.
spark.sql(
    f"SELECT explode(sequence(to_date('{beginDate}'), to_date('{endDate}'), interval 1 day)) as date_key"
).createOrReplaceTempView('tmp_dates')

# CREATE OR REPLACE keeps this cell re-runnable: a plain CREATE TABLE fails on the
# second execution because dim_dates already exists.
# Note: Spark's dayofweek numbers days 1 = Sunday through 7 = Saturday.
spark.sql("""
    CREATE OR REPLACE TABLE dim_dates
    USING DELTA LOCATION '/delta/gold_dim_dates'
    AS SELECT
        date_key,
        year(date_key) AS year,
        month(date_key) AS month,
        day(date_key) AS day,
        dayofweek(date_key) AS day_of_week,
        quarter(date_key) AS quarter
    FROM tmp_dates
    ORDER BY date_key
""")

In [0]:
# Payments
# Promotes bronze_payments to the fact_payments Delta table, renaming the source
# identifiers to the *_key names used by the other gold tables.

bronze_payments = spark.table("bronze_payments")
# Parenthesised chain: no trailing backslash that could dangle onto the next line.
# (withColumnRenamed is a no-op if the source column is absent.)
bronze_payments = (
    bronze_payments
    .withColumnRenamed("payment_id", "payment_key")
    .withColumnRenamed("rider_id", "rider_key")
)

# Overwrite so the cell is idempotent under Restart & Run All.
(
    bronze_payments.write.format("delta")
    .mode("overwrite")
    .saveAsTable("fact_payments")
)

In [0]:
# Riders
# Promotes bronze_riders to the dim_riders Delta table, renaming the rider id and
# account start/end columns to *_key names.

bronze_riders = spark.table("bronze_riders")
# Parenthesised chain: no trailing backslash that could dangle onto the next line.
# (withColumnRenamed is a no-op if the source column is absent.)
bronze_riders = (
    bronze_riders
    .withColumnRenamed("rider_id", "rider_key")
    .withColumnRenamed("account_date_start", "account_date_start_key")
    .withColumnRenamed("account_date_end", "account_date_end_key")
)

# Overwrite so the cell is idempotent under Restart & Run All.
(
    bronze_riders.write.format("delta")
    .mode("overwrite")
    .saveAsTable("dim_riders")
)

In [0]:
# Station
# Copies bronze_stations into the dim_stations Delta table (overwriting any
# previous contents), with station_id exposed as station_key.

bronze_stations = (
    spark.table("bronze_stations")
    .withColumnRenamed("station_id", "station_key")
)

bronze_stations.write.saveAsTable("dim_stations", format="delta", mode="overwrite")

In [0]:
# Trips

from pyspark.sql.functions import *

# Builds the fact_trips Delta table from bronze_trips, joined to dim_riders to
# obtain each rider's birthdate for the age calculation.
# Relies on the pyspark.sql.functions star import above (col, to_timestamp,
# to_date, date_format, floor, months_between).
riders = spark.table('dim_riders')
bronze_trips = spark.table("bronze_trips")

# Only rider_key and birthdate are needed from the rider dimension; projecting
# them up front keeps the join free of duplicate/ambiguous column names.
rider_birthdates = riders.select('rider_key', 'birthdate')

# Normalise the raw values once so every derived column works whether
# started_at/ended_at are stored as strings or as timestamps (casting a
# timestamp *string* directly to long yields null).
started_ts = to_timestamp(col('started_at'))
ended_ts = to_timestamp(col('ended_at'))

trips = (
    bronze_trips
    .withColumnRenamed('trip_id', 'trip_key')
    .withColumnRenamed('rider_id', 'rider_key')
    .withColumnRenamed('start_station_id', 'start_station_key')
    .withColumnRenamed('end_station_id', 'end_station_key')
    # Inner join: trips whose rider is not in dim_riders are dropped.
    # Joining on the column name leaves a single rider_key column.
    .join(rider_birthdates, on='rider_key', how='inner')
    .withColumn('start_date_key', to_date(started_ts))
    .withColumn('start_time', date_format(started_ts, 'HH:mm:ss'))
    .withColumn('end_date_key', to_date(ended_ts))
    .withColumn('end_time', date_format(ended_ts, 'HH:mm:ss'))
    # Whole seconds between trip start and end.
    .withColumn('ride_duration', ended_ts.cast('long') - started_ts.cast('long'))
    # Age in whole years at the start of the trip (not at notebook run time),
    # so the fact table is stable across re-runs.
    .withColumn('rider_age', floor(months_between(started_ts, col('birthdate')) / 12))
    .select('trip_key', 'rider_key', 'start_station_key', 'end_station_key',
            'start_date_key', 'start_time', 'end_date_key', 'end_time',
            'ride_duration', 'rideable_type', 'rider_age')
)

trips.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("fact_trips")