
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A json-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your aws configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma separated list of pip packages, s3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma separated list of additional Jars to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (eg: %glue_version 2.0).                               |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer.                      |

In [1]:
# %stop_session

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 


# Step 1: Define your configurations

In [9]:
%connections hudi-connection
%glue_version 3.0
%region us-west-2
%worker_type G.1X
%number_of_workers 3
%spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
%additional_python_modules Faker

Connections to be included:
hudi-connection
Setting Glue version to: 3.0
Previous region: us-west-2
Setting new region to: us-west-2
Reauthenticating Glue client with new region: us-west-2
IAM role has been set to arn:aws:iam::043916019468:role/Lab3. Reauthenticating.
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::043916019468:role/Lab3
Authentication done.
Region is set to: us-west-2
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 3
Previous Spark configuration: None
Setting new Spark configuration to: spark.serializer=org.apache.spark.serializer.KryoSerializer
Additional python modules to be included:
Faker


# Step 2: Define your Imports

In [1]:
# Imports are left unguarded so that a missing module fails loudly in this cell.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.session import SparkSession
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
import boto3
from functools import reduce

Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 3
Session ID: dc1699e3-9e72-4935-908c-c01aeb33d39b
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--additional-python-modules Faker
Waiting for session dc1699e3-9e72-4935-908c-c01aeb33d39b to get into ready status...
Session dc1699e3-9e72-4935-908c-c01aeb33d39b has been created.



# Step 3:  Create Spark Session

In [63]:
spark = (
    SparkSession.builder
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
    .config('spark.sql.hive.convertMetastoreParquet', 'false')
    .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true')
    .getOrCreate()
)
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()




# Step 4: Define Data Generator Class 

In [64]:
from faker import Faker

faker = Faker()  # module-level instance shared by DataGenerator

class DataGenerator(object):

    @staticmethod
    def get_data():
        return [
            (
                x,
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                faker.random_int(min=10000, max=150000),
                faker.random_int(min=18, max=60),
                faker.random_int(min=0, max=100000),
                faker.unix_time()
            ) for x in range(3)
        ]
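
For local experiments where the Faker package is not installed, rows of the same shape can be produced with the standard library. The sketch below is a stand-in under stated assumptions, not the notebook's generator: `make_rows`, its `seed` and `max_ts` parameters, and the placeholder employee names are hypothetical. Only the tuple layout and the value ranges follow the cell above.

```python
import random

DEPARTMENTS = ("IT", "HR", "Sales", "Marketing")
STATES = ("CA", "NY", "TX", "FL", "IL", "RJ")


def make_rows(n=3, seed=None, max_ts=1_600_000_000):
    """Return n tuples shaped like the rows from DataGenerator.get_data().

    max_ts is an arbitrary upper bound for the Unix timestamp (assumption).
    """
    rng = random.Random(seed)  # a fixed seed makes the rows reproducible
    return [
        (
            emp_id,
            f"Employee {emp_id}",        # placeholder instead of faker.name()
            rng.choice(DEPARTMENTS),
            rng.choice(STATES),
            rng.randint(10000, 150000),  # salary
            rng.randint(18, 60),         # age
            rng.randint(0, 100000),      # bonus
            rng.randint(0, max_ts),      # ts, used later as the precombine field
        )
        for emp_id in range(n)
    ]


rows = make_rows(seed=42)
print(len(rows), len(rows[0]))  # 3 8
```

Seeding the generator keeps repeated runs comparable, which helps when checking upsert behaviour by hand.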





# Step 5: Create Spark Data frame

In [65]:
data = DataGenerator.get_data()

columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
spark_df = spark.createDataFrame(data=data, schema=columns)




In [66]:
spark_df.show()

+------+----------------+----------+-----+------+---+-----+----------+
|emp_id|   employee_name|department|state|salary|age|bonus|        ts|
+------+----------------+----------+-----+------+---+-----+----------+
|     0|Michael Schwartz|        HR|   NY|101517| 59|20692| 510575555|
|     1|    Antonio Hale|        IT|   RJ| 18697| 58| 7399| 318481455|
|     2|  Lauren Oconnor| Marketing|   IL| 32029| 58|62591|1483855109|
+------+----------------+----------+-----+------+---+-----+----------+


# Step 7:  Define your HUDI Settings 

In [67]:
db_name = "hudidb"
table_name="hudi_table"

recordkey = 'emp_id'
precombine = 'ts'

path = "s3://soumil-dms-learn/hudi/hudi_table/"
method = 'upsert'
table_type = "COPY_ON_WRITE"




In [68]:
connection_options={
    "path": path,
    "connectionName": "hudi-connection",

    "hoodie.datasource.write.storage.type": table_type,
    'className': 'org.apache.hudi',
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,


    'hoodie.datasource.hive_sync.enable': 'true',
    "hoodie.datasource.hive_sync.mode":"hms",
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
}
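
Later cells repeat this dictionary and change only the write operation. One way to avoid the duplication is a small builder function. This is a sketch, not part of the original notebook: `build_hudi_options` and its `extra` parameter are hypothetical names, while the keys and default values are copied from the cell above.

```python
def build_hudi_options(operation, *, table_name="hudi_table", db_name="hudidb",
                       path="s3://soumil-dms-learn/hudi/hudi_table/",
                       record_key="emp_id", precombine="ts",
                       table_type="COPY_ON_WRITE", extra=None):
    """Return a connection_options dict for the given Hudi write operation."""
    options = {
        "path": path,
        "connectionName": "hudi-connection",
        "className": "org.apache.hudi",
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.storage.type": table_type,
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.table.name": table_name,
        "hoodie.datasource.write.operation": operation,
        "hoodie.datasource.write.precombine.field": precombine,
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.mode": "hms",
        "hoodie.datasource.hive_sync.sync_as_datasource": "false",
        "hoodie.datasource.hive_sync.database": db_name,
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.partition_extractor_class":
            "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }
    if extra:
        options.update(extra)  # e.g. clustering or cleaner settings
    return options


upsert_options = build_hudi_options("upsert")
delete_options = build_hudi_options("delete")
changed = {k for k in upsert_options if upsert_options[k] != delete_options[k]}
print(changed)  # {'hoodie.datasource.write.operation'}
```

The final lines confirm that an upsert and a delete configuration differ in a single key.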




# Step 8: Write to HUDI 

In [69]:
ApacheHudiConnector0101forAWSGlue30_node1671045598524 = (
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(spark_df, glueContext,"glue_df"),
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="glue_df",
    )
)




# Step 9: Read from HUDI Table

In [70]:
ReadGlueDF = (
    glueContext.create_dynamic_frame.from_options(
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="ReadGlueDF",
    )
)
glue_to_spark_df = ReadGlueDF.toDF()




In [71]:
glue_to_spark_df.show()

+-------------------+--------------------+------------------+----------------------+--------------------+------+----------------+----------+-----+------+---+-----+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|emp_id|   employee_name|department|state|salary|age|bonus|        ts|
+-------------------+--------------------+------------------+----------------------+--------------------+------+----------------+----------+-----+------+---+-----+----------+
|  20221221162242455|20221221162242455...|                 2|                      |451fc7ed-0b7e-427...|     2|  Lauren Oconnor| Marketing|   IL| 32029| 58|62591|1483855109|
|  20221221162242455|20221221162242455...|                 1|                      |451fc7ed-0b7e-427...|     1|    Antonio Hale|        IT|   RJ| 18697| 58| 7399| 318481455|
|  20221221162242455|20221221162242455...|                 0|                      |451fc7ed-0b7e-427...|     0|Michael Schwa

# Step 10: Append into HUDI

In [72]:
simpleDataUpd = [
    (3, "This is APPEND", "Sales", "RJ", 81000, 30, 23000, 827307999),
    (4, "This is APPEND", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
]

columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
spark_df = spark.createDataFrame(data=simpleDataUpd, schema=columns)

WriteDF = (
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(spark_df, glueContext,"glue_df"),
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="glue_df",
    )
)




In [73]:
ReadGlueDF = (
    glueContext.create_dynamic_frame.from_options(
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="ReadGlueDF",
    )
)
glue_to_spark_df = ReadGlueDF.toDF()
glue_to_spark_df.show()

+-------------------+--------------------+------------------+----------------------+--------------------+------+----------------+-----------+-----+------+---+-----+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|emp_id|   employee_name| department|state|salary|age|bonus|        ts|
+-------------------+--------------------+------------------+----------------------+--------------------+------+----------------+-----------+-----+------+---+-----+----------+
|  20221221162242455|20221221162242455...|                 2|                      |451fc7ed-0b7e-427...|     2|  Lauren Oconnor|  Marketing|   IL| 32029| 58|62591|1483855109|
|  20221221162242455|20221221162242455...|                 1|                      |451fc7ed-0b7e-427...|     1|    Antonio Hale|         IT|   RJ| 18697| 58| 7399| 318481455|
|  20221221162254254|20221221162254254...|                 3|                      |451fc7ed-0b7e-427...|     3|  This i

# Step 11: Update in HUDI table

In [74]:
simpleDataUpd = [
    (3, "this is update on data lake", "Sales", "RJ", 81000, 30, 23000, 827307999),
]
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
spark_df = spark.createDataFrame(data=simpleDataUpd, schema=columns)

WriteDF = (
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(spark_df, glueContext,"glue_df"),
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="glue_df",
    )
)

ReadGlueDF = (
    glueContext.create_dynamic_frame.from_options(
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="ReadGlueDF",
    )
)
glue_to_spark_df = ReadGlueDF.toDF()
glue_to_spark_df.show()


+-------------------+--------------------+------------------+----------------------+--------------------+------+--------------------+-----------+-----+------+---+-----+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|emp_id|       employee_name| department|state|salary|age|bonus|        ts|
+-------------------+--------------------+------------------+----------------------+--------------------+------+--------------------+-----------+-----+------+---+-----+----------+
|  20221221162242455|20221221162242455...|                 2|                      |451fc7ed-0b7e-427...|     2|      Lauren Oconnor|  Marketing|   IL| 32029| 58|62591|1483855109|
|  20221221162242455|20221221162242455...|                 1|                      |451fc7ed-0b7e-427...|     1|        Antonio Hale|         IT|   RJ| 18697| 58| 7399| 318481455|
|  20221221162254254|20221221162254254...|                 4|                      |451fc7ed-0b7e-42
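
The append and update steps use the same `upsert` operation: a record whose key is new gets inserted, and a record whose key already exists replaces the stored row. The in-memory model below illustrates that behaviour in plain Python. It is a simplification that assumes the default payload behaviour, in which the precombine field resolves duplicates inside one incoming batch and the surviving record then overwrites the stored one. The function names are hypothetical and this is not how Hudi is implemented.

```python
def precombine_batch(batch, key="emp_id", precombine="ts"):
    """Collapse duplicate keys within one batch, keeping the largest precombine value."""
    latest = {}
    for row in batch:
        current = latest.get(row[key])
        if current is None or row[precombine] >= current[precombine]:
            latest[row[key]] = row
    return latest


def upsert(table, batch, key="emp_id", precombine="ts"):
    """Return a new table (dict keyed by record key) after applying the batch."""
    merged = dict(table)
    merged.update(precombine_batch(batch, key, precombine))
    return merged


table = {
    3: {"emp_id": 3, "employee_name": "This is APPEND", "ts": 827307999},
    4: {"emp_id": 4, "employee_name": "This is APPEND", "ts": 1627694678},
}
batch = [{"emp_id": 3, "employee_name": "this is update on data lake", "ts": 827307999}]
table = upsert(table, batch)
print(table[3]["employee_name"])  # this is update on data lake
```

Row 4 is untouched because its key does not appear in the batch, which matches the table shown above.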

# Step 12: Query with Spark SQL

In [75]:
spark.sql("select * from  hudidb.hudi_table where emp_id = 3 ").show()

+-------------------+--------------------+------------------+----------------------+--------------------+------+--------------------+----------+-----+------+---+-----+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|emp_id|       employee_name|department|state|salary|age|bonus|       ts|
+-------------------+--------------------+------------------+----------------------+--------------------+------+--------------------+----------+-----+------+---+-----+---------+
|  20221221162303624|20221221162303624...|                 3|                      |451fc7ed-0b7e-427...|     3|this is update on...|     Sales|   RJ| 81000| 30|23000|827307999|
+-------------------+--------------------+------------------+----------------------+--------------------+------+--------------------+----------+-----+------+---+-----+---------+


# Step 13: Hard Delete

In [76]:
spark_df = spark.sql("SELECT * FROM hudidb.hudi_table WHERE emp_id = 4")

print("********************")
print(spark_df.show())  # show() prints the table and returns None, so "None" is printed as well
print("********************")

db_name = "hudidb"
table_name="hudi_table"

recordkey = 'emp_id'
precombine = 'ts'

path = "s3://soumil-dms-learn/hudi/hudi_table/"
method = 'delete'
table_type = "COPY_ON_WRITE"

connection_options={
    "path": path,
    "connectionName": "hudi-connection",

    "hoodie.datasource.write.storage.type": table_type,
    'className': 'org.apache.hudi',
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,


    'hoodie.datasource.hive_sync.enable': 'true',
    "hoodie.datasource.hive_sync.mode":"hms",
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
}



WriteDF = (
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(spark_df, glueContext,"glue_df"),
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="glue_df",
    )
)

ReadGlueDF = (
    glueContext.create_dynamic_frame.from_options(
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="ReadGlueDF",
    )
)
glue_to_spark_df = ReadGlueDF.toDF()
glue_to_spark_df.show()

********************
+-------------------+--------------------+------------------+----------------------+--------------------+------+--------------+-----------+-----+------+---+-----+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|emp_id| employee_name| department|state|salary|age|bonus|        ts|
+-------------------+--------------------+------------------+----------------------+--------------------+------+--------------+-----------+-----+------+---+-----+----------+
|  20221221162254254|20221221162254254...|                 4|                      |451fc7ed-0b7e-427...|     4|This is APPEND|Engineering|   RJ| 79000| 53|15000|1627694678|
+-------------------+--------------------+------------------+----------------------+--------------------+------+--------------+-----------+-----+------+---+-----+----------+

None
********************
+-------------------+--------------------+------------------+---------------------

# Step 14: Clustering and compaction

 ![image](https://user-images.githubusercontent.com/39345855/208302373-6624c2fc-b2ed-4fee-bb60-984d7cb0ec20.png)


In [77]:
db_name = "hudidb"
table_name="hudi_table"
recordkey = 'emp_id'
precombine = 'ts'
path = "s3://soumil-dms-learn/hudi/hudi_table/"
method = 'upsert'
table_type = "COPY_ON_WRITE"

connection_options={
    "path": path,
    "connectionName": "hudi-connection",

    "hoodie.datasource.write.storage.type": table_type,
    'className': 'org.apache.hudi',
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,


    'hoodie.datasource.hive_sync.enable': 'true',
    "hoodie.datasource.hive_sync.mode":"hms",
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    
    "hoodie.clustering.plan.strategy.sort.columns":"state",
    "hoodie.clustering.plan.strategy.max.bytes.per.group" : '107374182400',
    'hoodie.clustering.plan.strategy.max.num.groups' : '1',
    'hoodie.cleaner.policy' : 'KEEP_LATEST_FILE_VERSIONS',
    
}
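
The value `107374182400` for `max.bytes.per.group` is 100 GiB expressed in bytes. The sketch below derives it instead of hard-coding it. `gib_to_bytes` is a hypothetical helper. The two `hoodie.clustering.inline*` keys are not in the cell above: they are included as an assumption about how inline clustering is usually switched on, the value `4` is illustrative, and both should be checked against the Hudi configuration reference for the connector version in use.

```python
GIB = 1024 ** 3  # bytes in one gibibyte


def gib_to_bytes(gib):
    """Return a size in GiB as a byte-count string, since Hudi options are passed as strings."""
    return str(int(gib * GIB))


clustering_options = {
    # Keys taken from the notebook cell above
    "hoodie.clustering.plan.strategy.sort.columns": "state",
    "hoodie.clustering.plan.strategy.max.bytes.per.group": gib_to_bytes(100),
    "hoodie.clustering.plan.strategy.max.num.groups": "1",
    # Assumption: not in the notebook; inline clustering is typically enabled explicitly
    "hoodie.clustering.inline": "true",
    "hoodie.clustering.inline.max.commits": "4",  # illustrative value
}

print(clustering_options["hoodie.clustering.plan.strategy.max.bytes.per.group"])  # 107374182400
```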





In [78]:
cluster_df = spark.sql("SELECT * FROM hudidb.hudi_table ")

WriteDF = (
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(cluster_df, glueContext,"glue_df"),
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="glue_df",
    )
)




 ![image](https://user-images.githubusercontent.com/39345855/208302689-cd37ea24-6b65-411d-bc7f-146a3f979c7a.png)


# Step 15: Time Travel Query

In [79]:
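# "as.of.instant" takes an instant such as yyyy-MM-dd or yyyyMMddHHmmss.
# This date falls after the 2022-12-21 commits shown in this notebook.
df = (
    spark.read
    .format("hudi")
    .option("as.of.instant", "2022-12-22")
    .load(path)
)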




In [80]:
df.show()

+-------------------+--------------------+------------------+----------------------+--------------------+------+--------------------+----------+-----+------+---+-----+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|emp_id|       employee_name|department|state|salary|age|bonus|        ts|
+-------------------+--------------------+------------------+----------------------+--------------------+------+--------------------+----------+-----+------+---+-----+----------+
|  20221221162325769|20221221162325769...|                 0|                      |451fc7ed-0b7e-427...|     0|    Michael Schwartz|        HR|   NY|101517| 59|20692| 510575555|
|  20221221162325769|20221221162325769...|                 2|                      |451fc7ed-0b7e-427...|     2|      Lauren Oconnor| Marketing|   IL| 32029| 58|62591|1483855109|
|  20221221162325769|20221221162325769...|                 1|                      |451fc7ed-0b7e-427...|
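
Time travel depends on passing a well-formed instant, so it can help to convert the `_hoodie_commit_time` values seen in the output into readable timestamps and to validate date strings before using them. The helpers below are a sketch with hypothetical names. They assume the 17-digit `yyyyMMddHHmmssSSS` layout visible in this notebook's output and fall back to zero milliseconds for 14-digit values.

```python
from datetime import datetime


def parse_commit_time(commit_time):
    """Convert a _hoodie_commit_time string (yyyyMMddHHmmss[SSS]) to a datetime."""
    base = datetime.strptime(commit_time[:14], "%Y%m%d%H%M%S")
    millis = int(commit_time[14:17] or 0)
    return base.replace(microsecond=millis * 1000)


def validate_date_instant(value):
    """Return a date for a yyyy-MM-dd string, raising ValueError for impossible dates."""
    return datetime.strptime(value, "%Y-%m-%d").date()


print(parse_commit_time("20221221162325769"))  # 2022-12-21 16:23:25.769000
print(validate_date_instant("2022-12-21"))     # 2022-12-21
```

A string such as `"2022-18-12"` makes `validate_date_instant` raise `ValueError`, because 18 is not a valid month.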

# Incremental query

In [84]:
spark. \
      read. \
      format("hudi"). \
      load(path). \
      createOrReplaceTempView("hudi_snapshot")




In [85]:
commits = [
    row[0]
    for row in spark.sql(
        "select distinct(_hoodie_commit_time) as commitTime from hudi_snapshot order by commitTime"
    ).limit(50).collect()
]
# Second-to-last commit; with a single commit the index is -1, which selects that only commit.
beginTime = commits[len(commits) - 2]




In [86]:
beginTime

'20221221162325769'


In [87]:
commits

['20221221162325769']
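
With only one commit in the list, `commits[len(commits) - 2]` resolves to index `-1` and returns that single commit, so the begin time equals the latest commit. A more explicit selection is sketched below. `pick_begin_time` is a hypothetical helper, and the fallback `"000"` follows the convention used in the Hudi quickstart to mean "all commits after the beginning of time".

```python
def pick_begin_time(commits, earliest="000"):
    """Return the second-to-last commit time, or `earliest` when fewer than two commits exist."""
    ordered = sorted(commits)  # fixed-width digit strings sort chronologically
    if len(ordered) < 2:
        return earliest
    return ordered[-2]


print(pick_begin_time(["20221221162325769"]))                       # 000
print(pick_begin_time(["20221221162345162", "20221221162325769"]))  # 20221221162325769
```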


In [88]:
incremental_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': beginTime,
}




In [89]:
IncrementalDF = spark.read.format("hudi"). \
  options(**incremental_read_options). \
  load(path)

IncrementalDF.createOrReplaceTempView("hudi_incremental")




In [90]:
spark.sql("select * from  hudi_incremental").show()


+-------------------+--------------------+------------------+----------------------+-----------------+------+-------------+----------+-----+------+---+-----+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|emp_id|employee_name|department|state|salary|age|bonus| ts|
+-------------------+--------------------+------------------+----------------------+-----------------+------+-------------+----------+-----+------+---+-----+---+
+-------------------+--------------------+------------------+----------------------+-----------------+------+-------------+----------+-----+------+---+-----+---+


# Appending data for incremental data processing 

In [91]:
simpleDataUpd = [
    (6, "This is APPEND", "Sales", "RJ", 81000, 30, 23000, 827307999),
    (7, "This is APPEND", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
]

columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
spark_df = spark.createDataFrame(data=simpleDataUpd, schema=columns)

WriteDF = (
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(spark_df, glueContext,"glue_df"),
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="glue_df",
    )
)




In [92]:
incremental_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': beginTime,
}




In [93]:
IncrementalDF = spark.read.format("hudi"). \
  options(**incremental_read_options). \
  load(path)

IncrementalDF.createOrReplaceTempView("hudi_incremental")




In [94]:
spark.sql("select * from  hudi_incremental").show()

+-------------------+--------------------+------------------+----------------------+--------------------+------+--------------+-----------+-----+------+---+-----+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|emp_id| employee_name| department|state|salary|age|bonus|        ts|
+-------------------+--------------------+------------------+----------------------+--------------------+------+--------------+-----------+-----+------+---+-----+----------+
|  20221221162345162|20221221162345162...|                 6|                      |451fc7ed-0b7e-427...|     6|This is APPEND|      Sales|   RJ| 81000| 30|23000| 827307999|
|  20221221162345162|20221221162345162...|                 7|                      |451fc7ed-0b7e-427...|     7|This is APPEND|Engineering|   RJ| 79000| 53|15000|1627694678|
+-------------------+--------------------+------------------+----------------------+--------------------+------+--------------+---
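
The two incremental reads behave differently because the begin instant is exclusive: only records committed strictly after `beginTime` are returned. The plain-Python model below reproduces both results using the commit times from this notebook. `incremental_read` is a hypothetical function and the rows are reduced to two fields for illustration.

```python
def incremental_read(rows, begin_time):
    """Return rows whose commit time is strictly after begin_time."""
    # Commit times are fixed-width digit strings, so string comparison is chronological.
    return [r for r in rows if r["_hoodie_commit_time"] > begin_time]


begin_time = "20221221162325769"

before_append = [
    {"_hoodie_commit_time": "20221221162325769", "emp_id": 0},
    {"_hoodie_commit_time": "20221221162325769", "emp_id": 1},
    {"_hoodie_commit_time": "20221221162325769", "emp_id": 2},
]
after_append = before_append + [
    {"_hoodie_commit_time": "20221221162345162", "emp_id": 6},
    {"_hoodie_commit_time": "20221221162345162", "emp_id": 7},
]

print([r["emp_id"] for r in incremental_read(before_append, begin_time)])  # []
print([r["emp_id"] for r in incremental_read(after_append, begin_time)])   # [6, 7]
```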

# Schema Evolution

###  Adding New Column

In [95]:
spark. \
      read. \
      format("hudi"). \
      load(path). \
      createOrReplaceTempView("hudi_snapshot")




In [96]:
spark.sql("desc hudi_snapshot").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
| _hoodie_commit_time|   string|   null|
|_hoodie_commit_seqno|   string|   null|
|  _hoodie_record_key|   string|   null|
|_hoodie_partition...|   string|   null|
|   _hoodie_file_name|   string|   null|
|              emp_id|   bigint|   null|
|       employee_name|   string|   null|
|          department|   string|   null|
|               state|   string|   null|
|              salary|   bigint|   null|
|                 age|   bigint|   null|
|               bonus|   bigint|   null|
|                  ts|   bigint|   null|
+--------------------+---------+-------+


#### Defining new Schema 

In [98]:
new_schema = StructType(
    [
        StructField("emp_id",LongType(),True),
        StructField("employee_name",StringType(),True),
        StructField("department",StringType(),True),
        StructField("state",StringType(),True),
        StructField("salary",LongType(),True),
        StructField("age",LongType(),True),
        StructField("bonus",LongType(),True),
        StructField("ts",LongType(),True),
        StructField("newField",IntegerType(),True),

      ])




In [99]:
simpleDataUpd = [
    (6, "This is APPEND", "Sales", "RJ", 81000, 30, 23000, 827307999, 11),
]

spark_df = spark.createDataFrame(data=simpleDataUpd, schema=new_schema)

WriteDF = (
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(spark_df, glueContext,"glue_df"),
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="glue_df",
    )
)




#### Recreating Snapshot

In [100]:
spark. \
      read. \
      format("hudi"). \
      load(path). \
      createOrReplaceTempView("hudi_snapshot")




In [101]:
spark.sql("desc hudi_snapshot").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
| _hoodie_commit_time|   string|   null|
|_hoodie_commit_seqno|   string|   null|
|  _hoodie_record_key|   string|   null|
|_hoodie_partition...|   string|   null|
|   _hoodie_file_name|   string|   null|
|              emp_id|   bigint|   null|
|       employee_name|   string|   null|
|          department|   string|   null|
|               state|   string|   null|
|              salary|   bigint|   null|
|                 age|   bigint|   null|
|               bonus|   bigint|   null|
|                  ts|   bigint|   null|
|            newField|      int|   null|
+--------------------+---------+-------+


In [102]:
spark.sql("select * from  hudi_snapshot").show()

+-------------------+--------------------+------------------+----------------------+--------------------+------+--------------------+-----------+-----+------+---+-----+----------+--------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|emp_id|       employee_name| department|state|salary|age|bonus|        ts|newField|
+-------------------+--------------------+------------------+----------------------+--------------------+------+--------------------+-----------+-----+------+---+-----+----------+--------+
|  20221221162325769|20221221162325769...|                 0|                      |451fc7ed-0b7e-427...|     0|    Michael Schwartz|         HR|   NY|101517| 59|20692| 510575555|    null|
|  20221221162325769|20221221162325769...|                 2|                      |451fc7ed-0b7e-427...|     2|      Lauren Oconnor|  Marketing|   IL| 32029| 58|62591|1483855109|    null|
|  20221221162325769|20221221162325769...|             

## Changes to Data Types in Hudi table

![image](https://user-images.githubusercontent.com/39345855/208951966-0f845cf4-a949-49de-8b6e-330aa6644ba6.png)


In [104]:
new_schema_new_dtypes = StructType(
    [
        StructField("emp_id",LongType(),True),
        StructField("employee_name",StringType(),True),
        StructField("department",StringType(),True),
        StructField("state",StringType(),True),
        StructField("salary",LongType(),True),
        StructField("age",LongType(),True),
        StructField("bonus",LongType(),True),
        StructField("ts",LongType(),True),
        StructField("newField",LongType(),True),
        
    ])

simpleDataUpd = [
    (7, "This is APPEND", "Sales", "RJ", 81000, 30, 23000, 827307999,  22),
]

spark_df = spark.createDataFrame(data=simpleDataUpd, schema=new_schema_new_dtypes)
spark_df.show(1)

+------+--------------+----------+-----+------+---+-----+---------+--------+
|emp_id| employee_name|department|state|salary|age|bonus|       ts|newField|
+------+--------------+----------+-----+------+---+-----+---------+--------+
|     7|This is APPEND|     Sales|   RJ| 81000| 30|23000|827307999|      22|
+------+--------------+----------+-----+------+---+-----+---------+--------+


In [105]:
WriteDF = (
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(spark_df, glueContext,"glue_df"),
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="glue_df",
    )
)





In [106]:
spark. \
      read. \
      format("hudi"). \
      load(path). \
      createOrReplaceTempView("hudi_snapshot")




In [107]:
spark.sql("desc hudi_snapshot").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
| _hoodie_commit_time|   string|   null|
|_hoodie_commit_seqno|   string|   null|
|  _hoodie_record_key|   string|   null|
|_hoodie_partition...|   string|   null|
|   _hoodie_file_name|   string|   null|
|              emp_id|   bigint|   null|
|       employee_name|   string|   null|
|          department|   string|   null|
|               state|   string|   null|
|              salary|   bigint|   null|
|                 age|   bigint|   null|
|               bonus|   bigint|   null|
|                  ts|   bigint|   null|
|            newField|   bigint|   null|
+--------------------+---------+-------+
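
Both schema changes in this section, adding `newField` and widening it from `int` to `bigint`, are backward-compatible changes. The checker below is a sketch for classifying such changes before writing. It assumes Avro's numeric promotion rules (`int` to `long`, `float`, or `double`; `long` to `float` or `double`; `float` to `double`) and treats dropped columns as incompatible. `classify_change` and the `{column: type}` representation are hypothetical, and whether a given promotion works end to end depends on the Hudi version and table type.

```python
# Numeric promotions allowed by Avro schema resolution (assumed to apply here)
PROMOTIONS = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
}


def classify_change(old_schema, new_schema):
    """Compare two {column: type} mappings and group columns by kind of change."""
    added = [c for c in new_schema if c not in old_schema]
    promoted, incompatible = [], []
    for column, old_type in old_schema.items():
        new_type = new_schema.get(column)
        if new_type == old_type:
            continue
        if new_type in PROMOTIONS.get(old_type, set()):
            promoted.append(column)
        else:
            incompatible.append(column)  # dropped column or unsupported type change
    return {"added": added, "promoted": promoted, "incompatible": incompatible}


base = {"emp_id": "long", "employee_name": "string", "ts": "long"}
with_new_field = {**base, "newField": "int"}
widened = {**base, "newField": "long"}

print(classify_change(base, with_new_field))
# {'added': ['newField'], 'promoted': [], 'incompatible': []}
print(classify_change(with_new_field, widened))
# {'added': [], 'promoted': ['newField'], 'incompatible': []}
```

Going the other way, from `long` back to `int`, lands in the `incompatible` list.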
