In [0]:
from pyspark.sql.types import IntegerType
df = spark.createDataFrame(range(10),IntegerType())

In [0]:
print("Default Partitions: ", df.rdd.getNumPartitions())

Default Partitions:  8


In [0]:
# glom() turns each partition into a list, so this shows which rows landed in each of the 8 partitions
df.rdd.glom().collect()

Out[6]: [[Row(value=0)],
 [Row(value=1)],
 [Row(value=2)],
 [Row(value=3), Row(value=4)],
 [Row(value=5)],
 [Row(value=6)],
 [Row(value=7)],
 [Row(value=8), Row(value=9)]]
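Where do these slice boundaries come from? Assuming that, as in classic (non-Connect) PySpark, `createDataFrame` on a local list distributes it through `SparkContext.parallelize`, the collection is cut into contiguous slices with integer arithmetic (`start = i * n // k`). The pure-Python sketch below models that rule with the default parallelism of 8 seen above. It is a model for intuition, not Spark code, and other runtimes may split local data differently.

```python
def slice_positions(length, num_slices):
    """(start, end) index pairs using integer arithmetic, as assumed for parallelize()."""
    return [((i * length) // num_slices, ((i + 1) * length) // num_slices)
            for i in range(num_slices)]


def slice_collection(data, num_slices):
    """Split a Python list into num_slices contiguous chunks."""
    return [data[start:end] for start, end in slice_positions(len(data), num_slices)]


partitions = slice_collection(list(range(10)), 8)
print(partitions)  # [[0], [1], [2], [3, 4], [5], [6], [7], [8, 9]]
```

The model reproduces the `glom()` output: with 10 rows and 8 slices, two slices must hold 2 rows, and the integer division decides which ones.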

### The fundamental concept behind Spark DataFrames is partitioning: the data is split into chunks (partitions) that are processed in parallel. Think of a 1000-page book: we don't cram all the content onto a single page, we spread it across all 1000 pages. In the same way, Spark splits a DataFrame into partitions, ideally of similar size, and processes them in parallel.
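The book analogy boils down to three steps: split the data into partitions, process each partition independently, and combine the partial results. Here is a toy pure-Python sketch of those steps. The thread pool stands in for Spark's executor cores, the 1000-page "book" is made-up data, and because of Python's GIL the threads only illustrate the structure rather than deliver a real speed-up.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(rows, num_partitions):
    """Cut rows into num_partitions contiguous chunks of near-equal size."""
    n = len(rows)
    return [rows[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
            for i in range(num_partitions)]


def count_words(partition):
    """Work done independently on one partition: count the words on its pages."""
    return sum(len(page.split()) for page in partition)


# Hypothetical "book": 1000 pages of 5 words each.
pages = [f"page {i} has some text" for i in range(1000)]
partitions = split_into_partitions(pages, 8)

# Process every partition in parallel, then combine the partial results.
with ThreadPoolExecutor(max_workers=8) as pool:
    partial_counts = list(pool.map(count_words, partitions))

total_words = sum(partial_counts)
print(len(partitions), total_words)  # 8 5000
```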

## In PySpark, we can change the number of partitions in two ways
## - repartition()
## - coalesce()

| Advantage             | Why It Matters                      |
| --------------------- | ----------------------------------- |
| Parallelism           | Speeds up processing using all CPUs |
| Resource efficiency   | Keeps all workers busy and balanced |
| Control output files  | Fewer, bigger files = faster I/O    |
| Better joins/grouping | Less shuffling, more efficient      |
| Smart data storage    | Faster queries on partitioned files |


In [0]:
dbutils.fs.ls('/FileStore/tables/')

Out[31]: [FileInfo(path='dbfs:/FileStore/tables/Online_Retail-1.csv', name='Online_Retail-1.csv', size=46133248, modificationTime=1744228374000),
 FileInfo(path='dbfs:/FileStore/tables/Online_Retail-2.csv', name='Online_Retail-2.csv', size=46133248, modificationTime=1744228637000),
 FileInfo(path='dbfs:/FileStore/tables/Online_Retail-3.csv', name='Online_Retail-3.csv', size=46133248, modificationTime=1744229258000),
 FileInfo(path='dbfs:/FileStore/tables/Online_Retail-4.csv', name='Online_Retail-4.csv', size=46133248, modificationTime=1744229659000),
 FileInfo(path='dbfs:/FileStore/tables/Online_Retail-5.csv', name='Online_Retail-5.csv', size=46133248, modificationTime=1744313114000),
 FileInfo(path='dbfs:/FileStore/tables/Online_Retail.csv', name='Online_Retail.csv', size=46133248, modificationTime=1744221409000),
 FileInfo(path='dbfs:/FileStore/tables/Online_Retail.xlsx', name='Online_Retail.xlsx', size=23715344, modificationTime=1744220158000),
 FileInfo(path='dbfs:/FileStore/tables

In [0]:
df = spark.read.format('csv')\
    .option('inferSchema',True)\
    .option('header',True)\
    .load('dbfs:/FileStore/tables/Online_Retail.csv')

In [0]:
df.createOrReplaceTempView("dfTable")

In [0]:
# Default parallelism: the default number of partitions Spark uses for RDD operations such as parallelize() (usually the total number of cores in the cluster; configurable via spark.default.parallelism)
sc.defaultParallelism

In [0]:
# Maximum number of bytes Spark packs into a single partition when reading files (default 128 MB); lowering it gives more, smaller partitions
spark.conf.get("spark.sql.files.maxPartitionBytes")
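Why does a single 44 MB CSV end up in several partitions when the limit is 128 MB? The sketch below is a simplified pure-Python model of how Spark 3.x sizes file splits. It assumes the default `spark.sql.files.openCostInBytes` of 4 MB, that `spark.sql.files.minPartitionNum` is unset, and an uncompressed, splittable file. It ignores other planner details, and Databricks runtimes may tune these settings, so treat the numbers as an estimate.

```python
MB = 1024 * 1024


def estimate_file_partitions(file_sizes, default_parallelism,
                             max_partition_bytes=128 * MB,
                             open_cost_in_bytes=4 * MB):
    """Simplified model of Spark 3.x file splitting.

    Returns (max_split_bytes, number_of_partitions).
    """
    # Every file is charged an extra "open cost" so many tiny files are not free.
    total_bytes = sum(size + open_cost_in_bytes for size in file_sizes)
    bytes_per_core = total_bytes // default_parallelism
    # Splits are capped by maxPartitionBytes but shrink so every core gets work.
    max_split_bytes = min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))

    # Cut each file into splits of at most max_split_bytes.
    splits = [min(max_split_bytes, size - offset)
              for size in file_sizes
              for offset in range(0, size, max_split_bytes)]

    # Pack splits, largest first; start a new partition when the next split
    # would push the current one past max_split_bytes.
    num_partitions, current_size, current_count = 0, 0, 0
    for split in sorted(splits, reverse=True):
        if current_count and current_size + split > max_split_bytes:
            num_partitions += 1
            current_size, current_count = 0, 0
        current_size += split + open_cost_in_bytes
        current_count += 1
    if current_count:
        num_partitions += 1
    return max_split_bytes, num_partitions


# Online_Retail.csv is 46,133,248 bytes; defaultParallelism is 8 in this notebook.
print(estimate_file_partitions([46_133_248], default_parallelism=8))  # (6290944, 8)
```

Under these assumptions the binding limit for this file is not the 128 MB `maxPartitionBytes` but the per-core share of the input, so the model predicts 8 partitions, which agrees with the partition count the notebook reports for `df`.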

# 1.  repartition()
repartition(n) is a method used to change the number of partitions in a DataFrame by reshuffling the data. _**With this function we ask Spark to shuffle the data into n roughly evenly sized partitions**_.
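A toy pure-Python model of what `repartition(n)` without columns does: every row is dealt round-robin into `n` output partitions, so even a badly skewed input comes out balanced. As an assumption modelled on Spark's round-robin partitioning, each input partition starts dealing at its own pseudo-random offset; the real shuffle differs in many details, so treat this as intuition only.

```python
import random


def round_robin_repartition(partitions, num_partitions):
    """Toy model of DataFrame.repartition(n) without columns.

    Each input partition deals its rows round-robin across the n outputs,
    starting at a pseudo-random position seeded by the input partition index.
    """
    output = [[] for _ in range(num_partitions)]
    for index, partition in enumerate(partitions):
        position = random.Random(index).randrange(num_partitions)
        for row in partition:
            output[position].append(row)
            position = (position + 1) % num_partitions
    return output


# A skewed input: one big partition and two tiny ones (100 rows in total).
skewed = [list(range(0, 90)), list(range(90, 95)), list(range(95, 100))]
balanced = round_robin_repartition(skewed, 4)
print([len(p) for p in skewed], "->", [len(p) for p in balanced])
```

Because each input partition spreads its rows over all outputs with at most one row of difference, the output sizes differ by at most the number of input partitions.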

### **Why Use repartition()?**
- To increase or decrease the number of partitions.
- To balance the data across partitions for better performance.
- To prepare the data for joins, groupBy, or writing to disk efficiently.

| Feature                  |   repartition()                          |
| ------------------------ | ---------------------------------------- |
| Shuffles data?           | ✅ Yes (data is moved around the cluster) |
| Can increase partitions? | ✅ Yes                                    |
| Can decrease partitions? | ✅ Yes                                    |
| Speed                    | 🐢 Slower (because of the shuffle)       |
| Data Balance             | ✅ Better (after shuffle)                 |



## When Should You Use repartition()?
✅ Use it when:

- You are increasing the number of partitions, e.g., the current partitions are too large and you want more parallelism.
- You're preparing for a heavy operation such as join, groupBy, or distinct. These work better when the data is evenly partitioned.
- You want to avoid skew (a few partitions holding far more data than the rest).
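To decide whether skew is a problem, it helps to look at the row count per partition. The sketch below computes a simple skew ratio (largest partition divided by the average). The partition sizes are hypothetical; on a real DataFrame you could collect them with `df.rdd.glom().map(len).collect()`, which brings one number per partition back to the driver. The ratio is just a convenient heuristic, not a Spark metric.

```python
from statistics import mean


def skew_ratio(partition_sizes):
    """Largest partition divided by the average partition (1.0 = perfectly even)."""
    average = mean(partition_sizes)
    return max(partition_sizes) / average if average else 0.0


def even_sizes(total_rows, num_partitions):
    """Row counts an ideal, perfectly balanced repartition would produce."""
    base, extra = divmod(total_rows, num_partitions)
    return [base + 1 if i < extra else base for i in range(num_partitions)]


# Hypothetical row counts per partition for a skewed DataFrame.
sizes = [9000, 300, 250, 450]
print(skew_ratio(sizes))                               # 3.6
print(skew_ratio(even_sizes(sum(sizes), len(sizes))))  # 1.0
```

A ratio of 3.6 means the largest task processes 3.6 times the average amount of data, so it will likely finish long after the others.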

❌ When Not to Use repartition()
- If you're only reducing the number of partitions and performance matters, use coalesce() instead (faster, because it avoids a full shuffle).



### Imagine your data is a deck of 14 cards:

### Before:
You have 2 piles (partitions) like this:

- [♠♣♥♦♠♣♠] [♥♦♥♦♣♠♦]

### repartition(4):
All 14 cards are reshuffled and dealt into 4 piles of nearly equal size:
- [♠♦♥♣] [♠♦♣♥] [♠♦♥] [♠♦♣]
### coalesce(2) on those 4 piles:
Neighbouring piles are simply stacked together (no reshuffling):
- [♠♦♥♣♠♦♣♥] [♠♦♥♠♦♣]
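The deck-of-cards picture can be modelled in a few lines of plain Python, assuming 14 cards in 2 piles. `deal` mimics repartition by dealing every card round-robin into new piles, while `stack` mimics coalesce by concatenating neighbouring piles. Both names are hypothetical helpers for this illustration, not Spark APIs.

```python
from itertools import cycle


def deal(piles, n):
    """repartition-style: collect all cards and deal them round-robin into n new piles."""
    new_piles = [[] for _ in range(n)]
    cards = [card for pile in piles for card in pile]
    for card, target in zip(cards, cycle(range(n))):
        new_piles[target].append(card)
    return new_piles


def stack(piles, n):
    """coalesce-style: concatenate neighbouring piles into at most n piles (never more)."""
    n, m = min(n, len(piles)), len(piles)
    return [[card for pile in piles[(i * m) // n:((i + 1) * m) // n] for card in pile]
            for i in range(n)]


before = [list("♠♣♥♦♠♣♠"), list("♥♦♥♦♣♠♦")]
after_repartition = deal(before, 4)
after_coalesce = stack(after_repartition, 2)

print([len(p) for p in after_repartition])  # [4, 4, 3, 3]
print([len(p) for p in after_coalesce])     # [8, 6]
```

Note that `stack(before, 2)` returns the two original piles unchanged: coalescing to the current number of partitions is a no-op, and asking for more piles than exist never creates new ones.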

### ✨ Bonus: Repartition by Column
You can repartition based on a column, which helps group related data together:

### df.repartition(10, "category")

This tells Spark:

- “Put all rows with the same category value into the same partition.” (One partition can still hold several categories.)

- ✅ Helpful for joins and aggregations on that column!

### Without Repartition — Random Distribution:

- Partition 1: [ A, C ]
- Partition 2: [ B, A ]
- Partition 3: [ C, B ]



### With df.repartition(3, "Category"):

- Partition 1: [ A, A ]
- Partition 2: [ B, B ]
- Partition 3: [ C, C ]
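Repartitioning by a column is hash partitioning: Spark computes a Murmur3 hash of the column value and takes a non-negative modulo by the number of partitions, so equal values always map to the same partition. The sketch below models this in plain Python. It uses `zlib.crc32` only as a deterministic stand-in for Murmur3, so the partition numbers will not match Spark's, and two different categories may still share a partition.

```python
import zlib


def partition_for(key, num_partitions):
    """Deterministic hash of the key modulo the partition count (CRC32 stands in for Murmur3)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def hash_repartition(rows, key, num_partitions):
    """Send every row to the partition chosen by hashing row[key]."""
    output = [[] for _ in range(num_partitions)]
    for row in rows:
        output[partition_for(row[key], num_partitions)].append(row)
    return output


# Made-up rows: two each of categories A, B and C.
rows = [{"Category": c, "id": i} for i, c in enumerate("ACBACB")]
parts = hash_repartition(rows, "Category", 3)
for idx, part in enumerate(parts):
    print(idx, [r["Category"] for r in part])
```

Whatever the hash function, the guarantee is the same: all rows with the same key end up together, which is what joins and aggregations on that key need.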

In [0]:
# Check current number of partitions
print("Before:", df.rdd.getNumPartitions())

Before: 8


In [0]:
# Repartition to 10 partitions
df2 = df.repartition(10)

In [0]:
print("After:", df2.rdd.getNumPartitions())

After: 10


In [0]:
spark.sparkContext.defaultParallelism

Out[4]: 8

In [0]:
# Repartition into 10 partitions by the Country column
df2 = df.repartition(10,'Country')

# 2. coalesce()

**coalesce(n)** is used to reduce the number of partitions without a full shuffle of the data. It tells Spark to _**combine some of the existing partitions into fewer partitions, without moving data around more than necessary**_.

## Why Use coalesce()?
- You want to reduce the number of partitions (usually before writing data to disk).
- You want fewer output files.
- You want to avoid a full shuffle (which repartition() does).

## When is it Useful?
✅ Example: Writing Data
- Let’s say Spark made 200 partitions, and you want 5 output files, not 200!
### df.coalesce(5).write.parquet("output/")
✔️ This combines the 200 partitions into 5, without a heavy shuffle, and writes only 5 files.

### Visual Example
🔹 Before (6 Partitions):
- [ A1 A2 ]   [ B1 ]   [ C1 ]   [ D1 D2 ]   [ E1 ]   [ F1 F2 ]
### df.coalesce(3):
- [ A1 A2 B1 ]   [ C1 D1 D2 ]   [ E1 F1 F2 ]
#### ✔️ Spark merges existing partitions.
#### ❌ Does not rebalance or reshuffle the data.
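A toy pure-Python model of `coalesce(n)`: neighbouring partitions are concatenated into at most `n` groups, and no row is re-dealt. This simplification groups partitions by position only; Spark's actual coalesce can also take data locality into account when choosing which partitions to merge. The sample data reproduces the visual example above, adds a skewed case to show that imbalance survives, and includes the 200-partitions-into-5 case from the writing example.

```python
def coalesce(partitions, num_partitions):
    """Toy model of DataFrame.coalesce(n): merge neighbouring partitions, never add new ones."""
    n, m = min(num_partitions, len(partitions)), len(partitions)
    return [
        [row for part in partitions[(i * m) // n:((i + 1) * m) // n] for row in part]
        for i in range(n)
    ]


before = [["A1", "A2"], ["B1"], ["C1"], ["D1", "D2"], ["E1"], ["F1", "F2"]]
print(coalesce(before, 3))
# [['A1', 'A2', 'B1'], ['C1', 'D1', 'D2'], ['E1', 'F1', 'F2']]

# Merging does not rebalance: a huge partition stays huge.
skewed = [list(range(1000)), [1], [2], [3], [4], [5]]
print([len(p) for p in coalesce(skewed, 3)])  # [1001, 2, 2]

# 200 single-row partitions into 5 groups of 40.
print([len(p) for p in coalesce([[i] for i in range(200)], 5)])  # [40, 40, 40, 40, 40]
```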

### Key Difference vs repartition()

| Feature                  | `coalesce()`                 | `repartition()`              |
| ------------------------ | ---------------------------- | ---------------------------- |
| Can increase partitions? | ❌ No                         | ✅ Yes                        |
| Can decrease partitions? | ✅ Yes                        | ✅ Yes                        |
| Shuffles data?           | ❌ No shuffle                 | ✅ Full shuffle               |
| Speed                    | 🚀 Faster (no shuffle)       | 🐢 Slower (shuffle happens)  |
| Balancing                | ❌ May be uneven              | ✅ Balanced across partitions |
| Use when                 | Reducing file count, writing | Balancing for joins/groupBy  |


### 📌 When Should You Use coalesce()?
✅ Best time to use:
- Just before writing to disk, e.g., Parquet/CSV
- When you want to reduce number of files

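### ❌ Avoid using coalesce():
If you need evenly balanced partitions for processing → use repartition() instead. Also keep in mind that a drastic coalesce (e.g. coalesce(1)) can reduce the parallelism of the upstream computation in the same stage, so the whole job may run on only a few tasks.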

In [0]:
# Reduce the data to 2 partitions before writing (one CSV part file per partition)
df.coalesce(2).write.option("header", True).csv("dbfs:/FileStore/tables/retail/")

# 🧠 Pro Tip
If you're unsure:

✅ Use coalesce() when reducing partitions before writing

✅ Use repartition() when processing and balancing data

# 3. Combining repartition() and coalesce()
We can combine repartition() and coalesce() in PySpark, and in fact this is a common strategy in real-world data engineering.

### Why Combine Them?
Because they serve different purposes:

- repartition() → balances data (for processing)
- coalesce() → reduces partitions (for writing)


### 🔁 How do they work together?
- repartition() helps Spark rebalance and optimize computation (like joins, groupBy, etc.)
- coalesce() helps you avoid writing 100 tiny files by merging partitions before saving.
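The repartition, then work, then coalesce pattern can be modelled end to end in plain Python. After hashing rows by Country (CRC32 stands in for Spark's Murmur3), every country lives in exactly one partition, so each partition can be aggregated on its own without merging partial sums across partitions. The small aggregated partitions are then stacked into fewer output partitions, standing in for fewer output files. The rows are made-up sample data and all helper names are hypothetical.

```python
import zlib
from collections import defaultdict


def hash_repartition(rows, key, n):
    """Send every row with the same key to the same partition."""
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[zlib.crc32(row[key].encode("utf-8")) % n].append(row)
    return parts


def aggregate_partition(part):
    """Sum Quantity per Country inside a single partition."""
    totals = defaultdict(int)
    for row in part:
        totals[row["Country"]] += row["Quantity"]
    return sorted(totals.items())


def coalesce(parts, n):
    """Merge neighbouring partitions into at most n groups without re-dealing rows."""
    n, m = min(n, len(parts)), len(parts)
    return [[x for p in parts[(i * m) // n:((i + 1) * m) // n] for x in p] for i in range(n)]


# Hypothetical sample rows (the notebook itself reads Online_Retail.csv).
rows = [
    {"Country": "Germany", "Quantity": 5},
    {"Country": "Sweden", "Quantity": 3},
    {"Country": "Germany", "Quantity": 7},
    {"Country": "Iceland", "Quantity": 2},
    {"Country": "Sweden", "Quantity": 1},
]

parts = hash_repartition(rows, "Country", 4)             # 1. balance by key
aggregated = [aggregate_partition(p) for p in parts]     # work, per partition
output = coalesce(aggregated, 2)                         # 2. fewer output "files"
print(output)
```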

### ⚠️ Important Notes
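- Consider repartition() before expensive operations when the data is skewed or not yet partitioned by the key you join or group on.
- Use coalesce() mainly right before writing to disk.
- Don’t use coalesce() if you're going to continue processing: the merged partitions can be unbalanced and may slow things down.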

### 📊 Summary
| Use Case                    | Function Used                         |
| --------------------------- | ------------------------------------- |
| Balance data for joins      | `repartition()`                       |
| Reduce file count           | `coalesce()`                          |
| Combined use (best of both) | `repartition()` → work → `coalesce()` |


The typical pattern looks like this:


In [0]:
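# Import the functions module under an alias so Spark's sum() doesn't shadow Python's built-in
from pyspark.sql import functions as F

# 1. Repartition by the grouping key for balanced processing (e.g. for joins/aggregations)
df2 = df.repartition(100, "Country")

# Do some heavy transformations...
# (a groupBy on the same key can reuse this partitioning instead of shuffling again)
df2 = df2.groupBy("Country").agg(F.sum(F.col("Quantity").cast("int")).alias("TotalQuantity"))

# 2. Coalesce to reduce the number of output files (faster write)
df2.coalesce(6).write.parquet("CountryQuantity")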

In [0]:
df2.display()

Country,TotalQuantity
Hong Kong,4769
Iceland,2458
Israel,4353
Channel Islands,9479
Sweden,35637
USA,1034
Cyprus,6317
Singapore,5234
Germany,117448
RSA,352
