
# Glue Studio Notebook
You are now running a **Glue Studio** notebook. Before you can use it, you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A JSON-formatted dictionary of all configuration parameters for a session. Each parameter can be specified here or through its individual magic.        |
| %profile                    |  String      |  Specify a profile in your AWS configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS Region in which to initialize a session.                                                                                                 |
| %idle_timeout               |  Integer     |  Number of minutes of inactivity after which the session times out (e.g. %idle_timeout 60).                                                               |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma-separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma-separated list of pip packages, S3 paths, or private pip arguments.                                                                                |
| %extra_py_files             |  List        |  Comma-separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma-separated list of additional JARs to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must also be set.                                         |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must also be set. Default is G.1X.                                                                          |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (e.g. %glue_version 2.0).                              |
| %security_configuration     |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic are passed as part of the SQL code.                                                                |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session, including its duration, configuration, and executing user/role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |

In [1]:
%glue_version 3.0
%idle_timeout 60
%connections iceberg-connection
%%configure
{
  "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
It looks like there is a newer version of the kernel available. The latest version is 0.32 and you have 0.30 installed.
Please run `pip install --upgrade aws-glue-sessions` to upgrade your kernel
Setting Glue version to: 3.0
Current idle_timeout is None minutes.
idle_timeout has been set to 60 minutes.
Connections to be included:
iceberg-connection
The following configurations have been updated: {'--conf': 'spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions'}


In [2]:
%session_id

There is no current session.


In [3]:
RAW_S3_PATH = 's3://aws-glue-input-parquet-atq4q5u/full-load/human_resources/employee_details/'
CATALOG = 'glue_catalog'
ICEBERG_S3_PATH = 's3://aws-glue-output-iceberg-atq4q5u'
DATABASE = 'human_resources'
TABLE_NAME = 'employee_details_iceberg'
PK = 'emp_no'
PARTITION = 'department'
DYNAMODB_LOCK_TABLE = 'employee_details_lock'

Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::123456789012:role/GlueJobRole
Attempting to use existing AssumeRole session credentials.
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 8f744449-499c-4f28-88b9-cfd432357b15
Applying the following default arguments:
--glue_kernel_version 0.30
--enable-glue-datacatalog true
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
Waiting for session 8f744449-499c-4f28-88b9-cfd432357b15 to get into ready status...
Session 8f744449-499c-4f28-88b9-cfd432357b15 has been created




In [4]:
%session_id

Current active Session ID: 8f744449-499c-4f28-88b9-cfd432357b15


In [1]:
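# Stop the session's default SparkSession. spark.stop() also stops the underlying
# SparkContext, so a new one can be created in the next cell with the Iceberg
# catalog settings; the sc.stop() call is therefore redundant.
spark.stop()
sc.stop()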




In [2]:
import sys
from datetime import datetime

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

from pyspark.conf import SparkConf
from pyspark.sql.window import Window
from pyspark.sql.functions import (
  col,
  lit,
  max,  # Spark's max aggregate; shadows Python's built-in max() in this notebook
  to_timestamp
)

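conf = SparkConf()

# Register an Iceberg catalog named CATALOG that keeps table metadata in the
# AWS Glue Data Catalog and writes data files under ICEBERG_S3_PATH via S3FileIO.
conf.set(f"spark.sql.catalog.{CATALOG}", "org.apache.iceberg.spark.SparkCatalog")
conf.set(f"spark.sql.catalog.{CATALOG}.warehouse", ICEBERG_S3_PATH)
conf.set(f"spark.sql.catalog.{CATALOG}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set(f"spark.sql.catalog.{CATALOG}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
# Coordinate concurrent commits to the Glue catalog with a DynamoDB lock table.
conf.set(f"spark.sql.catalog.{CATALOG}.lock-impl", "org.apache.iceberg.aws.glue.DynamoLockManager")
conf.set(f"spark.sql.catalog.{CATALOG}.lock.table", DYNAMODB_LOCK_TABLE)
# Iceberg SQL extensions are required for MERGE INTO; the second setting lets Spark
# read and write Iceberg timestamp columns that have no time zone.
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.iceberg.handle-timestamp-without-timezone", "true")

# Start a new SparkContext with these settings and wrap it in a GlueContext.
glueContext = GlueContext(SparkContext(conf=conf))
spark = glueContext.spark_session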
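The cell above registers an Iceberg catalog under the name held in `CATALOG`. Every catalog-scoped property shares the prefix `spark.sql.catalog.<name>`, while `spark.sql.extensions` and the timestamp flag apply to the whole session. The sketch below uses a hypothetical helper, `iceberg_catalog_conf`, which is not part of the notebook or any library. It gathers the same properties into a dict so the expanded key names can be inspected. It assumes the Iceberg class names used above are the ones bundled with the `iceberg-connection` connector.

```python
# Hypothetical helper (not part of the notebook): collects the same Iceberg
# settings as the cell above into a dict keyed by Spark property name.
def iceberg_catalog_conf(catalog: str, warehouse: str, lock_table: str) -> dict:
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        # The bare catalog key selects Iceberg's Spark catalog implementation.
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.warehouse": warehouse,
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        f"{prefix}.lock-impl": "org.apache.iceberg.aws.glue.DynamoLockManager",
        f"{prefix}.lock.table": lock_table,
        # Session-wide settings, not scoped to the catalog.
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.iceberg.handle-timestamp-without-timezone": "true",
    }


conf_dict = iceberg_catalog_conf(
    "glue_catalog", "s3://aws-glue-output-iceberg-atq4q5u", "employee_details_lock"
)
for key in sorted(conf_dict):
    print(f"{key} = {conf_dict[key]}")
```

If that form is preferred, PySpark's `SparkConf.setAll(conf_dict.items())` would apply all of the properties in one call before the `SparkContext` is created.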




In [3]:
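# Read the raw CDC Parquet files from S3, recursing into sub-prefixes.
# transformation_ctx is the key Glue job bookmarks use to track processed files;
# bookmarks only take effect in a job run with bookmarks enabled and
# Job.init()/Job.commit() called, which this notebook does not do.
cdcDynamicFrame = glueContext.create_dynamic_frame_from_options(
  connection_type='s3',
  connection_options={
    'paths': [RAW_S3_PATH],
    'groupFiles': 'none',
    'recurse': True
  },
  format='parquet',
  transformation_ctx='cdcDyf')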




In [4]:
print(f"Count of CDC data after last job bookmark:{cdcDynamicFrame.count()}")

Count of CDC data after last job bookmark:5


In [5]:
cdcDF = cdcDynamicFrame.toDF()
cdcDF.printSchema()

root
 |-- Op: string (nullable = true)
 |-- emp_no: long (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- m_time: timestamp (nullable = true)


In [6]:
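# m_time is already a timestamp in this data (see the schema above). The cast is a
# no-op here, but it normalizes the column if it ever arrives as a string.
cdcDF = cdcDF.withColumn('m_time', to_timestamp(col('m_time')))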
cdcDF.printSchema()

root
 |-- Op: string (nullable = true)
 |-- emp_no: long (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- m_time: timestamp (nullable = true)


In [7]:
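# For every row, max_op_date is the latest m_time among all CDC rows with the
# same emp_no; the frame spans the whole partition.
IDWindowDF = (
  Window.partitionBy(cdcDF.emp_no)
  .orderBy(cdcDF.m_time)
  .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
inputDFWithTS = cdcDF.withColumn("max_op_date", max(cdcDF.m_time).over(IDWindowDF))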
inputDFWithTS.printSchema()

root
 |-- Op: string (nullable = true)
 |-- emp_no: long (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- m_time: timestamp (nullable = true)
 |-- max_op_date: timestamp (nullable = true)
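`max(...).over(IDWindowDF)` does not collapse rows the way `groupBy` would. Every CDC row keeps its own columns and gains the maximum `m_time` of its `emp_no` partition. Because the frame spans the whole partition, the `orderBy` does not change the result here. Spark also uses a whole-partition frame by default when a window has no ordering, so `Window.partitionBy(cdcDF.emp_no)` alone should give the same `max_op_date`. Below is a plain-Python model of the same computation on hypothetical sample rows; `with_max_op_date` is an illustrative name.

```python
from datetime import datetime

# Hypothetical sample CDC rows (illustrative values, not the notebook's data).
rows = [
    {"Op": "I", "emp_no": 101, "m_time": datetime(2022, 7, 1, 9, 0, 0)},
    {"Op": "U", "emp_no": 101, "m_time": datetime(2022, 7, 1, 9, 5, 0)},
    {"Op": "I", "emp_no": 102, "m_time": datetime(2022, 7, 1, 9, 2, 0)},
]


def with_max_op_date(records, key="emp_no", ts="m_time"):
    """Add max_op_date = latest ts among all rows sharing the same key,
    mirroring max(ts) over a window partitioned by key with an unbounded frame."""
    latest = {}
    for r in records:
        k = r[key]
        if k not in latest or r[ts] > latest[k]:
            latest[k] = r[ts]
    # Every row keeps its own columns and gains its partition's maximum.
    return [{**r, "max_op_date": latest[r[key]]} for r in records]


enriched = with_max_op_date(rows)
for r in enriched:
    print(r["Op"], r["emp_no"], r["m_time"], r["max_op_date"])
```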


In [8]:
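# Keep each emp_no's most recent change (m_time = max_op_date), split it into
# inserts and updates/deletes, then recombine. union() keeps duplicates and
# matches columns by position, like unionAll().
newInsertedDF = inputDFWithTS.filter("m_time=max_op_date").filter("Op='I'")
updatedOrDeletedDF = inputDFWithTS.filter("m_time=max_op_date").filter("Op IN ('U', 'D')")
finalInputDF = newInsertedDF.union(updatedOrDeletedDF)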
finalInputDF.printSchema()

root
 |-- Op: string (nullable = true)
 |-- emp_no: long (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- m_time: timestamp (nullable = true)
 |-- max_op_date: timestamp (nullable = true)
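Together, the two filters and the union keep exactly the rows whose `m_time` equals `max_op_date` and whose `Op` is `I`, `U`, or `D`. A single filter on `m_time = max_op_date AND Op IN ('I', 'U', 'D')` should therefore select the same rows.

Neither form handles one case: if two changes for the same `emp_no` share the latest `m_time`, both survive. Iceberg's documentation states that `MERGE INTO` fails when more than one source row matches the same target row, so such ties are worth detecting. The sketch below models the filter in plain Python on hypothetical rows and reports tied keys; `latest_changes` is an illustrative name.

```python
from collections import Counter
from datetime import datetime

# Hypothetical rows that already carry max_op_date (illustrative values only).
# emp_no 102 has two updates with the same latest timestamp: a tie.
t1 = datetime(2022, 7, 1, 9, 0, 0)
t2 = datetime(2022, 7, 1, 9, 5, 0)
rows = [
    {"Op": "I", "emp_no": 101, "salary": 1000, "m_time": t1, "max_op_date": t2},
    {"Op": "U", "emp_no": 101, "salary": 1100, "m_time": t2, "max_op_date": t2},
    {"Op": "U", "emp_no": 102, "salary": 2000, "m_time": t2, "max_op_date": t2},
    {"Op": "U", "emp_no": 102, "salary": 2100, "m_time": t2, "max_op_date": t2},
]


def latest_changes(records, key="emp_no"):
    """Keep each key's latest I/U/D rows and report keys that still have more than one."""
    latest = [
        r for r in records
        if r["m_time"] == r["max_op_date"] and r["Op"] in ("I", "U", "D")
    ]
    counts = Counter(r[key] for r in latest)
    ties = sorted(k for k, n in counts.items() if n > 1)
    return latest, ties


latest, ties = latest_changes(rows)
print(f"{len(latest)} rows kept; keys with ties: {ties}")
```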


In [9]:
CURRENT_DATETIME = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
finalInputDF = finalInputDF.withColumn('last_applied_date', to_timestamp(lit(CURRENT_DATETIME)))
finalInputDF.printSchema()

root
 |-- Op: string (nullable = true)
 |-- emp_no: long (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- m_time: timestamp (nullable = true)
 |-- max_op_date: timestamp (nullable = true)
 |-- last_applied_date: timestamp (nullable = true)


In [10]:
cdcInsertCount = finalInputDF.filter("Op = 'I'").count()
cdcUpdateCount = finalInputDF.filter("Op = 'U'").count()
cdcDeleteCount = finalInputDF.filter("Op = 'D'").count()
totalCDCCount = finalInputDF.count()

print(f"Inserted count:  {cdcInsertCount}")
print(f"Updated count:   {cdcUpdateCount}")
print(f"Deleted count:   {cdcDeleteCount}")
print(f"Total CDC count: {totalCDCCount}")

Inserted count:  2
Updated count:   2
Deleted count:   1
Total CDC count: 5
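Each `count()` in the cell above is a separate Spark action. Because `finalInputDF` is not cached, its lineage (including the window) is recomputed four times. A single `finalInputDF.groupBy('Op').count()` would return all three tallies in one action. The plain-Python sketch below shows the same single-pass tally on a hypothetical list of `Op` codes; `tally_ops` is an illustrative name.

```python
from collections import Counter

# Hypothetical Op codes for one CDC batch (illustrative, not the notebook's data).
ops = ["I", "U", "I", "D", "U", "U"]


def tally_ops(op_codes):
    """Count inserts, updates and deletes in one pass; also return the total."""
    counts = Counter(op_codes)
    # Report all three operations, even those absent from the batch.
    per_op = {op: counts.get(op, 0) for op in ("I", "U", "D")}
    return per_op, sum(counts.values())


per_op, total = tally_ops(ops)
print(f"Inserted: {per_op['I']}, Updated: {per_op['U']}, Deleted: {per_op['D']}, Total: {total}")
```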


In [11]:
tablesDF = spark.sql(f"SHOW TABLES IN {DATABASE}")
table_list = tablesDF.select('tableName').rdd.flatMap(lambda x: x).collect()
table_list

['employee_details_iceberg']
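`table_list` is collected here but not used by any later cell, and the `PARTITION` variable defined in the configuration cell is never referenced. The following is one plausible use of both, offered as an assumption rather than as the notebook's actual logic. On the first run, create the Iceberg table with a `CREATE TABLE ... USING iceberg PARTITIONED BY (...) AS SELECT` statement; once the table exists, fall back to `MERGE INTO`. `plan_write` is a hypothetical helper that only builds the SQL string. The statement would still need to be passed to `spark.sql`.

```python
# Hypothetical helper (not in the notebook): chooses between a first-load
# CREATE TABLE ... AS SELECT and an incremental MERGE, based on SHOW TABLES output.
def plan_write(table_list, catalog, database, table, partition, source_view, pk):
    target = f"{catalog}.{database}.{table}"
    if table not in table_list:
        # First run: create the Iceberg table from the source view, partitioned by `partition`.
        return (
            f"CREATE TABLE {target} USING iceberg "
            f"PARTITIONED BY ({partition}) AS SELECT * FROM {source_view}"
        )
    # Later runs: upsert into the existing table by primary key.
    return (
        f"MERGE INTO {target} t USING {source_view} s ON s.{pk} = t.{pk} "
        "WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *"
    )


args = ("glue_catalog", "human_resources", "employee_details_iceberg",
        "department", "employee_details_iceberg_upsert", "emp_no")
existing = plan_write(["employee_details_iceberg"], *args)
first = plan_write([], *args)
print(existing)
print(first)
```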


In [12]:
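# Columns to drop before writing to the Iceberg table. 'schema_name' and
# 'table_name' are not in finalInputDF's schema; DataFrame.drop() silently
# ignores names that don't exist, so only 'Op' and 'max_op_date' are removed.
dropColumnList = ['Op', 'schema_name', 'table_name', 'max_op_date']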
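To see which entries in `dropColumnList` actually apply, compare them against the schema printed for `finalInputDF`. The helper below, `split_drop_list`, is hypothetical. It compares names exactly, whereas Spark resolves column names case-insensitively by default. After the present columns are dropped, the remaining seven columns line up with those of the Iceberg table queried later in the notebook. `UPDATE SET *` and `INSERT *` rely on that match.

```python
# Hypothetical check (not in the notebook): which entries of a drop list exist in a schema.
# Unlike Spark's default resolution, this sketch compares column names case-sensitively.
def split_drop_list(schema_columns, drop_list):
    present = [c for c in drop_list if c in schema_columns]
    missing = [c for c in drop_list if c not in schema_columns]
    return present, missing


# Column names taken from the finalInputDF schema printed above.
schema_columns = ["Op", "emp_no", "name", "department", "city", "salary",
                  "m_time", "max_op_date", "last_applied_date"]
present, missing = split_drop_list(
    schema_columns, ["Op", "schema_name", "table_name", "max_op_date"]
)
remaining = [c for c in schema_columns if c not in present]
print("dropped:", present)
print("ignored (not in schema):", missing)
print("remaining:", remaining)
```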




In [13]:
upsertedDF = finalInputDF.filter("Op != 'D'").drop(*dropColumnList)
upsertedDF.count()

4


In [14]:
upsertedDF.createOrReplaceTempView(f"{TABLE_NAME}_upsert")




In [15]:
spark.sql(f"""MERGE INTO {CATALOG}.{DATABASE}.{TABLE_NAME} t
        USING {TABLE_NAME}_upsert s ON s.{PK} = t.{PK}
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
        """)

DataFrame[]
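`MERGE INTO ... WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *` behaves like a keyed upsert. A source row whose `emp_no` already exists overwrites the target's columns with the same-named source columns, and any other source row is inserted. The plain-Python model below illustrates the semantics only; it is not how Iceberg executes the statement. It represents the table as a dict keyed by primary key, and the rows are hypothetical. It assumes at most one source row per key, which the latest-change filter earlier in the notebook is meant to guarantee.

```python
# Plain-Python model (an illustration, not Iceberg's implementation) of
# MERGE ... WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *,
# with the table represented as a dict keyed by the primary key.
def merge_upsert(target, source_rows, pk="emp_no"):
    result = dict(target)  # leave the caller's table untouched
    for row in source_rows:
        # Matched key: the whole row is replaced (UPDATE SET *).
        # Unmatched key: the row is added (INSERT *).
        result[row[pk]] = dict(row)
    return result


# Hypothetical rows, not the notebook's data.
table = {
    1: {"emp_no": 1, "city": "Austin", "salary": 50000},
    2: {"emp_no": 2, "city": "Boston", "salary": 60000},
}
changes = [
    {"emp_no": 2, "city": "Denver", "salary": 65000},   # matches -> update
    {"emp_no": 3, "city": "Seattle", "salary": 70000},  # no match -> insert
]
merged = merge_upsert(table, changes)
print(sorted(merged))
```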


In [16]:
deletedDF = finalInputDF.filter("Op = 'D'").drop(*dropColumnList)
deletedDF.count()

1


In [17]:
deletedDF.createOrReplaceTempView(f"{TABLE_NAME}_delete")




In [18]:
spark.sql(f"""MERGE INTO {CATALOG}.{DATABASE}.{TABLE_NAME} t
        USING {TABLE_NAME}_delete s ON s.{PK} = t.{PK}
        WHEN MATCHED THEN DELETE
        """)

DataFrame[]
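The upsert and the delete run as two separate `MERGE INTO` statements, so each batch produces two Iceberg commits. An alternative is a single `MERGE` with a conditional `WHEN MATCHED AND ... THEN DELETE` clause, a form Iceberg's Spark SQL documentation supports. This is a sketch under assumptions and has not been run against this table. It assumes a temp view built from `finalInputDF` with only `max_op_date` dropped, so that `Op` is still available. The view name `employee_details_iceberg_cdc` and the helper `build_cdc_merge` are hypothetical. Explicit column lists are used instead of `*` so the extra `Op` column in the source does not have to exist in the target.

```python
# Hypothetical sketch: build one MERGE statement that applies inserts, updates and
# deletes together. Assumes a temp view (name is illustrative) created from
# finalInputDF with only max_op_date dropped, so the Op column is still available.
def build_cdc_merge(target, source_view, pk, columns, op_col="Op"):
    non_key = [c for c in columns if c != pk]
    set_clause = ", ".join(f"t.{c} = s.{c}" for c in non_key)
    insert_cols = ", ".join(columns)
    insert_vals = ", ".join(f"s.{c}" for c in columns)
    return (
        f"MERGE INTO {target} t\n"
        f"USING {source_view} s ON s.{pk} = t.{pk}\n"
        # The conditional DELETE must precede the unconditional MATCHED clause.
        f"WHEN MATCHED AND s.{op_col} = 'D' THEN DELETE\n"
        f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
        f"WHEN NOT MATCHED AND s.{op_col} != 'D' THEN "
        f"INSERT ({insert_cols}) VALUES ({insert_vals})"
    )


# Column list of employee_details_iceberg in this notebook.
columns = ["emp_no", "name", "department", "city", "salary", "m_time", "last_applied_date"]
sql = build_cdc_merge(
    "glue_catalog.human_resources.employee_details_iceberg",
    "employee_details_iceberg_cdc",  # hypothetical view name
    "emp_no",
    columns,
)
print(sql)
```

The resulting string would be executed with `spark.sql(sql)`. Spark only allows the last `WHEN MATCHED` clause to omit a condition, which is why the `DELETE` clause comes first.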


In [19]:
spark.sql(f"SELECT * FROM {CATALOG}.{DATABASE}.{TABLE_NAME} limit 5").show()

+------+-----+-------------+---------+------+-------------------+-------------------+
|emp_no| name|   department|     city|salary|             m_time|  last_applied_date|
+------+-----+-------------+---------+------+-------------------+-------------------+
|     9|  Eli|   Purchasing|  Chicago| 90000|2022-07-29 07:44:34|2022-08-21 05:55:04|
|     2|Susan|        Sales|New Delhi| 60000|2022-07-24 15:17:36|2022-08-21 05:55:04|
|     5|  Joe|           IT|  Chicago| 70000|2022-07-24 15:18:04|2022-08-21 05:55:04|
|     8| John|        Sales|      SFO| 90000|2022-07-29 07:44:20|2022-08-21 05:55:04|
|     4| Bill|Manufacturing|New Delhi| 70000|2022-07-24 15:17:54|2022-08-14 13:15:59|
+------+-----+-------------+---------+------+-------------------+-------------------+


In [20]:
print(f"Total count of {TABLE_NAME} Table Results:\n")
countDF = spark.sql(f"SELECT count(*) FROM {CATALOG}.{DATABASE}.{TABLE_NAME}")
# show() prints the result itself and returns None, so it is not wrapped in print().
countDF.show()

Total count of employee_details_iceberg Table Results:

+--------+
|count(1)|
+--------+
|       8|
+--------+
