In [0]:
from pyspark.sql import SparkSession
import datetime

# Initialize Spark session
spark = SparkSession.builder.appName("Read Latest Bronze File").getOrCreate()

# Define Bronze Layer path
bronze_path = "dbfs:/FileStore/tables/bronze_layer1/"

# List everything in the Bronze Layer directory
files = dbutils.fs.ls(bronze_path)

# dbutils.fs.ls also returns sub-directories (their names end with "/"). Only regular
# files can be read as CSV, so directories are excluded before picking the newest entry;
# otherwise a recently touched directory could be selected as the "latest file".
file_paths = [(file.path, file.modificationTime) for file in files if not file.name.endswith("/")]

# Check if there are files
if not file_paths:
    print("No files found in Bronze Layer.")
else:
    # Sort files based on modification time (latest first)
    file_paths.sort(key=lambda x: x[1], reverse=True)

    # Get latest file path
    latest_file_path = file_paths[0][0]
    print(f"Latest file in Bronze Layer: {latest_file_path}")

    # Read the latest file into a DataFrame. quote/escape = '"' handles doubled quotes
    # inside quoted fields; multiLine allows quoted values that span several lines.
    df_bronze = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("quote", '"') \
        .option("escape", '"') \
        .option("multiLine", "true") \
        .load(latest_file_path)

    # Show DataFrame
    df_bronze.show()


In [0]:
# Render the raw Bronze DataFrame as an interactive table in the notebook.
# NOTE: df_bronze is only assigned by the previous cell when a Bronze file was found.
df_bronze.display()

In [0]:
# Number of raw rows loaded from the latest Bronze file.
df_bronze.count()

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import StructType, ArrayType
from typing import List, Tuple

