### Task 1: Handling Schema Mismatches using Spark
**Description**: Use Apache Spark to address schema mismatches by transforming data to match
the expected schema.

**Steps**:
1. Create Spark session
2. Load dataframe
3. Define the expected schema
4. Handle schema mismatches
5. Show corrected data

In [1]:
# Write your code from here
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, FloatType
from pyspark.sql.functions import col, lit

# Step 1: Create Spark session
spark = SparkSession.builder.appName("HandleSchemaMismatches").getOrCreate()

# Sample data with schema mismatches (imagine this comes from a file): the same
# logical column arrives as str, int or float depending on the row.
data_mismatched = [
    ("Alice", "30", "true", "60000.5"),
    ("Bob", 25, "false", "55000"),
    ("Charlie", "40", 1, 70000.0),
    (None, "28", "TRUE", "62000.75"),
]

# Column names of the raw, untyped data.
schema_mismatched = ["name", "age_str", "is_active_str_int", "salary_str"]

# Schema inference cannot merge str and int/float values found in the same
# column, so ingest every column as a nullable string (the way a raw text
# source would deliver it) and do all typing explicitly in Step 4.
raw_rows = [
    tuple(None if value is None else str(value) for value in row)
    for row in data_mismatched
]
raw_schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in schema_mismatched]
)
df_mismatched = spark.createDataFrame(raw_rows, raw_schema)

# Step 2: Load dataframe (already done above for demonstration)
print("DataFrame with potential schema mismatches:")
df_mismatched.show()
df_mismatched.printSchema()

# Step 3: Define the expected schema
expected_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("is_active", BooleanType(), True),
    StructField("salary", FloatType(), True)
])

print("\nExpected Schema:")
# printTreeString() belongs to the Scala/Java StructType API; printing the
# schema of an empty DataFrame yields the same tree from Python.
spark.createDataFrame([], expected_schema).printSchema()

# Step 4: Handle schema mismatches

# Add the optional "city" column as an all-null string column when the source
# does not provide it, so the select below can always reference it.
if "city_str" in df_mismatched.columns:
    df_with_city = df_mismatched.withColumnRenamed("city_str", "city")
else:
    df_with_city = df_mismatched.withColumn("city", lit(None).cast(StringType()))

# Cast each raw string column to its expected type.
# - The string -> boolean cast is case-insensitive and accepts
#   "true"/"t"/"yes"/"y"/"1" and "false"/"f"/"no"/"n"/"0".
# - With ANSI mode off, values that cannot be parsed (and nulls) become null
#   rather than raising; with spark.sql.ansi.enabled=true an invalid value
#   raises instead.
df_corrected = df_with_city.select(
    col("name"),
    col("age_str").cast(IntegerType()).alias("age"),
    col("is_active_str_int").cast(BooleanType()).alias("is_active"),
    col("salary_str").cast(FloatType()).alias("salary"),
    col("city"),
)

# Keep the expected columns in schema order, followed by the extra "city"
# column that was added/renamed above.
expected_columns = [field.name for field in expected_schema.fields]
if "city" in df_corrected.columns and "city" not in expected_columns:
    expected_columns.append("city")

final_df = df_corrected.select(*expected_columns)

# Step 5: Show corrected data
print("\nCorrected DataFrame matching the expected schema:")
final_df.show()
final_df.printSchema()

# Stop Spark session
spark.stop()

ModuleNotFoundError: No module named 'pyspark'

### Task 2: Detect and Correct Incomplete Data in ETL
**Description**: Use Python and Pandas to detect incomplete data in an ETL process and fill
missing values with estimates.

**Steps**:
1. Detect incomplete data
2. Fill missing values
3. Report changes

In [2]:
# Write your code from here
import pandas as pd

def _is_imputable_numeric(series):
    """Return True when *series* has a numeric, non-boolean dtype."""
    dtype = series.dtype
    return pd.api.types.is_numeric_dtype(dtype) and not pd.api.types.is_bool_dtype(dtype)


def _fill_column(df, column, imputation_strategy, fill_value, warn_if_skipped):
    """Fill the missing entries of one column of *df* using the given strategy.

    The column is reassigned (``df[column] = ...``) instead of being mutated
    through ``df[column].fillna(..., inplace=True)``: that chained form acts on
    a temporary object and does not update *df* under pandas Copy-on-Write.
    """
    series = df[column]

    if imputation_strategy in ('mean', 'median'):
        if not _is_imputable_numeric(series):
            # Only complain when the caller explicitly asked for this column.
            if warn_if_skipped:
                print(f"Warning: Cannot apply {imputation_strategy} imputation to non-numeric column '{column}'. Skipping.")
            return
        replacement = series.mean() if imputation_strategy == 'mean' else series.median()
    elif imputation_strategy == 'mode':
        modes = series.mode()
        if modes.empty:
            # A column with no observed values has no mode; leave it untouched
            # rather than aborting the whole run.
            print(f"Warning: Column '{column}' has no non-missing values to compute a mode from. Skipping.")
            return
        replacement = modes.iloc[0]
    else:  # 'constant' (validated by the caller)
        replacement = fill_value

    df[column] = series.fillna(replacement)


