In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, when, datediff, minute

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()


In [2]:
from pyspark.sql.types import StructType, \
                              StructField, \
                              StringType, \
                              IntegerType, \
                              FloatType, \
                              DateType, \
                              ByteType, \
                              TimestampType
from pyspark.sql.functions import *

In [None]:
%fs ls  "/FileStore/raw_data"

path,name,size,modificationTime
dbfs:/FileStore/raw_data/payments.csv,payments.csv,57666115,1688697854000
dbfs:/FileStore/raw_data/riders.csv,riders.csv,5594949,1688697843000
dbfs:/FileStore/raw_data/stations.csv,stations.csv,49552,1688697844000
dbfs:/FileStore/raw_data/trips.csv,trips.csv,440125504,1688697903000


In [None]:
path = "/FileStore/raw_data/"

In [None]:
def write_data(data, table_name):
    """
    This function helps write delta format to the bronze store.
    """
    data.write.format("delta")\
            .mode("overwrite")\
            .save(f"/bronze_data_store/{table_name}data/")
    return f"Final save path for {table_name} is: /bronze_data_store/{table_name}data/"

"""
def write_data(data, format_type, table_name):
    '''
    This function helps write delta format to the bronze store.
    '''
    data.write.format(str(format_type))\
            .mode("overwrite")\
            .save(f"/bronze_data_store/{table_name}data/")
"""

Out[3]: '\ndef write_data(data, format_type, table_name):\n    \'\'\'\n    This function helps write delta format to the bronze store.\n    \'\'\'\n    data.write.format(str(format_type))            .mode("overwrite")            .save(f"/bronze_data_store/{table_name}data/")\n'

In [None]:
# Create a gold data store in Delta Lake tables

def read_create_gold_table(table_name):
    """
    This function reads bronze data store and 
    then writes gold level tables.
    """
    df = spark.read.format("delta")\
            .load(f"/bronze_data_store/{table_name}data/")

    # Save as table
    df.write.format("delta")\
        .mode("overwrite")\
        .saveAsTable(f"gold_{table_name}")
    return df

In [None]:
def write_starTables(data, table_name):
    data.write.format("delta")\
        .mode("overwrite")\
        .saveAsTable(table_name)

In [8]:
schema_payment = StructType([ \
    StructField("payment_id",IntegerType(),False), \
    StructField("date",DateType(),True), \
    StructField("amount",FloatType(),True), \
    StructField("ride_id", IntegerType(), True)
  ])

paymentDf = spark.read.format("csv") \
        .option("inferSchema","false") \
        .option("header", "false") \
        .option("sep", ",") \
        .schema(schema_payment) \
        .load("../azure-data-lakehouse-projectdatafiles/Out/payments.csv")
display(paymentDf.head(5))

# Write payment df to bronze
# write_data(paymentDf, "paymentDf")

[Row(payment_id=1, date=datetime.date(2019, 5, 1), amount=9.0, ride_id=1000),
 Row(payment_id=2, date=datetime.date(2019, 6, 1), amount=9.0, ride_id=1000),
 Row(payment_id=3, date=datetime.date(2019, 7, 1), amount=9.0, ride_id=1000),
 Row(payment_id=4, date=datetime.date(2019, 8, 1), amount=9.0, ride_id=1000),
 Row(payment_id=5, date=datetime.date(2019, 9, 1), amount=9.0, ride_id=1000)]

In [None]:
schema_rider = StructType([ \
    StructField("rider_id",IntegerType(),False), \
    StructField("first",StringType(),True), \
    StructField("last",StringType(),True), \
    StructField("address",StringType(),True), \
    StructField("birthday", DateType(),True), \
    StructField("account_start_date", DateType(),True), \
    StructField("account_end_date", DateType(),True), \
    StructField("is_member", StringType(),True)
  ])

riderDf = spark.read.format("csv") \
        .option("inferSchema","false") \
        .option("header", "false") \
        .option("sep", ",") \
        .schema(schema_rider) \
        .load("../azure-data-lakehouse-projectdatafiles/Out/riders.csv")
display(riderDf.head(5))

# Write rider df to bronze
# write_data(riderDf, "riderDf")

rider_id,first,last,address,birthday,account_start_date,account_end_date,is_member
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True


Out[7]: 'Final save path for DataFrame[rider_id: int, first: string, last: string, address: string, birthday: date, account_start_date: date, account_end_date: date, is_member: string] is: /bronze_data_store/riderDfdata/'

