In [2]:
import sys
import os
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
import subprocess

# Job parameters used by this notebook in place of real Glue job arguments.
# Every *_S3_PATH value ends with '/', so sub-paths can be appended directly.
parameters = dict(
    JOB_NAME='FullLoadAndCDCProcesserJob',
    RAW_S3_PATH='s3://ecommerce-data-lake-us-east-1-dev/01_raw/',
    STAGING_S3_PATH='s3://ecommerce-data-lake-us-east-1-dev/02_staging/',
    PREPROCESSED_S3_PATH='s3://ecommerce-data-lake-us-east-1-dev/03_preprocessed/',
    MASTER_S3_PATH='s3://ecommerce-data-lake-us-east-1-dev/04_master/',
    CURATED_S3_PATH='s3://ecommerce-data-lake-us-east-1-dev/06_curated/',
    TABLE_NAME='orders',
    PROCESS_TYPE='full_load',
    SOURCE='netSuite',
)

# Mock function to simulate getResolvedOptions
def get_resolved_options(args, keys):
    """Resolve job options from an argv-style list, falling back to ``parameters``.

    Notebook stand-in for ``awsglue.utils.getResolvedOptions``.

    Args:
        args: Argument tokens such as ``sys.argv``. ``--KEY value`` and
            ``--KEY=value`` forms are recognised; every other token (for
            example a script name) is ignored. May be ``None`` or empty.
        keys: Iterable of option names (without the leading ``--``) to resolve.

    Returns:
        dict mapping each requested key to its value. A value supplied in
        ``args`` wins; otherwise the default from the module-level
        ``parameters`` dict is used.

    Raises:
        KeyError: if a requested key is in neither ``args`` nor ``parameters``.
    """
    tokens = list(args or [])
    supplied = {}
    position = 0
    while position < len(tokens):
        token = tokens[position]
        position += 1
        if not (isinstance(token, str) and token.startswith('--')):
            continue
        name, has_inline_value, inline_value = token[2:].partition('=')
        if has_inline_value:
            supplied[name] = inline_value
        elif position < len(tokens) and not str(tokens[position]).startswith('--'):
            # The next token is this option's value; consume it as well.
            supplied[name] = tokens[position]
            position += 1
        # A bare flag with no value is ignored; the key falls back to `parameters`.

    resolved = {}
    for key in keys:
        if key in supplied:
            resolved[key] = supplied[key]
        elif key in parameters:
            resolved[key] = parameters[key]
        else:
            raise KeyError(f"Missing job option '{key}': not present in args or parameters")
    return resolved

# Mock sys.argv to simulate Glue job arguments.
# The first element is a placeholder script name: argv[0] is conventionally the
# program name, and the real awsglue getResolvedOptions skips it when parsing,
# so without it the first option (--JOB_NAME) would be dropped once the mock
# below is swapped for the real function.
sys.argv = [
    parameters['JOB_NAME'] + '.py',
    '--JOB_NAME', parameters['JOB_NAME'],
    '--RAW_S3_PATH', parameters['RAW_S3_PATH'],
    '--STAGING_S3_PATH', parameters['STAGING_S3_PATH'],
    '--PREPROCESSED_S3_PATH', parameters['PREPROCESSED_S3_PATH'],
    '--MASTER_S3_PATH', parameters['MASTER_S3_PATH'],
    '--CURATED_S3_PATH', parameters['CURATED_S3_PATH'],
    '--TABLE_NAME', parameters['TABLE_NAME'],
    '--PROCESS_TYPE', parameters['PROCESS_TYPE'],
    '--SOURCE', parameters['SOURCE']
]

# Use the mock get_resolved_options function.
# `args` is the dict every later cell reads its job options from.
args = get_resolved_options(sys.argv, [
    'JOB_NAME',
    'RAW_S3_PATH',
    'STAGING_S3_PATH',
    'PREPROCESSED_S3_PATH',
    'MASTER_S3_PATH',
    'CURATED_S3_PATH',
    'TABLE_NAME',
    'PROCESS_TYPE',
    'SOURCE'
])

