In [None]:
# Import necessary libraries
import sys
from awsglue.transforms import *  # AWS Glue built-in transformations
from awsglue.utils import getResolvedOptions  # Utility to retrieve job parameters
from pyspark.context import SparkContext  # Entry point to Spark functionality
from awsglue.context import GlueContext  # AWS Glue-specific context extending Spark
from awsglue.job import Job  # AWS Glue Job class for managing job lifecycle
from awsgluedq.transforms import EvaluateDataQuality  # Optional: For data quality checks
from awsglue import DynamicFrame  # AWS Glue DynamicFrame abstraction
import gs_derived  # Custom transformation script/module (ensure this is included in your Glue script dependencies)

In [None]:
# Define a helper function to run Spark SQL on Glue DynamicFrames
def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    """
    Execute a Spark SQL query over a set of DynamicFrames and return the result as a DynamicFrame.

    Each DynamicFrame in ``mapping`` is converted to a Spark DataFrame and registered as a
    temporary view under its alias, so ``query`` can reference it by that name. The views stay
    registered in the session after the call; a later call using the same alias replaces them.

    Parameters:
        glueContext (GlueContext): The Glue context whose Spark session runs the query.
        query (str): The SQL query to execute.
        mapping (dict): Table alias -> DynamicFrame to expose under that alias.
        transformation_ctx (str): Transformation context name for the resulting DynamicFrame.

    Returns:
        DynamicFrame: The query result wrapped as a DynamicFrame.
    """
    # Use the session owned by the supplied GlueContext rather than a module-level `spark`
    # global (which in this notebook is only created in a later cell). This keeps the helper
    # self-contained and guarantees the views and the query share one session.
    spark_session = glueContext.spark_session

    for alias, frame in mapping.items():
        # Register each DynamicFrame as a temporary view for Spark SQL.
        frame.toDF().createOrReplaceTempView(alias)

    result = spark_session.sql(query)

    # Convert the resulting DataFrame back to a Glue DynamicFrame.
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)



In [None]:
# Retrieve job parameters passed at runtime (here only JOB_NAME).
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Initialize Spark and Glue contexts.
# getOrCreate() returns the already-running SparkContext when this cell is re-executed in a
# live session, instead of failing because only one SparkContext may be active at a time.
# In a fresh job run it simply creates a new context.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Initialize the Glue job with the provided JOB_NAME.
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


# Default data quality ruleset (DQDL), shared by every export node in the pipeline.
# It only asserts that the dataset has at least one column; it does not check the row count.
DEFAULT_DATA_QUALITY_RULESET = """
    Rules = [
        ColumnCount > 0  # Basic check to ensure the data has at least one column
    ]
"""




In [None]:
# Source: product subcategories table registered in the Glue Data Catalog database
# 'mydatabase' (a CSV-backed table, per its name). The transformation_ctx matches the variable name.
aw_prod_subcat_node1743761467401 = glueContext.create_dynamic_frame.from_catalog(
    transformation_ctx="aw_prod_subcat_node1743761467401",
    table_name="adventureworks_product_subcategories_csv",
    database="mydatabase",
)

In [None]:
# Source: the 2015 sales table from the 'mydatabase' Glue catalog database.
aw_sales_2015_node1743763329845 = glueContext.create_dynamic_frame.from_catalog(
    transformation_ctx="aw_sales_2015_node1743763329845",
    table_name="adventureworks_sales_2015_csv",
    database="mydatabase",
)



In [None]:
# Source: the products table from the 'mydatabase' Glue catalog database.
aw_products_node1743762111550 = glueContext.create_dynamic_frame.from_catalog(
    transformation_ctx="aw_products_node1743762111550",
    table_name="adventureworks_products_csv",
    database="mydatabase",
)



In [None]:
# Source: the 2016 sales table from the 'mydatabase' Glue catalog database.
aw_sales_2016_node1743764120348 = glueContext.create_dynamic_frame.from_catalog(
    transformation_ctx="aw_sales_2016_node1743764120348",
    table_name="adventureworks_sales_2016_csv",
    database="mydatabase",
)



In [None]:
# Source: the top-level product categories table from the 'mydatabase' Glue catalog database.
# Subcategories reference these rows through productcategorykey.
aw_product_category_node1743746696358 = glueContext.create_dynamic_frame.from_catalog(
    transformation_ctx="aw_product_category_node1743746696358",
    table_name="adventureworks_product_categories_csv",
    database="mydatabase",
)



