In [0]:
# imports
from pyspark.sql.functions import *

## Riders Dimension Table, Payments Fact Table, Stations Dimension Table

In [0]:
# Riders dimension: copy the source riders table unchanged into a Delta table.
df = spark.read.table("default.riders")
(
    df.write
    .format("delta")
    .mode("overwrite")  # replace any existing riders_dim contents
    .saveAsTable("riders_dim")
)

In [0]:
# Payments fact: copy the source payments table unchanged into a Delta table.
df = spark.read.table("default.payments")
(
    df.write
    .format("delta")
    .mode("overwrite")  # replace any existing payments_fact contents
    .saveAsTable("payments_fact")
)

In [0]:
# Stations dimension: copy the source stations table unchanged into a Delta table.
df = spark.read.table("default.stations")
(
    df.write
    .format("delta")
    .mode("overwrite")  # replace any existing stations_dim contents
    .saveAsTable("stations_dim")
)

## Trip Fact Table

In [0]:
# Load the source trips table; the following cells add derived columns to `df`.
df = spark.read.table("default.trips")

In [0]:
# add duration: seconds elapsed between the start and the end of each trip
ended_epoch = unix_timestamp("ended_at")
started_epoch = unix_timestamp("started_at")
df = df.withColumn("duration_in_seconds", ended_epoch - started_epoch)

In [0]:
# add started_date, ended_date: the date part of the start / end values
df = (
    df
    .withColumn("started_date", to_date("started_at"))
    .withColumn("ended_date", to_date("ended_at"))
)

In [0]:
# add age: the rider's age in completed years on the day the trip started
df_riders = spark.table("default.riders")

# Left join so trips whose rider_id has no match in riders are kept (their age is null).
df_join = df.join(df_riders, df.rider_id == df_riders.rider_id, "left").select(df["*"], df_riders["birthday"])

# Subtracting calendar years overstates the age by one for every trip taken
# before the rider's birthday in that year. Counting the months between the two
# dates and flooring to whole years only credits a year once the birthday has
# been reached. floor() yields a bigint, so cast back to int to keep the same
# column type that the year() arithmetic produced.
age_in_years = floor(months_between(df_join.started_date, df_join.birthday) / 12).cast("int")

df = df_join.withColumn("age", age_in_years).drop('birthday')

In [0]:
# Persist the enriched trips DataFrame as the trips fact table.
(
    df.write
    .format("delta")
    .mode("overwrite")  # replace any existing trips_fact contents
    .saveAsTable("trips_fact")
)

## Dates Dimension Table

In [0]:
# Source payments table; its "date" column feeds the date-range computation.
# (Variable name kept as-is because a later cell refers to it.)
df_pyments = spark.read.table("default.payments")

In [0]:
# Trips fact table; its "started_date" column feeds the date-range computation.
df_trips_fact = spark.read.table("default.trips_fact")

In [0]:
# Earliest and latest date across trip start dates and payment dates; the upper
# bound is pushed out by 60 months (5 years).
# union() matches columns by position, so the combined column keeps the name
# "started_date" from the first DataFrame.
all_dates = df_trips_fact.select("started_date").union(df_pyments.select("date"))
df_date_thres = all_dates.agg(
    add_months(max("started_date"), 60).alias("max_date"),
    min("started_date").alias("min_date"),
)

In [0]:
# create dates dimension table: one row per calendar day from min_date to max_date
# Fetch the single aggregate row once and reuse it. Each take() is a separate
# Spark action, so calling it twice evaluated the union + aggregation twice.
date_bounds = df_date_thres.take(1)[0]
df_dates = spark.sql(f"SELECT explode(sequence(to_date('{date_bounds.min_date}'), to_date('{date_bounds.max_date}'), interval 1 day)) as date")

In [0]:
df = df_dates.withColumn('year', year('date')).withColumn('month', month('date')).withColumn('day_of_month', dayofmonth('date'))

In [0]:
df.write.format("delta").mode("overwrite").saveAsTable("dates_dim")