# Feature transformation with Amazon SageMaker processing job and Feature Store

This notebook converts the original review text into machine-readable features used by BERT. To do that, it configures an Amazon SageMaker processing job that runs a custom Python script.


In [None]:
# please ignore warning messages during the installation
!pip install --disable-pip-version-check -q sagemaker==2.35.0
!conda install -q -y pytorch==1.6.0 -c pytorch
!pip install --disable-pip-version-check -q transformers==3.5.1

import boto3
import sagemaker
import botocore

# Shared botocore configuration; `user_agent_extra` is passed to every client below.
config = botocore.config.Config(user_agent_extra='dlai-pds/c2/w1')

# Low-level boto3 service clients: SageMaker itself and the Feature Store runtime.
sm, featurestore_runtime = (
    boto3.client(service_name=service, config=config)
    for service in ('sagemaker', 'sagemaker-featurestore-runtime')
)

# SageMaker session built on top of the two clients created above.
sess = sagemaker.Session(
    sagemaker_client=sm,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

# Values reused by the following cells.
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = sess.boto_region_name

## Configure the SageMaker Feature Store

In [None]:
# Configure a SageMaker processing job that runs a custom Python script to balance
# the raw data and transform it into the format used by the BERT model.
# To balance and transform the data, a scikit-learn-based processing job is used.
# This is essentially a generic Python processing job with scikit-learn pre-installed.

# DEFINE AND RUN THE PROCESSING JOB

import time

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor

# --- Arguments -----------------------------------------------------------------
raw_input_data_s3_uri = 's3://dlai-practical-data-science/data/raw/'
timestamp = int(time.time())

# FEATURE GROUP: to configure a Feature Store you need to set up a Feature Group.
# This is the main resource containing all of the metadata related to the data
# stored in the Feature Store. A Feature Group should contain a list of Feature
# Definitions; a Feature Definition consists of a name and a data type. The
# Feature Group also contains an online store configuration and an offline store
# configuration controlling where the data is stored. Enabling the online store
# allows quick access to the latest value for a record via the GetRecord API.
# The offline store allows storage of the data in your S3 bucket. The offline
# store is used in this lab.
feature_group_name = 'reviews-feature-group-' + str(timestamp)

# As the result of the transformation, in addition to generating files in the S3
# bucket, the transformed data is also saved in the Amazon SageMaker Feature
# Store so that it can be used by others in your organization, for example.
feature_store_offline_prefix = 'reviews-feature-store-' + str(timestamp)

processing_instance_type = 'ml.c5.xlarge'
processing_instance_count = 1
train_split_percentage = 0.90
validation_split_percentage = 0.05
test_split_percentage = 0.05
balance_dataset = True

# max_seq_length specifies the maximum length of the classified reviews for the
# RoBERTa model. If the sentence is shorter than the maximum length parameter, it
# will be padded; when the sentence is longer, it will be truncated from the
# right side. Since a smaller max_seq_length leads to faster training and lower
# resource utilization, you want to find the smallest power-of-2 that captures
# 100% of the reviews. For this dataset, the 100th percentile is 115. However,
# it's best to stick with powers-of-2 when using BERT.
max_seq_length = 128

# --- Define and run the processing job -----------------------------------------
# The instance type and count come from the variables above so that changing
# them actually changes the job (they were previously duplicated as literals).
processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={'AWS_DEFAULT_REGION': region},
    max_runtime_in_seconds=7200,
)

processor.run(
    code='./c2w1/src/prepare_data.py',
    inputs=[
        ProcessingInput(
            source=raw_input_data_s3_uri,
            destination='/opt/ml/processing/input/data/',
            s3_data_distribution_type='ShardedByS3Key',
        ),
    ],
    # One named output per data split; the names are looked up again later when
    # the S3 URIs of the processed data are collected.
    outputs=[
        ProcessingOutput(output_name='sentiment-train', source='/opt/ml/processing/output/sentiment/train', s3_upload_mode='EndOfJob'),
        ProcessingOutput(output_name='sentiment-validation', source='/opt/ml/processing/output/sentiment/validation', s3_upload_mode='EndOfJob'),
        ProcessingOutput(output_name='sentiment-test', source='/opt/ml/processing/output/sentiment/test', s3_upload_mode='EndOfJob'),
    ],
    # Command-line arguments handed to prepare_data.py; every value is passed as a string.
    arguments=[
        '--train-split-percentage', str(train_split_percentage),
        '--validation-split-percentage', str(validation_split_percentage),
        '--test-split-percentage', str(test_split_percentage),
        '--balance-dataset', str(balance_dataset),
        '--max-seq-length', str(max_seq_length),
        '--feature-store-offline-prefix', str(feature_store_offline_prefix),
        '--feature-group-name', str(feature_group_name),
    ],
    logs=True,
    # Do not block here; a later cell waits for the job to complete.
    wait=False,
)