def flatten_df_generic(df_bronze: DataFrame) -> DataFrame:
    """
    Dynamically flatten a DataFrame while properly handling nested JSON and arrays.

    String columns whose first non-null value starts with '{' (JSON object) or '['
    (JSON array) are parsed using a schema inferred from the column's data. Array
    columns are then exploded into one row per element, and struct columns are
    expanded into ``<parent>_<child>`` columns. Flattening and exploding repeat
    until no struct or array columns remain.

    Args:
        df_bronze: Input DataFrame (Raw Data from the Bronze Layer).

    Returns:
        Flattened DataFrame with all nested structures expanded into columns. A
        struct's child columns take the struct's position in the column order.

    Notes:
        - Arrays are exploded with ``explode_outer``: rows whose array is null or
          empty are kept (with a null element) instead of being silently dropped.
        - Exploding several array columns of the same row multiplies its rows.
        - Uses the notebook-level ``spark`` session for JSON schema inference.
    """
    from pyspark.sql.functions import explode_outer

    def first_non_null(df: DataFrame, column: str):
        """Return the first non-null value of `column`, or None if there is none (e.g. empty df)."""
        row = df.select(column).where(col(column).isNotNull()).first()
        return None if row is None else row[0]

    def infer_json_schema(df: DataFrame, column: str) -> StructType:
        """Infer a schema from the JSON strings stored in `column`.

        Nulls are filtered out first: DataFrameReader.json converts non-string RDD
        elements with str(), so a None would be read as the text "None" and add a
        spurious `_corrupt_record` field to the inferred schema.
        """
        json_strings = (
            df.select(column)
            .where(col(column).isNotNull())
            .rdd.map(lambda row: row[0])
        )
        return spark.read.json(json_strings).schema

    def process_json_columns(df: DataFrame) -> Tuple[DataFrame, List[str]]:
        """Parse JSON-object string columns into structs and list every JSON-looking column.

        Object columns ('{...}') are converted here; array columns ('[...]') are only
        recorded and are parsed later by extract_json_paths.
        """
        json_columns = []
        for column in df.columns:
            sample = first_non_null(df, column)
            if not isinstance(sample, str) or not sample:
                continue
            try:
                if sample[0] == '{':
                    # Convert JSON object string to a structured column
                    df = df.withColumn(column, from_json(col(column), infer_json_schema(df, column)))
                    json_columns.append(column)
                elif sample[0] == '[':
                    json_columns.append(column)
            except Exception:
                # Best effort: if inference fails, leave the column as a plain string.
                continue
        return df, json_columns

    def explode_arrays(df: DataFrame) -> DataFrame:
        """Explode every top-level array column into one row per element (null/empty arrays kept)."""
        for column in df.columns:
            if isinstance(df.schema[column].dataType, ArrayType):
                df = df.withColumn(column, explode_outer(col(column)))
        return df

    def flatten_structs(df: DataFrame) -> Tuple[DataFrame, List[str]]:
        """Expand each top-level struct column into `<struct>_<field>` columns.

        Returns the new DataFrame and the column order to select, with each struct's
        children placed where the struct used to be.
        """
        new_columns = []
        for field in df.schema.fields:
            if isinstance(field.dataType, StructType):
                field_name = field.name
                for nested_field in field.dataType.fields:
                    nested_name = f"{field_name}_{nested_field.name}"
                    df = df.withColumn(nested_name, col(f"{field_name}.{nested_field.name}"))
                    new_columns.append(nested_name)
                df = df.drop(field_name)
            else:
                new_columns.append(field.name)
        return df, new_columns

    def extract_json_paths(df: DataFrame, json_columns: List[str]) -> DataFrame:
        """Parse JSON strings left in `json_columns` (in practice, the array columns).

        Columns already converted to structs by process_json_columns are skipped,
        because their sample is no longer a string.
        """
        for column in json_columns:
            sample = first_non_null(df, column)
            if not isinstance(sample, str) or not sample:
                continue
            if sample[0] == '[':
                # spark.read.json reads each element of a top-level JSON array as its own
                # record, so the inferred schema describes one element; wrap it in ArrayType.
                # NOTE(review): arrays of primitives (e.g. [1, 2]) do not infer a useful
                # element schema this way — confirm whether such inputs occur.
                df = df.withColumn(column, from_json(col(column), ArrayType(infer_json_schema(df, column))))
            elif sample[0] == '{':
                df = df.withColumn(column, from_json(col(column), infer_json_schema(df, column)))
        return df

    def nested_columns(df: DataFrame) -> List[str]:
        """Names of top-level columns that are still structs or arrays."""
        return [f.name for f in df.schema.fields if isinstance(f.dataType, (StructType, ArrayType))]

    # Step 1: Parse JSON-object strings and identify JSON columns
    df_bronze, json_columns = process_json_columns(df_bronze)

    # Step 2: Parse the remaining JSON (array) strings
    df_bronze = extract_json_paths(df_bronze, json_columns)

    # Step 3: Explode arrays into individual rows
    df_bronze = explode_arrays(df_bronze)

    # Step 4: Flatten all structs and record the resulting column order
    df_bronze, final_columns = flatten_structs(df_bronze)

    # Repeat flatten + explode until no nested columns remain. final_columns is refreshed
    # on every pass: deeper flattening drops intermediate struct columns (e.g. `a_b`) and
    # adds new ones (e.g. `a_b_c`), so a stale list would reference missing columns.
    while nested_columns(df_bronze):
        df_bronze, final_columns = flatten_structs(df_bronze)
        # explode_arrays only replaces columns in place, so final_columns stays valid.
        df_bronze = explode_arrays(df_bronze)

    # Select columns in the order they were processed
    return df_bronze.select([col(c) for c in final_columns])

# Flatten the raw Bronze data (parse JSON strings, explode arrays, expand structs);
# flattened_df is the input to the DataCleaner step.
flattened_df = flatten_df_generic(df_bronze)




In [0]:
# Display the flattened DataFrame (flattening itself happened in the previous cell)
flattened_df.display()


In [0]:
# Row count after flattening; it may differ from df_bronze.count() because exploding
# arrays changes the number of rows.
flattened_df.count()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
import re

