# **PYSPARK INTERVIEW QUESTIONS - PRASAD**

In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *
from pyspark.sql import Window

**Q1 While ingesting customer data from an external source, you notice duplicate entries. How would you remove duplicates and retain only the latest entry based on a timestamp column?**

In [0]:
data = [("101", "2023-12-01", 100), ("101", "2023-12-02", 150), 
        ("102", "2023-12-01", 200), ("102", "2023-12-02", 250)]
columns = ["product_id", "date", "sales"]

df = spark.createDataFrame(data, columns)
df.display()

product_id,date,sales
101,2023-12-01,100
101,2023-12-02,150
102,2023-12-01,200
102,2023-12-02,250


**Solution**

In [0]:
# casting date column from string to date format
df = df.withColumn("date", to_date(col('date')))
df.display()

product_id,date,sales
101,2023-12-01,100
101,2023-12-02,150
102,2023-12-01,200
102,2023-12-02,250


In [0]:
# Drop duplicates
df.orderBy('product_id', 'date', ascending=[1,0]).dropDuplicates(subset=['product_id']).display()

product_id,date,sales
101,2023-12-02,150
102,2023-12-02,250


**2. While processing data from multiple files with inconsistent schemas, you need to merge them into a single DataFrame. How would you handle this inconsistency in PySpark?**

**Solution**

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('PySparkInterviewQuestions').getOrCreate()

df = spark.read.format('parquet')\
        .option('mergeSchema', True)\
        .load('File/Data/datafiles')

**3. 🔍 Key Difference Between Spark and Hadoop MapReduce (Performance & Scalability)**

#### ✅ Performance

- **Apache Spark** is significantly faster than **Hadoop MapReduce** due to **in-memory processing**.
  - **Spark** keeps intermediate data in **memory (RAM)**, avoiding slow disk I/O.
  - **Hadoop MapReduce** writes intermediate data to **disk** after each map and reduce phase, increasing latency.
- Spark also uses **advanced optimizations**:
  - **Catalyst Optimizer** – Optimizes logical and physical query plans.
  - **Tungsten Execution Engine** – Efficient memory management and byte code generation.

#### ✅ Scalability

- Both **Spark** and **Hadoop MapReduce** are **horizontally scalable** across clusters.
- However, **Spark scales more efficiently** due to:
  - Faster execution
  - Lower resource usage for similar workloads
- **Hadoop MapReduce** also scales well but often requires **more hardware** and **longer processing times** due to its disk-based nature.

---

#### 🚀 Why Spark Is Faster: Comparison Table

| Feature                  | Spark                             | Hadoop MapReduce                 |
|--------------------------|-----------------------------------|----------------------------------|
| **Data Processing**      | In-memory                         | Disk-based                       |
| **Execution Model**      | Directed Acyclic Graph (DAG)      | Map → Shuffle → Reduce phases    |
| **Optimization**         | Catalyst + Tungsten               | No built-in query optimizer      |
| **Speed**                | Up to 100× faster (in-memory ops) | Slower due to disk I/O           |
| **Processing Type**      | Batch, Streaming, ML, Graph       | Batch only                       |

---

#### 📝 Summary

> The **key performance difference** is that **Spark performs in-memory computation**, drastically reducing I/O overhead, while **Hadoop MapReduce relies on disk** between operations. This makes Spark **much faster and more efficient**, especially for **iterative and real-time tasks**.  
>  
> In terms of **scalability**, both scale well across clusters, but **Spark uses resources more efficiently**, offering better performance with less overhead.

**4. You are working with a real-time data pipeline, and you notice missing values in your streaming data Column - Category. How would you handle null or missing values in such a scenario?**

**df_stream = spark.readStream.schema("id INT, value STRING").csv("path/to/stream")**

In [0]:
df_stream = spark.readStream.schema("id INT, value STRING").csv("path/to/stream")
df  = df_stream.fillna({'Category': 'N/A'})

**5. You need to calculate the total number of actions performed by users in a system. How would you calculate the top 5 most active users based on this information?**

In [0]:
data = [("user1", 5), ("user2", 8), ("user3", 2), ("user4", 10), ("user2", 3)]
columns = ["user_id", "actions"]

df = spark.createDataFrame(data, columns)
df.display()

user_id,actions
user1,5
user2,8
user3,2
user4,10
user2,3


In [0]:
# Groups actions by user_id, sums total actions per user, sorts by total actions descending, and selects top 5 users
df = df.groupBy('user_id')\
        .agg(sum('actions').alias('total_actions'))\
        .orderBy('total_actions', ascending=False)\
        .limit(5)
df.display()

user_id,total_actions
user2,11
user4,10
user1,5
user3,2


**6. While processing sales transaction data, you need to identify the most recent transaction for each customer. How would you approach this task?**

In [0]:
data = [("cust1", "2023-12-01", 100), ("cust2", "2023-12-02", 150),
        ("cust1", "2023-12-03", 200), ("cust2", "2023-12-04", 250)]
columns = ["customer_id", "transaction_date", "sales"]
df = spark.createDataFrame(data, columns)
df.display()

customer_id,transaction_date,sales
cust1,2023-12-01,100
cust2,2023-12-02,150
cust1,2023-12-03,200
cust2,2023-12-04,250


In [0]:
from pyspark.sql.window import Window

In [0]:
# convert transaction_date from string to date type
df = df.withColumn('transaction_date', col('transaction_date').cast(DateType()))
# rank the transactions by date for each customer
df = (
    df.withColumn(
        'flag',
        dense_rank().over(
            Window.partitionBy('customer_id').orderBy(desc('transaction_date'))
        )
    )
    .filter(col('flag') == 1)
)
df.show()

+-----------+----------------+-----+----+
|customer_id|transaction_date|sales|flag|
+-----------+----------------+-----+----+
|      cust1|      2023-12-03|  200|   1|
|      cust2|      2023-12-04|  250|   1|
+-----------+----------------+-----+----+



**7. You need to identify customers who haven’t made any purchases in the last 30 days. How would you filter such customers?**

In [0]:
data = [("cust1", "2025-09-01"), ("cust2", "2024-11-20"), ("cust3", "2024-11-25")]
columns = ["customer_id", "last_purchase_date"]

df = spark.createDataFrame(data, columns)

df.display()

customer_id,last_purchase_date
cust1,2025-09-01
cust2,2024-11-20
cust3,2024-11-25


