# **1. Import necessary lib** #

In [1]:
# Import pyspark, SparkSession
import pyspark
from pyspark.sql import SparkSession

# Import SparkFunction
import pyspark.sql.functions as F
from pyspark.sql.types import *

StatementMeta(, 454e6e77-7dcd-4ae7-aa20-4b3f7758e9a6, 3, Finished, Available)

In [2]:
# Create Spark Session
spark = SparkSession.builder.appName("ETL_Uber_Analytics") \
                            .config('spark.cores.max', "16") \
                            .config("spark.executor.memory", "70g") \
                            .config("spark.driver.memory", "50g") \
                            .config("spark.memory.offHeap.enabled",True) \
                            .config("spark.memory.offHeap.size","16g") \
                            .getOrCreate()

StatementMeta(, 454e6e77-7dcd-4ae7-aa20-4b3f7758e9a6, 4, Finished, Available)

# **2. Extract data** #

In [3]:
# Define file_path
file_path = "Files/uber_data.csv"

# Read dataset
df_uber = spark.read.format("csv") \
                    .option("header", True) \
                    .option("inferSchema", True) \
                    .load(file_path)

# Show dataframe
df_uber.show(5)

StatementMeta(, 454e6e77-7dcd-4ae7-aa20-4b3f7758e9a6, 5, Finished, Available)

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       1| 2016-03-01 00:00:00|  2016-03-01 00:07:55|              1|          2.5|-73.97674560546875|40.765151977539055|         1|    

# **3. Transformation data** #

In [4]:
# Drop Duplicates data
df_uber.dropDuplicates()

StatementMeta(, 454e6e77-7dcd-4ae7-aa20-4b3f7758e9a6, 6, Finished, Available)

DataFrame[VendorID: int, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: int, trip_distance: double, pickup_longitude: double, pickup_latitude: double, RatecodeID: int, store_and_fwd_flag: string, dropoff_longitude: double, dropoff_latitude: double, payment_type: int, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double]

In [5]:
# Create function for dim_datetime, dim_pickup, dim_dropoff
def create_dim_withidcolumn(df_base, column_from_base, column_id, name_temp_view, query):
    # Choose column from uber
    dim_create_new_df = df_base[column_from_base]

    # Drop duplicates rows
    dim_create_new_df = dim_create_new_df.dropDuplicates()

    # Create column ID with Auto Increment
    dim_create_new_df = dim_create_new_df.withColumn(column_id, 
                                                F.monotonically_increasing_id() + 1)
    
    # Create temp view
    dim_create_new_df.createOrReplaceTempView(name_temp_view)

    # Query for change order column
    dim_create_new_df = spark.sql(query)

    # Drop Temp View
    spark.catalog.dropTempView(name_temp_view)

    return dim_create_new_df

StatementMeta(, 454e6e77-7dcd-4ae7-aa20-4b3f7758e9a6, 7, Finished, Available)

## 3.1. Create Dim_DateTime ##

In [6]:
# Define parameter for Dim_DateTime
column_from_base = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']
column_id = "DateTimeID"
name_temp_view = "dim_datetime"
query = ''' SELECT DateTimeID, tpep_pickup_datetime, tpep_dropoff_datetime
            FROM dim_datetime '''

# Use function
dim_datetime_df = create_dim_withidcolumn(df_uber, column_from_base, 
                             column_id, name_temp_view, query)

# Show Dataframe
dim_datetime_df.show(5)

StatementMeta(, 454e6e77-7dcd-4ae7-aa20-4b3f7758e9a6, 8, Finished, Available)

+----------+--------------------+---------------------+
|DateTimeID|tpep_pickup_datetime|tpep_dropoff_datetime|
+----------+--------------------+---------------------+
|         1| 2016-03-10 07:07:32|  2016-03-10 07:23:35|
|         2| 2016-03-10 07:07:56|  2016-03-10 07:22:02|
|         3| 2016-03-10 07:09:03|  2016-03-10 07:15:34|
|         4| 2016-03-10 07:09:23|  2016-03-10 07:25:00|
|         5| 2016-03-10 07:09:35|  2016-03-10 07:14:29|
+----------+--------------------+---------------------+
only showing top 5 rows



## 3.2. Create Dim_PickUp ##