In [26]:
# You can see the information about the processing jobs using the `describe`
# function. The result is in dictionary format. It is fetched once and reused so
# that the name and status come from the same snapshot of the job instead of
# from three separate `describe()` calls.
latest_job_description = processor.jobs[-1].describe()
print(latest_job_description.keys())
scikit_processing_job_name = latest_job_description['ProcessingJobName']
scikit_processing_job_status = latest_job_description['ProcessingJobStatus']
print('Processing job status: {}'.format(scikit_processing_job_status))

# Review the created processing job in the AWS console.

# Import from IPython.display: importing `display` from IPython.core.display is deprecated.
from IPython.display import display, HTML
display(HTML('<b>Review <a target="blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/processing-jobs/{}">processing job</a></b>'.format(region, scikit_processing_job_name)))

dict_keys(['ProcessingInputs', 'ProcessingOutputConfig', 'ProcessingJobName', 'ProcessingResources', 'StoppingCondition', 'AppSpecification', 'Environment', 'RoleArn', 'ProcessingJobArn', 'ProcessingJobStatus', 'ProcessingStartTime', 'LastModifiedTime', 'CreationTime', 'ResponseMetadata'])
Processing job status: InProgress


In [27]:
# After the completion of the processing job you can also review the output in the S3 bucket.
display(HTML('<b>Review <a target="blank" href="https://s3.console.aws.amazon.com/s3/buckets/{}/{}/?region={}&tab=overview">S3 output data</a> after the processing job has completed</b>'.format(bucket, scikit_processing_job_name, region)))

# Inspect the transformed and balanced data in the S3 bucket.
# Re-attach to the processing job by name and wait until it completes.
running_processor = sagemaker.processing.ProcessingJob.from_processing_name(processing_job_name=scikit_processing_job_name, sagemaker_session=sess)
running_processor.wait(logs=False)  # Please wait until ^^ Processing Job ^^ completes above
processing_job_description = running_processor.describe()

# Index the job outputs by name so that each split's S3 URI can be looked up directly.
output_config = processing_job_description['ProcessingOutputConfig']
output_s3_uris = {
    job_output['OutputName']: job_output['S3Output']['S3Uri']
    for job_output in output_config['Outputs']
}

# Fail right here with a clear message if an expected output is missing, rather
# than leaving a variable undefined and hitting a NameError further down.
expected_output_names = ('sentiment-train', 'sentiment-validation', 'sentiment-test')
missing_output_names = [name for name in expected_output_names if name not in output_s3_uris]
if missing_output_names:
    raise KeyError('Processing job {} has no output named: {}'.format(
        scikit_processing_job_name, ', '.join(missing_output_names)))

processed_train_data_s3_uri = output_s3_uris['sentiment-train']
processed_validation_data_s3_uri = output_s3_uris['sentiment-validation']
processed_test_data_s3_uri = output_s3_uris['sentiment-test']

print(processed_train_data_s3_uri)
print(processed_validation_data_s3_uri)
print(processed_test_data_s3_uri)


!aws s3 ls $processed_train_data_s3_uri/
!aws s3 ls $processed_validation_data_s3_uri/
!aws s3 ls $processed_test_data_s3_uri/

!head -n 5 ./balanced/sentiment-train/part-algo-1-womens_clothing_ecommerce_reviews.tsv

