In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, dayofmonth, month, year, dayofweek, lit
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, when

In [7]:
# Obtain the notebook's Spark session (reuses a running one if present).
spark = (
    SparkSession.builder
    .appName("Uber Data Analytics")
    .getOrCreate()
)

In [9]:
# Raw trip records. NOTE(review): this is an absolute, machine-specific path;
# the notebook only runs where this exact location exists.
uber_csv_path = r"D:\Uber Data Analytics\data\uber_data.csv"
# First line is the header; column types are inferred from the data.
df = spark.read.csv(uber_csv_path, header=True, inferSchema=True)

In [10]:
# Peek at the first five raw rows to sanity-check the load.
df.show(n=5)

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       1| 2016-03-01 00:00:00|  2016-03-01 00:07:55|              1|          2.5|-73.97674560546875|40.765151977539055|         1|    

In [11]:
# Make sure both trip timestamps are typed as Spark timestamps.
for ts_col in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
    df = df.withColumn(ts_col, col(ts_col).cast("timestamp"))

In [12]:
# Preview five rows again now that the datetime columns have been cast.
df.show(n=5)

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       1| 2016-03-01 00:00:00|  2016-03-01 00:07:55|              1|          2.5|-73.97674560546875|40.765151977539055|         1|    

In [13]:
# Keep a single copy of any rows that are identical across every column.
df = df.distinct()

In [14]:
# Create window specification
# Shared window used with row_number() to hand out surrogate IDs 1..N.
# It has no partitionBy and orders by a constant, so the row order (and thus
# which row receives which ID) is arbitrary: the dimension previews below show
# IDs that do not follow the natural key order.
# NOTE(review): an unpartitioned window presumably pulls all rows into a single
# partition, and IDs may differ between runs; confirm this is acceptable.
windowSpec = Window.orderBy(lit(1))

In [15]:
# Create datetime dimension
# One row per distinct (pickup, dropoff) timestamp pair, plus calendar parts
# derived from each timestamp and a surrogate datetime_id.
#
# The pair is de-duplicated BEFORE ids are assigned. The fact table joins on
# these two columns, so leaving repeated pairs in the dimension would give the
# same pair several datetime_ids and multiply the matching trips in the join.
datetime_dim = (
    df.select("tpep_pickup_datetime", "tpep_dropoff_datetime")
    .distinct()
    .select(
        "tpep_pickup_datetime",
        "tpep_dropoff_datetime",
        # Pickup calendar parts (dayofweek is 1-based: the 2016-03-10 rows show 5).
        hour(col("tpep_pickup_datetime")).alias("pick_hour"),
        dayofmonth(col("tpep_pickup_datetime")).alias("pick_day"),
        month(col("tpep_pickup_datetime")).alias("pick_month"),
        year(col("tpep_pickup_datetime")).alias("pick_year"),
        dayofweek(col("tpep_pickup_datetime")).alias("pick_weekday"),
        # Dropoff calendar parts.
        hour(col("tpep_dropoff_datetime")).alias("drop_hour"),
        dayofmonth(col("tpep_dropoff_datetime")).alias("drop_day"),
        month(col("tpep_dropoff_datetime")).alias("drop_month"),
        year(col("tpep_dropoff_datetime")).alias("drop_year"),
        dayofweek(col("tpep_dropoff_datetime")).alias("drop_weekday"),
    )
    .withColumn("datetime_id", row_number().over(windowSpec))
)

# Display the first 5 rows of datetime dimension
datetime_dim.show(5)

