In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

In [2]:
# Start (or reuse) a Spark session on a local master with 4 worker threads
# and Hive support enabled.
spark = (
    SparkSession.builder
    .appName("Data with Nikk the Greek Spark Session")
    .master("local[4]")
    .enableHiveSupport()
    .getOrCreate()
)

# Handle to the underlying SparkContext; used later to label jobs.
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/16 22:32:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Turn off Adaptive Query Execution (AQE): it generates additional jobs, which
# would be confusing for the scenarios shown in this notebook.
spark.conf.set("spark.sql.adaptive.enabled", "false")
# Do not cache DataFrames, since caching may prevent repeatable results.
# NOTE(review): this is a Databricks-specific key; presumably it has no effect
# on a plain local Spark session — confirm.
spark.conf.set("spark.databricks.io.cache.enabled", "false")

In [4]:

def sdf_generator(num_rows: int, num_partitions: int = None) -> "DataFrame":
    """Generate a synthetic DataFrame for the partitioning experiments.

    Args:
        num_rows: Number of rows to generate; passed to ``spark.range``, so the
            ``id`` column runs from 0 to ``num_rows - 1``.
        num_partitions: Number of partitions requested from ``spark.range``.
            ``None`` lets Spark pick its default.

    Returns:
        A DataFrame with the columns ``id``, ``date`` (current date),
        ``timestamp`` (current timestamp), ``idstring`` (``id`` cast to string),
        ``idfirst`` (first character of ``idstring``) and ``idlast`` (last
        character of ``idstring``).
    """
    return (
        spark.range(num_rows, numPartitions=num_partitions)
        .withColumn("date", f.current_date())
        .withColumn("timestamp", f.current_timestamp())
        .withColumn("idstring", f.col("id").cast("string"))
        # Spark substring positions are 1-based; use 1 explicitly rather than
        # relying on position 0 being treated like position 1.
        .withColumn("idfirst", f.col("idstring").substr(1, 1))
        # A negative position counts from the end of the string.
        .withColumn("idlast", f.col("idstring").substr(-1, 1))
    )

In [5]:
# Small 20-row sample to sanity-check the generator; count() triggers a job.
sdf_gen = sdf_generator(20)
sdf_gen.count()

20

In [6]:
# Print the sample rows to inspect the generated columns.
sdf_gen.show()

+---+----------+--------------------+--------+-------+------+
| id|      date|           timestamp|idstring|idfirst|idlast|
+---+----------+--------------------+--------+-------+------+
|  0|2024-12-16|2024-12-16 22:32:...|       0|      0|     0|
|  1|2024-12-16|2024-12-16 22:32:...|       1|      1|     1|
|  2|2024-12-16|2024-12-16 22:32:...|       2|      2|     2|
|  3|2024-12-16|2024-12-16 22:32:...|       3|      3|     3|
|  4|2024-12-16|2024-12-16 22:32:...|       4|      4|     4|
|  5|2024-12-16|2024-12-16 22:32:...|       5|      5|     5|
|  6|2024-12-16|2024-12-16 22:32:...|       6|      6|     6|
|  7|2024-12-16|2024-12-16 22:32:...|       7|      7|     7|
|  8|2024-12-16|2024-12-16 22:32:...|       8|      8|     8|
|  9|2024-12-16|2024-12-16 22:32:...|       9|      9|     9|
| 10|2024-12-16|2024-12-16 22:32:...|      10|      1|     0|
| 11|2024-12-16|2024-12-16 22:32:...|      11|      1|     1|
| 12|2024-12-16|2024-12-16 22:32:...|      12|      1|     2|
| 13|202

In [7]:
def rows_per_partition(sdf: "DataFrame") -> None:
    """Print the row count and percentage share of each Spark partition of ``sdf``.

    The total is obtained with ``sdf.count()`` (an extra job). Partitions that
    hold no rows do not appear in the printed table, because they produce no
    group in the ``groupBy``.
    """
    total_rows = sdf.count()
    percentage = 100 * f.col("count") / total_rows
    (
        sdf.withColumn("partition_id", f.spark_partition_id())
        .groupBy("partition_id")
        .count()
        .withColumn("count_perc", percentage)
        .orderBy("partition_id")
        .show()
    )

In [8]:

def rows_per_partition_col(sdf: "DataFrame", num_rows: int, col: str) -> None:
    """Print row counts per (Spark partition, ``col`` value) combination.

    Args:
        sdf: DataFrame whose physical partitioning is inspected.
        num_rows: Denominator for the ``count_perc`` column; supplied by the
            caller, so no extra ``count()`` job is run here.
        col: Name of the column to group by in addition to the partition id.
    """
    percentage = 100 * f.col("count") / num_rows
    (
        sdf.withColumn("partition_id", f.spark_partition_id())
        .groupBy("partition_id", col)
        .count()
        .withColumn("count_perc", percentage)
        .orderBy("partition_id", col)
        .show()
    )

In [9]:
# Root folder for every Parquet dataset written by this notebook.
# NOTE(review): the value ends with "/" and the write cells append another "/",
# so paths contain "//" — presumably harmless on a local filesystem; confirm.
base_dir = "base_dir/"

In [10]:
# Number of rows generated for each write experiment (one million).
num_rows = 1_000_000

# 1. How many files are written based on the number of partitions

* The answer is easy: the data is written as multiple snappy-compressed Parquet files, one per Spark partition. That is how Spark is able to write those files in parallel.
* Here you can find details on parquet: https://learncsdesigns.medium.com/understanding-apache-parquet-d722645cfe74

In [11]:
# Generate the data in a single partition and write it out.
sdf = sdf_generator(num_rows, 1)
print(sdf.rdd.getNumPartitions())
# Label the write job so it is easy to find in the Spark UI.
sc.setJobDescription("Write 1 file")
sdf.write.format("parquet").mode("overwrite").save(f"{base_dir}/test_1_file.parquet")
# Pass None (not the string "None") so the description is actually cleared
# and later jobs are not labelled "None".
sc.setJobDescription(None)

1


                                                                                

In [12]:
# Generate the data in 4 partitions and write it out.
sdf = sdf_generator(num_rows, 4)
print(sdf.rdd.getNumPartitions())
# Label the write job so it is easy to find in the Spark UI.
sc.setJobDescription("Write 4 file")
sdf.write.format("parquet").mode("overwrite").save(f"{base_dir}/test_4_file.parquet")
# Pass None (not the string "None") so the description is actually cleared
# and later jobs are not labelled "None".
sc.setJobDescription(None)

4


In [13]:
# Generate the data in 12 partitions (more than the 4 local cores) and write it out.
sdf = sdf_generator(num_rows, 12)
print(sdf.rdd.getNumPartitions())
# Label the write job so it is easy to find in the Spark UI.
sc.setJobDescription("Write 12 file")
sdf.write.format("parquet").mode("overwrite").save(f"{base_dir}/test_12_file.parquet")
# Pass None (not the string "None") so the description is actually cleared
# and later jobs are not labelled "None".
sc.setJobDescription(None)

12


# 3. Using coalesce and Repartition to save data

* Remember: repartitioning is an expensive operation, but it might be helpful for subsequent processing steps.
* For going down to one partition, repartition(1) can be better than coalesce(1), as all cores are used for the processing step.

In [14]:
# Start with 12 partitions, then merge them down to 4 with coalesce before writing.
sdf = sdf_generator(num_rows, 12)
sdf = sdf.coalesce(4)
print(sdf.rdd.getNumPartitions())
# Label the write job so it is easy to find in the Spark UI.
sc.setJobDescription("Write with Coalesce 12 to 4 file")
sdf.write.format("parquet").mode("overwrite").save(f"{base_dir}/test_12_to_4_coalesce_file.parquet")
# Pass None (not the string "None") so the description is actually cleared
# and later jobs are not labelled "None".
sc.setJobDescription(None)

4


In [15]:
# Start with 12 partitions, then merge them down to a single one with coalesce before writing.
sdf = sdf_generator(num_rows, 12)
sdf = sdf.coalesce(1)
print(sdf.rdd.getNumPartitions())
# Label the write job so it is easy to find in the Spark UI.
sc.setJobDescription("Write with Coalesce 12 to 1 file")
sdf.write.format("parquet").mode("overwrite").save(f"{base_dir}/test_12_to_1_coalesce_file.parquet")
# Pass None (not the string "None") so the description is actually cleared
# and later jobs are not labelled "None".
sc.setJobDescription(None)

1


In [16]:
# Start with 12 partitions, then redistribute the rows into 4 with repartition before writing.
sdf = sdf_generator(num_rows, 12)
sdf = sdf.repartition(4)
print(sdf.rdd.getNumPartitions())
# Label the write job so it is easy to find in the Spark UI.
sc.setJobDescription("Write with Repartition 12 to 4 file")
sdf.write.format("parquet").mode("overwrite").save(f"{base_dir}/test_12_to_4_repartition_file.parquet")
# Pass None (not the string "None") so the description is actually cleared
# and later jobs are not labelled "None".
sc.setJobDescription(None)