# Set environment variables
# NOTE(review): presumably consumed by PyDeequ to pick a matching Deequ build —
# confirm that '3.1' matches the Spark version this kernel actually runs.
os.environ['SPARK_VERSION'] = '3.1'

# Path to the Iceberg JAR file
iceberg_jar_path = "/opt/glue/jars/iceberg-spark3-runtime-0.12.0.jar"

# Path to the PyDeequ JAR file
pydeequ_jar_path = "/opt/glue/jars/deequ-glue-1.0-SNAPSHOT-jar-with-dependencies.jar"

# Initialize Spark session with Iceberg configurations.
# - spark.jars takes a comma-separated list; the Deequ JAR is included so that
#   pydeequ_jar_path is actually put on the classpath instead of being unused.
# - The two Hadoop-catalog warehouses come from `args` so they cannot drift from
#   the MASTER_S3_PATH / CURATED_S3_PATH used by the write cells.
# NOTE(review): the recorded run of the read cell failed with
# `No FileSystem for scheme "s3"`; nothing here registers an S3 filesystem.
# Presumably the target runtime must supply one (or s3a:// + hadoop-aws) — confirm.
spark = SparkSession.builder \
    .config("spark.jars", ",".join([iceberg_jar_path, pydeequ_jar_path])) \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.master_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.master_catalog.type", "hadoop") \
    .config("spark.sql.catalog.master_catalog.warehouse", args['MASTER_S3_PATH']) \
    .config("spark.sql.catalog.curated_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.curated_catalog.type", "hadoop") \
    .config("spark.sql.catalog.curated_catalog.warehouse", args['CURATED_S3_PATH']) \
    .getOrCreate()

# `glueContext` is only an alias of the SparkSession above, not a real
# awsglue GlueContext (that would be GlueContext(SparkContext.getOrCreate())).
glueContext = spark
print(args)


{'JOB_NAME': 'FullLoadAndCDCProcesserJob', 'RAW_S3_PATH': 's3://ecommerce-data-lake-us-east-1-dev/01_raw/', 'STAGING_S3_PATH': 's3://ecommerce-data-lake-us-east-1-dev/02_staging/', 'PREPROCESSED_S3_PATH': 's3://ecommerce-data-lake-us-east-1-dev/03_preprocessed/', 'MASTER_S3_PATH': 's3://ecommerce-data-lake-us-east-1-dev/04_master/', 'CURATED_S3_PATH': 's3://ecommerce-data-lake-us-east-1-dev/06_curated/', 'TABLE_NAME': 'orders', 'PROCESS_TYPE': 'full_load', 'SOURCE': 'netSuite'}


In [3]:
# Install Deequ (the PyDeequ Python package) into /tmp and make it importable.
# The version is pinned to the release the recorded run resolved (1.3.0) so a
# Restart & Run All installs the same package instead of whatever is newest.
subprocess.check_call([sys.executable, "-m", "pip", "install", "--target=/tmp", "pydeequ==1.3.0"])
# Re-running this cell must not keep stacking duplicate entries onto sys.path.
if '/tmp' not in sys.path:
    sys.path.insert(0, '/tmp')

Collecting pydeequ
  Using cached pydeequ-1.3.0-py3-none-any.whl.metadata (9.5 kB)
Collecting numpy>=1.14.1 (from pydeequ)
  Using cached numpy-2.0.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (60 kB)
Collecting pandas>=0.23.0 (from pydeequ)
  Using cached pandas-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (19 kB)
Collecting python-dateutil>=2.8.2 (from pandas>=0.23.0->pydeequ)
  Using cached python_dateutil-2.9.0.post0-py2.py3-none-any.whl.metadata (8.4 kB)