+--------------------+---------------------+---------+--------+----------+---------+------------+---------+--------+----------+---------+------------+-----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|pick_hour|pick_day|pick_month|pick_year|pick_weekday|drop_hour|drop_day|drop_month|drop_year|drop_weekday|datetime_id|
+--------------------+---------------------+---------+--------+----------+---------+------------+---------+--------+----------+---------+------------+-----------+
| 2016-03-10 07:07:59|  2016-03-10 07:15:17|        7|      10|         3|     2016|           5|        7|      10|         3|     2016|           5|          1|
| 2016-03-10 07:08:05|  2016-03-10 07:22:06|        7|      10|         3|     2016|           5|        7|      10|         3|     2016|           5|          2|
| 2016-03-10 07:09:37|  2016-03-10 07:20:53|        7|      10|         3|     2016|           5|        7|      10|         3|     2016|           5|          3|
| 2016-03-10 07:14:12|

In [16]:
# Passenger-count dimension: each distinct count gets a surrogate id.
passenger_count_dim = (
    df.select("passenger_count")
    .distinct()
    .withColumn("passenger_count_id", row_number().over(windowSpec))
)

In [17]:
# Trip-distance dimension: each distinct distance value gets a surrogate id.
trip_distance_dim = (
    df.select("trip_distance")
    .distinct()
    .withColumn("trip_distance_id", row_number().over(windowSpec))
)

In [18]:
# Preview five rows of the passenger-count dimension.
passenger_count_dim.show(n=5)

+---------------+------------------+
|passenger_count|passenger_count_id|
+---------------+------------------+
|              1|                 1|
|              6|                 2|
|              3|                 3|
|              5|                 4|
|              4|                 5|
+---------------+------------------+
only showing top 5 rows



In [19]:
# Preview five rows of the trip-distance dimension.
trip_distance_dim.show(n=5)

+-------------+----------------+
|trip_distance|trip_distance_id|
+-------------+----------------+
|        19.98|               1|
|        17.56|               2|
|         2.86|               3|
|         0.66|               4|
|        10.65|               5|
+-------------+----------------+
only showing top 5 rows



In [20]:
# Lookup from RatecodeID to its human-readable label.
rate_code_type = dict([
    (1, "Standard rate"),
    (2, "JFK"),
    (3, "Newark"),
    (4, "Nassau or Westchester"),
    (5, "Negotiated fare"),
    (6, "Group ride"),
])

In [21]:
# Rate-code dimension: distinct RatecodeID values, a surrogate rate_code_id,
# and a readable rate_code_name.
#
# The CASE expression is built from rate_code_type so the labels live in one
# place. Codes missing from the mapping (and NULLs) fall through to "Unknown";
# previously an outer when(...) without otherwise() turned them into NULL and
# made the "Unknown" fallback unreachable.
rate_code_items = list(rate_code_type.items())
first_rate_code, first_rate_name = rate_code_items[0]
rate_code_name_expr = when(col("RatecodeID") == first_rate_code, first_rate_name)
for rate_code, rate_name in rate_code_items[1:]:
    rate_code_name_expr = rate_code_name_expr.when(col("RatecodeID") == rate_code, rate_name)

rate_code_dim = (
    df.select("RatecodeID").distinct()
    .withColumn("rate_code_id", row_number().over(windowSpec))
    .withColumn("rate_code_name", rate_code_name_expr.otherwise("Unknown"))
)

# Display the first 5 rows of rate code dimension
rate_code_dim.show(5)

+----------+------------+--------------------+
|RatecodeID|rate_code_id|      rate_code_name|
+----------+------------+--------------------+
|         1|           1|       Standard rate|
|         3|           2|              Newark|
|         5|           3|     Negotiated fare|
|         4|           4|Nassau or Westche...|
|         2|           5|                 JFK|
+----------+------------+--------------------+
only showing top 5 rows



In [22]:
# Pickup-location dimension: one surrogate id per distinct (longitude, latitude).
pickup_location_dim = (
    df.select("pickup_longitude", "pickup_latitude")
    .distinct()
    .withColumn("pickup_location_id", row_number().over(windowSpec))
)

In [23]:
# Dropoff-location dimension: one surrogate id per distinct (longitude, latitude).
dropoff_location_dim = (
    df.select("dropoff_longitude", "dropoff_latitude")
    .distinct()
    .withColumn("dropoff_location_id", row_number().over(windowSpec))
)