In [None]:
schema_station = StructType([ \
    StructField("station_id",StringType(),False), \
    StructField("name",StringType(),True), \
    StructField("latitude",FloatType(),True), \
    StructField("longitude", FloatType(), True)
  ])

stationDf = spark.read.format("csv") \
        .option("inferSchema","false") \
        .option("header", "false") \
        .option("sep", ",") \
        .schema(schema_station) \
        .load("../azure-data-lakehouse-projectdatafiles/Out/stations.csv")
display(stationDf.head(5))

# Write station df to bronze
# write_data(stationDf, "stationDf")

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.01269912719727,-87.66606140136719
KA1503000012,Clark St & Lake St,41.88579559326172,-87.631103515625
637,Wood St & Chicago Ave,41.895633697509766,-87.67206573486328
13216,State St & 33rd St,41.83473205566406,-87.62582397460938
18003,Fairbanks St & Superior St,41.895809173583984,-87.62025451660156


Out[8]: 'Final save path for DataFrame[station_id: string, name: string, latitude: float, longitude: float] is: /bronze_data_store/stationDfdata/'

In [None]:
schema_trip = StructType([ \
    StructField("trip_id",StringType(),False), \
    StructField("rideable_type", StringType(),True), \
    StructField("started_at", TimestampType(),True), \
    StructField("ended_at", TimestampType(),True), \
    StructField("start_station_id",StringType(),True), \
    StructField("end_station_id",StringType(),True), \
    StructField("rider_id",IntegerType(),True)
  ])

tripsDf = spark.read.format("csv") \
        .option("inferSchema","false") \
        .option("header", "false") \
        .option("sep", ",") \
        .schema(schema_trip) \
        .load("../azure-data-lakehouse-projectdatafiles/Out/trips.csv")

display(tripsDf.head(5))

# Write trips df to bronze
# write_data(tripsDf, "tripsDf")

trip_id,rideable_type,started_at,ended_at,start_station_id,end_station_id,rider_id
89E7AA6C29227EFF,classic_bike,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,660,71934
0FEFDE2603568365,classic_bike,2021-02-14T17:52:38.000+0000,2021-02-14T18:12:09.000+0000,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09T19:10:18.000+0000,2021-02-09T19:19:10.000+0000,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02T17:49:41.000+0000,2021-02-02T17:54:06.000+0000,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23T15:07:23.000+0000,2021-02-23T15:22:37.000+0000,13216,TA1309000055,39608


Out[9]: 'Final save path for DataFrame[trip_id: string, rideable_type: string, started_at: timestamp, ended_at: timestamp, start_station_id: string, end_station_id: string, rider_id: int] is: /bronze_data_store/tripsDfdata/'

In [None]:
# Create a gold data store in Delta Lake tables
read_create_gold_table("paymentDf")
read_create_gold_table("stationDf")
read_create_gold_table("riderDf")
read_create_gold_table("tripsDf")

Out[10]: DataFrame[trip_id: string, rideable_type: string, started_at: timestamp, ended_at: timestamp, start_station_id: string, end_station_id: string, rider_id: int]

In [None]:
#Transform the data into the star schema for a Gold data store

In [None]:
payment_table = spark.table("gold_paymentdf")
print(f"Payment table count is: {payment_table.count()}")
rider_table = spark.table("gold_riderdf")
print(f"Rider table count is: {rider_table.count()}")
station_table = spark.table("gold_stationdf")
print(f"Station table count is:{station_table.count()}")
trips_table = spark.table("gold_tripsdf")
print(f"Trips table count is: {trips_table.count()}")

Payment table count is: 1946607
Rider table count is: 75000
Station table count is:838
Trips table count is: 4584921


In [None]:
rider_df = spark.table("gold_riderdf")
print(f"Rider table count is: {rider_df.count()}")

dimRider = rider_df.withColumn('rider_key', col('rider_id'))\
        .select(['rider_id',
                'first',
                'last',
                'address',
                'account_start_date',
                'account_end_date',
                'is_member']
                )
display(dimRider.head(50))

write_starTables(dimRider, "dimRider")

Rider table count is: 75000


rider_id,first,last,address,account_start_date,account_end_date,is_member
1000,Diana,Clark,1200 Alyssa Squares,2019-04-23,,True
1001,Jennifer,Smith,397 Diana Ferry,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,2019-09-14,,True
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,2020-03-24,,False
1006,Alicia,Taylor,1137 Angela Locks,2020-11-27,2021-12-01,True
1007,Benjamin,Fernandez,979 Phillips Ways,2016-12-11,,False
1008,John,Crawford,7691 Evans Court,2021-03-28,2021-07-01,True
1009,Victoria,Ritter,9922 Jim Crest Apt. 319,2020-06-12,2021-11-01,True