class DataCleaner:
    """Schema-driven cleaning of a flattened Spark DataFrame.

    Columns are categorised once, from the schema of the DataFrame given to the
    constructor:

    * numeric: DoubleType, LongType or IntegerType columns;
    * string: StringType columns;
    * date/time: string columns whose name contains 'date' or 'time'
      (case-insensitive), plus the string column ``action_ts``.

    Every cleaning method replaces ``self.flattened_df`` with a transformed
    DataFrame and returns ``self`` so steps can be chained.
    """

    def __init__(self, flattened_df):
        self.flattened_df = flattened_df
        fields = flattened_df.schema.fields
        # Automatically categorize columns by data type
        self.schema = {f.name: f.dataType for f in fields}
        self.numeric_cols = [f.name for f in fields if isinstance(f.dataType, (DoubleType, LongType, IntegerType))]
        self.string_cols = [f.name for f in fields if isinstance(f.dataType, StringType)]
        # "action_ts" contains neither 'date' nor 'time', so it is listed explicitly.
        # Without it, the action_ts branch of clean_date_time_columns is unreachable, and
        # clean_string_columns would lower-case the column and strip its ':' characters.
        self.date_cols = [
            name for name in self.string_cols
            if 'date' in name.lower() or 'time' in name.lower() or name == "action_ts"
        ]

    def clean_numeric_columns(self, iqr_multiplier=1.5):
        """Impute nulls with the mean and clip outliers to the IQR fences, keeping original types.

        Fences are ``q1 - iqr_multiplier * IQR`` and ``q3 + iqr_multiplier * IQR``, with
        approximate quartiles (percentile_approx). Columns with no non-null values
        (including any column of an empty DataFrame) have no statistics and are left
        unchanged.

        Args:
            iqr_multiplier: IQR multiplier for the outlier fences (default 1.5, Tukey's rule).
        """
        if not self.numeric_cols:
            return self

        # Compute mean and quartiles for all numeric columns in a single Spark job,
        # instead of one job per column.
        stat_exprs = []
        for idx, col_name in enumerate(self.numeric_cols):
            stat_exprs.extend([
                mean(col(col_name)).alias(f"mean_{idx}"),
                percentile_approx(col(col_name), 0.25).alias(f"q1_{idx}"),
                percentile_approx(col(col_name), 0.75).alias(f"q3_{idx}"),
            ])
        stats = self.flattened_df.select(*stat_exprs).collect()[0]

        for idx, col_name in enumerate(self.numeric_cols):
            mean_val, q1, q3 = stats[3 * idx], stats[3 * idx + 1], stats[3 * idx + 2]
            if q1 is None or q3 is None:
                # All-null column: nothing to impute with or clip against.
                continue

            iqr = q3 - q1
            lower_bound = q1 - iqr_multiplier * iqr
            upper_bound = q3 + iqr_multiplier * iqr

            # Apply transformations
            cleaned_col = when(col(col_name).isNull(), mean_val) \
                .when(col(col_name) < lower_bound, lower_bound) \
                .when(col(col_name) > upper_bound, upper_bound) \
                .otherwise(col(col_name))

            # The mean and the bounds are floating point; cast back to the original type.
            self.flattened_df = self.flattened_df.withColumn(col_name, cleaned_col.cast(self.schema[col_name]))

        return self

    def clean_string_columns(self):
        """Clean non-date string columns.

        Null or blank values become "unknown". Other values are trimmed and lower-cased,
        and every character other than a-z, 0-9, whitespace and '-' is removed.
        """
        for col_name in self.string_cols:
            if col_name not in self.date_cols:  # Skip date/time columns
                self.flattened_df = self.flattened_df.withColumn(
                    col_name,
                    when(col(col_name).isNull() | (trim(col(col_name)) == ""), "unknown")
                    .otherwise(
                        regexp_replace(  # Remove special characters except space and hyphen
                            trim(lower(col(col_name))),  # Standardize case and remove whitespace
                            '[^a-z0-9\\s-]', ''
                        )
                    )
                )
        return self

    def clean_date_time_columns(self):
        """Normalise the date/time string columns identified in __init__.

        * names containing 'date': parsed as ``yyyy-MM-dd`` dates; values that fail to
          parse are replaced with the current date;
        * names containing 'time' (other than ``action_ts``): kept when they match
          24-hour ``HH:mm:ss``, otherwise replaced with ``'00:00:00'``;
        * ``action_ts``: parsed as a ``yyyy-MM-dd HH:mm:ss`` timestamp; unparseable values
          become 1970-01-01 00:00:00.
        """
        for col_name in self.date_cols:
            lowered = col_name.lower()
            if 'date' in lowered:
                # Convert to standard date format YYYY-MM-DD
                parsed_date = to_date(col(col_name), 'yyyy-MM-dd')
                self.flattened_df = self.flattened_df.withColumn(
                    col_name,
                    when(parsed_date.isNull(), current_date()).otherwise(parsed_date)
                )
            elif 'time' in lowered and col_name != "action_ts":
                # Keep valid HH:mm:ss values; anything else becomes midnight
                self.flattened_df = self.flattened_df.withColumn(
                    col_name,
                    when(
                        regexp_extract(col(col_name), '^(?:[01]\\d|2[0-3]):[0-5]\\d:[0-5]\\d$', 0) == '',
                        lit('00:00:00')
                    ).otherwise(col(col_name))
                )
            elif col_name == "action_ts":
                parsed_ts = to_timestamp(col(col_name), 'yyyy-MM-dd HH:mm:ss')
                # The fallback is cast to a timestamp. A bare string literal can make Spark's
                # type coercion resolve the CASE expression to StringType instead.
                fallback_ts = lit("1970-01-01 00:00:00").cast(TimestampType())
                self.flattened_df = self.flattened_df.withColumn(
                    col_name,
                    when(parsed_ts.isNull(), fallback_ts).otherwise(parsed_ts)
                )
        return self

    def standardize_categorical_columns(self):
        """Trim and lower-case every non-date string column.

        When run after clean_string_columns (as in apply_all_transformations), values are
        already trimmed and lower-cased, so this step leaves them unchanged.
        """
        for col_name in self.string_cols:
            if col_name not in self.date_cols:
                self.flattened_df = self.flattened_df.withColumn(
                    col_name,
                    lower(trim(col(col_name)))  # Convert to lowercase and trim spaces
                )
        return self

    def handle_duplicates(self, subset=None):
        """Drop duplicate rows, comparing only `subset` columns when given, else all columns."""
        if subset:
            self.flattened_df = self.flattened_df.dropDuplicates(subset)
        else:
            self.flattened_df = self.flattened_df.dropDuplicates()
        return self

    def apply_all_transformations(self):
        """Run every cleaning step in order and return the resulting DataFrame."""
        return (self
                .clean_numeric_columns()
                .clean_string_columns()
                .clean_date_time_columns()
                .standardize_categorical_columns()
                .handle_duplicates()
                .flattened_df)

