Run this code in an Azure Databricks notebook. Prerequisite (manual step): upload payments.csv, riders.csv, stations.csv and trips.csv to DBFS under `/FileStore/tables/demo/` using the Databricks file-upload UI; the Extract step below then converts them to Delta tables.

# Extract

In [None]:
# Each raw CSV is read with every column kept as a string (no schema
# inference, no header row), given explicit column names, and persisted as a
# Delta directory. Sources are processed in this order:
# payments, riders, stations, trips.
raw_sources = [
    ('/FileStore/tables/demo/payments.csv', '/delta/payments',
     ['payment_id', 'date', 'amount', 'rider_id']),
    ('/FileStore/tables/demo/riders.csv', '/delta/riders',
     ['rider_id', 'first', 'last', 'address', 'birthday',
      'account_start_date', 'account_end_date', 'is_member']),
    ('/FileStore/tables/demo/stations.csv', '/delta/stations',
     ['station_id', 'name', 'latitude', 'longitude']),
    ('/FileStore/tables/demo/trips.csv', '/delta/trips',
     ['trip_id', 'rideable_type', 'start_at', 'ended_at',
      'start_station_id', 'end_station_id', 'rider_id']),
]

for csv_path, delta_path, column_names in raw_sources:
    csv_reader = (
        spark.read.format('csv')
        .option('inferSchema', 'false')
        .option('header', 'false')
        .option('sep', ',')
    )
    df = csv_reader.load(csv_path).toDF(*column_names)
    df.write.format('delta').mode('overwrite').save(delta_path)

In [0]:
# Register each Delta directory written by the Extract step as a metastore
# table of the same name. All tables are dropped first so the cell can be
# re-run; then each one is recreated pointing at its /delta/<name> location.
staging_tables = ['payments', 'riders', 'stations', 'trips']

for table_name in staging_tables:
    spark.sql(f'DROP TABLE IF EXISTS {table_name}')

for table_name in staging_tables:
    spark.sql(f"CREATE TABLE {table_name} USING DELTA LOCATION '/delta/{table_name}'")

# Load and Transform

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import functions as F

In [0]:
# fact_payment: the staged payments table copied unchanged into the fact table.
df = spark.table('payments')
(
    df.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable('fact_payment')
)

display(df)

In [0]:
# fact_trip: one row per staged trip that has a matching rider, enriched with
#   - time_of_day:   bucket of the start hour,
#   - trip_duration: ended_at - start_at in (fractional) minutes,
#   - rider_age:     rider's age in completed years when the trip started.
df_1 = spark.table('trips')

# Start-hour buckets (inclusive): 6-11 Morning, 12-17 Afternoon,
# 18-21 Evening; every other hour (including a null hour) is Night.
start_hour = F.hour('start_at')
df_1 = df_1.withColumn(
    'time_of_day',
    F.when(start_hour.between(6, 11), 'Morning')
     .when(start_hour.between(12, 17), 'Afternoon')
     .when(start_hour.between(18, 21), 'Evening')
     .otherwise('Night')
)

# unix_timestamp parses the string columns with its default
# 'yyyy-MM-dd HH:mm:ss' pattern; seconds / 60 gives minutes.
df_1 = df_1.withColumn(
    'trip_duration',
    (F.unix_timestamp('ended_at') - F.unix_timestamp('start_at')) / 60
)

# Inner join: trips whose rider_id has no row in `riders` are dropped.
df_2 = spark.table('riders').select('rider_id', 'birthday')
df_1 = df_1.join(df_2, on='rider_id', how='inner')

# Age in completed years. months_between is calendar-aware, so the age
# increments exactly on the birthday. The former 365.25-day-year
# approximation under-counted around birthdays (e.g. 365 days after a
# 2001-01-01 birth gave floor(0.9993) = 0 instead of 1).
df_1 = df_1.withColumn(
    'rider_age',
    F.floor(
        F.months_between(
            F.to_timestamp('start_at'),
            F.to_date('birthday', 'yyyy-MM-dd'),
        ) / 12
    )
)

# Final column order; the joined `birthday` column is intentionally left out.
order = ['trip_id', 'rideable_type', 'start_at', 'ended_at', 'trip_duration', 'start_station_id', 'end_station_id', 'rider_id', 'rider_age', 'time_of_day']
df_1 = df_1.select(*order)

df_1.write.format('delta').mode('overwrite').saveAsTable('fact_trip')

display(df_1)

In [0]:
# dim_station: the staged stations table copied unchanged into the dimension.
df = spark.table('stations')
station_writer = df.write.format('delta').mode('overwrite')
station_writer.saveAsTable('dim_station')

display(df)

In [0]:
# dim_rider: the staged riders table copied unchanged into the dimension.
df = spark.table('riders')
rider_writer = df.write.format('delta').mode('overwrite')
rider_writer.saveAsTable('dim_rider')

display(df)

In [0]:
# dim_date: one row per calendar day seen in either trip start times or
# payment dates, with day-of-week name, month, quarter and year attributes.
df_1 = spark.table('trips')
df_2 = spark.table('payments')

# Truncate both sources to a DATE before the union. trips.start_at carries a
# time of day (it is bucketed by hour when building fact_trip), so without
# this, distinct() keeps one row per timestamp instead of one per day, and
# trip values would never equal payment values for the same day.
# NOTE(review): payments.date is presumably already 'yyyy-MM-dd'; to_date
# is a no-op for such values — confirm against the raw data.
df_3 = (
    df_1.select(F.to_date('start_at').alias('date'))
    .union(df_2.select(F.to_date('date').alias('date')))
    .distinct()
)

df_3 = (
    df_3.withColumn('day_of_week', F.date_format('date', 'EEEE'))
    .withColumn('month', F.month('date'))
    .withColumn('quarter', F.quarter('date'))
    .withColumn('year', F.year('date'))
)

# Drop first so the table is recreated even if its schema changed
# (e.g. `date` moving from string to date type).
spark.sql('DROP TABLE IF EXISTS dim_date')
df_3.write.format('delta').mode('overwrite').saveAsTable('dim_date')

display(df_3)