In [None]:
# Source: the product returns table from the 'mydatabase' Glue catalog database.
aw_returns_node1743762470029 = glueContext.create_dynamic_frame.from_catalog(
    transformation_ctx="aw_returns_node1743762470029",
    table_name="adventureworks_returns_csv",
    database="mydatabase",
)



In [None]:
# Source: the 2017 sales table from the 'mydatabase' Glue catalog database.
# It is unioned with the 2015 and 2016 tables further down the pipeline.
aw_sales_2017_node1743829333023 = glueContext.create_dynamic_frame.from_catalog(
    transformation_ctx="aw_sales_2017_node1743829333023",
    table_name="adventureworks_sales_2017_csv",
    database="mydatabase",
)



In [None]:
# Source: the sales territories table (region / country / continent) from the
# 'mydatabase' Glue catalog database.
aw_territories_node1743765379454 = glueContext.create_dynamic_frame.from_catalog(
    transformation_ctx="aw_territories_node1743765379454",
    table_name="adventureworks_territories_csv",
    database="mydatabase",
)



In [None]:
# Source: the customers table from the 'mydatabase' Glue catalog database.
# It feeds the customer-cleaning chain (name derivation, income, birthdate).
aw_customers_node1743675465432 = glueContext.create_dynamic_frame.from_catalog(
    transformation_ctx="aw_customers_node1743675465432",
    table_name="adventureworks_customers_csv",
    database="mydatabase",
)



In [None]:
# Product subcategories: cast the long key columns to int; subcategoryname passes through.
# Each spec is (column, source_type, target_type). Columns keep their names, so every
# ApplyMapping tuple is expanded to (column, source_type, column, target_type).
longint_node1743761564692 = ApplyMapping.apply(
    frame=aw_prod_subcat_node1743761467401,
    mappings=[
        (column, source_type, column, target_type)
        for column, source_type, target_type in (
            ("productsubcategorykey", "long", "int"),
            ("subcategoryname", "string", "string"),
            ("productcategorykey", "long", "int"),
        )
    ],
    transformation_ctx="longint_node1743761564692",
)



In [None]:
# Products: cast the long key columns to int and carry the descriptive and price
# attributes through with their existing types.
# Each spec is (column, source_type, target_type); no column is renamed.
longint_node1743762226688 = ApplyMapping.apply(
    frame=aw_products_node1743762111550,
    mappings=[
        (column, source_type, column, target_type)
        for column, source_type, target_type in (
            ("productkey", "long", "int"),
            ("productsubcategorykey", "long", "int"),
            ("productsku", "string", "string"),
            ("productname", "string", "string"),
            ("modelname", "string", "string"),
            ("productdescription", "string", "string"),
            ("productcolor", "string", "string"),
            ("productsize", "string", "string"),
            ("productstyle", "string", "string"),
            ("productcost", "double", "double"),
            ("productprice", "double", "double"),
        )
    ],
    transformation_ctx="longint_node1743762226688",
)



In [None]:
# Product categories: cast the long key to int and keep the category name.
# Each spec is (column, source_type, target_type); no column is renamed.
productcategory_to_int_node1743746865206 = ApplyMapping.apply(
    frame=aw_product_category_node1743746696358,
    mappings=[
        (column, source_type, column, target_type)
        for column, source_type, target_type in (
            ("productcategorykey", "long", "int"),
            ("categoryname", "string", "string"),
        )
    ],
    transformation_ctx="productcategory_to_int_node1743746865206",
)




In [None]:
# SQL query that cleans the 'returndate' column of the 'aw_returns' view:
#   1. Keeps only rows whose 'returndate' looks like M/d/yyyy. Month and day may have one or
#      two digits, so both '01/05/2015' and '1/5/2015' are accepted.
#   2. Parses the string into a Spark DATE with to_date().
# The 'M/d/yyyy' pattern parses both padded and unpadded values, whereas 'MM/dd/yyyy' requires
# two-digit fields.
# Columns are projected explicitly instead of 'SELECT *, to_date(...) AS returndate'. That
# form emits two columns both named 'returndate' (raw string and parsed date), which makes
# later references to 'returndate' ambiguous. The list below is exactly the set of columns
# the returns ApplyMapping consumes.
SqlQuery0 = '''
SELECT
    to_date(returndate, 'M/d/yyyy') AS returndate,
    territorykey,
    productkey,
    returnquantity
FROM
    aw_returns
WHERE
    returndate RLIKE '^[0-9]{1,2}/[0-9]{1,2}/[0-9]{4}$'
'''



