
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A json-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your aws configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma separated list of pip packages, s3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma separated list of additional Jars to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (eg: %glue_version 2.0).                               |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %sql                        |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer.                      |

In [54]:
%connections hudi-connection
%glue_version 3.0
%region us-west-2
%worker_type G.1X
%number_of_workers 3
%spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer

Connections to be included:
hudi-connection
Setting Glue version to: 3.0
Previous region: us-west-2
Setting new region to: us-west-2
Reauthenticating Glue client with new region: us-west-2
IAM role has been set to arn:aws:iam::043916019468:role/Lab3. Reauthenticating.
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::043916019468:role/Lab3
Authentication done.
Region is set to: us-west-2
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 3
Setting new number of workers to: 3
Previous Spark configuration: spark.serializer=org.apache.spark.serializer.KryoSerializer
Setting new Spark configuration to: spark.serializer=org.apache.spark.serializer.KryoSerializer


In [47]:
%stop_session

Stopping session: 361c95d5-d328-4718-bdcd-916567bcc661
Stopped session.


In [1]:
import sys
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
from pyspark.sql.functions import *
from awsglue.utils import getResolvedOptions
from pyspark.sql.types import *
from datetime import datetime
import boto3
from functools import reduce


Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 3
Session ID: e75e9145-bc11-4c08-b442-81ff2d602514
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
Waiting for session e75e9145-bc11-4c08-b442-81ff2d602514 to get into ready status...
Session e75e9145-bc11-4c08-b442-81ff2d602514 has been created.



# Step 1:  Create Spark Session

In [4]:
spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').config('spark.sql.hive.convertMetastoreParquet','false').config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()




#  Dimension -> Customers

In [5]:
from pyspark.sql.functions import udf
import time

random_udf = udf(lambda: str(int(time.time() * 1000000)), StringType()) 

dim_customer_schema = StructType([
        StructField('customer_id', StringType(), False),
        StructField('first_name', StringType(), True),
        StructField('last_name', StringType(), True),
        StructField('city', StringType(), True),
        StructField('country', StringType(), True),
        StructField('eff_start_date', DateType(), True),
        StructField('eff_end_date', DateType(), True),
        StructField('timestamp', TimestampType(), True),
        StructField('is_current', BooleanType(), True),
    ])

customer_dim_df = spark.createDataFrame([('1', 'John', 'Smith', 
                    'London', 'UK', 
                    datetime.strptime('2020-09-27', '%Y-%m-%d'),
                    datetime.strptime('2999-12-31', '%Y-%m-%d'),
                    datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'),
                    True),
                    ('2', 'Susan', 'Chas', 
                    'Seattle', 'US',
                    datetime.strptime('2020-10-14', '%Y-%m-%d'),
                    datetime.strptime('2999-12-31', '%Y-%m-%d'),
                    datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'),
                    True)], dim_customer_schema)

customer_hudi_df = customer_dim_df.withColumn("customer_dim_key", random_udf())





In [6]:
customer_hudi_df.show()

+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|customer_id|first_name|last_name|   city|country|eff_start_date|eff_end_date|          timestamp|is_current|customer_dim_key|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|          1|      John|    Smith| London|     UK|    2020-09-27|  2999-12-31|2020-12-08 09:15:32|      true|1671069844246049|
|          2|     Susan|     Chas|Seattle|     US|    2020-10-14|  2999-12-31|2020-12-08 09:15:32|      true|1671069844601802|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+


In [17]:
db_name = "hudidb"
table_name="customers_table"

recordkey = 'customer_dim_key'
precombine = 'timestamp'

path = 's3://glue-learn-begineers/hudi/customers_table/'




In [8]:
ApacheHudiConnector0101forAWSGlue30_node1671045598524 = (
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(customer_hudi_df, glueContext,"glue_df"),
        connection_type="marketplace.spark",
        connection_options={
            "path": path,
            "connectionName": "hudi-connection",

            "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
            'className': 'org.apache.hudi',
            'hoodie.table.name': table_name,
            'hoodie.datasource.write.recordkey.field': recordkey,
            'hoodie.datasource.write.table.name': table_name,
            'hoodie.datasource.write.operation': 'upsert',
            'hoodie.datasource.write.precombine.field': precombine,


            'hoodie.datasource.hive_sync.enable': 'true',
            "hoodie.datasource.hive_sync.mode":"hms",
            'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
            'hoodie.datasource.hive_sync.database': db_name,
            'hoodie.datasource.hive_sync.table': table_name,
            'hoodie.datasource.hive_sync.use_jdbc': 'false',
            'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
            'hoodie.datasource.write.hive_style_partitioning': 'true',
        },
        transformation_ctx="glue_df",
    )
)




# Fact Table -> Sales

In [9]:
fact_sales_schema = StructType([
        StructField('item_id', StringType(), True),
        StructField('quantity', IntegerType(), True),
        StructField('price', DoubleType(), True),
        StructField('timestamp', TimestampType(), True),
        StructField('customer_id', StringType(), True)
    ])

