## Exploring data and storing features
### [Amazon SageMaker Data Wrangler](https://aws.amazon.com/sagemaker/data-wrangler/) & [Amazon SageMaker Feature Store](https://aws.amazon.com/sagemaker/feature-store/)

After importing the needed datasets, the next step in the Machine Learning workflow is to explore and preprocess data.

In this notebook we will see how to run an Amazon SageMaker Data Wrangler job (implemented via Amazon SageMaker Processing) that will execute the transformations defined in the Data Wrangler flow and export the transformed data to Amazon SageMaker Feature Store.

To do so, we:
1. create a feature group in Amazon SageMaker Feature Store, to store the features describing the records;
2. enable both the offline and the online store for the feature group;
3. explore the data through Amazon SageMaker Data Wrangler;
4. define the data transformations and download the resulting `.flow` file;
5. run a Processing job that applies the transformations defined in the `.flow` file;
6. output the transformed data to the feature group defined in step 1;
7. read the features from the Amazon SageMaker Feature Store offline store through Amazon Athena;
8. output the features as CSV on S3, to be used later at training time.

In [101]:
# Check SageMaker Python SDK version
import sagemaker
print(sagemaker.__version__)

def versiontuple(v):
    return tuple(map(int, (v.split("."))))

if versiontuple(sagemaker.__version__) < versiontuple('2.22.0'):
    raise Exception("This notebook requires at least SageMaker Python SDK version 2.22.0. Please install it via pip.")

2.57.0
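
The `versiontuple` helper above assumes every dot-separated piece of the version string is a plain integer, so a pre-release or development version such as `2.23.0.dev0` would raise a `ValueError`. A more tolerant variant is sketched below. The name `version_tuple_lenient` is hypothetical, and only the leading numeric components are compared.

```python
import re


def version_tuple_lenient(version, parts=3):
    """Return the first `parts` numeric components of a version string.

    Hypothetical helper: non-numeric suffixes such as 'rc1' or 'dev0'
    are ignored, and missing components are padded with zeros.
    """
    numbers = []
    for piece in version.split(".")[:parts]:
        match = re.match(r"\d+", piece)
        numbers.append(int(match.group()) if match else 0)
    numbers.extend([0] * (parts - len(numbers)))
    return tuple(numbers)


print(version_tuple_lenient("2.57.0"))
print(version_tuple_lenient("2.23.0.dev0"))
print(version_tuple_lenient("2.57.0") >= version_tuple_lenient("2.22.0"))
```

Ignoring the suffix means a release candidate compares equal to its final release, which is usually acceptable for a minimum-version check like this one.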


In [102]:
import boto3
import time

role = sagemaker.get_execution_role()
region = boto3.Session().region_name
sagemaker_session = sagemaker.Session()
bucket_name = sagemaker_session.default_bucket()
prefix = 'endtoendmlsm'

print(region)
print(role)
print(bucket_name)

us-east-1
arn:aws:iam::996912938507:role/service-role/AmazonSageMaker-ExecutionRole-endtoendml
sagemaker-us-east-1-996912938507


### Amazon SageMaker Feature Store

First, let's create the target feature group in Amazon SageMaker Feature Store. A feature group is a logical grouping of features, defined in the feature store, to describe records. A feature group’s definition is composed of a list of feature definitions, a record identifier name, and configurations for its online and offline store.

In [74]:
import time
feature_group_name = 'endtoendml-feature-group-{0}'.format(str(int(time.time())))
print(feature_group_name)

%store feature_group_name

endtoendml-feature-group-1632177744
Stored 'feature_group_name' (str)


We now define the schema for the feature group by using an empty pandas DataFrame. You can also infer it from a sample of the data.

In [105]:
# JACOPO: why don't we read the schema through Glue here?
import pandas as pd

df_columns = ["breakdown", "wind_speed", "rpm_blade", "oil_temperature", "oil_level", "temperature", "humidity", 
              "vibrations_frequency", "pressure", "turbine_id_TID004", "turbine_id_TID001", "turbine_id_TID006", "turbine_id_TID008", 
              "turbine_id_TID002", "turbine_id_TID003", "turbine_id_TID005", "turbine_id_TID009", "turbine_id_TID010", "turbine_id_TID007",
              "turbine_type_HAWT","turbine_type_VAWT", "wind_direction_S", "wind_direction_N", "wind_direction_W", "wind_direction_SW", 
              "wind_direction_E", "wind_direction_SE", "wind_direction_NE", "wind_direction_NW", "record_id", "event_timestamp"]
df_schema = pd.DataFrame(columns=df_columns, dtype=float)
# explicitly specifying data types for variables that are not floats
df_schema = df_schema.astype({'wind_speed': 'long', 'rpm_blade': 'long', 'oil_level': 'long', 'temperature': 'long', 'humidity': 'long',
                             'vibrations_frequency': 'long', 'pressure': 'long', 'record_id': 'long', 'event_timestamp': 'string'})

We create the feature group specifying its name and the SageMaker session, and then we specify the schema for the features.

In [76]:
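from sagemaker.feature_store.feature_group import FeatureGroup

# Import the class directly so that the variable below does not shadow the
# `feature_group` module, which would make this cell fail when re-run.
feature_group = FeatureGroup(name=feature_group_name,
                             sagemaker_session=sagemaker_session)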

feature_group.load_feature_definitions(df_schema)


[FeatureDefinition(feature_name='breakdown', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='wind_speed', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='rpm_blade', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='oil_temperature', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='oil_level', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='temperature', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='humidity', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='vibrations_frequency', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='pressure', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='turbine_id_TID004', feature_type=<FeatureTypeEnum.
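
The output above shows how the DataFrame dtypes were translated: the `float` columns became `Fractional` features and the `long` columns became `Integral` ones. The sketch below mimics that mapping on plain dtype names, so the expected feature types can be sanity-checked before calling `load_feature_definitions`. The function `expected_feature_type` and the prefix lists are illustrative assumptions, not the SDK's internal tables. The `String` fallback in particular is an assumption, since the output above is truncated before the `event_timestamp` entry.

```python
# Illustrative only: dtype-name prefixes assumed to map to each feature type.
INTEGRAL_PREFIXES = ("int", "uint", "long")
FRACTIONAL_PREFIXES = ("float", "double")


def expected_feature_type(dtype_name):
    """Guess the feature type name for a pandas/numpy dtype name."""
    name = dtype_name.lower()
    if name.startswith(INTEGRAL_PREFIXES):
        return "Integral"
    if name.startswith(FRACTIONAL_PREFIXES):
        return "Fractional"
    # Assumption: anything else is stored as a string feature.
    return "String"


schema = {"breakdown": "float64", "wind_speed": "int64", "event_timestamp": "string"}
for column, dtype_name in schema.items():
    print(column, "->", expected_feature_type(dtype_name))
```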

We are ready to create the feature group; we will enable both the online and the offline store for this example.

_Online store_: used for low latency real-time inference use cases (low millisecond latency reads and high throughput writes).

_Offline store_: used for training and batch inference.
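
To make the distinction concrete: once the feature group exists and has been populated, a single record can be read from the online store through the `get_record` call of the `sagemaker-featurestore-runtime` client. The sketch below takes the client as a parameter so it can be exercised without AWS access. The helper name `get_online_record` is hypothetical, and the handling of a missing record is an assumption.

```python
def get_online_record(runtime_client, feature_group_name, record_id):
    """Fetch one record from the online store as a {feature_name: value} dict.

    `runtime_client` is expected to behave like
    boto3.client("sagemaker-featurestore-runtime").
    """
    response = runtime_client.get_record(
        FeatureGroupName=feature_group_name,
        RecordIdentifierValueAsString=str(record_id),
    )
    # Assumption: a missing record yields a response without a "Record" key.
    return {
        feature["FeatureName"]: feature["ValueAsString"]
        for feature in response.get("Record", [])
    }


# Possible usage in this notebook (not executed here):
# import boto3
# runtime = boto3.client("sagemaker-featurestore-runtime", region_name=region)
# print(get_online_record(runtime, feature_group_name, 1))
```

The offline store, by contrast, is queried in bulk through Amazon Athena, as done in the last section of this notebook.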

In [77]:
# we specify an s3 location for the offline feature store.
offline_store_uri = 's3://{0}/{1}/feature_store'.format(bucket_name, prefix)

feature_group.create(s3_uri=offline_store_uri,
                     record_identifier_name='record_id',
                     event_time_feature_name='event_timestamp',
                     role_arn=role,
                     enable_online_store=True,
                     description='Wind turbine features.')

{'FeatureGroupArn': 'arn:aws:sagemaker:us-east-1:996912938507:feature-group/endtoendml-feature-group-1632177744',
 'ResponseMetadata': {'RequestId': '406aa8ba-1095-4f8d-bfef-11baa66ff495',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '406aa8ba-1095-4f8d-bfef-11baa66ff495',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '112',
   'date': 'Mon, 20 Sep 2021 22:42:25 GMT'},
  'RetryAttempts': 0}}

Let's wait a few seconds for the feature group to be created.

In [7]:
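import time

# Poll until the feature group leaves the 'Creating' state.
while True:
    status = feature_group.describe()['FeatureGroupStatus']
    print(status)
    if status == 'Created':
        break
    if status == 'CreateFailed':
        raise RuntimeError('Feature group creation failed.')
    time.sleep(5)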


Created
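
The polling pattern above can be generalised with a timeout, so that a resource stuck in an intermediate state does not block the notebook indefinitely. The sketch below is one possible shape for such a helper. The name `wait_for_status` and its parameters are hypothetical, and the `sleep` argument exists only so the function can be tested without real delays.

```python
import time


def wait_for_status(get_status, target="Created", failed=("CreateFailed",),
                    poll_seconds=5, timeout_seconds=600, sleep=time.sleep):
    """Poll `get_status()` until it returns `target`, fails, or times out."""
    waited = 0
    while True:
        status = get_status()
        if status == target:
            return status
        if status in failed:
            raise RuntimeError(f"Resource entered failure state: {status}")
        if waited >= timeout_seconds:
            raise TimeoutError(f"Still {status!r} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds


# Possible usage in this notebook (not executed here):
# wait_for_status(lambda: feature_group.describe()["FeatureGroupStatus"])
```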


### Amazon SageMaker Data Wrangler

Amazon SageMaker is a fully managed service that provides every developer and data scientist with the ability to build, train, and deploy machine learning (ML) models quickly. SageMaker removes the heavy lifting from each step of the machine learning process to make it easier to develop high quality models.

First, follow these steps:
1. In the left menu, go to SageMaker resources (orange triangle shape)
2. Select 'Data Wrangler'
3. Create a New Flow and click on Amazon S3

![image](images/wrangler_1.png)

4. Import the data after briefly inspecting it

![image](images/wrangler_2.png)

5. Click on the + symbol on the right and add an Analysis to explore the data through Data Wrangler's features

![image](images/wrangler_3.png)

6. For instance, you may choose the Histogram visualization and plot the `wind_speed` distribution

![image](images/wrangler_4.png)

7. Go back to the data flow and add a Transform. There are many prebuilt transforms to choose from, and you can also bring your own transform or formula.

![image](images/wrangler_5.png)

8. As an example, choose `Handle Missing` -> `Fill missing` -> `turbine_type` -> `HAWT`, and preview the transformation by clicking on __Preview__. The missing values in the column `turbine_type` are filled in with the string `HAWT`. If you are satisfied with the results, you can add the transform step to the transformation pipeline by clicking on __Add__.

![image](images/wrangler_6.png)

9. After you have added all the needed steps, you are all set. You may explore the generated `.flow` file in your local SageMaker repository.

![image](images/wrangler_7.png)

We are now ready to use an Amazon SageMaker Data Wrangler job, implemented as a SageMaker Processing job, to interpret the data flow defined with Amazon SageMaker Data Wrangler and load the transformed data to the feature group previously created.

The first thing to do is to upload the data flow to Amazon S3, since it will be used as input to the processing job.

In [35]:
import json

flow_file_name = 'data_exploration.flow'
with open(flow_file_name) as f:
    flow = json.load(f)
    
data_flow_uri = 's3://{0}/{1}/data_flow/{2}'.format(bucket_name, prefix, flow_file_name)
s3_client = boto3.client("s3")
s3_client.upload_file(flow_file_name, bucket_name, '{0}/data_flow/{1}'.format(prefix, flow_file_name))

print(data_flow_uri)

s3://sagemaker-us-east-1-996912938507/endtoendmlsm/data_flow/data_exploration.flow
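
The cell above builds the S3 URI and the object key in two separate `format` calls, which have to be kept in sync by hand. One way to avoid that is to build the URI once and split it whenever the bucket and key are needed. A minimal sketch using only the standard library follows; `split_s3_uri` is a hypothetical helper and the bucket name in the example is a placeholder.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_s3_uri("s3://example-bucket/endtoendmlsm/data_flow/data_exploration.flow")
print(bucket)
print(key)
```

The resulting pair could then be passed to `s3_client.upload_file(flow_file_name, bucket, key)`.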


In [40]:
from sagemaker.processing import Processor
from sagemaker import image_uris
# https://docs.aws.amazon.com/sagemaker/latest/dg/ecr-us-east-1.html#data-wrangler-us-east-1.title

data_wrangler_image_uri = image_uris.retrieve(framework='data-wrangler',region=region)
print(data_wrangler_image_uri)

processor = Processor(image_uri=data_wrangler_image_uri,
                      role=role,
                      instance_count=1,
                      instance_type='ml.m5.4xlarge',
                      base_job_name='endtoendml-load-featurestore',
                      sagemaker_session=sagemaker_session)

663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.x


We need to define the inputs for the Data Wrangler job. It expects the flow definition and all dataset definitions used to load data in the flow. In this scenario, we only accessed a dataset from S3, so we are going to parse only S3 inputs.

In [41]:
from sagemaker.processing import ProcessingInput

# Load the flow processing input.
processing_inputs = []
flow_input = ProcessingInput(input_name='flow', source=data_flow_uri, destination='/opt/ml/processing/flow')
processing_inputs.append(flow_input)

# Load S3 processing inputs.
for node in flow["nodes"]:
    if "dataset_definition" in node["parameters"]:
        dataset_def = node["parameters"]["dataset_definition"]
        name = dataset_def['name']
        source_type = dataset_def["datasetSourceType"]
        
        if source_type == "S3":
            s3_processing_input = ProcessingInput(input_name=name, 
                                                  source=dataset_def["s3ExecutionContext"]["s3Uri"], 
                                                  destination='/opt/ml/processing/{0}'.format(name))
            processing_inputs.append(s3_processing_input)

            
processing_inputs

[<sagemaker.processing.ProcessingInput at 0x7f2041289e10>,
 <sagemaker.processing.ProcessingInput at 0x7f2041289ad0>]

Then, we define the processing outputs. We need to add a feature store output whose name matches the output name of the data flow node from which we want the transformed data to be exported.

In [42]:
from sagemaker.processing import ProcessingOutput, FeatureStoreOutput

processing_outputs = []
processing_output = ProcessingOutput(output_name='e8277ec0-4c16-4469-ad66-3229508a2f20.default',
                                     feature_store_output=FeatureStoreOutput(feature_group_name=feature_group_name),
                                     app_managed=True)
processing_outputs.append(processing_output)

processing_outputs


[<sagemaker.processing.ProcessingOutput at 0x7f203e727d50>]
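
The `output_name` above is hard-coded to the node ID of one specific flow. Assuming each node in the `.flow` JSON carries a `node_id` and an `outputs` list with `name` entries (an assumption about the file layout; the cells above only rely on `nodes` and `parameters`), the name could instead be derived from the flow itself. The flow dictionary below is a made-up miniature, not the real `data_exploration.flow`, and `last_node_output_name` is a hypothetical helper.

```python
def last_node_output_name(flow):
    """Build '<node_id>.<output name>' for the final node listed in a flow dict."""
    last_node = flow["nodes"][-1]
    return "{0}.{1}".format(last_node["node_id"], last_node["outputs"][0]["name"])


# Hypothetical two-node flow used only for illustration.
example_flow = {
    "nodes": [
        {"node_id": "node-source", "parameters": {}, "outputs": [{"name": "default"}]},
        {"node_id": "node-transform", "parameters": {}, "outputs": [{"name": "default"}]},
    ]
}
print(last_node_output_name(example_flow))  # node-transform.default
```

This treats the last entry in `nodes` as the final step, which holds only for a linear flow; a branching flow would need the target node to be chosen explicitly.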

We are now ready to run the processing job (~20 mins to complete).

Note that we disable log streaming in the notebook (`logs=False`) because the output is quite verbose; you can still review all logs in Amazon CloudWatch Logs. To do this, go to the [Amazon SageMaker console](https://console.aws.amazon.com/sagemaker/) -> Processing -> Processing jobs -> select the latest job in progress -> Monitoring -> View logs -> click on the log 'endtoendml-load-featurestore-...'.

In [None]:
processor.run(inputs=processing_inputs,
              outputs=processing_outputs,
              logs=False)
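
As an alternative to the console, the job's log messages can be fetched programmatically from CloudWatch Logs, where processing jobs write to the `/aws/sagemaker/ProcessingJobs` log group. The sketch below takes the client as a parameter so it can be exercised without AWS access. `recent_log_messages` is a hypothetical helper, and it assumes the job's log streams are prefixed with the job name.

```python
LOG_GROUP = "/aws/sagemaker/ProcessingJobs"


def recent_log_messages(logs_client, job_name, limit=20):
    """Return up to `limit` log messages for a processing job.

    `logs_client` is expected to behave like boto3.client("logs").
    """
    response = logs_client.filter_log_events(
        logGroupName=LOG_GROUP,
        logStreamNamePrefix=job_name,  # assumption: streams start with the job name
        limit=limit,
    )
    return [event["message"] for event in response.get("events", [])]


# Possible usage in this notebook (not executed here):
# import boto3
# job_name = processor.latest_job.job_name
# for line in recent_log_messages(boto3.client("logs"), job_name):
#     print(line)
```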

### Extract features for training

In this section we are going to extract features for training, by reading them from the Amazon SageMaker Feature Store offline store. We will run a SageMaker Processing job that will run an Amazon Athena query to read data from the feature store; then, we are going to transform this data to CSV for training.

In [44]:
image_uri = sagemaker.image_uris.retrieve(
    framework='sklearn',
    region=region,
    version='0.20.0',
    py_version='py3',
    instance_type='ml.m5.xlarge',
    image_scope='training'
)
print(image_uri)

683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.20.0-cpu-py3


In [45]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(role=role,
                                     base_job_name='end-to-end-ml-sm-proc-fs',
                                     instance_type='ml.m5.large',
                                     instance_count=1,
                                     framework_version='0.20.0')

In [90]:
feature_group.athena_query()

AthenaQuery(catalog='AwsDataCatalog', database='sagemaker_featurestore', table_name='endtoendml-feature-group-1632177744-1632177746', sagemaker_session=<sagemaker.session.Session object at 0x7f2040fb5550>, _current_query_execution_id=None, _result_bucket=None, _result_file_prefix=None)
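
The Athena table name shown above is generated by Feature Store, and the query in the following cells lists every column by hand in lower case (for example `turbine_id_tid004` for the feature `turbine_id_TID004`). A sketch of assembling the same kind of statement from a column list is given below. `build_feature_query` is a hypothetical helper and the table name in the example is a placeholder.

```python
def build_feature_query(columns, database, table_name):
    """Return a SELECT statement with lower-cased, double-quoted column names."""
    quoted = ", ".join('"{0}"'.format(column.lower()) for column in columns)
    return 'SELECT {0} FROM "{1}"."{2}";'.format(quoted, database, table_name)


feature_columns = ["breakdown", "wind_speed", "turbine_id_TID004"]
print(build_feature_query(feature_columns, "sagemaker_featurestore", "example-table"))
```

In this notebook the column list could be `df_columns` without `record_id` and `event_timestamp`, with the database and table name taken from a single `feature_group.athena_query()` result.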

In [91]:
from sagemaker.dataset_definition import DatasetDefinition, AthenaDatasetDefinition

train_data_path = 's3://{0}/{1}/data/preprocessed/train/'.format(bucket_name, prefix)
val_data_path = 's3://{0}/{1}/data/preprocessed/val/'.format(bucket_name, prefix)

query_string = f'SELECT "breakdown","wind_speed","rpm_blade","oil_temperature","oil_level","temperature",\
                "humidity","vibrations_frequency","pressure","turbine_id_tid004","turbine_id_tid001","turbine_id_tid006",\
                "turbine_id_tid008","turbine_id_tid002","turbine_id_tid003","turbine_id_tid005","turbine_id_tid009",\
                "turbine_id_tid010","turbine_id_tid007","turbine_type_hawt","turbine_type_vawt","wind_direction_s",\
                "wind_direction_n","wind_direction_w","wind_direction_sw","wind_direction_e","wind_direction_se",\
                "wind_direction_ne","wind_direction_nw" \
                FROM "{feature_group.athena_query().database}"."{feature_group.athena_query().table_name}";'

featurestore_input = ProcessingInput(
    input_name="features_input",
    app_managed=False,
    dataset_definition=DatasetDefinition(
        local_path="/opt/ml/processing/features",
        data_distribution_type="FullyReplicated",
        input_mode="File",
        athena_dataset_definition=AthenaDatasetDefinition(
            catalog=feature_group.athena_query().catalog,
            database=feature_group.athena_query().database,
            query_string=query_string,
            output_s3_uri='s3://{0}/{1}/tempathena'.format(bucket_name, prefix),
            output_format="TEXTFILE"),
        )
    )

sklearn_processor.run(code='preprocessor.py',
                      inputs=[featurestore_input],
                      outputs=[ProcessingOutput(output_name='train_data', source='/opt/ml/processing/train', destination=train_data_path),
                               ProcessingOutput(output_name='val_data', source='/opt/ml/processing/val', destination=val_data_path)],
                      arguments=['--train-test-split-ratio', '0.2'])


Job Name:  end-to-end-ml-sm-proc-fs-2021-09-20-22-53-54-421
Inputs:  [{'InputName': 'features_input', 'AppManaged': False, 'DatasetDefinition': {'LocalPath': '/opt/ml/processing/features', 'DataDistributionType': 'FullyReplicated', 'InputMode': 'File', 'AthenaDatasetDefinition': {'Catalog': 'AwsDataCatalog', 'Database': 'sagemaker_featurestore', 'QueryString': 'SELECT "breakdown","wind_speed","rpm_blade","oil_temperature","oil_level","temperature",                "humidity","vibrations_frequency","pressure","turbine_id_tid004","turbine_id_tid001","turbine_id_tid006",                "turbine_id_tid008","turbine_id_tid002","turbine_id_tid003","turbine_id_tid005","turbine_id_tid009",                "turbine_id_tid010","turbine_id_tid007","turbine_type_hawt","turbine_type_vawt","wind_direction_s",                "wind_direction_n","wind_direction_w","wind_direction_sw","wind_direction_e","wind_direction_se",                "wind_direction_ne","wind_direction_nw"                 FROM "sage

UnexpectedStatusException: Error for Processing job end-to-end-ml-sm-proc-fs-2021-09-20-22-53-54-421: Failed. Reason: ClientError: Failed to download data. S3 key: s3://sagemaker-us-east-1-996912938507/endtoendmlsm/tempathena/end-to-end-ml-sm-proc-fs-2021-09-20-22-53-54-421/data matched no files on s3

In [92]:
from sagemaker.dataset_definition import DatasetDefinition, AthenaDatasetDefinition

train_data_path = 's3://{0}/{1}/data/preprocessed/train/'.format(bucket_name, prefix)
val_data_path = 's3://{0}/{1}/data/preprocessed/val/'.format(bucket_name, prefix)

query_string = f'SELECT "breakdown","wind_speed","rpm_blade","oil_temperature","oil_level","temperature",\
                "humidity","vibrations_frequency","pressure","turbine_id_tid004","turbine_id_tid001","turbine_id_tid006",\
                "turbine_id_tid008","turbine_id_tid002","turbine_id_tid003","turbine_id_tid005","turbine_id_tid009",\
                "turbine_id_tid010","turbine_id_tid007","turbine_type_hawt","turbine_type_vawt","wind_direction_s",\
                "wind_direction_n","wind_direction_w","wind_direction_sw","wind_direction_e","wind_direction_se",\
                "wind_direction_ne","wind_direction_nw" \
                FROM "{feature_group.athena_query().database}"."endtoendml-feature-group-1632172794-1632172946";'

featurestore_input = ProcessingInput(
    input_name="features_input",
    app_managed=False,
    dataset_definition=DatasetDefinition(
        local_path="/opt/ml/processing/features",
        data_distribution_type="FullyReplicated",
        input_mode="File",
        athena_dataset_definition=AthenaDatasetDefinition(
            catalog=feature_group.athena_query().catalog,
            database=feature_group.athena_query().database,
            query_string=query_string,
            output_s3_uri='s3://{0}/{1}/tempathena'.format(bucket_name, prefix),
            output_format="TEXTFILE"),
        )
    )

sklearn_processor.run(code='preprocessor.py',
                      inputs=[featurestore_input],
                      outputs=[ProcessingOutput(output_name='train_data', source='/opt/ml/processing/train', destination=train_data_path),
                               ProcessingOutput(output_name='val_data', source='/opt/ml/processing/val', destination=val_data_path)],
                      arguments=['--train-test-split-ratio', '0.2'])


Job Name:  end-to-end-ml-sm-proc-fs-2021-09-20-23-01-23-897
Inputs:  [{'InputName': 'features_input', 'AppManaged': False, 'DatasetDefinition': {'LocalPath': '/opt/ml/processing/features', 'DataDistributionType': 'FullyReplicated', 'InputMode': 'File', 'AthenaDatasetDefinition': {'Catalog': 'AwsDataCatalog', 'Database': 'sagemaker_featurestore', 'QueryString': 'SELECT "breakdown","wind_speed","rpm_blade","oil_temperature","oil_level","temperature",                "humidity","vibrations_frequency","pressure","turbine_id_tid004","turbine_id_tid001","turbine_id_tid006",                "turbine_id_tid008","turbine_id_tid002","turbine_id_tid003","turbine_id_tid005","turbine_id_tid009",                "turbine_id_tid010","turbine_id_tid007","turbine_type_hawt","turbine_type_vawt","wind_direction_s",                "wind_direction_n","wind_direction_w","wind_direction_sw","wind_direction_e","wind_direction_se",                "wind_direction_ne","wind_direction_nw"                 FROM "sage
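
`preprocessor.py` is not shown in this notebook. Purely as an illustration of what a script receiving `--train-test-split-ratio` might do, the sketch below parses that argument and splits a list of rows into training and validation parts. Everything except the argument name is an assumption, including the shuffling, the fixed seed, and the helper names.

```python
import argparse
import random


def parse_args(argv=None):
    """Parse the split ratio passed through the `arguments` list of the job."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.2)
    return parser.parse_args(argv)


def split_rows(rows, test_ratio, seed=42):
    """Shuffle a copy of `rows` and hold out `test_ratio` of them."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n_test = int(round(len(shuffled) * test_ratio))
    return shuffled[n_test:], shuffled[:n_test]


args = parse_args(["--train-test-split-ratio", "0.2"])
train_rows, val_rows = split_rows(range(10), args.train_test_split_ratio)
print(len(train_rows), len(val_rows))  # 8 2
```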

In [98]:
train_features_df = pd.read_csv(train_data_path + 'train_features.csv')
train_labels_df = pd.read_csv(train_data_path + 'train_labels.csv')

In [99]:
train_features_df

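| Unnamed: 0 | 55 | 15 | 46.0 | 29 | 81 | 29.1 | 1 | 50 | 0.0 | 0.0.1 | ... | 0.0.9 | 1.0.1 | 0.0.10 | 0.0.11 | 1.0.2 | 0.0.12 | 0.0.13 | 0.0.14 | 0.0.15 | 0.0.16 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 66 | 29 | 46.0 | 28 | 67 | 63 | 10 | 20 | 0.0 | 0.0 | ... | 1.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 1 | 77 | 46 | 30.0 | 6 | 83 | 77 | 14 | 29 | 0.0 | 0.0 | ... | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 |
| 2 | 66 | 24 | 44.0 | 14 | 27 | 59 | 15 | 41 | 0.0 | 0.0 | ... | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 |
| 3 | 50 | 52 | 33.0 | 6 | 77 | 76 | 11 | 72 | 0.0 | 0.0 | ... | 1.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 4 | 44 | 70 | 40.0 | 6 | 29 | 83 | 9 | 27 | 0.0 | 0.0 | ... | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 1599994 | 45 | 79 | 46.0 | 15 | 84 | 23 | 12 | 34 | 0.0 | 0.0 | ... | 1.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 1599995 | 24 | 63 | 43.0 | 25 | 22 | 35 | 10 | 80 | 0.0 | 0.0 | ... | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 1599996 | 45 | 27 | 50.0 | 19 | 72 | 53 | 4 | 78 | 0.0 | 0.0 | ... | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 |
| 1599997 | 17 | 39 | 25.0 | 6 | 71 | 55 | 7 | 74 | 0.0 | 1.0 | ... | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 |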


In [100]:
train_labels_df

| Unnamed: 0 | 0.0 |
|---|---|
| 0 | 0.0 |
| 1 | 0.0 |
| 2 | 0.0 |
| 3 | 0.0 |
| 4 | 1.0 |
| ... | ... |
| 1599994 | 1.0 |
| 1599995 | 0.0 |
| 1599996 | 0.0 |
| 1599997 | 0.0 |
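
The column labels of `train_features_df` above (`55`, `15`, `46.0`, ..., `0.0.1`) look like data values rather than feature names, which suggests the CSV files were written without a header row and `read_csv` consumed the first record as the header. This is an inference from the displayed output, not something the notebook states. A small standard-library check for that situation is sketched below; `first_row_is_numeric` is a hypothetical helper and the sample rows are made up.

```python
import csv
import io


def first_row_is_numeric(text_stream):
    """Return True if every non-empty cell in the first CSV row parses as a float."""
    first_row = next(csv.reader(text_stream), [])
    cells = [cell for cell in first_row if cell.strip()]
    if not cells:
        return False
    try:
        for cell in cells:
            float(cell)
    except ValueError:
        return False
    return True


headerless = io.StringIO("55,15,46.0,29\n66,29,46.0,28\n")
with_header = io.StringIO("wind_speed,rpm_blade,oil_temperature,oil_level\n55,15,46.0,29\n")
print(first_row_is_numeric(headerless))   # True
print(first_row_is_numeric(with_header))  # False
```

If the check returns `True` for a file, passing `header=None` to `pd.read_csv` keeps the first record as data instead of turning it into column labels.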
