
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A JSON-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your AWS configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma-separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma-separated list of pip packages, S3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma-separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma-separated list of additional JARs to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (e.g. %glue_version 2.0).                              |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer.                      |

# Step 1: Define your configurations

In [11]:
%stop_session

Stopping session: 77731768-e4db-416c-b63f-4dbf12e4b1df
Stopped session.


In [8]:
%connections hudi-connection
%glue_version 3.0
%region us-west-2
%worker_type G.1X
%number_of_workers 2
%spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
%additional_python_modules Faker

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Connections to be included:
hudi-connection
Setting Glue version to: 3.0
Previous region: us-west-2
Setting new region to: us-west-2
Reauthenticating Glue client with new region: us-west-2
IAM role has been set to arn:aws:iam::043916019468:role/Lab3. Reauthenticating.
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::043916019468:role/Lab3
Authentication done.
Region is set to: us-west-2
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 2
Previous Spark configuration: None
Setting new Spark configuration to: spark.serializer=org.ap

# Step 2: Define your Imports

In [1]:
# No try/except: a failed import (e.g. Faker not installed) should stop the
# notebook instead of being silently swallowed
import sys
import random
import datetime  # module import: later cells call datetime.date(...) and datetime.datetime.now()
from functools import reduce

import boto3
from faker import Faker  # installed through %additional_python_modules

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *

Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 2
Session ID: e92cb3e1-2d31-44f9-9b7b-e5e1868311a6
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--additional-python-modules Faker
Waiting for session e92cb3e1-2d31-44f9-9b7b-e5e1868311a6 to get into ready status...
Session e92cb3e1-2d31-44f9-9b7b-e5e1868311a6 has been created.



# Step 3: Create Spark Session

In [27]:
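# A Glue session already has a SparkSession, so getOrCreate() returns it;
# spark.serializer was already applied through %spark_conf
spark = (
    SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # Let Hudi's input format read the table instead of Spark's built-in Parquet reader
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    # Restore pre-Spark-3.1 behavior so a 'path' option and a path argument can coexist
    .config("spark.sql.legacy.pathOptionBehavior.enabled", "true")
    .getOrCreate()
)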
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()




# Step 4: Generate Data and Perform Upserts on the Data Lake
### To practice these concepts, we generate fake data and work with it

In [48]:
def generate_data(num_records=100):
    """Return (rows, column_names) for num_records fake orders."""
    states = ("AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL", "IN",
              "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ",
              "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VA",
              "WA", "WV", "WI", "WY")
    shipping_types = ("Free", "3-Day", "2-Day")
    product_categories = ("Garden", "Kitchen", "Office", "Household")
    referrals = ("Other", "Friend/Colleague", "Repeat Customer", "Online Ad")

    data_array = []
    for _ in range(num_records):
        # invoiceid is the Hudi record key; with only 20000 possible values,
        # repeated keys (and therefore updates) can occur within and across batches
        invoiceid = random.randint(1, 20000)
        item_id = random.randint(1, 100)
        product_category = random.choice(product_categories)
        price = random.randint(1, 100)
        quantity = random.randint(1, 4)
        order_date = datetime.date(2016, random.randint(1, 12), random.randint(1, 28)).isoformat()
        state = random.choice(states)
        shipping_type = random.choice(shipping_types)
        referral = random.choice(referrals)
        data_array.append(
            (invoiceid, item_id, product_category, price, quantity, order_date, state, shipping_type, referral)
        )

    columns = ["invoiceid", "itemid", "category", "price", "quantity", "orderdate",
               "destinationstate", "shippingtype", "referral"]
    return data_array, columns
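Because `generate_data()` draws 100 `invoiceid` values from 1 to 20000, a single batch can contain the same record key twice, and Hudi then has to use the precombine field to pick one row. The plain-Python sketch below (not part of the original notebook; the helper names are hypothetical) estimates how often that happens with the birthday-problem formula, both exactly and with the usual exponential approximation.

```python
import math


def prob_duplicate_key(num_records: int, key_space: int) -> float:
    """Exact probability that num_records uniform draws from 1..key_space
    contain at least one repeated value (the birthday problem)."""
    if num_records > key_space:
        return 1.0  # pigeonhole: a repeat is guaranteed
    p_all_unique = 1.0
    for i in range(num_records):
        p_all_unique *= (key_space - i) / key_space
    return 1.0 - p_all_unique


def approx_prob_duplicate_key(num_records: int, key_space: int) -> float:
    """Approximation 1 - exp(-n(n-1) / 2N)."""
    return 1.0 - math.exp(-num_records * (num_records - 1) / (2 * key_space))


# generate_data() draws 100 invoice ids from 1..20000
exact = prob_duplicate_key(100, 20000)
approx = approx_prob_duplicate_key(100, 20000)
print(f"exact={exact:.3f} approx={approx:.3f}")
```

Both estimates come out at roughly 22%, so a little over one batch in five should contain at least one duplicate `invoiceid`.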




In [29]:
data_array, columns = generate_data()




In [30]:
spark_df = spark.createDataFrame(data=data_array, schema=columns)




In [31]:
spark_df.show(3)

+---------+------+---------+-----+--------+----------+----------------+------------+----------------+
|invoiceid|itemid| category|price|quantity| orderdate|destinationstate|shippingtype|        referral|
+---------+------+---------+-----+--------+----------+----------------+------------+----------------+
|     6142|    36|Household|   12|       1|2016-12-02|              HI|       3-Day|           Other|
|    18321|    53|   Office|    7|       2|2016-12-07|              AL|       3-Day|Friend/Colleague|
|     4684|     1|   Office|   58|       3|2016-07-24|              DE|        Free|Friend/Colleague|
+---------+------+---------+-----+--------+----------+----------------+------------+----------------+


In [32]:
spark_df.count()

3


In [33]:
spark_df.printSchema()

root
 |-- invoiceid: long (nullable = true)
 |-- itemid: long (nullable = true)
 |-- category: string (nullable = true)
 |-- price: long (nullable = true)
 |-- quantity: long (nullable = true)
 |-- orderdate: string (nullable = true)
 |-- destinationstate: string (nullable = true)
 |-- shippingtype: string (nullable = true)
 |-- referral: string (nullable = true)


# Step 5: Define Hudi Settings 

In [39]:
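db_name = "hudidb"
table_name = "order"

# Hudi record key (uniquely identifies a row) and precombine field
# (within a batch, rows sharing a key are reduced to the one with the larger value)
recordkey = "invoiceid"
precombine = "itemid"

path = f"s3://soumil-dms-learn/hudi/{db_name}/{table_name}/"
method = "upsert"
table_type = "COPY_ON_WRITE"

# Region of the current session; reused for the DynamoDB lock provider below
curr_session = boto3.session.Session()
curr_region = curr_session.region_name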
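The interplay between `recordkey` and `precombine` can be modelled in a few lines of plain Python. This is a simplified sketch, not Hudi's implementation: it assumes Hudi's default payload class, `OverwriteWithLatestAvroPayload`, under which duplicates inside a batch are reduced to the row with the largest precombine value and the survivor then replaces any stored row with the same key. `simulate_upsert` is a hypothetical helper name.

```python
from typing import Any, Dict, Iterable

Row = Dict[str, Any]


def simulate_upsert(table: Dict[Any, Row], batch: Iterable[Row],
                    record_key: str, precombine: str) -> Dict[Any, Row]:
    # Step 1: collapse duplicates inside the incoming batch, keeping the row
    # with the largest precombine value (on ties this sketch keeps the later
    # row; Hudi's own tie-breaking should not be relied upon)
    deduped: Dict[Any, Row] = {}
    for row in batch:
        key = row[record_key]
        kept = deduped.get(key)
        if kept is None or row[precombine] >= kept[precombine]:
            deduped[key] = row
    # Step 2: survivors replace stored rows with the same key (updates) or are
    # added as new rows (inserts); the stored precombine value is ignored,
    # mirroring the assumed OverwriteWithLatestAvroPayload behavior
    merged = dict(table)
    merged.update(deduped)
    return merged


first_batch = [
    {"invoiceid": 1, "itemid": 10, "price": 5},
    {"invoiceid": 1, "itemid": 42, "price": 7},  # same key, larger itemid: this row wins
    {"invoiceid": 2, "itemid": 3, "price": 9},
]
table = simulate_upsert({}, first_batch, "invoiceid", "itemid")
print(table[1]["price"], len(table))  # 7 2

second_batch = [{"invoiceid": 1, "itemid": 1, "price": 99}]  # smaller itemid, still overwrites
table = simulate_upsert(table, second_batch, "invoiceid", "itemid")
print(table[1]["price"], len(table))  # 99 2
```

With a payload class such as `DefaultHoodieRecordPayload`, the precombine value would also be compared against the stored row, so the second batch above would not overwrite it. Since `itemid` is random in this notebook, "largest" carries no business meaning; a precombine field is typically an update timestamp or version number.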





In [40]:
connection_options = {
    "path": path,
    "connectionName": "hudi-connection",

    # Core Hudi write settings
    "className": "org.apache.hudi",
    "hoodie.table.name": table_name,
    "hoodie.datasource.write.storage.type": table_type,  # older alias of hoodie.datasource.write.table.type
    "hoodie.datasource.write.table.name": table_name,
    "hoodie.datasource.write.recordkey.field": recordkey,
    "hoodie.datasource.write.precombine.field": precombine,
    "hoodie.datasource.write.operation": method,
    "hoodie.datasource.write.hive_style_partitioning": "true",

    # Sync the table to the Glue Data Catalog through the Hive metastore client
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.sync_as_datasource": "false",
    "hoodie.datasource.hive_sync.database": db_name,
    "hoodie.datasource.hive_sync.table": table_name,
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",

    # Optimistic concurrency control with a DynamoDB-based lock
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    "hoodie.cleaner.policy.failed.writes": "LAZY",
    "hoodie.write.lock.provider": "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
    "hoodie.write.lock.dynamodb.table": "hudi-lock-table",
    "hoodie.write.lock.dynamodb.partition_key": "tablename",
    "hoodie.write.lock.dynamodb.region": curr_region,
    "hoodie.write.lock.dynamodb.endpoint_url": f"dynamodb.{curr_region}.amazonaws.com",
    "hoodie.write.lock.dynamodb.billing_mode": "PAY_PER_REQUEST",

    # Only used by the bulk_insert operation; it has no effect on upserts
    "hoodie.bulkinsert.shuffle.parallelism": 2000,
}
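Hudi's optimistic concurrency control only works when the failed-writes cleaner policy is `LAZY` and a lock provider is configured. A small pre-flight check like the hypothetical `check_occ_options` below can catch a missing key before a write starts. Treating the three DynamoDB keys as mandatory is an assumption based on the settings this notebook uses, not a statement of Hudi's exact requirements.

```python
from typing import Any, Dict, List


def check_occ_options(options: Dict[str, Any]) -> List[str]:
    """Hypothetical pre-flight check; returns a list of problems (empty if none)."""
    problems: List[str] = []
    if options.get("hoodie.write.concurrency.mode") != "optimistic_concurrency_control":
        return problems  # OCC is not enabled, so there is nothing to check
    if options.get("hoodie.cleaner.policy.failed.writes") != "LAZY":
        problems.append("hoodie.cleaner.policy.failed.writes must be LAZY under OCC")
    provider = str(options.get("hoodie.write.lock.provider") or "")
    if not provider:
        problems.append("hoodie.write.lock.provider must be set under OCC")
    elif provider.endswith("DynamoDBBasedLockProvider"):
        # Assumption: treat the DynamoDB keys this notebook sets as mandatory
        for key in ("hoodie.write.lock.dynamodb.table",
                    "hoodie.write.lock.dynamodb.partition_key",
                    "hoodie.write.lock.dynamodb.region"):
            if not options.get(key):
                problems.append(f"{key} is missing")
    return problems


example = {
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    "hoodie.cleaner.policy.failed.writes": "LAZY",
    "hoodie.write.lock.provider": "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
    "hoodie.write.lock.dynamodb.table": "hudi-lock-table",
    "hoodie.write.lock.dynamodb.partition_key": "tablename",
    "hoodie.write.lock.dynamodb.region": "us-west-2",
}
print(check_occ_options(example))  # []
```

In the notebook itself this would be a one-liner such as `assert not check_occ_options(connection_options)` placed just before the write.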




# Step 6: Write Data into the Hudi Table

In [41]:
spark_df.show()

+---------+------+---------+-----+--------+----------+----------------+------------+----------------+
|invoiceid|itemid| category|price|quantity| orderdate|destinationstate|shippingtype|        referral|
+---------+------+---------+-----+--------+----------+----------------+------------+----------------+
|     6142|    36|Household|   12|       1|2016-12-02|              HI|       3-Day|           Other|
|    18321|    53|   Office|    7|       2|2016-12-07|              AL|       3-Day|Friend/Colleague|
|     4684|     1|   Office|   58|       3|2016-07-24|              DE|        Free|Friend/Colleague|
+---------+------+---------+-----+--------+----------+----------------+------------+----------------+


In [42]:
import datetime

start = datetime.datetime.now()
# Write the DataFrame to the Hudi table through the Glue Hudi connector
glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(spark_df, glueContext, "glue_df"),
    connection_type="marketplace.spark",
    connection_options=connection_options,
    transformation_ctx="glue_df",
)
end = datetime.datetime.now()
print("Execution Time: {} ".format(end - start))

Execution Time: 0:00:09.373432
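Both write cells repeat the same start/end timing lines. A context manager built on `time.perf_counter()` (a monotonic clock, better suited to measuring intervals than `datetime.now()`) is one way to factor that out. `timed` is a hypothetical helper, shown here with a stand-in workload rather than a Hudi write.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator, Optional


@contextmanager
def timed(label: str, results: Optional[Dict[str, float]] = None) -> Iterator[None]:
    """Print (and optionally record) how long the enclosed block took."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        if results is not None:
            results[label] = elapsed
        print(f"{label} execution time: {elapsed:.3f} s")


# Stand-in workload; in the notebook the body would be the
# glueContext.write_dynamic_frame.from_options(...) call
timings: Dict[str, float] = {}
with timed("demo", timings):
    total = sum(range(100_000))
```

Because the timing runs in a `finally` clause, the elapsed time is still printed if the write raises an exception.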


In [43]:
print("ok")

ok


# Append Operations: Upserting a Second Batch

In [49]:
data_array, columns = generate_data()
spark_df = spark.createDataFrame(data=data_array, schema=columns)




In [50]:
spark_df.show()

+---------+------+---------+-----+--------+----------+----------------+------------+----------------+
|invoiceid|itemid| category|price|quantity| orderdate|destinationstate|shippingtype|        referral|
+---------+------+---------+-----+--------+----------+----------------+------------+----------------+
|    15386|    54|   Office|   74|       4|2016-09-24|              AK|        Free|           Other|
|    17111|    42|  Kitchen|   56|       4|2016-04-28|              WI|       3-Day|           Other|
|    13383|    55|   Garden|   84|       1|2016-07-10|              CO|       3-Day|Friend/Colleague|
|     8424|    25|   Garden|   60|       2|2016-11-18|              MN|       3-Day|Friend/Colleague|
|     5213|    58|Household|   56|       3|2016-11-13|              WY|       3-Day| Repeat Customer|
|     5240|    80|   Office|   17|       1|2016-08-20|              KY|        Free|           Other|
|     6075|    20|   Office|   22|       3|2016-03-01|              OK|        Fre

In [51]:
spark_df.count()

100


In [52]:
import datetime

start = datetime.datetime.now()
# Same write as before; the 'upsert' operation updates existing keys and inserts new ones
glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(spark_df, glueContext, "glue_df"),
    connection_type="marketplace.spark",
    connection_options=connection_options,
    transformation_ctx="glue_df",
)
end = datetime.datetime.now()
print("Execution Time: {} ".format(end - start))

Execution Time: 0:00:09.009001
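Because the append cell uses the same `upsert` operation, Hudi updates rows whose `invoiceid` already exists and inserts the rest. The sketch below estimates that split for two random batches shaped like `generate_data()` output; it is plain Python with hypothetical names and never touches the table. With 100 keys per batch drawn from 20000 values, the expected overlap is only about 100 × 100 / 20000 = 0.5 keys, so almost every row in the second batch should be an insert.

```python
import random
from typing import Iterable, Set, Tuple


def split_upsert(existing_keys: Set[int], incoming_keys: Iterable[int]) -> Tuple[Set[int], Set[int]]:
    """Split incoming record keys into (updates, inserts) relative to stored keys."""
    incoming = set(incoming_keys)
    return incoming & existing_keys, incoming - existing_keys


# Two hypothetical batches shaped like generate_data() output:
# 100 invoice ids each, drawn uniformly from 1..20000
rng = random.Random(0)  # fixed seed so the sketch is repeatable
first = {rng.randint(1, 20000) for _ in range(100)}
second = [rng.randint(1, 20000) for _ in range(100)]

updates, inserts = split_upsert(first, second)
print(f"updates={len(updates)} inserts={len(inserts)}")
```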
