### Task 1: Handling Schema Mismatches using Spark
**Description**: Use Apache Spark to address schema mismatches by transforming data to match
the expected schema.

**Steps**:
1. Create a Spark session
2. Load the raw data into a DataFrame
3. Define the expected schema
4. Handle schema mismatches by casting each column to its expected type
5. Show the corrected data

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, when, lit
import pandas as pd
import numpy as np
import logging

# Root logging at INFO (default "LEVEL:name:message" format) so the task
# progress messages below are printed; one module-scoped logger for all tasks.
logging.basicConfig(format=logging.BASIC_FORMAT, level=logging.INFO)
logger = logging.getLogger(name=__name__)

# ------------------ Task 1: Spark – Handle Schema Mismatch with Logging ------------------

def handle_schema_with_logging():
    """Conform string-typed sample records to an expected schema, logging rejects.

    A small in-memory Spark DataFrame is built whose ``id`` and ``age`` values
    arrive as strings. Every column is cast to the type declared in
    ``expected_schema``. Spark turns an unparseable value (e.g. ``"thirty"``)
    into null, so rows are split into two groups:

    * valid rows: every column that needed a cast is non-null afterwards;
    * invalid rows: at least one such column is null, either because the raw
      value was missing or because it could not be parsed.

    Invalid rows are shown together with their raw values. Valid rows are
    shown in the expected schema. The Spark session is stopped even if a step
    or self-check fails.

    Raises:
        AssertionError: if the sample data yields no invalid rows or no valid
            rows (a simple self-check of the demo data).
    """
    logger.info("🚀 Starting Spark session...")
    spark = SparkSession.builder.appName("SchemaMismatchHandler").getOrCreate()
    try:
        logger.info("📄 Creating sample raw data with schema mismatch...")
        raw_data = [
            {"id": "1", "name": "Alice", "age": "25"},
            {"id": "2", "name": "Bob", "age": "thirty"},
            {"id": "3", "name": "Charlie", "age": "40"},
            {"id": "4", "name": "David", "age": None}
        ]
        df_raw = spark.createDataFrame(raw_data)

        logger.info("📋 Defining expected schema...")
        expected_schema = StructType([
            StructField("id", IntegerType(), True),
            StructField("name", StringType(), True),
            StructField("age", IntegerType(), True)
        ])

        # Only columns whose raw type differs from the expected type are cast.
        # Those are the ones that can silently turn into null.
        cast_fields = [
            field.name
            for field in expected_schema.fields
            if df_raw.schema[field.name].dataType != field.dataType
        ]

        # Conform every column to the expected schema. Keep the raw value of
        # each cast column alongside it so rejected rows can be inspected.
        df_casted = df_raw.select(
            *[col(field.name).cast(field.dataType).alias(field.name)
              for field in expected_schema.fields],
            *[col(name).alias(f"{name}_raw") for name in cast_fields],
        )

        # A row is invalid if any cast column ended up null.
        is_invalid = lit(False)
        for name in cast_fields:
            is_invalid = is_invalid | col(name).isNull()

        df_valid = df_casted.filter(~is_invalid).select(
            *[field.name for field in expected_schema.fields]
        )
        df_invalid = df_casted.filter(is_invalid)

        # Count once. Each count() triggers a Spark job.
        invalid_count = df_invalid.count()
        valid_count = df_valid.count()

        if invalid_count:
            logger.warning("⚠️ Invalid records detected during casting:")
            df_invalid.show()

        logger.info("✅ Valid cleaned records:")
        df_valid.show()

        # Simple test: Ensure that invalid entries were caught
        assert invalid_count > 0, "Test failed: No invalid rows detected"
        assert valid_count > 0, "Test failed: No valid rows left"
    finally:
        # Always release the session, even when a check above fails.
        spark.stop()

# ------------------ Task 2: Pandas – Handle Missing Values with Assertion ------------------

def clean_pandas_data():
    """Impute missing values in a small sample customer DataFrame.

    ``Name`` gaps are filled with the placeholder ``"Unknown"``.
    ``PurchaseAmount`` gaps are filled with the median of the observed
    amounts. Missing-value counts are logged before cleaning, and the cleaned
    frame is logged after.

    Returns:
        pandas.DataFrame: the cleaned frame (callers may ignore it).

    Raises:
        AssertionError: if a built-in check fails (missing values remain, the
            name placeholder was not applied, or the amount column is not float).
    """
    logger.info("🧪 Creating sample Pandas dataset with missing values...")
    data = {
        "CustomerID": [101, 102, 103, 104, 105],
        "Name": ["Alice", "Bob", "Charlie", "David", None],
        "PurchaseAmount": [250.5, None, 300.0, 150.0, None]
    }
    df = pd.DataFrame(data)

    logger.info("🔍 Missing value report before cleaning:\n%s", df.isnull().sum())

    # Imputation strategy: placeholder for names, median for amounts.
    # Assign the result instead of calling ``df[col].fillna(..., inplace=True)``.
    # That form is chained assignment: pandas warns about it, and it no longer
    # updates ``df`` once Copy-on-Write is enabled (the default in pandas 3.0).
    df = df.fillna({
        "Name": "Unknown",
        "PurchaseAmount": df["PurchaseAmount"].median(),
    })

    logger.info("✅ Cleaned DataFrame:\n%s", df)

    # Basic tests (simulating unit tests)
    assert df.isnull().sum().sum() == 0, "Test failed: Missing values remain"
    assert df['Name'].iloc[-1] == "Unknown", "Test failed: Name imputation failed"
    assert df['PurchaseAmount'].dtype in [float, np.float64], "Test failed: Wrong data type"
    return df