In [None]:
# Expose the raw returns frame to Spark SQL as the temporary view 'aw_returns' and run
# SqlQuery0 against it. The result is a new DynamicFrame with 'returndate' parsed to DATE.
returndateyyyymmdd_node1743762532832 = sparkSqlQuery(
    glueContext,
    mapping={"aw_returns": aw_returns_node1743762470029},
    query=SqlQuery0,
    transformation_ctx="returndateyyyymmdd_node1743762532832",
)


In [None]:
# SQL query that unifies the 2015, 2016 and 2017 sales views into one result. For each year it:
#   - keeps only rows whose orderdate and stockdate look like M/d/yyyy (1- or 2-digit
#     month/day, so both '01/05/2015' and '1/5/2015' are accepted)
#   - parses both dates to DATE with TO_DATE()
#   - tags each row with its source year in 'sales_year'
# UNION ALL concatenates the three branches positionally.
# Each branch therefore projects the same explicit column list in the same order. This avoids
# relying on the three source tables sharing an identical '*' column order. It also avoids the
# duplicate 'orderdate'/'stockdate' columns that 'SELECT *, TO_DATE(...) AS orderdate' would
# produce. The columns listed are exactly those consumed by the sales ApplyMapping.
# No trailing ';': spark.sql() executes a single statement.
SqlQuery1 = '''
SELECT
    TO_DATE(orderdate, 'M/d/yyyy') AS orderdate,
    TO_DATE(stockdate, 'M/d/yyyy') AS stockdate,
    ordernumber,
    productkey,
    customerkey,
    territorykey,
    orderlineitem,
    orderquantity,
    2015 AS sales_year  -- Distinguishing column for source year
FROM
    aw_sales_2015
WHERE
    orderdate RLIKE '^[0-9]{1,2}/[0-9]{1,2}/[0-9]{4}$'
    AND stockdate RLIKE '^[0-9]{1,2}/[0-9]{1,2}/[0-9]{4}$'

UNION ALL

SELECT
    TO_DATE(orderdate, 'M/d/yyyy') AS orderdate,
    TO_DATE(stockdate, 'M/d/yyyy') AS stockdate,
    ordernumber,
    productkey,
    customerkey,
    territorykey,
    orderlineitem,
    orderquantity,
    2016 AS sales_year  -- Distinguishing column for source year
FROM
    aw_sales_2016
WHERE
    orderdate RLIKE '^[0-9]{1,2}/[0-9]{1,2}/[0-9]{4}$'
    AND stockdate RLIKE '^[0-9]{1,2}/[0-9]{1,2}/[0-9]{4}$'

UNION ALL

SELECT
    TO_DATE(orderdate, 'M/d/yyyy') AS orderdate,
    TO_DATE(stockdate, 'M/d/yyyy') AS stockdate,
    ordernumber,
    productkey,
    customerkey,
    territorykey,
    orderlineitem,
    orderquantity,
    2017 AS sales_year  -- Distinguishing column for source year
FROM
    aw_sales_2017
WHERE
    orderdate RLIKE '^[0-9]{1,2}/[0-9]{1,2}/[0-9]{4}$'
    AND stockdate RLIKE '^[0-9]{1,2}/[0-9]{1,2}/[0-9]{4}$'
'''

# Register the three yearly sales frames as temporary views named after the tables
# SqlQuery1 reads from, then run the unified query. The result is one DynamicFrame holding
# all years' rows, tagged with 'sales_year'.
orderdatestockdatedate_node1743763366006 = sparkSqlQuery(
    glueContext,
    mapping={
        "aw_sales_2015": aw_sales_2015_node1743763329845,
        "aw_sales_2016": aw_sales_2016_node1743764120348,
        "aw_sales_2017": aw_sales_2017_node1743829333023,
    },
    query=SqlQuery1,
    transformation_ctx="orderdatestockdatedate_node1743763366006",
)


