In [2]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.dataset_definition.inputs import AthenaDatasetDefinition, DatasetDefinition, RedshiftDatasetDefinition

# Collects the ProcessingInput objects that are later passed to the processing job.
data_sources = list()

## Input - S3 Source: raw-out

In [3]:
# Describe the raw dataset on S3 as a processing input, then register it.
# `source` can be overridden to point to another dataset on S3.
raw_out_input = ProcessingInput(
    input_name="raw-out",
    source="s3://ml-dataset-raw-s3/raw-out/",
    destination="/opt/ml/processing/raw-out",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated",
)
data_sources.append(raw_out_input)

## Output: S3 settings

In [4]:
import time
import uuid
import sagemaker

# SageMaker session; it supplies the default bucket used below.
sess = sagemaker.Session()

# Defaults to the session's bucket. To use your own bucket instead, e.g.
# bucket = "my-bucket"
bucket = sess.default_bucket()
print(f"Data Wrangler export storage bucket: {bucket}")

# Unique flow export ID: UTC day-hour-minute-second plus 8 random hex characters.
export_timestamp = time.strftime("%d-%H-%M-%S", time.gmtime())
export_suffix = uuid.uuid4().hex[:8]
flow_export_id = f"{export_timestamp}-{export_suffix}"
flow_export_name = f"flow-{flow_export_id}"

Data Wrangler export storage bucket: sagemaker-us-east-1-456441140195


In [5]:
# Auto-generated output name: the flow file's select-node ID plus its output name.
output_name = "bc3fa374-27dc-4e5b-a1a1-58ea5addeb1b.default"

# Export results are written beneath an export-specific prefix in the bucket.
s3_output_prefix = f"export-{flow_export_name}/output"
s3_output_path = f"s3://{bucket}/{s3_output_prefix}"
print(f"Flow S3 export result path: {s3_output_path}")

# Maps the container's output directory to the S3 destination; upload happens at end of job.
processing_job_output = ProcessingOutput(
    destination=s3_output_path,
    source="/opt/ml/processing/output",
    output_name=output_name,
    s3_upload_mode="EndOfJob",
)

Flow S3 export result path: s3://sagemaker-us-east-1-456441140195/export-flow-13-21-38-01-5829c9ff/output


## Upload Flow to S3

In [6]:
import os
import json
import boto3

# name of the flow file which should exist in the current notebook working directory
flow_file_name = "Ml-Airport-01-XGBoost-v2.flow"

# Load .flow file from current notebook working directory 
!echo "Loading flow file from current notebook working directory: $PWD"

with open(flow_file_name) as f:
    flow = json.load(f)

# Upload flow to S3
s3_client = boto3.client("s3")
s3_client.upload_file(flow_file_name, bucket, f"data_wrangler_flows/{flow_export_name}.flow", ExtraArgs={"ServerSideEncryption": "aws:kms"})

flow_s3_uri = f"s3://{bucket}/data_wrangler_flows/{flow_export_name}.flow"

print(f"Data Wrangler flow {flow_file_name} uploaded to {flow_s3_uri}")

Loading flow file from current notebook working directory: /root
Data Wrangler flow ml-airport-delay.flow uploaded to s3://sagemaker-us-east-1-456441140195/data_wrangler_flows/flow-13-21-38-01-5829c9ff.flow


In [7]:
# Processing input that makes the uploaded flow file (flow_s3_uri) available to the container.
flow_input = ProcessingInput(
    input_name="flow",
    source=flow_s3_uri,
    destination="/opt/ml/processing/flow",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated",
)

# Run Processing Job 
## Job Configurations

In [8]:
# --- Identity and naming ---
# IAM role under which the processing job executes.
iam_role = sagemaker.get_execution_role()
# Job name derived from the flow export ID; use a new name for every re-execution.
processing_job_name = f"data-wrangler-flow-processing-{flow_export_id}"

# --- Container image ---
# Data Wrangler container URL.
container_uri = "663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.x"
# Pinned Data Wrangler container URL.
container_uri_pinned = "663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.13.2"

# --- Compute resources ---
# Instance type and number of instances for the processing job.
instance_type = "ml.m5.4xlarge"
instance_count = 2
# EBS volume size (GB) used for storing data during processing.
volume_size_in_gb = 30

