## Loading data into delta lake

In [0]:
payments_column = ["payment_id", "date", "amount", "rider_id"]
riders_column = ["rider_id", "first", "last", "address", "birthday", "account_start_date", "account_end_date", "is_member"]
stations_column = ["station_id", "name", "latitude", "longitude"]
trips_column = ["trip_id", "rideable_type", "started_at", "ended_at", "start_station_id", "end_station_id", "rider_id"]

In [0]:
df_payments = spark.read.format("csv").option("inferSchema", "false").option("header", "false").load("/FileStore/data/payments.csv").toDF(*payments_column)
df_riders = spark.read.format("csv").option("inferSchema", "false").option("header", "false").load("/FileStore/data/riders.csv").toDF(*riders_column)
df_stations = spark.read.format("csv").option("inferSchema", "false").option("header", "false").load("/FileStore/data/stations.csv").toDF(*stations_column)
df_trips = spark.read.format("csv").option("inferSchema", "false").option("header", "false").load("/FileStore/data/trips.csv").toDF(*trips_column)


In [0]:
df_payments.write.format("delta").mode("overwrite").save("/delta/data/payments")
df_riders.write.format("delta").mode("overwrite").save("/delta/data/riders")
df_stations.write.format("delta").mode("overwrite").save("/delta/data/stations")
df_trips.write.format("delta").mode("overwrite").save("/delta/data/trips")

In [0]:
df_payments.write.format("delta").mode("overwrite").saveAsTable("payment_table")
df_riders.write.format("delta").mode("overwrite").saveAsTable("rider_table")
df_stations.write.format("delta").mode("overwrite").saveAsTable("station_table")
df_trips.write.format("delta").mode("overwrite").saveAsTable("trip_table")

## Extract

In [0]:
df_payments = spark.read.format("delta").load("/delta/data/payments")
df_riders = spark.read.format("delta").load("/delta/data/riders")
df_stations = spark.read.format("delta").load("/delta/data/stations")
df_trips = spark.read.format("delta").load("/delta/data/trips")

In [0]:
df_payments = df_payments.dropDuplicates(df_payments.columns)
df_riders = df_riders.dropDuplicates(df_riders.columns)
df_stations = df_stations.dropDuplicates(df_stations.columns)
df_trips = df_trips.dropDuplicates(df_trips.columns)


In [0]:
df_payments.write.format("delta").mode("overwrite").save("/delta/data/silver/payments")
df_riders.write.format("delta").mode("overwrite").save("/delta/data/silver/riders")
df_stations.write.format("delta").mode("overwrite").save("/delta/data/silver/stations")
df_trips.write.format("delta").mode("overwrite").save("/delta/data/silver/trips")

In [0]:
spark.sql("CREATE TABLE payment_table_silver USING DELTA LOCATION '/delta/data/silver/payments'")
spark.sql("CREATE TABLE rider_table_silver USING DELTA LOCATION '/delta/data/silver/riders'")
spark.sql("CREATE TABLE station_table_silver USING DELTA LOCATION '/delta/data/silver/stations'")
spark.sql("CREATE TABLE trip_table_silver USING DELTA LOCATION '/delta/data/silver/trips'")

DataFrame[]

## Transform

In [0]:
df_payments = spark.table("default.payment_table_silver")
df_riders = spark.table("default.rider_table_silver")
df_stations = spark.table("default.station_table_silver")
df_trips = spark.table("default.trip_table_silver")

In [0]:
from pyspark.sql import functions as F

df_fact_payment = df_payments.withColumn("date_id", F.regexp_replace("date", "-", "")).select("payment_id", "amount", "rider_id", "date_id")
df_fact_payment.write.format("delta").mode("overwrite").save("/delta/data/golden/payments")
df_fact_payment.write.format("delta").mode("overwrite").saveAsTable("fact_payment")
df_fact_payment.limit(10).display()

payment_id,amount,rider_id,date_id
35,9.0,1001,20191201
223,9.0,1010,20200101
251,9.0,1011,20170801
633,23.31,1025,20211201
758,9.0,1032,20190101
774,9.0,1032,20200501
936,9.0,1038,20200901
1132,9.0,1042,20190201
1221,9.0,1043,20190501
2005,9.0,1074,20210101


In [0]:
df_fact_trip = (df_trips.withColumn("trip_duration_min", (F.unix_timestamp("ended_at")-F.unix_timestamp("started_at"))/60)
                        .withColumn("date_id", F.regexp_replace(F.col("started_at").substr(1,10), "-", ""))
                        .join(df_riders, on="rider_id", how="left")
                        .withColumn("age_at_trip", F.col("started_at").substr(1,4).astype("int")-F.col("birthday").substr(1,4).astype("int"))
                        .select("trip_id", "trip_duration_min", "rider_id", "start_station_id", "end_station_id", "date_id", "age_at_trip")
                )