In [None]:
# Territories: cast the long key to int; region, country and continent pass through.
# Each spec is (column, source_type, target_type); no column is renamed.
longint_node1743765434879 = ApplyMapping.apply(
    frame=aw_territories_node1743765379454,
    mappings=[
        (column, source_type, column, target_type)
        for column, source_type, target_type in (
            ("salesterritorykey", "long", "int"),
            ("region", "string", "string"),
            ("country", "string", "string"),
            ("continent", "string", "string"),
        )
    ],
    transformation_ctx="longint_node1743765434879",
)



In [None]:
# Derive 'CustomerName' by joining Prefix, FirstName and LastName with single spaces.
# concat_ws is used instead of concat. Spark's concat() returns NULL as soon as ANY argument
# is NULL, so a customer with a missing Prefix would lose the whole name. concat_ws skips
# NULL arguments and joins the parts that are present. When all three parts are non-NULL the
# result is identical to the previous concat(...) expression.
ConcatenateNames_node1743675560783 = aw_customers_node1743675465432.gs_derived(
    colName="CustomerName",
    expr="concat_ws(' ', Prefix, FirstName, LastName)"
)



In [None]:
# Returns: keep the parsed returndate as DATE and cast the bigint key/quantity columns to int.
# Only the listed columns are carried into the output frame.
# Each spec is (column, source_type, target_type); no column is renamed.
longint_node1743762747732 = ApplyMapping.apply(
    frame=returndateyyyymmdd_node1743762532832,
    mappings=[
        (column, source_type, column, target_type)
        for column, source_type, target_type in (
            ("returndate", "date", "date"),
            ("territorykey", "bigint", "int"),
            ("productkey", "bigint", "int"),
            ("returnquantity", "bigint", "int"),
        )
    ],
    transformation_ctx="longint_node1743762747732",
)



In [None]:
# Finalize the unified 2015–2017 sales schema:
#   - orderdate / stockdate stay DATE (they were parsed upstream by SqlQuery1)
#   - the bigint key columns are cast to int
#   - orderlineitem and orderquantity are kept as long. Despite the node name, these columns
#     are NOT removed here; only the listed columns are carried into the output frame.
#   - sales_year (source-year tag) stays int
# Fix: the stockdate target type was the invalid string "stockdate"; it must be "date".
longintremoveorderlineitemorderquantity_node1743763724020 = ApplyMapping.apply(
    frame=orderdatestockdatedate_node1743763366006,
    mappings=[
        ("orderdate", "date", "orderdate", "date"),
        ("stockdate", "date", "stockdate", "date"),
        ("ordernumber", "string", "ordernumber", "string"),
        ("productkey", "bigint", "productkey", "int"),
        ("customerkey", "bigint", "customerkey", "int"),
        ("territorykey", "bigint", "territorykey", "int"),
        ("orderlineitem", "bigint", "orderlineitem", "long"),
        ("orderquantity", "bigint", "orderquantity", "long"),
        ("sales_year", "int", "sales_year", "int")
    ],
    transformation_ctx="longintremoveorderlineitemorderquantity_node1743763724020"
)



In [None]:
# Rewrite 'annualincome' from a currency-formatted string to an integer.
# Every '$' and ',' is stripped, then the remaining text is cast to int. With Spark's
# default (non-ANSI) casting, a value that still isn't numeric becomes NULL.
ChangetheAnnualIncomeRepresentation_node1743676089294 = ConcatenateNames_node1743675560783.gs_derived(
    expr="cast(replace(replace(annualincome, '$', ''), ',', '') as int)",
    colName="annualincome",
)


In [None]:
# SQL query that parses the customers' 'birthdate' string into a Spark DATE.
# Only rows whose birthdate looks like M/d/yyyy are kept. Month and day may have one or two
# digits, so '10/6/1971' and '03/15/1980' are both accepted. 'M/d/yyyy' parses both forms.
# Columns are projected explicitly instead of 'SELECT *, to_date(...) AS birthdate', which
# would emit two columns both named 'birthdate' and make later references ambiguous.
# The list covers every column used downstream:
#   - prefix / firstname / lastname, which the DropFields step removes
#   - all columns of the final customers ApplyMapping
SqlQuery2 = '''
SELECT
    customerkey,
    prefix,
    firstname,
    lastname,
    to_date(birthdate, 'M/d/yyyy') AS birthdate,
    maritalstatus,
    gender,
    emailaddress,
    annualincome,
    totalchildren,
    educationlevel,
    occupation,
    homeowner,
    CustomerName
FROM
    aw_customers
WHERE
    birthdate RLIKE '^[0-9]{1,2}/[0-9]{1,2}/[0-9]{4}$'
'''

