In [115]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.dynamicAllocation.enabled", "false")\
    .config("spark.sql.adaptive.enabled", "false")\
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .appName("NYC Taxi ETL2") \
    .master("local[4]")\
    .getOrCreate()

In [116]:
spark

In [66]:
# spark.stop()

In [120]:
# Read Taxi Zones data
taxi_data = (
                  spark
                    .read 
                    .option("header",True)
                    .option("inferschema",True)
                    .csv("file:/home/jovyan/work/advanced_big_data/projects/ETL/taxiData2.csv/")
              )

# Check number of partitions
print("Partitions = "    + str( taxi_data.rdd.getNumPartitions() ))

# Check number of records
print("Record Count = "  + str( taxi_data.count() ))

Partitions = 4
Record Count = 99999


In [121]:
taxi_data.columns

['medallion',
 'hack_license',
 'vendor_id',
 'rate_code',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_time_in_secs',
 'trip_distance',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude',
 'pickUp_date_MilliS',
 'dropOff_date_MilliS',
 'duration',
 'duration_ByMin',
 'pickup_point',
 'dropoff_point']

In [122]:
taxi_data.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- pickUp_date_MilliS: long (nullable = true)
 |-- dropOff_date_MilliS: long (nullable = true)
 |-- duration: integer (nullable = true)
 |-- duration_ByMin: double (nullable = true)
 |-- pickup_point: string (nullable = true)
 |-- dropoff_point: string (nullable = true)



In [123]:
taxi_data.count()

99999

In [124]:
from pyspark.sql.functions import spark_partition_id

In [125]:
taxi_data.groupBy(spark_partition_id()).count().orderBy("spark_partition_id()",ascending=True).show()

+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   0|25259|
|                   1|25267|
|                   2|25286|
|                   3|24187|
+--------------------+-----+



In [73]:
# from pyspark import StorageLevel

In [74]:
# taxi_data.persist(StorageLevel.MEMORY_AND_DISK)

In [127]:
geoBorough_data=spark.read\
.option("header",True)\
.option("inferschema",True)\
.csv("file:/home/jovyan/work/advanced_big_data/projects/ETL/geoBoroughData2.csv/")

In [128]:
geoBorough_data.rdd.getNumPartitions()

3

In [129]:
geoBorough_data.groupBy(spark_partition_id()).count().orderBy("spark_partition_id()",ascending=True).show()

+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   0|   41|
|                   1|   34|
|                   2|   29|
+--------------------+-----+



In [130]:
from shapely.geometry import Point, Polygon
from shapely.wkt import loads
from pyspark.sql.types import StringType
from pyspark.sql.functions import broadcast

In [131]:
joined_df = taxi_data.crossJoin(broadcast(geoBorough_data))

In [132]:
joined_df.rdd.getNumPartitions()

4

In [133]:
joined_df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- pickUp_date_MilliS: long (nullable = true)
 |-- dropOff_date_MilliS: long (nullable = true)
 |-- duration: integer (nullable = true)
 |-- duration_ByMin: double (nullable = true)
 |-- pickup_point: string (nullable = true)
 |-- dropoff_point: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- boroughCode: integer (nullable = true)
 |-- borough_area_code: integer (nu

In [134]:
joined_df.count()

10399896

In [135]:
from pyspark.sql.functions import count, sum, avg, stddev_pop,max, min,asc,desc,col,unix_millis,round, explode, udf

In [136]:
def point_in_polygon(wkt_point, polygon_wkt, borough):
    point = loads(wkt_point)  # Convert WKT string to a Point object
    polygon = loads(polygon_wkt)  # Convert WKT string to a Polygon object
    return borough if polygon.contains(point) else None

# Register the updated UDF in PySpark
contains_borough_udf = udf(point_in_polygon, StringType())


In [137]:
joined_df = joined_df.withColumn(
    "pickup_borough",
    contains_borough_udf(col("pickup_point"), col("polygon"), col("borough"))
).withColumn(
    "dropoff_borough",
    contains_borough_udf(col("dropoff_point"), col("polygon"), col("borough"))
)

In [138]:
joined_df.columns

['medallion',
 'hack_license',
 'vendor_id',
 'rate_code',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_time_in_secs',
 'trip_distance',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude',
 'pickUp_date_MilliS',
 'dropOff_date_MilliS',
 'duration',
 'duration_ByMin',
 'pickup_point',
 'dropoff_point',
 'borough',
 'boroughCode',
 'borough_area_code',
 'polygon',
 'pickup_borough',
 'dropoff_borough']

In [139]:
joined_df.select([
 'borough_area_code',
 'polygon',
 'borough',
 'pickup_borough',
 'dropoff_borough'
    
]
    
).show(2)

+-----------------+--------------------+-------+--------------+---------------+
|borough_area_code|             polygon|borough|pickup_borough|dropoff_borough|
+-----------------+--------------------+-------+--------------+---------------+
|                5|POLYGON ((-73.836...| Queens|          NULL|           NULL|
|                5|POLYGON ((-73.813...| Queens|          NULL|           NULL|
+-----------------+--------------------+-------+--------------+---------------+
only showing top 2 rows



In [140]:
filtered_df = joined_df.filter(col("pickup_borough").isNotNull() | col("dropoff_borough").isNotNull())


In [141]:
filtered_df.filter(col("borough")=="Queens").select(filtered_df.columns[-5:-1]).show(2)

+-----------+-----------------+--------------------+--------------+
|boroughCode|borough_area_code|             polygon|pickup_borough|
+-----------+-----------------+--------------------+--------------+
|          4|                5|POLYGON ((-73.891...|        Queens|
|          4|                5|POLYGON ((-73.891...|        Queens|
+-----------+-----------------+--------------------+--------------+
only showing top 2 rows



In [142]:
filtered_df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- pickUp_date_MilliS: long (nullable = true)
 |-- dropOff_date_MilliS: long (nullable = true)
 |-- duration: integer (nullable = true)
 |-- duration_ByMin: double (nullable = true)
 |-- pickup_point: string (nullable = true)
 |-- dropoff_point: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- boroughCode: integer (nullable = true)
 |-- borough_area_code: integer (nu

In [143]:
filtered_df.columns

['medallion',
 'hack_license',
 'vendor_id',
 'rate_code',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_time_in_secs',
 'trip_distance',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude',
 'pickUp_date_MilliS',
 'dropOff_date_MilliS',
 'duration',
 'duration_ByMin',
 'pickup_point',
 'dropoff_point',
 'borough',
 'boroughCode',
 'borough_area_code',
 'polygon',
 'pickup_borough',
 'dropoff_borough']

In [144]:
filtered_df=filtered_df.select(
    [
    'medallion',
 'rate_code',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_time_in_secs',
 'trip_distance',
 'pickUp_date_MilliS',
 'dropOff_date_MilliS',
 'duration',
 'duration_ByMin',
 'borough_area_code',
 'borough',
 'pickup_borough',
 'dropoff_borough'
    ]
)

In [145]:
filtered_df.write\
.option("schema",filtered_df.schema)\
.option("header",True)\
.csv("file:/home/jovyan/work/advanced_big_data/projects/ETL/marged_taxiData2.csv")

In [146]:
spark.stop()