In [None]:
%profile profile_for_article
%iam_role arn:aws:iam::<<your account ID>>:role/iamr-glueintsessionsdemo
%glue_version 4.0
%idle_timeout 10
%number_of_workers 2

In [None]:
%%configure
{
    "--s3_bucket_name" : "s3-glueintsessionsdemo-data",
    "--datalake-formats" : "iceberg",
    "--conf" : "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.warehouse=s3://s3-glueintsessionsdemo-data/iceberg/ --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO"
}

In [None]:
# Bootstrap: bring the Glue / Spark entry points into scope for every later cell.
import sys

# Template imports from the Glue job script; no name from awsglue.transforms or
# awsglue.job is referenced in the cells shown — consider dropping them.
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# getOrCreate() reuses an already-running SparkContext if there is one,
# rather than trying to start a second context.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# SparkSession owned by the GlueContext; later cells use it for
# CSV/parquet I/O and Spark SQL.
spark = glueContext.spark_session

In [None]:
# Resolve the custom job argument `--s3_bucket_name` (declared in the %%configure cell)
# so the following cells can build S3 paths from it.
# `sys` and `getResolvedOptions` are already imported by the bootstrap cell above;
# re-importing getResolvedOptions here was redundant.
parameter_name = 's3_bucket_name'
args = getResolvedOptions(sys.argv, [parameter_name])
s3_bucket_name = args[parameter_name]

In [None]:
# 1. Load the raw lakes CSV files from the bucket's in/lakes/ prefix.
#    The first row of each file is treated as the header and Spark infers column types.
lakes_input_path = f"s3://{s3_bucket_name}/in/lakes/"
df = spark.read.csv(lakes_input_path, header=True, inferSchema=True)

In [None]:
# 2. Persist the loaded dataframe as parquet under out/lakes/ in the same bucket.
#    "overwrite" replaces whatever a previous run left at that path.
lakes_output_path = f"s3://{s3_bucket_name}/out/lakes/"
df.write.parquet(lakes_output_path, mode="overwrite")

In [None]:
# 3. Count lakes per continent; groupBy().count() names its column "count",
#    so rename it to something descriptive.
lakes_per_continent = df.groupBy("continent").count()
agg_df = lakes_per_continent.withColumnRenamed("count", "number_of_lakes")

# Print the aggregated rows in the cell output.
agg_df.show()

In [None]:
# 4. Write the aggregated dataframe to an Iceberg table registered through the
#    `glue_catalog` Spark catalog configured in the %%configure cell.
# Expose agg_df to Spark SQL under a temporary view name.
agg_df.createOrReplaceTempView("tmp_lakes")

# CREATE OR REPLACE ... AS SELECT rewrites the table on every run, mirroring the
# "overwrite" parquet write above. CREATE TABLE IF NOT EXISTS ... AS SELECT would
# silently skip the write once the table exists, leaving stale aggregates on re-runs.
# NOTE(review): assumes the Glue `default` database exists in this account/region.
query = """
CREATE OR REPLACE TABLE glue_catalog.default.lakes_iceberg
USING iceberg
AS SELECT * FROM tmp_lakes
"""
spark.sql(query)

In [None]:
%stop_session