# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [6]:

%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

%%configure
{
    "--conf": "spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false",
    "--enable-glue-datacatalog" :"true",
    "--datalake-formats":"hudi"
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
The following configurations have been updated: {'--conf': 'spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false', '--enable-glue-datacatalog': 'true', '--datalake-formats': 'hudi'}


In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 4ee7d88f-4f04-478d-82e1-d0d1557cf146
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
--datalake-formats hudi
Waiting for session 4ee7d88f-4f04-478d-82e1-d0d1557cf146 to get into ready status...
Session 4ee7d88f-4f04-478d-82e1-d0d1557cf146 has been created.



In [3]:
from pyspark.sql.session import SparkSession




In [4]:
spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').config('spark.sql.hive.convertMetastoreParquet','false').config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()




In [7]:
##creating a df

from pyspark.sql.functions import udf
import time

random_udf = udf(lambda: str(int(time.time() * 1000000)), StringType()) 

dim_customer_schema = StructType([
        StructField('customer_id', StringType(), False),
        StructField('first_name', StringType(), True),
        StructField('last_name', StringType(), True),
        StructField('city', StringType(), True),
        StructField('country', StringType(), True),
        StructField('eff_start_date', DateType(), True),
        StructField('eff_end_date', DateType(), True),
        StructField('timestamp', TimestampType(), True),
        StructField('is_current', BooleanType(), True),
    ])

customer_dim_df = spark.createDataFrame([('1', 'John', 'Smith', 
                    'London', 'UK', 
                    datetime.strptime('2020-09-27', '%Y-%m-%d'),
                    datetime.strptime('2999-12-31', '%Y-%m-%d'),
                    datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'),
                    True),
                    ('2', 'Susan', 'Chas', 
                    'Seattle', 'US',
                    datetime.strptime('2020-10-14', '%Y-%m-%d'),
                    datetime.strptime('2999-12-31', '%Y-%m-%d'),
                    datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'),
                    True)], dim_customer_schema)

customer_hudi_df = customer_dim_df.withColumn("customer_dim_key", random_udf())




In [6]:

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
from pyspark.sql.functions import *

from pyspark.sql.types import *
from datetime import datetime
import boto3
from functools import reduce




In [8]:
customer_hudi_df.show()

+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|customer_id|first_name|last_name|   city|country|eff_start_date|eff_end_date|          timestamp|is_current|customer_dim_key|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|          1|      John|    Smith| London|     UK|    2020-09-27|  2999-12-31|2020-12-08 09:15:32|      true|1727768187773414|
|          2|     Susan|     Chas|Seattle|     US|    2020-10-14|  2999-12-31|2020-12-08 09:15:32|      true|1727768187889384|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+


In [17]:
database_name1 = "hudidb9"
table_name = "customer_table"
base_s3_path = "s3a://test-ramneek-3"
final_base_path = "{base_s3_path}/{table_name}".format(
    base_s3_path=base_s3_path, table_name=table_name
)




In [24]:
hudi_options = {
    'hoodie.table.name': table_name,
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    'hoodie.datasource.write.recordkey.field': 'customer_dim_key',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'timestamp',

    'hoodie.datasource.hive_sync.enable': 'true',
    "hoodie.datasource.hive_sync.mode":"hms",
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': database_name1,
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.hive_style_partitioning': 'true',

}




In [25]:
customer_hudi_df.write.format("hudi").options(**hudi_options).mode("overwrite").save(final_base_path)




In [26]:
spark.sql("use hudidb9;").show()

++
||
++
++


In [27]:
spark.sql("show tables;").show()

+---------+--------------+-----------+
|namespace|     tableName|isTemporary|
+---------+--------------+-----------+
|  hudidb9|customer_table|      false|
+---------+--------------+-----------+


In [28]:
spark.sql("select * from customer_table;").show()

+-------------------+--------------------+------------------+----------------------+--------------------+-----------+----------+---------+-------+-------+--------------+------------+----------------+----------+----------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|customer_id|first_name|last_name|   city|country|eff_start_date|eff_end_date|       timestamp|is_current|customer_dim_key|
+-------------------+--------------------+------------------+----------------------+--------------------+-----------+----------+---------+-------+-------+--------------+------------+----------------+----------+----------------+
|  20241001075114800|20241001075114800...|  1727769086113419|                      |9eb8d2a1-f5df-40a...|          1|      John|    Smith| London|     UK|    2020-09-27|  2999-12-31|1607418932000000|      true|1727769086113419|
|  20241001075114800|20241001075114800...|  1727769086114582|                      |9eb8

In [29]:
##fact table:

fact_sales_schema = StructType([
        StructField('item_id', StringType(), True),
        StructField('quantity', IntegerType(), True),
        StructField('price', DoubleType(), True),
        StructField('timestamp', TimestampType(), True),
        StructField('customer_id', StringType(), True)
    ])

sales_fact_df = spark.createDataFrame([('100', 25, 123.46,
                    datetime.strptime('2020-11-17 09:15:32', '%Y-%m-%d %H:%M:%S'), '1'),
                                       ('101', 300, 123.46,
                    datetime.strptime('2020-10-28 09:15:32', '%Y-%m-%d %H:%M:%S'), '1'),
                                      ('102', 5, 1038.0,
                    datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'), '2')], 
                    fact_sales_schema)




In [30]:
sales_fact_df.show()

+-------+--------+------+-------------------+-----------+
|item_id|quantity| price|          timestamp|customer_id|
+-------+--------+------+-------------------+-----------+
|    100|      25|123.46|2020-11-17 09:15:32|          1|
|    101|     300|123.46|2020-10-28 09:15:32|          1|
|    102|       5|1038.0|2020-12-08 09:15:32|          2|
+-------+--------+------+-------------------+-----------+


In [32]:
database_name1 = "hudidb9"
table_name1 = "sales_table"
base_s3_path = "s3a://test-ramneek-3"
final_base_path = "{base_s3_path}/{table_name}".format(
    base_s3_path=base_s3_path, table_name=table_name
)




In [35]:
hudi_options = {
    'hoodie.table.name': table_name1,
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    'hoodie.datasource.write.recordkey.field': 'customer_id',
    'hoodie.datasource.write.table.name': table_name1,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'timestamp',

    'hoodie.datasource.hive_sync.enable': 'true',
    "hoodie.datasource.hive_sync.mode":"hms",
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': database_name1,
    'hoodie.datasource.hive_sync.table': table_name1,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.hive_style_partitioning': 'true',

}




In [36]:
sales_fact_df.write.format("hudi").options(**hudi_options).mode("overwrite").save(final_base_path)




In [37]:
spark.sql("select * from sales_table;").show()

+-------------------+--------------------+------------------+----------------------+--------------------+-------+--------+------+----------------+-----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|item_id|quantity| price|       timestamp|customer_id|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+--------+------+----------------+-----------+
|  20241001080038210|20241001080038210...|                 1|                      |7710c2ba-9b65-432...|    100|      25|123.46|1605604532000000|          1|
|  20241001080038210|20241001080038210...|                 2|                      |7710c2ba-9b65-432...|    102|       5|1038.0|1607418932000000|          2|
+-------------------+--------------------+------------------+----------------------+--------------------+-------+--------+------+----------------+-----------+


In [39]:
from pyspark.sql.functions import when

join_cond = [sales_fact_df.customer_id == customer_hudi_df.customer_id,
             sales_fact_df.timestamp >= customer_hudi_df.eff_start_date,
             sales_fact_df.timestamp < customer_hudi_df.eff_end_date]

customers_dim_key_df = (sales_fact_df
                          .join(customer_hudi_df, join_cond, 'leftouter')
                          .select(sales_fact_df['*'],
                            when(customer_hudi_df.customer_dim_key.isNull(), '-1')
                                  .otherwise(customer_hudi_df.customer_dim_key)
                                  .alias("customer_dim_key") )
                       )




In [40]:
customers_dim_key_df.show()

+-------+--------+------+-------------------+-----------+----------------+
|item_id|quantity| price|          timestamp|customer_id|customer_dim_key|
+-------+--------+------+-------------------+-----------+----------------+
|    100|      25|123.46|2020-11-17 09:15:32|          1|1727769805558024|
|    101|     300|123.46|2020-10-28 09:15:32|          1|1727769805558024|
|    102|       5|1038.0|2020-12-08 09:15:32|          2|1727769805622081|
+-------+--------+------+-------------------+-----------+----------------+


In [41]:
new_customer_dim_df = spark.createDataFrame([('3', 'Bastian', 'Back', 'Berlin', 'GE',
                    datetime.strptime(datetime.today().strftime('%Y-%m-%d'), '%Y-%m-%d'),
                    datetime.strptime('2999-12-31', '%Y-%m-%d'),
                    datetime.strptime('2020-12-09 09:15:32', '%Y-%m-%d %H:%M:%S'), True),
                    ('2', 'Susan', 'Chas','Paris', 'FR',
                    datetime.strptime(datetime.today().strftime('%Y-%m-%d'), '%Y-%m-%d'),
                    datetime.strptime('2999-12-31', '%Y-%m-%d'),
                    datetime.strptime('2020-12-09 10:15:32', '%Y-%m-%d %H:%M:%S'), True)],
                dim_customer_schema)
new_customer_dim_df = new_customer_dim_df.withColumn("customer_dim_key", random_udf())




In [42]:
new_customer_dim_df.show()

+-----------+----------+---------+------+-------+--------------+------------+-------------------+----------+----------------+
|customer_id|first_name|last_name|  city|country|eff_start_date|eff_end_date|          timestamp|is_current|customer_dim_key|
+-----------+----------+---------+------+-------+--------------+------------+-------------------+----------+----------------+
|          3|   Bastian|     Back|Berlin|     GE|    2024-10-01|  2999-12-31|2020-12-09 09:15:32|      true|1727769970838559|
|          2|     Susan|     Chas| Paris|     FR|    2024-10-01|  2999-12-31|2020-12-09 10:15:32|      true|1727769970882771|
+-----------+----------+---------+------+-------+--------------+------------+-------------------+----------+----------------+


In [43]:
"""What is Slowly Changing Dimension ?
A Slowly Changing Dimension (SCD) is a dimension that stores and manages both current and historical data over time in a data warehouse. It is considered and implemented as one of the most critical ETL tasks in tracking the history of dimension records.
In this Case Customer Susan moved to Paris from Seattle we can handel this folloowing ways

Type 1 SCDs - Overwriting
In a Type 1 SCD the new data overwrites the existing data. Thus the existing data is lost as it is not stored anywhere else. This is the default type of dimension you create. You do not need to specify any additional information to create a Type 1 SCD.

Type 2 SCDs - Creating another dimension record
A Type 2 SCD retains the full history of values. When the value of a chosen attribute changes, the current record is closed. A new record is created with the changed data values and this new record becomes the current record. Each record contains the effective time and expiration time to identify the time period between which the record was active.

Type 3 SCDs - Creating a current value field
A Type 3 SCD stores two versions of values for certain selected level attributes. Each record stores the previous value and the current value of the selected attribute. When the value of any of the selected attributes changes, the current value is stored as the old value and the new value becomes the current value.


What is Slowly Changing Dimension ?
A Slowly Changing Dimension (SCD) is a dimension that stores and manages both current and historical data over time in a data warehouse. It is considered and implemented as one of the most critical ETL tasks in tracking the history of dimension records.
In this Case Customer Susan moved to Paris from Seattle we can handel this folloowing ways

Type 1 SCDs - Overwriting
In a Type 1 SCD the new data overwrites the existing data. Thus the existing data is lost as it is not stored anywhere else. This is the default type of dimension you create. You do not need to specify any additional information to create a Type 1 SCD.

Type 2 SCDs - Creating another dimension record
A Type 2 SCD retains the full history of values. When the value of a chosen attribute changes, the current record is closed. A new record is created with the changed data values and this new record becomes the current record. Each record contains the effective time and expiration time to identify the time period between which the record was active.

Type 3 SCDs - Creating a current value field
A Type 3 SCD stores two versions of values for certain selected level attributes. Each record stores the previous value and the current value of the selected attribute. When the value of any of the selected attributes changes, the current value is stored as the old value and the new value becomes the current value.

"""

'What is Slowly Changing Dimension ?\nA Slowly Changing Dimension (SCD) is a dimension that stores and manages both current and historical data over time in a data warehouse. It is considered and implemented as one of the most critical ETL tasks in tracking the history of dimension records.\nIn this Case Customer Susan moved to Paris from Seattle we can handel this folloowing ways\n\nType 1 SCDs - Overwriting\nIn a Type 1 SCD the new data overwrites the existing data. Thus the existing data is lost as it is not stored anywhere else. This is the default type of dimension you create. You do not need to specify any additional information to create a Type 1 SCD.\n\nType 2 SCDs - Creating another dimension record\nA Type 2 SCD retains the full history of values. When the value of a chosen attribute changes, the current record is closed. A new record is created with the changed data values and this new record becomes the current record. Each record contains the effective time and expiration 

In [44]:
join_cond = [customer_hudi_df.customer_id == new_customer_dim_df.customer_id,
             customer_hudi_df.is_current == True]




In [45]:
## Find customer records to update
customers_to_update_df = (customer_hudi_df
                          .join(new_customer_dim_df, join_cond)
                          .select(customer_hudi_df.customer_id,
                                  customer_hudi_df.first_name,
                                  customer_hudi_df.last_name,
                                  customer_hudi_df.city,
                                  customer_hudi_df.country,
                                  customer_hudi_df.eff_start_date,
                                  new_customer_dim_df.eff_start_date.alias('eff_end_date'),
                                  customer_hudi_df.customer_dim_key,
                                  customer_hudi_df.timestamp)
                          .withColumn('is_current', lit(False))
                         )




In [46]:
customers_to_update_df.show()

+-----------+----------+---------+-------+-------+--------------+------------+----------------+-------------------+----------+
|customer_id|first_name|last_name|   city|country|eff_start_date|eff_end_date|customer_dim_key|          timestamp|is_current|
+-----------+----------+---------+-------+-------+--------------+------------+----------------+-------------------+----------+
|          2|     Susan|     Chas|Seattle|     US|    2020-10-14|  2024-10-01|1727770091620484|2020-12-08 09:15:32|     false|
+-----------+----------+---------+-------+-------+--------------+------------+----------------+-------------------+----------+


In [47]:
## Union with new customer records
merged_customers_df = new_customer_dim_df. unionByName(customers_to_update_df)




In [48]:

merged_customers_df.show()

+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|customer_id|first_name|last_name|   city|country|eff_start_date|eff_end_date|          timestamp|is_current|customer_dim_key|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|          3|   Bastian|     Back| Berlin|     GE|    2024-10-01|  2999-12-31|2020-12-09 09:15:32|      true|1727770126740537|
|          2|     Susan|     Chas|  Paris|     FR|    2024-10-01|  2999-12-31|2020-12-09 10:15:32|      true|1727770126804428|
|          2|     Susan|     Chas|Seattle|     US|    2020-10-14|  2024-10-01|2020-12-08 09:15:32|     false|1727770125588014|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+


In [50]:
database_name1 = "hudidb9"
table_name = "customer_table"
base_s3_path = "s3a://test-ramneek-3"
final_base_path = "{base_s3_path}/{table_name}".format(
    base_s3_path=base_s3_path, table_name=table_name
)




In [51]:
hudi_options = {
    'hoodie.table.name': table_name,
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    'hoodie.datasource.write.recordkey.field': 'customer_dim_key',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'timestamp',

    'hoodie.datasource.hive_sync.enable': 'true',
    "hoodie.datasource.hive_sync.mode":"hms",
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': database_name1,
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.hive_style_partitioning': 'true',

}




In [52]:
merged_customers_df.write.format("hudi").options(**hudi_options).mode("append").save(final_base_path)




In [53]:
spark.sql("select * from customer_table;").show() #you can see, Susan has 2 records, one with city Seattle, other with Paris and the former record is set as false.

+-------------------+--------------------+------------------+----------------------+--------------------+-----------+----------+---------+-------+-------+--------------+------------+----------------+----------+----------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|customer_id|first_name|last_name|   city|country|eff_start_date|eff_end_date|       timestamp|is_current|customer_dim_key|
+-------------------+--------------------+------------------+----------------------+--------------------+-----------+----------+---------+-------+-------+--------------+------------+----------------+----------+----------------+
|  20241001075114800|20241001075114800...|  1727769086113419|                      |9eb8d2a1-f5df-40a...|          1|      John|    Smith| London|     UK|    2020-09-27|  2999-12-31|1607418932000000|      true|1727769086113419|
|  20241001075114800|20241001075114800...|  1727769086114582|                      |9eb8