
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A JSON-formatted dictionary of all configuration parameters for a session. Each parameter can be specified here or through individual magics.             |
| %profile                    |  String      |  Specify a profile in your AWS configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma-separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma-separated list of pip packages, S3 paths, or private pip arguments.                                                                                |
| %extra_py_files             |  List        |  Comma-separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma-separated list of additional JARs to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (e.g. %glue_version 2.0).                              |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic are passed as part of the SQL code.                                                                |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer.                      |
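The `%%configure` row says each parameter can be set either through its own magic or in one JSON dictionary. As a minimal sketch of that equivalence, the hypothetical helper `magics_to_configure` below turns single-line magics like the ones in the next cell into a `%%configure` body. It assumes the JSON keys mirror the magic names (`glue_version`, `worker_type`, `number_of_workers`); confirm the accepted keys for your kernel version before pasting the output into a cell.

```python
import json

# Assumption: %%configure accepts keys that mirror the line-magic names.
# Values are kept as strings except for fields known to be integers.
INTEGER_KEYS = {"number_of_workers"}


def magics_to_configure(magic_lines):
    """Translate lines such as '%worker_type G.1X' into a %%configure JSON body."""
    config = {}
    for line in magic_lines:
        line = line.strip()
        # Skip blank lines, cell magics (%%...), and anything that is not a magic.
        if not line.startswith("%") or line.startswith("%%"):
            continue
        name, _, value = line[1:].partition(" ")
        value = value.strip()
        config[name] = int(value) if name in INTEGER_KEYS else value
    return json.dumps(config, indent=2)


print(magics_to_configure([
    "%glue_version 3.0",
    "%worker_type G.1X",
    "%number_of_workers 3",
]))
```

Like the individual magics, such a cell only affects sessions created after it runs, so it belongs before the first code cell.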

In [16]:
%connections hudi-connection
%glue_version 3.0
%region us-west-2
%worker_type G.1X
%number_of_workers 3
%spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer

You are already connected to a glueetl session 4fb72b7d-c4af-4ecc-9154-5b1734b79c27.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Connections to be included:
hudi-connection


Setting Glue version to: 3.0


Previous region: us-west-2
Setting new region to: us-west-2
Reauthenticating Glue client with new region: us-west-2
IAM role has been set to arn:aws:iam::043916019468:role/Lab3. Reauthenticating.
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::043916019468:role/Lab3
Authentication done.
Region is set to: us-west-2


Previous worker type: G.1X
Setting new worker type to: G.1X


Previous number of workers: 3
Setting new number of workers to: 3
Previous Spark configuration: spark.serializer=org.apache.spark.serializer.KryoSerializer
Setting new Spark configuration to: spark.serializer=org.apache.spark.serializer.KryoSerializer


In [18]:
%stop_session

Stopping session: 4fb72b7d-c4af-4ecc-9154-5b1734b79c27
Stopped session.


In [1]:
# Running this first code cell starts the interactive session (see the output below).
import sys
from datetime import datetime
from functools import reduce

import boto3
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import *  # ApplyMapping and the other Glue transforms
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import *  # col, to_timestamp, to_date, when, ...
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *

Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::043916019468:role/Lab3
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 3
Session ID: 4b821d65-f6c0-427c-9776-023a70fbc5c8
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
Waiting for session 4b821d65-f6c0-427c-9776-023a70fbc5c8 to get into ready status...
Session 4b821d65-f6c0-427c-9776-023a70fbc5c8 has been created.



# Step 1: Create Spark Session

In [2]:
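# Hudi-friendly Spark settings; static ones (e.g. spark.serializer) only apply when the context is created.
spark = (SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.hive.convertMetastoreParquet", "false")  # let Hudi's input format read metastore tables
    .config("spark.sql.legacy.pathOptionBehavior.enabled", "true")  # allow a "path" option plus a save path (Spark 3.1+)
    .getOrCreate())
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()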




In [4]:
ReadDF = glueContext.create_dynamic_frame.from_catalog(
    database="dmsdb",
    table_name="raw_sales",
    transformation_ctx="ReadDF",
)





In [5]:
ChangeSchemaApplyMapping = ApplyMapping.apply(
    frame=ReadDF,
    mappings=[
        ("action", "string", "action", "string"),
        ("invoiceid", "long", "invoiceid", "long"),
        ("itemid", "long", "itemid", "long"),
        ("category", "string", "category", "string"),
        ("price", "long", "price", "long"),
        ("quantity", "long", "quantity", "long"),
        ("orderdate", "string", "orderdate", "string"),
        ("destinationstate", "string", "destinationstate", "string"),
        ("shippingtype", "string", "shippingtype", "string"),
        ("referral", "string", "referral", "string"),
    ],
    transformation_ctx="ChangeSchemaApplyMapping",
)
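In this cell every mapping keeps the same name and type, so the step only fixes the output to these ten columns with their declared types. To make the tuple format concrete, here is a plain-Python model of what a mapping list expresses: each `(source, source_type, target, target_type)` entry copies a field under a (possibly new) name and casts it, and unmapped fields are dropped. This is an illustration only, not Glue's implementation; the `CASTS` table covering just `string` and `long` is an assumption of the sketch.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple

# (source_field, source_type, target_field, target_type), as in ApplyMapping.
Mapping = Tuple[str, str, str, str]

# Assumed casts for the only two types this notebook's mapping uses.
CASTS: Dict[str, Callable[[Any], Any]] = {"string": str, "long": int}


def apply_mapping(records: Iterable[Dict[str, Any]],
                  mappings: List[Mapping]) -> List[Dict[str, Any]]:
    """Copy each mapped field to its target name and type; drop unmapped fields."""
    out = []
    for record in records:
        row: Dict[str, Any] = {}
        for src, _src_type, dst, dst_type in mappings:
            value = record.get(src)
            # Missing or null source values stay None instead of being cast.
            row[dst] = None if value is None else CASTS[dst_type](value)
        out.append(row)
    return out


rows = apply_mapping(
    [{"invoiceid": "2469", "price": "58", "category": "Garden", "extra": 1}],
    [
        ("invoiceid", "long", "invoiceid", "long"),
        ("price", "long", "unit_price", "long"),  # hypothetical rename
        ("category", "string", "category", "string"),
    ],
)
print(rows)  # [{'invoiceid': 2469, 'unit_price': 58, 'category': 'Garden'}]
```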





In [6]:
spark_df = ChangeSchemaApplyMapping.toDF()





In [7]:
spark_df.show()

+------+---------+------+---------+-----+--------+--------------------+----------------+------------+----------------+
|action|invoiceid|itemid| category|price|quantity|           orderdate|destinationstate|shippingtype|        referral|
+------+---------+------+---------+-----+--------+--------------------+----------------+------------+----------------+
|     I|     2469|     9|   Garden|   58|       3|2016-07-17 00:00:...|              IL|        Free|Friend/Colleague|
|     I|    10370|    68|   Office|   31|       4|2016-01-21 00:00:...|              NJ|        Free|           Other|
|     I|    15061|    54|   Garden|   57|       2|2016-10-17 00:00:...|              RI|       2-Day|       Online Ad|
|     I|    15425|    55|   Office|   61|       3|2016-07-18 00:00:...|              HI|       3-Day|       Online Ad|
|     I|    18229|    15|  Kitchen|   20|       2|2016-09-26 00:00:...|              PA|        Free|           Other|
|     I|    18075|    67|   Garden|    4|       

# Write into Hudi
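Two settings in the next cells decide what an upsert keeps: the record key identifies a row, and when several incoming rows share a key, Hudi keeps the one with the largest precombine value. The plain-Python sketch below models only that dedup rule; the function name `dedupe_by_precombine` is hypothetical, and tie handling (the later row wins) is an assumption of the sketch. It also shows why a DMS `action` column (`I`, `U`, `D`) is a weak precombine field: the values are compared as strings, not by when the change happened.

```python
from typing import Any, Dict, Iterable, List


def dedupe_by_precombine(records: Iterable[Dict[str, Any]], record_key: str,
                         precombine_field: str) -> List[Dict[str, Any]]:
    """Keep one row per record key: the one with the largest precombine value.

    Models Hudi's dedup of incoming rows that share a key; on ties the later
    row wins here, which is an assumption of this sketch.
    """
    latest: Dict[Any, Dict[str, Any]] = {}
    for rec in records:
        key = rec[record_key]
        current = latest.get(key)
        if current is None or rec[precombine_field] >= current[precombine_field]:
            latest[key] = rec
    return list(latest.values())


# Hypothetical CDC batch: an update followed by a delete for the same invoice.
batch = [
    {"invoiceid": 2469, "action": "U", "price": 60},
    {"invoiceid": 2469, "action": "D", "price": 60},
]
# "U" > "D" as strings, so the later delete row is discarded.
print(dedupe_by_precombine(batch, "invoiceid", "action"))
```

A column that increases with every change (for example, a change timestamp, if the DMS task emits one) avoids this ordering problem.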

In [10]:
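db_name = "dmsdb"
table_name = "sales"

recordkey = "invoiceid"
# Hudi keeps the row with the largest precombine value when several share a record key.
# Once the table exists, this must match the field it was created with (hoodie.table.precombine.field);
# the existing table at `path` was created with "itemid", which causes the config conflict below.
precombine = "action"

path = "s3://soumil-dms-learn/hudi/sales/"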
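The write below fails because Hudi validates the new options against the config stored with the table (`<path>/.hoodie/hoodie.properties`), where `hoodie.table.precombine.field` is already `itemid`. A minimal sketch of that check in plain Python, assuming you have already fetched the file's text (for example with boto3's `s3.get_object`): the parser is simplified (it ignores Java properties escapes and `:` separators), and `sample` is a hypothetical, abbreviated file.

```python
from typing import Dict, Optional, Tuple

# Table property that records the precombine field chosen when the table was created.
PRECOMBINE_PROP = "hoodie.table.precombine.field"


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines; comments (#, !) and blank lines are skipped."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def precombine_conflict(properties_text: str,
                        planned_field: str) -> Optional[Tuple[str, str]]:
    """Return (current, existing) if the planned field differs from the table's, else None."""
    existing = parse_properties(properties_text).get(PRECOMBINE_PROP)
    if existing is not None and existing != planned_field:
        return planned_field, existing
    return None


# Hypothetical, abbreviated hoodie.properties content.
sample = """#Properties saved on table creation
hoodie.table.name=sales
hoodie.table.precombine.field=itemid
"""
print(precombine_conflict(sample, "action"))  # ('action', 'itemid')
```

To get past the error, either write with the field the table was created with or, if the table only holds disposable test data, remove the table path and its catalog entry so the table is recreated with the new field.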




In [11]:
ApacheHudiConnector0101forAWSGlue30_node1671045598524 = (
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(spark_df, glueContext, "glue_df"),
        connection_type="marketplace.spark",
        connection_options={
            "path": path,
            "connectionName": "hudi-connection",
            "className": "org.apache.hudi",
            # Table layout and write semantics.
            "hoodie.table.name": table_name,
            "hoodie.datasource.write.table.name": table_name,
            "hoodie.datasource.write.table.type": "COPY_ON_WRITE",  # replaces the deprecated write.storage.type key
            "hoodie.datasource.write.operation": "upsert",
            "hoodie.datasource.write.recordkey.field": recordkey,
            "hoodie.datasource.write.precombine.field": precombine,
            "hoodie.datasource.write.hive_style_partitioning": "true",
            # Sync the table to the metastore (the Glue Data Catalog in this session).
            "hoodie.datasource.hive_sync.enable": "true",
            "hoodie.datasource.hive_sync.mode": "hms",
            "hoodie.datasource.hive_sync.use_jdbc": "false",
            "hoodie.datasource.hive_sync.sync_as_datasource": "false",
            "hoodie.datasource.hive_sync.database": db_name,
            "hoodie.datasource.hive_sync.table": table_name,
            "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        },
        transformation_ctx="glue_df",
    )
)

Py4JJavaError: An error occurred while calling o134.pyWriteDynamicFrame.
: org.apache.hudi.exception.HoodieException: Config conflict(key	current value	existing value):
PreCombineKey:	action	itemid
	at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:162)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:89)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQