## Retrieve current AWS account and Region

In [1]:
%%local
import boto3
sts_client = boto3.client('sts')
account_id = sts_client.get_caller_identity()['Account']
current_region = sts_client.meta.region_name

print(f"Current AWS Account ID: {account_id}")
print(f"Current region : {current_region}")

Current AWS Account ID: 975050082261
Current region : us-west-2
Connection: project.iam | Run start time: 2025-04-01 19:00:22.668306 | Run duration : 0:00:05.798763s.


## Configure the Spark session to use an RMS-backed catalog

This code initializes a PySpark session configured to work with Apache Iceberg tables through the AWS Glue catalog. It sets up:

1. A custom catalog named 'salesrep'
2. AWS Glue as the catalog implementation
3. Iceberg extensions for Spark

The session uses Iceberg as the table format and AWS Glue as the metadata store, which enables data lake operations against the catalog.

In [2]:
%%pyspark project.spark.compatibility
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('rms_salesrep') \
.config('spark.sql.catalog.salesrep', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.defaultCatalog', 'salesrep') \
.config('spark.sql.catalog.salesrep.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog') \
.config('spark.sql.catalog.salesrep.glue.id','<AWS_ACCOUNT_ID>:salesrep-managed-rmscatalog/dev') \
.config('spark.sql.catalog.salesrep.client.region','<AWS_REGION>') \
.config('spark.sql.extensions','org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.getOrCreate()

Executing for connection type: SPARK_GLUE, connection name: project.spark.compatibility
Creating Glue session...


'Session 484mug4zuyzq4w-8e35c621-1225-4391-a778-19916c42ca01 has been created.'

Id,Spark UI,Driver logs
484mug4zuyzq4w-8e35c621-1225-4391-a778-19916c42ca01,link,link


Session created for connection: project.spark.compatibility.

Connection: project.spark.compatibility | Run start time: 2025-04-01 19:03:43.558807 | Run duration : 0:01:24.677047s.
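The `<AWS_ACCOUNT_ID>` and `<AWS_REGION>` placeholders in the session configuration correspond to the account ID and Region retrieved in the first cell. As a minimal sketch, the same settings can be assembled as a dictionary so the values are substituted in one place. The helper name `build_catalog_conf` and the sample account ID `123456789012` are hypothetical; the configuration keys and the catalog path are the ones used above.

```python
def build_catalog_conf(account_id, region,
                       catalog_name="salesrep",
                       catalog_path="salesrep-managed-rmscatalog/dev"):
    """Return the Spark settings for a Glue-backed Iceberg catalog (hypothetical helper)."""
    prefix = f"spark.sql.catalog.{catalog_name}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.defaultCatalog": catalog_name,
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        # glue.id has the form <account id>:<catalog path>
        f"{prefix}.glue.id": f"{account_id}:{catalog_path}",
        f"{prefix}.client.region": region,
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    }


# Placeholder values for illustration only; use your own account ID and Region.
conf = build_catalog_conf("123456789012", "us-west-2")
for key, value in conf.items():
    print(f"{key} = {value}")
```

Each pair could then be passed to the session builder with `.config(key, value)`. Variables defined in a `%%local` cell may not be visible inside a `%%pyspark` cell, so the values may need to be entered explicitly there.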


## Configure the Spark session to use the public namespace (database)


In [3]:
%%pyspark project.spark.compatibility
spark.sql("use public")


DataFrame[]
Connection: project.spark.compatibility | Run start time: 2025-04-01 19:09:33.529532 | Run duration : 0:00:20.343055s.


## Create a DataFrame from the CSV file

This code loads the sales dataset into a PySpark DataFrame. It defines an explicit schema, then reads the CSV file from an S3 location using that schema and a `yyyy-MM-dd` date format. It then explores the data: it displays the first five rows, prints the schema to verify it, computes summary statistics with `describe()`, and counts the total number of records. Finally, it registers a temporary view named "view_salesrep" so that later cells can query the sales representative performance data with SQL.

In [None]:
%%pyspark project.spark.compatibility
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType


# Define the schema
schema = StructType([
    StructField("date", DateType(), True),
    StructField("sales_rep_id", StringType(), True),
    StructField("total_sales", DoubleType(), True),
    StructField("deals_closed", IntegerType(), True),
    StructField("customer_satisfaction_score", DoubleType(), True)
])

# Read CSV file with defined schema
df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("dateFormat", "yyyy-MM-dd") \
    .schema(schema) \
    .load("copy-s3-path-from-previous-section")

# Show the first few rows
df.show(5)

# Print the schema to verify
df.printSchema()

# Get basic statistics (describe() summarizes numeric and string columns)
df.describe().show()

# Count total rows
print(f"Total number of records: {df.count()}")

# Register a temporary view so the data can be queried with SQL
df.createOrReplaceTempView("view_salesrep")

+----------+------------+-----------+------------+---------------------------+
|      date|sales_rep_id|total_sales|deals_closed|customer_satisfaction_score|
+----------+------------+-----------+------------+---------------------------+
|2023-01-17|     REP-332|  218797.71|           4|                        4.6|
|2023-02-16|     REP-816|  441543.91|          43|                        2.9|
|2023-02-06|     REP-918|  462372.08|          34|                        4.4|
|2022-08-20|     REP-567|  383212.67|          49|                        4.4|
|2023-02-10|     REP-530|  437005.54|          14|                        4.6|
+----------+------------+-----------+------------+---------------------------+
only showing top 5 rows

root
 |-- date: date (nullable = true)
 |-- sales_rep_id: string (nullable = true)
 |-- total_sales: double (nullable = true)
 |-- deals_closed: integer (nullable = true)
 |-- customer_satisfaction_score: double (nullable = true)

+-------+------------+-----------
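Because the read relies on an explicit schema and a fixed date format, it can help to check a small sample of the file locally before loading it. The following is a rough sketch in plain Python, not part of the original notebook: `find_bad_rows` and `COLUMN_PARSERS` are hypothetical names, the first sample row is taken from the output above, and the second row is a deliberately malformed example.

```python
import csv
import io
from datetime import datetime

# One parser per column, mirroring the types declared in the Spark schema.
COLUMN_PARSERS = {
    "date": lambda s: datetime.strptime(s, "%Y-%m-%d").date(),
    "sales_rep_id": str,
    "total_sales": float,
    "deals_closed": int,
    "customer_satisfaction_score": float,
}


def find_bad_rows(csv_text):
    """Return (line_number, column, raw_value) for every value that fails to parse."""
    bad = []
    reader = csv.DictReader(io.StringIO(csv_text))
    for line_no, row in enumerate(reader, start=2):  # line 1 is the header
        for column, parse in COLUMN_PARSERS.items():
            try:
                parse(row[column])
            except (ValueError, TypeError, KeyError):
                bad.append((line_no, column, row.get(column)))
    return bad


sample = """date,sales_rep_id,total_sales,deals_closed,customer_satisfaction_score
2023-01-17,REP-332,218797.71,4,4.6
17/01/2023,REP-816,441543.91,43,2.9
"""

bad_rows = find_bad_rows(sample)
print(bad_rows)  # [(3, 'date', '17/01/2023')]
```

A check like this only approximates how Spark parses the file, but it surfaces date-format and type problems early.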

## Create an Iceberg table in the RMS catalog

This code creates a table named 'salesrep_performance' if it does not already exist. The table uses the Apache Iceberg format and stores sales representative performance metrics: date, sales representative ID, total sales amount, number of closed deals, and customer satisfaction score. The `aws.write.format` table property is set to `RMS`, which selects RMS as the write format for the table.

In [5]:
%%pyspark project.spark.compatibility
spark.sql("""
CREATE TABLE IF NOT EXISTS salesrep_performance (
    date DATE,
    sales_rep_id STRING,
    total_sales FLOAT,
    deals_closed INTEGER,
    customer_satisfaction_score FLOAT
) USING iceberg TBLPROPERTIES ('aws.write.format'='RMS')
""")


DataFrame[]
Connection: project.spark.compatibility | Run start time: 2025-04-01 19:10:53.648018 | Run duration : 0:00:14.810925s.
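One way to keep the table definition in a single place is to generate the statement from a list of column names and types. This is only a sketch: `create_table_sql` and `COLUMNS` are hypothetical names, while the table name, column types, and table property are the ones used in the cell above.

```python
# Column names and SQL types, copied from the CREATE TABLE statement above.
COLUMNS = [
    ("date", "DATE"),
    ("sales_rep_id", "STRING"),
    ("total_sales", "FLOAT"),
    ("deals_closed", "INTEGER"),
    ("customer_satisfaction_score", "FLOAT"),
]


def create_table_sql(table, columns, properties):
    """Build a CREATE TABLE IF NOT EXISTS statement for an Iceberg table."""
    column_lines = ",\n".join(f"    {name} {sql_type}" for name, sql_type in columns)
    property_list = ", ".join(f"'{key}'='{value}'" for key, value in properties.items())
    return (
        f"CREATE TABLE IF NOT EXISTS {table} (\n{column_lines}\n) "
        f"USING iceberg TBLPROPERTIES ({property_list})"
    )


ddl = create_table_sql("salesrep_performance", COLUMNS, {"aws.write.format": "RMS"})
print(ddl)
```

In the notebook, the resulting string could be passed to `spark.sql(ddl)` inside a `%%pyspark` cell.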


## Insert data into the Iceberg table from the temporary view

In [7]:
%%pyspark project.spark.compatibility
spark.sql("insert into salesrep_performance select * from view_salesrep")


DataFrame[]
Connection: project.spark.compatibility | Run start time: 2025-04-01 19:11:23.054979 | Run duration : 0:00:17.088057s.
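`INSERT INTO ... SELECT *` matches columns by position, and the view's `total_sales` and `customer_satisfaction_score` columns are `double` while the table declares them as `FLOAT`. A more explicit variant lists each column and casts it to the table's type. The sketch below only builds the SQL text; `insert_select_sql` and `COLUMNS` are hypothetical names, and the table, view, and column names come from the cells above.

```python
# Target column names and types, in the table's column order.
COLUMNS = [
    ("date", "DATE"),
    ("sales_rep_id", "STRING"),
    ("total_sales", "FLOAT"),
    ("deals_closed", "INTEGER"),
    ("customer_satisfaction_score", "FLOAT"),
]


def insert_select_sql(target, source, columns):
    """Build an INSERT ... SELECT that names and casts every column explicitly."""
    select_list = ",\n".join(
        f"    CAST({name} AS {sql_type}) AS {name}" for name, sql_type in columns
    )
    return f"INSERT INTO {target}\nSELECT\n{select_list}\nFROM {source}"


insert_sql = insert_select_sql("salesrep_performance", "view_salesrep", COLUMNS)
print(insert_sql)
```

The generated statement could be run with `spark.sql(insert_sql)` in place of the `select *` version.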