# Run SqlQuery2 over the income-cleaned customers frame, exposed as the view 'aw_customers'.
birthdatestringdate_node1743751737621 = sparkSqlQuery(
    glueContext,
    mapping={"aw_customers": ChangetheAnnualIncomeRepresentation_node1743676089294},
    query=SqlQuery2,
    transformation_ctx="birthdatestringdate_node1743751737621",
)


In [None]:
# The name parts are now folded into CustomerName, so drop the individual columns.
RemovePrefixFirstNameLastName_node1743675908587 = DropFields.apply(
    transformation_ctx="RemovePrefixFirstNameLastName_node1743675908587",
    paths=["prefix", "firstname", "lastname"],
    frame=birthdatestringdate_node1743751737621,
)



In [None]:
# Final customer schema. The bigint customerkey and totalchildren columns become int; every
# other listed column keeps its current type. Only these columns reach the export.
# Each spec is (column, source_type, target_type); no column is renamed.
TotalChildrenint_node1743760550959 = ApplyMapping.apply(
    frame=RemovePrefixFirstNameLastName_node1743675908587,
    mappings=[
        (column, source_type, column, target_type)
        for column, source_type, target_type in (
            ("customerkey", "bigint", "int"),
            ("birthdate", "date", "date"),
            ("maritalstatus", "string", "string"),
            ("gender", "string", "string"),
            ("emailaddress", "string", "string"),
            ("annualincome", "int", "int"),
            ("totalchildren", "bigint", "int"),
            ("educationlevel", "string", "string"),
            ("occupation", "string", "string"),
            ("homeowner", "string", "string"),
            ("CustomerName", "string", "string"),
        )
    ],
    transformation_ctx="TotalChildrenint_node1743760550959",
)



In [None]:
# ==============================
# EXPORTING TO S3 w/ QUALITY CHECKS
# ==============================

# For each cleaned DynamicFrame:
#   1. evaluate DEFAULT_DATA_QUALITY_RULESET and publish the results (BEST_EFFORT strategy;
#      the frames returned by process_rows are not used)
#   2. write the frame as Snappy-compressed Parquet to its own folder under OUTPUT_BASE_PATH

# NOTE(review): placeholder bucket. Replace it with the real bucket, or pass it in as a job argument.
OUTPUT_BASE_PATH = "s3://your-s3-bucket-name/cleaned_data"

# (DynamicFrame to export, destination folder name, transformation_ctx of the write node)
exports = [
    (longint_node1743761564692, "aw_product_subcategories", "ExporttoS3_node1743761671860"),
    (longint_node1743762226688, "aw_products", "ExporttoS3_node1743762306649"),
    (productcategory_to_int_node1743746865206, "aw_product_category_cleaned", "ExporttoS3_node1743746915045"),
    (longint_node1743765434879, "aw_territories", "ExportS3_node1743765449831"),
    (longint_node1743762747732, "aw_returns", "ExporttoS3_node1743763059295"),
    (longintremoveorderlineitemorderquantity_node1743763724020, "aw_sales", "ExporttoS3_node1743763820156"),
    (TotalChildrenint_node1743760550959, "aw_customers_cleaned", "ExporttoS3_node1743677309923")
]

for dyf, folder, node in exports:
    EvaluateDataQuality().process_rows(
        frame=dyf,
        ruleset=DEFAULT_DATA_QUALITY_RULESET,
        publishing_options={
            "dataQualityEvaluationContext": f"EvaluateDataQuality_{node}",
            "enableDataQualityResultsPublishing": True
        },
        additional_options={
            "dataQualityResultsPublishing.strategy": "BEST_EFFORT",
            "observations.scope": "ALL"
        }
    )
    glueContext.write_dynamic_frame.from_options(
        frame=dyf,
        connection_type="s3",
        format="glueparquet",
        connection_options={"path": f"{OUTPUT_BASE_PATH}/{folder}/", "partitionKeys": []},
        format_options={"compression": "snappy"},
        transformation_ctx=node
    )

# Finalize the job
job.commit()
