## === Uber Data Pipeline ===

### Create the SparkSession

In [1]:
import findspark

# findspark.init() is called before pyspark is imported, as in the original
# cell ordering; presumably it makes the local Spark install importable.
findspark.init()

from pyspark.sql import SparkSession

# Reuse an already-running session if one exists, otherwise start a new one.
session_builder = SparkSession.builder.appName("uber_data_project")
spark = session_builder.getOrCreate()

### Importing all the necessary libraries

In [2]:
from pyspark.sql.functions import when, hour, day, month, year, weekday, col

### Read the flat-file (CSV) data into a PySpark DataFrame

In [3]:
# Load the raw trip records; the first row is the header and column types
# are inferred from the data.
df = spark.read.csv(
    "data/uber_data.csv",
    header=True,
    inferSchema=True,
)

In [4]:
# Inspect the inferred column types, then preview the first five rows.
df.printSchema()
df.show(n=5, truncate=True)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+------------------+---------------

### Drop duplicate rows from the main DataFrame and add a `"trip_id"` column as a unique row identifier

In [5]:
from pyspark.sql.functions import monotonically_increasing_id

# Remove exact duplicate rows, then tag every remaining row with a unique
# (but not consecutive) 64-bit trip_id.
#
# monotonically_increasing_id() is non-deterministic: the value depends on the
# partition a row lands in and its position there, and drop_duplicates()
# introduces a shuffle. This frame is re-read by every dimension cell and by
# the fact-table join, so without caching each re-evaluation could hand a given
# trip_id to a different row. cache() keeps the first materialised assignment
# for reuse (best-effort: an evicted partition would still be recomputed).
df = (
    df.drop_duplicates()
      .withColumn('trip_id', monotonically_increasing_id())
      .cache()
)

In [6]:
# Confirm the trip_id column was appended, then preview the first five rows.
df.printSchema()
df.show(n=5, truncate=True)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- trip_id: long (nullable = false)

+--------+--------------------+---------------------+---------------+----------

### Create DataFrame for `"datetime_dim"` dimension table

In [7]:
# Date/time dimension: one row per trip with the pickup and dropoff timestamps
# broken out into hour / day / month / year / weekday parts.
#
# datetime_id is taken directly from trip_id rather than from a fresh
# monotonically_increasing_id() call: the fact-table join matches
# trip_id == datetime_id, so the key must be the trip's own id by construction
# instead of relying on two independent non-deterministic id generators
# happening to agree.
datetime_dim = df.select(
    col("trip_id").alias("datetime_id"),
    col("tpep_pickup_datetime"),
    hour(col("tpep_pickup_datetime")).alias("pick_hour"),
    day(col("tpep_pickup_datetime")).alias("pick_day"),
    month(col("tpep_pickup_datetime")).alias("pick_month"),
    year(col("tpep_pickup_datetime")).alias("pick_year"),
    weekday(col("tpep_pickup_datetime")).alias("pick_weekday"),
    col("tpep_dropoff_datetime"),
    hour(col("tpep_dropoff_datetime")).alias("drop_hour"),
    day(col("tpep_dropoff_datetime")).alias("drop_day"),
    month(col("tpep_dropoff_datetime")).alias("drop_month"),
    year(col("tpep_dropoff_datetime")).alias("drop_year"),
    weekday(col("tpep_dropoff_datetime")).alias("drop_weekday"),
)


In [8]:
# Check the datetime dimension's columns and preview five rows.
datetime_dim.printSchema()
datetime_dim.show(n=5, truncate=True)

root
 |-- datetime_id: long (nullable = false)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- pick_hour: integer (nullable = true)
 |-- pick_day: integer (nullable = true)
 |-- pick_month: integer (nullable = true)
 |-- pick_year: integer (nullable = true)
 |-- pick_weekday: integer (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- drop_hour: integer (nullable = true)
 |-- drop_day: integer (nullable = true)
 |-- drop_month: integer (nullable = true)
 |-- drop_year: integer (nullable = true)
 |-- drop_weekday: integer (nullable = true)

+-----------+--------------------+---------+--------+----------+---------+------------+---------------------+---------+--------+----------+---------+------------+
|datetime_id|tpep_pickup_datetime|pick_hour|pick_day|pick_month|pick_year|pick_weekday|tpep_dropoff_datetime|drop_hour|drop_day|drop_month|drop_year|drop_weekday|
+-----------+--------------------+---------+--------+----------+---------+------------+--

### Create DataFrame for `"passenger_count_dim"` dimension table

In [9]:
# Passenger-count dimension: one row per trip.
#
# passenger_count_id reuses the row's trip_id instead of a separately generated
# monotonically_increasing_id(); the fact-table join matches
# trip_id == passenger_count_id, so the key has to be the trip's own id.
passenger_count_dim = df.select(
    col("trip_id").alias("passenger_count_id"),
    "passenger_count",
)

In [10]:
# Check the passenger-count dimension's columns and preview five rows.
passenger_count_dim.printSchema()
passenger_count_dim.show(n=5, truncate=True)

root
 |-- passenger_count_id: long (nullable = false)
 |-- passenger_count: integer (nullable = true)

+------------------+---------------+
|passenger_count_id|passenger_count|
+------------------+---------------+
|                 0|              3|
|                 1|              2|
|                 2|              1|
|                 3|              6|
|                 4|              2|
+------------------+---------------+
only showing top 5 rows



### Create DataFrame for `"trip_distance_dim"` dimension table

In [11]:
# Trip-distance dimension: one row per trip.
#
# trip_distance_id reuses the row's trip_id instead of a separately generated
# monotonically_increasing_id(); the fact-table join matches
# trip_id == trip_distance_id, so the key has to be the trip's own id.
trip_distance_dim = df.select(
    col("trip_id").alias("trip_distance_id"),
    "trip_distance",
)

In [12]:
# Check the trip-distance dimension's columns and preview five rows.
trip_distance_dim.printSchema()
trip_distance_dim.show(n=5, truncate=True)

root
 |-- trip_distance_id: long (nullable = false)
 |-- trip_distance: double (nullable = true)

+----------------+-------------+
|trip_distance_id|trip_distance|
+----------------+-------------+
|               0|         1.29|
|               1|         2.44|
|               2|         2.03|
|               3|         1.24|
|               4|          2.1|
+----------------+-------------+
only showing top 5 rows



### Create DataFrame for `"pickup_location_dim"` dimension table

In [13]:
# Pickup-location dimension: one row per trip with the pickup coordinates.
#
# pickup_location_id reuses the row's trip_id instead of a separately generated
# monotonically_increasing_id(); the fact-table join matches
# trip_id == pickup_location_id, so the key has to be the trip's own id.
pickup_location_dim = df.select(
    col("trip_id").alias("pickup_location_id"),
    "pickup_latitude",
    "pickup_longitude",
)


In [14]:
# Check the pickup-location dimension's columns and preview five rows.
pickup_location_dim.printSchema()
pickup_location_dim.show(n=5, truncate=True)

root
 |-- pickup_location_id: long (nullable = false)
 |-- pickup_latitude: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)

+------------------+------------------+------------------+
|pickup_location_id|   pickup_latitude|  pickup_longitude|
+------------------+------------------+------------------+
|                 0| 40.77577972412109|-73.96041870117188|
|                 1|40.688358306884766|-73.98932647705078|
|                 2|40.730918884277344| -73.9813003540039|
|                 3| 40.78052520751953|-73.94903564453125|
|                 4|40.767520904541016|-73.95292663574217|
+------------------+------------------+------------------+
only showing top 5 rows



### Create DataFrame for `"dropoff_location_dim"` dimension table

In [15]:
# Dropoff-location dimension: one row per trip with the dropoff coordinates.
#
# dropoff_location_id reuses the row's trip_id instead of a separately
# generated monotonically_increasing_id(); the fact-table join matches
# trip_id == dropoff_location_id, so the key has to be the trip's own id.
dropoff_location_dim = df.select(
    col("trip_id").alias("dropoff_location_id"),
    "dropoff_latitude",
    "dropoff_longitude",
)

In [16]:
# Check the dropoff-location dimension's columns and preview five rows.
dropoff_location_dim.printSchema()
dropoff_location_dim.show(n=5, truncate=True)
# Left as the final expression so the notebook displays the row count.
dropoff_location_dim.count()

root
 |-- dropoff_location_id: long (nullable = false)
 |-- dropoff_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)

+-------------------+------------------+------------------+
|dropoff_location_id|  dropoff_latitude| dropoff_longitude|
+-------------------+------------------+------------------+
|                  0| 40.78376007080078|-73.97956085205078|
|                  1|40.707969665527344|-74.00363159179686|
|                  2|40.753116607666016|-73.97920227050781|
|                  3| 40.79473114013672|-73.94375610351561|
|                  4|40.751407623291016|-73.97410583496092|
+-------------------+------------------+------------------+
only showing top 5 rows



100000

### Create DataFrame for `"rate_code_dim"` dimension table

