# Deeper Dive Into Optimization Techniques

## Option 1 : Avoid Shuffling and Minimize Data Movement
 - 1.1 : Using Broadcast Joins Instead of Shuffling for Joins
 - 1.2 : Using Bucketing Instead of Shuffling for Joins

## Option 2 : Optimize Data Partitioning
 - 2.1 : Repartition for Parallelism (be careful: too many partitions increase task-scheduling overhead. Also, a dataset with 10,000 rows is not large; 10M to 1B rows is large.)
 - 2.2 : Reduce Partitions for Writing - Spark writes one file per partition, so reduce the number of partitions if you want fewer output files (for example, a single file)

## Option 3 : Use Efficient Data Formats
 - 3.1 : Use Parquet Over CSV
   
 `df.write.parquet('output.parquet')`

 * Columnar storage - Parquet stores data column by column, so queries that read only a few columns scan less data
 * Efficient compression - Parquet compresses each column with codecs such as Snappy, Gzip, or LZ4
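
To make the columnar-storage point concrete, here is a minimal plain-Python sketch (not Parquet itself) that counts how many values must be read to sum one column under each layout. The three sample rows are taken from the employee data used later in this notebook; the function names are illustrative assumptions.

```python
# Toy comparison of row-oriented vs column-oriented layouts.
# This only illustrates the access pattern; it is not how Parquet is implemented.
columns = ("id", "name", "salary", "department")
rows = [
    (1, "Employee_1", 113419, "Finance"),
    (2, "Employee_2", 44395, "Finance"),
    (3, "Employee_3", 134158, "Sales"),
]


def sum_salary_row_layout(rows):
    """Row layout (CSV-like): every record is read in full to reach one field."""
    salary_index = columns.index("salary")
    total = 0
    values_read = 0
    for row in rows:
        values_read += len(row)
        total += row[salary_index]
    return total, values_read


# Column layout (Parquet-like): each column is stored as its own sequence.
column_layout = {name: [row[i] for row in rows] for i, name in enumerate(columns)}


def sum_salary_column_layout(column_layout):
    """Column layout: only the requested column is read."""
    salaries = column_layout["salary"]
    return sum(salaries), len(salaries)


row_total, row_values_read = sum_salary_row_layout(rows)
col_total, col_values_read = sum_salary_column_layout(column_layout)
print(row_total, row_values_read)
print(col_total, col_values_read)
```

Both layouts give the same total, but the columnar layout touches only the `salary` values, which is where the scan savings come from.
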
## Option 4 : Optimize Spark Execution Plan

`df.join(df2, 'id').explain(True)`

* Look for expensive shuffle operations, which appear as `Exchange` nodes in the physical plan

## Option 5 : Use Caching and Persistence Wisely

`df.cache()`

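* Cache the data if you reuse a DataFrame multiple times, but don't cache huge DataFrames unless necessary

  `df.persist(StorageLevel.DISK_ONLY)`

  - `persist()` with no argument stores the data in memory and spills to disk (the same default as `cache()` for DataFrames)
  - If memory is limited, use disk-only persistence (`StorageLevel.DISK_ONLY`, with `from pyspark import StorageLevel`)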

## Option 6 : Skew Handling for Large Datasets

- Step 1 : Check Data Skew - check whether some partitions (or keys) hold far more rows than others
- Step 2 : Perform Salting - if one key has too many records, add a salt column to the key so that its rows spread over several partitions instead of one partition handling most of the data
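
The two steps above can be sketched in plain Python. This is a simulation under stated assumptions, not Spark code: `zlib.crc32` stands in for Spark's partitioning hash, the key names and counts are made up, and the salt is assigned round-robin (instead of `rand()`) so the result is reproducible.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 8
NUM_SALTS = 10


def partition_for(key: str, num_partitions: int) -> int:
    """Assign a key to a partition with a deterministic hash (stand-in for Spark's hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition_sizes(keys, num_partitions):
    """Count how many rows land in each partition."""
    return Counter(partition_for(k, num_partitions) for k in keys)


# Hypothetical skewed data: one hot key holds 90% of the rows.
keys = ["hot"] * 9000 + [f"cold_{i}" for i in range(1000)]

# Step 1: check skew. Every "hot" row hashes to the same partition.
unsalted = partition_sizes(keys, NUM_PARTITIONS)

# Step 2: salt the key. "hot" becomes "hot_0" ... "hot_9", which hash independently.
salted_keys = [f"{k}_{i % NUM_SALTS}" for i, k in enumerate(keys)]
salted = partition_sizes(salted_keys, NUM_PARTITIONS)

print("largest partition before salting:", max(unsalted.values()))
print("largest partition after salting: ", max(salted.values()))
```

In a real salted join, the other side of the join has to be expanded with every salt value so that matching rows still meet; that step is omitted here.
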

## Option 7 : Optimize Joins for Large Datasets

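- Use Sort-Merge Join Instead of Shuffle-Hash Join when both DataFrames are too large to fit in memory. Spark sorts both sides on the join key itself as part of the join, so calling `sort('id')` beforehand is unnecessary and only adds an extra shuffle. Sort-merge is already Spark's default for large equi-joins; a `merge` hint requests it explicitly.

`df_joined = df_large.join(df_small.hint('merge'), 'id')`

`df_joined.explain()` (look for `SortMergeJoin` in the physical plan)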
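
For intuition about what a sort-merge join does internally, here is a minimal single-machine sketch in plain Python. It assumes rows are `(key, value)` tuples and is only an illustration of the algorithm, not Spark's implementation (which also shuffles both sides by key first and can spill to disk).

```python
def sort_merge_join(left, right):
    """Inner-join two lists of (key, value) tuples on key using sort + merge."""
    left = sorted(left, key=lambda row: row[0])
    right = sorted(right, key=lambda row: row[0])

    joined = []
    i = j = 0
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][0], right[j][0]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # Find the run of right rows that share this key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == left_key:
                j_end += 1
            # Pair every left row with this key against that run.
            while i < len(left) and left[i][0] == left_key:
                for _, right_value in right[j:j_end]:
                    joined.append((left_key, left[i][1], right_value))
                i += 1
            j = j_end
    return joined


# Hypothetical sample data.
employees = [(3, "c"), (1, "a"), (2, "b"), (2, "b2")]
bonuses = [(2, 0.1), (4, 0.4), (1, 0.5)]
result = sort_merge_join(employees, bonuses)
print(result)
```

Because both inputs are sorted, each side is scanned once and only the current run of equal keys has to be held in memory, which is why this strategy suits inputs that are too large to hash in memory.
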

## Option 8 : Parallel Processing and UDF Optimization

## Option 9 : Optimize Read Performance With Indexing
- Use Z-Ordering on large tables for faster queries. Why? It stores similar rows together, reducing scan time.

## Option 1 : Avoid Shuffling and Minimize Data Movement
So, what are the alternatives? Here they are:

- 1.1 Use `Broadcast Joins` for small tables - when joining a large DataFrame with one small enough to fit in each executor's memory, use a broadcast join so the large side is not shuffled
- 1.2 Use `Bucketing` instead of Shuffling for Joins

### 1.1 Using Broadcast Joins Instead of Shuffling for Joins
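
Before the Spark example, a minimal plain-Python sketch of the idea behind a broadcast join may help: the small table is turned into a lookup that every partition of the large table receives, so each partition joins locally and no rows of the large table move. The function name and sample bonus values are illustrative assumptions, not part of Spark's API.

```python
def broadcast_hash_join(large_partitions, small_rows):
    """Join each partition of the large table against a full copy of the small table."""
    # "Broadcast": build one lookup from the small table, shared by all partitions.
    lookup = {}
    for key, value in small_rows:
        lookup.setdefault(key, []).append(value)

    joined_partitions = []
    for partition in large_partitions:
        # Each partition is processed independently; no data is exchanged between them.
        joined_partitions.append([
            (key, value, small_value)
            for key, value in partition
            for small_value in lookup.get(key, [])
        ])
    return joined_partitions


# Hypothetical data: the large table is already split into two partitions.
large_partitions = [
    [(1, "Employee_1"), (2, "Employee_2")],
    [(3, "Employee_3"), (4, "Employee_4")],
]
small_rows = [(1, 0.05), (3, 0.10)]

joined = broadcast_hash_join(large_partitions, small_rows)
print(joined)
```
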


In [12]:
# Using Broadcast joins for Small Tables
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
import time

spark = SparkSession.builder.appName('BroadcastJoinExample').getOrCreate()

df_large = spark.read.csv('./resources/8_large_file.csv', header=True, inferSchema=True)
df_small = spark.read.csv('./resources/9_small_file.csv', header=True, inferSchema=True)

start_time = time.time()
df_joined = df_large.join(broadcast(df_small), 'id')
df_joined.count()
end_time = time.time()
print(f"Execution Time (With Broadcast): {end_time - start_time} seconds")



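# without broadcasting (note: Spark may still broadcast df_small1 on its own when it is smaller than
# spark.sql.autoBroadcastJoinThreshold, 10 MB by default, which would explain near-identical timings)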
df_large1 = spark.read.csv('./resources/8_large_file.csv', header=True, inferSchema=True)
df_small1 = spark.read.csv('./resources/9_small_file.csv', header=True, inferSchema=True)

start_time = time.time()
df_joined1 = df_large1.join(df_small1, 'id')
df_joined1.count()
end_time = time.time()
print(f"Execution Time (Without Broadcast): {end_time - start_time} seconds")


# use .explain() to view query plans
print('----------------NOW WITH BROADCAST (Look for (strategy=broadcast))--------------')
df_joined.explain(True)
print('=================================================================================')
print('----------------NOW WITH NO BROADCAST (Look for Shuffle Operations)--------------')
df_joined1.explain(True)

spark.stop()

Execution Time (With Broadcast): 0.1299881935119629 seconds
Execution Time (Without Broadcast): 0.13207006454467773 seconds
----------------NOW WITH BROADCAST (Look for (strategy=broadcast))--------------
== Parsed Logical Plan ==
'Join UsingJoin(Inner, [id])
:- Relation [id#957,name#958,salary#959,department#960] csv
+- ResolvedHint (strategy=broadcast)
   +- Relation [id#982,bonus_percentage#983] csv

== Analyzed Logical Plan ==
id: int, name: string, salary: int, department: string, bonus_percentage: double
Project [id#957, name#958, salary#959, department#960, bonus_percentage#983]
+- Join Inner, (id#957 = id#982)
   :- Relation [id#957,name#958,salary#959,department#960] csv
   +- ResolvedHint (strategy=broadcast)
      +- Relation [id#982,bonus_percentage#983] csv

== Optimized Logical Plan ==
Project [id#957, name#958, salary#959, department#960, bonus_percentage#983]
+- Join Inner, (id#957 = id#982), rightHint=(strategy=broadcast)
   :- Filter isnotnull(id#957)
   :  +- Rela

### 1.2 Using Bucketing Instead of Shuffling for Joins

- If you frequently join on the same column, use bucketing to pre-partition (and optionally pre-sort) the data by that column at write time, so later joins on it can skip the shuffle
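
As a rough illustration of why bucketing removes the shuffle, the plain-Python sketch below buckets two tables with the same function and the same bucket count, then joins bucket `i` only with bucket `i`. It assumes integer keys and uses `key % num_buckets` as a stand-in for Spark's hash function; the bonus values are made up.

```python
def bucket_rows(rows, num_buckets):
    """Place each (key, value) row in a bucket chosen from its key (done once, at write time)."""
    buckets = [[] for _ in range(num_buckets)]
    for key, value in rows:
        buckets[key % num_buckets].append((key, value))
    return buckets


def bucketed_join(left_buckets, right_buckets):
    """Join matching bucket pairs only; rows never move between buckets."""
    joined = []
    for left, right in zip(left_buckets, right_buckets):
        lookup = {}
        for key, value in right:
            lookup.setdefault(key, []).append(value)
        for key, value in left:
            for right_value in lookup.get(key, []):
                joined.append((key, value, right_value))
    return joined


# Hypothetical sample data.
employees = [(1, "Employee_1"), (2, "Employee_2"), (9, "Employee_9"), (10, "Employee_10")]
bonuses = [(1, 0.05), (9, 0.10), (11, 0.20)]

result = bucketed_join(bucket_rows(employees, 8), bucket_rows(bonuses, 8))
print(sorted(result))
```

This only works because both sides use the same bucketing column and the same number of buckets, which is why the cell below writes both tables with `bucketBy(8, 'id')`.
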


In [18]:
from pyspark.sql import SparkSession
import time

spark = SparkSession.builder.appName('BucketOptimization').getOrCreate()

df_large = spark.read.csv('./resources/8_large_file.csv', header=True, inferSchema=True)
df_small = spark.read.csv('./resources/9_small_file.csv', header=True, inferSchema=True)

# apply bucketing and sorting (avoids shuffling in joins)
df_large.write.mode('overwrite').bucketBy(8, 'id').sortBy('id').saveAsTable('large_bucketed_table')
df_small.write.mode('overwrite').bucketBy(8, 'id').sortBy('id').saveAsTable('small_bucketed_table')

# Now perform joins (Shuffle Heavy)
start_time = time.time()
df_joined = df_large.join(df_small, 'id')
df_joined.count()
end_time = time.time()
print(f'Execution Time {end_time - start_time:.2f} seconds')
df_joined.explain(True)

# Now perform joins (No Shuffle) - Optimized join using Bucketed tables
df_large_bucketed = spark.table('large_bucketed_table')
df_small_bucketed = spark.table('small_bucketed_table')
start_time = time.time()
df_joined = df_large_bucketed.join(df_small_bucketed, 'id')
df_joined.count()
end_time = time.time()
print(f'Execution Time {end_time - start_time:.2f} seconds')
df_joined.explain(True)

Execution Time 0.11 seconds
== Parsed Logical Plan ==
'Join UsingJoin(Inner, [id])
:- Relation [id#1435,name#1436,salary#1437,department#1438] csv
+- Relation [id#1460,bonus_percentage#1461] csv

== Analyzed Logical Plan ==
id: int, name: string, salary: int, department: string, bonus_percentage: double
Project [id#1435, name#1436, salary#1437, department#1438, bonus_percentage#1461]
+- Join Inner, (id#1435 = id#1460)
   :- Relation [id#1435,name#1436,salary#1437,department#1438] csv
   +- Relation [id#1460,bonus_percentage#1461] csv

== Optimized Logical Plan ==
Project [id#1435, name#1436, salary#1437, department#1438, bonus_percentage#1461]
+- Join Inner, (id#1435 = id#1460)
   :- Filter isnotnull(id#1435)
   :  +- Relation [id#1435,name#1436,salary#1437,department#1438] csv
   +- Filter isnotnull(id#1460)
      +- Relation [id#1460,bonus_percentage#1461] csv

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#1435, name#1436, salary#1437, department#1438, bonus_

## Option 2 : Optimize Data Partitioning

### 2.1 : Repartition for Parallelism

In [31]:
from pyspark.sql import SparkSession
import time

spark = SparkSession.builder.appName('RepartitionOptimization').getOrCreate()

df_large = spark.read.csv('./resources/8_large_file.csv', header=True, inferSchema=True)
# Spark automatically assigns a default number of partitions


start_time = time.time()
df_large.count()
end_time = time.time()
print(f'Execution Time (for Default Partitions) : {end_time - start_time:.2f} seconds')

# Now, increase partitions for parallel processing
df_repartitioned = df_large.repartition(10) # Increase to 10 partitions


start_time = time.time()
df_repartitioned.count()
end_time = time.time()
print(f'Execution Time (for Repartitioned) : {end_time - start_time:.2f} seconds')

Execution Time (for Default Partitions) : 0.07 seconds
Execution Time (for Repartitioned) : 0.09 seconds


### 2.2 : Reduce Partitions for Writing Using coalesce()
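Spark writes one output file per partition, so if you want a single file (or just fewer files), reduce the number of partitions before writing. `coalesce()` merges existing partitions without a full shuffle, whereas `repartition()` always shuffles.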
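
The difference between the two operations can be sketched in plain Python with partitions modelled as lists. This is a simplified model under stated assumptions: the real `coalesce()` groups partitions by data locality rather than by index, and the real `repartition()` distributes rows across executors over the network.

```python
def coalesce(partitions, n):
    """Merge whole partitions together; rows stay with their original partition-mates."""
    merged = [[] for _ in range(n)]
    for index, partition in enumerate(partitions):
        merged[index % n].extend(partition)
    return merged


def repartition(partitions, n):
    """Redistribute every row round-robin; any row may move (a full shuffle)."""
    redistributed = [[] for _ in range(n)]
    all_rows = [row for partition in partitions for row in partition]
    for index, row in enumerate(all_rows):
        redistributed[index % n].append(row)
    return redistributed


# Hypothetical, unevenly sized partitions.
partitions = [[1, 2], [3], [4, 5, 6], [7]]

print(coalesce(partitions, 2))
print(repartition(partitions, 2))
print(coalesce(partitions, 1))
```

Coalescing is cheaper but can leave partitions uneven; repartitioning costs a shuffle but balances the rows.
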

In [39]:
from pyspark.sql import SparkSession
import time

spark = SparkSession.builder.appName('ReducePartitionsForWriting').getOrCreate()

df_large = spark.read.csv('./resources/8_large_file.csv', header=True, inferSchema = True)

start_time = time.time()
df_repartitioned = df_large.repartition(10) # Increase to 10 partitions
df_repartitioned.write.mode('overwrite').parquet('./output/writeWith10Partitions')
end_time = time.time()
print(f'Initial partitions: {df_large.rdd.getNumPartitions()}')
print(f'Execution Time (for Repartitioned) : {end_time - start_time:.2f} seconds')


# Reduce partitions using coalesce()
start_time = time.time()
df_coalesced = df_large.coalesce(1) # reducing to 1 partition
df_coalesced.write.mode('overwrite').parquet('./output/writeWith1Partition')
end_time = time.time()
print(f'Partitions after coalesce: {df_coalesced.rdd.getNumPartitions()}')
print(f'Execution Time (for coalesced - reducing to 1 partition) : {end_time - start_time:.2f} seconds')


Initial partitions: 1
Execution Time (for Repartitioned) : 0.25 seconds
Partitions after coalesce: 1
Execution Time (for coalesced - reducing to 1 partition) : 0.16 seconds


## Option 6 : Skew Handling for Large Datasets

In [47]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand

spark = SparkSession.builder.appName('HandleDataSkew').getOrCreate()

df_large = spark.read.csv('./resources/8_large_file.csv', header=True, inferSchema=True)
df_skew_check = df_large.groupBy('department').count().orderBy(col('count').desc())
print('Checking Skewness')
df_skew_check.show()


# Applying salting to reduce skewness
df_large = df_large.withColumn('salt_department', (rand() * 10).cast('int'))
df_salted = df_large.repartition('department', 'salt_department') # Repartition using both 'department' and 'salt_department'
df_salted_skew_check = df_salted.groupBy('department', 'salt_department').count().orderBy(col('count').desc())
print('After Applying Salting:')
df_salted_skew_check.show()

# Now dropping salt column
df_final = df_salted.drop('salt_department')

# stopping Spark Session
spark.stop()

Checking Skewness
+----------+-----+
|department|count|
+----------+-----+
|   Finance|25170|
|        HR|25033|
|     Sales|24901|
|        IT|24896|
+----------+-----+

After Applying Salting:
+----------+---------------+-----+
|department|salt_department|count|
+----------+---------------+-----+
|   Finance|              7| 2573|
|   Finance|              1| 2568|
|     Sales|              9| 2564|
|        HR|              8| 2560|
|        HR|              6| 2552|
|        HR|              7| 2543|
|        IT|              1| 2538|
|        IT|              8| 2537|
|        IT|              5| 2530|
|   Finance|              2| 2525|
|        HR|              2| 2520|
|   Finance|              8| 2520|
|     Sales|              7| 2517|
|     Sales|              0| 2515|
|   Finance|              9| 2511|
|        HR|              0| 2508|
|   Finance|              0| 2507|
|        HR|              1| 2506|
|     Sales|              6| 2506|
|        IT|              4| 2503|


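## Option 8 : Parallel Processing and UDF Optimization
- Avoid standard Python UDFs, which are slow because they call Python once per row, and use vectorized UDFs (`pandas_udf`), which process whole batches of rows as pandas Series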
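
A rough way to see where the saving comes from is to count Python function calls. The plain-Python sketch below is an analogy only: lists stand in for the pandas Series that a real `pandas_udf` receives, and the helper names and batch size are illustrative assumptions.

```python
def run_row_at_a_time(salaries, udf):
    """Call the function once per row, as a standard Python UDF does."""
    calls = 0
    results = []
    for salary in salaries:
        results.append(udf(salary))
        calls += 1
    return results, calls


def run_batched(salaries, batch_udf, batch_size):
    """Call the function once per batch of rows, as a vectorized UDF does."""
    calls = 0
    results = []
    for start in range(0, len(salaries), batch_size):
        results.extend(batch_udf(salaries[start:start + batch_size]))
        calls += 1
    return results, calls


salaries = [113419, 44395, 134158, 59648, 96002, 69460]

row_result, row_calls = run_row_at_a_time(salaries, lambda s: s * 0.1)
batch_result, batch_calls = run_batched(
    salaries, lambda batch: [s * 0.1 for s in batch], batch_size=4
)
print(row_calls, batch_calls)
```

The results are identical, but the batched version crosses the function-call boundary far less often. In Spark, each crossing also involves moving data between the JVM and Python, so the gap is larger than this toy suggests.
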

In [63]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import FloatType

spark = SparkSession.builder.appName('PandasUDFExample').getOrCreate()
df_large = spark.read.csv('./resources/8_large_file.csv', header=True, inferSchema = True)

# Define Pandas UDF for Bonus Calculation
@pandas_udf(FloatType())
def calculate_bonus(salary: pd.Series) -> pd.Series:
    return salary * 0.1

df_large = df_large.withColumn('bonus', calculate_bonus(df_large['salary']))
df_large.show(10)

+---+-----------+------+----------+-------+
| id|       name|salary|department|  bonus|
+---+-----------+------+----------+-------+
|  1| Employee_1|113419|   Finance|11341.9|
|  2| Employee_2| 44395|   Finance| 4439.5|
|  3| Employee_3|134158|     Sales|13415.8|
|  4| Employee_4| 59648|     Sales| 5964.8|
|  5| Employee_5| 96002|   Finance| 9600.2|
|  6| Employee_6| 69460|     Sales| 6946.0|
|  7| Employee_7| 39196|   Finance| 3919.6|
|  8| Employee_8|132461|   Finance|13246.1|
|  9| Employee_9|111085|     Sales|11108.5|
| 10|Employee_10| 93008|   Finance| 9300.8|
+---+-----------+------+----------+-------+
only showing top 10 rows



## Option 9 : Optimize Read Performance With Indexing

### 1. Use Data Skipping with Z-Ordering (if specific columns are used for filtering)
- For faster queries on large tables, use Z-Ordering (especially on Databricks) because it stores rows with similar values in the same files, so queries that filter on those columns can skip files and scan less data
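
The general idea behind a Z-order can be sketched in plain Python by interleaving the bits of two column values into a single sort key (a Morton code). This shows the concept only, assuming small non-negative integers; it is not Delta Lake's actual implementation.

```python
def z_value(x: int, y: int, bits: int = 16) -> int:
    """Interleave the bits of x and y: x fills even bit positions, y fills odd ones."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


# Sort a 4x4 grid of (x, y) points by their Z-value.
points = [(x, y) for x in range(4) for y in range(4)]
z_sorted = sorted(points, key=lambda p: z_value(*p))

print(z_sorted[:4])  # the first "file" if each file held four rows
print(z_value(3, 3))
```

Points that are close in both dimensions end up near each other in the sorted order, so a file holding a contiguous slice covers a narrow range of both columns. That is what lets a query filtering on either column skip most files.
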

## Open Table Formats (https://spark.apache.org/third-party-projects.html) : Delta Lake, Hudi, Apache Iceberg
### Delta Lake - Storage layer that provides ACID transactions and scalable metadata handling for Apache Spark workloads
* An open-source storage framework that enables building a format agnostic Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, Hive, Snowflake, Google BigQuery, Athena, Redshift, Databricks, Azure Fabric and APIs for Scala, Java, Rust, and Python.

In [79]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip  # provided by `pip install delta-spark`
import time

# Delta must match the Spark version. `io.delta:delta-core_2.12:1.0.0` targets Spark 3.1; on Spark 3.5
# it fails with: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta.
# On Spark 3.5, install a delta-spark 3.x release and let configure_spark_with_delta_pip add the matching JAR.
# Stop any running session first: getOrCreate() reuses an existing session and ignores new JAR settings.
builder = (
    SparkSession.builder.appName('ZOrderingOptimization')
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

df_large = spark.read.csv('./resources/8_large_file.csv', header=True, inferSchema=True)


# Write the DataFrame to Delta format (Parquet-based storage)
df_large.write.mode('overwrite').format('delta').option('optimizeWrite', 'true').save('./output/z_order_optimization_table')
# Write a plain CSV copy for comparison (overwrite so the cell can be re-run)
df_large.write.mode('overwrite').csv('./output/without_z_order_optimization_mode.csv', header=True)

# Z-ordering - after writing the data, run OPTIMIZE ... ZORDER BY on the `department` column
# so that queries that filter by `department` can skip files that hold no matching rows
spark.sql('OPTIMIZE delta.`./output/z_order_optimization_table` ZORDER BY (department)')

# Now you can run queries on this table. Note that Z-ordering mainly helps filtered reads
# (for example, a filter on department); a full count() does not skip any files.
start_time = time.time()
df_optimized = spark.read.format('delta').load('./output/z_order_optimization_table')
df_optimized.count()
end_time = time.time()
print(f'Execution Time (Reading Z-ordered dataframe) : {end_time - start_time:.2f} seconds')

start_time = time.time()
df_non_optimized = spark.read.csv('./output/without_z_order_optimization_mode.csv', header=True)
df_non_optimized.count()
end_time = time.time()
print(f'Execution Time (Reading Non Z-ordered dataframe) : {end_time - start_time:.2f} seconds')


spark.stop()


In [70]:
spark.version

'3.5.5'