def handle_incomplete_data(file_path, imputation_strategy='mean', fill_value=None, columns_to_impute=None):
    """
    Loads data from a CSV file, detects incomplete (missing) data,
    fills missing values using a specified strategy, and reports the changes.

    Args:
        file_path (str): The path to the CSV file.
        imputation_strategy (str, optional): The strategy to use for filling missing values.
                                             Options: 'mean', 'median', 'mode', 'constant'.
                                             Defaults to 'mean'.
        fill_value (any, optional): The constant value to use if imputation_strategy is 'constant'.
                                    Defaults to None.
        columns_to_impute (list, optional): A list of column names to apply imputation to.
                                           If None (or empty), imputation will be applied to all
                                           columns that support the chosen strategy.
                                           Defaults to None.

    Returns:
        pandas.DataFrame or None: The DataFrame after imputation, or None when the file
        cannot be read, the strategy is invalid, 'constant' is requested without a
        fill_value, a requested column does not exist, or any other error occurs
        (a message is printed in each case).
    """
    valid_strategies = ('mean', 'median', 'mode', 'constant')

    try:
        # Step 1: Detect incomplete data
        df = pd.read_csv(file_path)
        print(f"Data loaded successfully from: {file_path}\n")

        initial_missing_values = df.isnull().sum()
        print("Initial missing values per column:")
        print(initial_missing_values[initial_missing_values > 0])
        initial_total_missing = initial_missing_values.sum()
        print(f"\nTotal initial missing values: {initial_total_missing}")

        # Step 2: Fill missing values
        if imputation_strategy not in valid_strategies:
            print(f"Error: Invalid imputation strategy '{imputation_strategy}'. Options are: 'mean', 'median', 'mode', 'constant'.")
            return None
        if imputation_strategy == 'constant' and fill_value is None:
            print("Error: 'fill_value' must be specified when using 'constant' imputation.")
            return None

        if columns_to_impute:
            unknown_columns = [name for name in columns_to_impute if name not in df.columns]
            if unknown_columns:
                print(f"Error: Columns not found in data: {unknown_columns}")
                return None
            for name in columns_to_impute:
                _fill_column(df, name, imputation_strategy, fill_value, warn_if_skipped=True)
        elif imputation_strategy == 'constant':
            # Frame-level fill keeps DataFrame.fillna semantics for fill_value
            # (e.g. a dict mapping column names to values).
            df = df.fillna(fill_value)
        else:
            for name in df.columns:
                _fill_column(df, name, imputation_strategy, fill_value, warn_if_skipped=False)

        # Step 3: Report changes
        final_missing_values = df.isnull().sum()
        print("\nMissing values per column after imputation:")
        print(final_missing_values[final_missing_values > 0])
        final_total_missing = final_missing_values.sum()
        print(f"\nTotal missing values after imputation: {final_total_missing}")

        num_filled = initial_total_missing - final_total_missing
        print(f"\nNumber of missing values filled: {num_filled}")

        return df

    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return None
    except Exception as e:
        # Top-level boundary of this helper: callers rely on "None means failure".
        print(f"An error occurred: {e}")
        return None

# Example Usage:
# Demonstrates each imputation strategy against a small generated CSV file.
file_path = 'your_data_with_missing.csv'  # Replace with the actual path


def _show_result(heading, frame):
    """Print *frame* under *heading*; print nothing when the run returned None."""
    if frame is None:
        return
    print(heading)
    print(frame)


# Build a sample CSV file with missing values for testing (optional)
data = {
    'col1': [1, 2, None, 4, 5],
    'col2': [None, 6, 7, None, 9],
    'col3': ['A', 'B', None, 'A', 'C'],
    'col4': [1.1, None, 3.3, 4.4, None],
}
df_sample = pd.DataFrame(data)
df_sample.to_csv(file_path, index=False)

# Mean imputation across every numeric column (the default strategy)
df_filled_mean = handle_incomplete_data(file_path)
_show_result("\nDataFrame after mean imputation:", df_filled_mean)

# Median imputation restricted to two numeric columns
df_filled_median = handle_incomplete_data(
    file_path,
    imputation_strategy='median',
    columns_to_impute=['col1', 'col4'],
)
_show_result("\nDataFrame after median imputation (for col1 and col4):", df_filled_median)

# Mode imputation for the categorical column
df_filled_mode = handle_incomplete_data(
    file_path,
    imputation_strategy='mode',
    columns_to_impute=['col3'],
)
_show_result("\nDataFrame after mode imputation (for col3):", df_filled_mode)

# Constant-value imputation for the categorical column
df_filled_constant = handle_incomplete_data(
    file_path,
    imputation_strategy='constant',
    fill_value='Missing',
    columns_to_impute=['col3'],
)
_show_result("\nDataFrame after constant imputation (for col3):", df_filled_constant)

Data loaded successfully from: your_data_with_missing.csv

Initial missing values per column:
col1    1
col2    2
col3    1
col4    2
dtype: int64

Total initial missing values: 6

Missing values per column after imputation:
col3    1
dtype: int64

Total missing values after imputation: 1

Number of missing values filled: 5

DataFrame after mean imputation:
   col1      col2 col3      col4
0   1.0  7.333333    A  1.100000
1   2.0  6.000000    B  2.933333
2   3.0  7.000000  NaN  3.300000
3   4.0  7.333333    A  4.400000
4   5.0  9.000000    C  2.933333
Data loaded successfully from: your_data_with_missing.csv

Initial missing values per column:
col1    1
col2    2
col3    1
col4    2
dtype: int64

Total initial missing values: 6

Missing values per column after imputation:
col2    2
col3    1
dtype: int64

Total missing values after imputation: 3

Number of missing values filled: 3

DataFrame after median imputation (for col1 and col4):
   col1  col2 col3  col4
0   1.0   NaN    A   1.1