....................................................!s3://sagemaker-us-east-1-170235698766/sagemaker-scikit-learn-2022-07-26-06-44-43-422/output/sentiment-train
s3://sagemaker-us-east-1-170235698766/sagemaker-scikit-learn-2022-07-26-06-44-43-422/output/sentiment-validation
s3://sagemaker-us-east-1-170235698766/sagemaker-scikit-learn-2022-07-26-06-44-43-422/output/sentiment-test


In [None]:
# Copy the processed TSV file of each split (train / validation / test) from its
# S3 output location into the matching local folder under ./c2w1/balanced/.
!aws s3 cp $processed_train_data_s3_uri/part-algo-1-womens_clothing_ecommerce_reviews.tsv ./c2w1/balanced/sentiment-train/
!aws s3 cp $processed_validation_data_s3_uri/part-algo-1-womens_clothing_ecommerce_reviews.tsv ./c2w1/balanced/sentiment-validation/
!aws s3 cp $processed_test_data_s3_uri/part-algo-1-womens_clothing_ecommerce_reviews.tsv ./c2w1/balanced/sentiment-test/

# Review the data: print the first 5 lines of each downloaded file.
!head -n 5 ./c2w1/balanced/sentiment-train/part-algo-1-womens_clothing_ecommerce_reviews.tsv
!head -n 5 ./c2w1/balanced/sentiment-validation/part-algo-1-womens_clothing_ecommerce_reviews.tsv
!head -n 5 ./c2w1/balanced/sentiment-test/part-algo-1-womens_clothing_ecommerce_reviews.tsv

## Query the Feature Store
###### In addition to transforming the data and saving it to the S3 bucket, the processing job populates the Feature Store with the transformed and balanced data. Let's query this data using Amazon Athena.

In [40]:
# The same feature definitions can also be found in the processing script.

from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum
from sagemaker.feature_store.feature_group import FeatureGroup

# Names of the features, in the order in which they are defined.
# Every feature is declared with the STRING feature type.
_feature_names = (
    'review_id',    # unique ID of the review
    'date',         # ingestion timestamp
    'sentiment',    # sentiment: -1 (negative), 0 (neutral) or 1 (positive), derived from the Rating values (1, 2, 3, 4, 5)
    'label_id',     # label ID of the target class (sentiment)
    'input_ids',    # reviews encoded with the BERT tokenizer
    'review_body',  # original Review Text
    'split_type',   # train/validation/test label
)

feature_definitions = [
    FeatureDefinition(feature_name=name, feature_type=FeatureTypeEnum.STRING)
    for name in _feature_names
]

# Handle to the Feature Group named in the processing-job cell.
feature_group = FeatureGroup(
    name=feature_group_name,
    feature_definitions=feature_definitions,
    sagemaker_session=sess,
)

In [None]:
#Query the feature store.

feature_store_query = feature_group.athena_query() # create an Athena query for the defined above Feature Group

feature_store_table = feature_store_query.table_name #pull the table name of the Amazon Glue Data Catalog table which is auto-generated by Feature Store

query_string = """SELECT date, review_id, sentiment, label_id,input_ids,review_body FROM "{}" WHERE split_type='train' LIMIT 5""".format(feature_store_table)

#Configure the S3 location for the query results. This allows us to re-use the query results for future queries if the data has not changed.
output_s3_uri = 's3://{}/query_results/{}/'.format(bucket, feature_store_offline_prefix)

feature_store_query.run(query_string=query_string, output_location=output_s3_uri)
feature_store_query.wait()

import pandas as pd
pd.set_option("max_colwidth", 100)
df_feature_store = feature_store_query.as_dataframe()

#Export CSV from Feature Store
df_feature_store.to_csv('./feature_store_export.tsv',sep='\t',index=False,header=True)
!aws s3 cp ./feature_store_export.tsv s3://$bucket/feature_store/feature_store_export.tsv
!aws s3 ls --recursive s3://$bucket/feature_store/feature_store_export.tsv

In [47]:
# Bar plot of the query result: one bar per review_id, bar height is the sentiment value.
import seaborn as sns

sns.barplot(
    x="review_id",
    y="sentiment",
    data=df_feature_store,
    color="blue",
)

<AxesSubplot:xlabel='review_id', ylabel='sentiment'>