4


In [17]:
# Start with 12 partitions, then redistribute the rows into a single one with repartition before writing.
sdf = sdf_generator(num_rows, 12)
sdf = sdf.repartition(1)
print(sdf.rdd.getNumPartitions())
# Label the write job so it is easy to find in the Spark UI.
sc.setJobDescription("Write with Repartition 12 to 1 file")
sdf.write.format("parquet").mode("overwrite").save(f"{base_dir}/test_12_to_1_repartition_file.parquet")
# Pass None (not the string "None") so the description is actually cleared
# and later jobs are not labelled "None".
sc.setJobDescription(None)

1


# 4. Empty partitions problem when writing

* Spark is smart enough to only write partitions that actually contain records/data, e.g. after filtering.

In [18]:
# Generate 20 partitions, then keep only ids below 200. The filter does not
# change the partition count, so most of the 20 partitions end up empty.
sdf = sdf_generator(num_rows, 20)
sdf = sdf.filter(f.col("id") < 200)
print(sdf.rdd.getNumPartitions())
# Label the write job so it is easy to find in the Spark UI.
sc.setJobDescription("Empty rows")
sdf.write.format("parquet").mode("overwrite").save(f"{base_dir}/emptyRows.parquet")
# Pass None (not the string "None") so the description is actually cleared
# and later jobs are not labelled "None".
sc.setJobDescription(None)

20


In [19]:
# Show which partitions still hold rows after the filter.
rows_per_partition(sdf)

+------------+-----+----------+
|partition_id|count|count_perc|
+------------+-----+----------+
|           0|  200|     100.0|
+------------+-----+----------+



# 5. Repartitioning by column idfirst

In [20]:
# Repartition into 10 partitions keyed on the "idfirst" column, so all rows
# sharing the same first digit land in the same partition.
sdf = sdf_generator(num_rows, 20)
sdf = sdf.repartition(10, "idfirst")
print(sdf.rdd.getNumPartitions())
# Label the write job so it is easy to find in the Spark UI.
sc.setJobDescription("repartition 10 idfirst")
sdf.write.format("parquet").mode("overwrite").save(f"{base_dir}/repartition_10_idfirst.parquet")
# Pass None (not the string "None") so the description is actually cleared
# and later jobs are not labelled "None".
sc.setJobDescription(None)

10


In [21]:
# Show how the rows are distributed after repartitioning by "idfirst".
rows_per_partition(sdf)

+------------+------+----------+
|partition_id| count|count_perc|
+------------+------+----------+
|           3|111111|   11.1111|
|           4|111111|   11.1111|
|           5|111112|   11.1112|
|           6|222222|   22.2222|
|           8|111111|   11.1111|
|           9|333333|   33.3333|
+------------+------+----------+



In [23]:
# Read the part file written for Spark partition 0. The file name contains an
# identifier that changes with every write, so match it with a glob instead of
# hard-coding it (a hard-coded name breaks as soon as the write cell is re-run).
spark.read.parquet(f"{base_dir}/repartition_10_idfirst.parquet/part-00000-*.snappy.parquet").show()

                                                                                

+---+----+---------+--------+-------+------+
| id|date|timestamp|idstring|idfirst|idlast|
+---+----+---------+--------+-------+------+
+---+----+---------+--------+-------+------+



In [24]:
# Same experiment as with 10 partitions, but with 8 partitions keyed on "idfirst".
sdf = sdf_generator(num_rows, 20)
sdf = sdf.repartition(8, "idfirst")
print(sdf.rdd.getNumPartitions())
# Label the write job so it is easy to find in the Spark UI.
sc.setJobDescription("repartition 8 idfirst")
sdf.write.format("parquet").mode("overwrite").save(f"{base_dir}/repartition_8_idfirst.parquet")
# Pass None (not the string "None") so the description is actually cleared
# and later jobs are not labelled "None".
sc.setJobDescription(None)

8


In [25]:

# Show how the rows are distributed across the 8 partitions keyed on "idfirst".
rows_per_partition(sdf)



+------------+------+----------+
|partition_id| count|count_perc|
+------------+------+----------+
|           0|111111|   11.1111|
|           1|111111|   11.1111|
|           2|222222|   22.2222|
|           3|333334|   33.3334|
|           5|111111|   11.1111|
|           6|111111|   11.1111|
+------------+------+----------+



                                                                                

24/12/17 01:22:59 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 932602 ms exceeds timeout 120000 ms
24/12/17 01:22:59 WARN SparkContext: Killing executors is not supported by current scheduler.
24/12/17 01:23:01 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$