# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


In [4]:
# Session libraries: extra Python files and extra JARs for the Glue interactive session.
# pycountry_convert.zip supplies the pycountry_convert module imported in a later cell.
# <------- REPLACE THE BUCKET NAME IN THE S3 PATH BELOW WITH YOUR OWN WORKSHOP BUCKET

%extra_py_files s3://glueworkshop-100022632938-us-east-1/library/pycountry_convert.zip
%extra_jars s3://crawler-public/json/serde/json-serde.jar

# Session properties: turn on the Spark UI, set the S3 path for Spark event logs, and set max_retries to 0.
# <------- REPLACE THE BUCKET NAME IN THE EVENT-LOG PATH BELOW WITH YOUR OWN WORKSHOP BUCKET

%%configure 
{
  "--enable-spark-ui": "true",
  "--spark-event-logs-path": "s3://glueworkshop-100022632938-us-east-1/output/lab3/sparklog/",
  "max_retries": "0"         
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Extra py files to be included:
s3://glueworkshop-100022632938-us-east-1/library/pycountry_convert.zip
Extra jars to be included:
s3://crawler-public/json/serde/json-serde.jar
The following configurations have been updated: {'--enable-spark-ui': 'true', '--spark-event-logs-path': 's3://glueworkshop-100022632938-us-east-1/output/lab3/sparklog/', 'max_retries': '0'}
s3://crawler-public/json/serde/json-serde.jar


In [1]:
#Importing all the basic Glue, Spark libraries 

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Importing further required libraries

import os, sys, boto3
from pprint import pprint
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, StringType
from datetime import datetime

# Starting Spark/Glue Context
# The objects are built in dependency order: SparkContext -> GlueContext -> SparkSession / Job.

# Obtain the SparkContext via getOrCreate() rather than constructing one directly.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext; later cells use it for DynamicFrame conversion and writes.
glueContext = GlueContext(sc)
# SparkSession exposed by the GlueContext; used for the DataFrame reads.
spark = glueContext.spark_session
# Job handle bound to the GlueContext.
# NOTE(review): `job` is not referenced again in the visible cells — confirm whether it is still needed.
job = Job(glueContext)

# Importing pycountry_convert functions from the external Python library (pycountry_convert.zip)

from pycountry_convert import (
    convert_country_alpha2_to_country_name,
    convert_country_alpha2_to_continent,
    convert_country_name_to_country_alpha2,
    convert_country_alpha3_to_country_alpha2,
)

# Defining the function code

def get_country_code2(country_name):
    """Translate a country name into its two-letter (alpha-2) country code.

    Args:
        country_name: Country name to look up; passed unchanged to
            ``convert_country_name_to_country_alpha2``.

    Returns:
        The code returned by ``convert_country_name_to_country_alpha2``, or an
        empty string when that lookup raises ``KeyError``.

    Any other exception raised by the lookup propagates to the caller.
    """
    try:
        return convert_country_name_to_country_alpha2(country_name)
    except KeyError:
        # A failed lookup maps to an empty string; there is no default country.
        return ''

# Spark UDF wrapper around get_country_code2; the result column is a string.
udf_get_country_code2 = udf(get_country_code2, StringType())

Trying to create a Glue session for the kernel.
Session Type: glueetl
Session ID: 8d865432-c018-4215-9975-be13f0f5e55b
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
--enable-spark-ui true
--spark-event-logs-path s3://glueworkshop-100022632938-us-east-1/output/lab3/sparklog/
--max_retries 0
--extra-py-files s3://glueworkshop-100022632938-us-east-1/library/pycountry_convert.zip
--extra-jars s3://crawler-public/json/serde/json-serde.jar
Waiting for session 8d865432-c018-4215-9975-be13f0f5e55b to get into ready status...
Session 8d865432-c018-4215-9975-be13f0f5e55b has been created.



In [4]:
# Parameter values used by the rest of this cell.

# S3 bucket URI. Keep the "s3://" prefix and the trailing "/": the input path
# is built further down by appending "input/lab2/sample.csv" to this string.
# <------- REPLACE ONLY THE BUCKET NAME PART WITH YOUR OWN BUCKET
s3_bucket_name = "s3://glueworkshop-100022632938-us-east-1/"
# AWS region passed to the boto3 DynamoDB resource.  <--- REPLACE WITH YOUR AWS REGION
region_name = 'us-east-1'
# Name of the DynamoDB table that is created and then written to.
ddb_table_name='glueworkshop-lab3-new'


# Create the DynamoDB table with provisioned read and write capacity.
# NOTE(review): 500 RCU / 5000 WCU is presumably sized for the bulk write later
# in this cell — remember to delete the table once the lab is finished.
dynamodb = boto3.resource('dynamodb', region_name=region_name)

try:
    table_status = dynamodb.create_table(
        TableName=ddb_table_name,
        # Single-attribute primary key: numeric `uuid` as the hash key.
        KeySchema=[{'AttributeName': 'uuid', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'uuid', 'AttributeType': 'N'}],
        ProvisionedThroughput={'ReadCapacityUnits': 500, 'WriteCapacityUnits': 5000},
    )
except dynamodb.meta.client.exceptions.ResourceInUseException:
    # The table already exists (e.g. this cell is being re-run): reuse it
    # instead of failing, so the cell survives Restart & Run All.
    table_status = dynamodb.Table(ddb_table_name)

# Wait until the table exists.
table_status.meta.client.get_waiter('table_exists').wait(TableName=ddb_table_name)
pprint(table_status)

# Read the sample CSV (comma-separated, header row, schema inferred) from the workshop bucket.
df = (
    spark.read
    .format("csv")
    .options(sep=",", inferSchema="true", header="true")
    .load(s3_bucket_name + "input/lab2/sample.csv")
)

# Add a 'country_code_2' column computed from the "Country" column via the UDF,
# then wrap the result in a DynamicFrame for the Glue writer.
new_df = df.withColumn(
    'country_code_2',
    udf_get_country_code2(col("Country")),
)
new_df_dyf = DynamicFrame.fromDF(new_df, glueContext, "new_df_dyf")

# Write the DynamicFrame to the DynamoDB table, printing a timestamp before and after.
print(f"Start writing to DBB : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
glueContext.write_dynamic_frame_from_options(
    frame=new_df_dyf,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": ddb_table_name,
        "dynamodb.throughput.write.percent": "1.0",
    },
)
print(f"Finished writing to DBB : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

# Row count of the source DataFrame, shown as the cell result for comparison
# against the number of items in the DynamoDB table.
new_df.count()

dynamodb.Table(name='glueworkshop-lab3-new')
Start writing to DBB : 2025-01-30 22:21:27
Finished writing to DBB : 2025-01-30 22:21:53
100000
