# Spark session creation

In this tutorial, we will show how to create a spark session with the right configuration.


In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = (
    SparkSession.builder
    .appName("LocalMode_memo_config")
    .master("local[*]")
    # enable AQE
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    # give a partition size advice.
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
    # set AQE partition range
    .config("spark.sql.adaptive.maxNumPostShufflePartitions", "100")
    .config("spark.sql.adaptive.minNumPostShufflePartitions", "1")
    # to support old date like 1900-01-01
    .config("spark.sql.legacy.useLegacyDateTimestamp", "true")
    # increase worker timeout
    .config("spark.network.timeout", "800s")
    .config("spark.executor.heartbeatInterval", "60s")
    .config("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
    # JVM memory allocation
    .config("spark.driver.memory", "8g")  # Half of RAM for driver
    .config("spark.driver.maxResultSize", "4g")  # Avoid OOM on collect()
    # Shuffle & partition tuning
    .config("spark.sql.files.maxPartitionBytes", "128m")  # Avoid large partitions in memory
    .config("spark.reducer.maxSizeInFlight", "48m")  # Limit shuffle buffer
    # Unified memory management
    .config("spark.memory.fraction", "0.7")  # Reduce pressure on execution memory
    .config("spark.memory.storageFraction", "0.3")  # Smaller cache area
    # Spill to disk early instead of crashing
    .config("spark.shuffle.spill", "true")
    .config("spark.shuffle.spill.compress", "true")
    .config("spark.shuffle.compress", "true")
    # optimize jvm GC
    .config("spark.driver.extraJavaOptions",
            "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:+HeapDumpOnOutOfMemoryError")
    # Use Kryo serializer
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # Optional: buffer size for serialization
    .config("spark.kryoserializer.buffer", "64m")
    .config("spark.kryoserializer.buffer.max", "512m")
    .getOrCreate()
)



In [3]:
data_folder_path = "C:/Users/pliu/Documents/data_set/sas_vs_parquet"

In [4]:
parquet_file1= f"{data_folder_path}/nyc_taxi_1GB_parquet.parquet"

In [5]:
df1 = spark.read.parquet(parquet_file1)

In [6]:
print(f"partition number is : {df1.rdd.getNumPartitions()}")

partition number is : 18


In [6]:
df1.show(5)

+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+------------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+------------+
|vendor_id|          pickup_at|         dropoff_at|passenger_count|trip_distance|pickup_longitude|pickup_latitude|rate_code_id|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|total_amount|
+---------+-------------------+-------------------+---------------+-------------+----------------+---------------+------------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+------------+
|      VTS|2009-01-04 03:52:00|2009-01-04 04:02:00|              1|            2|             -73|             40|        NULL|              NULL|              -73|              40|        CASH|          8|    0|   NULL|      

In [8]:
from pyspark.sql.functions import count

different_passenger_count = df1.groupBy("passenger_count").agg(count("passenger_count").alias("count_of_diff_passengers"))

In [9]:
different_passenger_count.show(5)

+---------------+------------------------+
|passenger_count|count_of_diff_passengers|
+---------------+------------------------+
|              0|                     377|
|              6|                   36200|
|              5|                  719399|
|              1|                 5359229|
|              3|                  353516|
+---------------+------------------------+
only showing top 5 rows