# --- Security and tagging ---
# Network isolation mode; off by default.
enable_network_isolation = False
# KMS key for per-object encryption; None by default.
kms_key = None
# Tags passed to the processing job.
user_tags = []

# --- Output ---
# Content type for each output. Data Wrangler supports CSV (default) and Parquet.
output_content_type = "CSV"
# Output configuration, later handed to the processing container as an argument.
output_config = {output_name: dict(content_type=output_content_type)}

## Create Processing Job

In [9]:
from sagemaker.processing import Processor
from sagemaker.network import NetworkConfig

# Network settings for the job; isolation follows enable_network_isolation.
network_config = NetworkConfig(enable_network_isolation=enable_network_isolation)

# Processor that runs the Data Wrangler container with the configured resources.
processor = Processor(
    image_uri=container_uri,
    role=iam_role,
    instance_type=instance_type,
    instance_count=instance_count,
    volume_size_in_gb=volume_size_in_gb,
    output_kms_key=kms_key,
    network_config=network_config,
    tags=user_tags,
    sagemaker_session=sess,
)

# The output configuration is handed to the container as a JSON-encoded argument.
output_config_argument = f"--output-config '{json.dumps(output_config)}'"

# Start the job without blocking (wait=False) and without streaming logs.
processor.run(
    job_name=processing_job_name,
    inputs=[flow_input, *data_sources],
    outputs=[processing_job_output],
    arguments=[output_config_argument],
    wait=False,
    logs=False,
)


Job Name:  data-wrangler-flow-processing-13-21-38-01-5829c9ff
Inputs:  [{'InputName': 'flow', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-456441140195/data_wrangler_flows/flow-13-21-38-01-5829c9ff.flow', 'LocalPath': '/opt/ml/processing/flow', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'raw-out', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://ml-dataset-raw-s3/raw-out/', 'LocalPath': '/opt/ml/processing/raw-out', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'bc3fa374-27dc-4e5b-a1a1-58ea5addeb1b.default', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-456441140195/export-flow-13-21-38-01-5829c9ff/output', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]


## Job Status & S3 Output Location

In [10]:
# S3 path for the job results: a folder named after the job beneath the export output prefix.
s3_job_results_path = f"s3://{bucket}/{s3_output_prefix}/{processing_job_name}"
print(f"Job results are saved to S3 path: {s3_job_results_path}")

# Block until the processing job finishes; the returned job details are displayed as the cell output.
job_result = sess.wait_for_processing_job(processing_job_name)
job_result

Job results are saved to S3 path: s3://sagemaker-us-east-1-456441140195/export-flow-13-21-38-01-5829c9ff/output/data-wrangler-flow-processing-13-21-38-01-5829c9ff
...................................................................!

{'ProcessingInputs': [{'InputName': 'flow',
   'AppManaged': False,
   'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-456441140195/data_wrangler_flows/flow-13-21-38-01-5829c9ff.flow',
    'LocalPath': '/opt/ml/processing/flow',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}},
  {'InputName': 'raw-out',
   'AppManaged': False,
   'S3Input': {'S3Uri': 's3://ml-dataset-raw-s3/raw-out/',
    'LocalPath': '/opt/ml/processing/raw-out',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}}],
 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'bc3fa374-27dc-4e5b-a1a1-58ea5addeb1b.default',
    'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-456441140195/export-flow-13-21-38-01-5829c9ff/output',
     'LocalPath': '/opt/ml/processing/output',
     'S3UploadMode': 'EndOfJob'},
    'AppManaged': False}]},
 'ProcessingJ

In [11]:
# Presumably a toggle for the optional steps in the cells below.
# NOTE(review): no cell visible in this notebook reads this flag — confirm whether it is still needed.
run_optional_steps = True

### Load Processed Data into Pandas

In [12]:
!pip install -q awswrangler pandas
import awswrangler as wr

  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
