In this notebook, I execute and document extract, transform and load steps

# Extract from csv to delta

## Payments data

In [0]:
df_payments = spark.read.format('csv') \
    .option('inferSchema','true') \
    .option('header','false') \
    .option('sep',',') \
    .load('dbfs:/FileStore/tables/payments.csv') 

In [0]:
df_payments.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: date (nullable = true)
 |-- _c2: double (nullable = true)
 |-- _c3: integer (nullable = true)



In [0]:
#rename columns
cols = ['payment_id','date','amount','rider_id']
df_payments = df_payments.toDF(*cols)

In [0]:
display(df_payments.limit(5))

payment_id,date,amount,rider_id
1,2019-05-01,9.0,1000
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000


In [0]:
#write to delta location
df_payments.write.format('delta').mode('overwrite').save('/delta/payments')

## Riders data

In [0]:
df_riders = spark.read.format('csv') \
    .option('inferSchema','true') \
    .option('header','false') \
    .option('sep',',') \
    .load('dbfs:/FileStore/tables/riders.csv') \
    .toDF('rider_id','first_name','last_name','address','birthday','account_start_date','account_end_date','is_member')

display(df_riders.limit(5))

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True


In [0]:
#write to delta location
df_riders.write.format('delta').mode('overwrite').save('/delta/riders')

## Trips data

In [0]:
df_trips = spark.read.format('csv') \
    .option('inferSchema','true') \
    .option('header','false') \
    .option('sep',',') \
    .load('dbfs:/FileStore/tables/trips.csv') \
    .toDF('trip_id','rideable_type','start_at','ended_at','start_station_id','end_station_id','rider_id')

display(df_trips.limit(5))

trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
89E7AA6C29227EFF,classic_bike,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,660,71934
0FEFDE2603568365,classic_bike,2021-02-14T17:52:38.000+0000,2021-02-14T18:12:09.000+0000,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09T19:10:18.000+0000,2021-02-09T19:19:10.000+0000,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02T17:49:41.000+0000,2021-02-02T17:54:06.000+0000,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23T15:07:23.000+0000,2021-02-23T15:22:37.000+0000,13216,TA1309000055,39608


In [0]:
#write to delta location
df_trips.write.format('delta').mode('overwrite').save('/delta/trips')

## Stations data

In [0]:
df_stations = spark.read.format('csv') \
    .option('inferSchema','true') \
    .option('header','false') \
    .option('sep',',') \
    .load('dbfs:/FileStore/tables/stations.csv') \
    .toDF('station_id','name','latitude','longitude')

display(df_stations.limit(5))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669


In [0]:
#write to delta location
df_stations.write.format('delta').mode('overwrite').save('/delta/stations')

# Load into Delta Tables

In [0]:
spark.sql('create schema if not exists divvy')

Out[33]: DataFrame[]

## Payments table

In [0]:
spark.sql("""
    create table if not exists divvy.staging_payments using delta location '/delta/payments'
""")

Out[34]: DataFrame[]

In [0]:
%sql

select * from divvy.staging_payments
limit 5

payment_id,date,amount,rider_id
1,2019-05-01,9.0,1000
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000


## Riders table

In [0]:
spark.sql("""
    create table if not exists divvy.staging_riders using delta location '/delta/riders'
""")

Out[37]: DataFrame[]

## Trips table

In [0]:
spark.sql("""
    create table if not exists divvy.staging_trips using delta location '/delta/trips'
""")

Out[38]: DataFrame[]

## Stations table

In [0]:
spark.sql("""
    create table if not exists divvy.staging_stations using delta location '/delta/stations'
""")

Out[39]: DataFrame[]

# Transform into star schema

In [0]:
from pyspark.sql import functions as f

## Transform Payments

In [0]:
#load table into dataframe
payments = spark.sql("select * from divvy.staging_payments") \
    .withColumn('date_id',f.date_format(f.col('date'),'YYYYMMDD')) \
    .select('payment_id','date_id','amount','rider_id')

