# Technical Interview Questionnaire
**Target: 4-7 Years Experience | Data Engineering & Cloud Technologies**

---

## **SQL & PL/SQL Oracle**

### **Easy**

**Q1. What is the difference between DELETE, TRUNCATE, and DROP commands?**
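- **DELETE**: DML command. Removes the rows matching an optional WHERE clause. It generates undo, so it can be rolled back. Row-level DELETE triggers fire. It is slower on large tables.
- **TRUNCATE**: DDL command. Removes all rows and resets the high-water mark. It issues an implicit commit, so in Oracle it cannot be rolled back. DML triggers don't fire. It is much faster than DELETE.
- **DROP**: DDL command. Removes the table structure together with its data, indexes, constraints and triggers. It cannot be rolled back. In Oracle 10g+ a dropped table can usually be recovered from the recycle bin with `FLASHBACK TABLE ... TO BEFORE DROP`, unless `PURGE` was used.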

**Q2. Write a query to find the second highest salary from an Employee table.**
```sql
SELECT MAX(salary)
FROM Employee
WHERE salary < (SELECT MAX(salary) FROM Employee);

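-- Alternative using DENSE_RANK. ROW_NUMBER would return the top salary
-- again if two employees share it; DENSE_RANK gives tied salaries the same rank
SELECT DISTINCT salary FROM (
  SELECT salary, DENSE_RANK() OVER (ORDER BY salary DESC) AS rnk
  FROM Employee
) WHERE rnk = 2;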
```

**Q3. What are the different types of joins in SQL? Explain with examples.**
- **INNER JOIN**: Returns matching rows from both tables
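- **LEFT JOIN**: Returns all rows from the left table and the matching rows from the right, with NULLs where there is no match
- **RIGHT JOIN**: Returns all rows from the right table and the matching rows from the left, with NULLs where there is no match
- **FULL OUTER JOIN**: Returns all rows from both tables, matched where possible and padded with NULLs otherwise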
- **CROSS JOIN**: Cartesian product of both tables
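You can try the join types above with Python's built-in `sqlite3` module. This is a minimal sketch, not Oracle. The `emp`/`dept` tables and their rows are invented for illustration. FULL OUTER JOIN is emulated as a LEFT JOIN plus the unmatched right-side rows, because older SQLite versions lack it. RIGHT JOIN is a LEFT JOIN with the tables swapped.

```python
import sqlite3

def join_demo():
    """Run each join type against two tiny in-memory tables and return the rows."""
    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()
    cur.executescript("""
        CREATE TABLE emp  (emp_id INTEGER, name TEXT, dept_id INTEGER);
        CREATE TABLE dept (dept_id INTEGER, dept_name TEXT);
        INSERT INTO emp  VALUES (1, 'John', 10), (2, 'Alice', 20), (3, 'Bob', NULL);
        INSERT INTO dept VALUES (10, 'IT'), (20, 'HR'), (30, 'Finance');
    """)
    queries = {
        # Only employees whose dept_id matches a department
        "inner": "SELECT e.name, d.dept_name FROM emp e "
                 "INNER JOIN dept d ON e.dept_id = d.dept_id",
        # Every employee; Bob gets NULL for dept_name
        "left": "SELECT e.name, d.dept_name FROM emp e "
                "LEFT JOIN dept d ON e.dept_id = d.dept_id",
        # Every employee paired with every department (3 x 3 rows)
        "cross": "SELECT e.name, d.dept_name FROM emp e CROSS JOIN dept d",
        # FULL OUTER JOIN emulated: left join + departments with no employees
        "full": """
            SELECT e.name, d.dept_name FROM emp e
            LEFT JOIN dept d ON e.dept_id = d.dept_id
            UNION ALL
            SELECT NULL, d.dept_name FROM dept d
            WHERE NOT EXISTS (SELECT 1 FROM emp e WHERE e.dept_id = d.dept_id)
        """,
    }
    # key=str lets rows containing None sort alongside rows of strings
    results = {name: sorted(cur.execute(sql).fetchall(), key=str)
               for name, sql in queries.items()}
    conn.close()
    return results
```

In this data Bob has no department, so he appears in the LEFT and FULL results with a NULL `dept_name`. Finance has no employees, so it shows up only in the FULL result.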

### **Medium**

**Q4. Explain the difference between RANK(), DENSE_RANK(), and ROW_NUMBER() window functions.**
- **ROW_NUMBER()**: Assigns unique sequential integers (1,2,3,4...)
- **RANK()**: Assigns rank with gaps for ties (1,2,2,4...)
- **DENSE_RANK()**: Assigns rank without gaps for ties (1,2,2,3...)

Example: For salaries [100, 90, 90, 80]
- ROW_NUMBER: 1,2,3,4
- RANK: 1,2,2,4
- DENSE_RANK: 1,2,2,3
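The following pure-Python sketch makes the tie handling concrete. It mimics the three functions over `ORDER BY value DESC`. It shows the semantics only, not how Oracle computes them, and the name `rank_functions` is hypothetical.

```python
def rank_functions(values):
    """Return (value, row_number, rank, dense_rank) tuples ordered by value DESC,
    mimicking ROW_NUMBER(), RANK() and DENSE_RANK() OVER (ORDER BY value DESC)."""
    ordered = sorted(values, reverse=True)
    rows = []
    rank = dense = 0
    prev = object()  # sentinel that never equals a real value
    for row_number, v in enumerate(ordered, start=1):
        if v != prev:
            rank = row_number  # RANK jumps to the row position, leaving gaps after ties
            dense += 1         # DENSE_RANK advances by exactly one, so no gaps
            prev = v
        rows.append((v, row_number, rank, dense))
    return rows

for row in rank_functions([100, 90, 90, 80]):
    print(row)
```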

**Q5. Write a PL/SQL block to handle exceptions when inserting duplicate records.**
```sql
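DECLARE
  v_emp_id NUMBER := 101;
  duplicate_error EXCEPTION;
  -- Map ORA-00001 (unique constraint violated) to a named exception;
  -- Oracle also predefines this mapping as DUP_VAL_ON_INDEX
  PRAGMA EXCEPTION_INIT(duplicate_error, -00001);
BEGIN
  -- An explicit column list keeps the insert valid if the table gains columns
  INSERT INTO employees (emp_id, emp_name, salary)
  VALUES (v_emp_id, 'John Doe', 50000);
  COMMIT;
EXCEPTION
  WHEN duplicate_error THEN
    ROLLBACK;
    DBMS_OUTPUT.PUT_LINE('Employee ID ' || v_emp_id || ' already exists');
  WHEN OTHERS THEN
    ROLLBACK;
    DBMS_OUTPUT.PUT_LINE('Error: ' || SQLERRM);
    RAISE;  -- re-raise so unexpected errors are not silently swallowed
END;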
```

**Q6. Scenario: You need to find employees who haven't made any sales in the last 6 months. Write the query.**
```sql
SELECT e.emp_id, e.emp_name
FROM employees e
WHERE NOT EXISTS (
  SELECT 1
  FROM sales s
  WHERE s.emp_id = e.emp_id
  AND s.sale_date >= ADD_MONTHS(SYSDATE, -6)
);
```

### **Hard**

**Q7. Explain the concept of Materialized Views and when would you use them over regular views?**
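- **Materialized Views**: Query results physically stored as a table. They are refreshed on demand, on a schedule, or on commit, either completely or incrementally. Incremental (fast) refresh relies on materialized view logs.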
- **Use Cases**:
  - Complex aggregations on large datasets
  - Queries with heavy joins
  - Reports requiring fast response times
  - Data warehouse scenarios with periodic refreshes
- **Advantages**: Faster query performance, reduced computation
- **Disadvantages**: Storage overhead, data may be stale between refreshes

**Q8. Write a PL/SQL procedure with bulk collect and FORALL to efficiently update 1 million records.**
```sql
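-- Note: a single set-based UPDATE is usually fastest when undo space allows;
-- bulk processing pays off when per-row PL/SQL logic is needed
CREATE OR REPLACE PROCEDURE update_salary_bulk IS
  -- Fetch only the keys; the UPDATE recomputes salary from the row's current value
  TYPE emp_id_tab IS TABLE OF employees.emp_id%TYPE;
  v_emp_ids emp_id_tab;

  CURSOR c_emp IS
    SELECT emp_id FROM employees WHERE department_id = 10;
BEGIN
  OPEN c_emp;
  LOOP
    -- LIMIT caps the PGA memory used per batch
    FETCH c_emp BULK COLLECT INTO v_emp_ids LIMIT 10000;
    EXIT WHEN v_emp_ids.COUNT = 0;

    -- FORALL sends the whole batch to the SQL engine in one context switch
    FORALL i IN 1..v_emp_ids.COUNT
      UPDATE employees
      SET salary = salary * 1.1
      WHERE emp_id = v_emp_ids(i);

    -- Committing per batch bounds undo usage, but committing while the cursor
    -- is still open (fetch across commit) can raise ORA-01555 on long runs
    COMMIT;
  END LOOP;
  CLOSE c_emp;
END;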
```

**Q9. Scenario: Design a solution to capture and log all DML operations on a critical table without impacting performance.**

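**Answer**: One option is a row-level trigger that writes to an audit table inside an autonomous transaction. This approach is not overhead-free. The trigger adds work to every DML statement, and the autonomous COMMIT keeps audit rows even if the main transaction rolls back. For lower-overhead auditing, consider Oracle Unified Auditing, Flashback Data Archive, or log-based CDC such as GoldenGate.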
```sql
CREATE TABLE audit_log (
  table_name VARCHAR2(50),
  operation VARCHAR2(10),
  old_value CLOB,
  new_value CLOB,
  username VARCHAR2(50),
  change_date DATE
);

CREATE OR REPLACE TRIGGER trg_employee_audit
AFTER INSERT OR UPDATE OR DELETE ON employees
FOR EACH ROW
DECLARE
  PRAGMA AUTONOMOUS_TRANSACTION;
BEGIN
  IF INSERTING THEN
    INSERT INTO audit_log VALUES ('EMPLOYEES', 'INSERT', NULL,
      :NEW.emp_id||','||:NEW.salary, USER, SYSDATE);
  ELSIF UPDATING THEN
    INSERT INTO audit_log VALUES ('EMPLOYEES', 'UPDATE',
      :OLD.emp_id||','||:OLD.salary, :NEW.emp_id||','||:NEW.salary,
      USER, SYSDATE);
  ELSIF DELETING THEN
    INSERT INTO audit_log VALUES ('EMPLOYEES', 'DELETE',
      :OLD.emp_id||','||:OLD.salary, NULL, USER, SYSDATE);
  END IF;
  COMMIT;
END;
```

---

## **Python**

### **Easy**

**Q10. What is the difference between list and tuple in Python?**
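- **List**: Mutable (append, remove, in-place sort), defined with `[]`, not hashable
- **Tuple**: Immutable, defined with `()`, slightly smaller and faster to create. A tuple is hashable when all its elements are hashable, so it can be used as a dictionary key or set member.
- **Use Case**: Lists for collections that change; tuples for fixed records such as coordinates or composite keys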
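The sketch below checks these differences directly. The function name `list_vs_tuple` and the sample values are illustrative only.

```python
def list_vs_tuple():
    """Collect observable differences between lists and tuples."""
    nums = [1, 2, 3]
    nums.append(4)  # lists grow in place

    point = (10, 20)
    try:
        point[0] = 99  # tuples reject item assignment
        tuple_is_mutable = True
    except TypeError:
        tuple_is_mutable = False

    def is_hashable(obj):
        try:
            hash(obj)
            return True
        except TypeError:
            return False

    lookup = {point: "warehouse A"}  # a hashable tuple works as a dict key
    return {
        "list_after_append": nums,
        "tuple_is_mutable": tuple_is_mutable,
        "list_hashable": is_hashable([1, 2]),
        "tuple_hashable": is_hashable((1, 2)),
        # a tuple is only hashable if everything inside it is hashable
        "tuple_with_list_hashable": is_hashable((1, [2, 3])),
        "lookup": lookup[(10, 20)],
    }
```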

**Q11. Explain list comprehension with an example.**
```python
# Traditional approach
squares = []
for i in range(10):
    squares.append(i**2)

# List comprehension
squares = [i**2 for i in range(10)]

# With condition
even_squares = [i**2 for i in range(10) if i % 2 == 0]
```

**Q12. What are *args and **kwargs? Provide examples.**
```python
# *args - variable positional arguments
def sum_all(*args):
    return sum(args)

print(sum_all(1, 2, 3, 4))  # Output: 10

# **kwargs - variable keyword arguments
def print_info(**kwargs):
    for key, value in kwargs.items():
        print(f"{key}: {value}")

print_info(name="John", age=30, city="NYC")
```

### **Medium**

**Q13. Explain the difference between deep copy and shallow copy.**
```python
import copy

original = [[1, 2, 3], [4, 5, 6]]

# Shallow copy - new outer list, but the inner lists are shared references
shallow = copy.copy(original)
shallow[0][0] = 99
print(original)  # [[99, 2, 3], [4, 5, 6]] - original changed!

# Deep copy - recursively copies nested objects, so nothing is shared
original = [[1, 2, 3], [4, 5, 6]]
deep = copy.deepcopy(original)
deep[0][0] = 99
print(original)  # [[1, 2, 3], [4, 5, 6]] - original unchanged
```

**Q14. What are Python decorators? Write a decorator to measure function execution time.**
```python
import time
from functools import wraps

def timing_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
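        start = time.perf_counter()  # monotonic, high-resolution clock for durations
        result = func(*args, **kwargs)
        end = time.perf_counter()
        print(f"{func.__name__} took {end - start:.4f} seconds")
        return result
    return wrapper

@timing_decorator
def process_data():
    time.sleep(2)
    return "Done"

process_data()  # prints the elapsed time, about 2 seconds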

**Q15. Scenario: You have a list of dictionaries representing employee records. Filter employees with salary > 50000 and sort by name.**
```python
employees = [
    {'name': 'John', 'salary': 60000, 'dept': 'IT'},
    {'name': 'Alice', 'salary': 45000, 'dept': 'HR'},
    {'name': 'Bob', 'salary': 75000, 'dept': 'IT'}
]

# Solution
filtered = sorted(
    [emp for emp in employees if emp['salary'] > 50000],
    key=lambda x: x['name']
)

# Using filter and sorted
filtered = sorted(
    filter(lambda x: x['salary'] > 50000, employees),
    key=lambda x: x['name']
)
```

### **Hard**

**Q16. Explain Python's GIL (Global Interpreter Lock) and its implications.**
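- **GIL**: A mutex in CPython (the reference interpreter) that allows only one thread at a time to execute Python bytecode. It simplifies memory management (reference counting) but serializes pure-Python CPU work.
- **Implications**:
  - CPU-bound multi-threaded programs don't benefit from multiple cores
  - I/O-bound operations can still benefit from threading, because a thread releases the GIL while it waits on I/O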
  - Use multiprocessing for CPU-intensive tasks
  - Use asyncio for I/O-intensive tasks
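This minimal sketch illustrates the I/O-bound case. `time.sleep` stands in for a blocking network or disk call; both release the GIL, so five 0.2 s waits overlap instead of adding up to 1 s. The helper names are hypothetical. For CPU-bound work you would swap in `ProcessPoolExecutor`, which on spawn-based platforms needs an `if __name__ == "__main__":` guard.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(seconds):
    # Blocking wait that releases the GIL, like a socket or file read would
    time.sleep(seconds)
    return seconds

def run_io_tasks(n_tasks=5, delay=0.2):
    """Run n_tasks blocking waits concurrently; return results and wall time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_tasks) as pool:
        results = list(pool.map(fake_io, [delay] * n_tasks))
    return results, time.perf_counter() - start

results, elapsed = run_io_tasks()
print(f"{len(results)} waits finished in {elapsed:.2f}s")
```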

**Q17. Implement a context manager for database connection handling.**
```python
class DatabaseConnection:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.connection = None
    
    def __enter__(self):
        # Establish connection
        self.connection = self._connect()
        print(f"Connected to {self.host}:{self.port}")
        return self.connection
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close connection
        if self.connection:
            self.connection.close()
            print("Connection closed")
        if exc_type:
            print(f"Exception occurred: {exc_val}")
        return False  # Propagate exceptions
    
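    def _connect(self):
        # Simulated connection exposing close(), like a real DB-API connection.
        # A real implementation would call a driver, e.g. psycopg2.connect(host=..., port=...)
        class _FakeConnection:
            def close(self):
                pass
        return _FakeConnection()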

# Usage
with DatabaseConnection("localhost", 5432) as conn:
    # Work with connection
    pass
```
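You can write the same resource handling more compactly with `contextlib.contextmanager`. Code before `yield` plays the role of `__enter__`, and the `finally` block plays the role of `__exit__`, so the connection closes even if the `with` body raises. `FakeConnection` is a hypothetical stand-in for a real driver connection.

```python
from contextlib import contextmanager

class FakeConnection:
    """Hypothetical stand-in for a DB-API connection."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

@contextmanager
def database_connection(host, port):
    # A real version would call a driver here, e.g. connect(host=host, port=port)
    conn = FakeConnection()
    try:
        yield conn
    finally:
        # Runs on normal exit and when the with-body raises; exceptions still propagate
        conn.close()

with database_connection("localhost", 5432) as conn:
    print("open:", not conn.closed)
print("closed:", conn.closed)
```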

**Q18. Scenario: Design a solution to process a 10GB CSV file that doesn't fit in memory.**
```python
import pandas as pd

def process_large_file(file_path, chunk_size=100000):
    """
    Process large CSV in chunks
    """
    results = []
    
    # Read in chunks
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        # Process each chunk
        processed = chunk[chunk['amount'] > 1000]
        aggregated = processed.groupby('category')['amount'].sum()
        results.append(aggregated)
    
    # Combine results
    final_result = pd.concat(results).groupby(level=0).sum()
    return final_result

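# Alternative: pure-Python generator (no pandas), constant memory
import csv

def process_with_generator(file_path):
    with open(file_path, newline='') as f:
        reader = csv.DictReader(f)  # consumes the header row
        for row in reader:
            # Process line by line; only one row is held in memory at a time
            amount = float(row['amount'])
            if amount > 1000:
                yield row['category'], amount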

---

## **PySpark**

### **Easy**

**Q19. What is the difference between transformation and action in Spark?**
- **Transformations**: Lazy operations that create new RDD/DataFrame (map, filter, select, groupBy)
  - Not executed immediately
  - Create DAG (Directed Acyclic Graph)
- **Actions**: Trigger execution and either return results to the driver or write output (collect, count, show, write)
  - Execute all transformations in lineage
- **Example**:
  - `df.filter()` - transformation
  - `df.count()` - action
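The following toy model in plain Python illustrates the idea. It is a hypothetical teaching aid, not Spark's API. Transformations only append a step to a lineage list, and an action replays the lineage over the source data.

```python
class LazyPipeline:
    """Toy model of Spark's lazy evaluation (not the Spark API)."""

    def __init__(self, data, lineage=None):
        self._data = data
        self.lineage = list(lineage or [])  # recorded transformations
        self.executions = 0                 # how many times an action ran

    # Transformations: record a step, return a new pipeline, compute nothing
    def filter(self, predicate):
        return LazyPipeline(self._data, self.lineage + [("filter", predicate)])

    def map(self, fn):
        return LazyPipeline(self._data, self.lineage + [("map", fn)])

    # Actions: replay the whole lineage over the source data
    def collect(self):
        self.executions += 1
        out = list(self._data)
        for op, fn in self.lineage:
            if op == "map":
                out = [fn(x) for x in out]
            else:
                out = [x for x in out if fn(x)]
        return out

    def count(self):
        return len(self.collect())

p = LazyPipeline(range(10)).filter(lambda x: x % 2 == 0).map(lambda x: x * 10)
print(p.executions)  # nothing has run yet
print(p.collect())
```

Nothing is stored between actions, so each action replays the lineage. This is why Spark recomputes a DataFrame used by several actions unless you `cache()` it.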

**Q20. Write PySpark code to read a CSV file and display first 10 rows.**
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ReadCSV") \
    .getOrCreate()

df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("s3://bucket/data.csv")

df.show(10)
```

**Q21. What is the difference between repartition() and coalesce()?**
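- **repartition()**: Full shuffle. It can increase or decrease the number of partitions and produces roughly equal-sized partitions (round-robin, or hash partitioning when columns are given).
- **coalesce()**: Merges existing partitions without a full shuffle. It can only decrease the count; on a DataFrame, requesting more partitions leaves the count unchanged. The resulting partitions may be uneven.
- **Use Case**: Use coalesce to cheaply reduce partitions, e.g. to write fewer output files. Use repartition when you need more parallelism, evenly sized partitions, or rows co-located by key.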
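The following simulation models partitions as plain Python lists. It is a simplified model, not Spark's implementation: `coalesce` merges neighbouring partitions as whole units, while `repartition(n)` deals every row out round-robin (Spark's behaviour when no columns are given).

```python
def coalesce(partitions, n):
    """Merge existing partitions into n groups; rows are never split from their partition."""
    if n >= len(partitions):
        return [list(p) for p in partitions]  # cannot increase the partition count
    groups = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        # Neighbouring partitions are merged together, so skew carries over
        groups[i * n // len(partitions)].extend(part)
    return groups

def repartition(partitions, n):
    """Full shuffle: every row is dealt out round-robin, so n may grow or shrink."""
    groups = [[] for _ in range(n)]
    rows = [row for part in partitions for row in part]
    for i, row in enumerate(rows):
        groups[i % n].append(row)
    return groups

parts = [[1, 2, 3, 4, 5, 6], [7], [8], [9]]  # one oversized partition
print([len(p) for p in coalesce(parts, 2)])     # uneven
print([len(p) for p in repartition(parts, 2)])  # balanced
```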

### **Medium**

**Q22. Explain the difference between map() and flatMap() transformations.**
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
rdd = spark.sparkContext.parallelize(["Hello World", "PySpark Tutorial"])

# map - one-to-one mapping
mapped = rdd.map(lambda x: x.split())
print(mapped.collect())  
# [['Hello', 'World'], ['PySpark', 'Tutorial']]

# flatMap - one-to-many, flattens result
flat_mapped = rdd.flatMap(lambda x: x.split())
print(flat_mapped.collect())  
# ['Hello', 'World', 'PySpark', 'Tutorial']
```

**Q23. Write PySpark code to perform an inner join between two DataFrames.**
```python
# Sample DataFrames
employees = spark.createDataFrame([
    (1, "John", 10),
    (2, "Alice", 20),
    (3, "Bob", 10)
], ["emp_id", "name", "dept_id"])

departments = spark.createDataFrame([
    (10, "IT"),
    (20, "HR"),
    (30, "Finance")
], ["dept_id", "dept_name"])

# Inner Join
result = employees.join(
    departments,
    employees.dept_id == departments.dept_id,
    "inner"
).select(
    employees.emp_id,
    employees.name,
    departments.dept_name
)

result.show()
```

**Q24. Scenario: You need to find duplicate records in a dataset based on multiple columns. Write the code.**
```python
from pyspark.sql import Window
from pyspark.sql.functions import col, count

# Method 1: Using groupBy
duplicates = df.groupBy("col1", "col2", "col3") \
    .count() \
    .filter(col("count") > 1)

# Method 2: Using window function
window_spec = Window.partitionBy("col1", "col2", "col3")

df_with_count = df.withColumn(
    "duplicate_count",
    count("*").over(window_spec)
)

duplicates = df_with_count.filter(col("duplicate_count") > 1)
```
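The two methods return differently shaped results. The following plain-Python sketch shows the difference; the row dictionaries and the function names are hypothetical. The groupBy version yields one entry per duplicated key. The window version keeps every original row and annotates it with its group size, which is what you need when you want to inspect or de-duplicate the full rows.

```python
from collections import Counter

def _key(row, keys):
    return tuple(row[k] for k in keys)

def duplicates_grouped(rows, keys):
    """Like groupBy(keys).count().filter(count > 1): one entry per duplicated key."""
    counts = Counter(_key(r, keys) for r in rows)
    return {key: n for key, n in counts.items() if n > 1}

def duplicates_windowed(rows, keys):
    """Like count('*').over(Window.partitionBy(keys)) then filter:
    every original duplicate row is kept, annotated with its group size."""
    counts = Counter(_key(r, keys) for r in rows)
    return [dict(r, duplicate_count=counts[_key(r, keys)])
            for r in rows if counts[_key(r, keys)] > 1]

rows = [
    {"id": 1, "col1": "x", "col2": 1},
    {"id": 2, "col1": "x", "col2": 1},
    {"id": 3, "col1": "y", "col2": 2},
]
print(duplicates_grouped(rows, ["col1", "col2"]))
print(duplicates_windowed(rows, ["col1", "col2"]))
```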

### **Hard**

**Q25. Explain Spark's execution model: Driver, Executors, and how tasks are distributed.**
- **Driver**: Main program, creates SparkContext, converts user program to tasks, schedules tasks
- **Cluster Manager**: Allocates resources (YARN, Mesos, Kubernetes, Standalone)
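- **Executors**: Processes launched on worker nodes; they run tasks and keep cached data in memory or on disk
- **Execution Flow**:
  1. When an action is called, the driver builds and optimizes a logical plan and turns it into a DAG
  2. DAG Scheduler splits the DAG into stages at shuffle boundaries (wide transformations)
  3. Task Scheduler launches one task per partition of each stage on the executors
  4. Executors run the tasks and return results to the driver or write them to storage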

**Q26. Write optimized PySpark code to handle slowly changing dimensions (SCD Type 2).**
```python
from functools import reduce
from pyspark.sql.functions import col, lit, current_timestamp

def process_scd_type2(source_df, target_df, key_cols, compare_cols):
    """
    Implements SCD Type 2 logic.
    Returns the rows to write: inserts for new keys, new versions of changed
    keys, and the expired old versions. Assumes target_df has the source columns
    plus effective_date, end_date and is_current; merge the result into the target.
    """
    # Compare against the current version of each key only
    current_tgt = target_df.filter(col("is_current"))

    # Explicit join condition keeps src.* and tgt.* unambiguous
    join_cond = reduce(
        lambda a, b: a & b,
        [col(f"src.{k}") == col(f"tgt.{k}") for k in key_cols]
    )
    joined = source_df.alias("src").join(current_tgt.alias("tgt"), join_cond, "left")

    # Null-safe comparison so NULL -> value changes are detected
    change_condition = reduce(
        lambda a, b: a | b,
        [~col(f"src.{c}").eqNullSafe(col(f"tgt.{c}")) for c in compare_cols]
    )
    in_target = col(f"tgt.{key_cols[0]}").isNotNull()

    def as_current(df):
        # Stamp rows as the active version
        return df.withColumn("effective_date", current_timestamp()) \
            .withColumn("end_date", lit(None).cast("timestamp")) \
            .withColumn("is_current", lit(True))

    # New records: key not present in the target
    new_records = as_current(joined.filter(~in_target).select("src.*"))

    changed = joined.filter(in_target & change_condition)

    # Changed records - expire old version
    expired_records = changed.select("tgt.*") \
        .withColumn("end_date", current_timestamp()) \
        .withColumn("is_current", lit(False))

    # Changed records - insert new version
    updated_records = as_current(changed.select("src.*"))

    # unionByName aligns columns by name rather than by position
    return new_records.unionByName(updated_records).unionByName(expired_records)

**Q27. Scenario: Optimize a PySpark job that's running slow due to data skew in a join operation.**
```python
from pyspark.sql.functions import rand, lit, explode, array, broadcast

def handle_data_skew(df1, df2, join_key, salt_range=10):
    """
    Handle a skewed join using the salting technique.
    df1: large DataFrame with hot keys; df2: smaller DataFrame to replicate.
    """
    # Give every row of the skewed side a random salt in [0, salt_range)
    df1_salted = df1.withColumn(
        "salt",
        (rand() * salt_range).cast("int")
    )

    # Replicate each row of the smaller side once per salt value
    df2_replicated = df2.withColumn(
        "salt",
        explode(array([lit(i) for i in range(salt_range)]))
    )

    # Join on (key, salt): rows of a hot key are spread across salt_range tasks.
    # Joining on a list of column names avoids duplicate key columns in the output.
    result = df1_salted.join(
        df2_replicated,
        [join_key, "salt"]
    ).drop("salt")

    return result

# Additional optimizations:
# 1. Broadcast the smaller table; Spark does this automatically when it is below
#    spark.sql.autoBroadcastJoinThreshold (10 MB by default)
# result = df1.join(broadcast(df2), join_key)

# 2. On Spark 3.x, let AQE split skewed partitions automatically
# spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# 3. Repartition by the join key when partition sizes are badly unbalanced
# df1 = df1.repartition(200, join_key)

# 4. Cache DataFrames reused across several actions
# df1.cache()
```
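Why salting doesn't change the join result can be checked with a small plain-Python simulation. This is a hypothetical model, not Spark code. Each skewed row gets one random salt, each small-side row is copied once per salt, and joining on `(key, salt)` yields exactly the same rows as the plain join. Meanwhile the hot key is split into `salt_range` smaller groups.

```python
import random
from collections import Counter

def plain_join(left, right):
    """Hash join of (key, value) pairs on key."""
    index = {}
    for key, val in right:
        index.setdefault(key, []).append(val)
    return [(k, lv, rv) for k, lv in left for rv in index.get(k, [])]

def salted_join(left, right, salt_range, seed=0):
    """Join via (key, salt); also return row counts per salted key (the 'task load')."""
    rng = random.Random(seed)
    # Skewed side: each row gets exactly one random salt
    left_salted = [((k, rng.randrange(salt_range)), v) for k, v in left]
    # Small side: each row is replicated once per salt value
    right_salted = [((k, s), v) for k, v in right for s in range(salt_range)]
    joined = plain_join(left_salted, right_salted)
    # Strip the salt back off the key
    result = [(k, lv, rv) for (k, _salt), lv, rv in joined]
    load = Counter(key for key, _ in left_salted)
    return result, load

left = [("hot", i) for i in range(1000)] + [("cold", -1)]
right = [("hot", "H"), ("cold", "C")]
res, load = salted_join(left, right, salt_range=10)
print(sorted(res) == sorted(plain_join(left, right)), max(load.values()))
```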

---

## **AWS Cloud**

### **Easy**

**Q28. What is the difference between S3 Standard and S3 Glacier storage classes?**
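- **S3 Standard**: Frequently accessed data, millisecond latency, highest storage cost, designed for 99.99% availability
- **S3 Glacier**: Archive storage with much lower storage cost, plus retrieval fees and minimum storage durations. Glacier Instant Retrieval returns data in milliseconds, Glacier Flexible Retrieval in minutes to hours, and Glacier Deep Archive within about 12 hours.
- **Use Case**: Standard for active data, Glacier for compliance/archival. Both are designed for 99.999999999% durability.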

**Q29. Explain the difference between IAM Roles and IAM Users.**
- **IAM Users**: Permanent credentials, for specific people/applications, long-term access
- **IAM Roles**: Temporary credentials, can be assumed by users/services, short-term access
- **Best Practice**: Use roles for applications, services, and cross-account access

**Q30. What are the different EC2 instance types and their use cases?**
- **General Purpose (t3, m5)**: Balanced compute/memory/network
- **Compute Optimized (c5)**: High-performance processors
- **Memory Optimized (r5)**: Large datasets in memory
- **Storage Optimized (i3)**: High sequential read/write
- **GPU Instances (p3)**: Machine learning, graphics rendering

### **Medium**

**Q31. Design a solution for secure data transfer from on-premises to S3.**

**Answer**:
- **Option 1: AWS Direct Connect** - Dedicated private network connection with consistent performance. Traffic is not encrypted by default, so add MACsec or run a VPN over it.
- **Option 2: AWS DataSync** - Automated data transfer, encryption in transit
- **Option 3: AWS Snowball** - Physical device for large-scale migration
- **Security**:
  - Use SSL/TLS for encryption in transit
  - Enable S3 server-side encryption (SSE-S3, SSE-KMS)
  - Implement bucket policies and IAM roles
  - Enable S3 versioning and MFA delete

**Q32. Explain VPC components: Subnets, Route Tables, Internet Gateway, NAT Gateway.**
- **VPC**: Isolated network in AWS cloud
- **Subnets**: IP address ranges within VPC (public/private)
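- **Internet Gateway**: Enables two-way internet traffic; a subnet is "public" when its route table has a route to the IGW
- **NAT Gateway**: Sits in a public subnet and lets instances in private subnets initiate outbound internet connections, while blocking connections initiated from the internet
- **Route Tables**: Define where traffic from each subnet is sent (local VPC, IGW, NAT, peering, etc.)
- **Security Groups**: Stateful, instance-level firewall with allow rules only
- **NACLs**: Stateless, subnet-level firewall with ordered allow and deny rules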
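Subnet planning is plain CIDR arithmetic, which Python's `ipaddress` module can do. The following sketch carves a hypothetical `10.0.0.0/16` VPC into `/24` subnets. For example, the first two could be public subnets routed to the IGW and the next two private subnets routed to a NAT gateway. The helper counts usable hosts after the five addresses AWS reserves in every subnet (the first four and the last).

```python
import ipaddress
from itertools import islice

AWS_RESERVED_PER_SUBNET = 5  # first four addresses and the last one in each subnet

def plan_subnets(vpc_cidr, new_prefix, count):
    """Return the first `count` subnets of size /new_prefix inside the VPC CIDR."""
    vpc = ipaddress.ip_network(vpc_cidr)
    return [str(s) for s in islice(vpc.subnets(new_prefix=new_prefix), count)]

def usable_hosts(subnet_cidr):
    """Addresses left for instances after AWS's reserved addresses."""
    return ipaddress.ip_network(subnet_cidr).num_addresses - AWS_RESERVED_PER_SUBNET

for cidr in plan_subnets("10.0.0.0/16", 24, 4):
    print(cidr, usable_hosts(cidr))
```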

**Q33. Scenario: Your application on EC2 needs to read from S3. What's the secure way to grant access?**

**Answer**: Use IAM Roles
```python
# Steps:
# 1. Create an IAM role with an S3 read policy (policy document shown as a Python dict)
s3_read_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": [
            "s3:GetObject",
            "s3:ListBucket"
        ],
        "Resource": [
            "arn:aws:s3:::my-bucket/*",
            "arn:aws:s3:::my-bucket"
        ]
    }]
}

# 2. Attach the role to the EC2 instance via an instance profile
# 3. Application uses the AWS SDK's default credential chain; no access keys in code
import boto3

s3 = boto3.client('s3')  # Automatically uses instance role
response = s3.get_object(Bucket='my-bucket', Key='data.csv')
```

### **Hard**

**Q34. Design a highly available and fault-tolerant architecture for a data processing application.**

**Answer**:
```
Architecture Components:
1. Multi-AZ deployment across 3 availability zones
2. Application Load Balancer (ALB) for traffic distribution
3. Auto Scaling Group for EC2 instances (min: 2, max: 10)
4. RDS Multi-AZ for database high availability
5. S3 for data storage with versioning enabled
6. ElastiCache for caching layer
7. CloudWatch for monitoring and alarms
8. Route 53 for DNS with health checks

Data Flow:
Users → Route 53 → ALB → EC2 (Multi-AZ) → RDS Multi-AZ
                              ↓
                        ElastiCache → S3

Disaster Recovery:
- Automated backups to S3
- Cross-region replication for critical data
- Example targets: RTO 1 hour, RPO 15 minutes
```

**Q35. Explain AWS Lambda cold start and strategies to minimize it.**

**Answer**:
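- **Cold Start**: Extra latency when Lambda must create a new execution environment. It has to download the code package, start the runtime, and run the initialization code outside the handler.
- **Impact**: Typically from about 100 ms to several seconds, depending on runtime, package size, and initialization work

**Mitigation Strategies**:
1. **Provisioned Concurrency**: Pre-initialized execution environments
2. **Keep functions warm**: Scheduled EventBridge (formerly CloudWatch Events) pings every few minutes. This keeps only one environment warm and does not help with concurrent bursts.
3. **Reduce package size**: Remove unnecessary dependencies, use layers
4. **Choose a fast-starting runtime**: Python, Node.js and Go usually initialize faster than Java or .NET; for Java, Lambda SnapStart can cut initialization time
5. **Optimize code**: Minimize initialization code, lazy load dependencies
6. **Increase memory**: Lambda allocates CPU in proportion to memory, so initialization runs faster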

```python
# Example: Lambda with optimization
import json

# Global scope - initialized once per container
import boto3
s3_client = boto3.client('s3')

def lambda_handler(event, context):
    # Handler code - runs on every invocation
    bucket = event['bucket']
    key = event['key']
    
    response = s3_client.get_object(Bucket=bucket, Key=key)
    return {
        'statusCode': 200,
        'body': json.dumps('Success')
    }
```

**Q36. Scenario: Design a cost-optimized data lake architecture on AWS.**

**Answer**:
```
Architecture:
1. Data Ingestion:
   - AWS Kinesis Firehose → S3 (streaming data)
   - AWS DataSync (batch from on-premises)
   - S3 Transfer Acceleration for fast uploads

2. Storage Layers:
   - S3 Standard: Hot data (last 30 days)
   - S3 Standard-IA: Warm data (30-90 days)
   - S3 Glacier: Cold data (>90 days)
   - S3 Intelligent-Tiering: Unknown access patterns

3. Processing:
   - AWS Glue: ETL jobs, data catalog
   - Amazon Athena: Interactive queries
   - EMR with Spot Instances: Large-scale processing

4. Optimization:
   - Lifecycle policies for automatic tiering
   - Parquet/ORC format for better compression
   - Partition data by date/region
   - Use Glue Data Catalog for metadata
   - S3 Select/Glacier Select for filtered retrieval

5. Cost Monitoring:
   - AWS Cost Explorer
   - S3 Storage Lens
   - CloudWatch metrics
```
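The lifecycle policies in step 4 can be expressed as an S3 lifecycle configuration. This is a minimal sketch assuming the 30/90-day tiers listed above. The rule ID, the empty prefix, and the bucket name in the comment are hypothetical. In the API, `GLACIER` denotes Glacier Flexible Retrieval. S3 requires objects to stay at least 30 days in Standard before a lifecycle transition to Standard-IA, which the function checks.

```python
def tiering_lifecycle_config(prefix="", ia_days=30, glacier_days=90):
    """Build a lifecycle configuration: Standard -> Standard-IA -> Glacier by object age."""
    if ia_days < 30:
        raise ValueError("objects must stay at least 30 days in Standard before Standard-IA")
    if glacier_days <= ia_days:
        raise ValueError("glacier_days must be later than ia_days")
    return {
        "Rules": [{
            "ID": "tier-by-age",           # hypothetical rule name
            "Filter": {"Prefix": prefix},  # empty prefix applies to the whole bucket
            "Status": "Enabled",
            "Transitions": [
                {"Days": ia_days, "StorageClass": "STANDARD_IA"},
                {"Days": glacier_days, "StorageClass": "GLACIER"},
            ],
        }]
    }

# Applying it (requires AWS credentials; bucket name is hypothetical):
# boto3.client("s3").put_bucket_lifecycle_configuration(
#     Bucket="my-data-lake", LifecycleConfiguration=tiering_lifecycle_config())
```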

---

## **Databricks**

### **Easy**

**Q37. What is the difference between Databricks clusters and SQL Warehouses?**
- **Clusters**: General-purpose compute for notebooks, jobs, streaming (Spark clusters)
- **SQL Warehouses**: Optimized for SQL queries and BI tools, auto-scaling, serverless option
- **Use Case**: Clusters for data engineering/ML, SQL Warehouses for analytics/BI

**Q38. Explain Delta Lake and its key features.**
- **Delta Lake**: Open-source storage layer that adds a transaction log (`_delta_log`) on top of Parquet files in a data lake
- **Features**:
  - ACID transactions
  - Time travel (data versioning)
  - Schema enforcement and evolution
  - Unified batch and streaming
  - Audit history
- **Benefits**: Reliability, performance, governance

**Q39. Write code to create a Delta table in Databricks.**
```python
# Create Delta table from DataFrame
df = spark.read.csv("/mnt/data/sales.csv", header=True)

df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("/mnt/delta/sales")

# Create managed Delta table
df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("sales")

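# SQL syntax: register an external table over the existing Delta path
# (a different name, since "sales" already exists as the managed table above)
spark.sql("""
    CREATE TABLE IF NOT EXISTS sales_external
    USING DELTA
    LOCATION '/mnt/delta/sales'
""")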
```

### **Medium**

**Q40. Explain the difference between Managed and External tables in Databricks.**
- **Managed Tables**:
  - Databricks manages both metadata and data
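  - Data stored in the metastore's managed storage location (the DBFS root for the legacy Hive metastore; the catalog or schema managed location in Unity Catalog)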
  - DROP TABLE deletes both metadata and data
  
- **External Tables**:
  - Databricks manages only metadata
  - Data stored in user-specified location
  - DROP TABLE deletes only metadata, data remains

**Q41. Write code to perform MERGE operation (UPSERT) in Delta Lake.**
```python
from delta.tables import DeltaTable

# Target Delta table
target = DeltaTable.forPath(spark, "/mnt/delta/customers")

# Source updates
updates = spark.read.csv("/mnt/updates/customers.csv", header=True)

# MERGE operation
target.alias("target").merge(
    updates.alias("source"),
    "target.customer_id = source.customer_id"
).whenMatchedUpdate(
    set = {
        "name": "source.name",
        "email": "source.email",
        "updated_date": "current_timestamp()"
    }
).whenNotMatchedInsert(
    values = {
        "customer_id": "source.customer_id",
        "name": "source.name",
        "email": "source.email",
        "created_date": "current_timestamp()"
    }
).execute()
```

**Q42. Scenario: You need to recover data that was accidentally deleted 2 hours ago. How would you do it?**
```python
# Time Travel in Delta Lake. Works only while the old data files are still
# retained: VACUUM removes them (default retention is 7 days).

# Step 1: Find the last good version before the delete
spark.sql("DESCRIBE HISTORY delta.`/mnt/delta/sales`").show()

# Step 2: Inspect the data at that point
# Method 1: Using version number
df = spark.read.format("delta") \
    .option("versionAsOf", 10) \
    .load("/mnt/delta/sales")

# Method 2: Using timestamp
df = spark.read.format("delta") \
    .option("timestampAsOf", "2024-12-10 14:00:00") \
    .load("/mnt/delta/sales")

# Step 3: Restore the table
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/mnt/delta/sales")

# Restore to previous version
delta_table.restoreToVersion(10)

# Or restore to timestamp
# delta_table.restoreToTimestamp("2024-12-10 14:00:00")
```

### **Hard**

**Q43. Explain Databricks' Adaptive Query Execution (AQE) and its benefits.**

**Answer**:
- **AQE**: Runtime optimization of query plans based on actual data statistics
- **Key Features**:
  1. **Dynamically coalescing shuffle partitions**: Reduces partitions post-shuffle
  2. **Dynamically switching join strategies**: Changes sort-merge to broadcast join
  3. **Dynamically optimizing skew joins**: Splits skewed partitions

```python
# Enable AQE (enabled by default in Databricks Runtime 7.3+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Benefits:
# - Improved query performance
# - Better resource utilization
# - Automatic optimization without manual tuning
```

**Q44. Design an incremental data processing pipeline using Delta Lake and Structured Streaming.**
```python
from pyspark.sql.functions import col, current_timestamp, window

# Read streaming data from source
streaming_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/mnt/schema/") \
    .load("/mnt/incoming/")

# Transform data
transformed_df = streaming_df \
    .withColumn("processed_time", current_timestamp()) \
    .filter(col("amount") > 0)

# Write to Delta Lake with checkpointing
query = transformed_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/mnt/checkpoints/sales") \
    .trigger(processingTime="5 minutes") \
    .start("/mnt/delta/sales")

# Downstream processing with Delta streaming
delta_stream = spark.readStream \
    .format("delta") \
    .load("/mnt/delta/sales")

# Aggregate and write to aggregated table
aggregated = delta_stream \
    .groupBy("category", window(col("processed_time"), "1 hour")) \
    .agg({"amount": "sum", "quantity": "sum"})

aggregated.writeStream \
    .format("delta") \
    .outputMode("complete") \
    .option("checkpointLocation", "/mnt/checkpoints/agg") \
    .start("/mnt/delta/sales_hourly")
```

**Q45. Scenario: Optimize a slow-performing Delta table with 500 million records. What strategies would you use?**

**Answer**:
```python
# 1. OPTIMIZE - Compaction
spark.sql("OPTIMIZE delta.`/mnt/delta/large_table`")

# 2. Z-Ordering for frequently filtered columns
spark.sql("""
    OPTIMIZE delta.`/mnt/delta/large_table`
    ZORDER BY (date, customer_id, region)
""")

# 3. Partition by a low-cardinality column that queries filter on (e.g. date);
#    high-cardinality partition columns create many small files
df.write.format("delta") \
    .partitionBy("date") \
    .mode("overwrite") \
    .save("/mnt/delta/large_table_partitioned")

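# 4. VACUUM to remove data files no longer referenced (after optimization);
#    versions older than the retention window can no longer be time-traveled to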
spark.sql("""
    VACUUM delta.`/mnt/delta/large_table`
    RETAIN 168 HOURS
""")

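# 5. Compute column statistics for the cost-based optimizer (Delta already collects
#    min/max data-skipping statistics on the first 32 columns at write time)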
spark.sql("""
    ANALYZE TABLE delta.`/mnt/delta/large_table`
    COMPUTE STATISTICS FOR ALL COLUMNS
""")

# 6. Enable Auto Optimize
spark.sql("""
    ALTER TABLE delta.`/mnt/delta/large_table`
    SET TBLPROPERTIES (
        delta.autoOptimize.optimizeWrite = true,
        delta.autoOptimize.autoCompact = true
    )
""")

# 7. Cache frequently accessed data
spark.sql("CACHE TABLE large_table")

# 8. Use liquid clustering (Databricks Runtime 13.3 LTS+) as an alternative to
#    partitioning and Z-ordering; it cannot be combined with either
spark.sql("""
    CREATE TABLE large_table_clustered
    USING DELTA
    CLUSTER BY (date, customer_id)
    AS SELECT * FROM large_table
""")

# Performance Monitoring
display(spark.sql("DESCRIBE DETAIL delta.`/mnt/delta/large_table`"))
display(spark.sql("DESCRIBE HISTORY delta.`/mnt/delta/large_table`"))
```

---

## **Cross-Technology Integration Questions**

### **Medium**

**Q46. Scenario: Design an end-to-end pipeline to ingest data from Oracle database to Databricks, transform it, and load to S3 as Parquet.**

**Answer**:
```python
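# Step 1: Extract from Oracle using PySpark JDBC
jdbc_url = "jdbc:oracle:thin:@hostname:1521:ORCL"
# Don't hard-code credentials; read them from a Databricks secret scope
# (scope and key names here are placeholders)
connection_properties = {
    "user": dbutils.secrets.get(scope="oracle", key="username"),
    "password": dbutils.secrets.get(scope="oracle", key="password"),
    "driver": "oracle.jdbc.driver.OracleDriver"
}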

# Incremental load using watermark
query = """(
    SELECT * FROM employees
    WHERE last_modified_date > TO_DATE('2024-12-01', 'YYYY-MM-DD')
) emp"""

df_oracle = spark.read.jdbc(
    url=jdbc_url,
    table=query,
    properties=connection_properties,
    numPartitions=10,
    column="emp_id",
    lowerBound=1,
    upperBound=100000
)

# Step 2: Transform in Databricks
from pyspark.sql.functions import col, when, upper, trim, year

df_transformed = df_oracle \
    .withColumn("email", upper(trim(col("email")))) \
    .withColumn("salary_band",
        when(col("salary") < 50000, "Low")
        .when(col("salary") < 100000, "Medium")
        .otherwise("High")
    ) \
    .withColumn("hire_year", year(col("hire_date"))) \
    .filter(col("status") == "ACTIVE")
# hire_year feeds the partitioning in Step 4 (assumes a hire_date source column)

# Step 3: Write to Delta Lake (staging)
df_transformed.write.format("delta") \
    .mode("overwrite") \
    .save("/mnt/delta/employees_staging")

# Step 4: Apply business logic and write to S3 as Parquet
df_final = spark.read.format("delta") \
    .load("/mnt/delta/employees_staging")

df_final.write \
    .mode("overwrite") \
    .partitionBy("department", "hire_year") \
    .parquet("s3://my-bucket/employees/")

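# Step 5: Register an external table over the Parquet output.
# spark.sql registers it in the workspace metastore; Athena sees it only if that
# metastore is the AWS Glue Data Catalog (otherwise run the same DDL in Athena
# or point a Glue crawler at the path)
spark.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS employees_athena (
        emp_id INT,
        name STRING,
        email STRING,
        salary DECIMAL(10,2),
        salary_band STRING
    )
    PARTITIONED BY (department STRING, hire_year INT)
    STORED AS PARQUET
    LOCATION 's3://my-bucket/employees/'
""")

# Discover the partition folders written in Step 4
spark.sql("MSCK REPAIR TABLE employees_athena")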

# Orchestration with Databricks Workflows
# Create job to run daily at 2 AM
```

**Q47. Write a Python script to automate the creation of AWS Glue crawlers for multiple S3 buckets.**

**Answer**:
```python
import boto3
import json

def create_glue_crawler(crawler_name, database_name, s3_path, iam_role):
    """
    Create AWS Glue crawler for S3 data source
    """
    glue_client = boto3.client('glue', region_name='us-east-1')
    
    try:
        response = glue_client.create_crawler(
            Name=crawler_name,
            Role=iam_role,
            DatabaseName=database_name,
            Targets={
                'S3Targets': [
                    {
                        'Path': s3_path,
                        'Exclusions': ['**.tmp', '**/_temporary/**']
                    }
                ]
            },
            Schedule='cron(0 2 * * ? *)',  # Daily at 02:00 UTC
            # CRAWL_NEW_FOLDERS_ONLY (below) requires LOG for both behaviors
            SchemaChangePolicy={
                'UpdateBehavior': 'LOG',
                'DeleteBehavior': 'LOG'
            },
            RecrawlPolicy={
                'RecrawlBehavior': 'CRAWL_NEW_FOLDERS_ONLY'
            },
            Configuration=json.dumps({
                "Version": 1.0,
                "CrawlerOutput": {
                    "Partitions": {"AddOrUpdateBehavior": "InheritFromTable"}
                }
            })
        )
        print(f"Crawler {crawler_name} created successfully")
        return response
    except Exception as e:
        print(f"Error creating crawler: {str(e)}")
        return None

# Create crawlers for multiple buckets
s3_buckets = [
    {'name': 'sales-crawler', 'path': 's3://data-lake/sales/'},
    {'name': 'customers-crawler', 'path': 's3://data-lake/customers/'},
    {'name': 'products-crawler', 'path': 's3://data-lake/products/'}
]

iam_role = 'arn:aws:iam::123456789012:role/GlueServiceRole'
database = 'data_lake_db'

for bucket in s3_buckets:
    create_glue_crawler(
        crawler_name=bucket['name'],
        database_name=database,
        s3_path=bucket['path'],
        iam_role=iam_role
    )

# Start all crawlers (in the same region they were created in)
def start_all_crawlers(crawler_names):
    glue_client = boto3.client('glue', region_name='us-east-1')
    for crawler_name in crawler_names:
        try:
            glue_client.start_crawler(Name=crawler_name)
            print(f"Started crawler: {crawler_name}")
        except glue_client.exceptions.CrawlerRunningException:
            print(f"Crawler {crawler_name} is already running")
        except Exception as e:
            print(f"Error starting crawler {crawler_name}: {e}")

crawler_names = [b['name'] for b in s3_buckets]
start_all_crawlers(crawler_names)
```
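`start_crawler` returns immediately, so a pipeline that queries the new tables usually waits for the crawls to finish. The following is a minimal polling sketch. It assumes the `get_crawler` response shape, where `Crawler.State` is `READY`, `RUNNING`, or `STOPPING` and `Crawler.LastCrawl.Status` holds the outcome. The Glue client is passed in so the logic can be exercised with a stub. The poll interval and timeout are arbitrary. If a crawler can still report `READY` for a moment after `start_crawler` returns, add a short initial delay.

```python
import time


def wait_for_crawlers(glue_client, crawler_names, poll_seconds=30, timeout_seconds=3600):
    """Poll until every crawler is READY again; return {name: last crawl status}."""
    deadline = time.monotonic() + timeout_seconds
    pending = set(crawler_names)
    statuses = {}
    while pending:
        for name in sorted(pending):  # sorted() copies, so discarding below is safe
            crawler = glue_client.get_crawler(Name=name)["Crawler"]
            # READY means idle; RUNNING or STOPPING means the crawl is still in progress
            if crawler["State"] == "READY":
                statuses[name] = crawler.get("LastCrawl", {}).get("Status", "UNKNOWN")
                pending.discard(name)
        if pending:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Crawlers still running: {sorted(pending)}")
            time.sleep(poll_seconds)
    return statuses


class FakeGlue:
    """Stand-in for boto3.client('glue'): each crawler reports RUNNING once, then READY."""

    def __init__(self):
        self.calls = {}

    def get_crawler(self, Name):
        self.calls[Name] = self.calls.get(Name, 0) + 1
        if self.calls[Name] == 1:
            return {"Crawler": {"Name": Name, "State": "RUNNING"}}
        return {"Crawler": {"Name": Name, "State": "READY",
                            "LastCrawl": {"Status": "SUCCEEDED"}}}


print(wait_for_crawlers(FakeGlue(), ["sales-crawler", "customers-crawler"], poll_seconds=0))
# {'customers-crawler': 'SUCCEEDED', 'sales-crawler': 'SUCCEEDED'}
```

In the script above, `wait_for_crawlers(boto3.client('glue', region_name='us-east-1'), crawler_names)` would follow `start_all_crawlers(crawler_names)`.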

### **Hard**

**Q48. Design a Lambda architecture for real-time and batch processing using AWS, PySpark, and Databricks.**

**Answer**:
```
Architecture Overview:
┌─────────────────────────────────────────────────────────────┐
│                       Data Sources                           │
│  (IoT Devices, APIs, Databases, Log Files)                  │
└────────────────────┬────────────────────────────────────────┘
                     │
         ┌───────────┴───────────┐
         │                       │
    ┌────▼─────┐          ┌─────▼────┐
    │ Kinesis  │          │    S3    │
    │ Stream   │          │  Bucket  │
    └────┬─────┘          └─────┬────┘
         │                      │
         │ Speed Layer          │ Batch Layer
         │                      │
    ┌────▼─────────┐      ┌─────▼──────────┐
    │  Lambda +    │      │  EMR/Databricks│
    │  Kinesis     │      │  Batch Jobs    │
    │  Analytics   │      │                │
    └────┬─────────┘      └─────┬──────────┘
         │                      │
         │                      │
    ┌────▼──────────────────────▼─────┐
    │      Serving Layer               │
    │   (DynamoDB + S3 + Athena)      │
    └────┬─────────────────────────────┘
         │
    ┌────▼─────┐
    │   API    │
    │ Gateway  │
    └──────────┘

Implementation:
```

```python
# 1. Speed Layer - Real-time processing with Lambda
import base64
import json
import time
from datetime import datetime, timezone
from decimal import Decimal

import boto3

def lambda_realtime_processor(event, context):
    """
    Process streaming data from Kinesis and write per-reading metrics to DynamoDB.
    Assumes table 'realtime_metrics' has partition key 'metric_id' and
    sort key 'timestamp' (ISO-8601 string).
    """
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('realtime_metrics')

    for record in event['Records']:
        # Kinesis delivers the payload base64-encoded
        payload = json.loads(
            base64.b64decode(record['kinesis']['data']),
            parse_float=Decimal  # the boto3 DynamoDB resource rejects Python floats
        )

        metric = {
            'metric_id': payload['sensor_id'],
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'value': payload['temperature'],
            'ttl': int(time.time()) + 86400  # expire after 24 hours
        }

        # Write to DynamoDB for real-time queries
        table.put_item(Item=metric)

    return {'statusCode': 200}

# 2. Batch Layer - Historical processing with Databricks
# `spark` is the SparkSession that Databricks notebooks and jobs provide
from pyspark.sql import functions as F
from delta.tables import DeltaTable

def batch_processor():
    """
    Daily batch processing of historical data
    """
    # Read raw data from S3; window() needs a timestamp-typed column
    df_raw = spark.read \
        .format("json") \
        .load("s3://data-lake/raw/sensors/") \
        .withColumn("timestamp", F.to_timestamp("timestamp"))

    # Hourly aggregations, flattened so the output has plain columns
    df_aggregated = df_raw \
        .groupBy(
            "sensor_id",
            F.window("timestamp", "1 hour")
        ).agg(
            F.avg("temperature").alias("avg_temp"),
            F.max("temperature").alias("max_temp"),
            F.min("temperature").alias("min_temp"),
            F.count("*").alias("reading_count")
        ) \
        .select(
            "sensor_id",
            F.col("window.start").alias("hour"),
            F.to_date(F.col("window.start")).alias("date"),
            "avg_temp", "max_temp", "min_temp", "reading_count"
        )

    # Write the recomputed batch view to Delta Lake, partitioned by date
    df_aggregated.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("date") \
        .save("s3://data-lake/processed/sensor_hourly/")

    # Upsert into the serving table (assumed to exist with the same columns)
    delta_table = DeltaTable.forPath(
        spark, "s3://data-lake/serving/sensor_aggregates/"
    )

    delta_table.alias("target").merge(
        df_aggregated.alias("source"),
        "target.sensor_id = source.sensor_id AND target.hour = source.hour"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()

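# 3. Serving Layer - Unified query interface
from boto3.dynamodb.conditions import Key
from pyspark.sql import functions as F

def query_unified_view(sensor_id, start_time, end_time):
    """
    Query combining batch (hourly aggregates) and real-time (raw readings) data.
    start_time/end_time are ISO-8601 strings, matching the DynamoDB sort key format.
    """
    # Batch data: the DataFrame API avoids building SQL with f-strings,
    # which would be open to SQL injection
    df_batch = spark.table("sensor_aggregates").filter(
        (F.col("sensor_id") == sensor_id) &
        F.col("hour").between(start_time, end_time)
    )

    # Real-time data from DynamoDB, following pagination
    table = boto3.resource('dynamodb').Table('realtime_metrics')
    query_kwargs = {
        'KeyConditionExpression': Key('metric_id').eq(sensor_id) &
                                  Key('timestamp').between(start_time, end_time)
    }
    items = []
    while True:
        response = table.query(**query_kwargs)
        items.extend(response['Items'])
        if 'LastEvaluatedKey' not in response:
            break
        query_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']

    if not items:
        return df_batch

    # Map raw readings onto the batch column names so both sides union by name
    rows = [(i['metric_id'], i['timestamp'], float(i['value'])) for i in items]
    df_realtime = spark.createDataFrame(
        rows, "sensor_id STRING, hour STRING, avg_temp DOUBLE"
    ) \
        .withColumn("hour", F.to_timestamp("hour")) \
        .withColumn("reading_count", F.lit(1).cast("long"))

    # Columns missing on one side (date, max_temp, min_temp) become null
    return df_batch.unionByName(df_realtime, allowMissingColumns=True)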

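# 4. Orchestration with Step Functions
# Step Functions has no built-in Databricks integration. One option is an HTTP Task
# that calls the Databricks Jobs API run-now endpoint through an EventBridge
# connection holding the Databricks token. The task finishes when run-now responds;
# it does not wait for the job run to complete (add a polling loop or a Lambda for that).
# The workspace URL, job ID, and ARNs below are placeholders.
step_function_definition = {
    "Comment": "Lambda Architecture Orchestration",
    "StartAt": "TriggerBatchJob",
    "States": {
        "TriggerBatchJob": {
            "Type": "Task",
            "Resource": "arn:aws:states:::http:invoke",
            "Parameters": {
                "ApiEndpoint": "https://your-workspace.cloud.databricks.com/api/2.1/jobs/run-now",
                "Method": "POST",
                "Authentication": {
                    "ConnectionArn": "arn:aws:events:us-east-1:123456789012:connection/databricks/placeholder-id"
                },
                "RequestBody": {"job_id": 123}
            },
            "Next": "UpdateServingLayer"
        },
        "UpdateServingLayer": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:update-serving",
            "End": True
        }
    }
}
# Pass json.dumps(step_function_definition) as the state machine definition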
```
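The serving layer rests on one rule: the batch view is authoritative up to the last completed batch run, and the speed layer only fills the gap after it. Otherwise readings seen by both layers get counted twice. The sketch below shows that merge rule in plain Python. It assumes hourly batch rows and raw real-time readings keyed by sensor. The field names follow the code above, but `merge_views` itself is illustrative and not part of any library.

```python
from collections import defaultdict
from datetime import datetime


def merge_views(batch_rows, realtime_rows, batch_cutoff):
    """
    Combine hourly batch aggregates with raw speed-layer readings.
    Batch rows cover hours before batch_cutoff; real-time readings at or after
    the cutoff are rolled up into hourly rows with the same shape.
    """
    merged = [row for row in batch_rows if row["hour"] < batch_cutoff]

    buckets = defaultdict(list)
    for reading in realtime_rows:
        # Readings before the cutoff are already included in the batch view
        if reading["timestamp"] >= batch_cutoff:
            hour = reading["timestamp"].replace(minute=0, second=0, microsecond=0)
            buckets[(reading["sensor_id"], hour)].append(reading["value"])

    for (sensor_id, hour), values in sorted(buckets.items()):
        merged.append({
            "sensor_id": sensor_id,
            "hour": hour,
            "avg_temp": sum(values) / len(values),
            "reading_count": len(values),
        })
    return merged


cutoff = datetime(2024, 1, 1, 2, 0)
batch = [{"sensor_id": "s1", "hour": datetime(2024, 1, 1, 1, 0),
          "avg_temp": 20.0, "reading_count": 60}]
realtime = [
    {"sensor_id": "s1", "timestamp": datetime(2024, 1, 1, 1, 59), "value": 21.0},  # already batched
    {"sensor_id": "s1", "timestamp": datetime(2024, 1, 1, 2, 10), "value": 22.0},
    {"sensor_id": "s1", "timestamp": datetime(2024, 1, 1, 2, 40), "value": 24.0},
]
for row in merge_views(batch, realtime, cutoff):
    print(row["hour"], row["avg_temp"], row["reading_count"])
# 2024-01-01 01:00:00 20.0 60
# 2024-01-01 02:00:00 23.0 2
```

In a deployment, `batch_cutoff` would be the end of the window covered by the last successful batch run, recorded by the orchestration step.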

**Q49. Implement a data quality framework that validates data across Oracle, S3, and Databricks.**

**Answer**:
```python
from pyspark.sql.functions import col
from datetime import datetime
import boto3

class DataQualityFramework:
    """
    Unified data quality validation framework
    """
    
    def __init__(self, spark_session):
        self.spark = spark_session
        self.results = []
        
    def validate_completeness(self, df, column_name, threshold=0.95):
        """
        Check for null/missing values
        """
        total_count = df.count()
        non_null_count = df.filter(
            col(column_name).isNotNull()
        ).count()

        # Guard against division by zero on an empty DataFrame
        completeness_ratio = non_null_count / total_count if total_count else 0.0
        
        result = {
            'check': 'completeness',
            'column': column_name,
            'total_records': total_count,
            'valid_records': non_null_count,
            'ratio': completeness_ratio,
            'threshold': threshold,
            'status': 'PASS' if completeness_ratio >= threshold else 'FAIL',
            'timestamp': datetime.now().isoformat()
        }
        
        self.results.append(result)
        return result
    
    def validate_uniqueness(self, df, column_name):
        """
        Check for duplicate values
        """
        total_count = df.count()
        distinct_count = df.select(column_name).distinct().count()
        
        result = {
            'check': 'uniqueness',
            'column': column_name,
            'total_records': total_count,
            'distinct_records': distinct_count,
            'duplicates': total_count - distinct_count,
            'status': 'PASS' if total_count == distinct_count else 'FAIL',
            'timestamp': datetime.now().isoformat()
        }
        
        self.results.append(result)
        return result
    
    def validate_range(self, df, column_name, min_val, max_val):
        """
        Check if values are within expected range
        """
        invalid_count = df.filter(
            (col(column_name) < min_val) |
            (col(column_name) > max_val)
        ).count()
        
        total_count = df.count()
        
        result = {
            'check': 'range',
            'column': column_name,
            'min': min_val,
            'max': max_val,
            'invalid_count': invalid_count,
            'total_count': total_count,
            'status': 'PASS' if invalid_count == 0 else 'FAIL',
            'timestamp': datetime.now().isoformat()
        }
        
        self.results.append(result)
        return result
    
    def validate_referential_integrity(self, df1, df2, key_column):
        """
        Check foreign key relationships
        """
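        # Rows in df1 whose key has no match in df2. Joining on the column name
        # avoids ambiguous column references when df1 and df2 share lineage.
        orphaned = df1.join(
            df2.select(key_column).distinct(),
            on=key_column,
            how="left_anti"
        )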
        
        orphaned_count = orphaned.count()
        
        result = {
            'check': 'referential_integrity',
            'key_column': key_column,
            'orphaned_records': orphaned_count,
            'status': 'PASS' if orphaned_count == 0 else 'FAIL',
            'timestamp': datetime.now().isoformat()
        }
        
        self.results.append(result)
        return result
    
    def validate_schema(self, df, expected_schema):
        """
        Validate DataFrame schema
        """
        actual_columns = set(df.columns)
        expected_columns = set(expected_schema.keys())
        
        missing = expected_columns - actual_columns
        extra = actual_columns - expected_columns
        
        result = {
            'check': 'schema',
            'missing_columns': list(missing),
            'extra_columns': list(extra),
            'status': 'PASS' if len(missing) == 0 and len(extra) == 0 else 'FAIL',
            'timestamp': datetime.now().isoformat()
        }
        
        self.results.append(result)
        return result
    
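    def run_all_checks(self, df, config, source_name='unknown'):
        """
        Run all configured data quality checks against one DataFrame.
        Returns only the results produced by this call, tagged with source_name.
        """
        first_new = len(self.results)  # self.results keeps the history of all runs
        df.cache()  # every check triggers a Spark action; avoid re-reading the source

        for check in config['checks']:
            check_type = check['type']

            if check_type == 'completeness':
                self.validate_completeness(
                    df, check['column'], check.get('threshold', 0.95)
                )
            elif check_type == 'uniqueness':
                self.validate_uniqueness(df, check['column'])
            elif check_type == 'range':
                self.validate_range(
                    df, check['column'], check['min'], check['max']
                )
            elif check_type == 'schema':
                self.validate_schema(df, check['expected_schema'])
            else:
                raise ValueError(f"Unknown check type: {check_type}")

        df.unpersist()
        run_results = self.results[first_new:]
        for result in run_results:
            result['source'] = source_name

        # Result dicts have different keys per check type, so persist them with a
        # fixed schema and keep the full result as a string-to-string map
        rows = [
            (r['source'], r['check'], r.get('column'), r['status'], r['timestamp'],
             {k: str(v) for k, v in r.items()})
            for r in run_results
        ]
        results_df = self.spark.createDataFrame(
            rows,
            "source STRING, check_type STRING, column_name STRING, status STRING, "
            "checked_at STRING, details MAP<STRING, STRING>"
        )
        results_df.write.format("delta") \
            .mode("append") \
            .save("/mnt/delta/data_quality_results")

        return run_results

    def send_alerts(self, failed_checks):
        """
        Send SNS alerts for failed checks
        """
        sns = boto3.client('sns', region_name='us-east-1')

        for check in failed_checks:
            if check['status'] == 'FAIL':
                message = (
                    "Data Quality Check Failed:\n"
                    f"Source: {check.get('source', 'N/A')}\n"
                    f"Check Type: {check['check']}\n"
                    f"Column: {check.get('column', 'N/A')}\n"
                    f"Details: {check}"
                )

                sns.publish(
                    TopicArn='arn:aws:sns:us-east-1:123456789012:dq-alerts',
                    Subject='Data Quality Alert',
                    Message=message
                )

# Usage Example (`spark` and `dbutils` are provided by Databricks)
dq_framework = DataQualityFramework(spark)

# Configuration
config = {
    'checks': [
        {'type': 'completeness', 'column': 'customer_id', 'threshold': 0.99},
        {'type': 'uniqueness', 'column': 'transaction_id'},
        {'type': 'range', 'column': 'amount', 'min': 0, 'max': 1000000}
    ]
}

# Oracle connection details (placeholders; the Oracle JDBC driver must be
# installed on the cluster, and the secret scope/key names are hypothetical)
jdbc_url = "jdbc:oracle:thin:@//oracle-host:1521/ORCLPDB1"
conn_props = {
    "user": "dq_reader",
    "password": dbutils.secrets.get(scope="oracle", key="dq_reader_password"),
    "driver": "oracle.jdbc.OracleDriver"
}

# Read data from various sources
df_oracle = spark.read.jdbc(jdbc_url, "transactions", properties=conn_props)
df_s3 = spark.read.parquet("s3://bucket/transactions/")
df_delta = spark.read.format("delta").load("/mnt/delta/transactions")

# Run checks on all sources; each call returns only its own results
results_oracle = dq_framework.run_all_checks(df_oracle, config, source_name='oracle')
results_s3 = dq_framework.run_all_checks(df_s3, config, source_name='s3')
results_delta = dq_framework.run_all_checks(df_delta, config, source_name='delta')

# Send alerts for failures
all_results = results_oracle + results_s3 + results_delta
failed = [r for r in all_results if r['status'] == 'FAIL']
dq_framework.send_alerts(failed)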
```
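The same checks can also be expressed as a registry that maps each `type` in the config to a function. This removes the `if/elif` chain, so adding a check becomes a one-line change. Below is a minimal pure-Python sketch that runs on a list of dicts, which is handy for unit-testing the rules without a Spark cluster. The function names and the `CHECKS` registry are illustrative. The config format matches the `config` dictionary above, and nulls are ignored by the range check, as in `validate_range`.

```python
def completeness(rows, column, threshold=0.95):
    total = len(rows)
    non_null = sum(1 for r in rows if r.get(column) is not None)
    ratio = non_null / total if total else 0.0
    return {"check": "completeness", "column": column, "ratio": ratio,
            "status": "PASS" if ratio >= threshold else "FAIL"}


def uniqueness(rows, column):
    values = [r.get(column) for r in rows]
    duplicates = len(values) - len(set(values))
    return {"check": "uniqueness", "column": column, "duplicates": duplicates,
            "status": "PASS" if duplicates == 0 else "FAIL"}


def value_range(rows, column, low, high):
    invalid = sum(1 for r in rows
                  if r.get(column) is not None and not low <= r[column] <= high)
    return {"check": "range", "column": column, "invalid_count": invalid,
            "status": "PASS" if invalid == 0 else "FAIL"}


# Registry: check type -> function taking (rows, check config)
CHECKS = {
    "completeness": lambda rows, c: completeness(rows, c["column"], c.get("threshold", 0.95)),
    "uniqueness": lambda rows, c: uniqueness(rows, c["column"]),
    "range": lambda rows, c: value_range(rows, c["column"], c["min"], c["max"]),
}


def run_checks(rows, config):
    results = []
    for check in config["checks"]:
        if check["type"] not in CHECKS:
            raise ValueError(f"Unknown check type: {check['type']}")
        results.append(CHECKS[check["type"]](rows, check))
    return results


sample_rows = [
    {"customer_id": 1, "transaction_id": "t1", "amount": 50},
    {"customer_id": 2, "transaction_id": "t2", "amount": 20},
    {"customer_id": 3, "transaction_id": "t2", "amount": 999},
]
sample_config = {
    "checks": [
        {"type": "completeness", "column": "customer_id", "threshold": 0.99},
        {"type": "uniqueness", "column": "transaction_id"},
        {"type": "range", "column": "amount", "min": 0, "max": 1000000},
    ]
}
for result in run_checks(sample_rows, sample_config):
    print(result["check"], result["status"])
# completeness PASS
# uniqueness FAIL
# range PASS
```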

**Q50. Design a disaster recovery and business continuity plan for a mission-critical data platform using AWS and Databricks.**

**Answer**:
```
Disaster Recovery Strategy:
═══════════════════════════════════════════════════════════

1. RTO (Recovery Time Objective): 4 hours
2. RPO (Recovery Point Objective): 15 minutes

Architecture Components:
────────────────────────────────────────────────────────────

Primary Region (us-east-1):
├── Databricks Workspace (Primary)
├── S3 Buckets with Versioning
├── RDS Multi-AZ (Metadata)
├── Delta Lake Tables
└── AWS Glue Data Catalog

DR Region (us-west-2):
├── Databricks Workspace (Standby)
├── S3 Cross-Region Replication
├── RDS Read Replica → Promotable
├── Delta Lake Replicas
└── AWS Glue Data Catalog Replica

Implementation:
```

```python
# 1. Automated S3 Cross-Region Replication
import boto3

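def setup_cross_region_replication():
    """
    Configure S3 replication to DR region.
    Versioning must be enabled on both buckets. Replication covers objects
    written after the rule is created; existing objects need S3 Batch Replication.
    """
    s3_client = boto3.client('s3')

    replication_config = {
        'Role': 'arn:aws:iam::123456789012:role/S3ReplicationRole',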
        'Rules': [{
            'ID': 'ReplicateAll',
            'Priority': 1,
            'Filter': {'Prefix': ''},
            'Status': 'Enabled',
            'Destination': {
                'Bucket': 'arn:aws:s3:::dr-bucket-us-west-2',
                'ReplicationTime': {
                    'Status': 'Enabled',
                    'Time': {'Minutes': 15}
                },
                'Metrics': {
                    'Status': 'Enabled',
                    'EventThreshold': {'Minutes': 15}
                }
            },
            'DeleteMarkerReplication': {'Status': 'Enabled'}
        }]
    }
    
    s3_client.put_bucket_replication(
        Bucket='primary-bucket-us-east-1',
        ReplicationConfiguration=replication_config
    )

# 2. Delta Lake Snapshot and Replication
def replicate_delta_tables():
    """
    Replicate Delta Lake tables to the DR region with DEEP CLONE.
    Re-running CREATE OR REPLACE ... DEEP CLONE against an existing clone copies
    only the files that changed, so this can run on a schedule.
    `spark` is the SparkSession provided by Databricks.
    """
    # List of critical tables
    critical_tables = [
        '/mnt/delta/transactions',
        '/mnt/delta/customers',
        '/mnt/delta/products'
    ]

    for table_path in critical_tables:
        table_name = table_path.rsplit('/', 1)[-1]
        # Clone straight into the DR bucket. A clone is a consistent snapshot,
        # whereas copying files with S3 replication alone can leave the replicated
        # Delta log referencing data files that have not arrived yet.
        spark.sql(f"""
            CREATE OR REPLACE TABLE delta.`s3://dr-bucket-us-west-2/delta/{table_name}`
            DEEP CLONE delta.`{table_path}`
        """)

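# 3. RDS Automated Backups and Read Replica
def setup_rds_dr():
    """
    Configure RDS for disaster recovery
    """
    # Automated backups must be enabled on the source before it can have read
    # replicas; let this modification finish before creating the replica
    rds_primary = boto3.client('rds', region_name='us-east-1')
    rds_primary.modify_db_instance(
        DBInstanceIdentifier='metadata-db-primary',
        BackupRetentionPeriod=35,  # days (the maximum RDS allows)
        PreferredBackupWindow='03:00-04:00',  # UTC
        CopyTagsToSnapshot=True,
        ApplyImmediately=True
    )

    # Create the cross-region replica from the DR region. The source is referenced
    # by ARN; SourceRegion lets boto3 generate the pre-signed URL that encrypted
    # cross-region replicas require.
    rds_dr = boto3.client('rds', region_name='us-west-2')
    rds_dr.create_db_instance_read_replica(
        DBInstanceIdentifier='metadata-db-replica-dr',
        SourceDBInstanceIdentifier='arn:aws:rds:us-east-1:123456789012:db:metadata-db-primary',
        SourceRegion='us-east-1',
        DBInstanceClass='db.r5.large',
        MultiAZ=True,  # AvailabilityZone cannot be combined with MultiAZ=True
        AutoMinorVersionUpgrade=True,
        CopyTagsToSnapshot=True
    )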

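# 4. Databricks Workspace Backup
import base64
import json
import os

import requests

def backup_databricks_workspace():
    """
    Backup Databricks notebooks and job configurations to S3
    """
    # Read the token from the environment instead of hard-coding it
    databricks_token = os.environ["DATABRICKS_TOKEN"]
    workspace_url = "https://your-workspace.cloud.databricks.com"
    headers = {'Authorization': f'Bearer {databricks_token}'}
    s3_client = boto3.client('s3')  # the backup bucket replicates to the DR region

    def list_notebooks(path):
        # workspace/list is not recursive, so descend into directories explicitly
        resp = requests.get(f"{workspace_url}/api/2.0/workspace/list",
                            headers=headers, params={'path': path})
        resp.raise_for_status()
        for obj in resp.json().get('objects', []):
            if obj['object_type'] == 'DIRECTORY':
                yield from list_notebooks(obj['path'])
            elif obj['object_type'] == 'NOTEBOOK':
                yield obj['path']

    # Export all notebooks
    for notebook_path in list_notebooks('/'):
        export_response = requests.get(
            f"{workspace_url}/api/2.0/workspace/export",
            headers=headers,
            params={'path': notebook_path, 'format': 'SOURCE'}
        )
        export_response.raise_for_status()

        # The export endpoint returns JSON with base64-encoded file content
        s3_client.put_object(
            Bucket='databricks-backup',
            Key=f"notebooks{notebook_path}",
            Body=base64.b64decode(export_response.json()['content'])
        )

    # Backup job configurations, following pagination
    jobs, params = [], {'limit': 100, 'expand_tasks': 'true'}
    while True:
        jobs_response = requests.get(f"{workspace_url}/api/2.1/jobs/list",
                                     headers=headers, params=params)
        jobs_response.raise_for_status()
        page = jobs_response.json()
        jobs.extend(page.get('jobs', []))
        if not page.get('has_more'):
            break
        params['page_token'] = page['next_page_token']

    s3_client.put_object(
        Bucket='databricks-backup',
        Key='jobs/configurations.json',
        Body=json.dumps(jobs)
    )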

# 5. Failover Orchestration with Lambda
import json
import os
import urllib.request
from datetime import datetime, timezone

import boto3

def failover_orchestrator(event, context):
    """
    Automated failover orchestration.
    Uses urllib.request because the requests library is not included in the
    AWS Lambda Python runtime.
    """
    # 1. Promote RDS read replica (asynchronous; the replica reboots as a standalone DB)
    rds = boto3.client('rds', region_name='us-west-2')
    rds.promote_read_replica(
        DBInstanceIdentifier='metadata-db-replica-dr'
    )

    # 2. Update Route53 to point to DR region
    # AliasTarget.HostedZoneId must be the load balancer's canonical hosted zone ID
    route53 = boto3.client('route53')
    route53.change_resource_record_sets(
        HostedZoneId='Z1234567890ABC',
        ChangeBatch={
            'Changes': [{
                'Action': 'UPSERT',
                'ResourceRecordSet': {
                    'Name': 'api.example.com',
                    'Type': 'A',
                    'AliasTarget': {
                        'HostedZoneId': 'Z0987654321XYZ',
                        'DNSName': 'dr-alb.us-west-2.elb.amazonaws.com',
                        'EvaluateTargetHealth': True
                    }
                }
            }]
        }
    )

    # 3. Start DR Databricks cluster (token supplied through an environment variable)
    databricks_api = "https://dr-workspace.cloud.databricks.com"
    request = urllib.request.Request(
        f"{databricks_api}/api/2.0/clusters/start",
        data=json.dumps({'cluster_id': 'dr-cluster-id'}).encode('utf-8'),
        headers={
            'Authorization': f"Bearer {os.environ['DR_DATABRICKS_TOKEN']}",
            'Content-Type': 'application/json'
        },
        method='POST'
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        response.read()

    # 4. Send notifications
    sns = boto3.client('sns', region_name='us-west-2')
    sns.publish(
        TopicArn='arn:aws:sns:us-west-2:123456789012:dr-alerts',
        Subject='DISASTER RECOVERY ACTIVATED',
        Message=f"""
        Disaster Recovery Failover Initiated
        Time: {datetime.now(timezone.utc).isoformat()}
        Primary Region: us-east-1 (DOWN)
        DR Region: us-west-2 (ACTIVE)

        Actions Taken:
        1. RDS Read Replica Promoted
        2. Route53 Updated
        3. DR Databricks Cluster Started
        4. Applications redirected to DR

        Estimated Recovery Time: 30 minutes
        """
    )

    return {'statusCode': 200, 'body': 'Failover initiated'}

# 6. Scheduled DR Testing
def dr_test_schedule():
    """
    Quarterly DR test procedure
    """
    test_plan = """
    DR Test Checklist:
    ══════════════════════════════════════════════════════
    
    Pre-Test (1 week before):
    □ Notify stakeholders
    □ Verify backup integrity
    □ Check replication lag (should be < 15 min)
    □ Review runbooks
    
    During Test:
    □ Simulate primary region failure
    □ Execute failover procedures
    □ Validate DR application functionality
    □ Test data integrity
    □ Measure actual RTO/RPO
    □ Document issues
    
    Post-Test:
    □ Failback to primary
    □ Update documentation
    □ Address identified gaps
    □ Report to stakeholders
    
    Success Criteria:
    ✓ RTO < 4 hours
    ✓ RPO < 15 minutes
    ✓ 100% critical functionality restored
    ✓ No data loss beyond the 15-minute RPO
    """
    
    return test_plan

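# 7. Monitoring and Alerting
def setup_dr_monitoring():
    """
    CloudWatch alarms for DR readiness.
    Alarms are regional: each one is created in the region where its metric is
    published and notifies an SNS topic in that same region.
    """
    # S3 replication metrics are reported in the source bucket's region and need
    # the bucket and rule dimensions (Metrics must be enabled on the rule)
    cloudwatch_primary = boto3.client('cloudwatch', region_name='us-east-1')
    cloudwatch_primary.put_metric_alarm(
        AlarmName='S3-Replication-Lag',
        ComparisonOperator='GreaterThanThreshold',
        EvaluationPeriods=2,
        MetricName='ReplicationLatency',
        Namespace='AWS/S3',
        Dimensions=[
            {'Name': 'SourceBucket', 'Value': 'primary-bucket-us-east-1'},
            {'Name': 'DestinationBucket', 'Value': 'dr-bucket-us-west-2'},
            {'Name': 'RuleId', 'Value': 'ReplicateAll'}
        ],
        Period=300,
        Statistic='Average',
        Threshold=900.0,  # seconds (15 minutes)
        ActionsEnabled=True,
        AlarmActions=['arn:aws:sns:us-east-1:123456789012:dr-alerts']
    )

    # ReplicaLag is reported for the replica instance, which lives in us-west-2
    cloudwatch_dr = boto3.client('cloudwatch', region_name='us-west-2')
    cloudwatch_dr.put_metric_alarm(
        AlarmName='RDS-Replica-Lag',
        ComparisonOperator='GreaterThanThreshold',
        EvaluationPeriods=2,
        MetricName='ReplicaLag',
        Namespace='AWS/RDS',
        Dimensions=[
            {'Name': 'DBInstanceIdentifier', 'Value': 'metadata-db-replica-dr'}
        ],
        Period=60,
        Statistic='Average',
        Threshold=30.0,  # seconds
        ActionsEnabled=True,
        AlarmActions=['arn:aws:sns:us-west-2:123456789012:dr-alerts']
    )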
```
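During a DR test, the "Measure actual RTO/RPO" step comes down to two subtractions. RTO is the time from declaring the outage to restoring service in the DR region. RPO is the gap between the last write committed in the primary and the newest write present in the DR copy. The sketch below checks drill measurements against the 4-hour and 15-minute targets stated above. The timestamps and field names are made up for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

RTO_TARGET = timedelta(hours=4)
RPO_TARGET = timedelta(minutes=15)


@dataclass
class DrillTimeline:
    outage_declared: datetime         # failover decision taken
    service_restored: datetime        # DR region serving traffic again
    last_primary_commit: datetime     # last write acknowledged by the primary
    last_replicated_commit: datetime  # newest write present in the DR copy


def evaluate_drill(t: DrillTimeline) -> dict:
    rto = t.service_restored - t.outage_declared
    # Clamp at zero: the DR copy can never be "ahead" of the primary
    rpo = max(t.last_primary_commit - t.last_replicated_commit, timedelta(0))
    return {
        "rto_minutes": rto.total_seconds() / 60,
        "rpo_minutes": rpo.total_seconds() / 60,
        "rto_met": rto < RTO_TARGET,  # success criteria use strict "<"
        "rpo_met": rpo < RPO_TARGET,
    }


drill = DrillTimeline(
    outage_declared=datetime(2024, 3, 1, 10, 0),
    service_restored=datetime(2024, 3, 1, 11, 35),
    last_primary_commit=datetime(2024, 3, 1, 9, 58),
    last_replicated_commit=datetime(2024, 3, 1, 9, 49),
)
print(evaluate_drill(drill))
# {'rto_minutes': 95.0, 'rpo_minutes': 9.0, 'rto_met': True, 'rpo_met': True}
```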

---

The questions progress from easy to hard and test both conceptual understanding and the hands-on skills expected of candidates with 4-7 years of experience.