In [86]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Open Parquet File - TLC NYC Dataset") \
    .getOrCreate()


In [87]:
pip install pyspark[hadoop]

Note: you may need to restart the kernel to use updated packages.


In [88]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Connect S3") \
    .config("spark.hadoop.fs.s3a.access.key", "Your Access Key") \
    .config("spark.hadoop.fs.s3a.secret.key", "Your Secret Key") \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
    .getOrCreate()


In [89]:
s3_path = "s3a://yourbucketname/yourdataset"
df1 = spark.read.parquet(s3_path)

# Show the DataFrame
df1.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-01-01 00:57:55|  2024-01-01 01:17:43|              1|         1.72|         1|                 N|         186|          79|           2|       17.7|  1.0|    0.5|       0.

In [43]:
# Handle missing values
df_cleaned = df1.na.drop()


In [44]:
from pyspark.sql.functions import col

df1 = df1.withColumn("passenger_count", col("passenger_count").cast("double"))


In [45]:
# Replace non-numeric values with null before casting
# For example, if you have unexpected non-numeric values, you might filter them out:
df1 = df1.filter(col("passenger_count").rlike("^-?\\d+(\\.\\d+)?$"))  # regex for numeric
df1 = df1.withColumn("passenger_count", col("passenger_count").cast("double"))


In [84]:
df_cleaned = df_cleaned.filter(
    (col("passenger_count") > 0) &
    (col("trip_distance") > 0) &
    (col("tpep_pickup_datetime").isNotNull()) &
    (col("tpep_dropoff_datetime").isNotNull())
)


In [47]:
from pyspark.sql.functions import udf, hour, dayofweek, to_timestamp

# Convert timestamps
df_transformed = df_cleaned.withColumn("tpep_pickup_datetime", to_timestamp(col("tpep_pickup_datetime"))) \
                           .withColumn("tpep_dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime")))


In [48]:
df_transformed.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-01-01 00:57:55|  2024-01-01 01:17:43|            1.0|         1.72|         1|                 N|         186|          79|           2|       17.7|  1.0|    0.5|       0.

In [49]:
#Extract new features
df_transformed = df_transformed.withColumn("hour_of_day", hour(col("tpep_pickup_datetime"))) \
                               .withColumn("day_of_week", dayofweek(col("tpep_pickup_datetime"))) \
                               .withColumn("trip_duration", (col("tpep_dropoff_datetime").cast("long") - col("tpep_pickup_datetime").cast("long"))/60)


In [38]:
df_transformed.show()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|hour_of_day|day_of_week|     trip_duration|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+-----------+------------------+
|       2| 2024-01-01 00:57:55|  2024-01-01 01:17:43|     

In [50]:
# Geospatial transformations (assuming a UDF to map coordinates to neighborhoods)
df_transformed = df_transformed.withColumn("pickup_neighborhood", udf(col("pickup_latitude"), col("pickup_longitude"))) \
                               .withColumn("dropoff_neighborhood", udf(col("dropoff_latitude"), col("dropoff_longitude")))


PySparkTypeError: [NOT_CALLABLE] Argument `func` should be a callable, got Column.

In [64]:
# Average trip duration per hour
avg_duration_per_hour = df_transformed.groupBy("hour_of_day").mean("trip_duration").sort('hour_of_day')
avg_duration_per_hour.show()
# Total trips per day
trips_per_day = df_transformed.groupBy("day_of_week").count().sort('day_of_week')
trips_per_day.show()

+-----------+------------------+
|hour_of_day|avg(trip_duration)|
+-----------+------------------+
|          0|14.931092739377048|
|          1|13.983606421176544|
|          2|13.339922194922297|
|          3|13.519846329511443|
|          4|15.716580553656032|
|          5| 17.00300467719035|
|          6|16.363315948622635|
|          7| 15.49126086473263|
|          8|15.619396414837336|
|          9|15.742807517036816|
|         10| 15.95533647430144|
|         11| 15.77710048461487|
|         12|15.667867756713733|
|         13| 16.41696107437071|
|         14|17.319411049938676|
|         15| 17.57105709587764|
|         16| 17.72594070158531|
|         17|16.755265589551456|
|         18|14.989222009150991|
|         19| 14.47545112513305|
+-----------+------------------+
only showing top 20 rows

+-----------+------+
|day_of_week| count|
+-----------+------+
|          1|310479|
|          2|377094|
|          3|430713|
|          4|463891|
|          5|401889|
|          6|3

In [85]:
df_transformed.show()
df1.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|hour_of_day|day_of_week|     trip_duration|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+-----------+------------------+
|       2| 2024-01-01 00:57:55|  2024-01-01 01:17:43|     