### Initialization

### This notebook demonstrates Lake Formation Governed Tables ACID transaction behavior in the scenarios below:
  Scenario A: User A makes changes using T1 and commits the transaction. User B queries the table using T2 and sees the changes.<br>
  Scenario B: User A makes changes using T3 but does not commit transaction. User B queries using T4 and cannot see the changes.<br>
  Scenario C: User A commits transaction T3. User B queries again using T4 but still cannot see the changes, since T4 started before T3 was committed.<br>
  Scenario D: User A makes changes to multiple tables using transaction T5, but if the write to one of the tables fails, transaction T5 is aborted and none of the changes made under T5 are visible in any of the tables.<br>

In [1]:
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame, Row
from awsglue.dynamicframe import DynamicFrame
from awsglue import DynamicFrame


# Parameters for this walkthrough -- please replace them with your own values.
bucket_name = "lf-data-lake-162611428811"
database_name = "governed_demo"
table_name = "employee_details"

# Reuse the running SparkContext when there is one (otherwise create it) and
# wrap it in a GlueContext; the cells below use `glueContext` for the
# transaction calls and `spark` for building DataFrames.
spark_context = SparkContext.getOrCreate()
glueContext = GlueContext(spark_context)
spark = glueContext.spark_session
job = Job(glueContext)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
12,application_1631086109765_0013,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Begin Transaction (txId1)