Collecting pytz>=2020.1 (from pandas>=0.23.0->pydeequ)
  Using cached pytz-2024.1-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas>=0.23.0->pydeequ)
  Using cached tzdata-2024.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting six>=1.5 (from python-dateutil>=2.8.2->pandas>=0.23.0->pydeequ)
  Using cached six-1.16.0-py2.py3-none-any.whl.metadata (1.8 kB)
Using cached pydeequ-1.3.0-py3-none-any.whl (37 kB)
Using cached numpy-2.0.0-cp310-cp310-manylinu

[0m

In [4]:
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite

In [5]:
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, lit, current_date, expr, when
from pyspark.sql.window import Window
import pyspark.sql.functions as F

In [6]:
# Read source data: CSV files with a header row, located at
# <RAW_S3_PATH><SOURCE>/<TABLE_NAME>/<PROCESS_TYPE> (RAW_S3_PATH already ends with '/').
source_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load(f"{args['RAW_S3_PATH']}{args['SOURCE']}/{args['TABLE_NAME']}/{args['PROCESS_TYPE']}")
)


24/06/25 21:21:13 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: s3://ecommerce-data-lake-us-east-1-dev/01_raw/netSuite/orders/full_load.
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
	at org.apache.spark.sql.DataFrameReader.

Py4JJavaError: An error occurred while calling o58.load.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:724)
	at scala.collection.immutable.List.map(List.scala:293)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:722)
	at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:551)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:404)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Unknown Source)


In [None]:

# Data Quality Checks
def perform_data_quality_checks(df, table_name):
    """Fail fast when ``df`` has no rows; otherwise report the row count.

    Args:
        df: Object exposing ``count()`` (a Spark DataFrame in this notebook).
        table_name: Name interpolated into the pass/fail messages.

    Returns:
        The same ``df`` object that was passed in, unchanged.

    Raises:
        ValueError: if ``df.count()`` returns 0.
    """
    # count() is evaluated exactly once; the result feeds both the check and the message.
    n_records = df.count()
    if n_records != 0:
        print(f"Data quality check passed for {table_name}: {n_records} records found")
        return df
    raise ValueError(f"Data quality check failed for {table_name}: No records found")

# Apply data quality checks (raises ValueError when the source frame is empty)
clean_df = perform_data_quality_checks(source_df, args['TABLE_NAME'])

# Write to staging layer. This single write covers every table; the identical
# second write that used to sit in the products/orders branch has been removed.
# NOTE(review): assumes the source data carries an `ingestion_date` column — confirm.
clean_df.write.format("parquet").mode("overwrite").partitionBy("ingestion_date").save(args['STAGING_S3_PATH'] + args['SOURCE'] + '/' + args['TABLE_NAME'] + '/' + args['PROCESS_TYPE'])

# Define Iceberg table path
master_iceberg_table = args['MASTER_S3_PATH'] + args['TABLE_NAME']
curated_iceberg_table = args['CURATED_S3_PATH'] + args['TABLE_NAME']

