
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A JSON-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your AWS configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma-separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma-separated list of pip packages, S3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma-separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma-separated list of additional JARs to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (e.g. %glue_version 2.0).                              |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %spark_conf                 |  String      |  Specify custom Spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer.                      |

In [6]:
# Setup Spark and Glue configurations

%glue_version 3.0
%spark_conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
%spark_conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
%number_of_workers 2

%%configure
{
  "--datalake-formats": "delta"
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Setting Glue version to: 3.0
Previous Spark configuration: None
Setting new Spark configuration to: spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
Previous Spark configuration: spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
Setting new Spark configuration to: spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
Previous number of workers: 5
Setting new number of workers to: 2
The following configurations have been updated: {'--datalake-formats': 'delta'}


In [1]:
# Setup Python and Spark libraries
import sys
from awsglue.transforms import *
from pyspark.sql.functions import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
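# SparkSession is used below to build the session, so import it explicitly instead of relying on a star import
from pyspark.sql import SparkSession, functions as fn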
from urllib.request import urlopen
from pyspark.sql.functions import udf
import hashlib
import urllib.request
from io import StringIO

from delta.tables import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, DateType, TimestampType, FloatType
import json

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
#spark = glueContext.spark_session
spark = SparkSession \
            .builder \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
            .getOrCreate()

job = Job(glueContext)

Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::175908995626:role/glue-role
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 2
Session ID: 9ea615dc-f815-473c-a9fd-7b09521be37c
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
--datalake-formats delta
Waiting for session 9ea615dc-f815-473c-a9fd-7b09521be37c to get into ready status...
Session 9ea615dc-f815-473c-a9fd-7b09521be37c has been created.



In [2]:
# Send arguments to the job

sys.argv+=["--S3_BUCKET", "aws-analytics-course"]
sys.argv+=["--BRONZE_LAYER_NAMESPACE", "bronze/dms/sales"]
sys.argv+=["--BRONZE_LAYER_ECOMMERCE_NAMESPACE", "bronze/kinesis"]
sys.argv+=["--SILVER_LAYER_NAMESPACE", "silver"]
sys.argv+=["--JOB_DATE", "2023/01/12/18"]
sys.argv+=["--TABLES", "{\"store_orders\": \"currency\", \"store_customers\": \"country\", \"products\": \"product_category\"}"]
sys.argv+=["--JOIN_COLUMNS_DELTA", "{\"store_orders\": \"order_number\", \"store_customers\": \"email\",  \"products\": \"product_id\"}"]
sys.argv+=["--JOIN_COLUMNS_INCREMENTAL", "{\"store_orders\": \"order_number\", \"store_customers\": \"email\", \"products\": \"product_id\"}"]
sys.argv+=["--CURRENCY_CONVERSION_URL", "https://api.exchangerate-api.com/v4/latest/usd"]
sys.argv+=["--ECOMMERCE_LOGS_BUCKET", "aws-analytics-incoming"]
sys.argv+=["--ECOMMERCE_STREAM_DATE", "2023/01/11/23"]

args = getResolvedOptions(sys.argv,["S3_BUCKET", "BRONZE_LAYER_NAMESPACE", "BRONZE_LAYER_ECOMMERCE_NAMESPACE", "SILVER_LAYER_NAMESPACE", "JOB_DATE", "TABLES", "JOIN_COLUMNS_DELTA", "JOIN_COLUMNS_INCREMENTAL", "CURRENCY_CONVERSION_URL", "ECOMMERCE_LOGS_BUCKET", "ECOMMERCE_STREAM_DATE"])
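The JSON-valued arguments above are escaped by hand, which is easy to get wrong. A minimal sketch, assuming the same `--TABLES` argument and values as the cell above, of building the string with `json.dumps` instead. The `argv` list is a hypothetical stand-in for `sys.argv` so the snippet runs outside a Glue session.

```python
import json

# Hypothetical stand-in for sys.argv so the sketch runs anywhere.
argv = ["notebook"]

# Same table -> column mapping as the --TABLES argument above.
tables = {
    "store_orders": "currency",
    "store_customers": "country",
    "products": "product_category",
}

# json.dumps produces the JSON text without manual backslash escaping.
argv += ["--TABLES", json.dumps(tables)]

# Reading it back mirrors json.loads(args['TABLES']) later in the notebook.
parsed = json.loads(argv[argv.index("--TABLES") + 1])
print(parsed["products"])
```

The same pattern would apply to `--JOIN_COLUMNS_DELTA` and `--JOIN_COLUMNS_INCREMENTAL`.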




In [3]:
# Fetch the exchange rates once and close the HTTP response when done
with urlopen(args['CURRENCY_CONVERSION_URL']) as currency_conversion_response:
    currency_conversion_json = json.loads(currency_conversion_response.read())




In [4]:
def get_currency_conversion(currency_conversion_json, currency):
    return currency_conversion_json['rates'][currency]




In [5]:
def curate_sales_price(currency, sales_price):
  if (currency != 'USD'):
    curated_value = float(sales_price)/float(get_currency_conversion(currency_conversion_json, currency))
    return float(curated_value)
  else:
    return float(sales_price)
curate_sales_price_udf = udf(curate_sales_price, FloatType())
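To make the conversion arithmetic concrete, here is a plain-Python sketch that runs without Spark. It assumes the response has a `rates` mapping of currency code to units of that currency per 1 USD, which is what the division in `curate_sales_price` implies. The `sample_json` payload and its rate values are made up for illustration, and `to_usd` is a hypothetical name.

```python
# Hypothetical payload in the shape the notebook reads; the rates are invented.
sample_json = {"base": "USD", "rates": {"USD": 1.0, "EUR": 0.5, "INR": 80.0}}

def to_usd(currency, sales_price, conversion_json):
    """Divide by the per-USD rate; USD amounts pass through unchanged."""
    if currency == "USD":
        return float(sales_price)
    return float(sales_price) / float(conversion_json["rates"][currency])

print(to_usd("EUR", 40, sample_json))     # 40 / 0.5
print(to_usd("INR", "160", sample_json))  # string input is coerced with float()
print(to_usd("USD", 11.6, sample_json))   # returned as-is
```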




In [6]:
def mask_value(column):
  # Replace a sensitive string with its SHA-256 hex digest; equal inputs give equal digests
  return hashlib.sha256(column.encode()).hexdigest()

mask_udf = udf(mask_value, StringType())
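A short plain-Python sketch of what the masking produces. The `None` guard is an addition for illustration and is not part of the notebook's function, which would raise on a missing value because `None` has no `encode` method. The name `mask` is hypothetical.

```python
import hashlib

def mask(value):
    """Return the SHA-256 hex digest of a string, or None for missing input."""
    if value is None:
        return None  # assumption: pass nulls through rather than fail
    return hashlib.sha256(value.encode()).hexdigest()

digest = mask("abc")
print(digest)
print(len(digest))            # a SHA-256 hex digest is 64 characters long
print(mask("abc") == digest)  # hashing is deterministic
print(mask(None))
```

Because the digest is deterministic, masked columns can still be used as join or grouping keys.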




In [7]:
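def ip_to_country(ip):
  # Despite its name, this converts a dotted-quad IPv4 string to its integer form;
  # the country is resolved later by joining against the iplocation ranges.
  ipsplit = ip.split(".")
  ip_number=16777216*int(ipsplit[0]) + 65536*int(ipsplit[1]) + 256*int(ipsplit[2]) + int(ipsplit[3])
  return ip_number

# Declared as StringType; the value is cast to a numeric type where the UDF is applied.
ip_to_country_udf = udf(ip_to_country, StringType())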
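As a worked example of the arithmetic above, this sketch converts one address and cross-checks it against the standard library's `ipaddress` module. The helper name `ip_to_number` is hypothetical. The last line shows that such values can exceed the signed 32-bit range.

```python
import ipaddress

def ip_to_number(ip):
    """Convert a dotted-quad IPv4 string to its integer form."""
    a, b, c, d = (int(part) for part in ip.split("."))
    return 16777216 * a + 65536 * b + 256 * c + d

n = ip_to_number("192.168.1.1")
print(n)                                               # 3232235777
print(n == int(ipaddress.IPv4Address("192.168.1.1")))  # True
print(n > 2**31 - 1)                                   # True
```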




In [8]:
def get_dataframe(BRONZE_TABLE_PATH):
    try:
        df_read_data_incremental = spark.read                             \
                                            .option("header", "true")         \
                                            .option("inferSchema", "true")    \
                                            .csv(BRONZE_TABLE_PATH)

        df_read_data_incremental=curate_columns(df_read_data_incremental)
        df_read_data_incremental = df_read_data_incremental.drop('Op')
        df_read_data_incremental.printSchema()
        df_read_data_incremental.show(10)
        return df_read_data_incremental
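    except Exception as e:
        # Keep the 0 sentinel that merge_to_delta checks for, but report why the read failed
        print("Could not read " + BRONZE_TABLE_PATH + ": " + str(e))
        return 0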
    




In [9]:
def merge_to_delta(SILVER_TABLE_PATH, BRONZE_TABLE_PATH, JOB_DATE, JOIN_COLUMN_DELTA, JOIN_COLUMN_INCREMENTAL):
    DELTA_TABLE_ALIAS="delta_table"
    INCREMENTAL_TABLE_ALIAS="data_incremental"
    JOIN_CONDITION=DELTA_TABLE_ALIAS + "." + JOIN_COLUMN_DELTA + "=" + INCREMENTAL_TABLE_ALIAS + "." + JOIN_COLUMN_INCREMENTAL
    df_read_data_incremental = get_dataframe(BRONZE_TABLE_PATH + "/" + JOB_DATE + "/" + "*.csv")
    if df_read_data_incremental != 0:
        deltaTable = DeltaTable.forPath(spark, SILVER_TABLE_PATH)
        if deltaTable:
            print("Delta table exists")
            deltaTable.alias(DELTA_TABLE_ALIAS).merge(
                    source=df_read_data_incremental.alias(INCREMENTAL_TABLE_ALIAS),
                    condition=fn.expr(JOIN_CONDITION)) \
                    .whenMatchedUpdateAll()            \
                    .whenNotMatchedInsertAll()         \
                    .execute()
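The merge above is an upsert: matched rows are overwritten and unmatched rows are appended. The following plain-dict analogy illustrates those semantics only and is not how Delta Lake implements them. The function name `upsert` and the sample rows are hypothetical. One difference to keep in mind: this sketch keeps the last duplicate when several incoming rows share a key, whereas a Delta merge can fail in that situation.

```python
def upsert(target_rows, incoming_rows, key):
    """Update rows whose key matches, insert the rest (plain-dict analogy)."""
    merged = {row[key]: row for row in target_rows}
    for row in incoming_rows:
        # Match: replace every column. No match: insert as a new row.
        merged[row[key]] = row
    return list(merged.values())

target = [{"order_number": 1, "units": 10}, {"order_number": 2, "units": 8}]
incoming = [{"order_number": 2, "units": 9}, {"order_number": 3, "units": 1}]

result = upsert(target, incoming, "order_number")
for row in result:
    print(row)
```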




In [10]:
def merge_data_to_delta(BRONZE_TABLE_PATH, SILVER_TABLE_PATH, JOB_DATE, TABLE, PARTITION_COLUMN, JOIN_COLUMN_DELTA, JOIN_COLUMN_INCREMENTAL):
    # Check for the table explicitly, so that a failure inside the merge is not mistaken for a missing table
    if DeltaTable.isDeltaTable(spark, SILVER_TABLE_PATH):
        print("Delta table exists")
    else:
        print("Delta table does not exist")
        # Bootstrap the silver table from the full-load file before applying the increment
        df_read_data_full = get_dataframe(BRONZE_TABLE_PATH + "/" + "LOAD00000001.csv")
        df_read_data_full.write.format("delta").save(SILVER_TABLE_PATH)
    merge_to_delta(SILVER_TABLE_PATH, BRONZE_TABLE_PATH, JOB_DATE, JOIN_COLUMN_DELTA, JOIN_COLUMN_INCREMENTAL)




In [11]:
def curate_email(email):
  curated_value = email.lower()
  return curated_value

curate_email_udf = udf(curate_email, StringType())




In [12]:
def curate_country(country):
  if (country == 'USA' or country == 'United States'):
    curated_value = 'USA'
  elif (country == 'UK' or country == 'United Kingdom'):
    curated_value = 'UK'
  elif (country == 'CAN' or country == 'Canada'):
    curated_value = 'CAN'
  elif (country == 'IND' or country == 'India'):
    curated_value = 'IND'
  else:
    curated_value = country
  return curated_value

curate_country_udf = udf(curate_country, StringType())
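The chain of `if`/`elif` branches above could also be written as a lookup table, which may be easier to extend. A minimal sketch, with `COUNTRY_ALIASES` and `normalize_country` as hypothetical names and the mappings copied from the function above:

```python
# Long-form names mapped to the short codes used by curate_country.
COUNTRY_ALIASES = {
    "United States": "USA",
    "United Kingdom": "UK",
    "Canada": "CAN",
    "India": "IND",
}

def normalize_country(country):
    """Return the short code for a known alias, otherwise the input unchanged."""
    return COUNTRY_ALIASES.get(country, country)

print(normalize_country("Canada"))  # CAN
print(normalize_country("USA"))     # USA
print(normalize_country("France"))  # France
```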




In [13]:
def curate_columns(df):
    if "order_date" in df.columns:
        df = df.withColumn("order_date", to_date(df.order_date,  'MM/dd/yyyy'))
    if "updated_at" in df.columns:
        df = df.withColumn("updated_at", to_timestamp(df.updated_at,  'yyyy-MM-dd HH:mm:ss')) 
    if "sale_price" in df.columns:
        df = df.withColumn('sale_price_usd',curate_sales_price_udf('currency', 'sale_price'))
    if "email" in df.columns:
        df = df.withColumn('email_curated',curate_email_udf('email')).drop('email').withColumnRenamed('email_curated', 'email')
    if "country" in df.columns:
        df = df.withColumn('country_curated',curate_country_udf('country')).drop('country').withColumnRenamed('country_curated', 'country')
    if "phone" in df.columns:
        df = df.withColumn('phone_masked',mask_udf('phone')).drop('phone').withColumnRenamed('phone_masked', 'phone')
    if "credit_card" in df.columns:
        df = df.withColumn("credit_card",df.credit_card.cast(StringType()))
        df = df.withColumn('credit_card_masked',mask_udf('credit_card')).drop('credit_card').withColumnRenamed('credit_card_masked', 'credit_card')
    if "address" in df.columns:
        df = df.withColumn('address_masked',mask_udf('address')).drop('address').withColumnRenamed('address_masked', 'address')
    return df




In [None]:
def flatten(df):
   # compute Complex Fields (Lists and Structs) in Schema   
   complex_fields = dict([(field.name, field.dataType)
                             for field in df.schema.fields
                             if type(field.dataType) == ArrayType or  type(field.dataType) == StructType])
   while len(complex_fields)!=0:
      col_name=list(complex_fields.keys())[0]
      #print ("Processing :"+col_name+" Type : "+str(type(complex_fields[col_name])))
    
      # if StructType then convert all sub element to columns.
      # i.e. flatten structs
      if (type(complex_fields[col_name]) == StructType):
         expanded = [col(col_name+'.'+k).alias(col_name+'_'+k) for k in [ n.name for n in  complex_fields[col_name]]]
         df=df.select("*", *expanded).drop(col_name)
    
      # if ArrayType then add the Array Elements as Rows using the explode function
      # i.e. explode Arrays
      elif (type(complex_fields[col_name]) == ArrayType):    
         df=df.withColumn(col_name,explode_outer(col_name))
    
      # recompute remaining Complex Fields in Schema       
      complex_fields = dict([(field.name, field.dataType)
                             for field in df.schema.fields
                             if type(field.dataType) == ArrayType or  type(field.dataType) == StructType])
   return df


In [14]:
TABLE_DICT = json.loads(args['TABLES'])
JOIN_COLUMNS_DELTA_DICT = json.loads(args['JOIN_COLUMNS_DELTA'])
JOIN_COLUMNS_INCREMENTAL_DICT = json.loads(args['JOIN_COLUMNS_INCREMENTAL'])

for TABLE in TABLE_DICT:
    BRONZE_TABLE_PATH="s3a://" + args['S3_BUCKET'] + "/" + args['BRONZE_LAYER_NAMESPACE'] + "/" + TABLE
    SILVER_TABLE_PATH="s3a://" + args['S3_BUCKET'] + "/" + args['SILVER_LAYER_NAMESPACE'] + "/" + TABLE
    merge_data_to_delta(BRONZE_TABLE_PATH, SILVER_TABLE_PATH, args['JOB_DATE'], TABLE, TABLE_DICT[TABLE],  JOIN_COLUMNS_DELTA_DICT[TABLE], JOIN_COLUMNS_INCREMENTAL_DICT[TABLE])

Delta table does not exist
root
 |-- order_number: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- units: integer (nullable = true)
 |-- sale_price: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- order_mode: string (nullable = true)
 |-- updated_at: timestamp (nullable = true)
 |-- sale_price_usd: float (nullable = true)

+------------+-----------+----------+----------+-----+----------+--------+----------+-------------------+--------------+
|order_number|customer_id|product_id|order_date|units|sale_price|currency|order_mode|         updated_at|sale_price_usd|
+------------+-----------+----------+----------+-----+----------+--------+----------+-------------------+--------------+
|           1|        212|         5|2019-02-03|   10|      11.6|     USD|       NEW|2023-01-12 18:20:59|          11.6|
|           2|       1940|        10|2020-06-24|    8|     

In [16]:
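from pyspark.sql.types import LongType

# IPv4 range bounds can go up to 4294967295, which does not fit in a 32-bit IntegerType
IPLOCATION_SCHEMA =[
    ('ip1', LongType()),
    ('ip2', LongType()),
    ('country_code', StringType()),
    ('country_name', StringType())
]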

ipfields = [StructField(*field) for field in IPLOCATION_SCHEMA]
schema_iplocation = StructType(ipfields)

IPLOCATION_PATH="s3a://" + args['ECOMMERCE_LOGS_BUCKET'] + "/" + "iplocation/"
df_iplocation = spark.read.csv(IPLOCATION_PATH, schema=schema_iplocation)
# registerTempTable is deprecated; createOrReplaceTempView is its replacement
df_iplocation.createOrReplaceTempView('iplocation')

LOGS_SCHEMA =[
    ('time', StringType()),
    ('remote_ip', StringType()),
    ('country_name', StringType()),
    ('ip_number', IntegerType()),
    ('request', StringType()),
    ('response', StringType()),
    ('agent', StringType())
]

logfields = [StructField(*field) for field in LOGS_SCHEMA]
schema_logs = StructType(logfields)

ECOMMERCE_LOGS_PATH="s3a://" + args['ECOMMERCE_LOGS_BUCKET'] + "/" + "ecommerce_logs/"
df_ecommerce_logs = spark.read.json(ECOMMERCE_LOGS_PATH, schema=schema_logs)
# Cast to long: IPv4 numbers for addresses from 128.0.0.0 upwards exceed the 32-bit int range
df_ecommerce_logs = df_ecommerce_logs.withColumn('ip_number', ip_to_country_udf('remote_ip').cast('long'))
df_ecommerce_logs.createOrReplaceTempView('ecommerce')

df_ecommerce_country = spark.sql("SELECT e.time, e.remote_ip, i.country_name, e.ip_number, e.request, e.response, e.agent " \
                                 " FROM ecommerce e JOIN iplocation i ON e.ip_number BETWEEN i.ip1 AND i.ip2")

ECOMM_LOGS_SILVER_TABLE_PATH="s3a://" + args['S3_BUCKET'] + "/" + args['SILVER_LAYER_NAMESPACE'] + "/" + "ecommerce_country"

if DeltaTable.isDeltaTable(spark, ECOMM_LOGS_SILVER_TABLE_PATH):
    # Insert only log rows whose remote_ip is not already present in the silver table
    DeltaTable.forPath(spark, ECOMM_LOGS_SILVER_TABLE_PATH).alias("logs").merge(
         df_ecommerce_country.alias("logs_incr"),
         "logs.remote_ip = logs_incr.remote_ip") \
        .whenNotMatchedInsertAll() \
        .execute()
else:
    print("Delta table does not exist")
    df_ecommerce_country.write.format("delta").save(ECOMM_LOGS_SILVER_TABLE_PATH)
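The SQL above resolves each log line's country with a range join: an IP number matches the row whose `ip1`..`ip2` interval contains it. A plain-Python sketch of the same lookup, assuming non-overlapping ranges sorted by start. The `RANGES` rows and country labels are invented for illustration, and `lookup_country` is a hypothetical name.

```python
from bisect import bisect_right

# Hypothetical rows in the shape of the iplocation table: (ip1, ip2, country_name).
RANGES = [
    (16777216, 16777471, "Country A"),
    (16777472, 16778239, "Country B"),
    (3232235520, 3232301055, "Country C"),
]
STARTS = [r[0] for r in RANGES]

def lookup_country(ip_number):
    """Return the country whose [ip1, ip2] range contains ip_number, else None."""
    i = bisect_right(STARTS, ip_number) - 1  # last range starting at or before ip_number
    if i >= 0 and RANGES[i][0] <= ip_number <= RANGES[i][1]:
        return RANGES[i][2]
    return None

print(lookup_country(16777300))    # Country A
print(lookup_country(3232235777))  # Country C
print(lookup_country(5))           # None
```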




In [29]:
import pyspark.sql.functions as F
def flatten_df(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']

    flat_df = nested_df.select(flat_cols +
                               [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])
    return flat_df
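`flatten_df` expands one level of struct columns and names each new column `parent_child`, with the already-flat columns first. The same naming rule applied to a plain dict looks like this. It is only an analogy that runs without Spark, and `flatten_one_level` and the sample record are hypothetical.

```python
def flatten_one_level(record):
    """Expand one level of nested dicts into parent_child keys, flat keys first."""
    flat = {k: v for k, v in record.items() if not isinstance(v, dict)}
    for parent, value in record.items():
        if isinstance(value, dict):
            for child, child_value in value.items():
                flat[parent + "_" + child] = child_value
    return flat

record = {"id": 3001, "data": {"city": "Billings", "country": "USA"}}
print(flatten_one_level(record))
# {'id': 3001, 'data_city': 'Billings', 'data_country': 'USA'}
```

Like `flatten_df`, this handles a single level only; a struct nested inside `data` would stay nested.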




In [32]:
#ECOMMERCE_TABLE_PATH="s3a://" + args['S3_BUCKET'] + "/" + args['BRONZE_LAYER_ECOMMERCE_NAMESPACE'] + "/" + args['ECOMMERCE_STREAM_DATE']
ECOMMERCE_TABLE_PATH="s3a://aws-analytics-course/temp/json/eCommStreamDelivery-1-2023-01-11-23-38-31-2b69733f-1123-48cc-95ff-5307e084fcc5"
df_ecommerce = spark.read.json(ECOMMERCE_TABLE_PATH)
df_ecommerce=df_ecommerce.select('data')
df_ecommerce=flatten_df(df_ecommerce)
df_ecommerce.printSchema()
df_ecommerce.show()

root
 |-- data_address: string (nullable = true)
 |-- data_city: string (nullable = true)
 |-- data_country: string (nullable = true)
 |-- data_currency: string (nullable = true)
 |-- data_customer_name: string (nullable = true)
 |-- data_email: string (nullable = true)
 |-- data_order_date: string (nullable = true)
 |-- data_order_mode: string (nullable = true)
 |-- data_order_number: string (nullable = true)
 |-- data_phone: string (nullable = true)
 |-- data_postalcode: string (nullable = true)
 |-- data_product_name: string (nullable = true)
 |-- data_sale_price: string (nullable = true)

+--------------------+---------+------------+-------------+------------------+--------------------+---------------+---------------+-----------------+--------------+---------------+-----------------+---------------+
|        data_address|data_city|data_country|data_currency|data_customer_name|          data_email|data_order_date|data_order_mode|data_order_number|    data_phone|data_postalcode|data_

In [33]:
df_ecommerce1=spark.read.parquet("s3://aws-analytics-course/bronze/kinesis/2023/01/31/02/eCommStreamDelivery-2-2023-01-31-02-38-51-a8696e2c-d14d-4394-97b6-7aa79bb6d616.parquet")




In [35]:
df_ecommerce1=flatten_df(df_ecommerce1)
df_ecommerce1.show()

+----+--------------+---------------+----------+-----------+------------------+--------------------+-------------+---------------+------------+----------------+--------------------+-----------------+---------------+-------------+---------------+---------------+-----------------+
|  id|     eventtype|        subject| eventtime|dataversion|data_customer_name|        data_address|    data_city|data_postalcode|data_country|      data_phone|          data_email|data_product_name|data_order_date|data_currency|data_order_mode|data_sale_price|data_order_number|
+----+--------------+---------------+----------+-----------+------------------+--------------------+-------------+---------------+------------+----------------+--------------------+-----------------+---------------+-------------+---------------+---------------+-----------------+
|3001|recordInserted|ecomm/customers|2021-01-01|        1.0|        Julie Rich|Ap #255-3031 Dui ...|     Billings|          80834|         USA|  1-528-884-4331|