### Extract 

- Read the raw CSV files from DBFS and save them in Delta format.

In [0]:
def load_raw_to_delta(
    read_format,
    inferSchema,
    header,
    sep,
    schema,
    read_path,
    write_format,
    write_mode,
    write_path,
):
    """
    Read a raw file through ``spark.read`` and persist it at ``write_path``.

    The source at ``read_path`` is loaded with ``read_format``, the
    ``inferSchema`` / ``header`` / ``sep`` reader options and the explicit
    ``schema``. The resulting DataFrame is then written using ``write_format``
    and ``write_mode``. Nothing is returned.
    """
    reader_options = (
        ("inferSchema", inferSchema),
        ("header", header),
        ("sep", sep),
    )

    # Configure the reader one option at a time, in the order listed above.
    reader = spark.read.format(read_format)
    for option_name, option_value in reader_options:
        reader = reader.option(option_name, option_value)

    raw_df = reader.schema(schema).load(read_path)

    writer = raw_df.write.format(write_format).mode(write_mode)
    writer.save(write_path)

In [0]:
# Settings handed to the Spark reader for the raw files.
read_format, sep = "csv", ","
inferSchema = header = "false"

# Settings handed to the Spark writer.
write_format, write_mode = "delta", "overwrite"

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, DateType, DoubleType, StringType, BooleanType, FloatType

In [0]:
def _nullable_schema(columns):
    """Build a StructType from (name, data type) pairs; every field is nullable."""
    return StructType(
        [StructField(column_name, column_type, True) for column_name, column_type in columns]
    )


# Explicit schemas for the four raw files, one (column, type) pair per field.
payments_schema = _nullable_schema([
    ("payment_id", IntegerType()),
    ("date", DateType()),
    ("amount", DoubleType()),
    ("rider_id", IntegerType()),
])

riders_schema = _nullable_schema([
    ("rider_id", IntegerType()),
    ("first_name", StringType()),
    ("last_name", StringType()),
    ("address", StringType()),
    ("birthday", DateType()),
    ("account_start_date", DateType()),
    ("account_end_date", DateType()),
    ("is_member", BooleanType()),
])

stations_schema = _nullable_schema([
    ("station_id", StringType()),
    ("name", StringType()),
    ("latitude", FloatType()),
    ("longitude", FloatType()),
])

trips_schema = _nullable_schema([
    ("trip_id", StringType()),
    ("rideable_type", StringType()),
    ("start_at", TimestampType()),
    ("ended_at", TimestampType()),
    ("start_station_id", StringType()),
    ("end_station_id", StringType()),
    ("rider_id", IntegerType()),
])

# Write one bronze delta directory per raw csv file.
tables = ["payments", "riders", "stations", "trips"]
schemas = [payments_schema, riders_schema, stations_schema, trips_schema]

for table, schema in zip(tables, schemas):
    # Source csv and destination delta directory are both derived from the table name.
    read_path = "/FileStore/raw/{}.csv".format(table)
    write_path = "/delta/bronze/{}".format(table)

    load_raw_to_delta(
        read_format=read_format,
        inferSchema=inferSchema,
        header=header,
        sep=sep,
        schema=schema,
        read_path=read_path,
        write_format=write_format,
        write_mode=write_mode,
        write_path=write_path,
    )

### Load

- Create the bronze Delta tables on top of the Delta files.

In [0]:
# Create bronze tables
#
# Each table is registered on top of the delta files written under /delta/bronze.
# IF NOT EXISTS keeps the cell re-runnable: the files themselves are rewritten
# with mode "overwrite", but a plain CREATE TABLE fails on the second run
# because the table is already registered.
# NOTE: `tables` is reassigned by a later cell of this notebook, so this cell
# must run before that one.

for table in tables:
    spark.sql(
        "CREATE TABLE IF NOT EXISTS {} USING DELTA LOCATION '/delta/bronze/{}'".format(table, table)
    )

### Transform

- Create the gold fact and dimension tables from the bronze Delta tables.

In [0]:
from pyspark.sql import functions as F

In [0]:
# Create "dim_rider" table

df_bronze_rider = spark.read.format("delta").load("/delta/bronze/riders")

# Age in completed years on the day the account was opened.
# months_between / 12 follows the calendar, so the value changes exactly on the
# birthday. Dividing a day count by 365 ignores leap days and rounds up too
# early: birthday 2000-03-02 with account start 2021-03-01 is 7669 days,
# 7669 / 365 = 21.01, i.e. 21 although the rider is still 20.
age_at_account_start = F.floor(
    F.months_between(F.col("account_start_date"), F.col("birthday")) / 12
).cast(IntegerType())

df_dim_rider = (
    df_bronze_rider
    .withColumn("rider_age_at_account_start", age_at_account_start)
    .select(
        "rider_id",
        "first_name",
        "last_name",
        "address",
        "birthday",
        "account_start_date",
        "account_end_date",
        "rider_age_at_account_start",
        "is_member",
    )
)

df_dim_rider.write.format("delta") \
                .mode("overwrite") \
                .save("/delta/gold/dim_rider")

