# Analyze Data Quality with SageMaker Processing Jobs and Spark

Typically a machine learning (ML) process consists of few steps. First, gathering data with various ETL jobs, then pre-processing the data, featurizing the dataset by incorporating standard techniques or prior knowledge, and finally training an ML model using an algorithm.

Often, distributed data processing frameworks such as Spark are used to process and analyze data sets in order to detect data quality issues and prepare them for model training.  

In this notebook we'll use Amazon SageMaker Processing with a library called [**Deequ**](https://github.com/awslabs/deequ), and leverage the power of Spark with a managed SageMaker Processing Job to run our data processing workloads.

Here are some great resources on Deequ: 
* Blog Post:  https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/
* Research Paper:  https://assets.amazon.science/4a/75/57047bd343fabc46ec14b34cdb3b/towards-automated-data-quality-management-for-machine-learning.pdf

![Deequ](img/deequ.png)

![](img/processing.jpg)

# Amazon Customer Reviews Dataset

https://s3.amazonaws.com/amazon-reviews-pds/readme.html

### Dataset Columns:

- `marketplace`: 2-letter country code (in this case all "US").
- `customer_id`: Random identifier that can be used to aggregate reviews written by a single author.
- `review_id`: A unique ID for the review.
- `product_id`: The Amazon Standard Identification Number (ASIN).  `http://www.amazon.com/dp/<ASIN>` links to the product's detail page.
- `product_parent`: The parent of that ASIN.  Multiple ASINs (color or format variations of the same product) can roll up into a single parent.
- `product_title`: Title description of the product.
- `product_category`: Broad product category that can be used to group reviews (in this case digital videos).
- `star_rating`: The review's rating (1 to 5 stars).
- `helpful_votes`: Number of helpful votes for the review.
- `total_votes`: Number of total votes the review received.
- `vine`: Was the review written as part of the [Vine](https://www.amazon.com/gp/vine/help) program?
- `verified_purchase`: Was the review from a verified purchase?
- `review_headline`: The title of the review itself.
- `review_body`: The text of the review.
- `review_date`: The date the review was written.

In [1]:
ingest_create_athena_table_tsv = False

In [2]:
%store -r ingest_create_athena_table_tsv

no stored variable or alias ingest_create_athena_table_tsv


In [3]:
if not ingest_create_athena_table_tsv:
    print('+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++')
    print('[ERROR] YOU HAVE TO RUN THE NOTEBOOKS IN THE INGEST FOLDER FIRST.')
    print('+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++')
else:
    print('[OK]')

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
[ERROR] YOU HAVE TO RUN THE NOTEBOOKS IN THE INGEST FOLDER FIRST.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++


In [4]:
import sagemaker

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


# Pull the Spark-Deequ Docker Image

In [86]:
public_image_uri='docker.io/datascienceonaws/spark-deequ:1.0.0'

In [None]:
!sm-docker build pull public_image_uri

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
................[Container] 2024/07/17 05:35:34.868909 Running on CodeBuild On-demand

[Container] 2024/07/17 05:35:34.868938 Waiting for agent ping
[Container] 2024/07/17 05:35:37.078263 Waiting for DOWNLOAD_SOURCE
[Container] 2024/07/17 05:35:37.324195 Phase is DOWNLOAD_SOURCE
[Container] 2024/07/17 05:35:37.325361 CODEBUILD_SRC_DIR=/codebuild/output/src3664130735/src
[Container] 2024/07/17 05:35:37.325879 YAML location is /codebuild/output/src3664130735/src/buildspec.yml
[Container] 2024/07/17 05:35:37.327677 Setting HTTP client timeout to higher timeout for S3 source
[Container] 2024/07/17 05:35:37.327866 Processing environment variables
[Container] 2024/07/17 05:35:37.370381 No runtime version selected in buildspec.
[Container] 2024/07/17 05:35:37.387136 Moving to director

# Push the Image to a Private Docker Repo

In [80]:
private_docker_repo = 'spark-deequ'
private_docker_tag = '1.0.0'

In [78]:
import boto3
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name

private_image_uri = '{}.dkr.ecr.{}.amazonaws.com/{}:{}'.format(account_id, region, private_docker_repo, private_docker_tag)
print(private_image_uri)

992382405090.dkr.ecr.us-east-1.amazonaws.com/spark-deequ:1.0.0


In [82]:
!sm-docker build tag public_image_uri private_image_uri

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
................[Container] 2024/07/17 05:01:45.570832 Running on CodeBuild On-demand

[Container] 2024/07/17 05:01:45.570983 Waiting for agent ping
[Container] 2024/07/17 05:01:48.782280 Waiting for DOWNLOAD_SOURCE
[Container] 2024/07/17 05:01:49.115989 Phase is DOWNLOAD_SOURCE
[Container] 2024/07/17 05:01:49.117015 CODEBUILD_SRC_DIR=/codebuild/output/src571516812/src
[Container] 2024/07/17 05:01:49.117478 YAML location is /codebuild/output/src571516812/src/buildspec.yml
[Container] 2024/07/17 05:01:49.119275 Setting HTTP client timeout to higher timeout for S3 source
[Container] 2024/07/17 05:01:49.119452 Processing environment variables
[Container] 2024/07/17 05:01:49.163445 No runtime version selected in buildspec.
[Container] 2024/07/17 05:01:49.177893 Moving to directory 

In [45]:
!$(aws ecr get-login-password --region us-east-1 | sm-docker build login --username AWS --password-stdin $account_id.dkr.ecr.us-east-1.amazonaws.com)

/bin/bash: line 1: sagemaker.config: command not found


# Ignore `spark-deequ does not exist` error below

In [46]:
!aws ecr describe-repositories --repository-names $private_docker_repo || aws ecr create-repository --repository-name $private_docker_repo

{
    "repositories": [
        {
            "repositoryArn": "arn:aws:ecr:us-east-1:992382405090:repository/spark-deequ",
            "registryId": "992382405090",
            "repositoryName": "spark-deequ",
            "repositoryUri": "992382405090.dkr.ecr.us-east-1.amazonaws.com/spark-deequ",
            "createdAt": "2024-07-17T03:09:37.526000+00:00",
            "imageTagMutability": "MUTABLE",
            "imageScanningConfiguration": {
                "scanOnPush": false
            },
            "encryptionConfiguration": {
                "encryptionType": "AES256"
            }
        }
    ]
}


# Ignore ^^ `spark-deequ does not exist` ^^ error above

In [48]:
!sm-docker build push $private_image_uri

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
...............
[Container] 2024/07/17 04:26:48.382152 Running on CodeBuild On-demand

[Container] 2024/07/17 04:26:48.382167 Waiting for agent ping
[Container] 2024/07/17 04:26:48.483180 Waiting for DOWNLOAD_SOURCE
[Container] 2024/07/17 04:26:48.835032 Phase is DOWNLOAD_SOURCE
[Container] 2024/07/17 04:26:48.835844 CODEBUILD_SRC_DIR=/codebuild/output/src3444086258/src
[Container] 2024/07/17 04:26:48.836255 YAML location is /codebuild/output/src3444086258/src/buildspec.yml
[Container] 2024/07/17 04:26:48.837856 Setting HTTP client timeout to higher timeout for S3 source
[Container] 2024/07/17 04:26:48.837956 Processing environment variables
[Container] 2024/07/17 04:26:48.873011 No runtime version selected in buildspec.
[Container] 2024/07/17 04:26:48.890178 Moving to director

# Run the Analysis Job using a SageMaker Processing Job

Next, use the Amazon SageMaker Python SDK to submit a processing job. Use the Spark container that was just built with our Spark script.

# Review the Spark preprocessing script.

In [53]:
!pygmentize ../preprocess-deequ-pyspark.py

[34mfrom[39;49;00m [04m[36m__future__[39;49;00m [34mimport[39;49;00m print_function[37m[39;49;00m
[34mfrom[39;49;00m [04m[36m__future__[39;49;00m [34mimport[39;49;00m unicode_literals[37m[39;49;00m
[37m[39;49;00m
[34mimport[39;49;00m [04m[36mtime[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36msys[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36mos[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36mshutil[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36mcsv[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36msubprocess[39;49;00m[37m[39;49;00m
[37m[39;49;00m
subprocess.check_call([sys.executable, [33m"[39;49;00m[33m-m[39;49;00m[33m"[39;49;00m, [33m"[39;49;00m[33mpip[39;49;00m[33m"[39;49;00m, [33m"[39;49;00m[33minstall[39;49;00m[33m"[39;49;00m, [33m"[39;49;00m[33m--no-deps[39;49;00m[33m"[39;49;00m, [33m"[39;49;00m[33mpydeequ==0.1.5[39;49;00m[33m"[39;49;00m])[37m[39;49;00m
subpr

In [69]:
from sagemaker.processing import ScriptProcessor

processor = ScriptProcessor(base_job_name='spark-amazon-reviews-analyzer',
                            image_uri=private_image_uri,
                            command=['/opt/program/submit'],
                            role=role,
                            instance_count=2, # instance_count needs to be > 1 or you will see the following error:  "INFO yarn.Client: Application report for application_ (state: ACCEPTED)"
                            instance_type='ml.t3.medium',
                            env={
                                'mode': 'jar',
                                'main_class': 'Main'
                            })

In [56]:
s3_input_data = 's3://{}/amazon-reviews-pds/tsv/'.format(bucket)
print(s3_input_data)

s3://sagemaker-us-east-1-992382405090/amazon-reviews-pds/tsv/


In [57]:
!aws s3 ls $s3_input_data

2024-07-10 06:24:13   18997559 amazon_reviews_us_Digital_Software_v1_00.tsv.gz
2024-07-10 06:24:15   27442648 amazon_reviews_us_Digital_Video_Games_v1_00.tsv.gz
2024-07-10 06:24:17   12134676 amazon_reviews_us_Gift_Card_v1_00.tsv.gz


## Setup Output Data

In [58]:
from time import gmtime, strftime
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

output_prefix = 'amazon-reviews-spark-analyzer-{}'.format(timestamp_prefix)
processing_job_name = 'amazon-reviews-spark-analyzer-{}'.format(timestamp_prefix)

print('Processing job name:  {}'.format(processing_job_name))

Processing job name:  amazon-reviews-spark-analyzer-2024-07-17-04-34-44


In [59]:
s3_output_analyze_data = 's3://{}/{}/output'.format(bucket, output_prefix)

print(s3_output_analyze_data)

s3://sagemaker-us-east-1-992382405090/amazon-reviews-spark-analyzer-2024-07-17-04-34-44/output


## Start the Spark Processing Job

_Notes on Invoking from Lambda:_
* However, if we use the boto3 SDK (ie. with a Lambda), we need to copy the `preprocess.py` file to S3 and specify the everything include --py-files, etc.
* We would need to do the following before invoking the Lambda:
     !aws s3 cp preprocess.py s3://<location>/sagemaker/spark-preprocess-reviews-demo/code/preprocess.py
     !aws s3 cp preprocess.py s3://<location>/sagemaker/spark-preprocess-reviews-demo/py_files/preprocess.py
* Then reference the s3://<location> above in the --py-files, etc.
* See Lambda example code in this same project for more details.

_Notes on not using ProcessingInput and Output:_
* Since Spark natively reads/writes from/to S3 using s3a://, we can avoid the copy required by ProcessingInput and ProcessingOutput (FullyReplicated or ShardedByS3Key) and just specify the S3 input and output buckets/prefixes._"
* See https://github.com/awslabs/amazon-sagemaker-examples/issues/994 for issues related to using /opt/ml/processing/input/ and output/
* If we use ProcessingInput, the data will be copied to each node (which we don't want in this case since Spark already handles this)

In [70]:
from sagemaker.processing import ProcessingOutput

processor.run(code='../preprocess-deequ-pyspark.py',
              arguments=['s3_input_data', s3_input_data,
                         's3_output_analyze_data', s3_output_analyze_data,
              ],
              # See https://github.com/aws/sagemaker-python-sdk/issues/1341 
              #   for why we need to specify a null-output
              outputs=[
                  ProcessingOutput(s3_upload_mode='EndOfJob',
                                   output_name='null-output',
                                   source='/opt/ml/processing/output')
              ],
              logs=True,
              wait=False
)

INFO:sagemaker:Creating processing-job with name spark-amazon-reviews-analyzer-2024-07-17-04-38-17-663


In [72]:
from IPython.display import display, HTML

processing_job_name = processor.jobs[-1].describe()['ProcessingJobName']

display(HTML('<b>Review <a target="blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/processing-jobs/{}">Processing Job</a></b>'.format(region, processing_job_name)))


In [73]:
from IPython.display import display, HTML

processing_job_name = processor.jobs[-1].describe()['ProcessingJobName']

display(HTML('<b>Review <a target="blank" href="https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=/aws/sagemaker/ProcessingJobs;prefix={};streamFilter=typeLogStreamPrefix">CloudWatch Logs</a> After a Few Minutes</b>'.format(region, processing_job_name)))


In [74]:
from IPython.display import display, HTML

s3_job_output_prefix = output_prefix

display(HTML('<b>Review <a target="blank" href="https://s3.console.aws.amazon.com/s3/buckets/{}/{}/?region={}&tab=overview">S3 Output Data</a> After The Spark Job Has Completed</b>'.format(bucket, s3_job_output_prefix, region)))


# Monitor the Processing Job

In [75]:
running_processor = sagemaker.processing.ProcessingJob.from_processing_name(processing_job_name=processing_job_name,
                                                                            sagemaker_session=sagemaker_session)

processing_job_description = running_processor.describe()

print(processing_job_description)

{'ProcessingInputs': [{'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-992382405090/spark-amazon-reviews-analyzer-2024-07-17-04-38-17-663/input/code/preprocess-deequ-pyspark.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}], 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'null-output', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-992382405090/spark-amazon-reviews-analyzer-2024-07-17-04-38-17-663/output/null-output', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}, 'AppManaged': False}]}, 'ProcessingJobName': 'spark-amazon-reviews-analyzer-2024-07-17-04-38-17-663', 'ProcessingResources': {'ClusterConfig': {'InstanceCount': 2, 'InstanceType': 'ml.t3.medium', 'VolumeSizeInGB': 30}}, 'StoppingCondition': {'MaxRuntimeInSeconds': 86400}, 'AppSpecification': {'ImageUri': '992382405090.dkr.ecr.us-e

In [76]:
running_processor.wait()

.........
..

UnexpectedStatusException: Error for Processing job spark-amazon-reviews-analyzer-2024-07-17-04-38-17-663: Failed. Reason: ClientError: API error (404): manifest for 992382405090.dkr.ecr.us-east-1.amazonaws.com/spark-deequ:1.0.0 not found: manifest unknown: Requested image not found

# _Please Wait Until the ^^ Processing Job ^^ Completes Above._

# Inspect the Processed Output 

## These are the quality checks on our dataset.

## _The next cells will not work properly until the job completes above._

In [None]:
!aws s3 ls --recursive $s3_output_analyze_data/

## Copy the Output from S3 to Local
* dataset-metrics/
* constraint-checks/
* success-metrics/
* constraint-suggestions/


In [None]:
!aws s3 cp --recursive $s3_output_analyze_data ./amazon-reviews-spark-analyzer/ --exclude="*" --include="*.csv"

## Analyze Constraint Checks

In [None]:
import glob
import pandas as pd
import os

def load_dataset(path, sep, header):
    data = pd.concat([pd.read_csv(f, sep=sep, header=header) for f in glob.glob('{}/*.csv'.format(path))], ignore_index = True)

    return data

In [None]:
df_constraint_checks = load_dataset(path='./amazon-reviews-spark-analyzer/constraint-checks/', sep='\t', header=0)
df_constraint_checks[['check', 'constraint', 'constraint_status', 'constraint_message']]

## Analyze Dataset Metrics

In [None]:
df_dataset_metrics = load_dataset(path='./amazon-reviews-spark-analyzer/dataset-metrics/', sep='\t', header=0)
df_dataset_metrics

## Analyze Success Metrics

In [None]:
df_success_metrics = load_dataset(path='./amazon-reviews-spark-analyzer/success-metrics/', sep='\t', header=0)
df_success_metrics

## Analyze Constraint Suggestions

In [None]:
df_constraint_suggestions = load_dataset(path='./amazon-reviews-spark-analyzer/constraint-suggestions/', sep='\t', header=0)
df_constraint_suggestions.columns=['column_name', 'description', 'code']
df_constraint_suggestions

# Save for the Next Notebook(s)

In [None]:
%store df_dataset_metrics

In [None]:
%%javascript
Jupyter.notebook.save_checkpoint();
Jupyter.notebook.session.delete();