In [17]:
# Rate-code dimension: one row per trip with a human-readable rate-code label.
#
# Labels follow the NYC TLC yellow-taxi data dictionary:
#   1 Standard rate, 2 JFK, 3 Newark, 4 Nassau or Westchester,
#   5 Negotiated fare, 6 Group ride.
# The previous mapping omitted "Newark", which shifted the labels for 3-5 by
# one position and sent code 6 to "other".
#
# rate_code_id reuses the row's trip_id instead of a separately generated
# monotonically_increasing_id(); the fact-table join matches
# trip_id == rate_code_id, so the key has to be the trip's own id.
rate_code_dim = df.select(
    col("trip_id").alias("rate_code_id"),
    "RatecodeID",
    when(col("RatecodeID") == 1, "Standard rate")
    .when(col("RatecodeID") == 2, "JFK")
    .when(col("RatecodeID") == 3, "Newark")
    .when(col("RatecodeID") == 4, "Nassau or Westchester")
    .when(col("RatecodeID") == 5, "Negotiated fare")
    .when(col("RatecodeID") == 6, "Group ride")
    .otherwise("other")  # null or unrecognised codes
    .alias("rate_code_name"),
)

In [18]:
# Check the rate-code dimension's columns and preview five rows.
rate_code_dim.printSchema()
rate_code_dim.show(n=5, truncate=True)

root
 |-- rate_code_id: long (nullable = false)
 |-- RatecodeID: integer (nullable = true)
 |-- rate_code_name: string (nullable = false)

+------------+----------+--------------+
|rate_code_id|RatecodeID|rate_code_name|
+------------+----------+--------------+
|           0|         1| Standard rate|
|           1|         1| Standard rate|
|           2|         1| Standard rate|
|           3|         1| Standard rate|
|           4|         1| Standard rate|
+------------+----------+--------------+
only showing top 5 rows



### Create DataFrame for `"payment_type_dim"` dimension table

In [19]:
# Payment-type dimension: one row per trip with a human-readable payment label.
#
# payment_type_id reuses the row's trip_id instead of a separately generated
# monotonically_increasing_id(); the fact-table join matches
# trip_id == payment_type_id, so the key has to be the trip's own id.
payment_type_dim = df.select(
    col("trip_id").alias("payment_type_id"),
    "payment_type",
    when(col("payment_type") == 1, "Credit card")
    .when(col("payment_type") == 2, "Cash")
    .when(col("payment_type") == 3, "No Charge")
    .when(col("payment_type") == 4, "Dispute")
    .when(col("payment_type") == 5, "Unknown")
    .when(col("payment_type") == 6, "Voided Trip")
    .otherwise("other")  # null or unrecognised codes
    .alias("payment_type_name"),
)


In [20]:
# Preview five rows of the payment-type dimension, then print its schema.
payment_type_dim.show(n=5, truncate=True)
payment_type_dim.printSchema()

+---------------+------------+-----------------+
|payment_type_id|payment_type|payment_type_name|
+---------------+------------+-----------------+
|              0|           1|      Credit card|
|              1|           2|             Cash|
|              2|           1|      Credit card|
|              3|           2|             Cash|
|              4|           2|             Cash|
+---------------+------------+-----------------+
only showing top 5 rows

root
 |-- payment_type_id: long (nullable = false)
 |-- payment_type: integer (nullable = true)
 |-- payment_type_name: string (nullable = false)



### Create fact table by joining all dimension tables

In [21]:
# Each dimension is inner-joined to the trip rows on trip_id == <dimension key>,
# in the order listed here.
dimension_keys = [
    (passenger_count_dim, "passenger_count_id"),
    (trip_distance_dim, "trip_distance_id"),
    (pickup_location_dim, "pickup_location_id"),
    (dropoff_location_dim, "dropoff_location_id"),
    (datetime_dim, "datetime_id"),
    (rate_code_dim, "rate_code_id"),
    (payment_type_dim, "payment_type_id"),
]

joined = df
for dim, key in dimension_keys:
    joined = joined.join(dim, df["trip_id"] == dim[key])

# Keep only the trip id, the dimension keys, and the per-trip measures.
fact_table = joined.select(
    "trip_id",
    "VendorID",
    "datetime_id",
    "passenger_count_id",
    "trip_distance_id",
    "pickup_location_id",
    "dropoff_location_id",
    "rate_code_id",
    "store_and_fwd_flag",
    "payment_type_id",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
)

In [22]:
# Check the fact table's columns and preview five rows.
fact_table.printSchema()
fact_table.show(n=5, truncate=True)

root
 |-- trip_id: long (nullable = false)
 |-- VendorID: integer (nullable = true)
 |-- datetime_id: long (nullable = false)
 |-- passenger_count_id: long (nullable = false)
 |-- trip_distance_id: long (nullable = false)
 |-- pickup_location_id: long (nullable = false)
 |-- dropoff_location_id: long (nullable = false)
 |-- rate_code_id: long (nullable = false)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type_id: long (nullable = false)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)

+-------+--------+-----------+------------------+----------------+------------------+-------------------+------------+------------------+---------------+-----------+-----+-------+----------+------------+---------------------+------------

In [23]:
# Shut down the Spark session now that the pipeline has finished; cells that
# use `spark` or any DataFrame built from it cannot be re-run after this
# without recreating the session.
spark.stop()