In [None]:
station_df = spark.table("gold_stationdf")
# print(type(station_df))
print(f"Station table count is: {station_table.count()}")

dimStation = station_df.withColumn('station_key', col('station_id'))\
        .select("*")
display(dimStation.head(5))

write_starTables(dimStation, "dimStation")

Station table count is: 838


station_id,name,latitude,longitude,station_key
525,Glenwood Ave & Touhy Ave,42.01269912719727,-87.66606140136719,525
KA1503000012,Clark St & Lake St,41.88579559326172,-87.631103515625,KA1503000012
637,Wood St & Chicago Ave,41.895633697509766,-87.67206573486328,637
13216,State St & 33rd St,41.83473205566406,-87.62582397460938,13216
18003,Fairbanks St & Superior St,41.895809173583984,-87.62025451660156,18003


In [None]:
# Payment Fact table
payment_df = spark.table("gold_paymentdf")
print(f"Payment table count is: {payment_table.count()}")

payment_df = payment_df.withColumnRenamed("payment_key", "payment_id")

payment_facts = payment_df.select(
                        col("payment_id").alias("payment_key"),
                        col("amount").alias("amount"),
                        col("ride_id").alias("rider_key"),
                        date_format(col("date"), "yyyyMMdd").alias("date_key"))
display(payment_facts.head(15))

write_starTables(payment_facts, "payment_facts")

Payment table count is: 1946607


payment_key,amount,rider_key,date_key
1,9.0,1000,20190501
2,9.0,1000,20190601
3,9.0,1000,20190701
4,9.0,1000,20190801
5,9.0,1000,20190901
6,9.0,1000,20191001
7,9.0,1000,20191101
8,9.0,1000,20191201
9,9.0,1000,20200101
10,9.0,1000,20200201


In [None]:
# Create dimDate table
distinct_date_key = payment_df.select(date_format(col("date"), "yyyyMMdd").alias("date_key")).distinct()

# Calculate other columns and join with distinct_date_key
dimDate = payment_df.join(distinct_date_key,
                          (date_format(payment_df.date, "yyyyMMdd")) == distinct_date_key.date_key, 
                          "inner") \
    .select(
        col("date_key"), 
        col("date"), 
        year("date").alias("year"), 
        quarter("date").alias("quarter"),
        month("date").alias("month"),                
        dayofweek("date").alias("dayofweek"),
        weekofyear("date").alias("week")
    ) \
    .withColumn("is_weekend", when((dayofweek("date") == 6) | (dayofweek("date") == 7), True).otherwise(False))
dimDate.display(50)

write_starTables(dimDate, "dimDate")

date_key,date,year,quarter,month,dayofweek,week,is_weekend
20130601,2013-06-01,2013,2,6,7,22,True
20130601,2013-06-01,2013,2,6,7,22,True
20130601,2013-06-01,2013,2,6,7,22,True
20130601,2013-06-01,2013,2,6,7,22,True
20130601,2013-06-01,2013,2,6,7,22,True
20130601,2013-06-01,2013,2,6,7,22,True
20130601,2013-06-01,2013,2,6,7,22,True
20130601,2013-06-01,2013,2,6,7,22,True
20130601,2013-06-01,2013,2,6,7,22,True
20130601,2013-06-01,2013,2,6,7,22,True


In [None]:
# print(payment_df.columns)
# print(rider_df.columns)
# print(trips_df.columns)

In [None]:
trips_df.columns

Out[34]: ['trip_id',
 'rideable_type',
 'started_at',
 'ended_at',
 'start_station_id',
 'end_station_id',
 'rider_id']

In [None]:
# Create trip fact table
trips_df = spark.table("gold_tripsdf")
print(f"Trips table count is: {trips_table.count()}")

trip_facts = trips_df.join(
    rider_df, 
    rider_df.rider_id == trips_df.rider_id,
    how="inner")\
    .join(
        payment_df,
        payment_df.ride_id== trips_df.rider_id,
        "inner")\
    .select(
    col("trip_id").alias("trip_key")
    , ((col("ended_at").cast("long") - col("started_at").cast("long"))/60).alias("trip_duration")
    , (year(trips_df.ended_at) - year(rider_df.birthday)).alias("rider_age_at_trip")
    , col("start_station_id").alias("station_key")
    , (rider_df.rider_id).alias("rider_key")
    , (date_format(payment_df.date, "yyyyMMdd")).alias("date_key")
    , (trips_df.rideable_type).alias("rideable_type")
    , col("started_at").alias("started_at")
    , col("ended_at").alias("ended_at")
    , (trips_df.start_station_id).alias("start_station_id")
    , (trips_df.end_station_id).alias("end_station_id")
    )