In [7]:
# Define parameter for Dim_PickUp
column_from_base = ['pickup_longitude', 'pickup_latitude']
column_id = "PickUpID"
name_temp_view = "dim_pickup"
query = ''' SELECT PickUpID, pickup_longitude, pickup_latitude
            FROM dim_pickup '''

# Use function
dim_pickup_df = create_dim_withidcolumn(df_uber, column_from_base, 
                             column_id, name_temp_view, query)

# Show Dataframe
dim_pickup_df.show(5)

StatementMeta(, 454e6e77-7dcd-4ae7-aa20-4b3f7758e9a6, 9, Finished, Available)

+--------+------------------+------------------+
|PickUpID|  pickup_longitude|   pickup_latitude|
+--------+------------------+------------------+
|       1|-73.93929290771484| 40.84185028076172|
|       2|-73.97794342041014| 40.77862930297852|
|       3|-73.97701263427734|40.774707794189446|
|       4| -73.9498519897461| 40.78435897827149|
|       5|-73.94937133789062| 40.77684783935546|
+--------+------------------+------------------+
only showing top 5 rows



## 3.3. Create Dim_DropOff ##

In [8]:
# Define parameter for Dim_DropOff
column_from_base = ['dropoff_longitude', 'dropoff_latitude']
column_id = "DropOffID"
name_temp_view = "dim_dropoff"
query = ''' SELECT DropOffID, dropoff_longitude, dropoff_latitude
            FROM dim_dropoff '''

# Use function
dim_dropoff_df = create_dim_withidcolumn(df_uber, column_from_base, 
                             column_id, name_temp_view, query)

# Show Dataframe
dim_dropoff_df.show(5)

StatementMeta(, 454e6e77-7dcd-4ae7-aa20-4b3f7758e9a6, 10, Finished, Available)

+---------+------------------+------------------+
|DropOffID| dropoff_longitude|  dropoff_latitude|
+---------+------------------+------------------+
|        1|-73.97039031982422| 40.75946044921875|
|        2|-74.01337432861328| 40.70356369018555|
|        3|-73.96913146972656| 40.76084899902344|
|        4|-74.00003051757811| 40.74119567871094|
|        5|-73.96012115478516|40.777191162109375|
+---------+------------------+------------------+
only showing top 5 rows



## 3.4. Create Dim_RateCode ##

In [9]:
# Choose column
dim_ratecode_df = df_uber[['RateCodeID']]

# Drop duplicates values
dim_ratecode_df = dim_ratecode_df.dropDuplicates()

# List data with dictionary
dict_ratecode_type = {
    '1':"Standard rate",
    '2':"JFK",
    '3':"Newark",
    '4':"Nassau or Westchester",
    '5':"Negotiated fare",
    '6':"Group ride"
}
    
# Convert dict_value to a list of tuples
data_list = list(dict_ratecode_type.items())

# Define Schema
schema = StructType([
    StructField('RatecodeID', StringType(), True),
    StructField('rate_code_type', StringType(), True)
])

# Convert to DataFrame
df_dict = spark.createDataFrame(data = data_list, schema = schema)

# Change Order of Dataframe
    # Create TempView
dim_ratecode_df.createOrReplaceTempView("Dim_RateCode")
df_dict.createOrReplaceTempView("Dim_Dict")
    
    # Join two tables
dim_ratecode_df = spark.sql(''' 
                SELECT DC.RatecodeID, DD.rate_code_type
                FROM Dim_RateCode DC join Dim_Dict DD on DC.RatecodeID = DD.RatecodeID
                ''')
                    
    # Drop Temp View
spark.catalog.dropTempView("Dim_RateCode")
spark.catalog.dropTempView("Dim_Dict")

# Show Dataframe
dim_ratecode_df.show(5)

StatementMeta(, 454e6e77-7dcd-4ae7-aa20-4b3f7758e9a6, 11, Finished, Available)

+----------+--------------------+
|RatecodeID|      rate_code_type|
+----------+--------------------+
|         1|       Standard rate|
|         2|                 JFK|
|         3|              Newark|
|         4|Nassau or Westche...|
|         5|     Negotiated fare|
+----------+--------------------+
only showing top 5 rows



## 3.5. Create Dim_Payment ##

In [10]:
# Choose column
dim_payment_df = df_uber[['payment_type']]

