In [None]:
%%pyspark project.spark.compatibility
%streaming -f
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.conf import SparkConf
from pyspark.sql.functions import when, lit
from pyspark.sql import SparkSession
import logging
import boto3

# Logging setup: root logger emits INFO and above, each record prefixed with
# its timestamp and level name. `logger` is the module-level logger used below.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

def get_secret(parameter_name, region_name='sa-east-1'):
    """Return the decrypted value of an AWS SSM Parameter Store parameter.

    Args:
        parameter_name: Name (path) of the parameter to read.
        region_name: AWS region used for the SSM client. Defaults to
            'sa-east-1', the region this function previously hard-coded,
            so existing single-argument callers behave as before.

    Returns:
        The ``Value`` field of the parameter returned by ``get_parameter``.

    Raises:
        Exception: Any error raised while creating the client or reading the
            parameter is logged and then re-raised unchanged.
    """
    try:
        ssm = boto3.client('ssm', region_name=region_name)
        # WithDecryption=True so SecureString parameters come back in clear text.
        response = ssm.get_parameter(Name=parameter_name, WithDecryption=True)
        return response['Parameter']['Value']
    except Exception as e:
        # Lazy %-formatting; the emitted text matches the previous f-string.
        logger.error("Error retrieving parameter %s: %s", parameter_name, e)
        raise

# Fetch the Kinesis Data Stream ARN from Parameter Store.
# The log messages name what is actually retrieved (the stream ARN, not
# database credentials) so failures are not misattributed when reading logs.
try:
    kds_arn = get_secret('/itau/kds/cdc_arn')
except Exception:
    logger.error("Failed to retrieve Kinesis stream ARN from Parameter Store")
    raise
else:
    logger.info("Successfully retrieved Kinesis stream ARN from Parameter Store")

# Build (or reuse) the Spark session. The settings register an Iceberg
# catalog named "glue_catalog" that uses the Glue catalog implementation,
# S3 file IO and the given S3 warehouse path, and enable the Iceberg SQL
# extensions.
spark = (
    SparkSession.builder
    .appName("PostgreSQL to S3 ETL")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://amazon-sagemaker-825765423553-sa-east-1-dac29af4cfb9/iceberg_catalog/")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.iceberg.handle-timestamp-without-timezone", True)
    .getOrCreate()
)

# Glue wrappers layered on top of the session's SparkContext.
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)

# Schema used to parse each JSON record: a "data" struct carrying the
# customer row and a "metadata" struct describing the change event.
# StructType.add() creates nullable fields, the same default StructField uses.
schema = (
    StructType()
    .add(
        "data",
        StructType()
        .add("customer_id", LongType())
        .add("first_name", StringType())
        .add("last_name", StringType())
        .add("date_of_birth", DateType())
        .add("email", StringType())
        .add("phone_number", StringType())
        .add("employment_status", StringType())
        .add("annual_income", LongType())
        .add("created_at", TimestampType())
        .add("updated_at", TimestampType()),
    )
    .add(
        "metadata",
        StructType()
        .add("timestamp", StringType())
        .add("record-type", StringType())
        .add("operation", StringType())
        .add("partition-key-type", StringType())
        .add("schema-name", StringType())
        .add("table-name", StringType()),
    )
)

# Streaming DataFrame over the Kinesis stream whose ARN was read from
# Parameter Store. Reading starts at LATEST, records are classified as JSON,
# and schema inference is switched on.
raw_df = glueContext.create_data_frame.from_options(
    connection_type="kinesis",
    connection_options=dict(
        streamARN=kds_arn,
        initialPosition="LATEST",
        inferSchema="true",
        classification="json",
    ),
    transformation_ctx="raw_df",
)