if args['TABLE_NAME'] == 'customers':
    # The job is configured with PROCESS_TYPE='full_load'; the previous check for
    # 'full' never matched that value. 'full' is still accepted for compatibility.
    if args['PROCESS_TYPE'] in ('full_load', 'full'):
        # Full load processing for customers
        clean_df = clean_df.withColumn("start_date", current_date()) \
                           .withColumn("end_date", lit(None).cast("date")) \
                           .withColumn("is_current", lit(True))
        
        # Mastering Customers - Type 2 Dimension:
        # only the latest row per customer (by effective_date) stays current.
        window_spec = Window.partitionBy("customer_id").orderBy(F.desc("effective_date"))
        customers_mastered_df = clean_df.withColumn(
            "row_number", F.row_number().over(window_spec)
        ).withColumn(
            "is_current", when(col("row_number") == 1, lit(True)).otherwise(lit(False))
        ).drop("row_number")

        # Write Mastered Customers to Iceberg Table
        customers_mastered_df.write.mode("overwrite").format("iceberg").save(master_iceberg_table)

        # Write Curated Customers to Iceberg Table
        clean_df.write.format("iceberg").mode("overwrite").partitionBy("region", "effective_date").save(curated_iceberg_table)
    elif args['PROCESS_TYPE'] == 'cdc':
        # CDC processing for customers
        existing_df = spark.read.format("iceberg").load(curated_iceberg_table)

        # Flag existing rows whose customer appears in this change batch. A left
        # join against the distinct keys keeps every existing row exactly once and
        # adds no columns from the incoming batch (a plain inner join returned
        # columns from both sides, which broke the union below).
        changed_keys_df = clean_df.select("customer_id").distinct() \
                                  .withColumn("__cdc_changed", lit(True))
        flagged_existing_df = existing_df.join(changed_keys_df, "customer_id", "left")

        # Close only rows that are both open and superseded; coalesce keeps rows
        # with NULL flags untouched instead of dropping them.
        should_close = F.coalesce(col("__cdc_changed"), lit(False)) & \
                       F.coalesce(col("is_current"), lit(False))

        # Mark existing records as not current
        updated_existing_df = flagged_existing_df \
            .withColumn("end_date", when(should_close, current_date()).otherwise(col("end_date"))) \
            .withColumn("is_current", when(should_close, lit(False)).otherwise(col("is_current"))) \
            .drop("__cdc_changed") \
            .select(*existing_df.columns)
        
        # Insert new records
        new_records_df = clean_df.withColumn("start_date", current_date()) \
                                 .withColumn("end_date", lit(None).cast("date")) \
                                 .withColumn("is_current", lit(True))
        
        # Merge datasets by column name (a positional union is fragile here).
        final_df = updated_existing_df.unionByName(new_records_df)
        # final_df holds the complete history (untouched + closed + new rows), so
        # the table is overwritten; appending would leave the stale open rows in
        # place next to their closed copies.
        final_df.write.format("iceberg").mode("overwrite").partitionBy("region", "effective_date").save(curated_iceberg_table)
    else:
        # Fail loudly instead of silently skipping the dimension processing.
        raise ValueError(f"Unsupported PROCESS_TYPE for customers: {args['PROCESS_TYPE']}")
else:
    # NOTE(review): partition_column is computed but not used by any write in this cell.
    partition_column = "category" if args['TABLE_NAME'] == 'products' else "order_date"
    
    # Processing for products and orders
    if args['TABLE_NAME'] == 'products':
        # Mastering Products: keep one row per product_id
        products_mastered_df = clean_df.dropDuplicates(["product_id"])

        # Write Mastered Products to Iceberg Table
        products_mastered_df.write.mode("overwrite").format("iceberg").save(master_iceberg_table)

        # Write Curated Products to Iceberg Table
        products_mastered_df.write.format("iceberg").mode("overwrite").save(curated_iceberg_table)
    elif args['TABLE_NAME'] == 'orders':
        # Calculate total_amount
        augmented_df = clean_df.withColumn("total_amount", col("quantity") * col("price"))

        # Aggregated Sales per product and order date
        aggregated_sales_df = augmented_df.groupBy("product_id", "order_date").agg(
            F.sum("quantity").alias("total_quantity"),
            F.sum("total_amount").alias("total_sales")
        )

        # Write Aggregated Sales to Parquet
        aggregated_sales_df.write.mode("overwrite").partitionBy("order_date").parquet(args['CURATED_S3_PATH'] + 'aggregated_sales')

        # Write Orders to Iceberg Table
        clean_df.write.format("iceberg").mode("overwrite").save(curated_iceberg_table)

# Commit job
# No Glue `Job` is ever created/initialised in this notebook, so an unconditional
# job.commit() raised NameError. Commit only when a `job` object actually exists.
if 'job' in globals():
    job.commit()
