# Spark SQL Catalyst Optimizer and Tungsten Engine

## Catalyst Optimizer

![Catalyst Optimizer Overview](../images/catalyst_overview.png)

### Definition
The **Catalyst Optimizer** is a key component of **Spark SQL** responsible for optimizing the execution plans of SQL queries and of **DataFrame** and **Dataset** operations.

### Role in Spark SQL
Catalyst acts as a **query planner** that transforms **logical plans** into **optimized physical plans**, ensuring queries execute efficiently.

### Phases of Optimization 

- **Analysis**
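  - Resolves column names, table names, and functions against the catalog.
  - Takes the unresolved logical plan that the parser builds from the query's **AST (Abstract Syntax Tree)**, produces a resolved logical plan, and reports errors such as missing columns or incompatible data types.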

- **Logical Optimization**
  - Applies **rule-based transformations** such as:
    - predicate pushdown
    - column (projection) pruning
    - constant folding
    - boolean expression simplification
  - Goal: reduce the amount of data as early as possible in the plan.

- **Physical Planning**
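  - Generates one or more candidate physical plans and selects one using a cost model, considering:
    - data distribution (partitioning)
    - join strategies (for example, broadcast hash join vs. sort-merge join)
    - estimated data sizes and available statistics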
    
- **Code Generation (Whole-Stage Code Generation)**
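  - Introduced as part of Project Tungsten.
  - Fuses the operators of a stage into a single function, emitted as Java source and compiled to JVM bytecode at runtime, which reduces per-row function calls and intermediate data, and so CPU and memory overhead.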
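
The logical optimization phase above can be pictured as a set of small rewrite rules applied to an expression tree until nothing changes. The sketch below is a toy illustration in plain Python, not Catalyst's actual implementation (which is written in Scala and far richer). The node classes `Lit`, `Col`, `Add`, `Gt`, `And` and the helpers `constant_fold`, `simplify_boolean`, `transform_up`, and `optimize` are hypothetical names invented for this example.

```python
from dataclasses import dataclass


# Hypothetical mini expression tree (illustration only, not Catalyst's classes).
@dataclass(frozen=True)
class Lit:
    value: object


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Add:
    left: object
    right: object


@dataclass(frozen=True)
class Gt:
    left: object
    right: object


@dataclass(frozen=True)
class And:
    left: object
    right: object


BINARY_NODES = (Add, Gt, And)


def constant_fold(e):
    """Rule: Add(Lit, Lit) becomes a single Lit computed at planning time."""
    if isinstance(e, Add) and isinstance(e.left, Lit) and isinstance(e.right, Lit):
        return Lit(e.left.value + e.right.value)
    return e


def simplify_boolean(e):
    """Rule: (true AND x) becomes x; (false AND x) becomes false."""
    if isinstance(e, And):
        for a, b in ((e.left, e.right), (e.right, e.left)):
            if isinstance(a, Lit) and a.value is True:
                return b
            if isinstance(a, Lit) and a.value is False:
                return Lit(False)
    return e


def transform_up(e, rules):
    """Rewrite children first, then apply every rule to the node itself."""
    if isinstance(e, BINARY_NODES):
        e = type(e)(transform_up(e.left, rules), transform_up(e.right, rules))
    for rule in rules:
        e = rule(e)
    return e


def optimize(e, rules=(constant_fold, simplify_boolean), max_iterations=10):
    """Re-run the rule batch until the tree stops changing (a fixed point)."""
    for _ in range(max_iterations):
        rewritten = transform_up(e, rules)
        if rewritten == e:
            return e
        e = rewritten
    return e


# true AND amount > (100 + 50)
expr = And(Lit(True), Gt(Col("amount"), Add(Lit(100), Lit(50))))
optimized = optimize(expr)
print(optimized)
```

In this sketch the predicate `true AND amount > (100 + 50)` collapses to `amount > 150`, so the addition is evaluated once during planning rather than once per row.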

### Why it improves performance
Catalyst helps by:
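- reducing the amount of data read and shuffled (filters and column pruning applied early)
- eliminating redundant work (constant expressions evaluated once at planning time, simplified predicates)
- using CPU and memory more efficiently (suitable join strategies, generated code)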
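
One concrete case of resource-aware planning is join strategy selection. For equi-joins, Spark can broadcast the smaller side when its estimated size is at most `spark.sql.autoBroadcastJoinThreshold` (10 MB by default, `-1` disables broadcasting), and otherwise typically falls back to a sort-merge join. The function below is a deliberately simplified sketch of that one decision; `choose_join_strategy` is a hypothetical helper, and the real planner also weighs hints, join types, and other strategies.

```python
# Default value of spark.sql.autoBroadcastJoinThreshold (10 MB).
DEFAULT_BROADCAST_THRESHOLD_BYTES = 10 * 1024 * 1024


def choose_join_strategy(left_bytes, right_bytes,
                         threshold=DEFAULT_BROADCAST_THRESHOLD_BYTES):
    """Hypothetical, simplified model of one physical-planning decision.

    Returns the name of the join operator a planner might pick for an
    equi-join, looking only at the estimated sizes of the two inputs.
    """
    smaller = min(left_bytes, right_bytes)
    # A negative threshold never matches, which models "broadcast disabled".
    if 0 <= smaller <= threshold:
        return "BroadcastHashJoin"
    return "SortMergeJoin"


MB = 1024 * 1024
GB = 1024 * MB

small_vs_large = choose_join_strategy(5 * MB, 2 * GB)
large_vs_large = choose_join_strategy(2 * GB, 3 * GB)
broadcast_disabled = choose_join_strategy(5 * MB, 2 * GB, threshold=-1)
print(small_vs_large, large_vs_large, broadcast_disabled)
```

Broadcasting the small side avoids shuffling the large side across the cluster, which is why the size estimate matters so much to the chosen plan.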


### Example: See Catalyst Plans Using `explain()`

We will create a small “orders” dataset and run a query with filter + select + aggregation.  
Then we will use `explain(True)` to view:
- the parsed logical plan
- the analyzed logical plan
- the optimized logical plan
- the physical plan


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, broadcast

spark = SparkSession.builder.appName("SparkSQL-Internals-Demo").getOrCreate()

In [None]:
orders = [
    (1, "IN", "Electronics", 1200.0),
    (2, "IN", "Grocery",      250.0),
    (3, "US", "Electronics",  999.0),
    (4, "IN", "Electronics",  450.0),
    (5, "US", "Grocery",      300.0),
    (6, "IN", "Grocery",      150.0),
]
df_orders = spark.createDataFrame(orders, ["order_id", "country", "category", "amount"])

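# Total order amount per category, restricted to rows with country "IN".
query_df = (
    df_orders
    .filter(col("country") == "IN")              # keep only the rows we need
    .select("category", "amount")                # keep only the columns we need
    .groupBy("category")
    .agg(F.sum("amount").alias("total_amount"))
)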

query_df.show(truncate=False)


In [None]:
query_df.explain(True)

## Tungsten Engine (Execution Engine)

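The **Tungsten execution engine** focuses on **low-level optimizations** for:
- memory management (a compact binary row format and optional off-heap allocation, which reduce JVM object and garbage-collection overhead)
- CPU utilization (cache-aware data layout and whole-stage code generation)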

### How it works with Catalyst
- **Catalyst** handles **high-level query planning** (logical → optimized physical plan).
- **Tungsten** focuses on **efficient execution** of that plan.
- Together, they pair an optimized plan with a low-overhead runtime, so less CPU time and memory are spent per row processed.
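
To make the idea behind whole-stage code generation more tangible, the sketch below contrasts an iterator-style pipeline, where each operator pulls rows from the one before it, with a single fused loop generated as source text and compiled at runtime. This is a plain-Python analogy under stated assumptions, not how Tungsten is implemented (Spark emits Java source and compiles it on the JVM). `interpreted_sum` and `generate_fused_sum` are hypothetical helpers, and the query is simplified to an ungrouped sum over the same order tuples used earlier.

```python
orders = [
    (1, "IN", "Electronics", 1200.0),
    (2, "IN", "Grocery", 250.0),
    (3, "US", "Electronics", 999.0),
    (4, "IN", "Electronics", 450.0),
    (5, "US", "Grocery", 300.0),
    (6, "IN", "Grocery", 150.0),
]


def interpreted_sum(rows, predicate, projection):
    """Iterator-style execution: each operator pulls rows from its child."""
    filtered = (row for row in rows if predicate(row))
    projected = (projection(row) for row in filtered)
    return sum(projected)


def generate_fused_sum(filter_expr, value_expr):
    """Emit and compile one loop that filters, projects, and aggregates.

    filter_expr and value_expr are Python expression strings over `row`.
    Only use trusted strings here: the generated source is executed.
    """
    source = (
        "def fused(rows):\n"
        "    total = 0.0\n"
        "    for row in rows:\n"
        f"        if {filter_expr}:\n"
        f"            total += {value_expr}\n"
        "    return total\n"
    )
    namespace = {}
    exec(compile(source, "<generated>", "exec"), namespace)
    return namespace["fused"], source


fused, source = generate_fused_sum("row[1] == 'IN'", "row[3]")
print(source)

interpreted_total = interpreted_sum(orders, lambda r: r[1] == "IN", lambda r: r[3])
fused_total = fused(orders)
print(interpreted_total, fused_total)
```

Both versions compute the same total; the fused loop simply does it without a chain of per-row calls between operators, which is the overhead that code generation aims to remove.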
 