# Drop duplicates values
dim_payment_df = dim_payment_df.dropDuplicates()

# List data with dictionary
payment_type_name = {
    "1":"Credit card",
    "2":"Cash",
    "3":"No charge",
    "4":"Dispute",
    "5":"Unknown",
    "6":"Voided trip"
}

# Convert dict_value to a list of tuples
data_list = list(payment_type_name.items())

# Define Schema
schema = StructType([
    StructField('payment_type', StringType(), True),
    StructField('payment_name', StringType(), True)
])

# Convert to DataFrame
df_dict = spark.createDataFrame(data = data_list, schema = schema)

# Change Order of Dataframe
    # Create TempView
dim_payment_df.createOrReplaceTempView("dim_payment")
df_dict.createOrReplaceTempView("dim_dict")
    
    # Join two tables
dim_payment_df = spark.sql(''' 
                SELECT dp.payment_type, dd.payment_name
                FROM dim_payment dp JOIN dim_dict dd 
                                    ON dp.payment_type = dd.payment_type
                ''')

    # Drop Temp View
spark.catalog.dropTempView("dim_payment")
spark.catalog.dropTempView("dim_dict")

# Show Dataframe
dim_payment_df.show(5)

StatementMeta(, 454e6e77-7dcd-4ae7-aa20-4b3f7758e9a6, 12, Finished, Available)

+------------+------------+
|payment_type|payment_name|
+------------+------------+
|           1| Credit card|
|           2|        Cash|
|           3|   No charge|
|           4|     Dispute|
+------------+------------+



## 3.6. Create Fact table ##

In [11]:
df_uber.count()

StatementMeta(, 454e6e77-7dcd-4ae7-aa20-4b3f7758e9a6, 13, Finished, Available)

100000

In [12]:
# Merge column and then choose what column replace
fact_df = df_uber.join(dim_pickup_df, on = ['pickup_longitude', 'pickup_latitude'], how = "inner") \
                    .join(dim_dropoff_df, on = ['dropoff_longitude', 'dropoff_latitude'], how = "inner") \
                    .join(dim_datetime_df, on = ['tpep_pickup_datetime', 'tpep_dropoff_datetime'], how = "inner") \
                    .select(['VendorID', 'DateTimeID', 'PickUpID', 'DropOffID', 'RatecodeID',
                            'payment_type', 'passenger_count', 'store_and_fwd_flag',
                            'fare_amount', 'extra',
                            'mta_tax', 'tip_amount', 'tolls_amount',
                            'improvement_surcharge', 'total_amount'])

StatementMeta(, 454e6e77-7dcd-4ae7-aa20-4b3f7758e9a6, 14, Finished, Available)

In [13]:
# Show Schema Fact Table
fact_df.printSchema()

StatementMeta(, 454e6e77-7dcd-4ae7-aa20-4b3f7758e9a6, 15, Finished, Available)

root
 |-- VendorID: integer (nullable = true)
 |-- DateTimeID: long (nullable = false)
 |-- PickUpID: long (nullable = false)
 |-- DropOffID: long (nullable = false)
 |-- RatecodeID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [14]:
fact_df.count()

StatementMeta(, 454e6e77-7dcd-4ae7-aa20-4b3f7758e9a6, 16, Finished, Available)

100000

# 4. Load to table(Lakehouse) #

In [15]:
# Create function for save as table
def save_table_lakehouse(tables_name, df_save_table, format_table = "delta"):
    # Define delta_path
    delta_path = f"Tables/{tables_name}"

    # Save as table
    df_save_table.write.format(format_table).mode("overwrite").save(delta_path)

StatementMeta(, 454e6e77-7dcd-4ae7-aa20-4b3f7758e9a6, 17, Finished, Available)

In [16]:
# Create Dimension table
save_table_lakehouse("Dim_DateTime", dim_datetime_df)
save_table_lakehouse("Dim_PickUp", dim_pickup_df)
save_table_lakehouse("Dim_DropOff", dim_dropoff_df)
save_table_lakehouse("Dim_RateCode", dim_ratecode_df)
save_table_lakehouse("Dim_Payment", dim_payment_df)

# Create Fact Table
save_table_lakehouse("Fact_Uber", fact_df)

StatementMeta(, 454e6e77-7dcd-4ae7-aa20-4b3f7758e9a6, 18, Finished, Available)