# Usage Example
def process_dataframe(flattened_df):
    """Clean `flattened_df` with DataCleaner and report the outcome as a status dict.

    Returns:
        On success: {"status": "success", "data": <cleaned DataFrame, marked for caching>,
        "row_count": <int>}.
        On any exception: {"status": "error", "error_message": str(exception)}.
        Exceptions are not propagated; the caller must inspect "status".
    """
    try:
        cleaned = DataCleaner(flattened_df).apply_all_transformations()
        # Mark for caching so the count below and later actions reuse the same result.
        cleaned.cache()
        total_rows = cleaned.count()
        return {"status": "success", "data": cleaned, "row_count": total_rows}
    except Exception as exc:
        return {"status": "error", "error_message": str(exc)}

# Run the cleaning pipeline; on success expose the cleaned frame as `cleaned_df` and show it.
# NOTE: on failure cleaned_df is not (re)assigned, so later cells using it fail or see stale data.
result = process_dataframe(flattened_df)

if result["status"] != "success":
    print(f"Error: {result['error_message']}")
else:
    cleaned_df = result["data"]
    cleaned_df.display()


In [0]:
# Row count after cleaning and de-duplication.
# NOTE: cleaned_df is only (re)assigned by the previous cell when processing succeeded.
cleaned_df.count()


In [0]:
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

import os

# Configure Logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Initialize Spark session with Delta Lake support.
# NOTE(review): getOrCreate() returns an already-running session when there is one (as on
# Databricks), in which case settings such as spark.jars.packages may not take effect.
spark = SparkSession.builder \
    .appName("ETL Pipeline") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0,com.amazon.redshift:redshift-jdbc42:2.1.0.18") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Connection details and credentials come from environment variables, so real secrets
