Setup and Import Dependencies


Define Parameters

In [15]:
import boto3
import sagemaker
from sagemaker.session import Session
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import ProcessingStep

In [16]:
# Initialize SageMaker session
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()

# Define S3 bucket and data paths
bucket = 'electronics-dataset'
prefix = 'data'
s3_train_data = f"s3://{bucket}/{prefix}/train/train_data.csv"
s3_val_data = f"s3://{bucket}/{prefix}/validation/validation_data.csv"
s3_test_data = f"s3://{bucket}/{prefix}/test/test_data_with_outcome.csv"

# Define pipeline parameters
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
train_data = ParameterString(name="TrainData", default_value=s3_train_data)
val_data = ParameterString(name="ValData", default_value=s3_val_data)
test_data = ParameterString(name="TestData", default_value=s3_test_data)

Preprocessing Step

In [17]:
%%writefile code/preprocessing.py
import os
import pandas as pd

if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    train = pd.read_csv(f"{base_dir}/train1/train_data.csv", header=None)
    test = pd.read_csv(f"{base_dir}/test1/test_data_with_outcome.csv", header=None)
    validation = pd.read_csv(f"{base_dir}/val1/validation_data.csv", header=None)

    prod = test.iloc[round(len(test)*0.8):]
    test = test.iloc[:round(len(test)*0.8)]

    pd.DataFrame(train).to_csv(f"{base_dir}/train2/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(f"{base_dir}/validation2/validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(f"{base_dir}/test2/test.csv", header=False, index=False)
    pd.DataFrame(prod).to_csv(f"{base_dir}/prod2/prod.csv", header=False, index=False)

Overwriting code/preprocessing.py


In [18]:
framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count.default_value,
    base_job_name="sklearn-process",
    role=role,
    sagemaker_session=pipeline_session,
)

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=train_data.default_value, destination="/opt/ml/processing/train1"),
        ProcessingInput(source=val_data.default_value, destination="/opt/ml/processing/val1"),
        ProcessingInput(source=test_data.default_value, destination="/opt/ml/processing/test1"),
    ],
    outputs=[
        ProcessingOutput(output_name="proc_train_out", source="/opt/ml/processing/train2"),
        ProcessingOutput(output_name="proc_validation_out", source="/opt/ml/processing/validation2"),
        ProcessingOutput(output_name="proc_test_out", source="/opt/ml/processing/test2"),
        ProcessingOutput(output_name="proc_prod_out", source="/opt/ml/processing/prod2"),
    ],
    code="code/preprocessing.py",
)

step_process = ProcessingStep(name="Process", step_args=processor_args)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


Define and Run the Pipeline


In [19]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "ElectronicsPipeline"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        train_data,
        val_data,
        test_data,
    ],
    steps=[step_process],
)

# Submit the pipeline to SageMaker, start execution
pipeline.upsert(role_arn=role)
execution = pipeline.start()

# Wait for the pipeline execution to complete
execution.wait()

# List steps in the pipeline execution
steps = execution.list_steps()
for step in steps:
    print(f"Step: {step['StepName']}, Status: {step['StepStatus']}, Start Time: {step['StartTime']}, End Time: {step['EndTime']}")



WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"

In [25]:
steps = execution.list_steps()
for step in steps:
    print(f"Step: {step['StepName']}, Status: {step['StepStatus']}")
    if step['StepStatus'] == 'Failed':
        print(f"Failure Reason: {step.get('FailureReason', 'Not available')}")

Step: Process, Status: Failed
Failure Reason: ClientError: Failed to invoke sagemaker:CreateProcessingJob. Error Details: No S3 objects found under S3 URL "s3://electronics-dataset/data/train/train_data.csv" given in input data source. Please ensure that the bucket exists in the selected region (us-east-1), that objects exist under that S3 prefix, and that the role "arn:aws:iam::676076160400:role/LabRole" has "s3:ListBucket" permissions on bucket "electronics-dataset".


In [28]:
bucket = 'electronics-dataset'
prefix = ''  # Remove the 'data' prefix

s3_train_data = f"s3://{bucket}/train_data.csv"
s3_val_data = f"s3://{bucket}/val_data.csv"
s3_test_data = f"s3://{bucket}/test_data.csv"

In [30]:
import boto3

s3 = boto3.client('s3')

# Upload files to S3
s3.upload_file('train_data.csv', 'electronics-dataset', 'train_data.csv')
s3.upload_file('val_data.csv', 'electronics-dataset', 'val_data.csv')
s3.upload_file('test_data.csv', 'electronics-dataset', 'test_data.csv')

In [31]:
execution = pipeline.start()
execution.wait()

WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"