# ------------------ Run Both Tasks ------------------

if __name__ == "__main__":
    # Run the demo tasks in order. Each one performs its own built-in checks
    # and raises if they fail, so reaching the final log line means all passed.
    for task in (handle_schema_with_logging, clean_pandas_data):
        task()
    logger.info("✅ All tasks completed with validations.")

INFO:__main__:🚀 Starting Spark session...
JAVA_HOME is not set


PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [6]:
# Write your code from here

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import col
import pandas as pd
import numpy as np

# --------------------- Task 1: Handle Schema Mismatch (Spark) ---------------------
from functools import reduce
from operator import or_

# Step 1: Create Spark Session
spark = SparkSession.builder.appName("SchemaMismatchHandler").getOrCreate()

# Step 2: Simulated Spark DataFrame with wrong schema (every value arrives as a string)
raw_data = [
    {"id": "1", "name": "Alice", "age": "25"},
    {"id": "2", "name": "Bob", "age": "thirty"},  # invalid numeric value
    {"id": "3", "name": "Charlie", "age": "40"}
]
raw_df = spark.createDataFrame(raw_data)

# Step 3: Define expected schema
expected_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Step 4: Handle mismatches by casting every column to its expected type, then
# filtering out bad rows. Spark casts an unparseable value (e.g. "thirty") to
# null, so keep each raw value next to its cast value to detect failures.
schema_columns = [field.name for field in expected_schema.fields]
cast_df = raw_df.select(
    *[col(field.name).cast(field.dataType).alias(field.name)
      for field in expected_schema.fields],
    *[col(name).alias(f"raw_{name}") for name in schema_columns],
)

# A cast failed where the raw value was present but the cast value is null.
# Genuinely missing values (raw null) are not treated as mismatches.
failed_cast = reduce(
    or_,
    [col(f"raw_{name}").isNotNull() & col(name).isNull() for name in schema_columns],
)

corrected_df = cast_df.filter(~failed_cast).select(*schema_columns)
rejected_df = cast_df.filter(failed_cast)

print("✅ Task 1: Corrected Spark DataFrame with expected schema:")
corrected_df.printSchema()
corrected_df.show()

print("⚠️ Task 1: Rows rejected because a value did not match the expected type:")
rejected_df.show()

# --------------------- Task 2: Handle Incomplete Data (Pandas) ---------------------

# Step 1: Simulated Pandas DataFrame with missing values
data = {
    "CustomerID": [101, 102, 103, 104, 105],
    "Name": ["Alice", "Bob", "Charlie", "David", None],
    "PurchaseAmount": [250.5, None, 300.0, 150.0, None]
}
df = pd.DataFrame(data)

# Step 2: Detect incomplete data (missing-value count per column)
missing_before = df.isnull().sum()
print("\n🔍 Task 2: Missing Value Report:")
print(missing_before)

# Step 3: Fill missing values
# - Fill 'Name' with placeholder
# - Fill 'PurchaseAmount' with median estimate
# Assign the result rather than calling df[col].fillna(..., inplace=True): that
# is chained assignment, which pandas warns about and which does not update
# `df` under Copy-on-Write (the default from pandas 3.0).
df = df.fillna({
    "Name": "Unknown",
    "PurchaseAmount": df["PurchaseAmount"].median(),
})

# Step 4: Report after changes
print("\n✅ Task 2: Cleaned Pandas DataFrame:")
print(df)

# Summary of changes: how many values were filled per column, and what remains.
filled_per_column = missing_before - df.isnull().sum()
print("\n✏️ Values filled per column:")
print(filled_per_column)

missing_after = df.isnull().sum().sum()
print(f"\n📊 Total missing values after imputation: {missing_after}")


JAVA_HOME is not set


PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

### Task 2: Detect and Correct Incomplete Data in ETL
**Description**: Use Python and Pandas to detect incomplete data in an ETL process and fill
missing values with estimates.

**Steps**:
1. Detect incomplete data (count missing values per column)
2. Fill missing values with estimates
3. Report the changes

In [None]:
# Write your code from here