# never have to be committed with the notebook. The defaults are the original placeholder
# values. On Databricks, a secret scope (dbutils.secrets.get) is another option.
redshift_url = os.environ.get(
    "REDSHIFT_JDBC_URL",
    "jdbc:redshift://clusterid.end-point.aws-region.redshift.amazonaws.com:port-number/redshift-database-name",
)
redshift_properties = {
    "user": os.environ.get("REDSHIFT_USER", "redshift_user_name"),
    "password": os.environ.get("REDSHIFT_PASSWORD", "redshift_password"),
    "driver": "com.amazon.redshift.jdbc.Driver"
}

# Key under which this pipeline's watermark rows are stored in the tracker table.
# Spelling kept as-is; presumably it must match rows already in the tracker.
table_name = "bronze_to_stagging"
etl_tracker_table = "gold.etl_tracker"

# Define S3 paths
aws_bucket_name = "clm-case-study"
s3_folder = "silver-layer"
s3_output_path = f"s3a://{aws_bucket_name}/{s3_folder}/"

# Set AWS credentials for the s3a filesystem (standard AWS environment variable names)
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", os.environ.get("AWS_ACCESS_KEY_ID", "access-key"))
hadoop_conf.set("fs.s3a.secret.key", os.environ.get("AWS_SECRET_ACCESS_KEY", "access-secret-key"))
hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")

def get_latest_last_modified():
    """Return the newest tracker timestamp recorded for `table_name` in Redshift.

    Runs MAX(timestamp) over `etl_tracker_table` rows whose table_name matches. When
    that yields nothing usable (NULL or otherwise falsy), returns the sentinel string
    "1900-01-01 00:00:00", so a first run treats every row as new.
    """
    # Spark's JDBC reader accepts a parenthesised, aliased subquery in place of a table name.
    tracker_query = (
        "\n"
        "        (SELECT MAX(timestamp) AS last_modified_at \n"
        f"         FROM {etl_tracker_table} WHERE table_name = '{table_name}') AS latest_timestamp"
    )
    tracker_rows = spark.read.jdbc(url=redshift_url, table=tracker_query, properties=redshift_properties).collect()

    newest = tracker_rows[0][0]
    if not newest:
        return "1900-01-01 00:00:00"
    return newest

def update_etl_tracker(new_max_timestamp):
    """Append one ("silver", "success") row for `table_name` to the Redshift ETL tracker.

    Args:
        new_max_timestamp: Highest last_modified_at value loaded in this run. It is
            stored in the tracker's `timestamp` column, which get_latest_last_modified()
            reads back with MAX().
    """
    tracker_columns = ["table_name", "timestamp", "layer", "status"]
    tracker_row = (table_name, new_max_timestamp, "silver", "success")

    writer = (
        spark.createDataFrame([tracker_row], tracker_columns)
        .write
        .format("jdbc")
        .option("url", redshift_url)
        .option("dbtable", etl_tracker_table)
        .options(**redshift_properties)
        .mode("append")
    )
    writer.save()

try:
    # Incremental load: only rows newer than the tracker watermark are appended.
    latest_timestamp = get_latest_last_modified()
    logger.info(f"Latest processed timestamp from ETL tracker: {latest_timestamp}")

    # coalesce(1) gives a single partition, so each incremental append writes one output partition.
    new_data_df = (
        cleaned_df
        .filter(col("last_modified_at") > latest_timestamp)
        .coalesce(1)
    )
    new_records_count = new_data_df.count()

    if new_records_count > 0:
        logger.info(f"Loading {new_records_count} new records")

        new_data_df.write.format("delta").mode("append").save(s3_output_path)

        # The watermark is advanced only after the Delta append has succeeded.
        # `max` here is pyspark.sql.functions.max (star-imported in this cell), not the builtin.
        new_max_timestamp = new_data_df.agg(max("last_modified_at")).collect()[0][0]
        update_etl_tracker(new_max_timestamp)
        logger.info(f"Updated ETL tracker with new max timestamp: {new_max_timestamp}")
    else:
        logger.info("No new data to load")
except Exception as e:
    logger.error(f"Error during ETL process: {str(e)}")
    raise
