In [0]:
# Derive the calendar range for the date dimension from the staged trips.
# Both bounds come from one aggregation, so stg_trips is scanned a single
# time instead of once per bound.
date_bounds = spark.sql('''
    SELECT MIN(started_at)                   AS min_started_at,
           DATEADD(year, 5, MAX(started_at)) AS max_started_at
    FROM stg_trips
''').first()

# Lower bound: earliest trip start.
# Upper bound: latest trip start pushed five years into the future.
# NOTE(review): when stg_trips has no rows both values are None and are
# interpolated below as the text 'None' -- confirm whether that case needs
# explicit handling.
min_date = date_bounds['min_started_at']
max_date = date_bounds['max_started_at']

# Spark SQL expression producing an array with one element per day between
# the two bounds; it is evaluated later through f.expr().
expression = f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)"

In [0]:
from pyspark.sql import functions as f
from pyspark.sql.types import StringType

# Build the date dimension in one pass: a single-row seed frame is expanded
# into one row per day by exploding the array generated by `expression`, then
# the calendar attributes are derived from the resulting timestamp.
#
# NOTE(review): the seed column "time_id" (constant 1) is never dropped, so it
# is carried on every row of the result -- confirm it is wanted.
# NOTE(review): "day" is filled with the day of the week, not the day of the
# month -- confirm this is the intended meaning.
dimDate = (
    spark.createDataFrame([(1,)], ["time_id"])
    .withColumn("dateinit", f.explode(f.expr(expression)))
    .withColumn("date", f.to_timestamp(f.col("dateinit"), "yyyy-MM-dd"))
    .withColumn("year", f.year(f.col("date")))
    .withColumn("quarter", f.quarter(f.col("date")))
    .withColumn("month", f.month(f.col("date")))
    .withColumn("day", f.dayofweek(f.col("date")))
    # String rendering of the timestamp; the fact queries select it as the date key.
    .withColumn("date_id", f.col("date").cast(StringType()))
    .drop("dateinit")
)

In [0]:
# Persist the date dimension as the Delta table "dimDate", replacing any
# existing contents.
(
    dimDate.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dimDate")
)

In [0]:
# Rider dimension: a straight projection of the staged riders, with no
# filtering or transformation applied.
dimRider = spark.sql('''
    select rider_id, firstName, lastName, address, birthday, startDate, endDate, isMember from stg_riders
''')

# Persist as the Delta table "dimRider", replacing any existing contents.
(
    dimRider.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dimRider")
)

In [0]:
# Station dimension: identifier, name and coordinates taken as-is from the
# staged stations.
dimStation = spark.sql('''
    select station_id, name, latitude , longitude from stg_stations
''')

# Persist as the Delta table "dimStation", replacing any existing contents.
(
    dimStation.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dimStation")
)

In [0]:
# Payment fact: each staged payment is matched to its row in dimDate and
# carries that row's date_id as the date key.
#
# NOTE(review): this is an inner join, so a payment whose date has no matching
# dimDate row is dropped. dimDate is generated from the stg_trips date range,
# so payments dated before the first trip would be lost -- confirm that is
# acceptable.
factPayment = spark.sql('''
    select  stg_payments.payment_id,
            stg_payments.amount,
            stg_payments.rider_id,
            dimDate.date_id                
    from stg_payments
    join dimDate on dimDate.date=stg_payments.date
''')

# Persist as the Delta table "factPayment", replacing any existing contents.
(
    factPayment.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("factPayment")
)

In [0]:
# Trip fact: one distinct row per trip, enriched with the rider, the start and
# end date keys from dimDate, and two derived measures:
#   trip_duration - difference between started_at and ended_at, in hours
#   rider_age     - difference between the rider's birthday and started_at, in years
#
# Every join is an inner join, so a trip is kept only when its rider, both of
# its stations and both of its dates have a matching row. The station joins
# contribute no columns; they act purely as filters.
#
# NOTE(review): trip_duration is measured in hours -- confirm that this
# granularity is what downstream consumers expect.
factTrip = spark.sql('''
    SELECT distinct
        stg_trips.trip_id,
        stg_riders.rider_id,
        stg_trips.start_station_id, 
        stg_trips.end_station_id, 
        start_time.date_id                                                  AS start_at,
        end_time.date_id                                                    AS end_at,
        stg_trips.rideable_type,
        DATEDIFF(hour, stg_trips.started_at, stg_trips.ended_at)            AS trip_duration,
        DATEDIFF(year, stg_riders.birthday, stg_trips.started_at)           AS rider_age

    FROM stg_trips
    JOIN stg_riders                ON stg_riders.rider_id = stg_trips.rider_id
    JOIN stg_stations AS s1        ON s1.station_id = stg_trips.start_station_id
    JOIN stg_stations AS s2        ON s2.station_id = stg_trips.end_station_id
    JOIN dimDate AS start_time     ON cast(start_time.date_id as date) = cast(stg_trips.started_at as date)
    JOIN dimDate AS end_time       ON cast(end_time.date_id as date) = cast(stg_trips.ended_at as date)
        
''')

# Persist as the Delta table "factTrip", replacing any existing contents.
(
    factTrip.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("factTrip")
)