#create fact payments table with override mode
payments.write.format('delta').mode('overwrite').saveAsTable('divvy.fact_payments')

## Transform Riders

In [0]:
#load table into dataframe
riders = spark.sql("select * from divvy.staging_riders")

#create dim riders table with override mode
riders.write.format('delta').mode('overwrite').saveAsTable('divvy.dim_riders')

In [0]:
%sql
select * from divvy.dim_riders
limit 5

rider_id,first_name,last_name,address,birthday,account_start_date,account_end_date,is_member
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True


## Transform Stations

In [0]:
#load table into dataframe
stations = spark.sql("select * from divvy.staging_stations")

#create dim stations table with override mode
stations.write.format('delta').mode('overwrite').saveAsTable('divvy.dim_stations')

In [0]:
%sql
select * from divvy.dim_stations
limit 5

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669


## Transform Trips

In [0]:
trips = spark.sql('select * from divvy.staging_trips')

In [0]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

trips = trips.withColumn('trip_duration_mins',f.unix_timestamp(f.col('ended_at'))-f.unix_timestamp(f.col('start_at'))) \
    .join(riders,on='rider_id') \
    .withColumn('date_id',f.date_format(f.col('start_at'),'YYYYMMDD')) \
    .withColumn('rider_age_ride_start',f.year(f.col('start_at'))-f.year(f.col('birthday'))) \
    .withColumn('rider_age_account_start',f.year(f.col('start_at'))-f.year(f.col('account_start_date'))) \
    .select('trip_id','rider_id','date_id','start_station_id','end_station_id','start_at','ended_at','trip_duration_mins'\
           ,'rider_age_ride_start','rider_age_account_start','rideable_type')

In [0]:
#create facts trips table with override mode
trips.write.format('delta').mode('overwrite').saveAsTable('divvy.fact_trips')

## Transform Dates: Create dim dates

In [0]:
%sql
--earliest date in trips data
select min(start_at) start_date from divvy.staging_trips

start_date
2021-02-01T01:07:04.000+0000


In [0]:
from pyspark.sql.functions import explode, sequence, to_date

beginDate = '2021-02-01'
##use next 10 years as end date
endDate = '2031-02-01'

(
  spark.sql(f"select explode(sequence(to_date('{beginDate}'), to_date('{endDate}'), interval 1 day)) as calendarDate")
    .createOrReplaceTempView('dates')
)

In [0]:
%sql

create or replace table divvy.dim_dates as
select
  year(calendarDate) * 10000 + month(calendarDate) * 100 + day(calendarDate) as date_id,
  CalendarDate date,
  year(calendarDate) AS Year,
  date_format(calendarDate, 'MMMM') as MonthName,
  month(calendarDate) as Month,
  date_format(calendarDate, 'EEEE') as WeekDayName,
  dayofweek(calendarDate) AS DayOfWeek,
  quarter(calendarDate) as Quarter
from
  dates
order by
  calendarDate
  

num_affected_rows,num_inserted_rows


In [0]:
%sql
select * from divvy.dim_dates

date_id,date,Year,MonthName,Month,WeekDayName,DayOfWeek,Quarter
20210201,2021-02-01,2021,February,2,Monday,2,1
20210202,2021-02-02,2021,February,2,Tuesday,3,1
20210203,2021-02-03,2021,February,2,Wednesday,4,1
20210204,2021-02-04,2021,February,2,Thursday,5,1
20210205,2021-02-05,2021,February,2,Friday,6,1
20210206,2021-02-06,2021,February,2,Saturday,7,1
20210207,2021-02-07,2021,February,2,Sunday,1,1
20210208,2021-02-08,2021,February,2,Monday,2,1
20210209,2021-02-09,2021,February,2,Tuesday,3,1
20210210,2021-02-10,2021,February,2,Wednesday,4,1
