# NOTE:  THIS NOTEBOOK WILL TAKE 5-10 MINUTES TO COMPLETE.

# PLEASE BE PATIENT.


# Analyze Data Quality with SageMaker Processing Jobs and Spark

Typically, a machine learning (ML) process consists of a few steps: gathering data with various ETL jobs, pre-processing the data, featurizing the dataset by incorporating standard techniques or prior knowledge, and finally training an ML model using an algorithm.

Often, distributed data processing frameworks such as Spark are used to process and analyze data sets in order to detect data quality issues and prepare them for model training.  

In this notebook we'll use Amazon SageMaker Processing with a library called [**Deequ**](https://github.com/awslabs/deequ), and leverage the power of Spark with a managed SageMaker Processing Job to run our data processing workloads.

Here are some great resources on Deequ: 
* Blog Post:  https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/
* Research Paper:  https://assets.amazon.science/4a/75/57047bd343fabc46ec14b34cdb3b/towards-automated-data-quality-management-for-machine-learning.pdf

![Deequ](./img/deequ.png)

![Processing Job](./img/processing.jpg)

# Amazon Customer Reviews Dataset

https://s3.amazonaws.com/amazon-reviews-pds/readme.html

### Dataset Columns:

- `marketplace`: 2-letter country code (in this case all "US").
- `customer_id`: Random identifier that can be used to aggregate reviews written by a single author.
- `review_id`: A unique ID for the review.
- `product_id`: The Amazon Standard Identification Number (ASIN).  `http://www.amazon.com/dp/<ASIN>` links to the product's detail page.
- `product_parent`: The parent of that ASIN.  Multiple ASINs (color or format variations of the same product) can roll up into a single parent.
- `product_title`: Title description of the product.
- `product_category`: Broad product category that can be used to group reviews (in this case digital software, digital video games, and gift cards).
- `star_rating`: The review's rating (1 to 5 stars).
- `helpful_votes`: Number of helpful votes for the review.
- `total_votes`: Number of total votes the review received.
- `vine`: Was the review written as part of the [Vine](https://www.amazon.com/gp/vine/help) program?
- `verified_purchase`: Was the review from a verified purchase?
- `review_headline`: The title of the review itself.
- `review_body`: The text of the review.
- `review_date`: The date the review was written.
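
As a quick illustration of the schema above, the following sketch parses a tab-separated sample with pandas. The single data row is made up for illustration (it is not taken from the dataset), and `REVIEW_COLUMNS` is a hypothetical helper name.

```python
import csv
import io

import pandas as pd

# Column names as listed above; REVIEW_COLUMNS is a hypothetical helper name.
REVIEW_COLUMNS = [
    "marketplace", "customer_id", "review_id", "product_id", "product_parent",
    "product_title", "product_category", "star_rating", "helpful_votes",
    "total_votes", "vine", "verified_purchase", "review_headline",
    "review_body", "review_date",
]

# One made-up row, used only to show the tab-separated layout.
sample_row = [
    "US", "12345", "R0000000000000", "B000000000", "111111111",
    "Example Product", "Gift Card", "5", "0", "0", "N", "Y",
    "Great", "Works as expected.", "2015-08-31",
]
sample_tsv = "\t".join(REVIEW_COLUMNS) + "\n" + "\t".join(sample_row) + "\n"

# QUOTE_NONE treats quote characters inside review text as ordinary characters.
df = pd.read_csv(io.StringIO(sample_tsv), sep="\t", quoting=csv.QUOTE_NONE)
print(df[["review_id", "star_rating", "verified_purchase"]])
```

Disabling quote handling is a design choice for free-text columns such as `review_body`, where stray quote characters could otherwise merge rows.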

In [3]:
%store -r setup_dependencies_passed

In [4]:
try:
    setup_dependencies_passed
except NameError:
    print("++++++++++++++++++++++++++++++++++++++++++++++")
    print("[ERROR] YOU HAVE TO RUN THE PREVIOUS NOTEBOOKS")
    print("++++++++++++++++++++++++++++++++++++++++++++++")

In [5]:
print(setup_dependencies_passed)

True


# Run the Analysis Job using a SageMaker Processing Job with Spark
Next, use the Amazon SageMaker Python SDK to submit a processing job that runs our Spark script in a SageMaker-managed Spark container.

In [6]:
import sagemaker
import boto3

sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

import botocore.config

config = botocore.config.Config(
    user_agent_extra='dsoaws/2.0'
)

# Review the Spark preprocessing script.

In [7]:
!pygmentize preprocess_deequ_pyspark.py

[34mfrom[39;49;00m [04m[36m__future__[39;49;00m [34mimport[39;49;00m print_function[37m[39;49;00m
[34mfrom[39;49;00m [04m[36m__future__[39;49;00m [34mimport[39;49;00m unicode_literals[37m[39;49;00m
[37m[39;49;00m
[34mimport[39;49;00m [04m[36mtime[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36msys[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36mos[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36mshutil[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36mcsv[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36msubprocess[39;49;00m[37m[39;49;00m
[37m[39;49;00m
subprocess.check_call([sys.executable, [33m"[39;49;00m[33m-m[39;49;00m[33m"[39;49;00m, [33m"[39;49;00m[33mpip[39;49;00m[33m"[39;49;00m, [33m"[39;49;00m[33minstall[39;49;00m[33m"[39;49;00m, [33m"[39;49;00m[33m--no-deps[39;49;00m[33m"[39;49;00m, [33m"[39;49;00m[33mpydeequ==0.1.5[39;49;00m[33m"[39;49;00m])[37m[39;49;00m
subpr

In [8]:
from sagemaker.spark.processing import PySparkProcessor

processor = PySparkProcessor(
    base_job_name="spark-amazon-reviews-analyzer",
    role=role,
    framework_version="2.4",
    instance_count=2,
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=300,
)

In [9]:
s3_input_data = "s3://{}/amazon-reviews-pds/tsv/".format(bucket)
print(s3_input_data)

s3://sagemaker-us-east-1-124111238543/amazon-reviews-pds/tsv/


In [10]:
!aws s3 ls $s3_input_data

2023-05-26 03:03:07   18997559 amazon_reviews_us_Digital_Software_v1_00.tsv.gz
2023-05-26 03:03:08   27442648 amazon_reviews_us_Digital_Video_Games_v1_00.tsv.gz
2023-05-26 03:03:10   12134676 amazon_reviews_us_Gift_Card_v1_00.tsv.gz


## Setup Output Data

In [11]:
from time import gmtime, strftime

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

output_prefix = "amazon-reviews-spark-analyzer-{}".format(timestamp_prefix)
processing_job_name = "amazon-reviews-spark-analyzer-{}".format(timestamp_prefix)

print("Processing job name:  {}".format(processing_job_name))

Processing job name:  amazon-reviews-spark-analyzer-2023-05-26-03-05-31


In [12]:
s3_output_analyze_data = "s3://{}/{}/output".format(bucket, output_prefix)

print(s3_output_analyze_data)

s3://sagemaker-us-east-1-124111238543/amazon-reviews-spark-analyzer-2023-05-26-03-05-31/output


## Start the Spark Processing Job

_Notes on not using `ProcessingInput` and `ProcessingOutput`:_
* Since Spark natively reads/writes from/to S3 using s3a://, we can avoid the copy required by `ProcessingInput` and `ProcessingOutput` (FullyReplicated or ShardedByS3Key) and just specify the S3 input and output buckets/prefixes.
* See https://github.com/awslabs/amazon-sagemaker-examples/issues/994 for issues related to using /opt/ml/processing/input/ and output/
* If we use `ProcessingInput`, the data will be copied to each node (which we don't want in this case since Spark already handles this)
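
The job receives its S3 locations as alternating name/value strings rather than as `ProcessingInput` or `ProcessingOutput` objects. Below is a minimal sketch of how a script could read such arguments and switch to the `s3a://` scheme. The helper names and the bucket are hypothetical, and this is not necessarily how `preprocess_deequ_pyspark.py` does it.

```python
def parse_name_value_args(argv):
    """Turn ['name1', 'value1', 'name2', 'value2'] into a dict."""
    if len(argv) % 2 != 0:
        raise ValueError("expected alternating name/value arguments")
    return dict(zip(argv[0::2], argv[1::2]))


def to_s3a(uri):
    """Rewrite an s3:// URI to the s3a:// scheme; leave other URIs unchanged."""
    prefix = "s3://"
    if uri.startswith(prefix):
        return "s3a://" + uri[len(prefix):]
    return uri


# Example values only; the bucket name is a placeholder.
args = parse_name_value_args([
    "s3_input_data", "s3://example-bucket/amazon-reviews-pds/tsv/",
    "s3_output_analyze_data", "s3://example-bucket/analyzer/output",
])
print(to_s3a(args["s3_input_data"]))
```

In a real script the list would come from `sys.argv[1:]`.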

In [13]:
processor.run(
    submit_app="preprocess_deequ_pyspark.py",
    submit_jars=["deequ-1.0.3-rc2.jar"],
    arguments=[
        "s3_input_data",
        s3_input_data,
        "s3_output_analyze_data",
        s3_output_analyze_data,
    ],
    logs=True,
    wait=False,
)

INFO:sagemaker:Creating processing-job with name spark-amazon-reviews-analyzer-2023-05-26-03-05-32-129


In [14]:
from IPython.display import display, HTML

processing_job_name = processor.jobs[-1].describe()["ProcessingJobName"]

display(
    HTML(
        '<b>Review <a target="_blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/processing-jobs/{}">Processing Job</a></b>'.format(
            region, processing_job_name
        )
    )
)

In [15]:
from IPython.display import display, HTML

processing_job_name = processor.jobs[-1].describe()["ProcessingJobName"]

display(
    HTML(
        '<b>Review <a target="_blank" href="https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=/aws/sagemaker/ProcessingJobs;prefix={};streamFilter=typeLogStreamPrefix">CloudWatch Logs</a> After a Few Minutes</b>'.format(
            region, processing_job_name
        )
    )
)

In [16]:
from IPython.display import display, HTML

s3_job_output_prefix = output_prefix

display(
    HTML(
        '<b>Review <a target="_blank" href="https://s3.console.aws.amazon.com/s3/buckets/{}/{}/?region={}&tab=overview">S3 Output Data</a> After The Spark Job Has Completed</b>'.format(
            bucket, s3_job_output_prefix, region
        )
    )
)

# Monitor the Processing Job

In [17]:
running_processor = sagemaker.processing.ProcessingJob.from_processing_name(
    processing_job_name=processing_job_name, sagemaker_session=sess
)

processing_job_description = running_processor.describe()

print(processing_job_description)

{'ProcessingInputs': [{'InputName': 'jars', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-124111238543/spark-amazon-reviews-analyzer-2023-05-26-03-05-32-129/input/jars', 'LocalPath': '/opt/ml/processing/input/jars', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-124111238543/spark-amazon-reviews-analyzer-2023-05-26-03-05-32-129/input/code/preprocess_deequ_pyspark.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}], 'ProcessingJobName': 'spark-amazon-reviews-analyzer-2023-05-26-03-05-32-129', 'ProcessingResources': {'ClusterConfig': {'InstanceCount': 2, 'InstanceType': 'ml.m5.2xlarge', 'VolumeSizeInGB': 30}}, 'StoppingCondition': {'MaxRuntimeInSeconds': 300}, 'AppSpecification':

In [18]:
running_processor.wait()

..........................05-26 03:09 smspark.cli  INFO     Parsing arguments. argv: ['/usr/local/bin/smspark-submit', '--jars', '/opt/ml/processing/input/jars', '/opt/ml/processing/input/code/preprocess_deequ_pyspark.py', 's3_input_data', 's3://sagemaker-us-east-1-124111238543/amazon-reviews-pds/tsv/', 's3_output_analyze_data', 's3://sagemaker-us-east-1-124111238543/amazon-reviews-spark-analyzer-2023-05-26-03-05-31/output']
05-26 03:09 smspark.cli  INFO     Raw spark options before processing: {'jars': '/opt/ml/processing/input/jars', 'class_': None, 'py_files': None, 'files': None, 'verbose': False}
05-26 03:09 smspark.cli  INFO     App and app arguments: ['/opt/ml/processing/input/code/preprocess_deequ_pyspark.py', 's3_input_data', 's3://sagemaker-us-east-1-124111238543/amazon-reviews-pds/tsv/', 's3_output_analyze_data', 's3://sagemaker-us-east-1-124111238543/amazon-reviews-spark-analyzer-2023-05-26-03-05-31/output']
05-26 03:09 smspark.cli  INFO     

# _Please Wait Until the ^^ Processing Job ^^ Completes Above._

# Inspect the Processed Output 

## These are the quality checks on our dataset.

## _The next cells will not work properly until the job completes above._
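
If you prefer to check the job status explicitly before continuing, one option is to poll the `ProcessingJobStatus` field returned by the `describe_processing_job` call of the boto3 SageMaker client. The sketch below is an assumption about how that could be wrapped. `wait_for_processing_job` is a hypothetical helper, and a stand-in function replaces the real API call so the example runs without AWS credentials.

```python
import time

# Statuses after which the job will not change state again.
TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_processing_job(describe, job_name, poll_seconds=30, max_polls=40):
    """Poll until the job reaches a terminal status and return that status.

    `describe` is any callable that accepts ProcessingJobName=... and returns
    a dict with a 'ProcessingJobStatus' key, for example
    boto3.client("sagemaker").describe_processing_job.
    """
    for _ in range(max_polls):
        status = describe(ProcessingJobName=job_name)["ProcessingJobStatus"]
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_seconds)
    raise TimeoutError("job {} did not finish in time".format(job_name))


# Stand-in for the real API call; it reports two in-progress polls, then success.
_fake_statuses = iter(["InProgress", "InProgress", "Completed"])


def fake_describe(ProcessingJobName):
    return {"ProcessingJobStatus": next(_fake_statuses)}


final_status = wait_for_processing_job(fake_describe, "example-job", poll_seconds=0)
print(final_status)
```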

In [19]:
!aws s3 ls --recursive $s3_output_analyze_data/

2023-05-26 03:10:38          0 amazon-reviews-spark-analyzer-2023-05-26-03-05-31/output/constraint-checks/_SUCCESS
2023-05-26 03:10:38        773 amazon-reviews-spark-analyzer-2023-05-26-03-05-31/output/constraint-checks/part-00000-0dffed97-9506-497f-b68c-94decf43539b-c000.csv
2023-05-26 03:10:56          0 amazon-reviews-spark-analyzer-2023-05-26-03-05-31/output/constraint-suggestions/_SUCCESS
2023-05-26 03:10:55       8615 amazon-reviews-spark-analyzer-2023-05-26-03-05-31/output/constraint-suggestions/part-00000-7bcf80e7-b594-490f-a1fd-934d859dcd0b-c000.csv
2023-05-26 03:10:30          0 amazon-reviews-spark-analyzer-2023-05-26-03-05-31/output/dataset-metrics/_SUCCESS
2023-05-26 03:10:30        363 amazon-reviews-spark-analyzer-2023-05-26-03-05-31/output/dataset-metrics/part-00000-0a15a342-590b-4393-a7e4-b13a0c683975-c000.csv
2023-05-26 03:10:40          0 amazon-reviews-spark-analyzer-2023-05-26-03-05-31/output/success-metrics/_SUCCESS
2023-05-26 03:10:40        277 amazon-reviews-s

## Copy the Output from S3 to Local
* dataset-metrics/
* constraint-checks/
* success-metrics/
* constraint-suggestions/


In [20]:
!aws s3 cp --recursive $s3_output_analyze_data ./amazon-reviews-spark-analyzer/ --exclude="*" --include="*.csv"

download: s3://sagemaker-us-east-1-124111238543/amazon-reviews-spark-analyzer-2023-05-26-03-05-31/output/constraint-checks/part-00000-0dffed97-9506-497f-b68c-94decf43539b-c000.csv to amazon-reviews-spark-analyzer/constraint-checks/part-00000-0dffed97-9506-497f-b68c-94decf43539b-c000.csv
download: s3://sagemaker-us-east-1-124111238543/amazon-reviews-spark-analyzer-2023-05-26-03-05-31/output/success-metrics/part-00000-1103f978-f729-4a7e-af87-a9634e71b558-c000.csv to amazon-reviews-spark-analyzer/success-metrics/part-00000-1103f978-f729-4a7e-af87-a9634e71b558-c000.csv
download: s3://sagemaker-us-east-1-124111238543/amazon-reviews-spark-analyzer-2023-05-26-03-05-31/output/dataset-metrics/part-00000-0a15a342-590b-4393-a7e4-b13a0c683975-c000.csv to amazon-reviews-spark-analyzer/dataset-metrics/part-00000-0a15a342-590b-4393-a7e4-b13a0c683975-c000.csv
download: s3://sagemaker-us-east-1-124111238543/amazon-reviews-spark-analyzer-2023-05-26-03-05-31/output/constraint-suggestions/part-00000-7bcf8

## Analyze Constraint Checks

In [21]:
import glob
import pandas as pd

def load_dataset(path, sep, header):
    data = pd.concat(
        [pd.read_csv(f, sep=sep, header=header) for f in glob.glob("{}/*.csv".format(path))], ignore_index=True
    )

    return data

In [22]:
df_constraint_checks = load_dataset(path="./amazon-reviews-spark-analyzer/constraint-checks/", sep="\t", header=0)
df_constraint_checks[["check", "constraint", "constraint_status", "constraint_message"]]

Unnamed: 0,check,constraint,constraint_status,constraint_message
0,Review Check,"MinimumConstraint(Minimum(star_rating,None))",Success,
1,Review Check,"CompletenessConstraint(Completeness(review_id,...",Success,
2,Review Check,CompletenessConstraint(Completeness(marketplac...,Success,
3,Review Check,SizeConstraint(Size(None)),Success,
4,Review Check,"MaximumConstraint(Maximum(star_rating,None))",Success,
5,Review Check,UniquenessConstraint(Uniqueness(List(review_id...,Success,
6,Review Check,ComplianceConstraint(Compliance(marketplace co...,Success,
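
All constraints passed in this run. With more checks, it can help to filter for the ones that did not. The sketch below uses a made-up two-row frame in the same shape as the output above. The failing row and its message are placeholders, not real job output, and `failed_constraints` is a hypothetical helper.

```python
import pandas as pd

# Made-up rows with the same columns as the constraint-checks output.
checks = pd.DataFrame(
    {
        "check": ["Review Check", "Review Check"],
        "constraint": [
            "SizeConstraint(Size(None))",
            "CompletenessConstraint(Completeness(review_id,None))",
        ],
        "constraint_status": ["Success", "Failure"],
        "constraint_message": [None, "example failure message"],
    }
)


def failed_constraints(df):
    """Return only the rows whose constraint did not succeed."""
    return df[df["constraint_status"] != "Success"].reset_index(drop=True)


failed = failed_constraints(checks)
print(failed[["constraint", "constraint_message"]])
```

The same function could be applied to `df_constraint_checks` as loaded above.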


## Analyze Dataset Metrics

In [23]:
df_dataset_metrics = load_dataset(path="./amazon-reviews-spark-analyzer/dataset-metrics/", sep="\t", header=0)
df_dataset_metrics

Unnamed: 0,entity,instance,name,value
0,Column,review_id,Completeness,1.0
1,Mutlicolumn,"total_votes,star_rating",Correlation,-0.086052
2,Column,star_rating,Mean,4.102493
3,Mutlicolumn,"total_votes,helpful_votes",Correlation,0.985751
4,Column,review_id,ApproxCountDistinct,381704.0
5,Dataset,*,Size,396601.0
6,Column,top star_rating,Compliance,0.765893
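
`ApproxCountDistinct` is an estimate, so it can differ from the exact number of distinct values. Dividing it by the dataset size gives a ratio that appears to match the `ApproxDistinctness` value reported in the constraint suggestions. The numbers below are copied from the table above.

```python
approx_count_distinct = 381704.0  # review_id ApproxCountDistinct from the table above
dataset_size = 396601.0           # Dataset Size from the table above

# Estimated share of rows that carry a distinct review_id.
approx_distinctness = approx_count_distinct / dataset_size
print(round(approx_distinctness, 4))  # 0.9624
```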


## Analyze Success Metrics

In [24]:
df_success_metrics = load_dataset(path="./amazon-reviews-spark-analyzer/success-metrics/", sep="\t", header=0)
df_success_metrics

Unnamed: 0,entity,instance,name,value
0,Column,review_id,Completeness,1.0
1,Dataset,*,Size,396601.0
2,Column,star_rating,Minimum,1.0
3,Column,marketplace,Completeness,1.0
4,Column,review_id,Uniqueness,1.0
5,Column,star_rating,Maximum,5.0
6,Column,"marketplace contained in US,UK,DE,JP,FR",Compliance,1.0
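
To make the metric names concrete, the following sketch recomputes the same kinds of metrics with pandas on a tiny made-up sample. It only illustrates the definitions (completeness as the non-null fraction, uniqueness as the fraction of rows whose value occurs exactly once, compliance as the fraction of rows satisfying a predicate). The actual job computes them with Deequ on Spark.

```python
import pandas as pd

# Tiny made-up sample; not taken from the dataset.
df = pd.DataFrame(
    {
        "review_id": ["R1", "R2", "R3", "R4"],
        "marketplace": ["US", "US", "US", "US"],
        "star_rating": [1, 5, 4, 5],
    }
)

allowed_marketplaces = ["US", "UK", "DE", "JP", "FR"]
value_counts = df["review_id"].value_counts()

metrics = {
    "Size": float(len(df)),
    "Completeness(review_id)": float(df["review_id"].notna().mean()),
    # Fraction of rows whose review_id occurs exactly once.
    "Uniqueness(review_id)": float((value_counts == 1).sum() / len(df)),
    "Minimum(star_rating)": float(df["star_rating"].min()),
    "Maximum(star_rating)": float(df["star_rating"].max()),
    "Compliance(marketplace)": float(df["marketplace"].isin(allowed_marketplaces).mean()),
}

for name, value in metrics.items():
    print(name, value)
```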


## Analyze Constraint Suggestions

In [25]:
pd.set_option('max_colwidth', 999)

df_constraint_suggestions = load_dataset(path='./amazon-reviews-spark-analyzer/constraint-suggestions/', sep='\t', header=0)
df_constraint_suggestions

Unnamed: 0,code_for_constraint,column_name,constraint_name,current_value,description,rule_description,suggesting_rule
0,".isUnique(\review_id\"")""",review_id,"UniquenessConstraint(Uniqueness(List(review_id),None))",ApproxDistinctness: 0.9624383196209793,'review_id' is unique,"If the ratio of approximate num distinct values in a column is close to the number of records (within the error of the HLL sketch), we suggest a UNIQUE constraint",UniqueIfApproximatelyUniqueRule()
1,".isNonNegative(\customer_id\"")""",customer_id,"ComplianceConstraint(Compliance('customer_id' has no negative values,customer_id >= 0,None))",Minimum: 10229.0,'customer_id' has no negative values,"If we see only non-negative numbers in a column, we suggest a corresponding constraint",NonNegativeNumbersRule()
2,".isComplete(\review_date\"")""",review_date,"CompletenessConstraint(Completeness(review_date,None))",Completeness: 1.0,'review_date' is not null,"If a column is complete in the sample, we suggest a NOT NULL constraint",CompleteIfCompleteRule()
3,".isComplete(\star_rating\"")""",star_rating,"CompletenessConstraint(Completeness(star_rating,None))",Completeness: 1.0,'star_rating' is not null,"If a column is complete in the sample, we suggest a NOT NULL constraint",CompleteIfCompleteRule()
4,".isComplete(\product_title\"")""",product_title,"CompletenessConstraint(Completeness(product_title,None))",Completeness: 1.0,'product_title' is not null,"If a column is complete in the sample, we suggest a NOT NULL constraint",CompleteIfCompleteRule()
5,".isComplete(\product_id\"")""",product_id,"CompletenessConstraint(Completeness(product_id,None))",Completeness: 1.0,'product_id' is not null,"If a column is complete in the sample, we suggest a NOT NULL constraint",CompleteIfCompleteRule()
6,".isComplete(\product_category\"")""",product_category,"CompletenessConstraint(Completeness(product_category,None))",Completeness: 1.0,'product_category' is not null,"If a column is complete in the sample, we suggest a NOT NULL constraint",CompleteIfCompleteRule()
7,".isContainedIn(\marketplace\"", [\""US\""])""",marketplace,"ComplianceConstraint(Compliance('marketplace' has value range 'US',`marketplace` IN ('US'),None))",Compliance: 1,'marketplace' has value range 'US',"If we see a categorical range for a column, we suggest an IS IN (...) constraint",CategoricalRangeRule()
8,".isComplete(\marketplace\"")""",marketplace,"CompletenessConstraint(Completeness(marketplace,None))",Completeness: 1.0,'marketplace' is not null,"If a column is complete in the sample, we suggest a NOT NULL constraint",CompleteIfCompleteRule()
9,".isContainedIn(\verified_purchase\"", [\""Y\"", \""N\""])""",verified_purchase,"ComplianceConstraint(Compliance('verified_purchase' has value range 'Y', 'N',`verified_purchase` IN ('Y', 'N'),None))",Compliance: 1,"'verified_purchase' has value range 'Y', 'N'","If we see a categorical range for a column, we suggest an IS IN (...) constraint",CategoricalRangeRule()
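
The suggestions table is wide, so a per-column summary can be easier to scan. This sketch groups the suggesting rules by column using a few rows copied from the output above. `rules_by_column` is a hypothetical name, and the same grouping could be applied to `df_constraint_suggestions`.

```python
import pandas as pd

# A few (column_name, suggesting_rule) pairs copied from the output above.
suggestions = pd.DataFrame(
    {
        "column_name": ["review_id", "star_rating", "marketplace", "marketplace"],
        "suggesting_rule": [
            "UniqueIfApproximatelyUniqueRule()",
            "CompleteIfCompleteRule()",
            "CategoricalRangeRule()",
            "CompleteIfCompleteRule()",
        ],
    }
)

# Collect the rules that fired for each column, keeping their original order.
rules_by_column = (
    suggestions.groupby("column_name")["suggesting_rule"].apply(list).to_dict()
)

for column, rules in rules_by_column.items():
    print(column, rules)
```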


# Release Resources

In [26]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>

In [27]:
%%javascript

try {
    Jupyter.notebook.save_checkpoint();
    Jupyter.notebook.session.delete();
}
catch(err) {
    // NoOp
}
