
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A JSON-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your AWS configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma-separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma-separated list of pip packages, S3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma-separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma-separated list of additional JARs to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (e.g. %glue_version 2.0).                              |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer.                      |

In [40]:
%stop_session

Stopping session: 29d099d7-f56f-445f-a086-3281cdf53933
Stopped session.


# Step 1: Define Settings

In [64]:
%connections hudi-connection
%glue_version 3.0
%region us-west-2
%worker_type G.1X
%number_of_workers 3
%spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
%additional_python_modules Faker

You are already connected to a glueetl session bb7031ff-b083-41eb-8e2e-450bd249b134.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Connections to be included:
hudi-connection


Setting Glue version to: 3.0


Previous region: us-west-2
Setting new region to: us-west-2
Reauthenticating Glue client with new region: us-west-2
IAM role has been set to arn:aws:iam::043916019468:role/Lab3. Reauthenticating.
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::043916019468:role/Lab3
Authentication done.
Region is set to: us-west-2


Previous worker type: G.1X
Setting new worker type to: G.1X


Previous number of workers: 3
Setting new number of workers to: 3
Previous Spark configuration: spark.serializer=org.apache.spark.serializer.KryoSerializer
Setting new Spark configuration to: spark.serializer=org.apache.spark.serializer.KryoSerializer


Additional python modules to be included:
Faker


# Step 2: Define Imports

In [54]:

# Let a failed import raise instead of hiding it behind a generic "Error" message
import sys
from datetime import datetime
from functools import reduce

import boto3
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglueml.transforms import EntityDetector
from pyspark.context import SparkContext
from pyspark.sql.functions import col, lit, monotonically_increasing_id, to_date, to_timestamp, udf, when
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType




# Step 3: Create Spark Session

In [55]:
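# Hudi requires Kryo serialization. Disabling convertMetastoreParquet makes Spark
# fall back to the Hive SerDe for catalog-synced Hudi tables instead of its built-in
# Parquet reader, and the legacy flag restores the pre-Spark-3.1 "path" option handling.
spark = (
    SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .config("spark.sql.legacy.pathOptionBehavior.enabled", "true")
    .getOrCreate()
)

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()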




# Step 4: Generate PII Data and Insert It into a Hudi Data Lake

In [63]:
import uuid
from faker import Faker

faker = Faker()


class DataGenerator(object):

    @staticmethod
    def get_data(n=20):
        """Return n synthetic employee rows; every field is a string."""
        return [
            (
                str(uuid.uuid4()),                                             # emp_id (record key)
                faker.name(),                                                  # employee_name
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),                  # salary
                str(faker.random_int(min=18, max=60)),                         # age
                str(faker.random_int(min=0, max=100000)),                      # bonus
                str(faker.unix_time()),                                        # ts: Unix seconds
                faker.email(),                                                 # PII
                faker.credit_card_number(card_type='amex'),                    # PII
            ) for _ in range(n)
        ]
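The Faker calls above need the `Faker` package inside the session, which is why Step 1 adds it with `%additional_python_modules`. For local testing without Faker, a dependency-free stand-in can produce rows of the same shape. This is a hypothetical sketch: `get_data`, `fake_amex`, `luhn_check_digit` and `is_luhn_valid` are illustrative names, names and e-mail addresses are placeholders, and it assumes card numbers should look like American Express numbers (15 digits, prefix 34 or 37) with a valid Luhn check digit. Both card numbers in the `show(2)` output below pass the Luhn check.

```python
import random
import uuid

# Hypothetical stand-in for DataGenerator that needs only the standard library.
DEPARTMENTS = ("IT", "HR", "Sales", "Marketing")
STATES = ("CA", "NY", "TX", "FL", "IL", "RJ")
COLUMNS = ["emp_id", "employee_name", "department", "state", "salary",
           "age", "bonus", "ts", "email", "credit_card"]


def luhn_check_digit(partial: str) -> str:
    """Return the digit that makes `partial + digit` pass the Luhn check."""
    total = 0
    # The check digit will sit at position 0 (from the right), so the
    # right-most digit of `partial` lands at position 1 and gets doubled.
    for i, ch in enumerate(reversed(partial)):
        d = int(ch)
        if i % 2 == 0:
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return str((10 - total % 10) % 10)


def is_luhn_valid(number: str) -> bool:
    """Standard Luhn validation: double every second digit from the right."""
    total = 0
    for i, ch in enumerate(reversed(number)):
        d = int(ch)
        if i % 2 == 1:
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0


def fake_amex(rng: random.Random) -> str:
    """15-digit Amex-style number: prefix 34/37, 12 random digits, check digit."""
    body = rng.choice(("34", "37")) + "".join(str(rng.randint(0, 9)) for _ in range(12))
    return body + luhn_check_digit(body)


def get_data(n: int = 20, seed: int = 42) -> list:
    """Return n reproducible rows with the same ten string columns as DataGenerator."""
    rng = random.Random(seed)
    rows = []
    for _ in range(n):
        handle = "user%04d" % rng.randint(0, 9999)  # placeholder identity
        rows.append((
            str(uuid.UUID(int=rng.getrandbits(128), version=4)),  # emp_id
            handle.title(),                                       # employee_name
            rng.choice(DEPARTMENTS),
            rng.choice(STATES),
            str(rng.randint(10000, 150000)),                      # salary
            str(rng.randint(18, 60)),                             # age
            str(rng.randint(0, 100000)),                          # bonus
            str(rng.randint(0, 1_700_000_000)),                   # ts (Unix seconds)
            handle + "@example.com",                              # email
            fake_amex(rng),                                       # credit_card
        ))
    return rows
```

Because the rows are plain tuples in `COLUMNS` order, they could be passed to `spark.createDataFrame(data=get_data(), schema=COLUMNS)` the same way as the Faker output, with the added benefit that a fixed seed makes test runs repeatable.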





In [64]:
data = DataGenerator.get_data()
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card"]
spark_df = spark.createDataFrame(data=data, schema=columns)
spark_df.show(2)


+--------------------+---------------+----------+-----+------+---+-----+----------+------------------+---------------+
|              emp_id|  employee_name|department|state|salary|age|bonus|        ts|             email|    credit_card|
+--------------------+---------------+----------+-----+------+---+-----+----------+------------------+---------------+
|9754f7a8-5e59-485...|Garrett Stewart|     Sales|   CA| 61400| 32|28970|1044552062|gina03@example.org|348909523317140|
|ccce94cc-ad7f-4d2...|Ronald Campbell| Marketing|   TX| 46067| 27|45654| 995679202| qpark@example.com|349250521614603|
+--------------------+---------------+----------+-----+------+---+-----+----------+------------------+---------------+
only showing top 2 rows


# Create the Hudi Table

In [65]:
db_name = "hudidb"
table_name = "hudi_table"

recordkey = "emp_id"
precombine = "ts"
path = "s3://soumilshah-hudi-demos/tmp/pii/"

method = "upsert"
table_type = "COPY_ON_WRITE"

connection_options = {
    "path": path,
    "connectionName": "hudi-connection",

    # Hudi write settings
    "className": "org.apache.hudi",
    "hoodie.table.name": table_name,
    "hoodie.datasource.write.table.name": table_name,
    "hoodie.datasource.write.table.type": table_type,        # replaces the deprecated ...write.storage.type key
    "hoodie.datasource.write.recordkey.field": recordkey,
    "hoodie.datasource.write.precombine.field": precombine,  # record with the larger ts wins when keys collide
    "hoodie.datasource.write.operation": method,
    "hoodie.datasource.write.hive_style_partitioning": "true",

    # Sync the table into the Hive metastore (the Glue Data Catalog when enabled for the session)
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.sync_as_datasource": "false",
    "hoodie.datasource.hive_sync.database": db_name,
    "hoodie.datasource.hive_sync.table": table_name,
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
}
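To reuse these Hudi settings for another table (for example a staging copy), the options dictionary can come from a small helper. This is a hypothetical convenience function, not part of Glue or Hudi. It emits the same keys as the cell above, using `hoodie.datasource.write.table.type` for the table type (the newer name for the deprecated `hoodie.datasource.write.storage.type` key) and setting the precombine field explicitly to `ts`.

```python
# Hypothetical helper: builds the Hudi connector options used in this notebook.
VALID_TABLE_TYPES = {"COPY_ON_WRITE", "MERGE_ON_READ"}


def build_hudi_options(path, db_name, table_name, record_key,
                       precombine_field="ts", operation="upsert",
                       table_type="COPY_ON_WRITE",
                       connection_name="hudi-connection"):
    """Return a connection_options dict for write_dynamic_frame.from_options."""
    if table_type not in VALID_TABLE_TYPES:
        raise ValueError(f"unknown Hudi table type: {table_type!r}")
    return {
        "path": path,
        "connectionName": connection_name,
        "className": "org.apache.hudi",
        # Table identity and write behaviour
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.name": table_name,
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.precombine.field": precombine_field,
        "hoodie.datasource.write.operation": operation,
        "hoodie.datasource.write.hive_style_partitioning": "true",
        # Metastore / catalog sync
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.mode": "hms",
        "hoodie.datasource.hive_sync.sync_as_datasource": "false",
        "hoodie.datasource.hive_sync.database": db_name,
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.partition_extractor_class":
            "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    }


# Usage: the same settings as the cell above
options = build_hudi_options("s3://soumilshah-hudi-demos/tmp/pii/", "hudidb", "hudi_table", "emp_id")
```

Validating `table_type` up front turns a typo into an immediate Python error instead of a failed Spark write.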




In [66]:
WriteDF = (
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(spark_df, glueContext,"glue_df"),
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="glue_df",
    )
)




# Mask PII Data Already Stored in the Data Lake

In [69]:
MASK = "#########"
PII = ["CREDIT_CARD", "EMAIL"]  # entity types EntityDetector should look for

HOODIE_META_COLUMNS = [
    "_hoodie_commit_time",
    "_hoodie_commit_seqno",
    "_hoodie_record_key",
    "_hoodie_partition_path",
    "_hoodie_file_name",
]

# Read the current snapshot of the Hudi table through the connector
glueDf = glueContext.create_dynamic_frame.from_options(
    connection_type="marketplace.spark",
    connection_options={
        "path": path,
        "connectionName": "hudi-connection",
    },
    transformation_ctx="glueDf",
)

# Adds a "DetectedEntities" column mapping each column name to the entities found in it
entity_detector = EntityDetector()
detected_df = entity_detector.detect(glueDf, PII, "DetectedEntities")


def replace_cell(original_cell_value, sorted_reverse_start_end_tuples):
    """Mask each (start, end) span; spans arrive right-most first so earlier offsets stay valid."""
    for start, end in sorted_reverse_start_end_tuples:
        # Slice instead of str.replace so only the detected span is masked
        original_cell_value = original_cell_value[:start] + MASK + original_cell_value[end:]
    return original_cell_value


def row_pii(column_name, original_cell_value, detected_entities):
    """Return the cell value with every entity detected in this column masked."""
    if original_cell_value is None or not detected_entities or column_name not in detected_entities:
        return original_cell_value
    start_end_tuples = [(entity["start"], entity["end"]) for entity in detected_entities[column_name]]
    sorted_reverse_start_end_tuples = sorted(
        start_end_tuples, key=lambda start_end: start_end[1], reverse=True
    )
    return replace_cell(original_cell_value, sorted_reverse_start_end_tuples)


row_pii_udf = udf(row_pii, StringType())


def mask_columns(df, column_names):
    """Apply row_pii_udf to every column in column_names (a fold instead of recursion)."""
    return reduce(
        lambda acc, name: acc.withColumn(
            name, row_pii_udf(lit(name), col(name), col("DetectedEntities"))
        ),
        column_names,
        df,
    )


# Only the business columns can hold PII; Hudi's metadata columns are skipped
keys = [column for column in glueDf.toDF().columns if column not in HOODIE_META_COLUMNS]
updated_masked_df = mask_columns(detected_df.toDF(), keys).drop("DetectedEntities")

DetectSensitiveData_df = updated_masked_df

column_process = [column for column in DetectSensitiveData_df.columns if column not in HOODIE_META_COLUMNS]
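The masking UDF boils down to string manipulation on the character offsets that `EntityDetector` reports. The pure-Python sketch below reproduces that step outside Spark so it can be unit-tested. It assumes each detected entity carries integer `start`/`end` offsets (the fields `row_pii` reads) and that spans within one cell do not overlap; `mask_spans` and `mask_row` are hypothetical names. Working from the right-most span leftwards keeps the remaining offsets valid even though the mask is a different length from the text it replaces, and slicing masks exactly the reported span rather than every occurrence of the same substring.

```python
MASK = "#########"


def mask_spans(value, spans, mask=MASK):
    """Replace each (start, end) character span of `value` with `mask`."""
    # Right-most span first: replacing it never shifts the offsets to its left.
    for start, end in sorted(spans, key=lambda span: span[0], reverse=True):
        value = value[:start] + mask + value[end:]
    return value


def mask_row(row, detected_entities):
    """Mask every column of `row` (a dict) that has entities in `detected_entities`.

    `detected_entities` maps column name -> list of {"start": int, "end": int, ...},
    mirroring the DetectedEntities column; a row without detections may carry None.
    """
    if not detected_entities:
        return dict(row)
    masked = {}
    for column, value in row.items():
        spans = [(entity["start"], entity["end"]) for entity in detected_entities.get(column, [])]
        masked[column] = mask_spans(value, spans) if value is not None else None
    return masked


row = {"email": "gina03@example.org", "age": "32"}
detections = {"email": [{"start": 0, "end": 18}]}
print(mask_row(row, detections))  # {'email': '#########', 'age': '32'}
```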





In [70]:
DetectSensitiveData_df.select(column_process).show(5)

+--------------------+---------------+----------+-----+------+---+-----+----------+---------+-----------+
|              emp_id|  employee_name|department|state|salary|age|bonus|        ts|    email|credit_card|
+--------------------+---------------+----------+-----+------+---+-----+----------+---------+-----------+
|9754f7a8-5e59-485...|Garrett Stewart|     Sales|   CA| 61400| 32|28970|1044552062|#########|  #########|
|15ddbe57-47ad-499...|   Leslie Payne| Marketing|   IL| 96788| 20|72432|1663487013|#########|  #########|
|43ea4c7f-67a5-4bf...| Sandra Camacho|        HR|   CA| 52474| 54|98622| 185484648|#########|  #########|
|1cefb593-a8f0-47c...|  Allison Davis| Marketing|   CA|124597| 38|77046|1442842439|#########|  #########|
|66713ca4-d011-4b5...|       Ryan Cox| Marketing|   IL|110613| 51|55816| 191214027|#########|  #########|
+--------------------+---------------+----------+-----+------+---+-----+----------+---------+-----------+
only showing top 5 rows


# Prepare the Masked Data for Writing Back into Hudi

In [71]:
processed_df = DetectSensitiveData_df.select(column_process)




In [72]:
processed_df.show(3)

+--------------------+---------------+----------+-----+------+---+-----+----------+---------+-----------+
|              emp_id|  employee_name|department|state|salary|age|bonus|        ts|    email|credit_card|
+--------------------+---------------+----------+-----+------+---+-----+----------+---------+-----------+
|9754f7a8-5e59-485...|Garrett Stewart|     Sales|   CA| 61400| 32|28970|1044552062|#########|  #########|
|15ddbe57-47ad-499...|   Leslie Payne| Marketing|   IL| 96788| 20|72432|1663487013|#########|  #########|
|43ea4c7f-67a5-4bf...| Sandra Camacho|        HR|   CA| 52474| 54|98622| 185484648|#########|  #########|
+--------------------+---------------+----------+-----+------+---+-----+----------+---------+-----------+
only showing top 3 rows


# Write the Masked Data Back into the Hudi Table

In [73]:

WriteDF = (
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(processed_df, glueContext,"glue_df"),
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="processed_df",
    )
)
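Writing the masked frame back with the `upsert` operation and `emp_id` as the record key replaces each stored row instead of appending a second copy. The sketch below is a simplified in-memory model of that behaviour, not Hudi code. It assumes the default `OverwriteWithLatestAvroPayload` semantics: duplicate keys inside the incoming batch are reduced to the record with the largest precombine (`ts`) value, and the survivor then overwrites whatever is stored under that key. The `upsert` function is hypothetical, and it compares precombine values with Python's `>=` as given (they are strings in this notebook).

```python
def upsert(table, incoming, key="emp_id", precombine="ts"):
    """Return a new {key: record} dict with `incoming` upserted into `table`.

    Simplified model: duplicates in the batch keep the record with the
    largest precombine value; the survivor then overwrites the stored row.
    """
    latest = {}
    for record in incoming:
        k = record[key]
        current = latest.get(k)
        # On ties the later record in the batch wins (an assumption of this sketch).
        if current is None or record[precombine] >= current[precombine]:
            latest[k] = record
    merged = dict(table)  # leave the caller's table untouched
    merged.update(latest)
    return merged


# "a1" is a placeholder key; the e-mail comes from the sample output above.
stored = {"a1": {"emp_id": "a1", "ts": "1044552062", "email": "gina03@example.org"}}
masked_batch = [{"emp_id": "a1", "ts": "1044552062", "email": "#########"}]
print(upsert(stored, masked_batch)["a1"]["email"])  # #########
```

One caveat for PII: on a Copy-on-Write table, Hudi keeps the superseded file versions, which still contain the unmasked values, until its cleaner removes them under the configured retention policy, so the raw data does not disappear from S3 immediately after this write.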


