In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *

In [0]:
df=spark.read.format("parquet").load("/mnt/adls/target_tables/Dim/customers_table/")

# Understanding Job, Stage, and Task Creation in Spark Architecture

### Example Question:
We need to provide the count of customers whose phone number ends with '7' from the given DataFrame.

### Breakdown of Spark Execution Flow

1. **User Query**:
   - The user writes a query to filter customers whose phone number ends with '7' (e.g., df.filter(df.Phone.endswith("7")).count()).


2. **Driver Program**:
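   - The Driver Program parses the user query into an (unresolved) logical plan that describes *what* has to be computed.
   - The Driver passes this logical plan to the Catalyst Optimizer for optimization. When an action such as `count()` is called, the resulting physical plan is turned into a Directed Acyclic Graph (DAG) of stages.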
   
   1. ***Catalyst Optimizer***:
      - The Catalyst Optimizer applies rule-based transformations to optimize the logical plan.
      - It generates one or more physical plans based on the optimization and may apply cost-based optimization to choose the most efficient plan.
   
   - The driver program will create stages to apply the filter and perform the action to count the results.
   - The driver program reaches out to the *Cluster Manager* for resource management.

3. **Cluster Manager**:
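   - The cluster manager (e.g., Spark Standalone, YARN, or Kubernetes; on Databricks the platform provides it) allocates resources (memory, CPU) by launching executors on the worker nodes.
   - The data itself is split into partitions by Spark (for a file source, based on the input files and their sizes). The driver's task scheduler then sends one task per partition to the executors for parallel execution.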

4. **Worker Nodes (Executors)**:
   - The worker nodes execute the task of filtering customers whose phone numbers end with '7'.
   - Multiple worker nodes will process the partitions of the data in parallel.

5. **Driver Program**:
   - After task execution, each task returns only the number of matching rows in its partition, not the rows themselves, and these partial counts are combined into one total.
   - The driver returns the count of customers whose phone numbers end with '7' to the user.
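To make the flow concrete, here is a minimal single-machine sketch in plain Python (not PySpark). A thread pool stands in for the executor slots, each "task" filters one partition and returns only its partial count, and the "driver" adds the partial counts. The sample rows, the round-robin partitioning, and the function names are hypothetical illustrations, not Spark APIs.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical sample rows standing in for the customers table
customers = [
    {"CustomerID": 1, "Phone": "555-0107"},
    {"CustomerID": 2, "Phone": "555-0112"},
    {"CustomerID": 3, "Phone": "555-0147"},
    {"CustomerID": 4, "Phone": "555-0190"},
    {"CustomerID": 5, "Phone": None},
    {"CustomerID": 6, "Phone": "555-0177"},
]


def split_into_partitions(rows, num_partitions):
    """Round-robin split of the rows (Spark splits files by size instead)."""
    return [rows[i::num_partitions] for i in range(num_partitions)]


def run_task(partition):
    """One 'task': filter its partition and return only a partial count."""
    return sum(
        1 for row in partition
        if row["Phone"] is not None and row["Phone"].endswith("7")
    )


def run_job(rows, num_partitions=4, num_slots=4):
    partitions = split_into_partitions(rows, num_partitions)
    # The thread pool plays the role of executor slots running tasks in parallel
    with ThreadPoolExecutor(max_workers=num_slots) as pool:
        partial_counts = list(pool.map(run_task, partitions))
    # The 'driver' combines the per-partition counts into the final result
    return sum(partial_counts), partial_counts


total, partials = run_job(customers)
print(partials, total)  # [1, 1, 1, 0] 3
```

Only the small per-partition counts travel back, which is why the final step of a `count()` is cheap even for large tables.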

---
## Flow Diagram

```plaintext
+-------------------+
|   User Query      |         (Write query in notebook)
|                   |         (Example: df.filter(col("Phone").cast(StringType()).like("%7")).count())
+-------------------+ 
         |
         v
+-------------------+     
|   Driver Program  |         (Creates DAG, plans execution)
|                   |         (Filter rows, create actions)
+-------------------+
         |
         v
+-------------------+
| Catalyst Optimizer|         (Optimizes the query plan)
|                   |         (Logical Plan -> Optimized Logical Plan -> Physical Plan)
+-------------------+
         |
         v
+-------------------+     
|  Cluster Manager  |         (Allocates resources, partitions data)
+-------------------+  
         |
         v
+-------------------+        +-------------------+
| Worker Node 1     |<------>| Worker Node 2     |      (Parallel execution of tasks)
| (Executor)        |        | (Executor)        |
+-------------------+        +-------------------+ 
         |                        |
         v                        v
+-------------------+        +-------------------+
| Task Execution    |        | Task Execution    |      (Filtering phone numbers ending with 7)
| (on partitions)   |        | (on partitions)   |
+-------------------+        +-------------------+
         |                           |
         +-------------+-------------+
                       |
                       v
              +-------------------+
              | Driver Program    |         (Collect results and send them to user)
              | (Count result)    |         (Final count of customers)
              +-------------------+
```


In [0]:
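# Assign the result to a new variable: re-running `df = df.filter(...)` stacks
# another Filter on df every time the cell runs
phone_ends_with_7 = df.filter(col("Phone").cast(StringType()).like("%7"))
# Avoid casting to IntegerType here: phone numbers stored as text may contain
# characters such as '-' or '(' that cannot be converted to int
phone_ends_with_7.count()
# phone_ends_with_7.show()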

Out[28]: 5926

## Role of the Driver Node:

The **Driver Node** is the main process that controls the execution of a Spark job. It does the following:

1. **Receives the user's query**:
   - The driver is responsible for accepting the Spark code (queries, transformations, and actions) written by the user, for example in a notebook; each action in that code triggers a Spark job.

2. **Creates the DAG (Directed Acyclic Graph)**:
   - The driver takes the code and converts it into a **DAG** that represents the sequence of operations (transformations) that need to be performed.

3. **Plans and schedules the execution**:
   - When an action is called, the driver's DAG scheduler splits the DAG into **stages** at shuffle boundaries, and each stage into **tasks** (one per data partition) that can be executed across the Spark cluster.

4. **Distributes tasks**:
   - The driver sends the tasks (divided by partitions) to the **Worker Nodes (Executors)** for parallel execution.

5. **Collects results**:
   - After execution, the driver aggregates the results of the tasks and sends them back to the user.

---
 - The diagram below shows 4 tasks. The number of tasks in a stage equals the number of data partitions; the 4 cores per executor in our Databricks cluster only limit how many of those tasks can run at the same time.
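The relation between partitions, tasks, and slots can be sketched with a small calculator. This is a simplified model with a hypothetical function name: `cpus_per_task` mirrors Spark's `spark.task.cpus` (1 by default), and "waves" assumes every task takes the same time, whereas the real scheduler simply starts a new task whenever a slot frees up.

```python
import math


def plan_task_waves(num_partitions, num_executors, cores_per_executor, cpus_per_task=1):
    """Return (tasks, slots, waves) for one stage under a simplified scheduling model."""
    tasks = num_partitions  # Spark creates one task per partition in a stage
    slots = num_executors * (cores_per_executor // cpus_per_task)  # tasks that can run at once
    waves = math.ceil(tasks / slots) if tasks else 0  # rounds needed if all tasks take equal time
    return tasks, slots, waves


print(plan_task_waves(num_partitions=4, num_executors=1, cores_per_executor=4))   # (4, 4, 1)
print(plan_task_waves(num_partitions=10, num_executors=2, cores_per_executor=4))  # (10, 8, 2)
```

With 10 partitions and only 8 slots, two tasks have to wait for a free slot, so more partitions do not automatically mean more parallelism.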
 
```plaintext
+-----------------------------------------------------------------+
| User Query:                                                     |
| df.filter(col("Phone").cast(StringType()).like("%7")).count()   |
+-----------------------------------------------------------------+
                                 |
                                 v
                      +---------------------+
                      |     Driver Node     |     (Creates DAG, plans the execution)
                      +---------------------+
                                 |
                                 v
               +-----------------------------------+
               |  Stage 1: Scan + Filter Rows      |     (Keep rows where Phone ends with '7')
               +-----------------------------------+
                  |          |          |          |
                  v          v          v          v
             +--------+ +--------+ +--------+ +--------+
             | Task 1 | | Task 2 | | Task 3 | | Task 4 |     (One task per partition,
             | Part 1 | | Part 2 | | Part 3 | | Part 4 |      run in parallel)
             +--------+ +--------+ +--------+ +--------+
                  |          |          |          |
                  +----------+---+------+----------+
                                 |
                                 v
               +-----------------------------------+
               |  Action: count()                  |     (Combine the per-partition counts)
               +-----------------------------------+
                                 |
                                 v
                      +---------------------+
                      | Final Result (Count)|     (Send final count back to user)
                      +---------------------+
```
 


## Steps in the DAG Creation:

1. **The Driver reads the user query**:
   - The driver receives the user's query (e.g., `df.filter(col("Phone").cast(StringType()).like("%7"))`).

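2. **The filter does not need a shuffle**:
   - Since there is no data shuffling involved in the `filter` operation, reading the data and filtering it are pipelined into a **single stage**.
   - The `count()` action adds a partial count inside that stage; only the small per-partition counts are then exchanged to a single partition for the final sum, which typically appears as one more small stage (or, with Adaptive Query Execution, a separate small job) in the Spark UI.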

3. **Stage 1**:
   - This stage involves the `filter` transformation, which checks if the `Phone` column ends with '7'. 
   - This operation is a **narrow transformation**, meaning it does not require data shuffling between partitions. 
   - Hence, Spark executes this operation in a single stage, and it can be split across multiple tasks running on different partitions.
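The rule "a new stage starts at every shuffle" can be sketched in a few lines. This is a simplified model: each operator is a hypothetical `(name, is_wide)` pair, whereas Spark's DAG scheduler actually inspects RDD dependencies and cuts a stage wherever it finds a shuffle dependency.

```python
def split_into_stages(operators):
    """Group a linear chain of operators into stages, cutting before each wide operator."""
    stages, current = [], []
    for name, is_wide in operators:
        if is_wide and current:
            # A wide operator needs a shuffle: close the current stage first
            stages.append(current)
            current = []
        current.append(name)
    if current:
        stages.append(current)
    return stages


# Hypothetical operator names for the phone-number query
plan = [
    ("scan parquet", False),
    ("filter Phone ends with 7", False),
    ("partial count", False),
    ("exchange + final count", True),
]
print(split_into_stages(plan))
# [['scan parquet', 'filter Phone ends with 7', 'partial count'], ['exchange + final count']]
```

The scan and the narrow filter always land in the same stage; only the final aggregation of the counts sits behind a shuffle boundary.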


## Catalyst Optimizer Breakdown

#### 1. **Analysis**
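- **Purpose**: Turn the parsed query into a resolved (analyzed) **Logical Plan**.
- **Steps**:
  - Spark first parses the user query into an *unresolved* logical plan, in which column names such as `'Phone` have not been checked yet.
  - During analysis, it resolves every table and column against the catalog and the DataFrame's schema, assigning each column a unique ID and a data type (e.g., `'Phone` becomes `Phone#518` of type `string` in the `explain` output below).
  - This results in a **Logical Plan**: a high-level, abstract representation of the transformations (such as filters, joins, etc.) that need to be applied to the data.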
  
  **Example**: 
  If the user writes `df.filter(col("Phone").like("%7")).count()`, the logical plan would describe the filtering and counting operations that need to be performed on the DataFrame `df`.
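The resolution step can be sketched as a lookup against the schema. This is a hypothetical toy, not Catalyst code: `AnalysisError` only plays the role of Spark's `AnalysisException`, the IDs are simply schema positions, and the lookup is case-sensitive, whereas Spark resolves names case-insensitively by default.

```python
class AnalysisError(Exception):
    """Plays the role of Spark's AnalysisException in this sketch."""


def analyze(unresolved_columns, schema):
    """Resolve column names to ('name#id', type) pairs using the schema."""
    # Give each schema column a unique ID, similar to Phone#518 in the explain output
    ids = {name: idx for idx, name in enumerate(schema)}
    resolved = []
    for column in unresolved_columns:
        if column not in schema:
            raise AnalysisError(f"cannot resolve column '{column}'")
        resolved.append((f"{column}#{ids[column]}", schema[column]))
    return resolved


schema = {"CustomerID": "int", "FirstName": "string", "Phone": "string"}
print(analyze(["Phone"], schema))  # [('Phone#2', 'string')]
```

A typo such as `analyze(["Phonee"], schema)` fails here, which matches the behavior that analysis errors surface before any data is read.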

#### 2. **Logical Optimization**
- **Purpose**: Apply **rule-based optimizations** to the Logical Plan to improve query efficiency.
- **Steps**:
  - The Catalyst Optimizer applies various transformations to the logical plan to improve its efficiency.
  - These transformations include:
    - **Predicate Pushdown**: Moving filters closer to the data source to reduce the amount of data read. For example, if the filter is on a column (e.g., `Phone`), it tries to push this filter operation to the database or data source, which can handle it more efficiently.
    - **Projection Pruning**: Removing unnecessary columns from the plan, ensuring that only the required columns are retrieved, thus saving memory and computation.
    - **Constant Folding**: Evaluating expressions that involve only constants once, during optimization, instead of once per row. If an expression like `1 + 2` is present, it will be replaced by `3` before execution.
  
  **Outcome**: This step reduces the cost of computation by simplifying the logical operations, improving the query plan’s overall efficiency.
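Constant folding is easy to sketch as a recursive rewrite of an expression tree. The tuple-based tree format and the `Price` column are hypothetical; Catalyst applies the same idea to its own expression classes.

```python
import operator

OPS = {"+": operator.add, "*": operator.mul}


def fold_constants(expr):
    """Expressions are ints (literals), strings (column names), or (op, left, right) tuples."""
    if not isinstance(expr, tuple):
        return expr
    op, left, right = expr
    left, right = fold_constants(left), fold_constants(right)
    if isinstance(left, int) and isinstance(right, int):
        # Both sides are constants: evaluate once now instead of once per row
        return OPS[op](left, right)
    return (op, left, right)


print(fold_constants(("+", "Price", ("+", 1, 2))))  # ('+', 'Price', 3)
```

The part that depends on a column stays in the plan; only the constant sub-expression is replaced.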


#### 3. **Physical Planning**
- **Purpose**: Create a **Physical Plan** based on the optimized logical plan.
- **Steps**:
  - Once the logical plan has been optimized, the Catalyst Optimizer generates one or more **Physical Plans**.
  - A **Physical Plan** specifies how the operations in the logical plan should be executed. It provides the detailed execution strategy, including how data will be read, processed, and written.
  - Key components of physical planning include:
    - **Join Strategy**: The physical plan decides the most efficient join method (e.g., a **broadcast hash join**, or a shuffle-based join such as a **sort-merge join**) based on factors such as data size and distribution.
    - **Data Partitioning**: Data will be partitioned and distributed across worker nodes. The physical plan specifies how data will be partitioned and processed in parallel to optimize performance.

  **Outcome**: The physical plan provides a concrete, executable strategy for Spark to follow.
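A heavily simplified version of the join-strategy decision looks like this. It assumes the sizes are Spark's estimates of each side and uses the default of `spark.sql.autoBroadcastJoinThreshold` (10 MB, with `-1` disabling broadcasting). The function name is hypothetical, and the real planner also considers join type, hints, and other strategies.

```python
# Default value of spark.sql.autoBroadcastJoinThreshold (10 MB)
BROADCAST_THRESHOLD_BYTES = 10 * 1024 * 1024


def choose_join_strategy(left_size_bytes, right_size_bytes, threshold=BROADCAST_THRESHOLD_BYTES):
    """Pick a join method for an equi-join from estimated input sizes (simplified)."""
    # A negative threshold (e.g. -1) disables broadcast joins, as in Spark
    if threshold >= 0 and min(left_size_bytes, right_size_bytes) <= threshold:
        return "broadcast hash join"  # ship the small side to every executor
    return "sort-merge join"  # shuffle and sort both sides by the join key


MB = 1024 * 1024
print(choose_join_strategy(500 * MB, 2 * MB))    # broadcast hash join
print(choose_join_strategy(500 * MB, 200 * MB))  # sort-merge join
```

Broadcasting avoids shuffling the large side, which is why a small dimension table joined to a large fact table is usually planned as a broadcast join.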


#### 4. **Code Generation**
- **Purpose**: Generate **executable code** based on the physical plan for execution on Spark.
- **Steps**:
  - The final step in the Catalyst Optimizer is to generate **executable code** for the physical plan.
  - Spark uses **whole-stage code generation** (part of Project Tungsten): it fuses the operators of a stage (like scanning, filtering, and aggregating) into a single generated Java function, which is compiled to JVM bytecode so that the operations run efficiently on the cluster.
  - **Spark Jobs Code**: This code is sent to the cluster to be executed across the worker nodes.
  
  **Outcome**: Spark creates efficient, low-level code that is optimized for execution on the underlying hardware (such as CPUs, memory, and network).
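Whole-stage code generation can be illustrated with a Python analogy: instead of calling a separate function per operator and per row, generate one fused loop for "scan, filter, count" and compile it once. This is only an analogy with hypothetical names; Spark generates Java source, not Python, and the predicate string here must come from trusted code because it is executed.

```python
def generate_stage_function(predicate_source):
    """Fuse 'scan -> filter -> count' into one generated function (codegen analogy)."""
    source = (
        "def stage(rows):\n"
        "    count = 0\n"
        "    for row in rows:\n"
        f"        if {predicate_source}:\n"
        "            count += 1\n"
        "    return count\n"
    )
    namespace = {}
    # Compile the generated source once, then reuse the function for every partition
    exec(compile(source, "<generated>", "exec"), namespace)
    return namespace["stage"], source


stage, source = generate_stage_function('row is not None and row.endswith("7")')
print(stage(["555-0107", "555-0112", None, "555-0177"]))  # 2
```

The generated loop has no per-operator function calls, which is the same reason Spark's fused code runs faster than evaluating operators one by one.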


### Diagram Overview:

```plaintext
+-----------------------------+
|     Catalyst Optimizer      |  <-- Optimizes the query using various transformations
|                             |
|   +---------------------+   |
|   | Analysis            |   |  <-- Parse query and create a logical plan
|   | - Logical Plan      |   |
|   +---------------------+   |
|              |              |
|              v              |
|   +---------------------+   |
|   | Logical             |   |  <-- Apply rule-based optimizations
|   | Optimization        |   |
|   | - Predicate Pushdown|   |
|   | - Projection Pruning|   |
|   | - Constant Folding  |   |
|   +---------------------+   |
|              |              |
|              v              |
|   +---------------------+   |
|   | Physical Planning   |   |  <-- Generate physical plans based on optimizations
|   | - Join Strategy     |   |
|   | - Data Partitioning |   |
|   +---------------------+   |
|              |              |
|              v              |
|   +---------------------+   |
|   | Code Generation     |   |  <-- Generate executable code for execution
|   | - Spark Jobs Code   |   |
|   +---------------------+   |
+-----------------------------+
```


In [0]:
# explain(True) prints the plans the Catalyst Optimizer produces for this query:
# the parsed, analyzed, and optimized logical plans, followed by the physical plan


df.filter(col("Phone").cast(StringType()).like("%7")).explain(True)

== Parsed Logical Plan ==
'Filter cast('Phone as string) LIKE %7
+- Filter cast(Phone#518 as string) LIKE %7
   +- Filter cast(Phone#518 as string) LIKE %7
      +- Relation [CustomerID#514,FirstName#515,LastName#516,Email#517,Phone#518,RegistrationDate#519,LastPurchaseDate#520,LastUpdated#521] parquet

== Analyzed Logical Plan ==
CustomerID: int, FirstName: string, LastName: string, Email: string, Phone: string, RegistrationDate: string, LastPurchaseDate: string, LastUpdated: string
Filter cast(Phone#518 as string) LIKE %7
+- Filter cast(Phone#518 as string) LIKE %7
   +- Filter cast(Phone#518 as string) LIKE %7
      +- Relation [CustomerID#514,FirstName#515,LastName#516,Email#517,Phone#518,RegistrationDate#519,LastPurchaseDate#520,LastUpdated#521] parquet

== Optimized Logical Plan ==
Filter (isnotnull(Phone#518) AND EndsWith(Phone#518, 7))
+- Relation [CustomerID#514,FirstName#515,LastName#516,Email#517,Phone#518,RegistrationDate#519,LastPurchaseDate#520,LastUpdated#521] parquet

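The parsed plan shows three identical `Filter` nodes because, when this output was captured, `df` had already been reassigned with `df = df.filter(...)` by earlier runs of the filter cell, and this cell added one more filter on top. The optimized plan merges them into a single `Filter`, drops the redundant cast (`Phone` is already a `string`), rewrites `LIKE %7` as `EndsWith(Phone#518, 7)`, and adds an `isnotnull` check.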


## Executor Memory Allocation Breakdown (8 GB Executor)

#### 1. **Reserved Memory (300 MB)**:
   - This fixed portion of the executor heap is set aside for Spark's **internal objects**.
   - **Not available** for user tasks or Spark execution/storage.
   - **Note**: Memory outside the JVM heap (JVM overhead, native buffers) is not part of these 300 MB; it is configured separately with `spark.executor.memoryOverhead`.

---

#### 2. **Spark Memory (60% of usable memory)**:
   - The total memory allocated to each executor is **8 GB**. After the 300 MB of reserved memory is taken out, about **7.7 GB** is usable, and by default 60% of it (`spark.memory.fraction = 0.6`), about **4.6 GB**, is Spark memory.
   - The Spark memory is **divided** into two parts, 50/50 by default (`spark.memory.storageFraction = 0.5`):
   
   - ##### **Execution Memory (50% of 4.6 GB ≈ 2.3 GB)**:
     - **Purpose**: Used for computation tasks like shuffling, sorting, joining, and aggregating data.
     - **Example Task**: For the task `df.filter(col("Phone").like("%7")).count()`, Spark:
       - Filters the rows in the DataFrame where the "Phone" column ends with the digit "7".
       - **Memory Use**: Rows stream through the filter and each task only keeps a running count, so this query needs very little execution memory.
       - **Memory Allocated**: About **2.3 GB** is shared by all tasks running on the executor, and execution can grow beyond it by borrowing unused storage memory.
   
   - ##### **Storage Memory (50% of 4.6 GB ≈ 2.3 GB)**:
     - **Purpose**: Used for **caching** or **persisting** data (e.g., RDDs, DataFrames) for future stages or reuse.
     - **Example Task**: If you cache the DataFrame `df`, it will occupy the storage memory:
       - `df.cache()` marks the data to be kept in memory; it is stored the first time an action runs on `df`, giving faster access in later stages and queries.
       - **Memory Use**: Storage memory is allocated to hold the DataFrame or filtered data if cached.
       - **Memory Allocated**: About **2.3 GB** (50% of Spark memory) is available for storing this cached data.

   - ##### **Slots**:
      - A **slot** is a CPU core of the executor that runs one task at a time. Our Databricks cluster has 4 cores per executor, giving 4 **slots** (`slot1`, `slot2`, `slot3`, `slot4`). In general the number of slots is `spark.executor.cores` divided by `spark.task.cpus` (1 by default); it does not come from the memory layout.
      - Memory is not split into fixed per-slot pieces: the tasks running in the slots share the execution memory.
      - **Example Task**: For the `df.filter(col("Phone").like("%7")).count()` operation, the stage is split into one task per partition, and each task runs in its own slot.
      - If there are 4 partitions in the DataFrame, Spark could assign each partition to a separate slot, allowing the 4 tasks to run **concurrently**.

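- #### **Note**
    - The sizes of these regions can be tuned to the workload with `spark.memory.fraction` (the share of usable memory given to Spark memory) and `spark.memory.storageFraction` (the share of Spark memory protected for storage).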
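Because the tasks running in the slots share the execution memory instead of getting fixed pieces, Spark bounds each task's share by the number of active tasks N: a task can hold at most 1/N of the execution pool, and Spark's `ExecutionMemoryPool` tries to grant it at least 1/(2N) before the task has to spill to disk. A small calculator, with a hypothetical function name and a pool of 2368 MB (≈ 2.3 GB, the default execution region of an 8 GB executor):

```python
def per_task_execution_bounds(execution_pool_mb, active_tasks):
    """Return (min_share, max_share) in MB for one task when `active_tasks` share the pool."""
    if active_tasks < 1:
        raise ValueError("need at least one active task")
    max_share = execution_pool_mb / active_tasks  # upper limit: 1/N of the pool
    min_share = execution_pool_mb / (2 * active_tasks)  # target minimum: 1/(2N) of the pool
    return min_share, max_share


print(per_task_execution_bounds(2368, 4))  # (296.0, 592.0)
print(per_task_execution_bounds(2368, 1))  # (1184.0, 2368.0)
```

A single running task may use the whole pool, while 4 concurrent tasks each get at most a quarter; the shares change as tasks start and finish.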
---


  


#### 3. **User Memory (40% of usable memory ≈ 3.1 GB)**:
   - Represents the memory left for **user code** and Spark's internal metadata, outside Spark's own memory management.
   - It holds objects created by code running in the executor JVM, such as data structures built inside Scala/Java UDFs or `mapPartitions` functions.
   - **Example Task**: For `df.filter(col("Phone").like("%7")).count()`, Spark's built-in filter and count run in execution memory, so this query uses hardly any user memory; a Scala UDF that builds a large lookup map per task would use it instead.
   - **Memory Allocated**: About **3.1 GB** (40% of the ~7.7 GB usable memory) is available for user data structures.

---

### Key Concept:
- ### **Unified Memory Manager**: 
  - Allows Spark to **dynamically allocate unused memory** between **execution** and **storage memory**, depending on which one is underutilized.
  - **For example**: If execution memory is not fully used, Spark can allocate some of it to storage memory (for caching), improving resource utilization.
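The borrowing rules of the Unified Memory Manager can be sketched as a toy model (class and method names are hypothetical). Execution may use free memory and evict cached data, but only until storage is back down to its protected region (`spark.memory.storageFraction` of Spark memory); storage may use free execution memory but never evicts execution memory.

```python
class UnifiedMemorySketch:
    """Toy model of Spark's unified pool shared by execution and storage (sizes in MB)."""

    def __init__(self, spark_memory_mb, storage_fraction=0.5):
        self.total = spark_memory_mb
        self.storage_region = spark_memory_mb * storage_fraction  # protected from eviction
        self.execution_used = 0.0
        self.storage_used = 0.0

    def free(self):
        return self.total - self.execution_used - self.storage_used

    def cache(self, mb):
        """Storage can borrow any free memory but never evicts execution memory."""
        if mb > self.free():
            return False  # this sketch refuses; real Spark may first evict older cached blocks
        self.storage_used += mb
        return True

    def acquire_execution(self, mb):
        """Execution can evict cached data, but only the part above the storage region."""
        shortfall = mb - self.free()
        if shortfall > 0:
            evictable = max(0.0, self.storage_used - self.storage_region)
            self.storage_used -= min(shortfall, evictable)
        granted = min(mb, self.free())
        self.execution_used += granted
        return granted


pool = UnifiedMemorySketch(4000)
print(pool.cache(3000))              # True: storage borrows 1000 MB of unused execution memory
print(pool.acquire_execution(2000))  # 2000: 1000 MB free + 1000 MB evicted from the cache
print(pool.storage_used)             # 2000.0: the protected storage region is kept
```

The asymmetry exists because evicted cached data can be recomputed or re-read later, while memory taken from a running task would make it fail or spill.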

---

#### Example Scenario (8 GB Executor):

Let’s say we have **8 GB of memory allocated** to each executor. Here’s how the memory is divided with Spark's default settings while running the task `df.filter(col("Phone").like("%7")).count()`:

1. **Reserved Memory (300 MB)**:
   - **300 MB** is reserved for Spark’s internal objects. This memory will not be used for user tasks like filtering.

2. **Spark Memory (60% of 8 GB - 300 MB ≈ 4.6 GB)**:
   - **Execution Memory (50% ≈ 2.3 GB)**: This memory handles the computation for the query; filtering the rows whose "Phone" column ends with "7" and counting them needs only a small part of it.
   - **Storage Memory (50% ≈ 2.3 GB)**: If the filtered DataFrame is cached for future stages or reuse, it will be stored in these 2.3 GB of storage memory.

3. **Slots**:
   - The executor has **4 slots** (one per core), so it can run up to 4 tasks in parallel.
   - The slots do not get a fixed 1 GB each: the running tasks share the execution memory, and each task can take at most 1/4 of it while 4 tasks are running.
     - For example, if the DataFrame `df` has 4 partitions, Spark may process each partition in parallel across 4 slots.

4. **User Memory (40% of 8 GB - 300 MB ≈ 3.1 GB)**:
   - The task `df.filter(col("Phone").like("%7")).count()` barely touches user memory, because the filter and the count are Spark's own operators.
   - After the filtering, only the per-partition counts are sent to the driver, which produces the final result.
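The numbers in this scenario follow from Spark's unified memory formula, which a short calculator can reproduce: usable = heap - 300 MB, Spark memory = usable × `spark.memory.fraction`, storage region = Spark memory × `spark.memory.storageFraction`, and user memory = the rest. The function name is hypothetical, and in practice the JVM may report a slightly smaller heap than the configured 8 GB, so real values can be a little lower.

```python
RESERVED_MB = 300  # fixed reserved memory in Spark's unified memory manager


def executor_memory_breakdown(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Split an executor heap (MB) into Spark's memory regions using the default fractions."""
    usable = heap_mb - RESERVED_MB
    spark_memory = usable * memory_fraction  # execution + storage
    storage = spark_memory * storage_fraction  # region protected from eviction
    execution = spark_memory - storage
    user = usable - spark_memory
    return {
        "reserved": RESERVED_MB,
        "spark": spark_memory,
        "execution": execution,
        "storage": storage,
        "user": user,
    }


for name, mb in executor_memory_breakdown(8 * 1024).items():
    print(f"{name:>9}: {mb / 1024:.2f} GB")
# spark ≈ 4.62 GB, execution ≈ storage ≈ 2.31 GB, user ≈ 3.08 GB
```

Passing different `memory_fraction` or `storage_fraction` values shows how tuning those settings moves memory between the regions.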

---

#### Memory Management with Unified Memory Manager:
- The **Unified Memory Manager** in Spark ensures that unused memory from **Execution Memory** and **Storage Memory** can be dynamically shared.
- If the **Execution Memory** is underutilized while filtering, it can be reallocated for **Storage Memory**, allowing more data to be cached.

---

#### Final Summary:
- **Reserved Memory**: 300 MB for Spark's internal overhead.
- **Executor Memory** (4 GB): Divided into **Execution Memory** (2.4 GB) for processing tasks and **Storage Memory** (1.6 GB) for caching.
- **Slots**: The executor can process tasks in parallel using 4 slots, each with 1 GB of memory.
- **User Memory**: 3.2 GB is used for user data during task execution.

---

### Diagram Overview:


```plaintext

+-------------------------------+
|   Reserved memory (300 MB)    |
|-------------------------------|<--+
|+-----------------------------+|   |
||                             ||   |
||   Storage memory (50%)      ||   |
||                             ||   |
|+-----------------------------+|   |---> Spark memory (60%), shared by the tasks
||                             ||   |     running in the executor's slots:
||   Execution memory (50%)    ||   |     +-------+-------+-------+-------+
||                             ||   |     | slot1 | slot2 | slot3 | slot4 |
|+-----------------------------+|   |     +-------+-------+-------+-------+
|-------------------------------|<--+
|                               |
|   User memory (40%)           |
|                               |
|                               |
+-------------------------------+
```
