In [10]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

In [None]:
# Disable AQE and adjust session parameters

In [11]:
# Baseline session: adaptive query execution (AQE) is off and automatic
# broadcast joins are disabled (threshold -1), so the trip/zone join below
# runs as a plain shuffle join and its skew stays visible.
# NOTE: if a session already exists, getOrCreate() reuses it and only runtime
# SQL configurations take effect; launch-time settings such as
# spark.driver.memory are then ignored (Spark logs a WARN saying so).
conf = SparkConf().setAll([
    ("spark.driver.memory", "4G"),
    ("spark.sql.autoBroadcastJoinThreshold", "-1"),
    ("spark.sql.shuffle.partitions", "201"),
    ("spark.sql.adaptive.enabled", "false"),
])

spark = (
    SparkSession.builder
    .master("local[8]")
    .config(conf=conf)
    .appName("Skewness")
    .getOrCreate()
)

24/03/19 22:44:44 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [None]:
# Manually add skew to the trip data: replace many of the pickup location IDs with a single arbitrary ID (237).

In [17]:
import pyspark.sql.functions as F


def prepare_trips_data(
    trips_path="/Users/shaytavor/Dropbox/Public/Knowledge Base/Spark/Data/TripData/*.parquet",
    skewed_location_id=237,
):
    """Load the trip data and artificially skew it on ``PULocationID``.

    Every trip whose pickup location ID appears in ``pickupChange`` is remapped
    to ``skewed_location_id``. This concentrates many trips onto a single join key.

    Args:
        trips_path: Parquet path or glob for the trip data. Defaults to the
            original hard-coded location; override it to run the notebook
            on another machine.
        skewed_location_id: Pickup location ID that absorbs the remapped trips.

    Returns:
        A lazy DataFrame with the source columns, where ``PULocationID`` has been rewritten.
    """
    pickupChange = [
        236, 132, 161, 186, 142, 141, 48, 239, 170, 162, 230, 163, 79, 234, 263, 140, 238, 107, 68, 138, 229, 249,
        237, 164, 90, 43, 100, 246, 231, 262, 113, 233, 143, 137, 114, 264, 148, 151
    ]

    res = spark.read\
        .parquet(trips_path)\
        .withColumn(
            "PULocationID",
            # Remap the listed IDs to one ID; leave all other pickups untouched.
            F.when(F.col("PULocationID").isin(pickupChange), F.lit(skewed_location_id))
            .otherwise(F.col("PULocationID"))
        )
    return res

In [None]:
# Join the trip data with the taxi zone lookup table to get the zone and borough of each trip's pickup location.

In [13]:
def join_skewed_data(
    locations_path="/Users/shaytavor/Dropbox/Public/Knowledge Base/Spark/Data/TripData/zone_lookup.csv",
):
    """Join the skewed trips to the zone lookup and show average trip distance.

    This is a plain equi-join on the pickup location ID, with no skew mitigation.
    All trips remapped by ``prepare_trips_data`` share a single join key.
    Two tables are printed: average ``trip_distance`` per ``Zone``, then per
    ``Borough``. Each is sorted in descending order and shows up to 1000 rows.

    Args:
        locations_path: CSV path of the zone lookup table (with a header row).
    """
    tripsDF = prepare_trips_data()
    locationDF = spark.read.csv(locations_path, header=True, inferSchema=True)

    tripsLocationDF = tripsDF.join(locationDF, tripsDF.PULocationID == locationDF.LocationID)

    # Nothing is cached, so each show() is a separate action that re-reads the
    # data and re-runs the join.
    for group_col in ("Zone", "Borough"):
        (
            tripsLocationDF
            .groupBy(group_col)
            .agg(F.avg("trip_distance").alias("avg_trip_distance"))
            .sort(F.col("avg_trip_distance").desc())
            .show(truncate=False, n=1000)
        )

In [None]:
# Run the join, then inspect the Spark UI to detect the skew.

In [20]:
import time

# Time the baseline (unsalted) skewed join.
# perf_counter() is monotonic, so system clock adjustments cannot distort
# the measured duration.
startTime = time.perf_counter()
join_skewed_data()
print(f"Elapsed_time: {(time.perf_counter() - startTime)} seconds")