In [2]:
# Start a transaction that is not read-only (read_only=False) and keep its id
# in txId1; the write cell that follows passes this id to its sink.
txId1 = glueContext.begin_transaction(read_only=False)
# Print the id returned by begin_transaction.
print(txId1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

e295b03d40c94f2b990685380cc7c1c5

### Write additional rows and commit transaction (txId1)

In [3]:
from pyspark.sql import Row

# Two employee records to append under transaction txId1.
products1 = [
    {'employee_id':1022, 'first_name': 'Ellis', 'last_name': 'Dow', 'age':40, 'city':'Denver', 'salary':4250},
    {'employee_id':1023, 'first_name': 'Wesley', 'last_name': 'Harris', 'age':41, 'city':'Boston', 'salary':4135},
]
df1 = spark.createDataFrame([Row(**record) for record in products1])
df1.show()

dyf1 = DynamicFrame.fromDF(df1, glueContext, "dyf1")

# Sink settings: write under the table's S3 prefix, update the catalog, and
# attach the write to transaction txId1.
sink_options = {
    "connection_type": "s3",
    "path": f"s3://{bucket_name}/target/{table_name}/",
    "enableUpdateCatalog": True,
    "updateBehavior": "UPDATE_IN_DATABASE",
    "transactionId": txId1,
}
sink = glueContext.getSink(**sink_options)
sink.setFormat("glueparquet")
sink.setCatalogInfo(catalogDatabase=database_name, catalogTableName=table_name)

# Commit only if the write succeeds; on any failure abort txId1 and re-raise
# so the error stays visible in the notebook.
try:
    sink.writeFrame(dyf1)
    glueContext.commit_transaction(txId1)
except BaseException:
    glueContext.abort_transaction(txId1)
    raise

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+------+-----------+----------+---------+------+
|age|  city|employee_id|first_name|last_name|salary|
+---+------+-----------+----------+---------+------+
| 40|Denver|       1022|     Ellis|      Dow|  4250|
| 41|Boston|       1023|    Wesley|   Harris|  4135|
+---+------+-----------+----------+---------+------+

<awsglue.dynamicframe.DynamicFrame object at 0x7f38de0a45c0>
JavaObject id=o110

### Begin Transaction (txId2)

In [4]:
# Start a second transaction (read_only=False) and keep its id in txId2.
# The next cells write and read with this id before it is committed.
txId2 = glueContext.begin_transaction(read_only=False)
# Print the id returned by begin_transaction.
print(txId2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

30d4216cc5364b30819578b40d3fbb6e

### Write additional rows but transaction is not committed

In [5]:
from pyspark.sql import Row

# Two more employee records, written under txId2. This cell deliberately does
# NOT commit: the following cells read the rows back with the same
# transaction id and commit txId2 afterwards.
products1 = [
    {'employee_id':1024, 'first_name': 'Megan', 'last_name': 'Sisco', 'age':39, 'city':'Denver', 'salary':4320},
    {'employee_id':1025, 'first_name': 'Jenny', 'last_name': 'Weaver', 'age':40, 'city':'Boston', 'salary':3935}
]
df4 = spark.createDataFrame(Row(**x) for x in products1)
df4.show()

dyf3 = DynamicFrame.fromDF(df4, glueContext, "df3dyf")

# Sink bound to txId2: writes under the table's S3 prefix and updates the
# catalog entry for the table.
sink = glueContext.getSink(
    connection_type="s3", 
    path=f"s3://{bucket_name}/target/{table_name}/",
    enableUpdateCatalog=True, 
    updateBehavior="UPDATE_IN_DATABASE",
    transactionId=txId2
)
sink.setFormat("glueparquet")
sink.setCatalogInfo(catalogDatabase=database_name, catalogTableName=table_name)

# If the write itself fails, abort txId2 instead of leaving it open, then
# re-raise so the failure is visible. On success the transaction is
# intentionally left uncommitted.
try:
    sink.writeFrame(dyf3)
except BaseException:
    glueContext.abort_transaction(txId2)
    raise

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+------+-----------+----------+---------+------+
|age|  city|employee_id|first_name|last_name|salary|
+---+------+-----------+----------+---------+------+
| 39|Denver|       1024|     Megan|    Sisco|  4320|
| 40|Boston|       1025|     Jenny|   Weaver|  3935|
+---+------+-----------+----------+---------+------+

<awsglue.dynamicframe.DynamicFrame object at 0x7f38de0a4208>

### Find employees from Denver using same transaction Id (txId2) used for writing the records.

In [6]:
# Read the table through txId2, the same transaction id that was used for the
# write in the previous cell.
dyf4 = glueContext.create_dynamic_frame.from_catalog(
    database=database_name,
    table_name=table_name,
    transformation_ctx="dyf4",
    additional_options={"transactionId": txId2},
)

# Convert to a Spark DataFrame and fix the column order for display.
df5 = dyf4.toDF()
df6 = df5.select(
    'employee_id',
    'first_name',
    'last_name',
    'age',
    'city',
    'salary',
)

# Show only the employees located in Denver.
denver_employees_tx2 = df6.filter("city == 'Denver'")
denver_employees_tx2.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+----------+---------+---+------+------+
|employee_id|first_name|last_name|age|  city|salary|
+-----------+----------+---------+---+------+------+
|       1013|   William|    Moore| 37|Denver|  3998|
|       1018| Frederick|   Wilson| 38|Denver|  4500|
|       1022|     Ellis|      Dow| 40|Denver|  4250|
|       1024|     Megan|    Sisco| 39|Denver|  4320|
+-----------+----------+---------+---+------+------+

### Commit transaction (txId2)

In [7]:
# Commit txId2, the transaction used by the preceding write and read cells.
glueContext.commit_transaction(txId2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

JavaObject id=o146

### Working with multiple tables

In [None]:
# Start a transaction (read_only=False) and keep its id in txId3; the cells
# below pass this one id to the sinks of two different tables.
txId3 = glueContext.begin_transaction(read_only=False)
# Print the id returned by begin_transaction.
print(txId3)

In [None]:
from pyspark.sql import Row

# Employee records for the first of the two tables written under txId3.
products1 = [
    {'employee_id':1026, 'first_name': 'Dillan', 'last_name': 'Marks', 'age':45, 'city':'Denver', 'salary':4575},
    {'employee_id':1027, 'first_name': 'Lucas', 'last_name': 'Wyman', 'age':38, 'city':'Chicago', 'salary':4102}
]
df4 = spark.createDataFrame(Row(**x) for x in products1)
df4.show()

dyf3 = DynamicFrame.fromDF(df4, glueContext, "df3dyf")

sink = glueContext.getSink(
    connection_type="s3", 
    path=f"s3://{bucket_name}/target/{table_name}/",
    enableUpdateCatalog=True, 
    updateBehavior="UPDATE_IN_DATABASE",
    transactionId=txId3
)
sink.setFormat("glueparquet")
# Use the table_name parameter here as well, so the catalog table always
# matches the S3 prefix above when the parameter is changed.
sink.setCatalogInfo(catalogDatabase=database_name, catalogTableName=table_name)

# No commit here: txId3 is committed only after the second table has been
# written in a later cell. A failed write aborts the transaction and re-raises.
try:
    sink.writeFrame(dyf3)
except:
    glueContext.abort_transaction(txId3)
    raise

In [None]:
# Read the table through txId3, the transaction id used for the write in the
# previous cell (not committed at this point).
dyf5 = glueContext.create_dynamic_frame.from_catalog(
    database=database_name,
    table_name=table_name,
    transformation_ctx="dyf5",
    additional_options={"transactionId": txId3},
)

# Convert to a Spark DataFrame and fix the column order for display.
df7 = dyf5.toDF()
df8 = df7.select(
    'employee_id',
    'first_name',
    'last_name',
    'age',
    'city',
    'salary',
)

# Show only the employees located in Denver.
denver_employees_tx3 = df8.filter("city == 'Denver'")
denver_employees_tx3.show()

In [None]:
from pyspark.sql import Row

# Department assignments for the two employees added under txId3.
products1 = [
    {'employee_id':1026, 'department': 'Finance'},
    {'employee_id':1027, 'department': 'IT'},
]
df5 = spark.createDataFrame([Row(**record) for record in products1])
df5.show()

dyf4 = DynamicFrame.fromDF(df5, glueContext, "df3dyf")

# Sink for the second table, attached to the same transaction (txId3) as the
# employee_details write.
sink_options = {
    "connection_type": "s3",
    "path": f"s3://{bucket_name}/target/employee_department/",
    "enableUpdateCatalog": True,
    "updateBehavior": "UPDATE_IN_DATABASE",
    "transactionId": txId3,
}
sink = glueContext.getSink(**sink_options)
sink.setFormat("glueparquet")
sink.setCatalogInfo(catalogDatabase=database_name, catalogTableName="employee_department")

# Commit txId3 only after this second write succeeds; on any failure abort
# the transaction and re-raise.
try:
    sink.writeFrame(dyf4)
    glueContext.commit_transaction(txId3)
except BaseException:
    glueContext.abort_transaction(txId3)
    raise

In [None]:
# Read both tables with a single transaction id (txId4), join them on
# employee_id, then commit. Any failure aborts txId4 and is re-raised.
txId4 = glueContext.begin_transaction(read_only=False)

try:
    dyf6 = glueContext.create_dynamic_frame.from_catalog(
        database = database_name, 
        table_name = "employee_details", 
        transformation_ctx = "dyf6", 
        additional_options = {
            "transactionId": txId4
        }
    )
    df9 = dyf6.toDF()
    df10 = df9.select('employee_id','first_name','last_name','age','city','salary')
    # Filter the projected frame (df10). Filtering df9 instead would leave
    # the projection above unused.
    dfEmp = df10.where("city == 'Denver'")
    dfEmp.show()

    dyf7 = glueContext.create_dynamic_frame.from_catalog(
        database = database_name, 
        table_name = "employee_department", 
        transformation_ctx = "dyf7", 
        additional_options = {
            "transactionId": txId4
        }
    )
    df11 = dyf7.toDF()
    dfDep = df11.select('employee_id','department')

    # Join on the column name so the result has a single employee_id column
    # rather than one copy from each side.
    dfDep.join(dfEmp, "employee_id").show()

    glueContext.commit_transaction(txId4)
except BaseException:
    # Do not leave txId4 open when a read, the join or the commit fails.
    glueContext.abort_transaction(txId4)
    raise