display(trip_facts.head(500))

write_starTables(trip_facts, "trip_facts")

Trips table count is: 4584921


trip_key,trip_duration,rider_age_at_trip,station_key,rider_key,date_key,rideable_type,started_at,ended_at,start_station_id,end_station_id
89E7AA6C29227EFF,6.783333333333333,38,525,71934,20220201,classic_bike,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,660
89E7AA6C29227EFF,6.783333333333333,38,525,71934,20220101,classic_bike,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,660
89E7AA6C29227EFF,6.783333333333333,38,525,71934,20211201,classic_bike,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,660
89E7AA6C29227EFF,6.783333333333333,38,525,71934,20211101,classic_bike,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,660
89E7AA6C29227EFF,6.783333333333333,38,525,71934,20211001,classic_bike,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,660
89E7AA6C29227EFF,6.783333333333333,38,525,71934,20210901,classic_bike,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,660
89E7AA6C29227EFF,6.783333333333333,38,525,71934,20210801,classic_bike,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,660
89E7AA6C29227EFF,6.783333333333333,38,525,71934,20210701,classic_bike,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,660
89E7AA6C29227EFF,6.783333333333333,38,525,71934,20210601,classic_bike,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,660
89E7AA6C29227EFF,6.783333333333333,38,525,71934,20210501,classic_bike,2021-02-12T16:14:56.000+0000,2021-02-12T16:21:43.000+0000,525,660


In [None]:
# display(
#     payment_df.select(
#                     F.col("date"), 
#                     year("date").alias("year"), 
#                     month("date").alias("month"),
#                     quarter("date").alias("quarter"),
#                     dayofweek("date").alias("dayofweek"),
#                     weekofyear("date").alias("week")
#                   )\
#               .withColumn("date_key", F.date_format(F.col("date"), "yyyyMMdd"))\
#               .withColumn("is_weekend", when((dayofweek("date") == 6) | (dayofweek("date") == 7), True).otherwise(False))
#         )

In [None]:
# payment_table.createOrReplaceTempView("payment_tbl")

In [None]:
# spark.sql(
#     '''
#         SELECT *
#         FROM payment_tbl 
#         LIMIT 10
#     ''').display()

In [None]:
# %sql
# SELECT * 
# FROM payment_tbl

## Business Questions

The business outcomes you are designing for:

  1.	Analyze how much time is spent per ride 
    o	Based on date and time factors such as day of week and time of day

    o	Based on which station is the starting and / or ending station

    o	Based on age of the rider at time of the ride

    o	Based on whether the rider is a member or a casual rider

  2.	Analyze how much money is spent 
    o	Per month, quarter, year

    o	Per member, based on the age of the rider at account start

  3.	EXTRA CREDIT - Analyze how much money is spent per member 
    o	Based on how many rides the rider averages per month
    
    o	Based on how many minutes the rider spends on a bike per month

In [None]:
trip_facts.columns

Out[45]: ['trip_key',
 'trip_duration',
 'rider_age_at_trip',
 'station_key',
 'rider_key',
 'date_key',
 'rideable_type',
 'started_at',
 'ended_at',
 'start_station_id',
 'end_station_id']

In [None]:
display(
    trip_facts.select("started_at").limit(5)
)

started_at
2021-02-12T16:14:56.000+0000
2021-02-12T16:14:56.000+0000
2021-02-12T16:14:56.000+0000
2021-02-12T16:14:56.000+0000
2021-02-12T16:14:56.000+0000


In [None]:
%sql