24/03/19 23:23:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/03/19 23:23:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/03/19 23:23:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

+---------------------------------------------+------------------+
|Zone                                         |avg_trip_distance |
+---------------------------------------------+------------------+
|Morrisania/Melrose                           |936.171469594594  |
|Westchester Village/Unionport                |726.9454487179488 |
|Crotona Park East                            |534.0415021459228 |
|Corona                                       |392.7733024691358 |
|Laurelton                                    |254.4450787401576 |
|Prospect Park                                |220.04481675392645|
|Starrett City                                |207.7077204301075 |
|Borough Park                                 |190.78443686006852|
|Brooklyn Heights                             |188.71234232486032|
|Carroll Gardens                              |185.954820303816  |
|Mount Hope                                   |159.89288188976388|
|East Tremont                                 |133.24979750778

24/03/19 23:23:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/03/19 23:23:53 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/03/19 23:23:54 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

+-------------+------------------+
|Borough      |avg_trip_distance |
+-------------+------------------+
|Bronx        |60.56271240903504 |
|Brooklyn     |54.57467611422346 |
|Queens       |15.646755301422479|
|Staten Island|11.551419472247497|
|Unknown      |7.317219626716715 |
|Manhattan    |5.401644745476663 |
|EWR          |0.8880582232893172|
+-------------+------------------+

Elapsed_time: 17.378972053527832 seconds


                                                                                

In [None]:
# Dividing large partitions using a derived (salt) column.
# This function adds a salt column to the trips dataset, derived from the pickup timestamp, and explodes each row of the zone lookup dataset into one copy per salt value.

In [24]:
def join_with_subsplit(
    partitions=20,
    locations_path="/Users/shaytavor/Dropbox/Public/Knowledge Base/Spark/Data/TripData/zone_lookup.csv",
):
    """Join trips to zones after splitting each join key into sub-keys (salting).

    Each trip gets a salt ``day_mod`` in ``[0, partitions)`` derived from the
    day of year of ``tpep_pickup_datetime``. The zone lookup is replicated
    ``partitions`` times, with one copy per salt value in ``location_id_alt``.
    Joining on (location ID, salt) spreads every key, including the skewed one,
    over ``partitions`` sub-keys. Each trip still matches exactly the lookup
    rows it would match in the unsalted join, so the printed averages match
    those of ``join_skewed_data``.

    Args:
        partitions: Number of salt values (sub-keys per location). Must be >= 1.
        locations_path: CSV path of the zone lookup table (with a header row).

    Raises:
        ValueError: If ``partitions`` is less than 1.
    """
    if partitions < 1:
        raise ValueError(f"partitions must be >= 1, got {partitions}")

    # coalesce(..., 0): a null pickup timestamp would give a null salt, and the
    # inner join would silently drop that trip. Pinning it to salt 0 keeps the
    # result identical to the unsalted join.
    tripsDF = prepare_trips_data().withColumn(
        "day_mod",
        F.coalesce(F.dayofyear(F.col("tpep_pickup_datetime")) % partitions, F.lit(0)),
    )

    # One copy of every lookup row per salt value 0..partitions-1.
    locationDF = (
        spark.read
        .csv(locations_path, header=True, inferSchema=True)
        .withColumn(
            "location_id_alt",
            F.explode(F.array([F.lit(num) for num in range(partitions)])),
        )
    )

    tripsLocationDF = tripsDF.join(
        locationDF,
        (F.col("PULocationID") == F.col("LocationID")) & (F.col("location_id_alt") == F.col("day_mod")),
    )

    for group_col in ("Zone", "Borough"):
        (
            tripsLocationDF
            .groupBy(group_col)
            .agg(F.avg("trip_distance").alias("avg_trip_distance"))
            .sort(F.col("avg_trip_distance").desc())
            .show(truncate=False, n=1000)
        )

In [25]:
# Time the salted join. perf_counter() is monotonic, so it is the right clock
# for measuring an elapsed duration.
startTime = time.perf_counter()
join_with_subsplit()
print(f"Elapsed_time: {(time.perf_counter() - startTime)} seconds")

                                                                                