def process_batch(df, epoch_id):
    """Apply one micro-batch of customer change events to the Iceberg table.

    The raw JSON payload is parsed with the module-level ``schema``, flattened
    to one row per change event, reduced to the most recent event per
    ``customer_id``, and merged into ``glue_db_aw53flfpa5qkyj.customer``.

    Why the batch is reduced before merging: a MERGE fails when more than one
    source row matches the same target row, so two events for the same
    customer in a single micro-batch would abort the stream. Only the latest
    event per key is kept, and the MERGE uses upsert semantics so a collapsed
    insert+update (or delete+insert) sequence still lands correctly.

    Args:
        df: Micro-batch DataFrame handed over by ``foreachBatch``; expected to
            carry the ``$json$data_infer_schema$_temporary$`` JSON column.
        epoch_id: Micro-batch identifier supplied by ``foreachBatch`` (unused).
    """
    # Function-scope import: Window is not among the file-level imports.
    from pyspark.sql.window import Window

    parsed = df.select(
        from_json(col("$json$data_infer_schema$_temporary$"), schema).alias("parsed_data")
    ).select("parsed_data.*")

    events = parsed.select(
        col("data.customer_id").alias("customer_id"),
        col("data.first_name").alias("first_name"),
        col("data.last_name").alias("last_name"),
        col("data.date_of_birth").alias("date_of_birth"),
        col("data.email").alias("email"),
        col("data.phone_number").alias("phone_number"),
        col("data.employment_status").alias("employment_status"),
        col("data.annual_income").alias("annual_income"),
        col("data.created_at").alias("created_at"),
        col("data.updated_at").alias("updated_at"),
        col("metadata.timestamp").alias("event_timestamp"),
        col("metadata.record-type").alias("record_type"),
        col("metadata.operation").alias("operation")
    )

    # Keep only the newest event per customer.
    # NOTE(review): event_timestamp is a string; ordering it lexicographically
    # assumes an ISO-8601-style format - confirm against the producer. Events
    # sharing an identical timestamp are resolved arbitrarily.
    newest_first = Window.partitionBy("customer_id").orderBy(col("event_timestamp").desc())
    ssc_df = (
        events
        # Rows without a key can never match the target and would otherwise be
        # collapsed together by the window below, so they are dropped here.
        .where(col("customer_id").isNotNull())
        .withColumn("_event_rank", row_number().over(newest_first))
        .where(col("_event_rank") == 1)
        .drop("_event_rank")
    )

    # A global temp view is used so the SQL below can see the batch through
    # the module-level `spark` session.
    ssc_df.createOrReplaceGlobalTempView('ssc_table')

    merge_sql = """
        MERGE INTO glue_db_aw53flfpa5qkyj.customer AS t
        USING global_temp.ssc_table AS s
        ON t.customer_id = s.customer_id
        WHEN MATCHED AND s.operation = 'delete' THEN
            DELETE
        WHEN MATCHED AND s.operation IN ('insert', 'update') THEN
            UPDATE SET
                first_name = s.first_name,
                last_name = s.last_name,
                date_of_birth = s.date_of_birth,
                email = s.email,
                phone_number = s.phone_number,
                employment_status = s.employment_status,
                annual_income = s.annual_income,
                created_at = s.created_at,
                updated_at = s.updated_at
        WHEN NOT MATCHED AND s.operation IN ('insert', 'update') THEN
            INSERT *
    """

    try:
        spark.sql(merge_sql)
    finally:
        # Always release the view, even when the MERGE raises.
        spark.catalog.dropGlobalTempView("ssc_table")

# Start the streaming query: every micro-batch is handed to process_batch,
# progress is checkpointed to S3, and a new batch is triggered every second.
# `final_df` holds the StreamingQuery handle (previously it was bound to the
# None returned by awaitTermination()), so the query can still be inspected
# or stopped if the blocking wait below is interrupted.
final_df = (
    raw_df.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "s3://itau-sm-demo-825765423553/streaming_checkpoint")
    .trigger(processingTime="1 second")
    .start()
)

# Block until the query stops or fails.
final_df.awaitTermination()

Executing for connection type: SPARK_GLUE, connection name: project.spark.compatibility


"The following configurations have been updated: {'session_type': 'streaming'}"

Creating Glue session...


'Session 4cmygnn241c1zv-d12ec11c-f515-4121-8567-d93b1abb6cdf has been created.'

Id,Spark UI,Driver logs
4cmygnn241c1zv-d12ec11c-f515-4121-8567-d93b1abb6cdf,link,link


Session created for connection: project.spark.compatibility.


In [None]:
%%pyspark project.spark.compatibility
