### Spark architecture
- Spark follows a master-slave architecture with a central Driver and multiple Executors.
- The Driver manages the SparkContext, coordinates tasks, and maintains metadata.
- Executors run on worker nodes, execute tasks, and store data in memory or disk.
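- Cluster managers (such as YARN, Kubernetes, or Mesos, which is deprecated since Spark 3.2) allocate resources for Spark applications.
- Spark can run in local mode, on its built-in standalone cluster manager, or on an external cluster manager; on a cluster, the driver runs in either `client` or `cluster` deploy mode.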

### Spark optimizations (scenario-based)
- Use predicate pushdown and column pruning to minimize data read.
- Cache/persist intermediate DataFrames when reused multiple times.
- Leverage broadcast joins for small lookup tables to avoid shuffles.
- Use `repartition()` (a full shuffle) to increase parallelism or rebalance skewed partitions, and `coalesce()` to reduce the partition count without a full shuffle.
- Tune Spark configurations (memory, shuffle partitions, executor cores) based on workload.

### Optimizing your Spark SQL queries

1. **Partition Pruning:** Use partitioned tables and filter on partition columns to minimize data scanned.
2. **Predicate Pushdown:** Ensure filters are pushed down to the data source (like Parquet/ORC) to reduce data read.
3. **Column Pruning:** Select only required columns to reduce I/O and memory usage.
4. **Broadcast Joins:** Use broadcast joins for small tables to avoid expensive shuffles.
5. **Avoid UDFs When Possible:** Prefer built-in Spark SQL functions over UDFs for better performance and optimization.

### Broadcasting
- Broadcasting sends a small DataFrame to all worker nodes, enabling efficient joins by avoiding shuffles.
- Use `broadcast()` in Spark when joining a large table with a much smaller one to optimize performance.

### Long-running jobs: how will you debug them?
- Check the Spark UI for slow stages, skewed tasks, or failed jobs.
- Review the job DAG and execution plan for bottlenecks.
- Inspect stage/task metrics for data skew, long task durations, or high shuffle read/write.
- Monitor executor and driver logs for errors, GC pauses, or resource issues.
- Validate cluster resource allocation (CPU, memory, disk, network).
- Ensure efficient data partitioning and avoid many small files or tiny partitions.
- Check for excessive shuffles, wide transformations, or large broadcast joins.
- Confirm that Spark configurations (memory, cores, partitions) are tuned for the workload.

### How to check the Spark UI for slow stages, skewed tasks, or failed jobs

- Open the Spark UI (usually accessible at `http://<driver-node>:4040` while the application is running; after it ends, use the Spark History Server if event logging is enabled).
- Navigate to the **Stages** tab to identify stages with high duration, many failed tasks, or significant skew (uneven task durations).
- Use the **Jobs** tab to see job status and failure reasons.
- Drill down into individual stages and tasks to inspect metrics like task time, input size, and shuffle read/write.
- Check the **Executors** tab for resource usage, task failures, and GC overhead.
- Review logs for error messages or stack traces related to failed jobs or tasks.

##### If pipelines have been running for too long, how will you debug them? What are the potential bottlenecks/issues?

- Analyze Spark UI for slow or stuck stages, long-running tasks, and task failures.
- Check for data skew (some partitions processing much more data than others).
- Review shuffle operations; excessive shuffling can cause performance bottlenecks.
- Inspect resource utilization (CPU, memory, disk, network) on executors and driver.
- Look for frequent garbage collection or out-of-memory errors in logs.
- Validate input data size and partitioning; too many or too few partitions can impact performance.
- Check for inefficient joins (e.g., missing broadcast joins, large shuffles).
- Review code for expensive operations (UDFs, wide transformations, unnecessary caching).
- Ensure cluster resources are sufficient and not over-allocated to other jobs.
- Investigate external dependencies (slow data sources, network latency, etc.).
- Use event logs and metrics to trace where the pipeline is spending most of its time.

###### Try-catch and alert mechanisms implemented in your project: for what purpose do you use them?
- In our project, try-catch blocks (`try`/`except` in Python) are used to gracefully handle data ingestion errors, such as malformed records or connectivity issues, so the pipeline continues processing valid data while logging or alerting on failures.
- Alert mechanisms (like email, Slack, or monitoring tools) notify the team of job failures, data quality issues, or resource bottlenecks, enabling quick response and minimizing downtime.
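
The pattern above can be sketched as a small wrapper around each pipeline step. This is an illustration, not the project's actual code: `send_alert`, `run_step`, and the two loader functions are hypothetical names, and the alert sink is an in-memory list standing in for email, Slack, or a monitoring tool.

```python
import logging
from typing import Callable, List

logger = logging.getLogger("pipeline")

# Hypothetical alert sink: a real project would call email, Slack,
# or a monitoring API here instead of appending to a list.
sent_alerts: List[str] = []


def send_alert(message: str) -> None:
    sent_alerts.append(message)
    logger.error("ALERT: %s", message)


def run_step(name: str, step: Callable[[], int]) -> int:
    """Run one pipeline step; alert and re-raise on failure so the scheduler sees it."""
    try:
        processed = step()
        logger.info("step %s processed %d records", name, processed)
        return processed
    except Exception as exc:
        send_alert(f"step '{name}' failed: {exc}")
        raise


def load_ok() -> int:
    return 42


def load_broken() -> int:
    raise ConnectionError("source database unreachable")


run_step("load_customers", load_ok)
try:
    run_step("load_orders", load_broken)
except ConnectionError:
    pass  # already alerted; a real job would exit with a non-zero status here

print(sent_alerts)
```

Re-raising after the alert is a deliberate choice in this sketch: the team is notified, and the orchestrator still marks the run as failed instead of silently succeeding.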

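##### Different file types

Delta Lake and Iceberg are table formats layered on data files such as Parquet rather than file formats themselves; they are listed here for comparison.
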
| File Type   | Schema Evolution | Compression Support | Read Speed     | Write Speed    | ACID Support | Partitioning | Iceberg Integration |
|-------------|------------------|--------------------|----------------|----------------|--------------|--------------|---------------------|
| CSV         | No               | No (external only) | Slow           | Fast           | No           | Manual       | No                  |
| JSON        | No               | No (external only) | Slow           | Fast           | No           | Manual       | No                  |
| Parquet     | Limited (add cols)| Yes                | Fast           | Fast           | No           | Yes          | Yes                 |
| ORC         | Limited (add cols)| Yes                | Fast           | Fast           | No           | Yes          | Yes                 |
| Delta Lake  | Yes              | Yes                | Fast           | Fast           | Yes          | Yes          | Yes                 |
| Iceberg     | Yes (full)       | Yes                | Fast           | Fast           | Yes          | Yes          | Native              |


##### Delta table/Deltalake concepts

- Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads.
- Supports schema enforcement and evolution, allowing safe changes to table structure over time.
- Enables time travel, letting users query previous versions of data using version numbers or timestamps.
- Provides scalable metadata handling, making it efficient for large tables with many partitions.
- Supports upserts, deletes, and merges using the `MERGE INTO` command for complex ETL operations.

#### Why does the driver go OOM (out of memory)? Causes, solutions, and prevention

- **Causes:**
    - Collecting large datasets to the driver with actions like `.collect()` or `.toPandas()`, or `.take(n)` with a very large `n`, on big DataFrames/RDDs.
    - Building large broadcast variables (the broadcast data is first collected on the driver) or other large driver-side data structures.
    - Excessive logging or storing large objects in driver memory.
    - Very high task counts: the driver tracks scheduling state and results for every task, so jobs with a huge number of partitions add memory pressure.

- **Solutions & Preventions:**
    - Avoid collecting large datasets to the driver; use distributed operations and aggregate results before collecting. Setting `spark.driver.maxResultSize` makes an oversized `collect()` fail fast instead of exhausting the driver.
    - Increase driver memory via `--driver-memory` or the `spark.driver.memory` configuration.
    - Use `persist()` or `cache()` judiciously; cached data is stored on executors, but the driver still tracks metadata for every cached block.
    - Limit the size of broadcast variables and driver-side data structures; lower `spark.sql.autoBroadcastJoinThreshold` if automatic broadcasts are too large.
    - Optimize Spark code to minimize driver-side data accumulation.
    - Monitor driver memory usage and tune Spark configurations accordingly.
    - Use logging levels appropriately to avoid excessive log data in memory.

#### Why do executors go OOM (out of memory)? Causes, solutions, and prevention

- **Causes:**
    - Processing partitions that are too large, causing a single executor to exceed its memory limits.
    - Inefficient joins (e.g., shuffles, skewed data, or large broadcast joins).
    - Caching/persisting large DataFrames/RDDs without enough memory.
    - Memory leaks or holding references to large objects in user code.
    - Too many concurrent tasks per executor, leading to memory contention.
    - Uncompressed or poorly compressed data formats increasing memory usage.

- **Solutions & Preventions:**
    - Repartition data to ensure partitions are of manageable size.
    - Use `persist()`/`cache()` judiciously and unpersist data when no longer needed.
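    - Tune executor memory (`spark.executor.memory`), memory overhead (`spark.executor.memoryOverhead`, which covers off-heap usage such as PySpark worker processes), and core settings for the workload.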
    - Avoid wide transformations on skewed data; use salting or custom partitioning to balance load.
    - Prefer built-in Spark functions over UDFs to reduce memory overhead.
    - Monitor executor memory usage and adjust Spark configurations as needed.
    - Use efficient data formats (like Parquet/ORC) and enable compression.
    - Limit the number of concurrent tasks per executor (`spark.executor.cores`).
    - Regularly review and optimize Spark code to avoid unnecessary memory retention.
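
As a worked example of "partitions of manageable size", the arithmetic below estimates a partition count from the data volume. The 128 MB target is an assumption (a common rule of thumb that also matches the default of `spark.sql.files.maxPartitionBytes`), and `target_partitions` is a hypothetical helper, not a Spark API.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3


def target_partitions(total_bytes: int, target_partition_bytes: int = 128 * MB) -> int:
    """Smallest partition count that keeps the average partition at or below the target."""
    return max(1, math.ceil(total_bytes / target_partition_bytes))


# 50 GB of input at roughly 128 MB per partition.
n = target_partitions(50 * GB)
print(n)  # 400
```

The result would then be passed to something like `df.repartition(n)`; skewed keys can still produce oversized partitions, so this is a starting point rather than a guarantee.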

#### Catalyst Optimizer and SQL Optimization Plans

- The Catalyst Optimizer is Spark SQL’s query optimization framework, responsible for transforming SQL queries into efficient execution plans.
- It uses a tree-based architecture to represent and optimize queries through a series of rule-based and cost-based transformations.

**Different Plans in Catalyst Optimizer:**

| Plan Type                | Description                                                      |
|--------------------------|------------------------------------------------------------------|
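| Unresolved Logical Plan  | Parsed query; table and column references not yet resolved       |
| Analyzed Logical Plan    | References resolved and validated against the catalog and schema |
| Optimized Logical Plan   | Rule-based optimizations applied (e.g., predicate pushdown, column pruning, constant folding) |
| Physical Plan(s)         | One or more candidate execution strategies (e.g., which join algorithm to use) |
| Executed Plan            | The selected physical plan, compiled to RDD operations and ready for execution |

- Catalyst enables extensibility for custom rules, supports advanced optimizations, and is key to Spark SQL’s performance.
- Call `df.explain(mode="extended")` (or run `EXPLAIN EXTENDED` in SQL) to print the parsed, analyzed, and optimized logical plans together with the physical plan.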

##### Spark jobs, stages, and tasks: how is the number of each decided for a Spark application?

- **Jobs:** A Spark job is triggered by an action (like `collect()`, `count()`, or `write.save()`) on a DataFrame/RDD. Each action triggers at least one job; some DataFrame operations, such as `show()` or schema inference while reading, can trigger more than one.
- **Stages:** Each job is divided into stages at shuffle boundaries. A stage is a set of tasks that can be executed in parallel; wide transformations (like `groupBy`, `join`) introduce the shuffles that separate stages.
- **Tasks:** A stage is split into tasks, one per data partition. Each task processes a single partition of data.

**How are the numbers decided?**
- The number of **jobs** is driven by the number of actions invoked (at least one job per action).
- The number of **stages** in a job is the number of shuffle boundaries in its DAG plus one. Each wide transformation that shuffles introduces a new stage.
- The number of **tasks** in each stage equals the number of partitions that stage processes. For stages that read a DataFrame shuffle, this defaults to `spark.sql.shuffle.partitions` (200), which adaptive query execution may reduce. Partitioning can be controlled via `repartition()`, `coalesce()`, or data source partitioning.

**Summary:**  
- Actions → Jobs  
- Shuffle boundaries → Stages  
- Partitions → Tasks
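
The counting rules above can be turned into back-of-the-envelope arithmetic. The toy model below is only a sketch: `JobSketch` and `plan_job` are hypothetical names, and it assumes one job per action, 200 shuffle partitions, and no adaptive query execution.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class JobSketch:
    """Toy model of one Spark job: the partition count of each stage, in order."""
    stage_partitions: List[int]

    @property
    def stages(self) -> int:
        return len(self.stage_partitions)

    @property
    def tasks(self) -> int:
        # One task per partition in every stage.
        return sum(self.stage_partitions)


def plan_job(input_partitions: int, shuffles: int, shuffle_partitions: int = 200) -> JobSketch:
    # One stage reads the input; each shuffle boundary starts another stage.
    return JobSketch([input_partitions] + [shuffle_partitions] * shuffles)


# Hypothetical job: read 8 input partitions, then one groupBy (one shuffle).
job = plan_job(input_partitions=8, shuffles=1)
print(job.stages, job.tasks)  # 2 stages, 8 + 200 = 208 tasks
```

Real applications can differ from this estimate, for example when Spark skips stages whose shuffle output already exists.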

##### Spark shuffling (sort-merge/hash)
- **Shuffling** is the process of redistributing data across partitions, typically required by wide transformations like `groupBy`, `join`, or `distinct`. It is an expensive operation involving disk and network I/O.

- **Sort-merge join:** Both sides are shuffled by the join key, sorted within each partition, and then merged. It is Spark's default strategy for equi-joins when neither side is small enough to broadcast: sorted data can spill to disk, so it scales to large datasets, at the cost of the sort.

- **Shuffle hash join:** Both sides are shuffled by the join key, and a hash table is built from the smaller side of each partition. It skips the sort, but the hash table must fit in memory, so it can cause memory pressure when keys are unevenly distributed.

- The shuffle mechanism itself has been sort-based since Spark 2.0, when the older hash shuffle manager was removed; RDD operations such as `groupByKey` and `reduceByKey` both go through it.

- **Optimizations:**
    - Use `reduceByKey` instead of `groupByKey` to minimize data shuffled.
    - Tune `spark.sql.shuffle.partitions` to control the number of shuffle partitions.
    - Prefer broadcast joins for small tables to avoid shuffling large datasets.
    - Monitor shuffle read/write metrics in Spark UI to identify bottlenecks.
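
To see why `reduceByKey` shuffles less than `groupByKey`, the plain-Python simulation below counts how many records would cross the shuffle in each case. It is a sketch of the idea (map-side combining), not Spark's implementation; the two input partitions and all function names are made up for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Record = Tuple[str, int]

# Hypothetical input: two map-side partitions of (word, 1) pairs.
partitions: List[List[Record]] = [
    [("a", 1), ("b", 1), ("a", 1), ("a", 1)],
    [("b", 1), ("a", 1), ("b", 1), ("c", 1)],
]


def shuffled_group_by_key(parts: List[List[Record]]) -> List[Record]:
    # groupByKey-style: every record crosses the shuffle unchanged.
    return [rec for part in parts for rec in part]


def shuffled_reduce_by_key(parts: List[List[Record]]) -> List[Record]:
    # reduceByKey-style: combine within each partition first,
    # then shuffle one record per key per partition.
    out: List[Record] = []
    for part in parts:
        local: Dict[str, int] = defaultdict(int)
        for key, value in part:
            local[key] += value
        out.extend(local.items())
    return out


def final_counts(shuffled: List[Record]) -> Dict[str, int]:
    # Reduce side: merge whatever arrived from the shuffle.
    totals: Dict[str, int] = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals)


group_records = shuffled_group_by_key(partitions)
reduce_records = shuffled_reduce_by_key(partitions)
print(len(group_records), len(reduce_records))  # 8 records vs 5 records shuffled
```

Both paths produce the same totals; only the volume of data moved differs, and the gap grows with the number of repeated keys per partition.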

##### Data skewness - how you will handle it? 
- **Salting:** Add a random prefix or suffix to skewed keys to distribute data more evenly across partitions, then remove the salt after aggregation.
- **Custom Partitioning:** Implement a custom partitioner to control how data is distributed, ensuring skewed keys are spread out.
- **Broadcast Join:** Use broadcast joins when one side of the join is small to avoid shuffling large, skewed datasets.
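- **Skew Join Handling (AQE):** Enable adaptive query execution (`spark.sql.adaptive.enabled` and `spark.sql.adaptive.skewJoin.enabled`, available since Spark 3.0) so Spark splits skewed partitions in sort-merge joins automatically. The `SKEW` join hint is specific to Databricks and is not available in open-source Spark.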
- **Splitting Skewed Keys:** Process skewed keys separately from the rest of the data, then union the results.
- **Increase Shuffle Partitions:** Increase the number of shuffle partitions to reduce the amount of data per partition.
- **Filter Outliers:** If possible, filter or pre-aggregate highly skewed keys before the main computation.
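
Salting is easiest to follow as a two-stage aggregation. The plain-Python sketch below assumes a hypothetical dataset with one hot key and uses a rotating suffix as the salt so the result is reproducible; in Spark the salt would more typically come from a random function, and `NUM_SALTS` is a tuning assumption.

```python
from collections import Counter

NUM_SALTS = 4  # assumption: tune to how hot the skewed key is

# Hypothetical skewed data: one hot key and two rare ones.
keys = ["hot"] * 1000 + ["k1"] * 10 + ["k2"] * 10

# Without salting, all rows of one key must be handled by a single task.
largest_group_plain = max(Counter(keys).values())

# Stage 1: aggregate on the salted key, so the hot key is split across
# NUM_SALTS smaller groups that can land on different partitions.
salted = [f"{k}#{i % NUM_SALTS}" for i, k in enumerate(keys)]
partial = Counter(salted)
largest_group_salted = max(partial.values())

# Stage 2: strip the salt and merge the partial results.
final = Counter()
for salted_key, count in partial.items():
    original_key = salted_key.rsplit("#", 1)[0]
    final[original_key] += count

print(largest_group_plain, largest_group_salted)  # 1000 vs 250
print(dict(final))
```

The second stage only sees one row per salted key, so it is cheap even though it regroups by the original, skewed key.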

Spark UI: uses and how it helps in debugging

DAG, lazy evaluation, narrow/wide transformations


SQL:

| user_id | website | time  |
|---------|---------|-------|
| user1   | Site1   | 10:00 |
| user1   | Site2   | 10:17 |
| user1   | Site3   | 10:35 |
| user1   | Site4   | 11:00 |
| user2   | Site1   | 11:15 |
| user2   | Site2   | 11:32 |
| user2   | Site3   | 12:05 |
| user2   | Site4   | 12:20 |
| user3   | Site1   | 12:30 |
| user3   | Site2   | 13:05 |
| user3   | Site3   | 13:15 |
| user3   | Site4   | 13:37 |

For each user and each site, find the time spent, measured as the gap since that user's previous visit (the first site for each user defaults to 0).

Example: user1 at Site2 spent 10:17 - 10:00 = 17 mins, user1 at Site3 spent 10:35 - 10:17 = 18 mins, and so on.

```python
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("Manna").getOrCreate()

columns = ["user_id", "website", "time"]
data = [
    ("user1", "Site1", "10:00"),
    ("user1", "Site2", "10:17"),
    ("user1", "Site3", "10:35"),
    ("user1", "Site4", "11:00"),
    ("user2", "Site1", "11:15"),
    ("user2", "Site2", "11:32"),
    ("user2", "Site3", "12:05"),
    ("user2", "Site4", "12:20"),
    ("user3", "Site1", "12:30"),
    ("user3", "Site2", "13:05"),
    ("user3", "Site3", "13:15"),
    ("user3", "Site4", "13:37"),
]

# Job descriptions label the corresponding jobs in the Spark UI.
spark.sparkContext.setJobDescription("Creating DataFrame from in-memory rows")
df = spark.createDataFrame(data, schema=columns)
df.createOrReplaceTempView("users")

spark.sparkContext.setJobDescription("Applying Transformation")
# lag() fetches the previous visit time per user. unix_timestamp() returns
# seconds, so the difference divided by 60 is minutes. The first visit has
# no previous row, so coalesce() turns its NULL duration into 0.
res = spark.sql("""
    select *,
        coalesce(
            (unix_timestamp(time, 'HH:mm')
             - unix_timestamp(lag(time) over (partition by user_id order by time), 'HH:mm')) / 60,
            0
        ) as duration_in_mins
    from users
""")

spark.sparkContext.setJobDescription("Showing Data")
res.show()
```

Output:

```
+-------+-------+-----+----------------+
|user_id|website| time|duration_in_mins|
+-------+-------+-----+----------------+
|  user1|  Site1|10:00|             0.0|
|  user1|  Site2|10:17|            17.0|
|  user1|  Site3|10:35|            18.0|
|  user1|  Site4|11:00|            25.0|
|  user2|  Site1|11:15|             0.0|
|  user2|  Site2|11:32|            17.0|
|  user2|  Site3|12:05|            33.0|
|  user2|  Site4|12:20|            15.0|
|  user3|  Site1|12:30|             0.0|
|  user3|  Site2|13:05|            35.0|
|  user3|  Site3|13:15|            10.0|
|  user3|  Site4|13:37|            22.0|
+-------+-------+-----+----------------+
```



You have a CSV file that contains some **malformed records**. Instead of stopping and failing the ingestion, handle them so that correct records are processed further while malformed ones are filtered out and stored in an error path.

In Spark, the CSV reader's `mode` option controls how malformed records are handled:
- `option("mode", "PERMISSIVE")` (the default) keeps the row and sets fields that cannot be parsed to `null`.
- `option("mode", "DROPMALFORMED")` drops malformed rows.
- `option("mode", "FAILFAST")` fails on the first malformed record.
- `option("badRecordsPath", "/path/to/error_dir")` writes bad records to the given path. This option is specific to Databricks; in open-source Spark, use `PERMISSIVE` mode with the `columnNameOfCorruptRecord` option (and a schema that includes that string column), then write the rows where that column is non-null to the error path yourself.

Example code:
```python
# badRecordsPath is a Databricks option; it is not available in open-source Spark.
df = spark.read.option("header", True) \
    .option("mode", "PERMISSIVE") \
    .option("badRecordsPath", "/path/to/error_dir") \
    .csv("/path/to/input.csv")
```
On Databricks, this processes valid records and stores malformed ones in the specified error path for further inspection.
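
Since the question asks for an explicit try-catch, here is a minimal plain-Python sketch of the same idea using `try`/`except` around per-row parsing. The sample data, the three-column layout, and the names `parse_row` and `split_records` are assumptions for illustration; a real pipeline would write `bad_rows` to the error path instead of keeping them in memory.

```python
import csv
import io
from typing import Dict, List, Tuple

# Hypothetical input: id must be an integer and every row must have 3 fields.
RAW_CSV = """id,name,amount
1,alice,10.5
two,bob,7.0
3,carol
4,dave,3.25
"""


def parse_row(row: List[str]) -> Dict[str, object]:
    """Validate and convert one CSV row; raise ValueError if it is malformed."""
    if len(row) != 3:
        raise ValueError(f"expected 3 fields, got {len(row)}")
    return {"id": int(row[0]), "name": row[1], "amount": float(row[2])}


def split_records(raw: str) -> Tuple[List[Dict[str, object]], List[Dict[str, str]]]:
    good: List[Dict[str, object]] = []
    bad: List[Dict[str, str]] = []
    reader = csv.reader(io.StringIO(raw))
    next(reader)  # skip the header line
    for line_no, row in enumerate(reader, start=2):
        try:
            good.append(parse_row(row))
        except ValueError as exc:
            # Keep the raw row and the reason so it can be inspected later.
            bad.append({"line": str(line_no), "raw": ",".join(row), "error": str(exc)})
    return good, bad


good_rows, bad_rows = split_records(RAW_CSV)
print(len(good_rows), [b["line"] for b in bad_rows])
```

Catching only `ValueError` keeps unexpected failures (for example, an unreadable file) visible instead of silently classifying them as bad records.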

#### Decorators in Python
- In Flask, to check the session or user role before a view function runs.
- For logging function calls.

You can define a simple logging decorator and use it to log function calls:

```python
import functools

def log_function_call(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print(f"Calling function: {func.__name__}")
        result = func(*args, **kwargs)
        print(f"Function {func.__name__} finished")
        return result
    return wrapper

@log_function_call
def process_data(df):
    return df.count()

# Usage
process_data(df)
```

This will print log messages before and after the `process_data` function is called.
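
The role-check use case can be sketched the same way. To stay runnable without Flask, the example below uses a plain dictionary as a stand-in for `flask.session`; `require_role`, `Forbidden`, and `delete_report` are hypothetical names, and a real Flask view would return an HTTP 403 response instead of raising a custom exception.

```python
import functools
from typing import Any, Callable, Dict

# Hypothetical stand-in for flask.session so the sketch runs without Flask.
session: Dict[str, Any] = {}


class Forbidden(Exception):
    """Raised when the current user lacks the required role."""


def require_role(role: str) -> Callable:
    """Decorator factory: only run the wrapped function for users with `role`."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            if session.get("role") != role:
                raise Forbidden(f"{func.__name__} requires role '{role}'")
            return func(*args, **kwargs)
        return wrapper
    return decorator


@require_role("admin")
def delete_report(report_id: int) -> str:
    return f"report {report_id} deleted"


session["role"] = "admin"
print(delete_report(7))
```

Because the decorator takes an argument, it needs one more level of nesting than the logging decorator: `require_role("admin")` returns the actual decorator.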

#### Dataset vs Dataframe

- **Dataset**: Strongly-typed, distributed collection of data. Available in Scala and Java (not in Python). Supports compile-time type safety and functional transformations.
- **DataFrame**: Distributed collection of data organized into named columns (like a table). Available in Python, Scala, Java, and R. Offers high-level APIs, is optimized by Catalyst, and is untyped in Python (no compile-time type safety). In Scala, `DataFrame` is an alias for `Dataset[Row]`.
- In PySpark, you work with DataFrames (no Dataset API). DataFrames are the primary abstraction for structured data processing in Python.



22. Write PySpark code:

A) Input: `1|aaa|111|2|bbb|222|3|ccc|333`

Output:

```
1 aaa 111
2 bbb 222
3 ccc 333
```

```python
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

spark = SparkSession.builder.master("local[*]").appName("Manna").getOrCreate()
input_text = "1|aaa|111|2|bbb|222|3|ccc|333"
df = spark.createDataFrame([(input_text,)], ["value"])

# posexplode() returns each token together with its position, so the row
# number is deterministic instead of depending on the order of tied rows.
tokens = df.select(
    "value",
    F.posexplode(F.split("value", r"\|")).alias("pos", "splitted_val"),
)
w = Window.orderBy("rnm")
df2 = (
    tokens
    .select("value", "splitted_val", (F.col("pos") + 1).alias("rnm"))
    # lead() pulls the next two tokens onto the same row.
    .withColumn("l1", F.lead("splitted_val").over(w))
    .withColumn("l2", F.lead("splitted_val", 2).over(w))
    # Keep only the first token of every group of three.
    .filter((F.col("rnm") - 1) % 3 == 0)
)
df2.show()
```

Output:

```
+--------------------+------------+---+---+---+
|               value|splitted_val|rnm| l1| l2|
+--------------------+------------+---+---+---+
|1|aaa|111|2|bbb|2...|           1|  1|aaa|111|
|1|aaa|111|2|bbb|2...|           2|  4|bbb|222|
|1|aaa|111|2|bbb|2...|           3|  7|ccc|333|
+--------------------+------------+---+---+---+
```

Spark also logs `WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.` because the windows have no `partitionBy`. That is harmless for a single input row but would not scale to a large dataset.


B) Input:

```
+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
| Banana|  1000|    USA|
|Carrots|  1500|    USA|
|  Beans|  1600|    USA|
| Orange|  2000|    USA|
| Orange|  2000|    USA|
| Banana|   400|  China|
|Carrots|  1200|  China|
|  Beans|  1500|  China|
| Orange|  4000|  China|
| Banana|  2000| Canada|
|Carrots|  2000| Canada|
|  Beans|  2000| Mexico|
+-------+------+-------+
```

Output:

```
+-------+------+-----+------+----+
|Product|Canada|China|Mexico| USA|
+-------+------+-----+------+----+
| Orange|  null| 4000|  null|4000|
|  Beans|  null| 1500|  2000|1600|
| Banana|  2000|  400|  null|1000|
|Carrots|  2000| 1200|  null|1500|
+-------+------+-----+------+----+
```

```sql
select product,
       sum(case when country = 'Canada' then amount else null end) as Canada,
       sum(case when country = 'China' then amount else null end) as China,
       sum(case when country = 'Mexico' then amount else null end) as Mexico,
       sum(case when country = 'USA' then amount else null end) as USA
from products
group by product
```