In [0]:
# convert last_purchase_date from string to date type
df = df.withColumn('last_purchase_date', to_date('last_purchase_date'))
# calculate the gap between last_purchase_date and current_date
df = df.withColumn('gap', datediff(current_date(), 'last_purchase_date')).filter(col('gap') > 30)
df.display()

customer_id,last_purchase_date,gap
cust2,2024-11-20,310
cust3,2024-11-25,305


**8. While analyzing customer reviews, you need to identify the most frequently used words in the feedback. How would you implement this?**

In [0]:
data = [("customer1", "The product is great"), ("customer2", "Great product, fast delivery"), ("customer3", "Not bad, could be better")]
columns = ["customer_id", "feedback"]

df = spark.createDataFrame(data, columns)

df.display()

customer_id,feedback
customer1,The product is great
customer2,"Great product, fast delivery"
customer3,"Not bad, could be better"


In [0]:
# convert feedback to lowercase and split into words
df1 = df.withColumn('feedback', lower('feedback'))\
        .withColumn('feedback', explode(split('feedback', ' ')))
# group by feedback and count the number of occurrences
df_grp = df1.groupBy('feedback').agg(count('feedback').alias('word_count'))

df_grp.display()

feedback,word_count
product,1
great,2
the,1
is,1
"product,",1
fast,1
delivery,1
better,1
be,1
could,1


**9. You need to calculate the cumulative sum of sales over time for each product. How would you approach this?**

In [0]:
data = [("product1", "2023-12-01", 100), ("product2", "2023-12-02", 200), ("product1", "2023-12-03", 150), ("product2", "2023-12-04", 250)]
columns = ["product_id", "date", "sales"]
df = spark.createDataFrame(data, columns)
df.display()

product_id,date,sales
product1,2023-12-01,100
product2,2023-12-02,200
product1,2023-12-03,150
product2,2023-12-04,250


In [0]:
df1 = df.withColumn('date', to_date('date'))
# Calculates the cumulative sum of sales for each product over time, ordered by date
df1 = df1.withColumn('cumsum', sum('sales').over(Window.partitionBy('product_id').orderBy('date')))

df1.display()

product_id,date,sales,cumsum
product1,2023-12-01,100,100
product1,2023-12-03,150,250
product2,2023-12-02,200,200
product2,2023-12-04,250,450


**10. While preparing a data pipeline, you notice some duplicate rows in a dataset. How would you remove the duplicates without affecting the original order?**

In [0]:
data = [("John", 25), ("Jane", 30), ("John", 25), ("Alice", 22)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)
df.display()

name,age
John,25
Jane,30
John,25
Alice,22


In [0]:
df1 = df.withColumn('rowflag', row_number().over(Window.partitionBy('name').orderBy('age'))).filter(col('rowflag') == 1)

df1.display()

name,age,rowflag
Alice,22,1
Jane,30,1
John,25,1


**11. You are working with user activity data and need to calculate the average session duration per user. How would you implement this?**

In [0]:
data = [("user1", "2023-12-01", 50), ("user1", "2023-12-02", 60), 
        ("user2", "2023-12-01", 45), ("user2", "2023-12-03", 75)]
columns = ["user_id", "session_date", "duration"]
df = spark.createDataFrame(data, columns)

df.display()

user_id,session_date,duration
user1,2023-12-01,50
user1,2023-12-02,60
user2,2023-12-01,45
user2,2023-12-03,75


In [0]:
df = df.groupBy('user_id').agg(avg('duration').alias('avg_duration'))
df.display()

user_id,avg_duration
user1,55.0
user2,60.0


**12. While analyzing sales data, you need to find the product with the highest sales for each month. How would you accomplish this?**

In [0]:
data = [("product1", "2023-12-01", 100), ("product2", "2023-12-01", 150), 
        ("product1", "2023-12-02", 200), ("product2", "2023-12-02", 250)]
columns = ["product_id", "date", "sales"]
df = spark.createDataFrame(data, columns)
df.display()

product_id,date,sales
product1,2023-12-01,100
product2,2023-12-01,150
product1,2023-12-02,200
product2,2023-12-02,250


### 13. 🔍 What is the role of sparkcontext in pyspark?

---

#### 🔑 Role of `SparkContext` in PySpark

* **Entry point to Spark Core**
  `SparkContext` is the **gateway** to all Spark functionality in the JVM.
  It represents the connection between your Python process (driver) and the underlying **Spark cluster** (executors).

* **Cluster communication**
  It is responsible for talking to the **cluster manager** (like YARN, Mesos, Kubernetes, or Spark’s built-in standalone scheduler) and acquiring executor resources.

* **Task scheduling**
  Once executors are allocated, `SparkContext` coordinates how **tasks** (your transformations and actions) are distributed and executed in parallel across the cluster.

* **RDD API support**
  Historically, Spark was built around **RDDs** (Resilient Distributed Datasets).
  `SparkContext` is still the object that creates RDDs directly. For example:

  ```python
  sc = SparkContext("local", "MyApp")
  rdd = sc.parallelize([1, 2, 3, 4, 5])
  ```

* **Bridge for higher-level APIs**
  When you use the newer **`SparkSession`** (introduced in Spark 2.0 for DataFrame/Dataset API), it internally holds a `SparkContext`.
  👉 `spark.sparkContext` gives you access to it.

---

#### 📊 Typical workflow today

Most PySpark developers don’t explicitly create a `SparkContext` anymore — they use `SparkSession`:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyApp").getOrCreate()
sc = spark.sparkContext   # access underlying SparkContext if needed
```

You only touch `sc` when:

* You want to work with **RDDs directly** (rare, but useful for low-level transformations).
* You need to check cluster info, for example:

  ```python
  print(sc.master)        # cluster manager URL
  print(sc.appName)       # application name
  print(sc.defaultParallelism)  # default number of partitions
  ```

---

#### ⚡ In short:

* `SparkContext` = low-level **engine connection & cluster interface**.
* `SparkSession` = high-level **entry point for SQL/DataFrames**, built on top of `SparkContext`.


In [0]:
df1 = df.withColumn('date', to_date('date'))

df1 = df1.withColumn('date', month('date')).groupBy('date', 'product_id').agg(sum("sales").alias('sum_sales'))

df1 = df1.withColumn(
        'ranking', 
        dense_rank().over(
                Window.partitionBy('date').orderBy(col('sum_sales').desc())
        )
).filter(col('ranking') == 1)
df1.display()

date,product_id,sum_sales,ranking
12,product2,400,1


## 14. 🔍 Spark Architecture - Explain


```pgsql
            +----------------------+
            |      Driver Program  |
            |  (SparkContext, DAG) |
            +-----------+----------+
                        |
               Job Scheduling / DAG
                        |
             +----------v----------+
             |    Cluster Manager  |
             |  (YARN, Mesos, etc) |
             +----------+----------+
                        |
        +---------------+---------------+
        |                               |
