In [3]:
%store -r
%store

Stored variables and their in-db values:
dw_output_path_prm             -> 's3://ml-predictivemaintainaince-28-01-2022/export


In [4]:
import pkg_resources
import subprocess
import sys
orignal_version = pkg_resources.get_distribution("sagemaker").version
_ = subprocess.check_call([sys.executable,"-m","pip","install","sagemaker==2.20.0"])

In [5]:
import json
import os
import time
import uuid
import boto3
import sagemaker

Parameters
Configurable parameters used throughout the notebook

In [6]:
#s3 bucker for saving procesing job outputs
sess = sagemaker.Session()
bucket = "ml-predictivemaintainaince-28-01-2022"
prefix = "data_wrangler_flows"
flow_id = f"{time.strftime('%d-%H-%M-%S',time.gmtime())}-{str(uuid.uuid4())[:8]}"


In [7]:
flow_name = f"flow-{flow_id}"
flow_uri = f"s3://{bucket}/{prefix}/{flow_name}.flow"
flow_file_name = "dw_flow/prm.flow"
iam_role = sagemaker.get_execution_role()



In [8]:
print(bucket)
print(prefix)
print(flow_id)
print(flow_name)
print(flow_uri)
print(flow_file_name)
print(iam_role)

ml-predictivemaintainaince-28-01-2022
data_wrangler_flows
29-13-00-36-29009274
flow-29-13-00-36-29009274
s3://ml-predictivemaintainaince-28-01-2022/data_wrangler_flows/flow-29-13-00-36-29009274.flow
dw_flow/prm.flow
arn:aws:iam::832173187970:role/service-role/AmazonSageMaker-ExecutionRole-20211213T210605


In [11]:

# Processing Job Resources Configurations
# Data wrangler processing job only supports 1 instance.

container_uri = (
    "415577184552.dkr.ecr.us-east-2.amazonaws.com/sagemaker-data-wrangler-container:1.2.1"
)
instance_count = 1
instance_type = "ml.m5.4xlarge"

#Processing job path uri info

output_prefix = f"export-{flow_name}/output"
output_path = f"s3://{bucket}/{output_prefix}"
output_name = "ff586e7b-a02d-472b-91d4-da3dd05d7a30.default"

processing_job_name = f"data-wrangler-flow-processing-{flow_id}"
processing_dir = "/opt/ml/processing"

output_content_type = "CSV"

sagemaker_endpoint_uri = None


In [12]:
from demo_helper import update_dw_s3uri, get_dw_container_for_region


# update the flow file to change the s3 location to our bucket
#get the data wrangler container associated with our region
region = boto3.Session().region_name

dw_output_path_prm = output_path
print(f"Storing dw_output_path_prm = {dw_output_path_prm} for use in next notebook 2_fleet_predmaint.ipynb")
%store dw_output_path_prm

Storing dw_output_path_prm = s3://ml-predictivemaintainaince-28-01-2022/export-flow-29-13-00-36-29009274/output for use in next notebook 2_fleet_predmaint.ipynb
Stored 'dw_output_path_prm' (str)


In [23]:
container_uri = get_dw_container_for_region(region)

KeyError: 'ap-south-1'

In [13]:
#Load flowfile

with open(flow_file_name) as f:
    flow = json.load(f)
    


In [14]:
s3_client = boto3.client("s3")
s3_client.upload_file(flow_file_name,bucket,f"{prefix}/{flow_name}.flow")
print(f"Data Wrangler Flow notebook uploaded to {flow_uri}")

Data Wrangler Flow notebook uploaded to s3://ml-predictivemaintainaince-28-01-2022/data_wrangler_flows/flow-29-13-00-36-29009274.flow


Create Processing Job Arguments

In [15]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.dataset_definition.inputs import (
    AthenaDatasetDefinition,
    DatasetDefinition,
    RedshiftDatasetDefinition,
)


def create_flow_notebook_processing_input(base_dir, flow_s3_uri):
    return ProcessingInput(
        source=flow_s3_uri,
        destination=f"{base_dir}/flow",
        input_name="flow",
        s3_data_type="S3Prefix",
        s3_input_mode="File",
        s3_data_distribution_type="FullyReplicated",
    )


def create_s3_processing_input(s3_dataset_definition, name, base_dir):
    return ProcessingInput(
        source=s3_dataset_definition["s3ExecutionContext"]["s3Uri"],
        destination=f"{base_dir}/{name}",
        input_name=name,
        s3_data_type="S3Prefix",
        s3_input_mode="File",
        s3_data_distribution_type="FullyReplicated",
    )


