# 01. How do you handle duplicate rows in a PySpark DataFrame?

```python
# Sample data with one exact duplicate row
data = [
    (1, "Alice", 25),
    (2, "Alice", 30),
    (1, "Alice", 25),  # Duplicate row
    (3, "Charlie", 35)
]

columns = ["ID", "Name", "Age"]
df = spark.createDataFrame(data=data, schema=columns)
display(df)  # Fabric/Synapse notebook renderer; use df.show() outside notebooks
```

```python
# Remove rows that are identical across all columns
df_no_duplicates = df.dropDuplicates()
display(df_no_duplicates)
```

```python
# Remove duplicates based on the "Name" column only.
# Spark keeps one row per Name, but which row survives is not guaranteed.
df_no_duplicates = df.dropDuplicates(["Name"])

df_no_duplicates.show()
```

```text
+---+-------+---+
| ID|   Name|Age|
+---+-------+---+
|  1|  Alice| 25|
|  3|Charlie| 35|
+---+-------+---+
```



```python
# Identify and handle duplicates using window functions
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Partition by every column so identical rows land in the same window;
# row_number() then labels the copies 1, 2, ... (order among exact duplicates is arbitrary)
window_spec = Window.partitionBy("ID", "Name", "Age").orderBy("ID")

df_with_rownum = df.withColumn("row_number", row_number().over(window_spec))
df_with_rownum.show()
```

```text
+---+-------+---+----------+
| ID|   Name|Age|row_number|
+---+-------+---+----------+
|  1|  Alice| 25|         1|
|  1|  Alice| 25|         2|
|  2|  Alice| 30|         1|
|  3|Charlie| 35|         1|
+---+-------+---+----------+
```



```python
# Keep the first copy of each row and drop the helper column
df_deduplicated = df_with_rownum.filter("row_number = 1").drop("row_number")

df_deduplicated.show()
```

```text
+---+-------+---+
| ID|   Name|Age|
+---+-------+---+
|  1|  Alice| 25|
|  2|  Alice| 30|
|  3|Charlie| 35|
+---+-------+---+
```



```python
# Count how often each distinct row occurs; count > 1 means the row is duplicated
df_counts = df.groupBy("ID", "Name", "Age").count().orderBy("count", ascending=False)

df_counts.show()
```

```text
+---+-------+---+-----+
| ID|   Name|Age|count|
+---+-------+---+-----+
|  1|  Alice| 25|    2|
|  2|  Alice| 30|    1|
|  3|Charlie| 35|    1|
+---+-------+---+-----+
```



- ✅ Remove all duplicates? → dropDuplicates()
- ✅ Remove based on specific columns? → dropDuplicates(["col1", "col2"])
- ✅ Identify and analyze duplicates? → row_number() with Window
- ✅ Count duplicates? → groupBy().count()

# 02. What makes Apache Spark unique compared to other big data frameworks?

- Apache Spark stands out among big data frameworks due to its speed, ease of use, and versatility.
- Here’s what makes Spark unique compared to other frameworks like Hadoop MapReduce, Flink, and Dask:



**01. Speed – In-Memory Processing**

🔥 Why it’s unique?

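- Unlike Hadoop MapReduce, which writes intermediate results to disk between map and reduce steps, Spark keeps intermediate data in memory (RAM) where possible. This can make it up to 100x faster for some iterative workloads.
- It can cache Resilient Distributed Datasets (RDDs) and DataFrames in memory to avoid repeated disk I/O.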

**02. Unified Data Processing Engine**

🔥 Why it’s unique?

- Spark is a multi-purpose engine supporting batch, streaming, machine learning, and SQL-based analytics in a single framework.
- Other frameworks like Hadoop focus mainly on batch processing, and Flink specializes in real-time streaming.

**✅ Components of Spark:**

| Component | Purpose |
| --- | --- |
| Spark Core | Handles distributed computing with RDDs |
| Spark SQL | Query data using SQL & DataFrames |
| Structured Streaming (and the older Spark Streaming DStream API) | Real-time data processing (e.g., Kafka sources) |
| MLlib | Machine learning algorithms |
| GraphX | Graph analytics (social networks, recommendation systems) |


**03. Supports Multiple Languages**

🔥 Why it’s unique?

- Spark supports **Python (PySpark), Scala, Java, and R** for big data development.
- Hadoop MapReduce mainly uses Java, which can be complex for data analysts and scientists.

**04. Built-In Optimizations with Catalyst & Tungsten**

🔥 Why it’s unique?

- Spark uses the Catalyst Optimizer (for query optimization) and the Tungsten Engine (for better memory & CPU management).
- Classic Hadoop MapReduce has no comparable built-in query optimizer. Jobs run exactly as the map and reduce functions are written.

**✅ Example:**
- When you run **df.groupBy("column").count()**, Spark plans a partial (map-side) aggregation before the shuffle, so less data crosses the network.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CatalystExample").getOrCreate()

# Creating DataFrame
data = [(1, "Alice", 30), (2, "Bob", 25), (3, "Charlie", 35)]
df = spark.createDataFrame(data, ["ID", "Name", "Age"])

# Running an optimized query
df_filtered = df.filter("Age > 30").select("Name")

df_filtered.explain(True)  # Print the parsed, analyzed, optimized logical and physical plans
```

```text
== Parsed Logical Plan ==
'Project ['Name]
+- Filter (Age#696L > cast(30 as bigint))
   +- LogicalRDD [ID#694L, Name#695, Age#696L], false

== Analyzed Logical Plan ==
Name: string
Project [Name#695]
+- Filter (Age#696L > cast(30 as bigint))
   +- LogicalRDD [ID#694L, Name#695, Age#696L], false

== Optimized Logical Plan ==
Project [Name#695]
+- Filter (isnotnull(Age#696L) AND (Age#696L > 30))
   +- LogicalRDD [ID#694L, Name#695, Age#696L], false

== Physical Plan ==
*(1) Project [Name#695]
+- *(1) Filter (isnotnull(Age#696L) AND (Age#696L > 30))
   +- *(1) Scan ExistingRDD[ID#694L,Name#695,Age#696L]
```



```python
# "cost" mode: optimized logical plan with size statistics, then the physical plan.
# sizeInBytes=8.0 EiB is Spark's default placeholder when an in-memory source has no statistics.
# In the physical plan, the *(1) prefix marks operators fused by whole-stage code generation
# (Tungsten); df_filtered.explain("codegen") prints the generated code itself.
df_filtered.explain("cost")
```

```text
== Optimized Logical Plan ==
Project [Name#695], Statistics(sizeInBytes=5.1 EiB)
+- Filter (isnotnull(Age#696L) AND (Age#696L > 30)), Statistics(sizeInBytes=8.0 EiB)
   +- LogicalRDD [ID#694L, Name#695, Age#696L], false, Statistics(sizeInBytes=8.0 EiB)

== Physical Plan ==
*(1) Project [Name#695]
+- *(1) Filter (isnotnull(Age#696L) AND (Age#696L > 30))
   +- *(1) Scan ExistingRDD[ID#694L,Name#695,Age#696L]
```




**05. Real-Time & Batch Processing Together**

🔥 Why it’s unique?

- Supports both batch processing (like Hadoop) and real-time streaming (like Flink).
- Spark Structured Streaming allows processing continuous data streams with micro-batch techniques.


```python
# Requires the spark-sql-kafka connector and a reachable broker; "<broker>:9092" is a placeholder
# df = (spark.readStream.format("kafka")
#       .option("kafka.bootstrap.servers", "<broker>:9092")
#       .option("subscribe", "topic1")
#       .load())
# df.writeStream.outputMode("append").format("console").start()
```

**06. Works with Any Storage System**

🔥 Why it’s unique?

- Spark can read/write data from HDFS, Amazon S3, Azure Blob, Google Cloud Storage, and even databases.
- Hadoop primarily relies on HDFS.

```python
# Read a CSV from Azure Blob Storage (container and account are placeholders;
# storage credentials must be configured for the session)
#df = spark.read.csv("wasbs://container@account.blob.core.windows.net/data.csv")
#df.show()
```

**07. Scalability & Fault Tolerance**

🔥 Why it’s unique?

- Spark runs on-premises or in the cloud (AWS EMR, Azure Databricks, Google Cloud Dataproc).
- Fault tolerance is built-in using RDD lineage (automatically recomputes lost data).
- ✅ Example: If a node in the cluster fails, Spark recomputes lost partitions without restarting the job.

**Summary: Why Choose Spark?**

| Feature | Apache Spark | Hadoop MapReduce | Flink |
| --- | --- | --- | --- |
| Processing Type | Batch + Streaming (micro-batch) | Batch Only | Streaming First (also batch) |
| Speed | Fast (In-memory) | Slow (Disk-based) | Low latency (record-at-a-time streaming) |
| Ease of Use | Python, Scala, SQL | Java-heavy | Java, Scala, SQL, Python |
| Optimizations | Catalyst, Tungsten | No major optimizer | Stream optimizations |
| Machine Learning | Yes (MLlib) | No built-in library | Separate Flink ML library |
| Fault Tolerance | Yes (RDD lineage) | Yes (task re-execution + HDFS replication) | Yes (checkpoints) |
| Best For | General Big Data | Large Batch Jobs | Real-time Analytics |

**Conclusion:**
- If you need fast, multi-purpose data processing, choose Spark.
- If you have huge batch jobs and don’t need real-time, Hadoop is fine.
- If you want ultra-low latency streaming, Apache Flink may be better.

# 03. Catalyst Optimizer & Tungsten Engine in Apache Spark



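- Catalyst is Spark’s extensible query optimizer, used by Spark SQL and DataFrames. It is mainly rule-based and adds optional cost-based optimization (spark.sql.cbo.enabled) when table statistics exist. It automatically optimizes query execution plans.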
- Catalyst follows four phases to optimize queries:

1. Analysis Phase

- Resolves table and column names and data types against the catalog. The query text has already been parsed into an unresolved logical plan.
- Uses metadata from Hive Metastore, JDBC, or Data Catalog.

2. Logical Optimization Phase

- Applies rule-based rewrites such as predicate pushdown, constant folding, and column pruning.
- Pushes filters below joins, so less data is joined.

3. Physical Planning Phase

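- Generates physical plans and chooses one using statistics such as estimated table size. For example, it picks a broadcast hash join when one side is below spark.sql.autoBroadcastJoinThreshold.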

4. Code Generation Phase (Tungsten Integration)

- Uses whole-stage code generation (WSCG) to generate Java code for chains of operators at runtime. That code is compiled to JVM bytecode.

**Tungsten**

- Tungsten improves Spark’s performance by optimizing memory usage, CPU efficiency, and serialization.
- Key Features of Tungsten

**01. Binary Processing**

- Stores rows in a compact binary format (UnsafeRow) instead of JVM objects.
- Can use off-heap memory (when spark.memory.offHeap.enabled=true) to reduce Java garbage collection (GC) overhead.

**02. Whole-Stage Code Generation (WSCG)**

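- Fuses a chain of Spark operations into a single generated Java function, compiled to bytecode at runtime.
- Removes the virtual function calls of the iterator-based (Volcano) model and keeps intermediate values in CPU registers. This can make some queries several times faster.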

**03. Cache-Aware Computation**

- Uses cache-friendly data layouts and algorithms, e.g., sorting compact key prefixes stored next to record pointers.
- Makes efficient use of the CPU cache, reducing main-memory access time.

**04. Efficient Memory Management**

- Uses row-based & columnar storage for optimized execution.
- Avoids unnecessary object creation, reducing GC pressure.


# 04. Explain different profilers in PySpark and their use cases.

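| Profiler | What It Profiles | Use Case |
| --- | --- | --- |
| BasicProfiler (default, built on cProfile) | Python functions running on executors; enabled with spark.python.profile=true (Python UDFs are covered from Spark 3.3) | Finding slow Python code in RDD functions and UDFs |
| Custom Profiler (subclass of pyspark.profiler.Profiler or BasicProfiler, passed as profiler_cls to SparkContext) | Whatever its profile() and stats() methods implement | Custom metrics or output formats |
| Python's cProfile on the driver | Driver-side Python code only | Detailed function-call analysis of driver logic |

**Key Takeaways**
- Use BasicProfiler (spark.python.profile=true, then sc.show_profiles()) to find slow Python code running on executors.
- Use Python's cProfile for detailed function-level analysis of driver-side code.
- Implement a Custom Profiler when you need metrics or output formats beyond the built-in ones.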

# 05. Write a PySpark program to check if a keyword exists in a large text file.

```python
# download file and upload into fabric
# https://raw.githubusercontent.com/rritec/Azure-Cloud-Data-Engineering/refs/heads/main/Lab%20Data/sample.txt
```

```python
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark Session (Fabric manages it automatically)
#spark = SparkSession.builder.appName("KeywordSearchFabric").getOrCreate()

# Define file path in OneLake or ADLS
file_path = "abfss://rrws1@onelake.dfs.fabric.microsoft.com/rrLakeHouse.Lakehouse/Files/data/sample.txt"  # Update path

# Define the keyword to search
keyword = "hello"

# Load the text file as a DataFrame (one row per line, in the "value" column)
df = spark.read.text(file_path)

# Case-insensitive search with rlike; re.escape makes the keyword match literally
matching_rows = df.filter(col("value").rlike(f"(?i){re.escape(keyword)}"))

# take(1) scans partitions incrementally, so Spark can stop once a match is found
if len(matching_rows.take(1)) > 0:
    print(f"✅ Keyword '{keyword}' found in the file.")
else:
    print(f"❌ Keyword '{keyword}' NOT found in the file.")

# Stop Spark Session (Fabric handles cleanup automatically)
#spark.stop()
```

```text
✅ Keyword 'hello' found in the file.
```


```python
# For case-insensitive whole-word match:
# df.filter(col("value").rlike(f"(?i)\\b{keyword}\\b"))

# For multiple keyword search:
# df.filter(col("value").rlike(f"(?i)\\b(spark|big data)\\b"))
```

# 06. What is Executor Memory in PySpark, and how is it allocated?

```python
# Scan all configuration entries and print the executor memory setting
for key, value in spark.sparkContext.getConf().getAll():
    if key == "spark.executor.memory":
        print(f"{key} = {value}")
```

```text
spark.executor.memory = 56g
```


```python
# Define the keys we want to check
keys_to_check = {"spark.executor.memory", "spark.executor.memoryOverhead"}

# Get all Spark configurations and filter the required keys
config_values = {key: value for key, value in spark.sparkContext.getConf().getAll() if key in keys_to_check}

# Print the values
for key, value in config_values.items():
    print(f"{key} = {value}")
```

```text
spark.executor.memory = 56g
spark.executor.memoryOverhead = 384
```


- Executor Memory in PySpark refers to the amount of memory allocated to each Spark executor in a cluster. Executors are responsible for executing tasks and storing data in memory.

**How Executor Memory is Allocated**

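- Executor memory is divided into three main categories. Execution and storage memory share one unified region of the JVM heap (sized by spark.executor.memory). Overhead memory is allocated outside the heap.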

**1. Execution Memory**

- Used for runtime computations, such as:
1. Shuffling
2. Joins
3. Sorting
4. Aggregation

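- Unused execution memory can be borrowed by storage (caching). Execution can reclaim it later by evicting those cached blocks.

**2. Storage Memory**

- Used for caching datasets (RDD cache, DataFrame cache) and for broadcast variables.
- Unused storage memory can be borrowed by execution. Storage cannot force execution to release it. Execution evicts cached blocks only until storage falls to its protected share (spark.memory.storageFraction, default 0.5 of the unified region).

**3. Overhead Memory**

- Memory allocated outside the JVM heap (spark.executor.memoryOverhead) for native and non-JVM needs, including:

1. JVM overheads such as thread stacks and interned strings
2. Native libraries and off-heap buffers
3. Python worker processes in PySpark (unless spark.executor.pyspark.memory is set separately)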
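The split can be estimated with a little arithmetic. This worked sketch assumes Spark's unified memory manager defaults: 300 MiB reserved, spark.memory.fraction = 0.6, and spark.memory.storageFraction = 0.5. It also assumes the common default overhead of max(384 MiB, 10% of executor memory). Real numbers differ slightly because Spark reads the heap size the JVM actually reports. The 384 shown in the cell above appears in getAll() because it was set explicitly, so the default formula does not apply there.

```python
RESERVED_MIB = 300  # fixed reserved memory in the unified memory manager

def executor_memory_breakdown(executor_memory_mib,
                              memory_fraction=0.6,    # spark.memory.fraction default
                              storage_fraction=0.5,   # spark.memory.storageFraction default
                              overhead_factor=0.10,   # typical default overhead factor
                              overhead_min_mib=384):  # minimum overhead
    """Approximate split of one executor's memory, in MiB."""
    usable = executor_memory_mib - RESERVED_MIB
    unified = usable * memory_fraction              # shared by execution and storage
    protected_storage = unified * storage_fraction  # cached blocks here are not evicted by execution
    return {
        "unified": unified,
        "protected_storage": protected_storage,
        "user": usable - unified,                   # user data structures and metadata
        "overhead": max(int(executor_memory_mib * overhead_factor), overhead_min_mib),
    }

# 56g, the spark.executor.memory value reported above
breakdown = executor_memory_breakdown(56 * 1024)
for name, mib in breakdown.items():
    print(f"{name:>18}: {mib:,.0f} MiB")
```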




# 07. How do you minimize data transfers in PySpark applications?

| Technique | Why It Works |
| --- | --- |
| Broadcast small datasets | Avoids shuffling the large side of a join. |
| Reduce shuffle operations | Prevents unnecessary data movement between executors. |
| Cache/Persist reused data | Avoids recomputation and reloading. |
| Use columnar formats (Parquet) | Reads only the needed columns, reducing I/O and data transfer. |
| Push filters early (Predicate Pushdown) | Reduces the data read before processing. |
| Use mapPartitions() instead of map() for per-record setup costs | Runs setup work (e.g., opening a connection) once per partition instead of once per record. |
| Optimize joins | Align partitions on the join key and broadcast small datasets. |
| Avoid .collect() and .toPandas() on large data | Keeps large results off the driver and prevents driver memory overflow. |

# 08. What is Piping in PySpark, and how does it work?

```python
rdd = spark.sparkContext.parallelize(["Hello World", "PySpark is awesome", "Piping in Spark"])
rdd.collect()
```

```text
['Hello World', 'PySpark is awesome', 'Piping in Spark']
```

```python
# Convert text to lowercase using the `tr` shell command
processed_rdd = rdd.pipe("tr 'A-Z' 'a-z'")

print(processed_rdd.collect())
```

```text
['hello world', 'pyspark is awesome', 'piping in spark']
```


**Key Considerations When Using pipe()**

1. Each partition runs separately:

- Spark pipes data per partition, meaning the external process receives only part of the dataset at a time.

2. Processes run in parallel:

- If Spark has multiple partitions, multiple instances of the external script will run in parallel.

3. Performance Overhead:

- Using pipe() adds overhead because each element is written as a line of text to the process's stdin, and each output line is read back from stdout.

4. Data Format Compatibility:

- Ensure your external script reads stdin and writes stdout correctly to avoid errors.

# 09. Does Apache Spark support checkpointing? How and why is it used?

- Yes, Apache Spark supports checkpointing, which is a mechanism to persist RDDs or streaming state to a reliable storage location (HDFS, Azure Data Lake, or Microsoft Fabric Lakehouse).

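- Checkpointing writes the data to that storage and truncates its lineage. After a failure, or when the lineage grows very long, Spark reads the checkpoint instead of recomputing everything from the original source. It is mainly used for fault tolerance in long-running and iterative jobs, and stateful Structured Streaming queries rely on it to recover their state.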


```python
from pyspark.sql import SparkSession

# Initialize Spark Session (Fabric provides `spark` automatically)
#spark = SparkSession.builder.appName("CheckpointExample").getOrCreate()

# Set checkpoint directory (should be HDFS, ADLS, or OneLake in Fabric)
spark.sparkContext.setCheckpointDir("abfss://rrws1@onelake.dfs.fabric.microsoft.com/rrLakeHouse.Lakehouse/Files")

# Create an RDD
rdd = spark.sparkContext.parallelize(range(1, 1000000))

# Apply transformations
rdd_transformed = rdd.map(lambda x: x * 2)

# Mark for checkpointing before any action. Caching first avoids computing the
# RDD twice (once for count(), once more when the checkpoint is written).
rdd_transformed.cache()
rdd_transformed.checkpoint()

# Trigger computation; the checkpoint is written after this job
print(rdd_transformed.count())
```

```text
999999
```


# 10. What is the Spark Driver, and what role does it play in a PySpark job?

- The Spark Driver is the "brain" of a Spark job, managing execution and scheduling tasks.
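- Where it runs depends on the deployment mode. In client mode the driver runs on the machine that submitted the job. In cluster mode it runs inside the cluster. In local mode the driver and executors share a single JVM.
- In PySpark, the Python driver process sends transformations and actions to the JVM driver (via Py4J), which plans the job and schedules tasks on executors.
- Pulling large results to the driver (collect(), toPandas()) can exhaust driver memory. spark.driver.maxResultSize caps the total size of results returned by one action.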

```python
# View current Spark configurations (including driver settings)
for key, value in spark.sparkContext.getConf().getAll():
    if "driver" in key:
        print(f"{key}: {value}")
```

```text
spark.driver.cores: 8
spark.tracking.driverLogUrl: https://sparkui.fabric.microsoft.com/sparkui/ce42e2da-274a-411e-b5cc-3ee363a1f9f5/api/v1/wt/datacloud/workspaces/98012181-c620-4a02-9dbc-41f7bae43668/activities/a99c59a5-4862-4dae-b4a8-67d84a7c367f/applications/application_1738960707004_0001/driverlog/stderr?capacityId=1db03493-a167-438f-826c-3d5e684c58e2&pbiApi=api.fabric.microsoft.com&artifactId=44e61aca-83ff-4b17-a6a7-1f70a4f6556e
spark.driver.extraClassPath: /usr/lib/library-manager/bin/libraries/scala/*:/usr/lib/dw-connector/fabric/*
spark.driver.host: vm-60b73904
spark.driver.memory: 56g
spark.driver.extraLibraryPath: /usr/hdp/current/hadoop-client/lib/native:/opt/gluten/dep
spark.driver.maxResultSize: 4096m
spark.driver.memoryOverhead: 384
spark.driver.port: 36001
spark.driver.extraJavaOptions: -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/
```

# 11. What is SparkSession, and why is it essential?

- SparkSession is the entry point for working with Apache Spark in PySpark. It provides a unified API for DataFrames and SQL, and exposes the underlying SparkContext (spark.sparkContext) for RDDs.

- In Spark 2.0+, SparkSession replaces SQLContext and HiveContext as the entry point and wraps the SparkContext, combining them into a single object.
- It manages data processing, SQL queries, and Spark jobs.
- Microsoft Fabric provides a default SparkSession named spark.
- Use .config() when building the session, or spark.conf.set() for runtime SQL settings, to tune performance (e.g., spark.sql.shuffle.partitions).

```python
type(spark)
```

```text
pyspark.sql.session.SparkSession
```

```python
print(spark)
```

```text
<pyspark.sql.session.SparkSession object at 0x7741904e2bd0>
```

```python
print(spark.conf.get("spark.app.name"))
```

```text
SynapseEnvPy
```


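```python
from pyspark.sql import SparkSession

# In Fabric a session already exists, so getOrCreate() returns it. Runtime SQL settings
# such as spark.sql.shuffle.partitions take effect; the new app name is only recorded in
# the session config, and the SparkContext keeps its name (see spark.sparkContext.appName).
spark = SparkSession.builder \
    .appName("MyCustomFabricJob") \
    .config("spark.sql.shuffle.partitions", "50") \
    .getOrCreate()

print(spark.conf.get("spark.app.name"))  # Output: MyCustomFabricJob
```

```text
MyCustomFabricJob
```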


# 12. What are Broadcast Variables, and how do they optimize PySpark jobs?

✔ Broadcast variables (created with sc.broadcast()) ship a small, read-only value to each executor once, instead of sending it with every task.

✔ They reduce data transfer and network communication when many tasks read the same lookup data.

✔ Best for small lookup dictionaries or reference data used across multiple operations. For joining a small table with a large DataFrame, the related broadcast join (the broadcast() hint) applies the same idea.

✔ Use sparingly: every executor keeps a full copy, so broadcasting large datasets can exhaust memory.

# 13. How do you handle data skewness in PySpark?

| Technique | Use Case | Benefit |
| --- | --- | --- |
| Salting keys | Skewed joins or aggregations with a large key imbalance | Spreads a hot key across several partitions |
| Repartitioning | Before a skewed join or aggregation, by a higher-cardinality column or into more partitions | Reduces oversized partitions before the shuffle |
| Broadcasting | Small lookup table in joins | Avoids shuffling the large table and reduces data transfer |
| AQE skew join optimization (spark.sql.adaptive.skewJoin.enabled) | Joins with skewed keys | Splits oversized partitions into smaller tasks at runtime |
| Coalesce partitions | After large shuffles | Reduces task overhead by reducing the partition count |
| mapPartitions() | Custom transformations | Allows fine-grained control over partition processing |

# 14. What’s the difference between coalesce() and repartition()?

| Feature | repartition() | coalesce() |
| --- | --- | --- |
| Purpose | Increase or decrease partitions | Decrease partitions (on a DataFrame, asking for more leaves the count unchanged) |
| Shuffle | Full shuffle across partitions | No full shuffle, just merging existing partitions |
| When to Use | When increasing partitions or when you need an even distribution | When reducing partitions (after filtering or transformation) |
| Performance | Slower (because of full shuffle) | Faster (no full shuffle), but a very small count such as coalesce(1) also reduces the parallelism of the preceding stage |
| Use Case Example | Before large joins, aggregations | Before writing data to disk (to reduce the number of output files) |

# 15. Explain Lazy Evaluation in PySpark with an example.

- Transformations: Operations like filter(), map(), select(), groupBy(), join() are transformations in Spark. These transformations are not executed immediately; instead, they are recorded as a logical plan.

- Actions: Operations like collect(), count(), show(), first(), and writes such as df.write.save() are actions. These trigger the actual execution of the transformations and the computation.

```python
from pyspark.sql import SparkSession

# Create a Spark session
# spark = SparkSession.builder.master("local").appName("Lazy Evaluation Example").getOrCreate()

# Create a sample DataFrame
data = [
    ("Alice", 34),
    ("Bob", 45),
    ("Charlie", 23),
    ("David", 40),
    ("Eve", 29)
]

columns = ["name", "age"]

df = spark.createDataFrame(data, columns)

# Perform a series of transformations
df_filtered = df.filter(df.age > 30)  # Filter out rows where age <= 30
df_filtered = df_filtered.withColumn("age2", df_filtered.age + 2)  # Add a column with age + 2
df_selected = df_filtered.select("name", "age2")  # Select the 'name' and 'age2' columns

# At this point, Spark hasn't executed the operations yet.

# Perform an action to trigger the computation
df_selected.show()  # This will trigger the execution of all previous transformations
```

```text
+-----+----+
| name|age2|
+-----+----+
|Alice|  36|
|  Bob|  47|
|David|  42|
+-----+----+
```



```python
df_selected.explain()
```

```text
== Physical Plan ==
*(1) Project [name#694, (age#695L + 2) AS age2#698L]
+- *(1) Filter (isnotnull(age#695L) AND (age#695L > 30))
   +- *(1) Scan ExistingRDD[name#694,age#695L]
```




```python
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5, 6, 7, 8])

# Lazy evaluation example
result = rdd.filter(lambda x: x % 2 == 0).map(lambda x: x * 2)  # Filter even numbers and then double them

# No computation yet, since this is lazy evaluation

# Trigger the computation
result.collect()  # Spark now applies filter and map together in one pass over each partition (one stage)
```

```text
[4, 8, 12, 16]
```

# 16. Key Differences Between Narrow and Wide Transformations


| Aspect | Narrow Transformations | Wide Transformations |
| --- | --- | --- |
| Shuffling | No shuffling of data. Data stays within the same partition. | Data is shuffled across partitions or nodes. |
| Performance | Faster, as there is no need for data movement. | Slower, due to the cost of shuffling and network I/O. |
| Examples | map(), filter(), select(), flatMap(), sample() | groupByKey(), reduceByKey(), join(), distinct(), repartition() |
| Data Movement | Data is processed locally on each partition. | Data is moved between partitions or nodes in the cluster. |
| Use Case | When operations can be performed independently on each partition. | When operations require combining data from multiple partitions. |

**Narrow Transformation**

```python
from pyspark.sql import SparkSession

# Create a Spark session
#spark = SparkSession.builder.master("local").appName("Narrow Transformation").getOrCreate()

# Create a sample DataFrame
data = [("Alice", 34), ("Bob", 45), ("Charlie", 23)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)

# Apply a narrow transformation - 'map' on DataFrame (using 'rdd')
rdd = df.rdd.map(lambda x: (x[0], x[1] * 2))  # Multiply age by 2

# Show the result
print(rdd.collect())
```

```text
[('Alice', 68), ('Bob', 90), ('Charlie', 46)]
```


**Wide Transformation Example (groupByKey):**

```python
# Create a sample RDD with key-value pairs
rdd = spark.sparkContext.parallelize([("A", 1), ("B", 2), ("A", 3), ("B", 4), ("A", 5)])

# Apply a wide transformation - 'groupByKey' shuffles all values for a key into one partition
grouped_rdd = rdd.groupByKey()

# Show the result (values are ResultIterable objects; grouped_rdd.mapValues(list) shows their contents)
print(grouped_rdd.collect())
```

```text
[('B', <pyspark.resultiterable.ResultIterable object at 0x767475adc410>), ('A', <pyspark.resultiterable.ResultIterable object at 0x767475ade990>)]
```


# 17. How do you efficiently process huge datasets in PySpark?

To efficiently process huge datasets in PySpark, focus on: 

✅ Using optimized file formats (Parquet, ORC)

✅ Partitioning and bucketing wisely

✅ Avoiding collect() on large data and using broadcast joins for small tables

✅ Minimizing shuffle operations

```python
# Size the number of shuffle partitions to the data volume (a runtime SQL setting)
spark.conf.set("spark.sql.shuffle.partitions", "200")
```

✅ Configuring Spark memory and AQE (Adaptive Query Execution)

```python
# Executor and driver memory must be set when the application launches, e.g.
#   spark-submit --executor-memory 8g --driver-memory 4g app.py
# In Spark 3.x, spark.conf.set() on these keys at runtime raises an error.

# Enables Spark to re-optimize queries at runtime (on by default since Spark 3.2)
spark.conf.set("spark.sql.adaptive.enabled", "true")
```

✅ Using Pandas UDFs instead of row-at-a-time Python UDFs for better performance

# 18. How does Garbage Collection work in PySpark, and how can you optimize it?

| Level | Optimization |
| --- | --- |
| JVM GC | Use G1GC, tune memory allocation, reduce shuffling, enable GC logging |
| PySpark GC | Use gc.collect(), optimize Pandas UDFs, free up memory, avoid large Python objects |

# 19. When Should You Use Accumulators?



**Accumulators are best used for:**

- Counting Events in a Distributed Job (e.g., tracking bad records)
- Collecting Debugging Information (e.g., counting null values in columns)
- Performance Metrics (e.g., counting total processed records)

**When NOT to Use Accumulators:**

- Not for Passing Values Between Tasks: tasks can only add to an accumulator; only the driver can read its value.
- Not for Exact Counts Inside Transformations: updates made in transformations such as map() may be applied more than once if a task or stage is re-executed. Only updates made inside actions (e.g., foreach()) are guaranteed to be applied once.
- Not for Complex Data Structures: Use accumulators mainly for numeric values or simple data aggregations.

# 20. What is Dynamic Resource Allocation, and how does it optimize Spark performance?

**1. What is Dynamic Resource Allocation?**

- Dynamic Resource Allocation (DRA) in Spark allows an application to automatically adjust the number of executors based on workload demand. This feature helps in efficient resource utilization by scaling resources up when more tasks are running and down when executors are idle.

**2. How Does Dynamic Resource Allocation Work?**

- Spark starts with a minimal number of executors.
- If the job needs more resources, Spark requests additional executors from the cluster manager (YARN, Kubernetes, Mesos, or Standalone).
- If executors remain idle for a set duration, Spark automatically removes them to free resources.
- This process reduces resource waste and improves overall cluster efficiency.

**3. Why Use Dynamic Resource Allocation?**

| Benefit | Explanation |
| --- | --- |
| Optimized Resource Utilization | Executors are allocated only when needed, reducing cluster waste. |
| Cost Savings | Ideal for cloud environments (AWS EMR, Databricks) where billing is based on resource usage. |
| Improved Performance | Ensures large workloads get additional executors dynamically, preventing bottlenecks. |
| Better Cluster Sharing | Helps multiple Spark jobs coexist in a shared cluster by releasing unused resources. |



**4. How to Enable Dynamic Resource Allocation?**
- Enable it by setting the following configuration parameters in spark-submit, spark-defaults.conf, or within a Spark application.


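```bash
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.minExecutors=2
--conf spark.dynamicAllocation.maxExecutors=10
--conf spark.dynamicAllocation.initialExecutors=2
--conf spark.dynamicAllocation.executorIdleTimeout=60s
--conf spark.dynamicAllocation.shuffleTracking.enabled=true
```

- Dynamic allocation also needs a way to keep shuffle files when executors are removed. Use either shuffle tracking (the last flag above) or an external shuffle service (spark.shuffle.service.enabled=true, common on YARN).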


**5. When Should You Use Dynamic Resource Allocation?**

✅ Best For:

- ETL Pipelines & Batch Jobs → Jobs with variable workloads.
- Shared Clusters → Efficient use of resources across multiple teams.
- Cloud Environments → Saves costs by automatically scaling resources.

❌ Not Ideal For:

- Streaming Jobs (Structured Streaming/Kafka) → Executors should remain stable.
- Small Jobs → Overhead of acquiring/releasing resources may slow execution.
- Highly Predictable Workloads → Manually setting --num-executors can be more efficient.