
---

### ✅ **1. Introduction to Spark & PySpark**

---

1. **What is Apache Spark?**  
   Apache Spark is an open-source, distributed computing system designed for fast computation. It performs in-memory data processing and is used for big data analytics, ETL processes, machine learning, and real-time data stream processing. Spark supports multiple languages like Python, Java, Scala, and R.

---

2. **What are the main features of Spark?**  
   - In-memory computing for faster execution  
   - Support for batch and stream processing  
   - Fault tolerance via lineage and DAG  
   - APIs in multiple languages (Python, Scala, Java, R)  
   - Libraries like Spark SQL, MLlib, GraphX, and Spark Streaming  
   - Integration with Hadoop, Hive, HBase, Cassandra, Kafka, etc.

---

3. **What is PySpark?**   
   PySpark is the Python API for Apache Spark. It allows Python developers to leverage Spark’s power for distributed computing using familiar Python syntax. It supports most Spark features, including DataFrames, Spark SQL, RDDs, Structured Streaming, and MLlib; the typed Dataset API and GraphX are not available from Python.

---

4. **What are the advantages of Spark over Hadoop MapReduce?**  
   - Spark can keep intermediate data in memory, whereas MapReduce writes intermediate results to disk between the map and reduce phases and between chained jobs.  
   - Spark is significantly faster for iterative computations and machine learning.  
   - Spark supports real-time data processing with Structured Streaming.  
   - It has higher-level APIs (e.g., DataFrames) that simplify development.

---

5. **What are the main components of Spark?**   
   - **Driver Program**: Manages the Spark application lifecycle  
   - **Cluster Manager**: Allocates resources (Standalone, YARN, Kubernetes, or the deprecated Mesos)  
   - **Executors**: Run tasks on worker nodes  
   - **Tasks**: Smallest unit of work sent to executors  
   - **Jobs & Stages**: Logical and physical divisions of Spark workloads

---

6. **What is the Spark ecosystem?**   
   - **Spark Core**: Base engine for scheduling and memory management  
   - **Spark SQL**: For querying structured data using SQL or DataFrames  
   - **Spark Streaming / Structured Streaming**: For stream processing (Structured Streaming is the current, DataFrame-based API)  
   - **MLlib**: Machine learning library  
   - **GraphX**: For graph processing

---

7. **What is the role of the Spark driver and executor?**   
   - **Driver**: Manages Spark context, prepares execution plan, and schedules jobs/stages/tasks.  
   - **Executors**: Run tasks assigned by the driver and store data for in-memory processing.

---

8. **What is a Spark application?**   
   A Spark application is a user program that creates a SparkSession (or SparkContext) and runs one or more jobs. It consists of one driver process and a set of executors that work together on those jobs.

---

9. **What is lazy evaluation in Spark?**   
   Lazy evaluation means Spark doesn’t compute results right away. Transformations are not executed until an action (like `show()`, `collect()`, or `count()`) is called. This allows Spark to optimize execution plans for better performance.

---

10. **What are Spark jobs, stages, and tasks?**   
   - **Job**: Triggered by an action (e.g., `collect`)  
   - **Stage**: A set of parallel tasks based on DAG and shuffle boundaries  
   - **Task**: Unit of work sent to one executor, performs computation on a partition

---

11. **How does Spark achieve fault tolerance?**  
    Spark achieves fault tolerance primarily through its *lineage information* and *RDD immutability*.

    * Each RDD in Spark maintains a record of the operations (transformations) used to build it from source data (called lineage).
    * If a partition of an RDD is lost (due to node failure, for example), Spark can recompute only the lost partition by replaying the lineage.
    * For DataFrames and Datasets, Spark uses query plans and recomputes intermediate results when required.
    * Additionally, Spark can persist (cache) RDDs and DataFrames in memory or on disk, and checkpointing writes data to reliable storage, which truncates long lineage chains (streaming queries also rely on checkpoints to recover).

---

12. **What is the difference between Apache Spark and Apache Flink?**  
    **Apache Spark** and **Apache Flink** are both distributed data processing frameworks but differ in architecture and use cases:

    | Feature                  | Apache Spark                                     | Apache Flink                                                      |
    | ------------------------ | ------------------------------------------------ | ----------------------------------------------------------------- |
    | **Data Processing Mode** | Primarily micro-batch (Structured Streaming)     | True real-time streaming                                          |
    | **Latency**              | Typically ~100 ms to seconds (micro-batch)       | Typically milliseconds                                            |
    | **Fault Tolerance**      | Lineage and DAG recomputation                    | Distributed snapshots & state management                          |
    | **Ease of Use**          | Mature ecosystem, strong community support       | Steeper learning curve                                            |
    | **Batch Processing**     | Originally batch-oriented, later added streaming | Originally streaming-oriented, supports batch via bounded streams |
    | **Streaming Semantics**  | Exactly-once using checkpointing                 | Exactly-once using native state and snapshots                     |
    | **Common Use**           | ETL, batch processing, ML workflows              | Real-time analytics, complex event processing                     |

    In summary, Spark is better suited for ETL, batch, and ML use cases, while Flink is ideal for real-time, event-driven, and stateful streaming applications.

---







### ✅ **2. Spark Architecture & Concepts**

---

**1. What is an RDD (Resilient Distributed Dataset)?**  
An RDD (Resilient Distributed Dataset) is the fundamental data structure in Apache Spark.
It represents an immutable, distributed collection of objects that can be processed in parallel across a cluster.

**Key characteristics of RDDs:**

* **Resilient**: Can recover from node failures using lineage.
* **Distributed**: Data is split across multiple nodes.
* **Immutable**: Once created, RDDs cannot be changed.
* **Lazy Evaluation**: Operations are not executed until an action is called.

**Example:**

```python
rdd = spark.sparkContext.parallelize([1, 2, 3, 4])
```

**Use cases:** When you need fine-grained control over data and operations, especially for low-level transformations.

---


**2. What is a DataFrame in Spark?**  
A DataFrame is a distributed collection of data organized into named columns, similar to a table in a relational database or a DataFrame in Pandas.

**Advantages:**

* Built on top of RDDs, but with schema support.
* Optimized by Spark’s Catalyst optimizer for better performance.
* Can be queried using SQL or DSL (domain-specific language).

**Example:**

```python
df = spark.read.csv("data.csv", header=True, inferSchema=True)
```


---

**3. What is a Dataset in Spark?**  
A Dataset is a strongly typed collection of objects that combines the benefits of RDDs and DataFrames.
Available only in **Scala** and **Java**, not in PySpark.

* Offers compile-time type safety like RDDs.
* Optimized execution like DataFrames.
* In PySpark, DataFrame is the primary abstraction.


---

**4. What is the difference between RDD, DataFrame, and Dataset?**  

| Feature               | RDD                       | DataFrame                       | Dataset (Scala/Java)                            |
| --------------------- | ------------------------- | ------------------------------- | ----------------------------------------------- |
| **Abstraction Level** | Low-level                 | High-level (untyped rows)       | High-level (typed objects)                      |
| **Optimization**      | No Catalyst optimization  | Catalyst Optimizer + Tungsten   | Catalyst Optimizer + Tungsten                   |
| **Type Safety**       | Compile-time (Scala/Java) | No (errors surface at runtime)  | Compile-time                                    |
| **Ease of Use**       | Verbose                   | Simple SQL-like                 | Moderate                                        |
| **Performance**       | Slower                    | Faster                          | Fast (typed lambdas add serialization overhead) |

---


**5. What is a partition in Spark?**  
A partition is a logical division of data across the cluster. Spark breaks data into partitions and processes each partition in parallel.

* Each partition is processed by a single task.
* Number of partitions impacts parallelism and performance.
* You can control partitions using `.repartition()` or `.coalesce()`.

**Example:**

```python
df = df.repartition(4)  # returns a new DataFrame with 4 partitions
```

---

**6. What is a transformation and an action in Spark?**  

* **Transformations** are lazy operations (e.g., `map()`, `filter()`, `groupBy()`) that define a new RDD or DataFrame.
* **Actions** trigger the actual computation (e.g., `collect()`, `count()`, `show()`).

**Example:**

```python
rdd.map(lambda x: x * 2)  # Transformation
rdd.count()               # Action
```


---

**7. What is lineage in Spark?**  
Lineage is the sequence of operations that lead to the creation of an RDD or DataFrame.
It is used for fault tolerance — if data is lost, Spark can reconstruct it using the lineage graph.

**Benefit:** Spark does not need to replicate intermediate data to survive failures, which reduces storage cost while still allowing lost partitions to be rebuilt.

---

**8. What is a DAG (Directed Acyclic Graph) in Spark?**  
A DAG is a graph where vertices represent RDDs or DataFrames and edges represent transformations.

* Spark builds a DAG of stages for execution.
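* The DAG scheduler pipelines narrow transformations (such as `map()` and `filter()`) into a single stage and splits stages at shuffle boundaries.
* If a later job needs shuffle output that already exists, Spark can skip the stages that produced it (shown as skipped stages in the Spark UI); otherwise, uncached data is recomputed for every action.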

---

**9. What is a shuffle operation?**  
A **shuffle** is a data movement process across partitions or nodes triggered by operations like `groupBy()`, `join()`, `distinct()`.

* It’s expensive as it involves disk I/O and network communication.
* Spark tries to avoid unnecessary shuffles using optimizations.

**Example:**

```python
df.groupBy("id").sum("amount")  # Requires a shuffle (runs when an action is called)
```

---


**10. What is SparkSession?**  
`SparkSession` is the entry point for working with Spark functionality in DataFrame and SQL APIs.
Introduced in Spark 2.0 to replace `SQLContext` and `HiveContext`.

**Example:**

```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Example").getOrCreate()
```

---


**11. What happens when an action is called in Spark?**  
When you call an action (like `count()`, `collect()`, or `show()`), Spark executes the transformations that were lazily defined earlier. It:

* Optimizes the query plan with Catalyst (for DataFrames and SQL) and selects a physical plan.
* Builds a **DAG** (Directed Acyclic Graph) of the RDD operations needed to compute the result.
* Divides the DAG into **stages** at shuffle boundaries.
* Launches one **task** per partition for each stage across the executors.
* Runs the tasks in parallel and returns the result to the driver (or writes it out, for actions like `saveAsTextFile()`).

An action is what turns the lazily built plan into an actual Spark job.

---

**12. How does Spark handle failures during execution?**  
Spark uses **lineage information** and **task retry mechanisms** for fault tolerance.

* **If a node fails**, Spark reschedules the failed task on another node.
* **If data is lost**, Spark recomputes it using the DAG lineage.
* A task is attempted up to `spark.task.maxFailures` times (default 4) before Spark aborts the job.

---

**13. What is speculative execution in Spark?**  
Speculative execution helps avoid slow tasks (stragglers) by running backup copies of the same task on other nodes.

* The first task to complete wins; others are killed.
* It improves performance in the presence of **noisy or slow nodes**.
* Disabled by default (`spark.speculation` is `false`).

Enable it in `spark-defaults.conf`:

```
spark.speculation true
```

or pass `--conf spark.speculation=true` to `spark-submit`.

---

**14. How does Spark handle memory management internally?**  
Spark divides memory into two main regions:

* **Execution memory**: Used for computation like joins, aggregations, sorting.
* **Storage memory**: Used for caching RDDs/DataFrames and broadcast variables.

Spark dynamically shares memory between execution and storage. It uses **Unified Memory Management** from Spark 1.6+.

**Project Tungsten** optimizations (a compact binary row format, optional off-heap memory, and whole-stage code generation) reduce garbage-collection overhead and CPU cost.
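
A back-of-the-envelope sketch of how the unified model sizes these regions, assuming the documented defaults (`spark.memory.fraction` = 0.6, `spark.memory.storageFraction` = 0.5, 300 MB reserved) and a hypothetical 4 GB heap. The storage/execution split is soft: execution can evict cached blocks down to the protected storage share, and storage can use execution memory while it is free.

```python
RESERVED_MB = 300  # fixed reserved memory in the unified memory manager

def unified_memory_regions(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate on-heap region sizes in MB for one executor or driver."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction    # shared pool for execution + storage
    storage = unified * storage_fraction  # cached blocks here are not evicted by execution
    execution = unified - storage         # starting share for joins, sorts, aggregations
    user = usable - unified               # user data structures and internal metadata
    return {"unified": unified, "storage": storage, "execution": execution, "user": user}

regions = unified_memory_regions(4096)  # e.g. spark.executor.memory=4g
for name, size in regions.items():
    print(f"{name}: {size:.1f} MB")
```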

---

**15. What is an accumulator in Spark and how is it used?**  
Accumulators are variables that can be used for **aggregating information** across executors in a Spark job.

* Mainly used for **debugging**, **metrics**, or **counters**
* Updates made inside actions (such as `foreach()`) are applied once per task; updates made inside transformations may be applied more than once if a task is re-executed

Example in PySpark:

```python
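acc = spark.sparkContext.accumulator(0)

def count_nulls(row):
    global acc
    # Each element is a Row (never None itself), so check a column instead.
    # Assumes df has a "name" column, as in the other examples.
    if row["name"] is None:
        acc += 1

df.foreach(count_nulls)  # foreach is an action, so each task's update is applied once
print("Total nulls:", acc.value)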
```

⚠️ They are **write-only from executors** and **readable only on the driver**.

---




### ✅ **3. Getting Started with PySpark**

---

**1. How do you install PySpark?**  
You can install PySpark using pip:

```bash
pip install pyspark
```

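It installs the PySpark package, which bundles the Spark JARs needed for local use.
For local development, ensure a Java version supported by your Spark release is installed (for example, Java 8, 11, or 17 for Spark 3.5; Java 17 or 21 for Spark 4.0), and that `java` is on your `PATH` or `JAVA_HOME` is set.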

---

**2. How do you create a SparkSession in PySpark?**  
A `SparkSession` is the entry point to Spark functionality in PySpark:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .getOrCreate()
```

This replaces the older `SQLContext` and `HiveContext` APIs.

---

**3. How do you run PySpark in local mode?**  
To run PySpark locally (without a cluster), set the master to `local`:

```python
spark = SparkSession.builder \
    .appName("LocalExample") \
    .master("local[*]") \
    .getOrCreate()
```

Here, `[*]` uses all available cores on your machine.

---

**4. How do you submit a PySpark job?**  
Use the `spark-submit` command from the terminal:

```bash
spark-submit my_script.py
```

You can also specify cluster configurations:

```bash
spark-submit --master yarn --deploy-mode cluster my_script.py
```

---

**5. How do you read a CSV file using PySpark?**  
You can read a CSV into a DataFrame using:

```python
df = spark.read.option("header", "true").csv("path/to/file.csv")
df.show()
```

Additional options like `inferSchema`, `delimiter`, and `nullValue` can be added.

---

**6. How do you write a DataFrame to a file?**  
You can write a DataFrame in various formats:

```python
df.write.mode("overwrite").parquet("output/path")
df.write.csv("output.csv", header=True)  # creates a directory named output.csv containing part files
```

Use `.mode()` with values like `overwrite`, `append`, `ignore`, `error`.

---

**7. How do you view the schema of a DataFrame?**  
Use the `printSchema()` method:

```python
df.printSchema()
```

This displays data types of each column in a tree format.

---

**8. How do you show the first N rows of a DataFrame?**  
Use:

```python
df.show(n)
```

By default, `df.show()` displays the first 20 rows. Use `truncate=False` to show full column values:

```python
df.show(10, truncate=False)
```

---

**9. How do you configure PySpark with different memory/executor settings?**  
You can configure these during SparkSession creation or via `spark-submit`:

```python
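from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ConfigExample") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()
# spark.driver.memory only takes effect if set before the driver JVM starts;
# if a session is already running, use --driver-memory with spark-submit instead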
```

Or from CLI:

```bash
spark-submit --executor-memory 4G --driver-memory 2G my_script.py
```

---

**10. How do you integrate PySpark with Jupyter Notebooks?**  
Install Jupyter and PySpark:

```bash
pip install jupyter pyspark
```

Then launch Jupyter:

```bash
jupyter notebook
```

In the notebook, create a SparkSession:

```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Notebook").getOrCreate()
```

---




### ✅ **4. RDD Operations**

---

**1. How do you create an RDD in PySpark?**  
You can create RDDs in two primary ways:

* From an existing collection (like a Python list):

```python
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
```

* From an external data source:

```python
rdd = spark.sparkContext.textFile("path/to/file.txt")
```

---

**2. What are common RDD transformations?**  
Transformations are **lazy operations** that define a new RDD from an existing one. Common transformations include:

* `map()`
* `filter()`
* `flatMap()`
* `distinct()`
* `union()`
* `intersection()`
* `groupByKey()`
* `reduceByKey()`
* `sortBy()`

Example:

```python
rdd2 = rdd.map(lambda x: x * 2)
```

---

**3. What are common RDD actions?**  
Actions trigger the actual execution and return results. Examples include:

* `collect()`
* `count()`
* `first()`
* `take(n)`
* `reduce()`
* `saveAsTextFile()`
* `foreach()`

Example:

```python
rdd.collect()
```

---

**4. How do you filter data in an RDD?**  
You can use `filter()` with a lambda function:

```python
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)
```

This returns an RDD with only even numbers.

---

**5. How do you map a function over an RDD?**  
Use the `map()` transformation:

```python
squared_rdd = rdd.map(lambda x: x * x)
```

It applies the function to each element and returns a new RDD.

---

**6. How do you perform reduce operations on RDDs?**  
Use `reduce()` to aggregate elements:

```python
total = rdd.reduce(lambda a, b: a + b)
```

This sums all elements of the RDD.

---

**7. How do you cache or persist an RDD?**  

* `cache()` stores the RDD in memory:

```python
rdd.cache()
```

* `persist()` gives control over storage levels:

```python
from pyspark import StorageLevel

rdd.persist(StorageLevel.MEMORY_AND_DISK)
```

Caching is useful when the same RDD is reused multiple times.

---

**8. How do you repartition or coalesce an RDD?**  

* `repartition(n)` increases/decreases partitions with full shuffle:

```python
rdd2 = rdd.repartition(4)
```

* `coalesce(n)` reduces partitions with minimal data movement:

```python
rdd2 = rdd.coalesce(2)
```

---

**9. How do you collect data from an RDD?**  
Use `collect()` to bring all data to the driver:

```python
data = rdd.collect()
print(data)
```

⚠️ Not recommended on large datasets: it pulls every element into driver memory and can cause out-of-memory errors. Use `take(n)` to inspect a sample instead.

---

**10. How do you save an RDD to a file?**  
Use `saveAsTextFile()`:

```python
rdd.saveAsTextFile("output/path")
```

The result will be written as multiple part files, one per partition.

---


**11. What are key-value RDDs?**  
Key-value RDDs are RDDs where each element is a pair tuple: `(key, value)`. These RDDs are essential for aggregation, grouping, and joining operations.

Example:

```python
kv_rdd = spark.sparkContext.parallelize([("a", 1), ("b", 2), ("a", 3)])
```

Key-value RDDs unlock powerful transformations like:

* `reduceByKey()`
* `groupByKey()`
* `combineByKey()`
* `aggregateByKey()`
* `join()`

These are common in log analysis, counts by category, etc.

---

**12. How do you use aggregateByKey or reduceByKey in RDDs?**  

**`reduceByKey()`** merges the values of each key using a reduce function:

```python
sc = spark.sparkContext
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
result = rdd.reduceByKey(lambda x, y: x + y).collect()
# Output (order may vary): [('a', 4), ('b', 2)]
```

**`aggregateByKey()`** gives more flexibility by using:

* Initial zero value
* Two functions: one for aggregation within a partition and another between partitions

Example:

```python
sc = spark.sparkContext
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
result = rdd.aggregateByKey(0,
                            lambda acc, v: acc + v,          # seqOp (within partition)
                            lambda acc1, acc2: acc1 + acc2   # combOp (across partitions)
                           ).collect()
# Output (order may vary): [('a', 4), ('b', 2)]
```

Use `aggregateByKey()` when you need custom aggregation logic, for example when the result type differs from the value type.

---





### ✅ **5. DataFrame Operations**
---

**1. How do you create a DataFrame in PySpark?**   
You can create a DataFrame from various sources like RDDs, Python collections, or external files.

✅ From Python list:
```python
data = [("Alice", 25), ("Bob", 30)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)
```

✅ From CSV or other files:
```python
df = spark.read.csv("data.csv", header=True, inferSchema=True)
```

---

**2. How do you select columns from a DataFrame?**  
You can select single or multiple columns using `select()` with column names or `Column` objects (bracket notation `df["name"]` or dot notation `df.name`):
```python
df.select("name").show()
df.select("name", "age").show()
df.select(df["name"], df["age"]).show()
```

---

**3. How do you filter rows in a DataFrame?**  
Use the `filter()` or `where()` method to filter rows:
```python
df.filter(df["age"] > 25).show()
df.where("age > 25").show()
```

---

**4. How do you add or drop columns?**  
✅ Add a column:
```python
df = df.withColumn("new_col", df["age"] + 10)
```

✅ Drop a column:
```python
df = df.drop("new_col")
```

---

**5. How do you group and aggregate data?**  
Use `groupBy()` along with aggregate functions:
```python
df.groupBy("department").agg({"salary": "avg"}).show()

# OR using functions
from pyspark.sql.functions import avg
df.groupBy("department").agg(avg("salary")).show()
```

---

**6. How do you join DataFrames?**  
You can perform various types of joins using `join()`:
```python
df1.join(df2, df1.id == df2.emp_id, "inner").show()
# Join types: inner, left, right, outer, left_semi, left_anti
```

---

**7. How do you sort DataFrames?**  
Use `orderBy()` or `sort()`:
```python
df.orderBy("age").show()
df.sort(df["age"].desc()).show()
```

---

**8. How do you handle missing data?**  
You can drop or fill missing values:
```python
df.dropna().show()  # Remove rows with nulls
df.fillna({"age": 0, "name": "unknown"}).show()  # Replace nulls
```

---

**9. How do you change column data types?**  
Use the `cast()` function:
```python
from pyspark.sql.functions import col
df = df.withColumn("age", col("age").cast("double"))
```

---

**10. How do you use UDFs (User Defined Functions) in PySpark?**  
Define a custom Python function and register it as a UDF:
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def convert_case(s):
    return s.upper() if s is not None else None  # UDFs receive None for null values

convert_udf = udf(convert_case, StringType())
df = df.withColumn("name_upper", convert_udf(df["name"]))
```

---

**11. How do you cache or persist a DataFrame?**  
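You can use `.cache()` or `.persist()` to keep a DataFrame in memory (spilling to disk when needed) so it is not recomputed when reused multiple times. Both are lazy: the data is materialized by the next action.

```python
from pyspark import StorageLevel

# Alternatives; a DataFrame keeps the first storage level assigned to it
df.cache()                          # Same as persist() with the default level
df.persist()                        # Default: MEMORY_AND_DISK (MEMORY_AND_DISK_DESER in PySpark 3.1+)
df.persist(StorageLevel.DISK_ONLY)  # Custom storage level
```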

To unpersist:

```python
df.unpersist()
```

---

**12. How do you convert between DataFrame and RDD?**  
✅ DataFrame → RDD:

```python
rdd = df.rdd
```

✅ RDD → DataFrame:

```python
from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([Row(name="Alice", age=25)])
df = spark.createDataFrame(rdd)
```

---

**13. What’s the difference between `select`, `selectExpr`, and `withColumn`?**  

* `select()` is used to select specific columns:

  ```python
  df.select("name", "age")
  ```

* `selectExpr()` allows SQL expressions directly:

  ```python
  df.selectExpr("name", "age + 5 as new_age")
  ```

* `withColumn()` creates a new or modifies existing column:

  ```python
  df.withColumn("age_plus_5", df["age"] + 5)
  ```

---

**14. How do you handle nested or complex columns (arrays, structs, maps)?**  
You can access nested fields using dot notation and flatten them:

```python
df.select("user.name", "user.address.city").show()
```

To explode arrays:

```python
from pyspark.sql.functions import explode
df.select(explode(df["hobbies"])).show()
```

---

**15. How do you explode arrays in PySpark DataFrames?**  
The `explode()` function transforms each element of an array into a separate row.

```python
from pyspark.sql.functions import explode

data = [("A", ["Python", "SQL"]), ("B", ["Java", "Scala"])]
df = spark.createDataFrame(data, ["name", "skills"])

df_exploded = df.select("name", explode("skills").alias("skill"))
df_exploded.show()
```

---




### ✅ **6. PySpark SQL**

---

**1. What is Spark SQL?**  
Spark SQL is a Spark module that allows users to run SQL queries on structured data using Spark’s distributed computing engine. It supports:

* SQL queries using `spark.sql()`
* DataFrame API
* Integration with Hive
* UDFs (User Defined Functions)

It provides powerful query optimization via the **Catalyst optimizer** and **Tungsten engine**.

---

**2. How do you run SQL queries on DataFrames?**  
To run SQL queries, you must register the DataFrame as a temporary view:

```python
df.createOrReplaceTempView("employees")
spark.sql("SELECT name, salary FROM employees WHERE salary > 50000").show()
```

You can also register a **global temporary view**:

```python
df.createOrReplaceGlobalTempView("global_employees")
spark.sql("SELECT * FROM global_temp.global_employees").show()
```

---

**3. How do you register a DataFrame as a temporary view?**  
There are two types:

* **Session-Scoped Temporary View**:

  ```python
  df.createOrReplaceTempView("my_view")
  ```

* **Global Temporary View** (shared across SparkSessions within the same Spark application):

  ```python
  df.createOrReplaceGlobalTempView("global_view")
  ```

Then you can query it using `spark.sql()`.

---

**4. How do you read and write Parquet/ORC/JSON files?**  

✅ Reading:

```python
df_parquet = spark.read.parquet("data/emp.parquet")
df_json = spark.read.json("data/emp.json")
df_orc = spark.read.orc("data/emp.orc")
```

✅ Writing:

```python
df.write.parquet("output/emp.parquet")
df.write.json("output/emp.json")
df.write.orc("output/emp.orc")
```

You can also specify format:

```python
df.write.format("parquet").save("output/")
```

---

**5. What are catalog functions in Spark SQL?**  
Catalog functions provide metadata about Spark objects (databases, tables, functions):

```python
spark.catalog.listDatabases()
spark.catalog.listTables()
spark.catalog.listColumns("table_name")
spark.catalog.isCached("table_name")
```

You can also cache/un-cache tables:

```python
spark.catalog.cacheTable("my_table")
spark.catalog.uncacheTable("my_table")
```

---

**6. How do you use window functions in Spark SQL?**  
Window functions perform calculations across rows related to the current row:

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

windowSpec = Window.partitionBy("department").orderBy(col("salary").desc())
df.withColumn("rank", rank().over(windowSpec)).show()
```

Common window functions: `rank()`, `dense_rank()`, `row_number()`, `lead()`, `lag()`.

---

**7. How do you create and use global temp views?**  
Global temp views are shared by all SparkSessions in the same Spark application and are dropped when the application ends:

```python
df.createOrReplaceGlobalTempView("global_employees")
spark.sql("SELECT * FROM global_temp.global_employees").show()
```

Note: You must prefix with `global_temp.` to access.

---

**8. How do you use SQL functions in PySpark?**  
PySpark provides many built-in functions via `pyspark.sql.functions`:

```python
from pyspark.sql.functions import col, upper, length

df.select(upper(col("name")), length(col("name"))).show()
```

You can also use SQL expressions:

```python
df.selectExpr("UPPER(name)", "LENGTH(name)").show()
```

---

**9. How do you optimize SQL queries in Spark?**  

* Use **DataFrames** instead of raw RDDs
* Use **broadcast joins** for small tables
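* Minimize wide transformations (shuffles)
* Filter on partition columns so Spark can skip irrelevant partitions (**partition pruning**)
* Filter early so **predicate pushdown** (enabled by default for Parquet and ORC) can skip data at read time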
* Cache reused data with `.cache()`
* Check physical/logical plans:

  ```python
  df.explain(True)
  ```

---

**10. How do you register a UDF in Spark SQL?**  
You can create custom UDFs using Python functions and register them with Spark:

```python
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

def add_ten(x):
    return x + 10 if x is not None else None  # UDFs receive None for null values

add_ten_udf = udf(add_ten, IntegerType())
df.withColumn("new_salary", add_ten_udf(col("salary"))).show()
```

Or register it for SQL:

```python
spark.udf.register("addTen", add_ten, IntegerType())
spark.sql("SELECT addTen(salary) FROM employees").show()
```

---



### ✅ **7. Data Sources & File Formats**

---

**1. What file formats does Spark support?**  
Spark supports various file formats for reading and writing:

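* **Built-in file formats**: CSV, JSON, Parquet, ORC, and plain text
* **Binary data**: the `binaryFile` and `image` sources, plus Hadoop SequenceFiles through the RDD API
* **Via additional packages**: Avro (the `spark-avro` module ships with Spark but is not on the default classpath), XML (an external library such as `spark-xml` before Spark 4.0, which added a built-in XML source), and Delta Lake
* **Other sources**: JDBC databases and Hive tables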

---

**2. How do you read/write CSV, JSON, Parquet, and Avro files?**  

✅ Reading:

```python
# CSV
df_csv = spark.read.option("header", True).csv("data/employees.csv")

# JSON
df_json = spark.read.json("data/employees.json")

# Parquet
df_parquet = spark.read.parquet("data/employees.parquet")

# Avro (requires spark-avro package)
df_avro = spark.read.format("avro").load("data/employees.avro")
```

✅ Writing:

```python
df.write.mode("overwrite").json("output/employees_json")
df.write.mode("append").parquet("output/employees_parquet")
```

---

**3. What is schema inference?**  
Schema inference is the process where Spark automatically detects the column names and data types while reading files (like CSV, JSON):

```python
df = spark.read.option("inferSchema", True).csv("data.csv")
```

While convenient, it requires an extra pass over the data, which is expensive for large files. For production, it's recommended to provide an explicit schema.

---

**4. How do you specify custom schemas?**  

```python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", IntegerType(), True)
])

df = spark.read.schema(schema).csv("data/employees.csv")
```

This improves performance and avoids incorrect type inference.

---

**5. How do you read data from a database using JDBC?**  

```python
df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/hrdb") \
    .option("dbtable", "employees") \
    .option("user", "root") \
    .option("password", "password") \
    .load()
```

You can also write to a DB using `.write.format("jdbc")`.

---

**6. How do you handle corrupt or malformed records?**  

You can configure how Spark handles bad records using the `mode` option:

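* `"PERMISSIVE"` (default): Malformed fields are set to null; the raw record is kept in a corrupt-record column (`_corrupt_record` by default) when that column is in the schema (schema inference adds it automatically if it finds malformed records)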
* `"DROPMALFORMED"`: Drops bad records
* `"FAILFAST"`: Fails the job immediately

Example:

```python
spark.read.option("mode", "DROPMALFORMED").json("data.json")
```

---

**7. How do you partition data when writing files?**  

```python
df.write.partitionBy("department").parquet("output/partitioned/")
```

This creates subdirectories for each unique value in the partition column.

---

**8. How do you read multiple file formats from a directory?**  
If the files are of the same format (e.g., CSV), just point Spark to the folder:

```python
df = spark.read.csv("data/multiple_csv/")
```

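To handle multiple formats, read each one separately and combine them. `unionByName()` matches columns by name, so the inputs need the same column names (or pass `allowMissingColumns=True`, available since Spark 3.1, to fill missing columns with nulls):

```python
df1 = spark.read.option("header", True).csv("data/a.csv")
df2 = spark.read.json("data/b.json")
df = df1.unionByName(df2, allowMissingColumns=True)
```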

---

**9. What’s the difference between save modes (overwrite, append, etc.)?**  

Spark supports the following save modes:

* `"append"` – Adds to existing data
* `"overwrite"` – Replaces existing data
* `"error"` / `"errorifexists"` (default) – Fails if path exists
* `"ignore"` – Skips writing if path exists

Example:

```python
df.write.mode("overwrite").json("output/")
```

---

**10. How does Spark handle large files and small files?**  

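✅ Large files: For splittable formats, Spark splits them into **multiple partitions** (up to `spark.sql.files.maxPartitionBytes`, 128 MB by default, per partition), allowing parallel processing. Non-splittable files, such as gzip-compressed CSV, are read by a single task.

✅ Small files: Too many small files add overhead for listing and opening files and for scheduling many short tasks that do little work.

**Solution:**

* Reduce the number of output files with `.coalesce(n)` or `.repartition(n)` before writing
* Periodically compact existing small files by rewriting them into fewer, larger files
* On read, Spark packs several small files into one partition, controlled by `spark.sql.files.maxPartitionBytes` and `spark.sql.files.openCostInBytes`
* Batch data at ingestion time to avoid creating tiny files in the first place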

---

**11. How do you read different file formats using `.read.format()` in Spark?**  
Spark provides a unified API using `.read.format("format")` to read data from various sources:

```python
# Read CSV
df_csv = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("path/to/file.csv")

# Read Parquet
df_parquet = spark.read.format("parquet").load("path/to/file.parquet")

# Read JSON
df_json = spark.read.format("json").load("path/to/file.json")

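# Read Avro (needs the external spark-avro module, e.g. added with
# spark-submit --packages org.apache.spark:spark-avro_<scala-version>:<spark-version>)
df_avro = spark.read.format("avro").load("path/to/file.avro")
```

Format-specific options such as `delimiter`, `mode`, and `inferSchema` are set with `.option()` in the same way; `compression` is mainly a write option.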

---






### ✅ **8. Data Cleaning & Transformation**

---

**1. How do you handle missing or null values?**  

You can use the following PySpark functions to handle nulls (each returns a new DataFrame, so assign the result):

* **Drop nulls:**

```python
df.na.drop()  # Drops rows with any null
df.na.drop(how="all")  # Drops rows where all values are null
```

* **Fill nulls:**

```python
df.na.fill(0)  # Fill all numeric nulls with 0
df.na.fill({"salary": 0, "name": "Unknown"})  # Fill specific columns
```

* **Replace specific values:**

```python
df.replace("N/A", None)
```

---

**2. How do you drop duplicates in a DataFrame?**  

Use `.dropDuplicates()`:

```python
df.dropDuplicates()
df.dropDuplicates(["id", "name"])  # Drop based on specific columns
```

---

**3. How do you replace values in a DataFrame?**  

You can use `.replace()` or `when()`:

```python
df.replace("old_value", "new_value")
```

Or using `when()` for conditional replacement:

```python
from pyspark.sql.functions import when, col

df = df.withColumn("status", when(col("status") == "inactive", "INACTIVE").otherwise(col("status")))
```

---

**4. How do you split and extract columns?**  

For string splitting and extraction:

```python
from pyspark.sql.functions import split

df = df.withColumn("first_name", split(df["full_name"], " ").getItem(0))
df = df.withColumn("last_name", split(df["full_name"], " ").getItem(1))
```

---

**5. How do you apply custom functions to columns?**  

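Use UDFs (User Defined Functions). Python UDFs move rows between the JVM and Python workers, so prefer a built-in function (here, `pyspark.sql.functions.upper`) when one exists:

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def upper_case(name):
    # UDFs receive None for null values, so guard against it
    return name.upper() if name is not None else None

uppercase_udf = udf(upper_case, StringType())
df = df.withColumn("upper_name", uppercase_udf(df["name"]))
```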

---

**6. How do you pivot and unpivot data?**  

✅ **Pivot (rows to columns):**

```python
df.groupBy("department").pivot("month").sum("salary").show()
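```

Listing the pivot values explicitly, for example `.pivot("month", ["Jan", "Feb", "Mar"])`, saves Spark a separate pass to compute the distinct values of `month`.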

✅ **Unpivot** (melt): You can use `selectExpr()` for simple unpivoting:

```python
df_unpivoted = df.selectExpr("id", "stack(3, 'math', math, 'science', science, 'english', english) as (subject, score)")
```
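
To make the `stack(...)` semantics concrete, here is a plain-Python sketch (no Spark) of the same wide-to-long reshaping. The column names mirror the example above and the sample row is made up; each input row yields one output row per (label, column) pair.

```python
def unpivot(rows, id_col, value_cols, names=("subject", "score")):
    """Plain-Python analogue of stack(n, 'c1', c1, 'c2', c2, ...) as (names)."""
    out = []
    for row in rows:
        for col_name in value_cols:
            # one long row per wide column: (id, label, value)
            out.append({id_col: row[id_col], names[0]: col_name, names[1]: row[col_name]})
    return out


wide = [{"id": 1, "math": 90, "science": 85, "english": 78}]
for r in unpivot(wide, "id", ["math", "science", "english"]):
    print(r)
```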

---

**7. How do you sample data in PySpark?**  

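You can randomly sample rows using `.sample()`. `fraction` is the probability of keeping each row, so the result holds roughly (not exactly) 20% of the rows:

```python
df_sample = df.sample(withReplacement=False, fraction=0.2, seed=42)
```

Or take a fixed number of rows (not a random sample):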

```python
df.limit(10).show()
```

---

**8. How do you rename columns in PySpark?**  

You can rename columns using `.withColumnRenamed()`:

```python
df = df.withColumnRenamed("old_name", "new_name")
```

To rename multiple columns:

```python
for col_name in df.columns:
    df = df.withColumnRenamed(col_name, col_name.lower())
```

---

**9. How do you normalize or scale data in PySpark?**  

Use `MinMaxScaler` or `StandardScaler` from `pyspark.ml.feature`:

```python
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

assembler = VectorAssembler(inputCols=["salary"], outputCol="features")
df_vector = assembler.transform(df)

scaler = MinMaxScaler(inputCol="features", outputCol="scaled")
df_scaled = scaler.fit(df_vector).transform(df_vector)
```
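
For intuition, the rescaling `MinMaxScaler` applies to each feature (default output range [0, 1]) can be written in plain Python. This sketch reproduces the documented formula for a single column; it is not how Spark computes it in a distributed way.

```python
def min_max_scale(values, new_min=0.0, new_max=1.0):
    """Rescale one feature the way MinMaxScaler's formula describes:
        (x - E_min) / (E_max - E_min) * (new_max - new_min) + new_min
    When all values are equal, every output is 0.5 * (new_max + new_min).
    """
    e_min, e_max = min(values), max(values)
    if e_max == e_min:
        return [0.5 * (new_max + new_min) for _ in values]
    return [(x - e_min) / (e_max - e_min) * (new_max - new_min) + new_min for x in values]


print(min_max_scale([30000, 50000, 70000]))  # [0.0, 0.5, 1.0]
```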

---

**10. How do you explode nested arrays or structs?**  

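Use the `explode()` function, which emits one row per array element. Rows whose array is null or empty are dropped; use `explode_outer()` to keep them: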

```python
from pyspark.sql.functions import explode

df = df.withColumn("exploded_col", explode(df["nested_array"]))
```

For nested structs, use dot notation:

```python
df.select("user.name", "user.age").show()
```

---

**11. How do you flatten nested JSON data?**  

First, infer schema from nested JSON, then flatten:

```python
df = spark.read.json("nested.json")
df.select("user.name", "user.address.city").show()
```

You may also use `explode()` and `selectExpr()` to flatten deeply nested data, or `select("user.*")` to expand every field of a struct into top-level columns.

---




### ✅ **9. Performance Tuning & Optimization**

---

**1. What is Spark Catalyst Optimizer?**  

The **Catalyst Optimizer** is Spark SQL's built-in query optimizer. It turns the logical plan of a SQL query or DataFrame operation into an efficient physical plan by applying rules such as constant folding, predicate pushdown, column pruning, and filter reordering, **improving performance without changing your code.**

---

**2. What is Tungsten in Spark?**  

**Tungsten** is a Spark execution engine enhancement for memory and CPU efficiency. It focuses on:

* Binary memory format for faster serialization.
* Off-heap memory management (avoiding JVM GC overhead).
* Whole-stage code generation to optimize physical execution plans.

It allows Spark to process data faster and with lower memory usage.

---

**3. How do you view and understand Spark execution plans?**  

Use:

```python
df.explain()
```

Or for a more detailed plan:

```python
df.explain(True)
```

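`df.explain()` prints only the physical plan; `df.explain(True)` (extended mode) also prints the parsed, analyzed, and optimized logical plans. Since Spark 3.0 you can also call `df.explain(mode="formatted")`. The same plans are visible in the Spark UI under the **SQL** (or **SQL / DataFrame**) tab.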

---

**4. How do you optimize Spark jobs?**  

Some common strategies:

* Cache intermediate DataFrames that are reused.
* Filter early (predicate pushdown).
* Avoid wide transformations like `groupBy` or `join` unless necessary.
* Use `broadcast` joins when joining small and large datasets.
* Minimize shuffles by controlling partitioning.

---

**5. What are broadcast joins?**  

Broadcast joins send a small table to all nodes so the join happens locally without shuffling the large dataset.

Usage:

```python
from pyspark.sql.functions import broadcast

df.join(broadcast(small_df), "key")
```

It improves performance dramatically when one of the DataFrames is small.
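
A plain-Python model of what a broadcast hash join does may help; it is a conceptual sketch, not Spark's implementation. The small side becomes an in-memory hash table that every partition of the large side probes locally, so rows on the large side never move between partitions. Table and column names are made up.

```python
def broadcast_hash_join(large_partitions, small_rows, key):
    """Simulate a broadcast (map-side) inner join.

    The small side is turned into a hash table once ("broadcast"); each
    partition of the large side is then joined locally, with no shuffle.
    """
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)

    joined_partitions = []
    for partition in large_partitions:  # in Spark, each would run on an executor
        out = []
        for row in partition:
            for match in lookup.get(row[key], []):
                out.append({**row, **match})
        joined_partitions.append(out)
    return joined_partitions


orders = [[{"key": 1, "amt": 10}, {"key": 2, "amt": 5}], [{"key": 1, "amt": 7}, {"key": 3, "amt": 1}]]
countries = [{"key": 1, "country": "DE"}, {"key": 2, "country": "FR"}]
print(broadcast_hash_join(orders, countries, "key"))
```

Key 3 has no match in the small table, so it disappears, as in an inner join.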

---

**6. How do you control partitioning?**  

Use:

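* `.repartition(n)` – performs a full shuffle to produce exactly `n` partitions (can increase or decrease the count, and can also hash-partition by columns).
* `.coalesce(n)` – reduces partitions by merging existing ones without a full shuffle (cheaper, but partitions may end up uneven).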

You can also partition while writing data:

```python
df.write.partitionBy("year", "month").parquet("path/")
```

Controlling partition size helps reduce shuffle and improve parallelism.

---

**7. How do you tune memory and executor settings?**  

Tune these Spark configurations:

```bash
--executor-memory 4G
--executor-cores 2
--num-executors 5
```

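These depend on your data size, cluster capacity, and workload. Also tune these configurations, passed with `--conf` (the values shown are Spark's defaults):

```bash
--conf spark.sql.shuffle.partitions=200  # partitions produced by each shuffle
--conf spark.memory.fraction=0.6         # share of (heap - 300 MB) for execution and storage
```

Monitoring with the Spark UI helps fine-tune these values.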

---

**8. How do you avoid data skew?**  

Techniques:

* **Salting** the skewed key (adding a random suffix).
* Use **broadcast joins** where possible.
* Use **filter** early to reduce data volume.
* Repartition intelligently using `repartition()`.

Data skew happens when a few keys have too much data, slowing down specific tasks.
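
The salting idea can be sketched in plain Python (in Spark you would build the same keys with column expressions such as `concat` and `rand`, but the sketch stays in standard Python). The hot key is split into `NUM_SALTS` sub-keys on the large side, and the small side is replicated once per salt value so every salted key still finds its match. `NUM_SALTS` and the sample data are assumptions.

```python
import random
from collections import Counter

NUM_SALTS = 4  # assumption: number of salt buckets; tune to the degree of skew


def salt_key(key, num_salts=NUM_SALTS, rng=random):
    """Large side: append a random suffix so one hot key spreads over several buckets."""
    return f"{key}_{rng.randrange(num_salts)}"


def explode_salts(key, num_salts=NUM_SALTS):
    """Small side: replicate each key once per possible suffix."""
    return [f"{key}_{i}" for i in range(num_salts)]


rng = random.Random(0)
skewed = ["hot"] * 1000 + ["cold"] * 10
salted = [salt_key(k, rng=rng) for k in skewed]
print(Counter(salted))  # "hot" rows are now spread over hot_0 .. hot_3
```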

---

**9. How do you use caching and persistence effectively?**  

Cache only when:

* You reuse a DataFrame multiple times.
* It's expensive to recompute.

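Usage (caching is lazy; the data is materialized by the first action that uses it):

```python
from pyspark import StorageLevel

df.cache()                                # default storage level
df.persist(StorageLevel.MEMORY_AND_DISK)  # or pick a level explicitly (alternative to cache())
```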

Remove when no longer needed:

```python
df.unpersist()
```

---

**10. What are best practices for writing efficient Spark code?**  

* Avoid collecting data to the driver (`.collect()`).
* Select only the columns you need instead of `select("*")`, so columnar formats like Parquet read less data.
* Filter early to reduce data size.
* Minimize transformations that cause shuffling (e.g., `groupBy`, `join`).
* Broadcast small tables for joins.
* Persist reusable DataFrames.

---

**11. What is whole-stage code generation?**  

Whole-stage code generation is a Spark optimization technique that compiles a chain of operators into a single generated Java function, avoiding the per-row virtual function calls of interpreted, operator-by-operator execution. This boosts CPU efficiency and speeds up processing.

---

**12. What is predicate pushdown?**  

Predicate pushdown is the technique where filters (WHERE clause conditions) are pushed down to the data source (e.g., Parquet, JDBC) instead of applying them after reading the data.

Example:

```python
df = spark.read.parquet("data/").filter("age > 30")
```

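Because evaluation is lazy, Catalyst pushes the filter into the Parquet scan (it appears under `PushedFilters` in the `explain()` output), so row groups whose min/max statistics rule out `age > 30` can be skipped instead of being read and filtered afterwards.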
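
A simplified plain-Python model of why this helps: Parquet stores min/max statistics per row group, so a reader that receives the filter can skip groups that cannot contain matches. The `RowGroup` class below is a hypothetical stand-in, not a real Parquet API.

```python
from dataclasses import dataclass


@dataclass
class RowGroup:
    """Hypothetical stand-in for a Parquet row group and its column statistics."""
    min_age: int
    max_age: int
    rows: list


def scan_with_pushdown(row_groups, min_exclusive):
    """Return ages > min_exclusive, skipping row groups whose max statistic
    proves that no row can match (the gist of predicate pushdown)."""
    scanned, result = 0, []
    for rg in row_groups:
        if rg.max_age <= min_exclusive:
            continue  # skipped without reading its rows
        scanned += 1
        result.extend(r for r in rg.rows if r > min_exclusive)
    return result, scanned


groups = [RowGroup(18, 25, [18, 22, 25]), RowGroup(26, 40, [26, 31, 40]), RowGroup(41, 60, [41, 60])]
matches, scanned = scan_with_pushdown(groups, 30)
print(matches, scanned)  # [31, 40, 41, 60] 2
```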

---

**13. What are wide and narrow transformations?**  

* **Narrow transformations:** No data shuffling between partitions (e.g., `map`, `filter`). Faster and more efficient.
* **Wide transformations:** Require data shuffle across partitions (e.g., `groupBy`, `join`). More resource-intensive.

Understanding these helps you optimize transformations.
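
A toy plain-Python model of the difference (Spark's real shuffle writes intermediate files and uses its own hash function; this only mirrors how data moves):

```python
def narrow_filter(partitions, predicate):
    """Narrow: output partition i is computed from input partition i alone."""
    return [[row for row in part if predicate(row)] for part in partitions]


def hash_shuffle(partitions, key_fn, num_partitions):
    """Wide: every row is routed by hash(key) % num_partitions, so rows with the
    same key coming from *different* input partitions meet in one output partition."""
    out = [[] for _ in range(num_partitions)]
    for part in partitions:
        for row in part:
            out[hash(key_fn(row)) % num_partitions].append(row)
    return out


parts = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]]
print(narrow_filter(parts, lambda r: r[1] > 1))  # [[('b', 2)], [('a', 3), ('c', 4)]]
print(hash_shuffle(parts, lambda r: r[0], 2))     # both ('a', ...) rows land together
```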

---

**14. How do you debug out-of-memory (OOM) errors in Spark?**  

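* Increase executor memory (`--executor-memory`), or driver memory (`--driver-memory`) if the driver is the process failing.
* Avoid collecting large results to the driver (`.collect()`, `.toPandas()`).
* Reduce data (filter and select early) before wide transformations on massive datasets.
* Use `persist(StorageLevel.DISK_ONLY)` if memory is limited.
* Use `.repartition()` (or raise `spark.sql.shuffle.partitions`) so each task handles less data and skewed partitions are balanced.
* Use the Spark UI to analyze stages/tasks that failed.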

---

**15. What is a broadcast join in Spark and when should it be used?**

A broadcast join is an optimization technique where the **smaller DataFrame is broadcast to all executors**, avoiding shuffles.

Use when:

* One table is **much smaller** (e.g., lookup table)
* Spark can **fit the small table in memory**

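Spark broadcasts automatically when a table's estimated size is below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default). To force it, use the `broadcast` hint: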

```python
from pyspark.sql.functions import broadcast

df_joined = df_large.join(broadcast(df_small), "key")
```

Benefits:

* Reduces network shuffle
* Faster for small lookups

---





### ✅ **10. Spark Streaming & Structured Streaming**

---

**1. What is Spark Streaming?**  
Spark Streaming is a Spark component that enables **real-time data processing** of live data streams. It breaks down incoming data into **micro-batches**, processes them using Spark’s core engine, and outputs the results to sinks like HDFS, databases, or dashboards.

---

**2. What is Structured Streaming?**  
Structured Streaming is a newer, high-level streaming API built on the Spark SQL engine. Instead of exposing each micro-batch as an RDD (as DStreams do), it lets you treat streaming data as an **unbounded table** that you query with familiar SQL/DataFrame operations; by default it still executes the query as a series of micro-batches.

Key benefits:

* Unified batch and stream processing.
* Auto management of state and checkpointing.
* Easier and more powerful APIs.

---

**3. What are the differences between DStreams and Structured Streaming?**  

| Feature            | DStreams                    | Structured Streaming            |
| ------------------ | --------------------------- | ------------------------------- |
| API Level          | RDD-based                   | DataFrame/Dataset-based         |
| Abstraction        | Micro-batch RDDs            | Unbounded tables                |
| Code Complexity    | Higher                      | Lower                           |
| Built-in Optimizer | No Catalyst                 | Uses Catalyst & Tungsten        |
| Fault Tolerance    | Manual checkpointing needed | Built-in state management       |
| Integration        | Fewer integrations          | Better integration with SQL, ML |

---

**4. How do you read streaming data in PySpark?**  

You can use `.readStream` to read streaming data from sources like Kafka, files, socket, etc.

Example:

```python
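from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Streaming file sources need an explicit schema (these columns are hypothetical)
schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])

df = spark.readStream.format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("path/to/input/folder")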
```

---

**5. How do you write streaming results?**  

Use `.writeStream` to define how the output should be written.

Example:

```python
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
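    .start()

query.awaitTermination()  # blocks until the query stops; usually omitted in notebooks
```

Common sink formats: `console` and `memory` (for debugging), `kafka`, and file formats such as `parquet`; `foreachBatch` handles custom sinks.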

---

**6. What are output modes in Structured Streaming?**  

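1. **Append** (default) – Only rows added to the result table since the last trigger are written; for aggregations this requires a watermark.
2. **Complete** – The entire result table is written on every trigger; supported only for aggregation queries.
3. **Update** – Only rows that changed since the last trigger are written.

Example:

```python
# agg_df is a hypothetical streaming DataFrame with an aggregation
query = agg_df.writeStream.outputMode("complete").format("console").start()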
```
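
To see how the modes differ, here is a plain-Python simulation (not Spark code) of a running word count across two triggers. Append mode is left out because Spark allows it for aggregations only together with a watermark, which the next question covers.

```python
from collections import Counter


def run_stream(batches, mode):
    """Return what a running word count would emit per trigger in each mode."""
    totals = Counter()
    emitted = []
    for batch in batches:
        changed = Counter(batch)
        totals.update(changed)
        if mode == "complete":
            emitted.append(dict(totals))  # whole result table every trigger
        elif mode == "update":
            emitted.append({k: totals[k] for k in changed})  # only rows that changed
        else:
            raise ValueError("this sketch only models 'complete' and 'update'")
    return emitted


batches = [["spark", "delta"], ["spark"]]
print(run_stream(batches, "complete"))  # [{'spark': 1, 'delta': 1}, {'spark': 2, 'delta': 1}]
print(run_stream(batches, "update"))    # [{'spark': 1, 'delta': 1}, {'spark': 2}]
```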

---

**7. How do you handle late data and watermarking?**  

Watermarking allows Structured Streaming to handle **out-of-order or late events**.

```python
df.withWatermark("event_time", "10 minutes")
```

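With this setting the engine tracks a watermark equal to the maximum event time seen so far minus 10 minutes. Events older than the watermark may be dropped, and state for windows that end before the watermark can be finalized and discarded, so state does not grow without bound.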
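
A simplified plain-Python model of this behaviour, assuming micro-batches of event timestamps: the watermark used for a batch comes from event times seen in earlier batches, and events older than it are treated as too late. Spark reasons per window and per state entry rather than per event, and only *may* drop late rows, so treat this as intuition only.

```python
from datetime import datetime, timedelta

DELAY = timedelta(minutes=10)  # mirrors withWatermark("event_time", "10 minutes")


def apply_watermark(batches, delay=DELAY):
    """Split events into accepted and dropped under a simplified watermark."""
    max_seen = None
    accepted, dropped = [], []
    for batch in batches:
        # The watermark for this batch is based on *earlier* batches only.
        watermark = max_seen - delay if max_seen is not None else None
        for ts in batch:
            if watermark is not None and ts < watermark:
                dropped.append(ts)  # older than the watermark: too late
            else:
                accepted.append(ts)
        if batch:
            batch_max = max(batch)
            max_seen = batch_max if max_seen is None else max(max_seen, batch_max)
    return accepted, dropped


t0 = datetime(2024, 1, 1, 12, 0)
batches = [[t0, t0 + timedelta(minutes=20)], [t0 + timedelta(minutes=5), t0 + timedelta(minutes=15)]]
accepted, dropped = apply_watermark(batches)
print(dropped)  # only the 12:05 event: watermark for batch 2 is 12:20 - 10 min = 12:10
```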

---

**8. How do you join streaming and static data?**  

You can join a streaming DataFrame with a static DataFrame directly in PySpark.

Example:

```python
stream_df.join(static_df, "key")
```

Inner joins are supported. A left outer join is supported when the streaming DataFrame is on the left side (and a right outer join when it is on the right).

---

**9. How do you monitor streaming queries?**  

* **Spark UI > Structured Streaming tab** (Spark 3.0+; the older Streaming tab covers DStreams) shows the active queries, batch durations, state size, input/output rate, etc.
* Programmatically:

```python
query.status
query.lastProgress
```

You can also enable logging to monitor errors and progress.

---

**10. What are the checkpointing mechanisms in Spark Streaming?**  

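Checkpointing provides **fault tolerance** by persisting progress and state to reliable storage.

* **Metadata checkpointing** (DStreams terminology): stores the query's configuration and progress. In Structured Streaming, the checkpoint directory holds the source offsets and commit log of each micro-batch.
* **Data checkpointing**: stores intermediate RDDs (DStreams) or, in Structured Streaming, the state store data of stateful operators.

Set it using:

```python
df.writeStream.option("checkpointLocation", "path/to/dir")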
```

---

**11. How do you handle exactly-once processing?**  

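To achieve end-to-end exactly-once semantics:

* Read from **replayable sources** (e.g., Kafka, files) whose offsets are tracked in the checkpoint.
* Enable checkpointing.
* Write to **idempotent or transactional sinks** (e.g., Delta Lake, the built-in file sink). Note that the Kafka sink only guarantees at-least-once delivery.
* Remove duplicates that already exist in the source with stateful deduplication (e.g., `dropDuplicates` on an event ID combined with a watermark).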
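
The idempotent-sink idea can be illustrated in plain Python. In PySpark, the batch id is what `foreachBatch` passes to your function alongside the batch DataFrame. The `IdempotentSink` class is hypothetical and keeps its bookkeeping in memory, whereas a real sink would have to store the committed batch ids transactionally with the data.

```python
class IdempotentSink:
    """Hypothetical sink that ignores a micro-batch it has already committed.

    After a failure, a batch may be re-run with the same batch id (replayed from
    the checkpointed offsets); remembering committed ids makes the replay harmless.
    """

    def __init__(self):
        self.committed_batch_ids = set()
        self.rows = []

    def write(self, batch_id, rows):
        if batch_id in self.committed_batch_ids:
            return False  # replayed batch: skip to avoid duplicates
        self.rows.extend(rows)
        self.committed_batch_ids.add(batch_id)
        return True


sink = IdempotentSink()
sink.write(0, ["a", "b"])
sink.write(1, ["c"])
sink.write(1, ["c"])  # simulated retry of batch 1 after a crash
print(sink.rows)  # ['a', 'b', 'c']
```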

---

**12. What are watermarks and event-time windows?**  

* **Watermarks**: Allow the system to discard old data and handle late data gracefully.
* **Event-time windows**: Allow aggregations like `groupBy(window(col("eventTime"), "5 minutes"))`.

Together, they ensure efficient **windowed aggregations** on time-series/streaming data.
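
Assigning an event to a tumbling window is simple arithmetic. This plain-Python sketch shows what `window(col("eventTime"), "5 minutes")` computes for one timestamp, assuming the default alignment to the Unix epoch (no `startTime` offset) and half-open `[start, end)` windows.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(ts, size=timedelta(minutes=5)):
    """Return the [start, end) tumbling window containing ts, aligned to the epoch."""
    offset = (ts - EPOCH) % size  # time elapsed since the current window started
    start = ts - offset
    return start, start + size


ts = datetime(2024, 1, 1, 12, 7, 30, tzinfo=timezone.utc)
print(tumbling_window(ts))  # window from 12:05:00 to 12:10:00 UTC
```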

---

**13. How do you maintain state in streaming queries?**  

Using **stateful transformations** like:

```python
grouped_df = df.groupBy("user").agg(...)
```

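Or, for custom state logic, use arbitrary stateful processing: **mapGroupsWithState** / **flatMapGroupsWithState** in Scala/Java, or **applyInPandasWithState** in PySpark (Spark 3.4+):

```python
df.groupBy("user").applyInPandasWithState(...)
```

State is held in a state store (in executor memory by default, or RocksDB), checkpointed for recovery, and updated with each micro-batch.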

---




### ✅ **11. Machine Learning with PySpark MLlib**

---

**1. What is MLlib in Spark?**  
MLlib (Machine Learning Library) is Spark’s scalable machine learning library. It provides tools for:

* Classification
* Regression
* Clustering
* Collaborative filtering
* Dimensionality reduction
* Feature engineering
* Model selection and evaluation

MLlib supports both RDD-based and DataFrame-based APIs, though the DataFrame API is now the primary interface.

---

**2. What are the main features of MLlib?**  

* Scalable and distributed processing
* Pipeline support (similar to scikit-learn)
* Built-in algorithms for classification, regression, clustering
* Feature transformation utilities (e.g., Tokenizer, VectorAssembler)
* Model persistence (save/load)
* Integration with Spark SQL and DataFrames
* Support for tuning and cross-validation

---

**3. How do you prepare data for ML in PySpark?**  

To train models in PySpark, data must be in the form of a DataFrame with:

* A **features** column: vector of features (Vector type)
* A **label** column: the target/output variable

You typically use:

* `StringIndexer` for categorical encoding
* `VectorAssembler` to combine multiple columns into one feature vector

Example:

```python
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["age", "salary"], outputCol="features")
final_df = assembler.transform(df)
```

---

**4. How do you use feature transformers?**  

Feature transformers prepare raw data for modeling. Common ones:

* `StringIndexer`: converts string labels into numeric indices
* `OneHotEncoder`: converts indices into one-hot encoded vectors
* `VectorAssembler`: merges multiple feature columns into a single vector
* `StandardScaler`: standardizes features to zero mean and unit variance
* `PCA`: for dimensionality reduction

Example:

```python
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="gender", outputCol="genderIndex")
df = indexer.fit(df).transform(df)
```
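
With its default settings, `StringIndexer` assigns indices by label frequency, most frequent first, with ties broken alphabetically. The plain-Python sketch below reproduces that ordering for intuition; it ignores `handleInvalid` and the other options.

```python
from collections import Counter


def string_index(values):
    """Mimic StringIndexer's default stringOrderType='frequencyDesc':
    the most frequent label gets index 0.0; ties are sorted alphabetically."""
    counts = Counter(values)
    labels = sorted(counts, key=lambda s: (-counts[s], s))
    mapping = {label: float(i) for i, label in enumerate(labels)}
    return [mapping[v] for v in values], mapping


indices, mapping = string_index(["M", "F", "M", "X", "F", "M"])
print(mapping)  # {'M': 0.0, 'F': 1.0, 'X': 2.0}
```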

---

**5. How do you build and train ML models in PySpark?**  

You first create and fit a model using an MLlib Estimator such as `LogisticRegression` or `DecisionTreeClassifier`.

Example:

```python
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(training_data)
```

Then use `.transform()` on test data to make predictions:

```python
predictions = model.transform(test_data)
```

---

**6. How do you evaluate ML models?**  

You use `Evaluator` classes such as:

* `BinaryClassificationEvaluator`
* `MulticlassClassificationEvaluator`
* `RegressionEvaluator`

Example:

```python
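from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)  # the default metric is area under ROC, not accuracy
```

Metrics include:

* Area under ROC and area under PR (`BinaryClassificationEvaluator`)
* Accuracy, F1, weighted precision and recall (`MulticlassClassificationEvaluator`)
* RMSE, R² (`RegressionEvaluator`)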
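
For intuition about the default `areaUnderROC` metric, here is a small plain-Python computation of the same quantity through its ranking interpretation: the probability that a random positive example scores higher than a random negative one. Spark computes it differently (from score thresholds, optionally binned), so results on large data can differ slightly.

```python
def area_under_roc(scores, labels):
    """AUC as P(score of random positive > score of random negative); ties count 0.5.
    O(P*N) pairwise comparison, fine only for small illustrative inputs."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            wins += 1.0 if p > n else 0.5 if p == n else 0.0
    return wins / (len(pos) * len(neg))


print(area_under_roc([0.9, 0.8, 0.3, 0.1], [1, 0, 1, 0]))  # 0.75
```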

---

**7. How do you save and load ML models?**  

All models and pipelines in PySpark are **persistable** using `.save()` and `.load()`.

Example:

```python
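from pyspark.ml.classification import LogisticRegressionModel

model.write().overwrite().save("path/to/model")  # plain .save() fails if the path exists
loaded_model = LogisticRegressionModel.load("path/to/model")
```

For pipelines:

```python
pipeline.save("path/to/pipeline")
```

`pipeline.save()` stores the unfitted `Pipeline`. To reuse a trained workflow, save the fitted `PipelineModel` returned by `pipeline.fit()` and reload it with `PipelineModel.load()`. Useful for deployment and retraining.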

---

**8. What are pipelines in PySpark MLlib?**  

Pipelines help automate ML workflows. A `Pipeline` consists of a sequence of stages: Transformers and Estimators.

Example:

```python
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[indexer, assembler, lr])
model = pipeline.fit(train_data)
```

Advantages:

* Simplifies preprocessing + training
* Ensures consistency across train/test
* Supports model tuning and evaluation

---

**9. How do you tune hyperparameters in PySpark?**  

Use `ParamGridBuilder` and `CrossValidator`:

```python
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

cvModel = crossval.fit(training_data)
```

---

**10. What algorithms are available in MLlib?**  

**Classification:**

* Logistic Regression
* Decision Tree
* Random Forest
* Gradient-Boosted Trees
* Naive Bayes
* Multilayer Perceptron

**Regression:**

* Linear Regression
* Decision Tree Regression
* Random Forest Regression

**Clustering:**

* KMeans
* Gaussian Mixture
* Bisecting K-Means

**Recommendation:**

* ALS (Alternating Least Squares)

**Dimensionality Reduction & Feature Selection:**

* PCA
* ChiSqSelector (chi-squared feature selection)

---



### ✅ **12. Databricks Platform**

---

**1. What is Databricks?**   
Databricks is a unified data analytics platform based on Apache Spark. It provides a collaborative environment for:

- Big data processing  
- Machine learning and AI  
- Real-time analytics  
- SQL-based analytics  
- Streaming analytics  

It integrates with cloud storage and other services on Azure, AWS, and Google Cloud, and offers auto-scaling, notebooks, workflows, and Delta Lake.

---

**2. What are the main features of Databricks?**
 
- **Interactive Notebooks** for collaborative development
- **Auto-scaling Clusters** with Apache Spark
- **Support for multiple languages**: Python, SQL, Scala, R
- **Built-in MLflow** for experiment tracking
- **Delta Lake support** for ACID transactions
- **Unity Catalog** for data governance
- **Databricks SQL & Dashboards** for BI reporting
- **Integration with Git & CI/CD tools**
- **Optimized performance using Photon engine**

---

**3. How do you create and manage Databricks notebooks?**  

- You can create notebooks in the Databricks workspace UI.
- Choose language: Python, Scala, SQL, or R.
- Notebooks can include code, visualizations, and markdown.
- You can schedule jobs from notebooks and export them to `.dbc` or `.ipynb`.

Shortcuts:
- `Shift + Enter`: Run a cell
- `%python`, `%sql`, `%scala`: Magic commands for multi-language support

---

**4. How do you run Spark jobs on Databricks?**  

- Attach your notebook to a running **Databricks cluster**.
- Use standard Spark APIs in PySpark or Scala.
- Jobs can be scheduled or triggered manually.
- For production jobs, use **Databricks Jobs** with task orchestration.

You can also submit jobs via REST API or CLI.

---

**5. What is a Databricks cluster?**  

A Databricks cluster is a set of compute resources for running Spark jobs.

Types:
- **Interactive Clusters**: Used for development (attached to notebooks).
- **Job Clusters**: Automatically created/destroyed when running scheduled jobs.

Cluster configurations:
- Node type (VM size)
- Auto-scaling
- Number of workers
- Libraries (PyPI, Maven, etc.)
- Init scripts
- Logging options

---

**6. How do you install libraries in Databricks?**  

- Via UI: Cluster > Libraries > Install New
- From PyPI, Maven, or upload your own `.whl` or `.jar` files
- Programmatically with `%pip install` or `%conda` in notebooks

You can also use **Init scripts** for installation during cluster startup.

---

**7. How do you use Databricks Jobs and Workflows?**  

Databricks Jobs allow you to run notebooks, JARs, or Python scripts as **scheduled or triggered workflows**.

Features:
- Multi-task workflows (DAG-based)
- Retry policies
- Task dependencies
- Email notifications
- Cluster re-use or per-task cluster

You can create jobs via UI, API, or Terraform.

---

**8. How do you manage data with Databricks DBFS?**  

DBFS (Databricks File System) is an abstraction over cloud storage (e.g., ADLS, S3).

- Accessible from local file APIs through the `/dbfs/` mount (for example `/dbfs/FileStore/...`)
- Supports upload/download of files
- Accessible via `%fs` magic command or `dbutils.fs`

Example:
```python
dbutils.fs.ls("/FileStore/tables")
```

---

**9. How do you use Databricks Repos and version control?**  

- Repos allow you to integrate Git providers (GitHub, GitLab, Azure DevOps).
- Enables version control and collaboration inside Databricks.
- Use UI or Git CLI to sync code.
- Supports notebooks and .py/.scala files.

---

**10. How do you schedule jobs in Databricks?**  

Jobs can be scheduled:
- **From notebooks or workflows**
- Using **time-based schedules** (e.g., daily at 6AM)
- Based on **event triggers** (e.g., file arrival)
- Programmatically via API

You can configure retry settings, email alerts, and access control for job runs.

---

**11. How do you use Databricks REST API?**  

The REST API allows full control over:

- Jobs (create, list, run)
- Clusters (start, terminate)
- DBFS file operations
- Secrets and workspace items
- Token generation

Example using `curl`:
```bash
curl -X GET https://<databricks-instance>/api/2.0/clusters/list \
  -H "Authorization: Bearer <token>"
```

---

**12. How do you monitor and debug jobs in Databricks?**  

Use:
- **Job Run History** and logs (stdout/stderr)
- **Spark UI** (Stages, Tasks, DAG)
- **Cluster metrics** for system-level performance (Ganglia on older runtimes)
- **Driver and executor logs** for detailed debugging
- **Event Logs** and **Audit Logs** in Unity Catalog

---

**13. How do you use Databricks SQL and dashboards?**  

Databricks SQL:
- Allows SQL analysts to query data
- Supports BI integrations (Power BI, Tableau)
- Save queries and build dashboards

Dashboards:
- Visual components (bar chart, line, pie, tables)
- Schedule refresh
- Share with teams

---

**14. How do you connect Databricks to external data sources?**  

- **JDBC/ODBC** for databases
- **S3/ADLS** for cloud storage
- **Azure Synapse, SQL Server, MySQL, Snowflake, Redshift**

Connections can be managed using:
- Mount points
- `spark.read.format("jdbc")`
- Secret scopes for credentials

---

**15. What are Databricks Delta Lake and its benefits?**   

Delta Lake adds ACID transactions, schema enforcement, and time travel to Spark.

Benefits:
- **Atomicity** with merge/upsert operations
- **Schema enforcement and evolution**
- **Time travel** for historical data access
- **Efficient updates and deletes**
- **Better performance with Z-ordering and caching**

---

**16. What are the different cluster modes in Databricks (interactive, job)?**  

- **Interactive Clusters**: For development & experimentation
- **Job Clusters**: Created by jobs and terminated after use

Choose job clusters for cost-efficiency and production tasks.

---

**17. What is Unity Catalog and how is it used?**  

Unity Catalog provides centralized metadata management and data governance.

Features:
- Fine-grained access control (RBAC)
- Centralized audit logs
- Cross-workspace catalog
- Column-level permissions
- Data lineage tracking

---

**18. How does Databricks handle user access and RBAC?**  

- **Workspace-level**: Admin, user, read-only roles
- **Cluster-level**: Who can start/attach/terminate clusters
- **Table-level** (Unity Catalog): SQL grants (`GRANT SELECT ON ...`)
- **Secret scopes**: Control access to secrets

---

**19. How do you manage compute costs in Databricks?**  

- Use **auto-termination** and **auto-scaling**
- Prefer **job clusters** over all-purpose clusters
- Monitor usage via **Cost Management Dashboards**
- Use **Photon Runtime** for performance boosts
- Turn off unused features (e.g., Unity Catalog on dev)

---



### ✅ **13. Delta Lake**

---

**1. What is Delta Lake?**   
Delta Lake is an open-source storage layer that brings **ACID transactions**, **schema enforcement**, and **time travel** to Apache Spark and big data workloads. It is built on top of Parquet files and integrates tightly with Spark to provide reliable and scalable data lakes.

---

**2. What are the benefits of Delta Lake over Parquet?**  

| Feature                    | Parquet         | Delta Lake                                          |
|----------------------------|-----------------|-----------------------------------------------------|
| ACID Transactions          | ❌ No           | ✅ Yes                                              |
| Time Travel                | ❌ No           | ✅ Yes (`VERSION AS OF`)                            |
| Schema Enforcement/Evolution | ❌ Manual     | ✅ Enforced by default; evolution opt-in (`mergeSchema`) |
| Upsert (MERGE INTO)        | ❌ Complex      | ✅ Simple & efficient                               |
| Change History / Audit     | ❌ No           | ✅ Yes (transaction log, `DESCRIBE HISTORY`)        |
| CDC (Change Data Capture)  | ❌ Not native   | ✅ Change Data Feed (opt-in)                        |

---

**3. How do you create Delta tables?**  

You can create a Delta table in Databricks or PySpark using:

```python
df.write.format("delta").save("/mnt/datalake/sales_delta")
```

Or register it in the metastore as an external (unmanaged) table, since `LOCATION` points at the existing path:

```sql
CREATE TABLE sales
USING DELTA
LOCATION '/mnt/datalake/sales_delta';
```

---

**4. How do you perform ACID transactions in Delta Lake?**  

Delta Lake maintains a **transaction log** (`_delta_log`) that records all changes. You can:

- **INSERT**, **UPDATE**, **DELETE**, or **MERGE** data
- Multiple users can safely read/write concurrently
- Rollbacks and retries are consistent and recoverable

Example of MERGE:
```sql
MERGE INTO target_table t
USING updates u
ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.name = u.name
WHEN NOT MATCHED THEN INSERT (id, name) VALUES (u.id, u.name);
```

---

**5. How do you update and delete data in Delta tables?**  

Update Example:
```python
spark.sql("""
  UPDATE sales_delta
  SET revenue = revenue * 1.1
  WHERE region = 'East'
""")
```

Delete Example:
```python
spark.sql("""
  DELETE FROM sales_delta
  WHERE date < '2022-01-01'
""")
```

---

**6. What is time travel in Delta Lake?**  

Delta Lake supports **Time Travel**, which lets you access previous versions of data using:

- **VERSION AS OF** (by version number)
- **TIMESTAMP AS OF** (by datetime)

Example:
```sql
SELECT * FROM sales VERSION AS OF 5;
```

You can also use:
```python
spark.read.format("delta").option("versionAsOf", 3).load("path_to_table")
```
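
Time travel works because each commit in `_delta_log` records which data files were added and removed. A simplified plain-Python model of reading `VERSION AS OF n` is to replay those actions up to version `n`; the file names and log structure below are illustrative only (the real log is JSON with many more fields). This is also why `VACUUM` limits time travel: it deletes files that older versions still reference.

```python
# Simplified model of add/remove actions per committed version (illustrative only).
LOG = [
    {"version": 0, "add": ["part-000.parquet"], "remove": []},
    {"version": 1, "add": ["part-001.parquet"], "remove": []},
    {"version": 2, "add": ["part-002.parquet"], "remove": ["part-000.parquet"]},  # e.g. an UPDATE rewrite
]


def snapshot(log, version):
    """Replay the log up to `version` to get the set of live data files."""
    live = set()
    for entry in log:
        if entry["version"] > version:
            break
        live.difference_update(entry["remove"])
        live.update(entry["add"])
    return live


print(sorted(snapshot(LOG, 1)))  # ['part-000.parquet', 'part-001.parquet']
print(sorted(snapshot(LOG, 2)))  # ['part-001.parquet', 'part-002.parquet']
```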

---

**7. How do you optimize Delta tables?**  

Use the `OPTIMIZE` command to compact small files into larger ones, improving query performance.

```sql
OPTIMIZE sales
```

For sorting files to enhance filter performance:

```sql
OPTIMIZE sales ZORDER BY (customer_id)
```

---

**8. How do you handle schema evolution in Delta Lake?**  

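Delta Lake enforces the table schema by default; schema evolution is opt-in and covers:

- Adding new columns (with `mergeSchema`)
- Changing column types, only in limited cases; most type changes require rewriting the table with `.option("overwriteSchema", "true")`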

Enable with:
```python
df.write.option("mergeSchema", "true").format("delta").mode("overwrite").save("/path")
```

---

**9. How do you use Delta Lake with streaming data?**  

You can read and write Delta tables as streaming sources/sinks:

**Streaming read:**
```python
spark.readStream.format("delta").load("/path_to_delta_table")
```

**Streaming write:**
```python
(stream_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/path_to_checkpoint")
    .start("/path_to_delta_table"))
```

With a checkpoint location, the Delta sink provides **exactly-once** writes for stream processing.

---

**10. How does Delta Lake support upserts (merge)?**  

Delta Lake enables `MERGE INTO` operation for upsert logic.

Example:
```sql
MERGE INTO customers AS target
USING updates AS source
ON target.id = source.id
WHEN MATCHED THEN
  UPDATE SET target.name = source.name
WHEN NOT MATCHED THEN
  INSERT (id, name) VALUES (source.id, source.name);
```

This simplifies CDC and real-time data warehousing use cases.
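
The upsert semantics of the `MERGE` above can be mirrored in plain Python for intuition (rows as dicts, not Delta's implementation). The sketch assumes each key appears at most once in `updates`; Delta raises an error when several source rows match the same target row.

```python
def merge_upsert(target, updates, key="id"):
    """Plain-Python analogue of
    MERGE ... WHEN MATCHED THEN UPDATE SET name = source.name
              WHEN NOT MATCHED THEN INSERT (id, name) ...
    Returns a new list; `target` itself is left unchanged."""
    by_key = {row[key]: dict(row) for row in target}
    for src in updates:
        if src[key] in by_key:
            by_key[src[key]]["name"] = src["name"]  # WHEN MATCHED
        else:
            by_key[src[key]] = {key: src[key], "name": src["name"]}  # WHEN NOT MATCHED
    return list(by_key.values())


customers = [{"id": 1, "name": "Ann"}, {"id": 2, "name": "Bob"}]
updates = [{"id": 2, "name": "Robert"}, {"id": 3, "name": "Cara"}]
print(merge_upsert(customers, updates))
# [{'id': 1, 'name': 'Ann'}, {'id': 2, 'name': 'Robert'}, {'id': 3, 'name': 'Cara'}]
```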

---

**11. What is Z-order clustering and why is it used?**  

`ZORDER` is a file-level optimization in Delta Lake that co-locates related data within files to speed up query filtering.

Example:
```sql
OPTIMIZE orders ZORDER BY (customer_id, order_date)
```

Used when you filter on multiple columns often – it helps minimize the number of files read during a query.
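
The idea behind Z-ordering is a space-filling curve. This plain-Python sketch computes a Morton code for two small non-negative integer columns; sorting by it keeps rows that are close in *both* columns near each other, and therefore in the same files, which tightens per-file min/max ranges for both columns. Delta's actual implementation first maps column values to ranges and differs in detail, so treat this as intuition only.

```python
def interleave_bits(x: int, y: int, bits: int = 16) -> int:
    """Morton (Z-order) code: bit i of x goes to position 2*i,
    bit i of y goes to position 2*i + 1."""
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (2 * i)
        code |= ((y >> i) & 1) << (2 * i + 1)
    return code


points = [(x, y) for x in range(4) for y in range(4)]
ordered = sorted(points, key=lambda p: interleave_bits(*p))
print(ordered[:4])  # [(0, 0), (1, 0), (0, 1), (1, 1)]
```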

---

**12. What are `OPTIMIZE`, `VACUUM`, and `REORG TABLE` used for?**  

- **`OPTIMIZE`**: Compacts small files and organizes data (especially with `ZORDER`)
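- **`VACUUM`**: Physically removes data files that the table no longer references and that are older than the retention threshold (default 7 days); afterwards, time travel to versions that needed those files no longer works
  ```sql
  VACUUM sales RETAIN 168 HOURS;
  ```
- **`REORG TABLE`**: Rewrites data files to purge soft-deleted data, e.g. `REORG TABLE sales APPLY (PURGE)` after dropping columns or deleting rows with deletion vectors enabled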

---




### ✅ **14. Security & Best Practices**

---

**1. How do you secure data in Spark and Databricks?**  
Security in Spark and Databricks can be implemented at several levels:

* **Authentication**: SSO (Single Sign-On) using Azure AD or other IdPs.
* **Authorization**: Role-Based Access Control (RBAC) to manage user privileges.
* **Data Encryption**: Data is encrypted **at rest** using storage encryption (e.g., Azure-managed keys) and **in transit** using TLS.
* **Token-based Access**: Personal Access Tokens (PAT) or Azure Active Directory tokens for APIs.

---

**2. What are access controls in Databricks?**  
Databricks supports **Unity Catalog**, which centralizes access control across:

* **Workspaces**
* **Clusters**
* **Data objects** (Tables, Views, Schemas, Catalogs)

You can assign **user roles** and manage access via:

* SQL GRANT statements
* Admin Console
* Azure/Cloud RBAC integration

Example:

```sql
GRANT SELECT ON TABLE sales TO `data_analyst`;
```

---

**3. How do you manage secrets in Databricks?**  
Secrets like credentials, API keys, or DB passwords can be stored in **Databricks Secret Scopes**:

* Use the **Databricks CLI** or UI to create secret scopes.
* Secrets are accessed in notebooks using:

  ```python
  dbutils.secrets.get(scope="my-scope", key="db-password")
  ```
* Optionally integrate with **Azure Key Vault** for external management.

---

**4. What are best practices for Spark job development?**  

* **Avoid shuffling large datasets unnecessarily**.
* **Cache** intermediate DataFrames only when reused multiple times.
* Use **DataFrames** over RDDs for optimization.
* Apply **filtering as early as possible** (pushdown predicate).
* Use **partitioning** and **bucketing** for large datasets.
* Use **broadcast joins** for small lookup tables.
* Always **test locally** on a subset before scaling to production.

---

**5. How do you monitor and log Spark applications?**  

* Use **Spark UI** to inspect jobs, stages, and tasks.
* In Databricks:

  * View **Job Runs** and their detailed metrics.
  * Use **driver and executor logs** for debugging.
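* Use **cluster metrics** (Ganglia on older Databricks Runtime versions) or Spark's metrics system (`metrics.properties`) to monitor cluster health.
* Use **`log4j2.properties`** (Spark 3.3+; `log4j.properties` in earlier versions) for custom logging configuration.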
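The Spark UI's data is also exposed through Spark's monitoring REST API under `/api/v1`, which is handy for scripted checks. A minimal sketch, assuming a self-managed cluster where the driver UI is reachable on the default port 4040 (Databricks proxies the Spark UI, so the URL differs there); the host name, application ID, and sample payload are hypothetical, and the payload is trimmed to the fields used.

```python
import json
from typing import Dict, List


def stages_url(driver_host: str, app_id: str, port: int = 4040) -> str:
    """URL of the stage list for one application in Spark's monitoring REST API."""
    return f"http://{driver_host}:{port}/api/v1/applications/{app_id}/stages"


def problem_stages(stages_json: str) -> List[Dict]:
    """Return stages that failed or had failed task attempts."""
    stages = json.loads(stages_json)
    return [
        {"stageId": s["stageId"], "name": s["name"], "numFailedTasks": s["numFailedTasks"]}
        for s in stages
        if s["status"] == "FAILED" or s["numFailedTasks"] > 0
    ]


# Hypothetical, trimmed response body; in practice, fetch stages_url(...) with urllib or requests
sample = json.dumps([
    {"stageId": 0, "name": "load", "status": "COMPLETE", "numFailedTasks": 0},
    {"stageId": 1, "name": "join", "status": "COMPLETE", "numFailedTasks": 3},
    {"stageId": 2, "name": "write", "status": "FAILED", "numFailedTasks": 8},
])
print(stages_url("driver-host", "app-123"))
print(problem_stages(sample))
```

Flagging stages with retried tasks, not just failed stages, catches flaky executors before they fail a job.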

---

**6. How do you handle sensitive data in Spark?**  

* Mask or encrypt sensitive columns (e.g., PII).
* Use **column-level access controls** via Unity Catalog.
* Store secrets in **Databricks Secrets** instead of hardcoding them.
* Apply **data anonymization** techniques before storing in open datasets.
* Implement **audit logging** to track data access and operations.
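Masking and pseudonymization can be illustrated with the standard library: a salted SHA-256 hash gives a stable, non-reversible token (so joins on the column still work), while masking keeps only enough characters for a human to recognize a value. A minimal sketch; the function names are hypothetical, the salt should itself come from a secret scope, and hashing is pseudonymization, not encryption. In PySpark the same ideas map to built-ins such as `pyspark.sql.functions.sha2` and `concat`.

```python
import hashlib


def pseudonymize(value: str, salt: str) -> str:
    """Deterministic, one-way token for a PII value (same input + salt -> same token)."""
    return hashlib.sha256((salt + value).encode("utf-8")).hexdigest()


def mask_email(email: str) -> str:
    """Keep the first character of the local part and the domain."""
    local, sep, domain = email.partition("@")
    if not sep:  # not an email address: mask everything
        return "***"
    return f"{local[:1]}***@{domain}"


salt = "load-me-from-a-secret-scope"  # hypothetical placeholder; never hardcode a real salt
print(pseudonymize("alice@example.com", salt)[:16])
print(mask_email("alice@example.com"))  # a***@example.com
```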

---

**7. What is Unity Catalog and how does it help with governance?**  

Unity Catalog is a unified governance solution in Databricks that:

* Centralizes **data access control** and **audit logging**.
* Enforces **fine-grained permissions** across workspaces.
* Provides a single **metastore** for all your data.
* Enables **lineage tracking** for datasets and transformations.
* Supports **multi-cloud environments** with consistent policies.

---

**8. What is the principle of least privilege in Spark/Databricks?**  

This principle ensures that users are only granted **the minimum permissions necessary** to perform their tasks.

For example:

* Developers can read staging data but not production data.
* Analysts can query tables but not modify schemas.
* Jobs run with service principals having access only to needed resources.

This reduces the risk of **accidental data exposure** or **unauthorized access**.
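The examples above can be expressed as an explicit, deny-by-default grant table. This sketch uses hypothetical principals and table names: it checks access in plain Python and renders the same grants as Unity Catalog-style `GRANT` statements, so the grant list can be reviewed in version control before it is applied (for example, with `spark.sql`).

```python
from typing import Dict, List, Set, Tuple

# Hypothetical grants: principal -> {(privilege, table)}. Anything not listed is denied.
GRANTS: Dict[str, Set[Tuple[str, str]]] = {
    "developers": {("SELECT", "main.staging.sales")},
    "analysts": {("SELECT", "main.prod.sales")},
    "etl_job_sp": {("SELECT", "main.staging.sales"), ("MODIFY", "main.prod.sales")},
}


def is_allowed(principal: str, privilege: str, table: str) -> bool:
    """Deny by default: access requires an explicit grant."""
    return (privilege, table) in GRANTS.get(principal, set())


def grant_statements(grants: Dict[str, Set[Tuple[str, str]]]) -> List[str]:
    """Render grants as SQL, sorted for stable diffs in code review."""
    return sorted(
        f"GRANT {priv} ON TABLE {table} TO `{principal}`"
        for principal, pairs in grants.items()
        for priv, table in pairs
    )


for stmt in grant_statements(GRANTS):
    print(stmt)
```

Keeping the grants as data makes it easy to spot an overly broad privilege in review, which is the point of least privilege.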

---

**9. How do you handle logging in Databricks notebooks?**  

* Use Python’s built-in `logging` module:

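  ```python
  import logging

  # force=True (Python 3.8+) replaces existing root handlers; without it, basicConfig()
  # does nothing if the notebook environment has already configured the root logger
  logging.basicConfig(level=logging.INFO, force=True)
  logging.info("Processing started...")
  ```
* Use `%sh` cells to run shell commands on the driver, e.g., to inspect local log files.
* Emit **structured logs** (e.g., one JSON object per line) so downstream tools can parse them.
* Use **`dbutils.fs.put`** to write log contents to ADLS or DBFS.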
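For structured logs, a custom `logging.Formatter` can emit one JSON object per line. A minimal sketch using only the standard library; the field names (`ts`, `level`, `logger`, `message`) and the logger name are an assumed convention, and the demo writes to an in-memory buffer so the output can be inspected (pass no stream to log to stderr).

```python
import io
import json
import logging
from typing import Optional, TextIO


class JsonFormatter(logging.Formatter):
    """Format each log record as a single-line JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


def get_json_logger(name: str, stream: Optional[TextIO] = None) -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # re-running a notebook cell must not add duplicate handlers
        handler = logging.StreamHandler(stream)  # None -> sys.stderr
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    logger.propagate = False  # don't also emit through the root logger's handlers
    return logger


buffer = io.StringIO()
log = get_json_logger("sales_etl", buffer)
log.info("Processing started...")
print(buffer.getvalue().strip())
```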

---

**10. How do you manage audit logs in Databricks?**  

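* Enable **audit log delivery**: on Azure, through diagnostic settings in **Azure Monitor**; on AWS and GCP, through a log delivery configuration that writes to cloud storage. With Unity Catalog, audit events can also be queried from the `system.access.audit` system table.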
* Audit logs include:

  * Login attempts
  * Job executions
  * Notebook access
  * Table access
* Export logs to **Log Analytics**, **SIEM tools**, or **Blob Storage** for analysis.
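Once audit logs are exported as JSON lines, a quick summary of who did what takes only a few lines of Python. A sketch, assuming each record carries `serviceName`, `actionName`, and `userIdentity.email` fields as in delivered Databricks audit logs (verify the schema for your platform; the system table uses snake_case column names); the sample records are hypothetical.

```python
import json
from collections import Counter
from typing import Iterable, Tuple


def summarize_audit_log(lines: Iterable[str]) -> Tuple[Counter, Counter]:
    """Count events per (service, action) and per user email."""
    by_action: Counter = Counter()
    by_user: Counter = Counter()
    for line in lines:
        if not line.strip():
            continue  # skip blank lines
        event = json.loads(line)
        by_action[(event.get("serviceName"), event.get("actionName"))] += 1
        by_user[event.get("userIdentity", {}).get("email")] += 1
    return by_action, by_user


sample_lines = [  # hypothetical records, trimmed to the fields used above
    '{"serviceName": "accounts", "actionName": "login", "userIdentity": {"email": "ana@example.com"}}',
    '{"serviceName": "jobs", "actionName": "runNow", "userIdentity": {"email": "ana@example.com"}}',
    '{"serviceName": "accounts", "actionName": "login", "userIdentity": {"email": "raj@example.com"}}',
]
by_action, by_user = summarize_audit_log(sample_lines)
print(by_action.most_common(1))
print(by_user.most_common(1))
```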

---




### ✅ **15. Advanced Topics**

---

**1. What is GraphFrames in Spark?**   
GraphFrames is an external Spark package (installed separately from Spark, e.g., with `--packages`) built on top of DataFrames that supports **graph processing** and **graph-parallel computation**.

- It models data as **vertices** (nodes) and **edges** (relationships).
- Provides graph algorithms like **PageRank**, **Connected Components**, **Breadth-First Search (BFS)**.

Example:
```python
from graphframes import GraphFrame
vertices = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "name"])
edges = spark.createDataFrame([(1, 2, "knows")], ["src", "dst", "relationship"])
g = GraphFrame(vertices, edges)
g.inDegrees.show()
```

Use case: Social networks, recommendation engines, network topology.
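To see what these graph queries compute, here is a plain-Python sketch of in-degrees and breadth-first search over the same `(src, dst)` edge shape GraphFrames uses. It is a single-machine illustration with hypothetical data; GraphFrames runs these as distributed DataFrame operations (for example, `g.bfs(fromExpr="id = 1", toExpr="id = 3")` returns the matching paths as a DataFrame).

```python
from collections import Counter, deque
from typing import Dict, Hashable, List, Optional, Sequence, Tuple

Edge = Tuple[Hashable, Hashable]


def in_degrees(edges: Sequence[Edge]) -> Dict[Hashable, int]:
    """Incoming edges per vertex; vertices with none are omitted, as in GraphFrames' inDegrees."""
    return dict(Counter(dst for _, dst in edges))


def bfs_path(edges: Sequence[Edge], src: Hashable, dst: Hashable) -> Optional[List[Hashable]]:
    """Shortest path (fewest edges) following edge direction, or None if unreachable."""
    adjacency: Dict[Hashable, List[Hashable]] = {}
    for s, d in edges:
        adjacency.setdefault(s, []).append(d)
    parents: Dict[Hashable, Optional[Hashable]] = {src: None}
    queue = deque([src])
    while queue:
        node = queue.popleft()
        if node == dst:
            path = []
            while node is not None:  # walk parent links back to the source
                path.append(node)
                node = parents[node]
            return path[::-1]
        for nxt in adjacency.get(node, []):
            if nxt not in parents:  # first visit = shortest distance in BFS
                parents[nxt] = node
                queue.append(nxt)
    return None


edges = [(1, 2), (2, 3), (1, 4), (4, 3)]
print(in_degrees(edges))  # {2: 1, 3: 2, 4: 1}
print(bfs_path(edges, 1, 3))  # [1, 2, 3]
```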

---

**2. How do you use SparkR?**   
**SparkR** is an R package that provides a frontend for using Apache Spark from R.

- Supports distributed **DataFrame** operations and **MLlib**.
- Call `sparkR.session()` to start a session.
- Commonly used for **data analysis**, **ETL**, and **ML** in R environments.

Example:
```r
library(SparkR)
sparkR.session()  # start (or connect to) a Spark session before reading data
df <- read.df("data.csv", "csv", header = "true", inferSchema = "true")
head(select(df, df$column1))
```

---

**3. How do you integrate Spark with BI tools?**  

Spark can be integrated with tools like:

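- **Power BI / Tableau**: via **ODBC/JDBC** connectors (to Databricks, or to the Spark Thrift Server in open-source Spark).
- **Databricks SQL warehouses** (formerly SQL endpoints): serve SQL queries from BI tools over JDBC/ODBC.
- **Apache Superset**: for open-source dashboards.
- Publish preprocessed results as **Delta tables** (queried through a SQL warehouse) or to an external data warehouse for reporting.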

---

**4. How do you use Spark with cloud storage (S3, ADLS, GCS)?**  

Spark integrates natively with cloud storage:

- **S3** (AWS): `s3a://bucket-name/path/`
- **ADLS Gen2** (Azure): via `abfss://` with OAuth credentials or managed identity.
- **GCS** (Google): via `gs://` with service account keys.

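You must configure Spark with:
- The Hadoop filesystem connector for each store (e.g., `hadoop-aws` for S3A, `hadoop-azure` for ABFS, the Cloud Storage connector for `gs://`); Databricks and most managed platforms preinstall these
- Cloud-specific credentials, preferably instance profiles, managed identities, or service principals rather than raw keys
- Hadoop properties such as `fs.s3a.access.key` or `fs.azure.account.key.<storage-account>.dfs.core.windows.net`, typically set as Spark configs with the `spark.hadoop.` prefix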
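The URI formats and the `spark.hadoop.`-prefixed properties can be assembled by small helpers. A minimal sketch: the helper names and the container, account, and bucket names are hypothetical, and real keys should be read from a secret store (or replaced entirely by instance profiles or managed identities) rather than written in code. Each returned pair can be passed to `SparkSession.builder.config(key, value)`.

```python
from typing import Dict


def s3a_uri(bucket: str, path: str) -> str:
    return f"s3a://{bucket}/{path.lstrip('/')}"


def abfss_uri(container: str, account: str, path: str) -> str:
    # ADLS Gen2 layout: abfss://<container>@<account>.dfs.core.windows.net/<path>
    return f"abfss://{container}@{account}.dfs.core.windows.net/{path.lstrip('/')}"


def gcs_uri(bucket: str, path: str) -> str:
    return f"gs://{bucket}/{path.lstrip('/')}"


def s3a_key_conf(access_key: str, secret_key: str) -> Dict[str, str]:
    """Static-key S3A credentials as Spark configs (spark.hadoop.* is forwarded to Hadoop)."""
    return {
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
    }


def abfs_account_key_conf(account: str, account_key: str) -> Dict[str, str]:
    """Storage-account-key auth for ADLS Gen2 (OAuth or managed identity is usually preferred)."""
    return {f"spark.hadoop.fs.azure.account.key.{account}.dfs.core.windows.net": account_key}


print(abfss_uri("raw", "mystorageacct", "/sales/2024/"))
print(sorted(s3a_key_conf("<access-key>", "<secret-key>")))
```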

---

**5. How do you use Spark with Kubernetes?**  

Spark-on-Kubernetes allows running Spark jobs in containers:

- Use the `spark-submit` CLI with Kubernetes master:
  ```bash
  spark-submit \
    --master k8s://https://<k8s-master> \
    --deploy-mode cluster \
    --conf spark.kubernetes.container.image=<image> \
    ...
  ```
- Spark driver and executors run as pods.
- Benefits: scalability, resource isolation, and cloud-native deployment.
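When submissions are scripted (for example, from an orchestrator), building the `spark-submit` arguments as a list avoids shell-quoting mistakes. A sketch under assumptions: the API server URL, image, namespace, service account, and application path below are hypothetical; the `--conf` keys are standard Spark-on-Kubernetes settings. The resulting list can be passed to `subprocess.run(args, check=True)`.

```python
from typing import Dict, List, Optional, Sequence


def k8s_submit_args(
    api_server: str,
    image: str,
    app: str,
    namespace: str = "default",
    executors: int = 2,
    service_account: Optional[str] = None,
    app_args: Sequence[str] = (),
) -> List[str]:
    """Build a cluster-mode spark-submit command line for Kubernetes."""
    conf: Dict[str, str] = {
        "spark.kubernetes.container.image": image,
        "spark.kubernetes.namespace": namespace,
        "spark.executor.instances": str(executors),
    }
    if service_account:
        # Service account the driver pod uses to create executor pods
        conf["spark.kubernetes.authenticate.driver.serviceAccountName"] = service_account
    args = ["spark-submit", "--master", f"k8s://{api_server}", "--deploy-mode", "cluster"]
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    args.append(app)  # the local:// scheme points to a file inside the container image
    args.extend(app_args)
    return args


args = k8s_submit_args(
    api_server="https://k8s.example.internal:6443",
    image="registry.example.com/spark-py:latest",
    app="local:///opt/app/job.py",
    namespace="spark-jobs",
    service_account="spark",
)
print(" ".join(args))
```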

---

**6. What is Koalas and how does it relate to Pandas?**  

**Koalas** was a library that provided a **Pandas-like API** on top of PySpark DataFrames.

- Bridged the gap between **Pandas** and **Spark**.
- Merged into PySpark as the **pandas API on Spark** (`pyspark.pandas`) in Spark 3.2, replacing the standalone Koalas project:
  ```python
  import pyspark.pandas as ps
  psdf = ps.read_csv("data.csv")
  ```

---

**7. How do you use Databricks Connect?**  

**Databricks Connect** allows you to:

- Develop Spark code in a local IDE (e.g., VS Code, PyCharm).
- Run the code remotely on **Databricks clusters**.
- Benefits:
  - Faster local dev cycle
  - Full power of cloud compute

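On Databricks Runtime 13.0 and above, Databricks Connect is built on Spark Connect: configure the workspace host, authentication, and cluster ID, then create a session with `DatabricksSession.builder.getOrCreate()` (imported from `databricks.connect`) and use it like a regular `SparkSession`. Older runtimes use the legacy client, configured with `databricks-connect configure`.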

---

**8. How do you use MLflow in Databricks?**  

**MLflow** is an open-source platform for:

- **Tracking** ML experiments (parameters, metrics)
- **Packaging** models
- **Serving** models

MLflow is built into Databricks as a managed service.

Example:
```python
import mlflow
with mlflow.start_run():
    mlflow.log_param("alpha", 0.5)
    mlflow.log_metric("rmse", 0.9)
```

You can view runs in the **MLflow UI** or export models.

---

**9. What is Apache Arrow and how does it improve performance?**  

**Apache Arrow** is a **columnar in-memory format** for efficient data exchange.

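- Speeds up data transfer between the JVM and Python, e.g., in `toPandas()`, `spark.createDataFrame(pandas_df)`, and pandas UDFs.
- Moves data in columnar batches instead of pickling rows one at a time; Arrow's layout also allows zero-copy reads, so serialization cost drops sharply.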
- Enable it in PySpark using:
  ```python
  spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
  ```

---

**10. How do you integrate Spark with Snowflake or BigQuery?**  

Spark can read/write to Snowflake and BigQuery using connectors:

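- **Snowflake Connector for Spark**: built on the Snowflake JDBC driver; Databricks provides the short format name `snowflake` (elsewhere, use `net.snowflake.spark.snowflake`):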
  ```python
  df.write.format("snowflake").option(...).save()
  ```
- **BigQuery Connector for Spark**: Use Google-provided JARs to read/write tables:
  ```python
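  # writeMethod "direct" uses the BigQuery Storage Write API; the default indirect method also requires a temporaryGcsBucket option
  df.write.format("bigquery").option("table", "my_dataset.my_table").option("writeMethod", "direct").mode("append").save()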
  ```

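Both connectors can **push down** filters and column selections to the warehouse (the Snowflake connector can also push down whole queries), and both let Spark pipelines read from and write to these cloud data warehouses.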

---

**11. How do you implement CI/CD pipelines for Databricks?**  

CI/CD in Databricks includes:

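- **Source control**: Use GitHub, Azure Repos, or GitLab with **Databricks Git folders** (formerly Repos).
- **Build pipelines**:
  - Run unit tests with `pytest` (locally or against a cluster via Databricks Connect), or trigger test jobs through the Jobs REST API
  - Deploy with the **Databricks CLI**, e.g., `databricks bundle deploy` for Databricks Asset Bundles (the successor to `dbx`)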
- **Release pipelines**:
  - Deploy notebooks, libraries, and jobs to dev/test/prod
  - Use **Azure DevOps Pipelines** or **GitHub Actions**

Supports **automated validation**, **notebook testing**, and **multi-environment deployments**.
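Notebook code is easiest to test in CI when the logic lives in plain functions inside the repository and notebooks only call them. A minimal sketch: `clean_order` and its validation rules are hypothetical, and the `test_` function is written so `pytest` would discover it in a build pipeline; the same pattern applies to functions that take and return DataFrames, tested against a local `SparkSession`.

```python
from typing import Dict, Optional


def clean_order(raw: Dict[str, str]) -> Optional[Dict[str, object]]:
    """Hypothetical transformation: normalize fields and drop invalid orders."""
    order_id = raw.get("order_id", "").strip()
    if not order_id:
        return None  # reject rows without a key
    try:
        amount = float(raw.get("amount", ""))
    except ValueError:
        return None  # reject unparseable amounts
    return {
        "order_id": order_id,
        "country": raw.get("country", "").strip().upper() or None,
        "amount": amount,
    }


def test_clean_order() -> None:
    assert clean_order({"order_id": " 42 ", "country": " us ", "amount": "10.50"}) == {
        "order_id": "42",
        "country": "US",
        "amount": 10.5,
    }
    assert clean_order({"order_id": "", "amount": "1"}) is None
    assert clean_order({"order_id": "7", "amount": "n/a"}) is None
```

Because `clean_order` has no Spark or Databricks dependency, the build stage can run it in seconds without a cluster.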

---