+-------v--------+              +-------v--------+
|    Executor    |              |    Executor    |
|   (Worker 1)   |              |   (Worker 2)   |
| - Tasks        |              | - Tasks        |
| - Cache / RDDs |              | - Cache / RDDs |
+----------------+              +----------------+
```


The entire process is orchestrated by the **Driver Node** and mediated by the **Cluster Manager**.

### 1. Application Submission

When an application (your PySpark code) is submitted, it first interfaces with the **Cluster Manager**. The Cluster Manager can be a system like Yarn Hadoop or a Spark standalone cluster.

### 2. Driver Setup

The Cluster Manager then performs the following crucial setup steps:

*   **Driver Node Selection:** It selects one machine or node within the cluster (or outside, depending on the mode) to establish the **Driver Program**, which becomes the **Driver Node**. The Driver Node is regarded as the **brain** of the operation.
*   **Spark Context Creation:** Within the Driver Node, the **Spark Context** is created. The Spark Context is the **starting point of Spark** and acts as a **bridge between the driver program and the cluster manager**. It is responsible for creating the connection to the cluster so Spark can access essential resources like CPU and memory for processing.

### 3. Planning and Resource Allocation

The Driver Program takes over the orchestration:

*   **Analysis:** The Driver Program analyzes the submitted code to determine the necessary transformations that need to be applied.
*   **Resource Request:** If the execution plan requires more resources, the Driver Program uses the **Spark Context** to communicate that information to the Cluster Manager (e.g., requesting additional worker machines or nodes).
*   **Worker Assignment:** The Cluster Manager allocates the requested **Worker Nodes** to the application.

### 4. Task Execution

Finally, the execution of the code occurs:

*   The Driver Node **communicates with the Worker Nodes**, passing on the required information and **guidelines (transformations)** they must follow to execute the work.
*   The Worker Nodes then execute these parallel processing tasks.

### Execution Modes

The flow varies slightly based on where the Driver Node resides:

| Execution Mode | Driver Node Location |
| :--- | :--- |
| **Cluster Mode** | The Driver Node is picked and resides **within the cluster**. |
| **Client Mode** | The Driver Node resides **outside of the cluster** (e.g., the user's local machine or another VM). |

**13. You are working with a large Delta table that is frequently updated by multiple users. The data is stored in partitions, and sometimes updates can cause inconsistent reads due to concurrent transactions. How would you ensure ACID compliance and avoid data corruption in PySpark?**

**New data**

df_new = spark.read.format('parquet').load('path')

from delta.tables import DeltaTable

delta_tbl = DeltaTable.forPath('path')

delta_tbl.alias('target').merge(df_new.alias('src'), "src.id == target.id")\
  .whenNotMatchedInsertAll()\
  .whenMatchedUpdateAll()\
  .execute()

**14. You need to process a large dataset stored in PARQUET format and ensure that all columns have the right schema (Almost). How would you do this?**

In [0]:
df_new = spark.read.format('parquet')\
              .option('inferSchema', True)\
              .load('path')

**15. You are reading a CSV file and need to handle corrupt records gracefully by skipping them. How would you configure this in PySpark?**

In [0]:
df_new = spark.read.format('csv')\
            .option('mode', 'DROPMALFORMED')\
            .load('staging_location')

## 16. 🔍What is difference between RDDs, Dataframe and Dataset ?

In PySpark, **RDDs (Resilient Distributed Datasets)**, **DataFrames**, and **Datasets** are different abstractions for working with distributed data. Here's a breakdown of their key differences:

### 1. 🧩**RDD (Resilient Distributed Dataset)**

* **Low-Level Abstraction**: RDDs are the most basic and low-level abstraction for distributed data in Spark.
* **Immutable**: RDDs are immutable collections of objects distributed across a cluster.
* **Fault Tolerance**: They offer fault tolerance via lineage, meaning Spark can recompute lost data from the original dataset.
* **No Schema**: RDDs don't have a schema, so the data structure isn't enforced.
* **Data Processing**: You need to manually handle the data transformations (like `map`, `flatMap`, `filter`, `reduce`).
* **Performance**: Generally slower than DataFrames and Datasets because Spark doesn't optimize RDD operations.

#### Use Case:

* When you need fine-grained control over transformations and data processing.

#### Example:

```python
rdd = sc.parallelize([1, 2, 3, 4])
rdd.map(lambda x: x * 2).collect()
```

### 2. 📊**DataFrame**

* **Higher-Level Abstraction**: DataFrames are a higher-level abstraction, built on top of RDDs, designed for working with structured data.
* **Schema**: DataFrames are organized into columns with a schema (like a table in a relational database).
* **Optimized**: Spark applies optimizations like Catalyst query optimization and Tungsten physical execution engine to DataFrames, making them more performant than RDDs.
* **APIs**: DataFrames support SQL-like queries (using methods like `select`, `filter`, `groupBy`, etc.).
* **Interoperability**: You can convert between DataFrames and RDDs easily.
* **Lazy Evaluation**: DataFrame operations are lazily evaluated, meaning computation is triggered only when an action (e.g., `show()`, `collect()`) is performed.

#### Use Case:

* When working with structured data, especially when performance optimization is required (like for SQL operations).

#### Example:

```python
df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["ID", "Name"])
df.filter(df["ID"] == 1).show()
```

### 3. 🛠️**Dataset**

* **Type-Safe**: Datasets provide type safety, which is available only in the Scala API. However, in PySpark, Datasets are basically a DataFrame with additional type-safe operations available in languages like Scala (not directly in Python).
* **Combination of RDDs and DataFrames**: Datasets combine the advantages of RDDs and DataFrames — they provide the strong typing and functional programming capabilities of RDDs, along with the optimization features of DataFrames.
* **Only in Scala/Java**: Datasets are available in Scala and Java but are not a native concept in PySpark, so for PySpark, the closest equivalent is DataFrames.

#### Use Case:

* For Scala/Java users who need both type safety and optimization.

#### Example (Scala):

```scala
val ds = Seq((1, "Alice"), (2, "Bob")).toDS()
ds.filter($"_1" === 1).show()
```

### Key Differences in Summary:

| Feature              | **RDD**                              | **DataFrame**                      | **Dataset** (Scala/Java)          |
| -------------------- | ------------------------------------ | ---------------------------------- | --------------------------------- |
| **Type Safety**      | No                                   | No                                 | Yes (only in Scala/Java)          |
| **Schema**           | No schema                            | Schema (structured data)           | Schema + Type-safe                |
| **Optimization**     | No optimizations                     | Optimized (Catalyst, Tungsten)     | Optimized (Catalyst, Tungsten)    |
| **API**              | Functional programming (map, reduce) | SQL-like (select, filter, groupBy) | SQL-like + functional programming |
| **Fault Tolerance**  | Yes (via lineage)                    | Yes (via DataFrame lineage)        | Yes                               |
| **Language Support** | Python, Scala, Java, R               | Python, Scala, Java, R             | Scala, Java                       |

### When to Use Which:

* **RDD**: Use when you need fine control and when performance isn't a concern. Ideal for low-level transformations or working with unstructured data.
* **DataFrame**: Use when you have structured data and need to leverage SQL-like queries and optimizations.
* **Dataset**: Use in Scala or Java when you need both the power of RDDs and the optimization of DataFrames, along with type safety.

In PySpark, you'll most often be working with **DataFrames** since **Datasets** are not available in Python, and **RDDs** are mainly used for lower-level operations.


### **17. 🔍 What is Query Optimization ?**

Query optimization, also known as query planning, is the process by which Spark optimizes a query to determine the most efficient way to execute the code on multiple machines. This is a fundamental concept, as it is one of the reasons Spark is faster than Hadoop MapReduce.

Query optimization is facilitated by Spark's internal mechanisms, notably the **Catalyst Optimizer**, and involves converting the written code through several stages before execution:

### 1. The Code

The process begins when you write code in a PySpark notebook or application.

### 2. Logical Plan

The code is first converted into a Logical Plan.

*   **Purpose:** The Logical Plan determines the optimal order in which transformations should be performed. It analyzes the defined transformations (such as `select`, `where`, and `join`).
*   **Optimization:** Spark does not necessarily execute all transformations in the exact order you specify. Instead, it smartly decides the execution order to make the query efficient and run faster.
*   **Example:** If you apply a join and then select two columns, the logical plan might choose to select the columns first to immediately reduce the data size before executing the join.
*   **Mechanism:** The Logical Plan is created by the **Catalyst Optimizer**.

### 3. Physical Plan

Once the Logical Plan is finalized, it is converted into a Physical Plan.

*   **Purpose:** The Physical Plan determines the concrete execution strategy for the transformations defined in the Logical Plan.
*   **Cost Comparison:** It compares the cost of the transformation (e.g., a join) against a specific cost model.
*   **Strategy Selection:** It picks the **least expensive transformation** type for the given scenario. For example, if a join is required, the Physical Plan chooses the best specific join type, such as sort merge join, shuffle join, or hash join.

### 4. Conversion to RDDs

Finally, the chosen Physical Plan is converted into **Resilient Distributed Data Sets (RDDs)**.

*   **Execution:** All transformations (whether from DataFrames or DataSets) are ultimately converted into RDDs so that the data can be distributed among nodes and executed in parallel across the cluster.

This optimization capability is a key distinction between DataFrames/DataSets and RDDs; pure RDDs are "really, really slow" because they **cannot perform any query planning** or optimizations, while DataFrames enable users to perform query planning and various optimizations.

---

## 18. 🔍Tell us about SPARK SESSION.

### ✅ Spark Session in PySpark

The **Spark Session** is a core component in PySpark, serving as the modern entry point for interacting with Spark functionality.

---

#### 1. Definition and Role

The Spark Session is the **entry point for Spark**. It acts as a single, unified point of entry that allows developers to use various Spark functionalities.

---

#### 2. History and Context (Spark Session vs. Spark Context)

A common point of confusion arises because the sources mention two different entry points: **Spark Session** and **Spark Context**.

* **Spark Context** was the **older entry point** for Spark.
* The **Spark Session** is the **newer entry point**, introduced with **Spark 2.0** and available in all versions released after Spark 2.0.

---

#### 3. Consolidation of Functionality

The major advantage and reason for the introduction of the Spark Session was to consolidate multiple entry points previously needed before Spark 2.0.

Previously, users needed three different types of entry points:

1. **Spark Context**
2. **SQL Context**
3. **Hive Context**

The Spark Session **includes everything** that Spark Context does, plus it offers **support for SQL operations**. It internally manages the Spark Context, SQL Context, and Hive Context, meaning you do not need to explicitly create them.

This allows developers to manage all entry points together **under one hood**.

---

#### 4. Current Usage

It is generally **recommended** that developers use **Spark Session** after Spark 2.0. Although it is still possible to use Spark Context to support legacy applications created by companies, using Spark Session is preferred because:

* It is compatible
* It streamlines the process
* It manages all necessary contexts internally

---


## 19. 🔍Difference Between Wide Transformations and Narrow Transformations in PySpark

✅ The difference between **Wide Transformations** and **Narrow Transformations** is a fundamental concept in PySpark, revolving around the necessity of **data shuffling** across the nodes in a cluster. Since Spark is primarily used for transformations, understanding this distinction is crucial for performance optimization.

---

#### Narrow Transformation

| Feature            | Description                                                                                                                                                                                                                                                                                                                                                                     | Source |
| ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ |
| **Data Shuffling** | **No data shuffling** occurs among the nodes.                                                                                                                                                                                                                                                                                                                                   |        |
| **Processing**     | The machine/node is **independent** and "doesn't need to bother data sitting in other machine". Transformations are applied using only the data residing within the local partition.                                                                                                                                                                                            |        |
| **Efficiency**     | Narrow transformations are generally **more efficient** because they avoid the expensive process of data shuffling across the network.                                                                                                                                                                                                                                          |        |
| **Example**        | A **Filter** transformation is a basic and very popular example. If you filter for `ID greater than three`, the local machine can simply check its data and return the result without needing to consult other nodes. For instance, if Node 1 has IDs 1, 2, 5, 9, it returns 5 and 9; and if Node 2 has IDs 3, 6, 7, it returns 6 and 7. No data is shuffled between the nodes. |        |

---

#### Wide Transformation

| Feature            | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          | Source |
| ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ------ |
| **Data Shuffling** | **Data shuffling will be there** between the machines or nodes. Data needs to be shuffled to give the correct final output.                                                                                                                                                                                                                                                                                                                                                                                                          |        |
| **Processing**     | Processing requires gathering data from different partitions across multiple nodes.                                                                                                                                                                                                                                                                                                                                                                                                                                                  |        |
| **Efficiency**     | Wide transformations are **inherently slower and more expensive** because shuffling involves moving large amounts of data across the network.                                                                                                                                                                                                                                                                                                                                                                                        |        |
| **Example**        | A **Group By** transformation is a very good example of a wide transformation. If you apply a `Group by` on an `ID` column, and the same ID exists on multiple machines, the system needs to **shuffle** (gather) all related records to the same machine to complete the aggregation. For example, if ID '1' exists on Machine A, Machine B, and Machine C, the system must move all records associated with ID '1' to one machine before the aggregation (like summing the price associated with ID '1') can be correctly applied. |        |

---

## 1. Narrow Transformation Example: `filter()`

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("NarrowTransformation").getOrCreate()

data = [("Alice", 29), ("Bob", 31), ("Cathy", 25)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Narrow transformation: filter (each output partition depends only on input partition)
filtered_df = df.filter(df.Age > 28)

filtered_df.show()

spark.stop()
```