sales_fact_df = spark.createDataFrame([('100', 25, 123.46,
                    datetime.strptime('2020-11-17 09:15:32', '%Y-%m-%d %H:%M:%S'), '1'),
                                       ('101', 300, 123.46,
                    datetime.strptime('2020-10-28 09:15:32', '%Y-%m-%d %H:%M:%S'), '1'),
                                      ('102', 5, 1038.0,
                    datetime.strptime('2020-12-08 09:15:32', '%Y-%m-%d %H:%M:%S'), '2')], 
                    fact_sales_schema)





In [10]:
sales_fact_df.show()

+-------+--------+------+-------------------+-----------+
|item_id|quantity| price|          timestamp|customer_id|
+-------+--------+------+-------------------+-----------+
|    100|      25|123.46|2020-11-17 09:15:32|          1|
|    101|     300|123.46|2020-10-28 09:15:32|          1|
|    102|       5|1038.0|2020-12-08 09:15:32|          2|
+-------+--------+------+-------------------+-----------+


In [11]:
db_name = "hudidb"
table_name="sales_table"

recordkey = 'item_id'

precombine = 'timestamp'

path = 's3://glue-learn-begineers/hudi/sales_table/'





In [12]:
ApacheHudiConnector0101forAWSGlue30_node1671045598524 = (
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(sales_fact_df, glueContext,"glue_df"),
        connection_type="marketplace.spark",
        connection_options={
            "path": path,
            "connectionName": "hudi-connection",

            "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
            'className': 'org.apache.hudi',
            'hoodie.table.name': table_name,
            'hoodie.datasource.write.recordkey.field': recordkey,
            'hoodie.datasource.write.table.name': table_name,
            'hoodie.datasource.write.operation': 'upsert',
            'hoodie.datasource.write.precombine.field': precombine,


            'hoodie.datasource.hive_sync.enable': 'true',
            "hoodie.datasource.hive_sync.mode":"hms",
            'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
            'hoodie.datasource.hive_sync.database': db_name,
            'hoodie.datasource.hive_sync.table': table_name,
            'hoodie.datasource.hive_sync.use_jdbc': 'false',
            'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
            'hoodie.datasource.write.hive_style_partitioning': 'true',
        },
        transformation_ctx="glue_df",
    )
)




# Joining  Dimension and Facts

In [13]:
from pyspark.sql.functions import when

join_cond = [sales_fact_df.customer_id == customer_hudi_df.customer_id,
             sales_fact_df.timestamp >= customer_hudi_df.eff_start_date,
             sales_fact_df.timestamp < customer_hudi_df.eff_end_date]

customers_dim_key_df = (sales_fact_df
                          .join(customer_hudi_df, join_cond, 'leftouter')
                          .select(sales_fact_df['*'],
                            when(customer_hudi_df.customer_dim_key.isNull(), '-1')
                                  .otherwise(customer_hudi_df.customer_dim_key)
                                  .alias("customer_dim_key") )
                       )




In [14]:
customers_dim_key_df.show()

+-------+--------+------+-------------------+-----------+----------------+
|item_id|quantity| price|          timestamp|customer_id|customer_dim_key|
+-------+--------+------+-------------------+-----------+----------------+
|    100|      25|123.46|2020-11-17 09:15:32|          1|1671070039945039|
|    101|     300|123.46|2020-10-28 09:15:32|          1|1671070039945039|
|    102|       5|1038.0|2020-12-08 09:15:32|          2|1671070039977207|
+-------+--------+------+-------------------+-----------+----------------+


# Changes Needs to be done on Warehouse

In [18]:
new_customer_dim_df = spark.createDataFrame([('3', 'Bastian', 'Back', 'Berlin', 'GE',
                    datetime.strptime(datetime.today().strftime('%Y-%m-%d'), '%Y-%m-%d'),
                    datetime.strptime('2999-12-31', '%Y-%m-%d'),
                    datetime.strptime('2020-12-09 09:15:32', '%Y-%m-%d %H:%M:%S'), True),
                    ('2', 'Susan', 'Chas','Paris', 'FR',
                    datetime.strptime(datetime.today().strftime('%Y-%m-%d'), '%Y-%m-%d'),
                    datetime.strptime('2999-12-31', '%Y-%m-%d'),
                    datetime.strptime('2020-12-09 10:15:32', '%Y-%m-%d %H:%M:%S'), True)],
                dim_customer_schema)
new_customer_dim_df = new_customer_dim_df.withColumn("customer_dim_key", random_udf())





In [19]:
new_customer_dim_df.show()

+-----------+----------+---------+------+-------+--------------+------------+-------------------+----------+----------------+
|customer_id|first_name|last_name|  city|country|eff_start_date|eff_end_date|          timestamp|is_current|customer_dim_key|
+-----------+----------+---------+------+-------+--------------+------------+-------------------+----------+----------------+
|          3|   Bastian|     Back|Berlin|     GE|    2022-12-15|  2999-12-31|2020-12-09 09:15:32|      true|1671070118785997|
|          2|     Susan|     Chas| Paris|     FR|    2022-12-15|  2999-12-31|2020-12-09 10:15:32|      true|1671070118910573|
+-----------+----------+---------+------+-------+--------------+------------+-------------------+----------+----------------+