In [24]:
# Lookup from payment_type code to its human-readable label.
payment_type_name = dict([
    (1, "Credit card"),
    (2, "Cash"),
    (3, "No charge"),
    (4, "Dispute"),
    (5, "Unknown"),
    (6, "Voided trip"),
])

In [25]:
# Payment-type dimension: distinct payment_type codes, a surrogate
# payment_type_id, and a readable payment_type_name.
#
# The CASE expression is built from payment_type_name so the labels live in one
# place. Codes missing from the mapping (and NULLs) fall through to "Unknown";
# previously an outer when(...) without otherwise() turned them into NULL and
# made the "Unknown" fallback unreachable.
payment_type_items = list(payment_type_name.items())
first_pay_code, first_pay_name = payment_type_items[0]
payment_type_name_expr = when(col("payment_type") == first_pay_code, first_pay_name)
for pay_code, pay_name in payment_type_items[1:]:
    payment_type_name_expr = payment_type_name_expr.when(col("payment_type") == pay_code, pay_name)

payment_type_dim = (
    df.select("payment_type").distinct()
    .withColumn("payment_type_id", row_number().over(windowSpec))
    .withColumn("payment_type_name", payment_type_name_expr.otherwise("Unknown"))
)

In [26]:
# Preview up to five rows of the payment-type dimension.
payment_type_dim.show(n=5)

+------------+---------------+-----------------+
|payment_type|payment_type_id|payment_type_name|
+------------+---------------+-----------------+
|           1|              1|      Credit card|
|           3|              2|        No charge|
|           4|              3|          Dispute|
|           2|              4|             Cash|
+------------+---------------+-----------------+



In [27]:
# Number every trip row 1..N to serve as the fact table's trip_id.
trip_id_col = row_number().over(windowSpec)
df = df.withColumn("trip_id", trip_id_col)

In [28]:
# Assemble the fact table: join each dimension back onto the trips (in this
# order, default join type) and keep only surrogate ids plus the measures.
# NOTE(review): with the default join type, trips whose join key is NULL
# would presumably be dropped; confirm the key columns contain no NULLs.
dimension_joins = [
    (passenger_count_dim, "passenger_count"),
    (trip_distance_dim, "trip_distance"),
    (rate_code_dim, "RatecodeID"),
    (pickup_location_dim, ["pickup_longitude", "pickup_latitude"]),
    (dropoff_location_dim, ["dropoff_longitude", "dropoff_latitude"]),
    (datetime_dim, ["tpep_pickup_datetime", "tpep_dropoff_datetime"]),
    (payment_type_dim, "payment_type"),
]

trips_with_dims = df
for dim_df, join_keys in dimension_joins:
    trips_with_dims = trips_with_dims.join(dim_df, join_keys)

fact_columns = [
    "trip_id", "VendorID", "datetime_id", "passenger_count_id",
    "trip_distance_id", "rate_code_id", "store_and_fwd_flag",
    "pickup_location_id", "dropoff_location_id", "payment_type_id",
    "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount",
    "improvement_surcharge", "total_amount",
]
fact_table = trips_with_dims.select(*fact_columns)

# Render the default preview of the finished fact table.
fact_table.show()

+-------+--------+-----------+------------------+----------------+------------+------------------+------------------+-------------------+---------------+-----------+-----+-------+----------+------------+---------------------+------------+
|trip_id|VendorID|datetime_id|passenger_count_id|trip_distance_id|rate_code_id|store_and_fwd_flag|pickup_location_id|dropoff_location_id|payment_type_id|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+-------+--------+-----------+------------------+----------------+------------+------------------+------------------+-------------------+---------------+-----------+-----+-------+----------+------------+---------------------+------------+
|      1|       2|          1|                 3|            1473|           1|                 N|             22300|              20073|              1|        7.5|  0.0|    0.5|       1.0|         0.0|                  0.3|         9.3|
|      2|       2|          2|              