* Here, `filter()` is a narrow transformation — no shuffle needed, each partition filters its own data.

---

## 2. Wide Transformation Example: `groupBy()`

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WideTransformation").getOrCreate()

data = [("Alice", "Math", 85), ("Bob", "Math", 90), ("Alice", "English", 95), ("Bob", "English", 80)]
df = spark.createDataFrame(data, ["Name", "Subject", "Score"])

# Wide transformation: groupBy causes shuffle, since data must be grouped across partitions
grouped_df = df.groupBy("Name").avg("Score")

grouped_df.show()

spark.stop()
```

* `groupBy()` triggers a shuffle because Spark needs to group all records with the same key (Name) together, which might involve moving data between partitions.

---



## 20. 🔍`COALESCE()` vs. `REPARTITION()` in PySpark

✅ The source material discusses the functions `COALESCE()` and `REPARTITION()` as very handy functions in PySpark. They are primarily used for managing the number and size of data partitions across the cluster.

Here is a breakdown of the use and characteristics of each function:

---

#### 1. `COALESCE()`

The primary use of the `COALESCE()` function is to **reduce the number of partitions**.

* **Purpose:**
  `COALESCE()` is used to combine existing partitions into fewer partitions. It is directly linked to the `OPTIMIZE` command, which also involves coalescing partitions to create fewer, bigger partitions.

* **Best Practice / Rule of Thumb:**
  When dealing with Big Data, it is generally considered easier to read data from **bigger and fewer partitions** rather than smaller and numerous ones.

* **Data Shuffling:**
  A key characteristic of `COALESCE()` is that it **does not require shuffling** the data.

---

#### 2. `REPARTITION()`

The `REPARTITION()` function provides flexibility in managing partitions, allowing the user to increase or decrease the number of partitions.

* **Purpose:**
  `REPARTITION()` is used to change the number of partitions.

  * **Increasing Partitions:**
    If you have two large partitions and are experiencing memory issues, you can use `REPARTITION()` to distribute the data among more machines/nodes.

  * **Syntax Example:**
    You can simply use `df.repartition(n)` and provide any number (e.g., 3, 4, 5, 6) to create that number of partitions.

* **Data Shuffling:**
  Unlike `COALESCE()`, `REPARTITION()` **requires shuffling** the data. This is because it needs to break up and redistribute the data into the specified number of partitions.

---

#### 🔄 Summary Table

| Feature             | `COALESCE()`                                                                         | `REPARTITION()`                                                                              |
| ------------------- | ------------------------------------------------------------------------------------ | -------------------------------------------------------------------------------------------- |
| **Primary Use**     | **Always reduces** the number of partitions.                                         | Can **increase or decrease** the number of partitions.                                       |
| **Data Shuffling**  | **Does not require shuffling** the data.                                             | **Requires shuffling** the data.                                                             |
| **Common Scenario** | Combining many small partitions into fewer large ones (e.g., for optimized reading). | Redistributing data among more nodes (e.g., to resolve memory issues with large partitions). |

---

## 21. 🔍When to use `CACHE()` and `PERSIST()` in PySpark? What's the difference ?

✅ The core use of both the `CACHE()` and `PERSIST()` functions is to **store the intermediate results** of a DataFrame. This prevents Spark from having to **recompute** the DataFrame from scratch every time it is used.

You should use either `CACHE()` or `PERSIST()` **when you need to use the result or intermediate results multiple times**.

---

#### 🔹 1. `CACHE()`

* **Relationship to `PERSIST()`:**
  `CACHE()` is essentially a special, simplified version of the `PERSIST()` function.

* **Storage Level:**
  When you use `df.cache()`, it is equivalent to calling:

  ```python
  df.persist(StorageLevel.MEMORY_AND_DISK)
  ```

* **Convenience:**
  This specific storage level (`MEMORY_AND_DISK`) is so commonly used that Spark provides `cache()` as a **shortcut**, requiring **no arguments**.

---

#### 🔸 2. `PERSIST()`

* **Functionality:**
  `PERSIST()` is used when you want to **customize how and where** Spark stores intermediate results.

* **Flexibility (Storage Options):**
  With `PERSIST()`, you can choose from several storage levels:

  * `MEMORY_AND_DISK` *(default, same as `CACHE()`)*
  * `MEMORY_ONLY`
  * `DISK_ONLY`
  * `MEMORY_AND_DISK_SER`
  * `MEMORY_ONLY_SER`

  Example:

  ```python
  from pyspark import StorageLevel
  df.persist(StorageLevel.MEMORY_ONLY)
  ```

---

#### 🧾 Summary Table

| Feature           | `CACHE()`                                            | `PERSIST()`                                                 |
| ----------------- | ---------------------------------------------------- | ----------------------------------------------------------- |
| **Primary Goal**  | Stores intermediate results for reuse.               | Stores intermediate results for reuse.                      |
| **Storage Level** | Fixed to **`StorageLevel.MEMORY_AND_DISK`**          | User-defined (e.g., `MEMORY_ONLY`, `DISK_ONLY`, etc.)       |
| **Syntax**        | No arguments (e.g., `df.cache()`)                    | Requires a storage level argument (e.g., `df.persist(...)`) |
| **Relationship**  | Shortcut for `persist(StorageLevel.MEMORY_AND_DISK)` | More flexible, base function for storage configuration      |

---

#### ✅ When to Use Which?

* Use **`CACHE()`** if:

  * You’re okay with the default behavior (store in memory and spill to disk if needed).
  * You want **simplicity**.

* Use **`PERSIST()`** if:

  * You need **fine-grained control** over how data is stored.
  * You want to **optimize for memory or disk usage** based on your cluster's resources.

---

## 22. 🔍What is the importance of PARTITIONS in PYSPARK ?

#### 📦 Importance of Partitions in PySpark

✅ The importance of partitions in PySpark is fundamental to achieving **high performance** and **efficiency** in processing large datasets. Partitions enable Spark to utilize **parallel processing** capabilities effectively.

Here is a detailed explanation of why partitions matter:

---

#### 1. ⚙️ Enabling Parallelism (MPP)

The primary reason for using partitions is to achieve **parallelism**.

* When you talk about partitions in PySpark, you are asking Spark to **perform parallelism**.
* This concept is also referred to using the technical term **MPP (Massive Parallel Processing)**.
* Partitions work by taking a massive dataset and **distributing it into smaller chunks** across multiple machines or nodes.
* These machines then process the data **in parallel**, improving speed and scalability.

---

#### 2. 🚀 Promoting Query Optimization

Partitions are crucial for optimizing how data is **read and processed**.

* When you write data, it is more efficient to **read from fewer, larger partitions** rather than from many small ones.
* This is a common **rule of thumb** in Big Data processing.
* Partitions can be created during the **write process** using functions like `partitionBy`, which:

  * Organize the data layout.
  * **Speed up queries** by limiting the amount of data read.

---

#### 3. 📈 Impact on Performance Management

Partitions directly impact **execution speed**, **resource management**, and **cluster efficiency**:

* **Handling Large Data:**

  * Large partitions can cause **memory issues**.
  * Use `REPARTITION()` to **increase partition count** and better distribute load across the cluster.

* **Optimizing Small Partitions:**

  * Use `COALESCE()` to **reduce partition count**, often after filtering or joins, to avoid overhead from too many tiny partitions.
  * Typically combined with `OPTIMIZE` for **efficient reads**.

* **Delta Lake Optimization:**

  * The `OPTIMIZE` command coalesces partitions into fewer, larger ones.
  * Combine with `Z-ORDER BY` to **sort data within partitions**.
  * Enables **data skipping**, where Spark **skips partitions** that don’t contain relevant data, drastically improving performance.

---

#### ✅ Summary

Partitions are **fundamental** because they determine how data is:

* **Divided** across the cluster
* **Processed** in parallel
* **Stored** and **read** efficiently

Proper partitioning leads to:

* Faster execution
* Lower memory pressure
* Better scalability
* Smarter query planning (via partition pruning & data skipping)

---

**22. You have a dataset containing the names of employees and their departments. You need to find the department with the most employees.**

In [0]:
data = [("Alice", "HR"), ("Bob", "Finance"), ("Charlie", "HR"), ("David", "Engineering"), ("Eve", "Finance")]
columns = ["employee_name", "department"]

df = spark.createDataFrame(data, columns)
df.display()

employee_name,department
Alice,HR
Bob,Finance
Charlie,HR
David,Engineering
Eve,Finance


In [0]:
df1 = df.groupBy('department')\
        .agg(count('employee_name').alias('total_emp'))\
        .sort('total_emp', ascending=False)
df1.display()

department,total_emp
HR,2
Finance,2
Engineering,1


**23. While processing sales data, you need to classify each transaction as either 'High' or 'Low' based on its amount. How would you achieve this using a when condition**

In [0]:
data = [("product1", 100), ("product2", 300), ("product3", 50)]
columns = ["product_id", "sales"]

df = spark.createDataFrame(data, columns)
df.display()

product_id,sales
product1,100
product2,300
product3,50


In [0]:
df1 =df.withColumn('price_cat', when(col('sales')>50, 'High').otherwise('Low'))
df1.display()

product_id,sales,price_cat
product1,100,High
product2,300,High
product3,50,Low


**24. While analyzing a large dataset, you need to create a new column that holds a timestamp of when the record was processed. How would you implement this and what can be the best USE CASE?**

In [0]:
data = [("product1", 100), ("product2", 200), ("product3", 300)]
columns = ["product_id", "sales"]

df = spark.createDataFrame(data, columns)
df.display()

product_id,sales
product1,100
product2,200
product3,300


In [0]:
df.withColumn('processed_time', current_timestamp()).display()

product_id,sales,processed_time
product1,100,2025-09-26T18:00:15.285Z
product2,200,2025-09-26T18:00:15.285Z
product3,300,2025-09-26T18:00:15.285Z


### 🛠️ Implementation

The implementation involves using the built-in PySpark function `current_timestamp()` to capture the exact time of processing and adding it as a new column using `withColumn()`.

1.  🧠 **Function Used:** The function required is `current_timestamp()` because the interviewer wants a detailed time.
2.  💻 **Implementation Code:** You would use the following approach (assuming the new column is named `process_time`):
    ```python
    DF = DF.withColumn("process_time", current_timestamp())
    ```
3.  ⏱️ **Output Detail:** When implemented, this provides a detailed timestamp, showing the year, month, day, and the time down to milliseconds and seconds, demonstrating exactly **at what time these records were actually processed**.

### 🚀 Best Use Case

The best use case for creating a column that holds the processing timestamp relates directly to scenarios where auditing, tracking, or managing data evolution is necessary.

*   🧩 **Scenario:** The most common and best use case is when dealing with **Slowly Changing Dimensions (SCD)**.
*   🎯 **Purpose:** In the scenario of building a data warehouse or dealing with SCDs, you need to tell **at what time this record was changed**, modified, or created.
*   📁 **Specific Columns:** This functionality is often used to create audit columns such as **`create_time`** and **`modify_time`**.
*   🏗️ **Context:** By applying the `current_timestamp()` function, you can demonstrate that you used this approach when **building a data warehouse** or dealing with Slowly Changing Dimensions.


**25. You need to register this PySpark DataFrame as a temporary SQL object and run a query on it. How would you achieve this?**

In [0]:
data = [("product1", 100), ("product2", 200), ("product3", 300)]
columns = ["product_id", "sales"]

df = spark.createDataFrame(data, columns)
df.display()

product_id,sales
product1,100
product2,200
product3,300


In [0]:
df.createOrReplaceTempView('tempsqldf')

In [0]:
spark.sql("SELECT * FROM tempsqldf")

DataFrame[product_id: string, sales: bigint]

In [0]:
%sql
SELECT * FROM tempsqldf WHERE sales = 100;

product_id,sales
product1,100


**26. You need to register this PySpark DataFrame as a temporary SQL object and run a query on it (FROM DIFFERENT NOTEBOOKS AS WELL)?**

df.createOrReplaceGlobalTempView('globalview')

SELECT * FROM global_temp.globalview;

**27. You need to query data from a PySpark DataFrame using SQL, but the data includes a nested structure. How would you flatten the data for easier querying?**

In [0]:
data = [("product1", {"price": 100, "quantity": 2}), 
        ("product2", {"price": 200, "quantity": 3})]
columns = ["product_id", "product_info"]

df = spark.createDataFrame(data, columns)
df.display()

product_id,product_info
product1,"Map(price -> 100, quantity -> 2)"
product2,"Map(price -> 200, quantity -> 3)"


In [0]:
# df.select('product_id', 'product_info.price', 'product_info.quantity').display()
df.select('product_id', 'product_info.price', 'product_info.quantity').createOrReplaceTempView('flatview')

In [0]:
%sql
select * from flatview;

product_id,price,quantity
product1,100,2
product2,200,3


**28. You are ingesting data from an external API in JSON format where the schema is inconsistent. How would you handle this situation to ensure a robust pipeline?**

In [0]:
# scenario based
df =  spark.read.format('json').option('mergeSchema', True)

**29. While reading data from Parquet, you need to optimize performance by partitioning the data based on a column. How would you implement this?**

**scenario based**
df.write.format('parquet').mode('append').partitionBy('category').save('location')

**30. You are working with a large dataset in Parquet format and need to ensure that the data is written in an optimized manner with proper compression. How would you accomplish this?**

**scenario based**
df.write.format('parquet').option('compression', 'snappy')

**31. Your company uses a large-scale data pipeline that reads from Delta tables and processes data using complex aggregations. However, performance is becoming an issue due to the growing dataset size. How would you optimize the performance of the pipeline?**

OPTIMIZE tabledelta ZORDER BY ('order_date')

## 🔍Delta Table & Spark Aggregation Optimization Strategies

✅ This scenario addresses a highly relevant issue in modern data engineering, as performance optimization for growing datasets is a key skill sought by companies. Since the pipeline reads from large Delta tables and performs complex aggregations, the optimization strategy should focus on both the Delta table structure and the Spark execution plan.

The sources highlight the following key strategies for optimizing performance, especially concerning large Delta tables:

---

### 1️⃣ Delta Table Optimization using `OPTIMIZE` and `ZORDER BY`

The most direct and impactful step to optimize performance when reading from large Delta tables is applying the `OPTIMIZE` command coupled with `ZORDER BY`.

#### 🧱 A. `OPTIMIZE` Command (Partition Coalescing):

The `OPTIMIZE` command addresses performance issues by **coalescing partitions**.

* 🔄 It combines many smaller partitions into fewer, larger partitions.
* 📏 This is based on the "rule of thumb" that it is always easier to read data from **bigger and fewer partitions** rather than smaller and numerous partitions.
* 🤖 The system automatically determines the best partition size required.

#### 🧭 B. `ZORDER BY` Command (Data Skipping):

The `ZORDER BY` command should be used alongside `OPTIMIZE` to significantly boost query performance.

* 🧮 It sorts the data *within* the partitions based on chosen columns (e.g., `order date`).
* 🚀 When a query runs (e.g., `SELECT * FROM table WHERE ID <= 5`), Spark utilizes **column statistics** (available for the first 32 or 33 columns) that are generated for the Delta table.
* ⛔ Because the data is sorted (Z-ordered), Spark knows the minimum and maximum values in each partition. If a partition's data range does not meet the query condition, Spark performs **Data Skipping**, avoiding the read entirely for that partition, which saves significant time.
* 💡 *Implementation Example (using SQL syntax):*

  ```sql
  OPTIMIZE table Delta_table_name
  ZORDER BY (order_date) 
  ```

  (assuming `order_date` is the relevant column for ordering).

---

### 2️⃣ Optimizing Complex Aggregations

Since the pipeline includes complex aggregations, which often involve wide transformations like `GROUP BY` that trigger expensive data shuffling, the following strategies should be employed:

#### ⚙️ A. Adaptive Query Execution (AQE):

Adaptive Query Execution is a powerful optimization technique that optimizes query execution at runtime. It handles several complexities related to large-scale aggregations automatically:

* 🎯 **Dynamically Optimizes Skewness:** AQE automatically mitigates data skewness (where some keys are heavily biased) by breaking overly large partitions into multiple smaller ones at runtime. This prevents "Executor out of memory" errors caused when skewed keys pull too much data to a single machine.
* 🧹 **Dynamic Partition Pruning:** AQE can coalesce unnecessary small or empty partitions dynamically, ensuring efficient parallelism.
* 🆗 AQE is generally auto-enabled in recent Spark versions (post 3.0/3.12).

#### 🧩 B. Partition Management (Coalesce and Repartition):

For manual control over partitions:

* ➖ Use the `coalesce` function to **reduce** the number of partitions to a smaller, more manageable count, which does not require data shuffling.
* 🔄 Use `repartition` to increase or decrease the number of partitions. Note that `repartition` requires data shuffling.

#### 🧠 C. Caching Intermediate Results:

If the results of a complex aggregation step (an intermediate data frame) are used multiple times later in the pipeline, store that result using `cache` or `persist`.

* 💾 This avoids recomputing the data frame from scratch repeatedly, saving time.
* 🧱 `cache` is equivalent to using `persist` with the storage level set to `MEMORY_AND_DISK`, meaning it attempts to store data in memory first and spills to disk if memory runs out.

#### 🚚 D. Broadcast Joins (if complex aggregations involve joins):

If the aggregation requires joining the large Delta table with a significantly smaller data frame (e.g., a lookup table), use a **broadcast join**.

* 📡 A broadcast join copies the small data frame to all executor nodes, eliminating the expensive networking overhead and data shuffling typically required for standard joins.
* 🧠 Spark automatically performs a broadcast join if the smaller data frame is below a set threshold, but it can be explicitly enforced using the `broadcast` keyword.

#### 📦 E. Compression:

Ensure the data is written with efficient compression, such as **Snappy** for Parquet format (often used internally by Delta Lake), to save storage cost and optimize query performance.

---

**43. You are processing sales data. Group by product categories and create a list of all product names in each category.**

In [0]:
data = [("Electronics", "Laptop"), ("Electronics", "Smartphone"), ("Furniture", "Chair"), ("Furniture", "Table")]
columns = ["category", "product"]
df = spark.createDataFrame(data, columns)
df.display()

category,product
Electronics,Laptop
Electronics,Smartphone
Furniture,Chair
Furniture,Table


In [0]:
df1 = df.groupBy('category').agg(collect_list('product').alias('products'))
df1.display()

category,products
Electronics,"List(Laptop, Smartphone)"
Furniture,"List(Chair, Table)"


**44. You are analyzing orders. Group by customer IDs and list all unique product IDs each customer purchased.**

In [0]:
data = [(101, "P001"), (101, "P002"), (102, "P001"), (101, "P001")]
columns = ["customer_id", "product_id"]
df = spark.createDataFrame(data, columns)
df.display()

customer_id,product_id
101,P001
101,P002
102,P001
101,P001


In [0]:
df.groupBy('customer_id').agg(collect_set('product_id').alias('unique_products')).display()

customer_id,unique_products
101,"List(P001, P002)"
102,List(P001)


**45. For customer records, combine first and last names only if the email address exists.**

In [0]:
data = [("John", "Doe", "john.doe@example.com"), ("Jane", "Smith", None)]
columns = ["first_name", "last_name", "email"]
df = spark.createDataFrame(data, columns)
df.display()

first_name,last_name,email
John,Doe,john.doe@example.com
Jane,Smith,


In [0]:
df.withColumn(
    'fullname', 
    when(
        col('email').isNotNull(), 
        concat_ws('-', col('first_name'), col('last_name'))).otherwise(None)).display()

first_name,last_name,email,fullname
John,Doe,john.doe@example.com,John-Doe
Jane,Smith,,


**46. You have a DataFrame containing customer IDs and a list of their purchased product IDs. Calculate the number of products each customer has purchased.**

In [0]:
data = [
    (1, ["prod1", "prod2", "prod3"]),
    (2, ["prod4"]),
    (3, ["prod5", "prod6"]),
]
myschema = "customer_id INT ,product_ids array<STRING>"

df = spark.createDataFrame(data, myschema)
df.display()

customer_id,product_ids
1,"List(prod1, prod2, prod3)"
2,List(prod4)
3,"List(prod5, prod6)"


In [0]:
df1 = df.withColumn('number_of_products', size(col('product_ids')))
df1.display()

customer_id,product_ids,number_of_products
1,"List(prod1, prod2, prod3)",3
2,List(prod4),1
3,"List(prod5, prod6)",2


**47. You have employee IDs of varying lengths. Ensure all IDs are 6 characters long by padding with leading zeroes.**

In [0]:
data = [
    ("1",),
    ("123",),
    ("4567",),
]
schema = ["employee_id"]

df = spark.createDataFrame(data, schema)
df.display()

employee_id
1
123
4567


In [0]:
df.withColumn('employee_id', lpad(col('employee_id'), 6, '0')).display()

employee_id
1
123
4567


**48. You need to validate phone numbers by checking if they start with "91"**

In [0]:
data = [
    ("911234567890",),
    ("811234567890",),
    ("912345678901",),
]
schema = ["phone_number"]

df = spark.createDataFrame(data, schema)
df.display()

phone_number
911234567890
811234567890
912345678901


In [0]:
df.filter(substring(col("phone_number"), 1, 2) == '91').display()

phone_number
911234567890
912345678901


**49. You have a dataset with courses taken by students. Calculate the average number of courses per student.**

In [0]:
data = [
    (1, ["Math", "Science"]),
    (2, ["History"]),
    (3, ["Art", "PE", "Biology"]),
]
schema = ["student_id", "courses"]

df = spark.createDataFrame(data, schema)
df.display()

student_id,courses
1,"List(Math, Science)"
2,List(History)
3,"List(Art, PE, Biology)"


In [0]:
df.withColumn('course_size', size('courses')).groupBy().agg(avg('course_size')).display()

avg(course_size)
2.0


**50. You have a dataset with primary and secondary contact numbers. Use the primary number if available; otherwise, use the secondary number.**

In [0]:
data = [
    (None, "1234567890"),
    ("9876543210", None),
    ("7894561230", "4567891230"),
]
schema = ["primary_contact", "secondary_contact"]

df = spark.createDataFrame(data, schema)
df.display()

primary_contact,secondary_contact
,1234567890.0
9876543210.0,
7894561230.0,4567891230.0


In [0]:
df.withColumn('contact', coalesce(col("primary_contact"), col('secondary_contact'))).display()

primary_contact,secondary_contact,contact
,1234567890.0,1234567890
9876543210.0,,9876543210
7894561230.0,4567891230.0,7894561230


**51. You are categorizing product codes based on their lengths. If the length is 5, label it as "Standard"; otherwise, label it as "Custom".**

In [0]:
data = [
    ("prod1",),
    ("prd234",),
    ("pr9876",),
]
schema = ["product_code"]

df = spark.createDataFrame(data, schema)
df.display()

product_code
prod1
prd234
pr9876


In [0]:
df.withColumn('code_flag', when(length(col("product_code"))==5, "Standard").otherwise("Custom")).display()

product_code,code_flag
prod1,Standard
prd234,Custom
pr9876,Custom
