
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A json-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your aws configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session.                                                                                                 |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma separated list of pip packages, s3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma separated list of additional Jars to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (eg: %glue_version 2.0).                               |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |  Changes the session type to Glue ETL.                                                                                                                    |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X.                                                                           |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer.                      |

In [31]:
%stop_session

There is no current session.


# Step 1: Define Settings

In [40]:
%connections hudi-connection
%glue_version 3.0
%region us-west-2
%worker_type G.1X
%number_of_workers 5
%extra_jars s3://soumilshah-hudi-demos/jar/deequ-2.0.0-spark-3.1.jar
%additional_python_modules Faker,pydeequ,sagemaker-pyspark
%spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer


Connections to be included:
hudi-connection
Setting Glue version to: 3.0
Previous region: us-west-2
Setting new region to: us-west-2
Reauthenticating Glue client with new region: us-west-2
IAM role has been set to arn:aws:iam::043916019468:role/Lab3. Reauthenticating.
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::043916019468:role/Lab3
Authentication done.
Region is set to: us-west-2
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 2
Setting new number of workers to: 5
Extra jars to be included:
s3://soumilshah-hudi-demos/jar/deequ-2.0.0-spark-3.1.jar
Additional python modules to be included:
Faker
pydeequ
sagemaker-pyspark
Previous Spark configuration: spark.serializer=org.apache.spark.serializer.KryoSerializer
Setting new Spark configuration to: spark.serializer=org.apache.spark.serializer.KryoSerializer
s3://soumilshah-hudi-demos/jar/deequ-2.0.0-spark-3.1.jar


# Step 2: Define Imports

In [3]:
try:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.sql.session import SparkSession
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from datetime import datetime
    import boto3
    from functools import reduce
except Exception as e:
    # Surface import failures instead of silently ignoring them;
    # later cells depend on these names (e.g. GlueContext, Job, DynamicFrame).
    print(f"Import failed: {e}")
    raise




# Step 3: Create Spark Session

In [4]:
from pyspark.sql import SparkSession, Row, DataFrame
import json
import pandas as pd
import sagemaker_pyspark

import pydeequ

classpath = ":".join(sagemaker_pyspark.classpath_jars())

spark = (SparkSession
    .builder
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') 
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') 
    .config('className', 'org.apache.hudi') 
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()




# Step 4: Create Hudi Datalake

In [5]:
import pyspark.sql.functions as f
spark_df = spark.read.parquet("s3a://amazon-reviews-pds/parquet/product_category=Electronics/")
spark_df = spark_df.withColumn("uuid", f.expr("uuid()"))




In [6]:
spark_df.show(3)
print("\n")
spark_df.printSchema()  # printSchema() prints directly and returns None

+-----------+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+----+--------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|year|                uuid|
+-----------+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+----+--------------------+
|         US|   52826068|R3SC2T77Y0PSN8|B00FIYDC1W|     822091995|Monster DNA Over-...|          4|            0|          0|   Y|                N|Very nice sound, ...|These headphones ...| 2014-04-09|2014|3ea671d0-e7f4-4ba...|
|         US|   13676500|R2ONIZ7ICKORQV|B00E19H9U0|     662432872|Tech Armor 30ML G.

# Creating Hudi Tables 

In [7]:
db_name = "hudidb"
table_name="reviews"

recordkey = 'uuid'
precombine = 'customer_id'

method = 'upsert'
table_type = "COPY_ON_WRITE"

path = "s3://soumilshah-hudi-demos/reviews/"


connection_options={
    "path": path,
    "connectionName": "hudi-connection",

    "hoodie.datasource.write.storage.type": table_type,
    'className': 'org.apache.hudi',
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,


    'hoodie.datasource.hive_sync.enable': 'true',
    "hoodie.datasource.hive_sync.mode":"hms",
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
}




In [8]:
WriteDF = (
    glueContext.write_dynamic_frame.from_options(
        frame=DynamicFrame.fromDF(spark_df, glueContext,"glue_df"),
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="glue_df",
    )
)




# Data Quality & Validation with PyDeequ for Datalakes

In [14]:
ReadGlueDF = (
    glueContext.create_dynamic_frame.from_options(
        connection_type="marketplace.spark",
        connection_options=connection_options,
        transformation_ctx="ReadGlueDF",
    )
)
df = ReadGlueDF.toDF()




In [19]:
from pydeequ.analyzers import *

analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("review_id")) \
                    .addAnalyzer(ApproxCountDistinct("review_id")) \
                    .addAnalyzer(Mean("star_rating")) \
                    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
                    .addAnalyzer(Correlation("total_votes", "star_rating")) \
                    .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
                    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show(truncate=False)

+-----------+-------------------------+-------------------+---------------------+
|entity     |instance                 |name               |value                |
+-----------+-------------------------+-------------------+---------------------+
|Column     |review_id                |Completeness       |1.0                  |
|Column     |review_id                |ApproxCountDistinct|3010972.0            |
|Mutlicolumn|total_votes,star_rating  |Correlation        |-0.034510979965387205|
|Dataset    |*                        |Size               |3120938.0            |
|Column     |star_rating              |Mean               |4.036143941340712    |
|Column     |top star_rating          |Compliance         |0.7494070692849394   |
|Mutlicolumn|total_votes,helpful_votes|Correlation        |0.9936463809903744   |
+-----------+-------------------------+-------------------+---------------------+


From this, we learn that:

* review_id has no missing values and approximately 3,010,972 distinct values.
* 74.9% of reviews have a star_rating of 4 or higher.
* total_votes and star_rating are essentially uncorrelated (correlation of about -0.03).
* helpful_votes and total_votes are strongly correlated (correlation of about 0.99).
* The average star_rating is about 4.04.
* The dataset contains 3,120,938 reviews.
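
To make these metrics concrete, here is a minimal pure-Python sketch of what Size, Completeness, Compliance, and Correlation measure. It runs on a tiny hypothetical sample, not on the real reviews dataset. The `rows` data and the helper function names are assumptions made for illustration, and this is not how Deequ computes the metrics on Spark.

```python
from math import sqrt

# Hypothetical sample rows (not taken from the real dataset).
rows = [
    {"review_id": "R1", "star_rating": 5, "total_votes": 10, "helpful_votes": 9},
    {"review_id": "R2", "star_rating": 4, "total_votes": 2, "helpful_votes": 2},
    {"review_id": "R3", "star_rating": 1, "total_votes": 6, "helpful_votes": 5},
    {"review_id": None, "star_rating": 3, "total_votes": 0, "helpful_votes": 0},
]

def completeness(rows, column):
    """Fraction of rows whose value in `column` is not null."""
    return sum(r[column] is not None for r in rows) / len(rows)

def compliance(rows, predicate):
    """Fraction of rows that satisfy `predicate`."""
    return sum(bool(predicate(r)) for r in rows) / len(rows)

def correlation(rows, col_a, col_b):
    """Pearson correlation coefficient between two numeric columns."""
    xs = [r[col_a] for r in rows]
    ys = [r[col_b] for r in rows]
    mean_x, mean_y = sum(xs) / len(xs), sum(ys) / len(ys)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)

metrics = {
    "Size": len(rows),
    "Completeness(review_id)": completeness(rows, "review_id"),
    "Compliance(star_rating >= 4)": compliance(rows, lambda r: r["star_rating"] >= 4),
    "Correlation(total_votes, helpful_votes)": correlation(rows, "total_votes", "helpful_votes"),
}

for name, value in metrics.items():
    print(f"{name}: {value}")
```

Each metric reduces a column (or a pair of columns) to a single number, which is why the PyDeequ result above has one row per analyzer.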


In [43]:
from pydeequ.checks import *
from pydeequ.verification import *

check = Check(spark, CheckLevel.Warning, "Amazon Electronic Products Reviews")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 3000000) \
        .hasMin("star_rating", lambda x: x == 1.0) \
        .hasMax("star_rating", lambda x: x == 5.0)  \
        .isComplete("review_id")  \
        .isUnique("review_id")  \
        .isComplete("marketplace")  \
        .isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"]) \
        .isNonNegative("year")) \
    .run()

print(f"Verification Run Status: {checkResult.status}")
checkResult_df_all = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df_all.show()
checkResult_df_all.printSchema()

+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
+--------------------+-----------+------------+--------------------+-----------------+--------------------+

root
 |-- check: string (nullable = true)
 |-- check_level: string (nullable = true)
 |-- check_status: string (nullable = true)
 |-- constraint: string (nullable = true)
 |-- constraint_status: string (nullable = true)
 |-- constraint_message: string (nullable = true)


After you call run(), PyDeequ translates your test description into Deequ, which in turn translates it into a series of Spark jobs that compute metrics on the data. It then invokes your assertion functions (e.g., lambda x: x == 1.0 for the minimum star-rating check) on these metrics to see whether the constraints hold on the data.
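
The two phases described above (compute the metrics, then apply the assertion functions) can be sketched in plain Python. This is a simplified illustration under stated assumptions: the `star_ratings` values, the metric names, and the status strings are hypothetical stand-ins, and no Spark job is involved.

```python
# Hypothetical column values (illustrative only).
star_ratings = [1.0, 3.0, 4.0, 5.0, 5.0]

# Phase 1: compute each required metric once from the data.
metrics = {
    "Size": float(len(star_ratings)),
    "Minimum(star_rating)": min(star_ratings),
    "Maximum(star_rating)": max(star_ratings),
}

# Phase 2: apply user-supplied assertion functions to the computed metrics.
constraints = [
    ("Size", lambda x: x >= 3),
    ("Minimum(star_rating)", lambda x: x == 1.0),
    ("Maximum(star_rating)", lambda x: x == 4.0),  # deliberately fails: max is 5.0
]

results = [
    (name, "Success" if assertion(metrics[name]) else "Failure")
    for name, assertion in constraints
]

# Assumption: mirrors a check declared at Warning level, where any failed
# constraint turns the overall status into a warning rather than an error.
overall = "Success" if all(status == "Success" for _, status in results) else "Warning"

for name, status in results:
    print(f"{name}: {status}")
print("Overall status:", overall)
```

Separating the two phases means a metric is computed once even if several constraints refer to it, and the assertions themselves are ordinary Python callables.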

Interestingly, the review_id column is not unique (its Uniqueness metric is about 0.9927), so the uniqueness check fails. We can also look at all the metrics that Deequ computed for this check by running:

In [41]:
checkResult_df = VerificationResult.successMetricsAsDataFrame(spark, checkResult)
checkResult_df.show()

+-------+--------------------+------------+------------------+
| entity|            instance|        name|             value|
+-------+--------------------+------------+------------------+
| Column|           review_id|Completeness|               1.0|
| Column|           review_id|  Uniqueness|0.9926566948782706|
|Dataset|                   *|        Size|         3120938.0|
| Column|         star_rating|     Minimum|               1.0|
| Column|         star_rating|     Maximum|               5.0|
| Column|marketplace conta...|  Compliance|               1.0|
| Column|         marketplace|Completeness|               1.0|
| Column|year is non-negative|  Compliance|               1.0|
+-------+--------------------+------------+------------------+
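
A Uniqueness value below 1.0 means some review_id values occur more than once. As a hedged illustration, the sketch below assumes Uniqueness is the fraction of rows whose value appears exactly once, and contrasts it with distinctness (distinct values over row count). The `review_ids` list and the function names are hypothetical.

```python
from collections import Counter

def uniqueness(values):
    """Fraction of rows whose value occurs exactly once in the column."""
    counts = Counter(values)
    return sum(1 for v in values if counts[v] == 1) / len(values)

def distinctness(values):
    """Number of distinct values divided by the number of rows."""
    return len(set(values)) / len(values)

# Hypothetical IDs; "R2" is duplicated on purpose.
review_ids = ["R1", "R2", "R2", "R3", "R4"]

u = uniqueness(review_ids)    # 3 of 5 rows hold a value seen only once
d = distinctness(review_ids)  # 4 distinct values over 5 rows

# An isUnique-style constraint holds only when every value occurs exactly once.
is_unique = u == 1.0

print(f"uniqueness={u}, distinctness={d}, is_unique={is_unique}")
```

Under this definition, a single duplicated ID removes both of its rows from the unique count, so uniqueness drops faster than distinctness.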


In [46]:
checkResult_df_all.filter(checkResult_df_all.constraint_status == 'Failure').show(truncate=False)

+----------------------------------+-----------+------------+------------------------------------------------------+-----------------+-------------------------------------------------------------------+
|check                             |check_level|check_status|constraint                                            |constraint_status|constraint_message                                                 |
+----------------------------------+-----------+------------+------------------------------------------------------+-----------------+-------------------------------------------------------------------+
+----------------------------------+-----------+------------+------------------------------------------------------+-----------------+-------------------------------------------------------------------+


# Credits 

* Thanks to GitHub user gucciwang for providing a notebook that made these concepts easy to learn.


# References


* https://aws.amazon.com/glue/features/data-quality/
* Getting started with AWS Glue Data Quality from the AWS Glue Data Catalog: https://aws.amazon.com/blogs/big-data/getting-started-with-aws-glue-data-quality-from-the-aws-glue-data-catalog/
* Getting started with AWS Glue Data Quality for ETL Pipelines: https://aws.amazon.com/blogs/big-data/getting-started-with-aws-glue-data-quality-for-etl-pipelines/
* Monitor data quality in your data lake using PyDeequ and AWS Glue: https://aws.amazon.com/blogs/big-data/monitor-data-quality-in-your-data-lake-using-pydeequ-and-aws-glue/
* Testing data quality at scale with PyDeequ: https://aws.amazon.com/blogs/big-data/testing-data-quality-at-scale-with-pydeequ/
* Test data quality at scale with PyDeequ (notebook): https://github.com/awslabs/python-deequ/blob/master/tutorials/test_data_quality_at_scale.ipynb