You should consider upgrading via the '/opt/conda/bin/python -m pip install --upgrade pip' command.[0m


In [13]:
# Number of rows per chunk when reading the processed output from S3.
chunksize = 1000

# Choose the reader that matches the content type configured for the job output.
normalized_content_type = output_content_type.upper()
if normalized_content_type == "CSV":
    dfs = wr.s3.read_csv(s3_output_path, chunksize=chunksize)
elif normalized_content_type == "PARQUET":
    dfs = wr.s3.read_parquet(s3_output_path, chunked=chunksize)
else:
    # Fail immediately: merely printing here would let the cell continue and
    # either raise a NameError on `dfs` or silently reuse a stale `dfs` from
    # an earlier run of this cell.
    raise ValueError(f"Unexpected output content type {output_content_type}")

# Take only the first chunk as a preview of the processed data.
df = next(dfs)
df

Unnamed: 0,ORIGIN,FL_DATE,DEP_DELAY
0,ABE,2018-01-01,7.833333
1,ABE,2018-01-02,77.375000
2,ABE,2018-01-03,51.333333
3,ABE,2018-01-04,30.125000
4,ABE,2018-01-05,17.375000
...,...,...,...
995,ABQ,2018-09-23,7.714286
996,ABQ,2018-09-24,5.472222
997,ABQ,2018-09-25,3.647059
998,ABQ,2018-09-26,19.457143


## Train a model

### Set Training Data path

In [14]:
# The training job reads its input from the processing job's results path on S3.
s3_training_input_path = s3_job_results_path
print(f"training input data path: {s3_training_input_path}")

training input data path: s3://sagemaker-us-east-1-456441140195/export-flow-13-21-38-01-5829c9ff/output/data-wrangler-flow-processing-13-21-38-01-5829c9ff


### Configure the algorithm and training job

In [15]:
# Look up the XGBoost (version 1.2-1) container image for the default boto3 session's region.
region = boto3.Session().region_name
container = sagemaker.image_uris.retrieve("xgboost", region, "1.2-1")

# Hyperparameters for the training job, given as strings.
hyperparameters = dict(
    max_depth="5",
    objective="reg:squarederror",
    num_round="10",
)

# The training content type follows the output format chosen for the processing job.
if output_content_type.upper() == "PARQUET":
    train_content_type = "application/x-parquet"
else:
    train_content_type = "text/csv"

# Training input pointing at the processed data on S3.
train_input = sagemaker.inputs.TrainingInput(
    content_type=train_content_type,
    s3_data=s3_training_input_path,
)

### Start the Training Job

In [17]:
# Estimator for the selected container image, trained on a single ml.m5.xlarge instance.
estimator = sagemaker.estimator.Estimator(
    image_uri=container,
    role=iam_role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    hyperparameters=hyperparameters,
)

# Launch training, feeding the prepared input through the "train" channel.
training_channels = {"train": train_input}
estimator.fit(training_channels)

2022-02-13 21:49:24 Starting - Starting the training job...
2022-02-13 21:49:47 Starting - Launching requested ML instancesProfilerReport-1644788964: InProgress
......
2022-02-13 21:50:55 Starting - Preparing the instances for training......
2022-02-13 21:51:54 Downloading - Downloading input data...
2022-02-13 21:52:08 Training - Downloading the training image...
2022-02-13 21:52:54 Uploading - Uploading generated training model.[34m[2022-02-13 21:52:51.324 ip-10-0-227-25.ec2.internal:1 INFO utils.py:27] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34mINFO:sagemaker-containers:Imported framework sagemaker_xgboost_container.training[0m
[34mINFO:sagemaker-containers:Failed to parse hyperparameter objective value reg:squarederror to Json.[0m
[34mReturning the value itself[0m
[34mINFO:sagemaker-containers:No GPUs detected (normal if no gpus installed)[0m
[34mINFO:sagemaker_xgboost_container.training:Running XGBoost Sagemaker in algorithm mode[0m
[34mINFO:root:Determined delimiter 

In [None]:
# Report the name of the estimator's most recent training job.
print(f"Training job name: {estimator.latest_training_job.job_name}")

### Create the Endpoint

In [None]:
# The endpoint name embeds the name of the latest training job.
endpoint_name = f"airport-data-delay-{estimator.latest_training_job.job_name}"

# Deploy the trained model to an endpoint backed by a single ml.m4.xlarge instance.
estimator_inference = estimator.deploy(
    endpoint_name=endpoint_name,
    instance_type="ml.m4.xlarge",
    initial_instance_count=1,
)

In [None]:
# `Predictor.endpoint` is a deprecated alias in SageMaker Python SDK v2;
# `endpoint_name` is the supported attribute and returns the same value.
print(f"Endpoint name: {estimator_inference.endpoint_name}")

More details: https://sagemaker.readthedocs.io/en/stable/frameworks/xgboost/using_xgboost.html