SELECT
    dt.dayofweek AS day_of_week,
    CASE 
        -- WHEN DATEPART(HOUR, CONVERT(TIME, t.started_at)) > 5 AND DATEPART(HOUR, CONVERT(TIME, t.started_at)) < 12 THEN 'morning'
        WHEN dt.dayofweek BETWEEN 1 AND 5 AND EXTRACT(hour FROM t.started_at) > 5 AND EXTRACT(hour FROM t.started_at) < 12 THEN 'morning'
        -- WHEN DATEPART(HOUR, CONVERT(TIME, t.started_at)) >= 12 AND DATEPART(HOUR, CONVERT(TIME, t.started_at)) < 17 THEN 'afternoon'
        WHEN dt.dayofweek BETWEEN 1 AND 5 AND EXTRACT(hour FROM t.started_at) >= 12 AND EXTRACT(hour FROM started_at) < 17 THEN 'afternoon'
			  -- WHEN DATEPART(HOUR, CONVERT(TIME, t.started_at)) >= 17 AND DATEPART(HOUR, CONVERT(TIME, t.started_at)) < 21 THEN 'evening'
        WHEN dt.dayofweek BETWEEN 1 AND 5 AND EXTRACT(hour FROM t.started_at) >= 17 AND EXTRACT(hour FROM started_at) < 21 THEN 'evening'
			  -- WHEN DATEPART(HOUR, CONVERT(TIME, t.started_at)) >= 21 AND DATEPART(HOUR, CONVERT(TIME, t.started_at)) < 4 THEN 'morning'
        WHEN dt.dayofweek BETWEEN 1 AND 5 AND EXTRACT(hour FROM t.started_at) >= 21 AND EXTRACT(hour FROM started_at) < 4 THEN 'morning'
        ELSE 'NA'
    END AS time_of_day
FROM trip_facts t
INNER JOIN dimDate dt ON t.date_key = dt.date_key;

day_of_week,time_of_day
3,afternoon
3,afternoon
3,afternoon
3,afternoon
3,afternoon
3,afternoon
3,afternoon
3,afternoon
3,afternoon
3,afternoon


In [None]:
%sql

SELECT
		dt.dayofweek as day_of_week
    , CASE 
				WHEN date_part(hour, t.started_at) > 5 AND date_part(hour, t.ended_at) < 12 THEN 'morning'
			ELSE 'NA'
		END AS time_of_day
	FROM trip_facts t
  		INNER JOIN dimDate dt 
			ON t.date_key = dt.date_key

day_of_week
3
3
3
3
3
3
3
3
3
3


In [None]:
%sql


/*
The business outcomes you are designing for are as follows:
    Analyze how much time is spent per ride
        Based on date and time factors such as day of week and time of day
        Based on which station is the starting and / or ending station
        Based on age of the rider at time of the ride
        Based on whether the rider is a member or a casual rider
*/

-- Analyze how much time is spent per ride,
--  Based on date and time factors such as day of week and time of day

WITH cte as 
(
	SELECT
			dt.dayofweek AS day_of_week,
			CASE 
					-- WHEN DATEPART(HOUR, CONVERT(TIME, t.started_at)) > 5 AND DATEPART(HOUR, CONVERT(TIME, t.started_at)) < 12 THEN 'morning'
					WHEN dt.dayofweek BETWEEN 1 AND 5 AND EXTRACT(hour FROM t.started_at) > 5 AND EXTRACT(hour FROM t.started_at) < 12 THEN 'morning'
					-- WHEN DATEPART(HOUR, CONVERT(TIME, t.started_at)) >= 12 AND DATEPART(HOUR, CONVERT(TIME, t.started_at)) < 17 THEN 'afternoon'
					WHEN dt.dayofweek BETWEEN 1 AND 5 AND EXTRACT(hour FROM t.started_at) >= 12 AND EXTRACT(hour FROM started_at) < 17 THEN 'afternoon'
					-- WHEN DATEPART(HOUR, CONVERT(TIME, t.started_at)) >= 17 AND DATEPART(HOUR, CONVERT(TIME, t.started_at)) < 21 THEN 'evening'
					WHEN dt.dayofweek BETWEEN 1 AND 5 AND EXTRACT(hour FROM t.started_at) >= 17 AND EXTRACT(hour FROM started_at) < 21 THEN 'evening'
					-- WHEN DATEPART(HOUR, CONVERT(TIME, t.started_at)) >= 21 AND DATEPART(HOUR, CONVERT(TIME, t.started_at)) < 4 THEN 'morning'
					WHEN dt.dayofweek BETWEEN 1 AND 5 AND EXTRACT(hour FROM t.started_at) >= 21 AND EXTRACT(hour FROM started_at) < 4 THEN 'morning'
					ELSE 'NA'
			END AS time_of_day
			, DATEDIFF(MINUTE, started_at, ended_at) AS ride_time
	FROM trip_facts t
	INNER JOIN dimDate dt ON t.date_key = dt.date_key
)

SELECT
	day_of_week,
    time_of_day,
    ride_time AS time_per_ride
    --, COUNT(ride_time) as cnt_time_per_ride
FROM cte
GROUP BY day_of_week, time_of_day, ride_time;