#### What is Slowly Changing Dimension ?
* A Slowly Changing Dimension (SCD) is a dimension that stores and manages both current and historical data over time in a data warehouse. It is considered and implemented as one of the most critical ETL tasks in tracking the history of dimension records.

In this Case Customer Susan moved to  Paris from Seattle
we can handel this folloowing ways 

### Type 1 SCDs - Overwriting

In a Type 1 SCD the new data overwrites the existing data. Thus the existing data is lost as it is not stored anywhere else. This is the default type of dimension you create. You do not need to specify any additional information to create a Type 1 SCD.

### Type 2 SCDs - Creating another dimension record

A Type 2 SCD retains the full history of values. When the value of a chosen attribute changes, the current record is closed. A new record is created with the changed data values and this new record becomes the current record. Each record contains the effective time and expiration time to identify the time period between which the record was active.

### Type 3 SCDs - Creating a current value field

A Type 3 SCD stores two versions of values for certain selected level attributes. Each record stores the previous value and the current value of the selected attribute. When the value of any of the selected attributes changes, the current value is stored as the old value and the new value becomes the current value.


In [20]:
join_cond = [customer_hudi_df.customer_id == new_customer_dim_df.customer_id,
             customer_hudi_df.is_current == True]




In [21]:
## Find customer records to update
customers_to_update_df = (customer_hudi_df
                          .join(new_customer_dim_df, join_cond)
                          .select(customer_hudi_df.customer_id,
                                  customer_hudi_df.first_name,
                                  customer_hudi_df.last_name,
                                  customer_hudi_df.city,
                                  customer_hudi_df.country,
                                  customer_hudi_df.eff_start_date,
                                  new_customer_dim_df.eff_start_date.alias('eff_end_date'),
                                  customer_hudi_df.customer_dim_key,
                                  customer_hudi_df.timestamp)
                          .withColumn('is_current', lit(False))
                         )




In [22]:
customers_to_update_df.show()

+-----------+----------+---------+-------+-------+--------------+------------+----------------+-------------------+----------+
|customer_id|first_name|last_name|   city|country|eff_start_date|eff_end_date|customer_dim_key|          timestamp|is_current|
+-----------+----------+---------+-------+-------+--------------+------------+----------------+-------------------+----------+
|          2|     Susan|     Chas|Seattle|     US|    2020-10-14|  2022-12-15|1671070148208987|2020-12-08 09:15:32|     false|
+-----------+----------+---------+-------+-------+--------------+------------+----------------+-------------------+----------+


In [23]:
## Union with new customer records
merged_customers_df = new_customer_dim_df. unionByName(customers_to_update_df)




In [24]:
merged_customers_df.show()

+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|customer_id|first_name|last_name|   city|country|eff_start_date|eff_end_date|          timestamp|is_current|customer_dim_key|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+
|          3|   Bastian|     Back| Berlin|     GE|    2022-12-15|  2999-12-31|2020-12-09 09:15:32|      true|1671070198638529|
|          2|     Susan|     Chas|  Paris|     FR|    2022-12-15|  2999-12-31|2020-12-09 10:15:32|      true|1671070198895047|
|          2|     Susan|     Chas|Seattle|     US|    2020-10-14|  2022-12-15|2020-12-08 09:15:32|     false|1671070198141873|
+-----------+----------+---------+-------+-------+--------------+------------+-------------------+----------+----------------+


In [25]:
db_name = "hudidb"
table_name="customers_table"

recordkey = 'customer_dim_key'
precombine = 'timestamp'

path = 's3://glue-learn-begineers/hudi/customers_table/'




In [26]:
ApacheHudiConnector0101forAWSGlue30_node1671045598524 = (
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(merged_customers_df, glueContext,"glue_df"),
        connection_type="marketplace.spark",
        connection_options={
            "path": path,
            "connectionName": "hudi-connection",

            "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
            'className': 'org.apache.hudi',
            'hoodie.table.name': table_name,
            'hoodie.datasource.write.recordkey.field': recordkey,
            'hoodie.datasource.write.table.name': table_name,
            'hoodie.datasource.write.operation': 'upsert',
            'hoodie.datasource.write.precombine.field': precombine,


            'hoodie.datasource.hive_sync.enable': 'true',
            "hoodie.datasource.hive_sync.mode":"hms",
            'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
            'hoodie.datasource.hive_sync.database': db_name,
            'hoodie.datasource.hive_sync.table': table_name,
            'hoodie.datasource.hive_sync.use_jdbc': 'false',
            'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
            'hoodie.datasource.write.hive_style_partitioning': 'true',
        },
        transformation_ctx="glue_df",
    )
)




In [27]:
print("ok")

ok


# Referneces 
* https://aws.amazon.com/blogs/big-data/build-slowly-changing-dimensions-type-2-scd2-with-apache-spark-and-apache-hudi-on-amazon-emr/