df_fact_trip.write.format("delta").mode("overwrite").save("/delta/data/golden/trips")
df_fact_trip.write.format("delta").mode("overwrite").saveAsTable("fact_trip")
df_fact_trip.limit(10).display()

trip_id,trip_duration_min,rider_id,start_station_id,end_station_id,date_id,age_at_trip
710CF541AD1AABA3,8.733333333333333,70413,623,13263,20210203,16
A164800246059225,4.6,42130,KA1503000034,TA1305000029,20210205,35
7EDA0DB96BE86245,8.7,71398,13221,637,20210202,28
3F0DE8D472139C38,4.966666666666667,74058,18067,TA1307000166,20210225,27
7BE513DB779DB252,14.05,74015,TA1307000151,KA1503000041,20210222,18
706DA4E8CCBB466C,13.883333333333333,42391,KA1503000034,15529,20210222,21
9006903BEB00F908,3.816666666666667,45191,13138,15550,20210224,25
540EEBEAF8A2CCDF,10.55,47434,KA1504000159,KA1504000156,20210201,34
6B69584E5D396F98,5.6,3181,13128,TA1307000039,20210223,52
01AC03FCDECFE5D8,27.28333333333333,67432,TA1309000004,TA1305000010,20210226,41


In [0]:
df_dim_rider = (df_riders.withColumn("name", F.concat("first", F.lit(" "), "last"))
                         .withColumn("age_at_account_start", F.col("account_start_date").substr(1,4).astype("int")-F.col("birthday").substr(1,4).astype("int"))
                         .select("rider_id", "name", "is_member", "age_at_account_start")            
                )
df_dim_rider.write.format("delta").mode("overwrite").save("/delta/data/golden/riders")
df_dim_rider.write.format("delta").mode("overwrite").saveAsTable("dim_rider")
df_dim_rider.limit(10).display()

rider_id,name,is_member,age_at_account_start
1153,James Harrison,True,16
1181,Brandon Robinson,True,21
1313,Nathan Trevino,True,13
1436,Vincent Clark,True,36
1670,Rebecca Freeman,False,25
1758,Andrew Johnson,False,19
1831,Jennifer Garcia,True,25
1936,Brenda Brady,True,29
2064,Joshua Khan,True,46
2109,Melissa Vasquez,True,23


In [0]:
df_dim_station = df_stations.select("station_id", "name")
df_dim_station.write.format("delta").mode("overwrite").save("/delta/data/golden/stations")
df_dim_station.write.format("delta").mode("overwrite").saveAsTable("dim_station")
df_dim_station.limit(10).display()

station_id,name
13277,Broadway & Belmont Ave
RP-002,Warren Park East
16916,Central Ave & Madison St
319,Roscoe & Harlem
TA1305000002,Wabash Ave & Roosevelt Rd
KA1503000007,Cornell Ave & Hyde Park Blvd
TA1306000007,Larrabee St & Menomonee St
15687,Central Park Ave & 24th St
KA1504000160,Francisco Ave & Foster Ave
13325,Broadway & Waveland Ave


In [0]:
import pandas as pd
from datetime import datetime

def get_date_info(row):
    date = row["date"][0:10]
    date_id = int(date.replace("-", ""))
    year = int(date[0:4])
    quarter = int(pd.DatetimeIndex([pd.to_datetime(date)]).quarter[0])
    month = int(date[5:7])
    day = int(date[8:10])
    weekday = int(datetime.strptime(date, "%Y-%m-%d").weekday())
    return date_id, year, quarter, month, day, weekday


In [0]:
dim_date_columns = ["date_id", "year", "quarter", "month", "day", "weekday"]
df_dim_date = (df_trips.select("started_at").withColumnRenamed("started_at", "date").rdd.map(get_date_info).toDF(dim_date_columns)
               .union(df_trips.select("ended_at").withColumnRenamed("ended_at", "date").rdd.map(get_date_info).toDF(dim_date_columns))
               .union(df_payments.select("date").rdd.map(get_date_info).toDF(dim_date_columns))
               .dropDuplicates(dim_date_columns)
)
df_dim_date.write.format("delta").mode("overwrite").save("/delta/data/golden/dates")
df_dim_date.write.format("delta").mode("overwrite").saveAsTable("dim_date")
df_dim_date.limit(10).display()

date_id,year,quarter,month,day,weekday
20210214,2021,1,2,14,6
20210206,2021,1,2,6,5
20210203,2021,1,2,3,2
20210211,2021,1,2,11,3
20210227,2021,1,2,27,5
20210202,2021,1,2,2,1
20210220,2021,1,2,20,5
20210226,2021,1,2,26,4
20210201,2021,1,2,1,0
20210219,2021,1,2,19,4
