# 🚀 SageMaker Lakehouse: Bridging Sales & Promotional Data for ML

## 📊 Overview

This notebook demonstrates how to leverage Amazon SageMaker Lakehouse to create a unified dataset combining sales and promotional data from disparate storage systems. The resulting table will serve as a foundation for machine learning model development.

## 🛠️ Technical Environment

We're utilizing Amazon SageMaker Lakehouse, which provides:

- 🔄 Unified access to data across Amazon S3 data lakes and Amazon Redshift data warehouses
- 🔍 Seamless querying capabilities using Apache Iceberg-compatible tools
- 💎 Zero data replication: Query data directly where it lives, eliminating the need for copies


## 🏢 Business Context

As members of the analytics team, we need to prepare a dataset that correlates sales performance with promotional activities. Our data sources span two departments:

- Central Operations 📦: Sales data stored in Amazon S3 data lake
- Back Office 💼: Promotional data maintained in Amazon Redshift

## 🎯 Objective

We will create a new analytical table that:

1. 🔗 Combines sales transaction records from S3 with promotional data from Redshift
2. 📈 Calculates the number of active promotions per product category and region at the time of each sale
3. 🤖 Provides a cleaned and optimized dataset ready to be shared with the Machine Learning team
    

💡 SageMaker Lakehouse's unified query interface eliminates the complexity of working with multiple storage solutions, allowing us to focus on the analytical requirements rather than data access mechanics.

## 🔧 Setup & Configuration

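Let's initialize our development environment with the necessary libraries and Spark session configuration. The configuration registers two catalogs in the Spark session: the default AWS Glue Data Catalog (`spark_catalog`), which holds the data lake tables stored in Amazon S3, and a federated catalog (`rms_federated_catalog`) that exposes the data warehouse tables in Amazon Redshift.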

In [None]:
# Import required libraries
import boto3
import json

# Function to retrieve the current AWS account ID
def get_account_id():
    sts = boto3.client('sts')
    return sts.get_caller_identity()['Account']

# Get current AWS account ID
account_id = get_account_id()

# Define the Spark session configuration:
# - --datalake-formats enables Apache Iceberg table format support
# - --conf adds the Iceberg Spark SQL extensions and two catalogs:
#     rms_federated_catalog -> federated Redshift catalog (federated_redshift_catalog/enterprise_operations)
#     spark_catalog         -> default Glue Data Catalog (S3 data lake tables)
# The region is hard-coded to us-east-1; change it if your project runs elsewhere.
spark_conf = [
    "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.rms_federated_catalog=org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.rms_federated_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog",
    f"spark.sql.catalog.rms_federated_catalog.glue.id={account_id}:federated_redshift_catalog/enterprise_operations",
    "spark.sql.catalog.rms_federated_catalog.client.region=us-east-1",
    f"spark.sql.catalog.rms_federated_catalog.glue.account-id={account_id}",
    "spark.sql.catalog.spark_catalog.client.region=us-east-1",
    f"spark.sql.catalog.spark_catalog.glue.account-id={account_id}",
]

config_dict = {
    "--datalake-formats": "iceberg",
    # All Spark properties travel in a single "--conf" value, separated by " --conf "
    "--conf": " --conf ".join(spark_conf),
}

# Serialize the configuration as JSON for the %%configure magic.
# json.dumps already emits double-quoted strings, so no quote replacement is needed.
config_json = json.dumps(config_dict, indent=4)

# Pass the configuration to the %%configure cell magic through the IPython API
ip = get_ipython()
ip.run_cell_magic('configure', '-f --name project.spark.fineGrained', config_json)
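If your project runs outside `us-east-1`, or the federated catalog path differs from `federated_redshift_catalog/enterprise_operations`, the hard-coded values above have to change. A minimal sketch that builds the same payload from parameters, assuming the catalog layout of the setup cell stays the same; `build_configure_payload` is a hypothetical helper (not part of any AWS SDK), and the account ID and region below are placeholders:

```python
import json


def build_configure_payload(account_id: str, region: str, redshift_catalog: str) -> str:
    """Build the JSON body for the %%configure magic (hypothetical helper).

    Same Spark properties as the setup cell, with the region and the catalog
    path segment after "federated_redshift_catalog/" passed in as parameters.
    """
    cat = "spark.sql.catalog"
    spark_conf = [
        "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        f"{cat}.rms_federated_catalog=org.apache.iceberg.spark.SparkCatalog",
        f"{cat}.rms_federated_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog",
        f"{cat}.rms_federated_catalog.glue.id={account_id}:federated_redshift_catalog/{redshift_catalog}",
        f"{cat}.rms_federated_catalog.client.region={region}",
        f"{cat}.rms_federated_catalog.glue.account-id={account_id}",
        f"{cat}.spark_catalog.client.region={region}",
        f"{cat}.spark_catalog.glue.account-id={account_id}",
    ]
    payload = {
        "--datalake-formats": "iceberg",
        # All Spark properties travel in one "--conf" value, separated by " --conf "
        "--conf": " --conf ".join(spark_conf),
    }
    # json.dumps already produces valid, double-quoted JSON
    return json.dumps(payload, indent=4)


# Placeholder account ID and region, for illustration only
print(build_configure_payload("123456789012", "us-west-2", "enterprise_operations"))
```

In the notebook, the returned string would be passed to `get_ipython().run_cell_magic('configure', '-f --name project.spark.fineGrained', ...)` just as in the setup cell, with `account_id` coming from `get_account_id()`. `boto3.session.Session().region_name` is one way to read the configured region instead of typing it.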

💡 Note: Running the next cell starts an AWS Glue interactive session, which can take about a minute.

In [None]:
%%pyspark project.spark.fineGrained
spark

## 📊 Data Exploration: Glue Data Catalog (S3 Data Lake)

Let's explore the data available in our S3-based data lake through the Glue default catalog.

List all available databases in the default Glue catalog

In [None]:
%%pyspark project.spark.fineGrained
spark.sql(f"show databases").show(truncate=False)

Show tables in the `customer_insights_db` database 

In [None]:
%%pyspark project.spark.fineGrained
spark.sql(f"show tables from customer_insights_db").show(truncate=False)

View `sales_data` table data (first 10 records)

In [None]:
%%pyspark project.spark.fineGrained
spark.sql(f"select * from customer_insights_db.sales_data limit 10").show(truncate=False)

## 📊 Data Exploration: Redshift Federated Catalog

Now let's examine the data stored in the Redshift data warehouse.

List the databases exposed through the federated Redshift catalog (they correspond to Redshift schemas such as `public`).

In [None]:
%%pyspark project.spark.fineGrained
spark.sql(f"show databases in rms_federated_catalog").show(truncate=False)

Show tables in Redshift's `public` schema

In [None]:
%%pyspark project.spark.fineGrained
spark.sql(f"show tables from rms_federated_catalog.public").show(truncate=False)
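The two exploration sections address tables differently: `customer_insights_db.sales_data` has two parts, while `rms_federated_catalog.public.promotions` has three. Below is a simplified sketch of how Spark 3 resolves such multi-part names, assuming the catalog names configured in the setup cell: if the first part names a configured catalog, that catalog is used and the rest is namespace plus table; otherwise the current catalog (`spark_catalog` by default) is used. `resolve_table_name` is a hypothetical illustration, not a Spark API, and it ignores quoted identifiers and the `global_temp` namespace.

```python
from typing import Collection, NamedTuple, Tuple


class ResolvedName(NamedTuple):
    catalog: str
    namespace: Tuple[str, ...]
    table: str


def resolve_table_name(name: str,
                       configured_catalogs: Collection[str],
                       current_catalog: str = "spark_catalog",
                       current_namespace: Tuple[str, ...] = ("default",)) -> ResolvedName:
    """Simplified model of Spark 3 multi-part name resolution (illustration only)."""
    parts = name.split(".")  # naive split: does not handle quoted identifiers
    if len(parts) == 1:
        # Bare table name: current catalog and current namespace
        return ResolvedName(current_catalog, tuple(current_namespace), parts[0])
    if parts[0] in configured_catalogs:
        # First part names a catalog: the rest is namespace + table
        return ResolvedName(parts[0], tuple(parts[1:-1]), parts[-1])
    # Otherwise the whole name lives in the current catalog
    return ResolvedName(current_catalog, tuple(parts[:-1]), parts[-1])


catalogs = {"spark_catalog", "rms_federated_catalog"}
print(resolve_table_name("customer_insights_db.sales_data", catalogs))
print(resolve_table_name("rms_federated_catalog.public.promotions", catalogs))
```

This is why the Redshift tables need the `rms_federated_catalog.` prefix, while the data lake tables resolve through the default catalog without one.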

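View `promotions` table data (first 10 records). The first query over the Redshift connection can fail on a cold start, so the first cell below runs a throwaway warm-up query and ignores its error.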

In [None]:
%%pyspark project.spark.fineGrained
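# Warm-up query: the first query over the Redshift connection can fail on a cold start,
# so run a throwaway query and ignore the error. `except Exception` (not a bare `except`)
# avoids also swallowing interrupts such as KeyboardInterrupt.
try:
    spark.sql("select * from rms_federated_catalog.public.promotions limit 1").count()
except Exception:
    pass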
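The warm-up cell tries once and discards any error. A minimal sketch of a bounded retry with exponential backoff, assuming the cold-start failure is transient; `warm_up` is a hypothetical helper, and the attempt count and delays are illustrative rather than tuned values:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def warm_up(query_fn: Callable[[], T], attempts: int = 3, base_delay_s: float = 2.0,
            sleep: Callable[[float], None] = time.sleep) -> T:
    """Run query_fn, retrying with exponential backoff (hypothetical helper)."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    last_error = None
    for attempt in range(attempts):
        try:
            return query_fn()
        except Exception as exc:  # cold-start errors are not a single exception type
            last_error = exc
            if attempt < attempts - 1:
                # Wait base, 2*base, 4*base, ... between attempts
                sleep(base_delay_s * 2 ** attempt)
    raise last_error


# Demo with a stand-in query that fails twice, then succeeds
state = {"calls": 0}


def flaky_query():
    state["calls"] += 1
    if state["calls"] < 3:
        raise RuntimeError("connection not ready")
    return 1


print(warm_up(flaky_query, base_delay_s=0.1))
```

Inside a `%%pyspark project.spark.fineGrained` cell it could wrap the warm-up query, for example `warm_up(lambda: spark.sql("select * from rms_federated_catalog.public.promotions limit 1").count())`. Unlike `pass`, it re-raises the last error if every attempt fails, so a real connectivity problem is not hidden.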

In [None]:
%%pyspark project.spark.fineGrained
spark.sql(f"select * from rms_federated_catalog.public.promotions limit 10").show(truncate=False)

## 🔄 Preparing the Sales & Promotions Dataset for ML Modeling

This section combines sales data from our data lake with promotional information from our data warehouse to create a feature-rich dataset for future machine learning modeling. For each sale, the resulting table adds an `active_promotions` column: the number of promotions in the same region and product category whose start and end dates include the order date.

Join the two sources and save the enriched dataset as a new table in our project database.
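To sanity-check the join condition, here is a minimal pure-Python sketch of the same counting rule on a few hypothetical rows (the regions, categories, and dates are made up for illustration). It mirrors the SQL: a sale matches a promotion when region and product category are equal and the order date falls within `[start_date, end_date]`, both ends inclusive as with `BETWEEN`; unmatched sales get 0, as `LEFT JOIN` plus `COUNT(p.promotion_id)` yields, assuming `promotion_id` is never NULL.

```python
from datetime import date
from typing import Dict, List


def count_active_promotions(sales: List[Dict], promotions: List[Dict]) -> List[Dict]:
    """Add active_promotions to each sale: promotions with the same region and
    product_category whose [start_date, end_date] window contains order_date."""
    enriched = []
    for s in sales:
        n = sum(
            1
            for p in promotions
            if p["region"] == s["region"]
            and p["product_category"] == s["product_category"]
            and p["start_date"] <= s["order_date"] <= p["end_date"]  # inclusive, like BETWEEN
        )
        # No match -> 0, matching LEFT JOIN + COUNT(p.promotion_id)
        enriched.append({**s, "active_promotions": n})
    return enriched


# Hypothetical sample rows, for illustration only
sales = [
    {"order_id": 1, "region": "Europe", "product_category": "Snacks", "order_date": date(2024, 3, 10)},
    {"order_id": 2, "region": "Asia", "product_category": "Snacks", "order_date": date(2024, 3, 10)},
]
promotions = [
    {"promotion_id": "P1", "region": "Europe", "product_category": "Snacks",
     "start_date": date(2024, 3, 1), "end_date": date(2024, 3, 10)},   # ends on the order date
    {"promotion_id": "P2", "region": "Europe", "product_category": "Snacks",
     "start_date": date(2024, 3, 5), "end_date": date(2024, 3, 31)},
    {"promotion_id": "P3", "region": "Europe", "product_category": "Beverages",
     "start_date": date(2024, 3, 1), "end_date": date(2024, 3, 31)},   # different category
]

for row in count_active_promotions(sales, promotions):
    print(row["order_id"], row["active_promotions"])
```

One difference worth knowing: this version treats every sales row separately, whereas the `GROUP BY` in the SQL merges rows that are identical in every column into one row whose count is the sum of their matches. That only matters if `sales_data` can contain exact duplicate rows.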

In [None]:
%%pyspark project.spark.fineGrained

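# Join sales data with promotions and count, for each sale, the promotions active
# in the same region and product category on the order date.
# Note: s.* is valid only because the GROUP BY lists every sales_data column;
# keep the two in sync if the schema changes.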
final_table = spark.sql(f"""
SELECT 
    s.*,
    COUNT(p.promotion_id) as active_promotions
FROM 
    customer_insights_db.sales_data s
LEFT JOIN 
    rms_federated_catalog.public.promotions p 
    ON s.region = p.region
    AND s.product_category = p.product_category
    AND s.order_date BETWEEN p.start_date AND p.end_date
GROUP BY 
    s.region,
    s.country,
    s.item_type,
    s.product_category,
    s.sales_channel,
    s.order_priority,
    s.order_date,
    s.order_id,
    s.ship_date,
    s.units_sold,
    s.unit_price,
    s.unit_cost,
    s.total_revenue,
    s.total_cost,
    s.total_profit
""")

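# Get the project database name.
# Assumption: it is the only database besides customer_insights_db and default.
# If several exist, this takes whichever `show databases` lists first;
# if none exists, collect()[0] raises IndexError.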
project_db = spark.sql("show databases") \
    .filter("namespace != 'customer_insights_db' AND namespace != 'default'") \
    .collect()[0]['namespace']

# Create temporary view for final dataset
final_table.createOrReplaceTempView("temp_final_table")

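# Create a persistent Parquet table using CTAS (CREATE TABLE AS SELECT).
# CTAS fails if the table already exists, so drop it first when re-running the notebook.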
spark.sql(f"""
    CREATE TABLE {project_db}.sales_table_enriched_w_campaigns
    USING PARQUET
    AS 
    SELECT * FROM temp_final_table
""")

Preview the newly created enriched table

In [None]:
%%pyspark project.spark.fineGrained
spark.sql(f"select * from {project_db}.sales_table_enriched_w_campaigns limit 10").show(truncate=False)
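The table-creation cell picks the project database by taking the first database that is neither `customer_insights_db` nor `default`, so it chooses arbitrarily if more than one candidate exists and fails with a bare `IndexError` if none does. A stricter variant, sketched under the assumption that exactly one project database is expected; `pick_project_database` is a hypothetical helper and `my_project_db` a placeholder name:

```python
from typing import Iterable, Sequence


def pick_project_database(namespaces: Iterable[str],
                          exclude: Sequence[str] = ("customer_insights_db", "default")) -> str:
    """Return the single database not in `exclude`, or raise if there are zero or several."""
    excluded = set(exclude)
    candidates = sorted(n for n in namespaces if n not in excluded)
    if len(candidates) != 1:
        raise ValueError(f"Expected exactly one project database, found: {candidates}")
    return candidates[0]


print(pick_project_database(["default", "customer_insights_db", "my_project_db"]))
```

In the notebook, the candidate names would come from `[row["namespace"] for row in spark.sql("show databases").collect()]`, run inside a `%%pyspark project.spark.fineGrained` cell.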