
1. Create DataFrame from Python List of Tuples (Hardcoded data)
Scenario: For quick unit testing or mocking input data before pipeline execution.


from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataFrame Creation").getOrCreate()

data = [(1, "Alice", 5000), (2, "Bob", 6000)]
columns = ["id", "name", "salary"]

df = spark.createDataFrame(data, schema=columns)
df.show()

2. Create DataFrame from CSV File (Structured File Input)
Scenario: Loading transaction logs from SFTP/Blob/ADLS in daily batch jobs.

df = spark.read.option("header", True).csv("/mnt/raw/transactions.csv")
df.printSchema()
df.show(5)


3. Create DataFrame from JSON File
Scenario: Ingesting nested event data from application APIs.

df = spark.read.option("multiline", True).json("/mnt/raw/events.json")
df.select("eventType", "timestamp").show()

4. Create DataFrame from Parquet File
Scenario: Standard optimized input/output in data lake pipelines.

df = spark.read.parquet("/mnt/silver/user_profile")
df.createOrReplaceTempView("user_profile")


5. Create DataFrame from SQL Table (JDBC)
Scenario: Read from MySQL, PostgreSQL, or SQL Server using ADF or directly in Databricks.

jdbc_url = "jdbc:mysql://hostname:3306/dbname"
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "transactions") \
    .option("user", "username") \
    .option("password", "password") \
    .load()
df.show()


6. Create DataFrame from Delta Table
Scenario: Reading data from Bronze/Silver/Gold layers in the Medallion Architecture.

df = spark.read.format("delta").load("/mnt/gold/suspicious_transactions")
df.filter("amount > 10000").show()

7. Create DataFrame using RDD
Scenario: Legacy Spark or custom transformation use case.

rdd = spark.sparkContext.parallelize([(1, "Alice"), (2, "Bob")])
df = rdd.toDF(["id", "name"])
df.show()


8. Create DataFrame with Schema Definition (StructType)
Scenario: Ensuring strict schema validation on ingestion.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", IntegerType(), True)
])

data = [(1, "Alice", 5000), (2, "Bob", 6000)]
df = spark.createDataFrame(data, schema=schema)
df.printSchema()


9. Create DataFrame from Streaming Source (Kafka)
Scenario: Real-time ingestion of AML events or payment messages.

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "transactions") \
    .load()

🔹 1. Hardcoded Data (List of Tuples)
Question:
You're designing a unit test for transformation logic. How would you create a DataFrame with test data?
✅ Tip: Use createDataFrame(data, schema) to simulate source data.

🔹 2. CSV File
Question:
Your daily batch job loads transaction data from a CSV file on ADLS. How will you ensure schema consistency and skip corrupt records?
✅ Tip: Pass an explicit schema with .schema(...) for consistency, and use .option("header", True).option("mode", "DROPMALFORMED") to skip corrupt records.
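
A possible sketch of the read, assuming `spark` is an active SparkSession. The column layout in `TRANSACTION_SCHEMA` is hypothetical; replace it with the real file layout. The schema is given as a DDL string, which `DataFrameReader.schema` accepts in place of a StructType.

```python
# Assumed (hypothetical) column layout for the transactions file.
TRANSACTION_SCHEMA = "txn_id INT, account_id STRING, amount DOUBLE, txn_date DATE"


def read_transactions(spark, path):
    """Read a CSV with a fixed schema, dropping rows that fail to parse."""
    return (
        spark.read
        .schema(TRANSACTION_SCHEMA)        # no inference: types are fixed up front
        .option("header", True)            # first line holds the column names
        .option("mode", "DROPMALFORMED")   # skip rows that do not fit the schema
        .csv(path)
    )
```

Usage would look like `read_transactions(spark, "/mnt/raw/transactions.csv")`. If dropped rows need to be audited rather than silently skipped, the PERMISSIVE mode is the usual alternative.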

🔹 3. JSON File (Nested Structure)
Question:
How do you read and flatten a nested JSON file received from a third-party API?
✅ Tip: Use df.selectExpr("payload.id", "payload.name") or explode() for arrays.
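
As a rough illustration, assuming each record carries a `payload` struct with scalar fields `id` and `name` plus an array field `items` (the `items` field is a hypothetical addition for this sketch):

```python
# SQL expressions that lift nested fields to top-level columns.
FLATTEN_EXPRS = [
    "payload.id AS id",
    "payload.name AS name",
    "explode(payload.items) AS item",  # one output row per array element
]


def flatten_events(df):
    """Project nested fields to top-level columns and explode the array."""
    return df.selectExpr(*FLATTEN_EXPRS)
```

Note that `explode` drops records whose array is null or empty; `explode_outer` keeps them with a null `item`.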

🔹 4. Parquet File
Question:
Why would you choose Parquet format over CSV in your Gold layer?
✅ Tip: Explain columnar storage, faster queries through column pruning and predicate pushdown, and better compression.

🔹 5. JDBC Table
Question:
How would you pull 1 million records from a MySQL database into Spark efficiently?
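✅ Tip: Use .option("partitionColumn", "id") together with .option("lowerBound", ...), .option("upperBound", ...), and .option("numPartitions", 4). Spark requires all four for a parallel JDBC read.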
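
Spark's JDBC source only splits the read when `partitionColumn`, `lowerBound`, `upperBound`, and `numPartitions` are all set. A minimal sketch, assuming `id` is numeric and spans roughly 1 to 1,000,000 (the bounds and helper names are illustrative assumptions, not values from a real table):

```python
def partitioned_jdbc_options(url, table, column, lower, upper, num_partitions):
    """Build the option set Spark needs for a parallel JDBC read."""
    return {
        "url": url,
        "dbtable": table,
        "partitionColumn": column,   # must be a numeric, date, or timestamp column
        "lowerBound": str(lower),    # bounds only decide the partition stride;
        "upperBound": str(upper),    # rows outside them are still read
        "numPartitions": str(num_partitions),
    }


def read_partitioned(spark, options, user, password):
    """Run the read; each partition issues its own range query."""
    return (
        spark.read.format("jdbc")
        .options(**options)
        .option("user", user)
        .option("password", password)
        .load()
    )


opts = partitioned_jdbc_options(
    "jdbc:mysql://hostname:3306/dbname", "transactions", "id", 1, 1_000_000, 4
)
```

With these values Spark opens four connections, each scanning about a quarter of the `id` range. In practice the credentials would come from a secret store rather than literals.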

🔹 6. Delta Table
Question:
Your Gold layer stores high-value AML alerts. How do you query the latest snapshot with fast performance?
✅ Tip: Store the table in Delta format and Z-Order it by transaction_date.
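
One way this might look, assuming an environment where `OPTIMIZE ... ZORDER BY` is available (Databricks, or open-source Delta Lake 2.0 and later). The helper names and the `since` parameter are hypothetical.

```python
def zorder_statement(path, column):
    """Build an OPTIMIZE ... ZORDER BY statement for a path-based Delta table."""
    return f"OPTIMIZE delta.`{path}` ZORDER BY ({column})"


def optimize_alerts(spark, path):
    """Maintenance step: compact files and cluster them by transaction_date."""
    spark.sql(zorder_statement(path, "transaction_date"))


def read_recent_alerts(spark, path, since):
    """Read the current snapshot; the date filter benefits from data skipping."""
    return (
        spark.read.format("delta").load(path)
        .filter(f"transaction_date >= '{since}'")
    )


stmt = zorder_statement("/mnt/gold/suspicious_transactions", "transaction_date")
```

The optimize step is typically scheduled as a separate maintenance job rather than run before every query, since it rewrites data files.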

🔹 7. RDD Conversion
Question:
You have an RDD from a legacy Spark job. How do you convert it to a DataFrame with schema?
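✅ Tip: Use rdd.toDF(["col1", "col2"]) to name the columns (types are inferred), or spark.createDataFrame(rdd, schema) to enforce an explicit schema.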
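
When the column types matter, `spark.createDataFrame` also accepts an RDD together with a schema. A short sketch, assuming an RDD of `(id, name)` tuples like the one in the earlier RDD example; `RDD_SCHEMA` and the helper name are illustrative.

```python
RDD_SCHEMA = "id INT, name STRING"  # DDL string; a StructType works as well


def rdd_to_dataframe(spark, rdd):
    """Convert an RDD of tuples to a DataFrame with explicit column types."""
    return spark.createDataFrame(rdd, schema=RDD_SCHEMA)
```

Unlike `toDF` with a plain list of names, this fixes the types up front instead of inferring them from the data.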

🔹 8. With Defined Schema
Question:
Why is it important to define a schema when reading raw ingestion files?
✅ Tip: It prevents schema inference errors and ensures type consistency.

🔹 9. Kafka Stream
Question:
How do you read real-time transaction messages from a Kafka topic and convert the value to string?
✅ Tip:

df.selectExpr("CAST(value AS STRING)")
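
A slightly fuller sketch, assuming `kafka_df` is the streaming DataFrame returned by `spark.readStream.format("kafka")...load()`. The Kafka source exposes `key` and `value` as binary columns alongside metadata such as `topic` and `timestamp`; the helper names here are hypothetical.

```python
KAFKA_EXPRS = [
    "CAST(key AS STRING) AS key",
    "CAST(value AS STRING) AS value",
    "topic",
    "timestamp",
]


def decode_messages(kafka_df):
    """Turn the binary key/value columns of a Kafka source into strings."""
    return kafka_df.selectExpr(*KAFKA_EXPRS)


def start_console_query(decoded_df):
    """Print decoded messages to the console; suitable for debugging only."""
    return decoded_df.writeStream.format("console").outputMode("append").start()
```

Nothing is read from Kafka until a query is started with `writeStream ... start()`; a production job would write to a durable sink with a checkpoint location instead of the console.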

Question:
You receive CSV, JSON, and Parquet files from different sources every day. How do you handle schema differences and create a unified DataFrame?
✅ Tip: Use schema merging with .option("mergeSchema", "true") for Parquet, or define StructType and standardize before union.
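
For the union step, one option is `unionByName` with `allowMissingColumns=True` (available from Spark 3.1), which fills columns absent from one source with nulls. A minimal sketch, assuming the per-source DataFrames have already been renamed and cast to compatible types; `unify` is a hypothetical helper.

```python
from functools import reduce


def unify(frames):
    """Union DataFrames by column name, filling absent columns with nulls."""
    return reduce(
        lambda left, right: left.unionByName(right, allowMissingColumns=True),
        frames,
    )
```

Usage would be along the lines of `unify([csv_df, json_df, parquet_df])`. Columns that share a name must still have compatible types, which is why standardizing against a StructType first remains the safer path.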