# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using it, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [2]:
%help

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.2 



# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session. 
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0, 3.0 and 4.0. 
                                      Default: 2.0.
----

## Selecting Session Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %glue_ray           String        Sets the session type to Glue Ray.
    %session_type       String        Specify a session_type to be used. Supported values: streaming, etl and glue_ray. 
----

## Glue Config Magic 
*(common across all session types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
    %%tags        Dictionary          Specify a json-formatted dictionary consisting of tags to use in the session.
    
    %%assume_role Dictionary, String  Specify a json-formatted dictionary or an IAM role ARN string to create a session 
                                      for cross account access.
                                      E.g. {valid arn}
                                      %%assume_role 
                                      'arn:aws:iam::XXXXXXXXXXXX:role/AWSGlueServiceRole' 
                                      E.g. {credentials}
                                      %%assume_role
                                      {
                                            "aws_access_key_id" : "XXXXXXXXXXXX",
                                            "aws_secret_access_key" : "XXXXXXXXXXXX",
                                            "aws_session_token" : "XXXXXXXXXXXX"
                                       }
----

                                      
## Magic for Spark Sessions (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----
                                      
## Magic for Ray Session

----
    %min_workers        Int           The minimum number of workers that are allocated to a Ray session. 
                                      Default: 1.
    %object_memory_head Int           The percentage of free memory on the instance head node after a warm start. 
                                      Minimum: 0. Maximum: 100.
    %object_memory_worker Int         The percentage of free memory on the instance worker nodes after a warm start. 
                                      Minimum: 0. Maximum: 100.
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
----


## Amazon Q Magic 

----

    %%chat              String        Query Amazon Q. All lines after the initial %%chat magic will be passed
                                      to Amazon Q as part of your prompt. (For PySpark Kernel only)
----


In [2]:
%%configure -f
{
    "datalake-formats": "delta"
}

The following configurations have been updated: {'datalake-formats': 'delta'}
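
The body of a `%%configure` cell must be valid JSON, so one way to avoid quoting mistakes is to generate the cell text from a Python dictionary. The following is a minimal sketch; `render_configure_cell` is a hypothetical helper name, and the `-f` flag is reproduced only because the cell above uses it.

```python
import json


def render_configure_cell(config: dict, force: bool = True) -> str:
    """Return the text of a %%configure cell for the given settings.

    Hypothetical helper: it only formats text, it does not talk to Glue.
    """
    header = "%%configure -f" if force else "%%configure"
    # json.dumps always emits valid JSON (double quotes, no trailing commas).
    return header + "\n" + json.dumps(config, indent=4)


cell_text = render_configure_cell({"datalake-formats": "delta"})
print(cell_text)
```

The printed text can be pasted into a new cell as-is.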


#### Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 60
%glue_version 4.0
%worker_type G.1X
%number_of_workers 2

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job
import delta

  
# Build a SparkSession with the Delta Lake SQL extension and catalog enabled.
spark = (
    SparkSession.builder.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    # Config to allow Spark to read/write dates from parquet correctly after version Spark 3.0, details in SPARK-31404
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED")
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
    .config("spark.sql.legacy.parquet.int96RebaseModeInRead", "CORRECTED")
    .config("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED")
    .config("spark.sql.avro.datetimeRebaseModeInWrite", "CORRECTED")
    .config("spark.sql.avro.datetimeRebaseModeInRead", "CORRECTED")
    .config("spark.databricks.delta.fixSchema.GlueCatalog", "true")
    .getOrCreate()
)
# GlueContext expects a SparkContext, so pass the session's underlying context
# rather than the SparkSession itself.
sc = spark.sparkContext
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Current idle_timeout is None minutes.
idle_timeout has been set to 60 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 2
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 2
Session ID: 210dac77-28b9-4401-8857-db5145d65d4f
Applying the following default arguments:
--glue_kernel_version 1.0.2
--enable-glue-datacatalog true
--datalake-formats delta
Waiting for session 210dac77-28b9-4401-8857-db5145d65d4f to get into ready status...
Session 210dac77-28b9-4401-8857-db5145d65d4f has been created.
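
The chain of `.config(...)` calls in the setup cell can also be driven from a dictionary, which keeps the settings in one place and makes them easier to review or reuse. This is a sketch under that assumption: `DELTA_SPARK_CONF` and `apply_spark_conf` are hypothetical names, and the keys and values are copied from the cell above.

```python
from typing import Any, Mapping

# Settings copied from the setup cell above.
DELTA_SPARK_CONF = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.sql.legacy.parquet.datetimeRebaseModeInRead": "CORRECTED",
    "spark.sql.legacy.parquet.datetimeRebaseModeInWrite": "CORRECTED",
    "spark.sql.legacy.parquet.int96RebaseModeInRead": "CORRECTED",
    "spark.sql.legacy.parquet.int96RebaseModeInWrite": "CORRECTED",
    "spark.sql.avro.datetimeRebaseModeInWrite": "CORRECTED",
    "spark.sql.avro.datetimeRebaseModeInRead": "CORRECTED",
    "spark.databricks.delta.fixSchema.GlueCatalog": "true",
}


def apply_spark_conf(builder: Any, conf: Mapping[str, str]) -> Any:
    """Call builder.config(key, value) for every entry and return the builder."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder


# Usage inside the Glue session (not executed here):
#   spark = apply_spark_conf(SparkSession.builder, DELTA_SPARK_CONF).getOrCreate()
```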



#### Example: Read a Delta table from the AWS Glue Data Catalog as a Spark DataFrame and display its schema


In [2]:
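# Read the Delta table through the Glue Data Catalog.
# Note: create_data_frame.from_catalog returns a Spark DataFrame, not a DynamicFrame, despite the name `dyf`.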
old_delta_table_path = "s3://dl-staging-full-data-lake-test/postgres/staging_db_postgres_newschema/addr/"
new_delta_table_path = "s3://dl-staging-full-data-lake-test/postgres/staging_db_postgres_newschema/new_addr_from_notebook/"
new_table_name = "new_addr"
database="staging_db_postgres_newschema"
dyf = glueContext.create_data_frame.from_catalog(
    database=database,
    table_name="addr",
    additional_options={
        "path": old_delta_table_path
    }
)

dyf.printSchema()

root
 |-- Op: string (nullable = true)
 |-- cdc_timestamp: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- field_a: string (nullable = true)
 |-- field_b: string (nullable = true)
 |-- partition_0: string (nullable = true)



In [3]:
dyf.show(5)

+---+--------------------+---+-------+-------+-----------+
| Op|       cdc_timestamp| id|field_a|field_b|partition_0|
+---+--------------------+---+-------+-------+-----------+
|  I|2023-07-04 16:08:...|  4|     DD|     DD| 2002-01-01|
|  I|2023-07-04 16:08:...|  6|     FF|     FF| 2002-01-01|
|  I|2023-07-04 16:08:...|  7|     GG|     GG| 2002-01-01|
|  I|2023-07-04 16:08:...|  3|     CC|     CC| 2002-01-01|
|  I|2023-07-04 16:08:...|  9|     II|     II| 2002-01-01|
+---+--------------------+---+-------+-------+-----------+
only showing top 5 rows


In [4]:
# `dyf` is already a Spark DataFrame; toDF(*dyf.columns) returns it with the same column names
df = dyf.toDF(*dyf.columns)

df.show(5)

+---+--------------------+---+-------+-------+-----------+
| Op|       cdc_timestamp| id|field_a|field_b|partition_0|
+---+--------------------+---+-------+-------+-----------+
|  I|2023-07-04 16:08:...|  4|     DD|     DD| 2002-01-01|
|  I|2023-07-04 16:08:...|  6|     FF|     FF| 2002-01-01|
|  I|2023-07-04 16:08:...|  7|     GG|     GG| 2002-01-01|
|  I|2023-07-04 16:08:...|  3|     CC|     CC| 2002-01-01|
|  I|2023-07-04 16:08:...|  9|     II|     II| 2002-01-01|
+---+--------------------+---+-------+-------+-----------+
only showing top 5 rows


In [9]:
# Write PySpark df as new delta table - BROKEN
new_table_name = "new_addr"
database="staging_db_postgres_newschema"
additional_options = {
    "path": new_delta_table_path,
    # "mergeSchema": True
}
df.coalesce(1).write \
    .format("delta") \
    .options(**additional_options) \
    .mode("overwrite") \
    .saveAsTable(f"{database}.{new_table_name}")

Py4JError: An error occurred while calling o153.toString. Trace:
java.lang.IllegalArgumentException: object is not an instance of declaring class
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
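
One commonly used alternative to `saveAsTable` is to write the Delta files to the S3 path first and then register that location in the catalog with a `CREATE TABLE ... USING DELTA LOCATION` statement. The sketch below has not been run against this session, so whether it avoids the error above is an assumption; `build_register_delta_sql` and `write_and_register` are hypothetical helper names.

```python
def build_register_delta_sql(database: str, table: str, location: str) -> str:
    """Build the DDL that registers an existing Delta location as a table.

    Assumes `location` contains no single quotes.
    """
    return (
        f"CREATE TABLE IF NOT EXISTS `{database}`.`{table}` "
        f"USING DELTA LOCATION '{location}'"
    )


def write_and_register(df, spark, database: str, table: str, location: str) -> None:
    """Write `df` as Delta files to `location`, then register the table."""
    # Step 1: path-based write, which does not touch the catalog.
    df.write.format("delta").mode("overwrite").save(location)
    # Step 2: point a catalog table at the files that were just written.
    spark.sql(build_register_delta_sql(database, table, location))


# Usage inside the Glue session (not executed here):
#   write_and_register(df, spark, database, new_table_name, new_delta_table_path)
```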




In [10]:
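# Write Delta table with Glue API - BROKEN
# Likely cause: from_catalog looks up an existing catalog table, and `new_addr` has not been created yet.
# Note also that `frame` is expected to be a DynamicFrame, while `dyf` here is a Spark DataFrame.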
new_table_name = "new_addr"
database="staging_db_postgres_newschema"
new_delta_table_path = "s3://dl-staging-full-data-lake-test/postgres/staging_db_postgres_newschema/new_addr_from_notebook/"
glueContext.write_dynamic_frame.from_catalog(
    frame=dyf,
    database = database, 
    table_name = new_table_name, 
    transformation_ctx = "datasource0", 
    additional_options={
        "path": new_delta_table_path
    })

Py4JJavaError: An error occurred while calling o83.getCatalogSink.
: com.amazonaws.services.glue.model.EntityNotFoundException: Entity Not Found (Service: AWSGlue; Status Code: 400; Error Code: EntityNotFoundException; Request ID: d8c0f43f-2425-4a06-a269-c00b98fc2be3; Proxy: null)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
	at com.amazonaws.h
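
An `EntityNotFoundException` from `getCatalogSink` suggests the target database or table is missing from the Data Catalog. A quick way to confirm this before writing is to look the table up with the Glue API. The sketch below takes a boto3 Glue client as a parameter; `table_exists` is a hypothetical helper name, and the session's role is assumed to have `glue:GetTable` permission.

```python
def table_exists(glue_client, database: str, table: str) -> bool:
    """Return True if `database`.`table` is present in the Glue Data Catalog."""
    try:
        glue_client.get_table(DatabaseName=database, Name=table)
        return True
    except glue_client.exceptions.EntityNotFoundException:
        # Raised when either the database or the table does not exist.
        return False


# Usage inside the Glue session (not executed here):
#   import boto3
#   glue = boto3.client("glue")
#   table_exists(glue, "staging_db_postgres_newschema", "new_addr")
```

If the check returns `False`, the table has to be created first, or written through a sink that can create catalog tables.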

In [None]:
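from awsglue.dynamicframe import DynamicFrame

# Placeholder bucket, database and table names; replace them with real values.
# With enableUpdateCatalog=True the sink creates or updates the catalog table on write.
s3output = glueContext.getSink(
  path="s3://bucket_name/folder_name",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=[],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3output.setCatalogInfo(
  catalogDatabase="demo", catalogTableName="populations"
)
# "glueparquet" writes Parquet files, not a Delta table.
s3output.setFormat("glueparquet")
# writeFrame expects a DynamicFrame, so convert the Spark DataFrame `df` first.
s3output.writeFrame(DynamicFrame.fromDF(df, glueContext, "df_as_dynamic_frame"))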