In [0]:
# Create "dim_station" table

df_bronze_station = spark.read.format("delta").load("/delta/bronze/stations")

# The station dimension is the bronze stations data as-is.
df_dim_station = df_bronze_station

station_writer = df_dim_station.write.format("delta").mode("overwrite")
station_writer.save("/delta/gold/dim_station")

In [0]:
# Create "fact_payment" table

df_bronze_payment = spark.read.format("delta").load("/delta/bronze/payments")

# Rename "date" to "payment_date", then keep the fact columns in their final order.
fact_payment_columns = ["payment_id", "rider_id", "payment_date", "amount"]
df_fact_payment = (
    df_bronze_payment
    .withColumnRenamed("date", "payment_date")
    .select(*fact_payment_columns)
)

payment_writer = df_fact_payment.write.format("delta").mode("overwrite")
payment_writer.save("/delta/gold/fact_payment")

In [0]:
# Create "fact_trip" table

df_bronze_trip = spark.read.format("delta").load("/delta/bronze/trips")

# Rider age in completed years on the day the trip started.
# months_between / 12 follows the calendar; a day count divided by 365.25 is
# wrong on the birthday itself: birthday 2000-03-01 with a trip on 2021-03-01
# is 7670 days, 7670 / 365.25 = 20.999, i.e. 20 although the rider is 21.
# start_at is reduced to a date first so the time of day never affects the age.
rider_age_on_started_at = F.floor(
    F.months_between(F.to_date(F.col("start_at")), F.col("birthday")) / 12
).cast(IntegerType())

# Trip length in whole minutes, derived from the epoch-second difference.
duration_minutes = F.floor(
    (F.unix_timestamp(F.col("ended_at")) - F.unix_timestamp(F.col("start_at"))) / 60
).cast(IntegerType())

df_fact_trips = (
    df_bronze_trip
    # Inner join on rider_id: only the birthday is needed from the riders data.
    # Trips whose rider_id has no match in riders are dropped by this join.
    .join(df_bronze_rider.select("rider_id", "birthday"), on=["rider_id"])
    .withColumn("rider_age_on_started_at", rider_age_on_started_at)
    .withColumn("duration_minutes", duration_minutes)
    .select(
        "trip_id",
        "rider_id",
        "start_at",
        "ended_at",
        "start_station_id",
        "end_station_id",
        "rideable_type",
        "rider_age_on_started_at",
        "duration_minutes",
    )
)

df_fact_trips.write.format("delta") \
                .mode("overwrite") \
                .save("/delta/gold/fact_trip")

In [0]:
# Create "dim_date" table

from datetime import timedelta
import pandas as pd

# Earliest trip start found in the bronze trips data.
first_trip_start = pd.to_datetime(df_bronze_trip.agg({'start_at': 'min'}).first()[0])

if pd.isna(first_trip_start):
    # An empty trips table has no minimum; fail with a clear message.
    raise ValueError("Cannot build dim_date: bronze trips has no 'start_at' values")

# Truncate to the top of the hour so every generated date_key is aligned to a
# full hour instead of inheriting the minutes/seconds of the first trip.
start_date = first_trip_start.replace(minute=0, second=0, microsecond=0, nanosecond=0)

# Length of the calendar in days (10 * 365; leap days are not accounted for).
ADD_10_YEARS = 365 * 10

end_date = start_date + timedelta(days=ADD_10_YEARS)

print(start_date)
print(end_date)

# One row per hour between start_date and end_date (both inclusive).
# A timedelta is used as the frequency rather than the "H" alias, which recent
# pandas versions deprecate.
df = pd.DataFrame(pd.date_range(start=start_date, end=end_date, freq=timedelta(hours=1)))

df_date = spark.createDataFrame(df, schema='date_key timestamp')

# NOTE: 'day' and 'hour' come from date_format and are therefore strings, while
# 'year', 'quarter', 'month' and 'week_day' are integers.
df_dim_date = (df_date.select("date_key")
               .withColumn('year', F.year(F.col('date_key')))
               .withColumn('quarter', F.quarter(F.col('date_key')))
               .withColumn('month', F.month(F.col('date_key')))
               .withColumn('day', F.date_format(F.col('date_key'), "d"))
               .withColumn('week_day', F.dayofweek(F.col('date_key')))
               .withColumn('hour', F.date_format(F.col('date_key'), "H"))
              )

df_dim_date.write.format("delta") \
                .mode("overwrite") \
                .save("/delta/gold/dim_date")

In [0]:
# Create delta fact & dimensions tables
#
# Each table is registered on top of the delta files written under /delta/gold.
# IF NOT EXISTS keeps the cell re-runnable: a plain CREATE TABLE fails on the
# second run because the table is already registered.

tables = ["dim_date", "dim_rider", "dim_station", "fact_payment", "fact_trip"]

for table in tables:
    spark.sql(
        "CREATE TABLE IF NOT EXISTS {} USING DELTA LOCATION '/delta/gold/{}'".format(table, table)
    )

In [0]:
# dbutils.fs.rm("/delta", recurse=True)