+---------------------------------------------+------------------+
|Zone                                         |avg_trip_distance |
+---------------------------------------------+------------------+
|Morrisania/Melrose                           |936.171469594595  |
|Westchester Village/Unionport                |726.9454487179488 |
|Crotona Park East                            |534.0415021459228 |
|Corona                                       |392.7733024691358 |
|Laurelton                                    |254.44507874015744|
|Prospect Park                                |220.04481675392668|
|Starrett City                                |207.7077204301075 |
|Borough Park                                 |190.7844368600683 |
|Brooklyn Heights                             |188.7123423248593 |
|Carroll Gardens                              |185.9548203038162 |
|Mount Hope                                   |159.8928818897638 |
|East Tremont                                 |133.24979750778



+-------------+------------------+
|Borough      |avg_trip_distance |
+-------------+------------------+
|Bronx        |60.562712409035065|
|Brooklyn     |54.57467611422333 |
|Queens       |15.646755301422477|
|Staten Island|11.551419472247497|
|Unknown      |7.317219626716755 |
|Manhattan    |5.401644745475775 |
|EWR          |0.8880582232893158|
+-------------+------------------+

Elapsed_time: 17.945346117019653 seconds


                                                                                

In [None]:
# Using AQE

In [29]:
# AQE session: adaptive query execution is on and its skew-join
# optimization is enabled. Partition coalescing is turned off, so the
# shuffle keeps its configured 201 partitions.
# The skew factor (3) and threshold (256K) are far below Spark's documented
# defaults, presumably so this small local dataset qualifies as skewed (confirm).
# NOTE: if a session already exists, getOrCreate() reuses it and applies only
# runtime SQL configurations such as the spark.sql.* settings below.
conf = SparkConf().setAll([
    ("spark.driver.memory", "4G"),
    ("spark.sql.autoBroadcastJoinThreshold", "-1"),
    ("spark.sql.shuffle.partitions", "201"),
    ("spark.sql.adaptive.enabled", "true"),
    ("spark.sql.adaptive.coalescePartitions.enabled", "false"),
    ("spark.sql.adaptive.skewJoin.enabled", "true"),
    ("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "3"),
    ("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256K"),
])

spark = (
    SparkSession.builder
    .master("local[8]")
    .config(conf=conf)
    .appName("Skewness")
    .getOrCreate()
)

In [30]:
# Time the unsalted skewed join again, now with AQE skew-join handling.
# perf_counter() is monotonic, so it is the right clock for measuring an
# elapsed duration.
startTime = time.perf_counter()
join_skewed_data()
print(f"Elapsed_time: {(time.perf_counter() - startTime)} seconds")

24/03/20 00:02:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/03/20 00:02:43 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

+---------------------------------------------+------------------+
|Zone                                         |avg_trip_distance |
+---------------------------------------------+------------------+
|Morrisania/Melrose                           |936.171469594594  |
|Westchester Village/Unionport                |726.9454487179488 |
|Crotona Park East                            |534.0415021459228 |
|Corona                                       |392.7733024691358 |
|Laurelton                                    |254.4450787401576 |
|Prospect Park                                |220.04481675392645|
|Starrett City                                |207.7077204301075 |
|Borough Park                                 |190.78443686006852|
|Brooklyn Heights                             |188.71234232486032|
|Carroll Gardens                              |185.954820303816  |
|Mount Hope                                   |159.89288188976388|
|East Tremont                                 |133.24979750778

24/03/20 00:02:50 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/03/20 00:02:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

+-------------+------------------+
|Borough      |avg_trip_distance |
+-------------+------------------+
|Bronx        |60.56271240903504 |
|Brooklyn     |54.57467611422346 |
|Queens       |15.646755301422479|
|Staten Island|11.551419472247497|
|Unknown      |7.317219626716715 |
|Manhattan    |5.401644745476554 |
|EWR          |0.8880582232893172|
+-------------+------------------+

Elapsed_time: 15.636811017990112 seconds


                                                                                