def create_athena_processing_input(athena_dataset_defintion, name, base_dir):
    return ProcessingInput(
        input_name=name,
        dataset_definition=DatasetDefinition(
            local_path=f"{base_dir}/{name}",
            athena_dataset_definition=AthenaDatasetDefinition(
                catalog=athena_dataset_defintion["catalogName"],
                database=athena_dataset_defintion["databaseName"],
                query_string=athena_dataset_defintion["queryString"],
                output_s3_uri=athena_dataset_defintion["s3OutputLocation"] + f"{name}/",
                output_format=athena_dataset_defintion["outputFormat"].upper(),
            ),
        ),
    )


def create_redshift_processing_input(redshift_dataset_defintion, name, base_dir):
    return ProcessingInput(
        input_name=name,
        dataset_definition=DatasetDefinition(
            local_path=f"{base_dir}/{name}",
            redshift_dataset_definition=RedshiftDatasetDefinition(
                cluster_id=redshift_dataset_defintion["clusterIdentifier"],
                database=redshift_dataset_defintion["database"],
                db_user=redshift_dataset_defintion["dbUser"],
                query_string=redshift_dataset_defintion["queryString"],
                cluster_role_arn=redshift_dataset_defintion["unloadIamRole"],
                output_s3_uri=redshift_dataset_defintion["s3OutputLocation"] + f"{name}/",
                output_format=redshift_dataset_defintion["outputFormat"].upper(),
            ),
        ),
    )


def create_processing_inputs(processing_dir, flow, flow_uri):
    """Helper function for creating processing inputs
    :param flow: loaded data wrangler flow notebook
    :param flow_uri: S3 URI of the data wrangler flow notebook
    """
    processing_inputs = []
    flow_processing_input = create_flow_notebook_processing_input(processing_dir, flow_uri)
    processing_inputs.append(flow_processing_input)

    for node in flow["nodes"]:
        if "dataset_definition" in node["parameters"]:
            data_def = node["parameters"]["dataset_definition"]
            name = data_def["name"]
            source_type = data_def["datasetSourceType"]

            if source_type == "S3":
                processing_inputs.append(create_s3_processing_input(data_def, name, processing_dir))
            elif source_type == "Athena":
                processing_inputs.append(
                    create_athena_processing_input(data_def, name, processing_dir)
                )
            elif source_type == "Redshift":
                processing_inputs.append(
                    create_redshift_processing_input(data_def, name, processing_dir)
                )
            else:
                raise ValueError(f"{source_type} is not supported for Data Wrangler Processing.")

    return processing_inputs


def create_processing_output(output_name, output_path, processing_dir):
    return ProcessingOutput(
        output_name=output_name,
        source=os.path.join(processing_dir, "output"),
        destination=output_path,
        s3_upload_mode="EndOfJob",
    )


def create_container_arguments(output_name, output_content_type):
    output_config = {output_name: {"content_type": output_content_type}}
    return [f"--output-config '{json.dumps(output_config)}'"]

In [18]:
%%time
from sagemaker.processing import Processor

processor = Processor(
    role = iam_role,
    image_uri = container_uri,
    instance_count = instance_count,
    instance_type = instance_type,
    sagemaker_session = sess
)


CPU times: user 26 µs, sys: 0 ns, total: 26 µs
Wall time: 29.1 µs


In [22]:

processor.run(
    inputs=create_processing_inputs(processing_dir,flow,flow_uri),
    outputs=[create_processing_output(output_name,output_path,processing_dir)],
    arguments=create_container_arguments(output_name,output_content_type),
    wait = True,
    logs = False,
    job_name = "Predictive-Maintainaince-29-01-2022-18-50",
)


Job Name:  Predictive-Maintainaince-29-01-2022-18-50
Inputs:  [{'InputName': 'flow', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://ml-predictivemaintainaince-28-01-2022/data_wrangler_flows/flow-29-13-00-36-29009274.flow', 'LocalPath': '/opt/ml/processing/flow', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'example_fleet_sensor_logs.csv', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://ml-predictivemaintainaince-28-01-2022/data_wrangler_flows/data/example_fleet_sensor_logs.csv', 'LocalPath': '/opt/ml/processing/example_fleet_sensor_logs.csv', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'example_fleet_info.csv', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://ml-predictivemaintainaince-28-01-2022/data_wrangler_flows/data/example_fleet_info.csv', 'LocalPath': '/opt/ml/processing/example_fleet_info.cs

ClientError: An error occurred (ValidationException) when calling the CreateProcessingJob operation: Invalid region us-east-2 in image URI 415577184552.dkr.ecr.us-east-2.amazonaws.com/sagemaker-data-wrangler-container:1.2.1. Please provide an image URI in region ap-south-1.