In [9]:
from datetime import datetime
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat
)

# Create the SageMaker session.
# This PipelineSession instance is handed to every ScriptProcessor defined in the
# cells below via their `sagemaker_session` argument.
sagemaker_session = PipelineSession()

In [10]:
import boto3
from sagemaker import get_execution_role
# Region name taken from a default (non-pipeline) SageMaker session.
# NOTE(review): `region` is not referenced in the cells visible here — confirm it is used elsewhere.
region = sagemaker.Session().boto_region_name
# Execution role of the current environment; passed to the pipeline when it is created.
role = get_execution_role()
from sagemaker.processing import ProcessingInput, ProcessingOutput


In [24]:
# Processing job 1: evaluation of Claude 2.0 (default model ID "anthropic.claude-v2").
# The processor runs the step's script with python3 inside the fmeval container
# image on a single ml.m5.2xlarge instance.
processor1 = ScriptProcessor(
    # Replace XXXXXXXXXXXX with the AWS account ID that owns the ECR repository.
    # An ECR registry host has the form <account-id>.dkr.ecr.<region>.amazonaws.com,
    # so the URI must not start with a ".".
    image_uri="XXXXXXXXXXXX.dkr.ecr.us-east-1.amazonaws.com/fmeval-container:latest",
    command=["python3"],
    instance_type="ml.m5.2xlarge",
    instance_count=1,
    base_job_name="evaluation-claude-2_0",
    sagemaker_session=sagemaker_session,
    # Reuse the execution role resolved once in the setup cell
    # (`role = get_execution_role()`), which is also the role the pipeline is created with.
    role=role,
)

# Pipeline parameters: overridable per execution, with the defaults below.
# NOTE(review): `input_data1` only changes the `--input_data` argument handed to the
# script; the file uploaded by the ProcessingInput below is fixed to
# data/xsum_sample.jsonl — confirm this is intended.
input_data_param1 = ParameterString(
    name="input_data1",
    default_value="xsum_sample.jsonl"
)

model_id_param1 = ParameterString(
    name="model_id1",
    default_value="anthropic.claude-v2"
)


# Processing step: runs code/claude_text_summarization.py on processor1.
step_process1 = ProcessingStep(
    name="Claude2_0-evaluation",
    code="code/claude_text_summarization.py",
    processor=processor1,
    # The local sample dataset is made available in the container under
    # /opt/ml/processing/input/data/.
    inputs=[
        ProcessingInput(
            source="data/xsum_sample.jsonl",
            destination="/opt/ml/processing/input/data/",
        ),
    ],
    # Whatever the script writes to this container directory is collected as the step output.
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/output/evaluation")
    ],
    # Command-line arguments forwarded to the script; values come from the pipeline parameters.
    job_arguments=[
        '--input_data', input_data_param1,
        '--model_id', model_id_param1,
    ]
)

In [25]:
# Processing job 2: evaluation of Claude 2.1 (default model ID "anthropic.claude-v2:1").
# The processor runs the step's script with python3 inside the fmeval container
# image on a single ml.m5.large instance.
# NOTE(review): the other evaluation jobs in this notebook use ml.m5.2xlarge —
# confirm the smaller instance type here is deliberate.
processor2 = ScriptProcessor(
    # Replace XXXXXXXXXXXX with the AWS account ID that owns the ECR repository.
    # An ECR registry host has the form <account-id>.dkr.ecr.<region>.amazonaws.com,
    # so the URI must not start with a ".".
    image_uri="XXXXXXXXXXXX.dkr.ecr.us-east-1.amazonaws.com/fmeval-container:latest",
    command=["python3"],
    instance_type="ml.m5.large",
    instance_count=1,
    base_job_name="evaluation-claude-2_1",
    sagemaker_session=sagemaker_session,
    # Reuse the execution role resolved once in the setup cell
    # (`role = get_execution_role()`), which is also the role the pipeline is created with.
    role=role,
)

# Pipeline parameters: overridable per execution, with the defaults below.
# NOTE(review): `input_data2` only changes the `--input_data` argument handed to the
# script; the file uploaded by the ProcessingInput below is fixed to
# data/xsum_sample.jsonl — confirm this is intended.
input_data_param2 = ParameterString(
    name="input_data2",
    default_value="xsum_sample.jsonl"
)

model_id_param2 = ParameterString(
    name="model_id2",
    default_value="anthropic.claude-v2:1"
)


# Processing step: runs code/claude_text_summarization.py on processor2.
step_process2 = ProcessingStep(
    name="Claude2_1-evaluation",
    code="code/claude_text_summarization.py",
    processor=processor2,
    # The local sample dataset is made available in the container under
    # /opt/ml/processing/input/data/.
    inputs=[
        ProcessingInput(
            source="data/xsum_sample.jsonl",
            destination="/opt/ml/processing/input/data/",
        ),
    ],
    # Whatever the script writes to this container directory is collected as the step output.
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/output/evaluation")
    ],
    # Command-line arguments forwarded to the script; values come from the pipeline parameters.
    job_arguments=[
        '--input_data', input_data_param2,
        '--model_id', model_id_param2,
    ]
)

In [26]:
# Processing job 3: evaluation of GPT-3.5 Turbo (default model ID "gpt-3.5-turbo").
# The processor runs the step's script with python3 inside the fmeval container
# image on a single ml.m5.2xlarge instance.
processor3 = ScriptProcessor(
    # Replace XXXXXXXXXXXX with the AWS account ID that owns the ECR repository.
    # An ECR registry host has the form <account-id>.dkr.ecr.<region>.amazonaws.com,
    # so the URI must not start with a ".".
    image_uri="XXXXXXXXXXXX.dkr.ecr.us-east-1.amazonaws.com/fmeval-container:latest",
    command=["python3"],
    instance_type="ml.m5.2xlarge",
    instance_count=1,
    base_job_name="evaluation-gpt-3_5-turbo",
    sagemaker_session=sagemaker_session,
    # Reuse the execution role resolved once in the setup cell
    # (`role = get_execution_role()`), which is also the role the pipeline is created with.
    role=role,
)

# Pipeline parameters: overridable per execution, with the defaults below.
# NOTE(review): `input_data3` only changes the `--input_data` argument handed to the
# script; the file uploaded by the ProcessingInput below is fixed to
# data/xsum_sample.jsonl — confirm this is intended.
input_data_param3 = ParameterString(
    name="input_data3",
    default_value="xsum_sample.jsonl"
)

model_id_param3 = ParameterString(
    name="model_id3",
    default_value="gpt-3.5-turbo"
)


# Processing step: runs code/openai_text_summarization.py on processor3.
step_process3 = ProcessingStep(
    name="gpt-3_5-turbo-evaluation",
    code="code/openai_text_summarization.py",
    processor=processor3,
    # The local sample dataset is made available in the container under
    # /opt/ml/processing/input/data/.
    inputs=[
        ProcessingInput(
            source="data/xsum_sample.jsonl",
            destination="/opt/ml/processing/input/data/",
        ),
    ],
    # Whatever the script writes to this container directory is collected as the step output.
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/output/evaluation")
    ],
    # Command-line arguments forwarded to the script; values come from the pipeline parameters.
    job_arguments=[
        '--input_data', input_data_param3,
        '--model_id', model_id_param3,
    ]
)

In [27]:
# Processing job 4: evaluation of GPT-4 (default model ID "gpt-4").
# The processor runs the step's script with python3 inside the fmeval container
# image on a single ml.m5.2xlarge instance.
processor4 = ScriptProcessor(
    # Replace XXXXXXXXXXXX with the AWS account ID that owns the ECR repository.
    # An ECR registry host has the form <account-id>.dkr.ecr.<region>.amazonaws.com,
    # so the URI must not start with a ".".
    image_uri="XXXXXXXXXXXX.dkr.ecr.us-east-1.amazonaws.com/fmeval-container:latest",
    command=["python3"],
    instance_type="ml.m5.2xlarge",
    instance_count=1,
    base_job_name="evaluation-gpt-4",
    sagemaker_session=sagemaker_session,
    # Reuse the execution role resolved once in the setup cell
    # (`role = get_execution_role()`), which is also the role the pipeline is created with.
    role=role,
)

# Pipeline parameters: overridable per execution, with the defaults below.
# NOTE(review): `input_data4` only changes the `--input_data` argument handed to the
# script; the file uploaded by the ProcessingInput below is fixed to
# data/xsum_sample.jsonl — confirm this is intended.
input_data_param4 = ParameterString(
    name="input_data4",
    default_value="xsum_sample.jsonl"
)

model_id_param4 = ParameterString(
    name="model_id4",
    default_value="gpt-4"
)


# Processing step: runs code/openai_text_summarization.py on processor4.
step_process4 = ProcessingStep(
    name="gpt-4-evaluation",
    code="code/openai_text_summarization.py",
    processor=processor4,
    # The local sample dataset is made available in the container under
    # /opt/ml/processing/input/data/.
    inputs=[
        ProcessingInput(
            source="data/xsum_sample.jsonl",
            destination="/opt/ml/processing/input/data/",
        ),
    ],
    # Whatever the script writes to this container directory is collected as the step output.
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/output/evaluation")
    ],
    # Command-line arguments forwarded to the script; values come from the pipeline parameters.
    job_arguments=[
        '--input_data', input_data_param4,
        '--model_id', model_id_param4,
    ]
)

In [28]:
# Processing job 5: evaluation of GPT-4 Turbo (default model ID "gpt-4-turbo").
# The processor runs the step's script with python3 inside the fmeval container
# image on a single ml.m5.2xlarge instance.
processor5 = ScriptProcessor(
    # Replace XXXXXXXXXXXX with the AWS account ID that owns the ECR repository.
    # An ECR registry host has the form <account-id>.dkr.ecr.<region>.amazonaws.com,
    # so the URI must not start with a ".".
    image_uri="XXXXXXXXXXXX.dkr.ecr.us-east-1.amazonaws.com/fmeval-container:latest",
    command=["python3"],
    instance_type="ml.m5.2xlarge",
    instance_count=1,
    base_job_name="evaluation-gpt-4-turbo",
    sagemaker_session=sagemaker_session,
    # Reuse the execution role resolved once in the setup cell
    # (`role = get_execution_role()`), which is also the role the pipeline is created with.
    role=role,
)

# Pipeline parameters: overridable per execution, with the defaults below.
# The parameter is named "input_data5" so it follows the same input_data<N> pattern
# as the parameters of the other evaluation jobs.
# NOTE(review): `input_data5` only changes the `--input_data` argument handed to the
# script; the file uploaded by the ProcessingInput below is fixed to
# data/xsum_sample.jsonl — confirm this is intended.
input_data_param5 = ParameterString(
    name="input_data5",
    default_value="xsum_sample.jsonl"
)

model_id_param5 = ParameterString(
    name="model_id5",
    default_value="gpt-4-turbo"
)


# Processing step: runs code/openai_text_summarization.py on processor5.
step_process5 = ProcessingStep(
    name="gpt-4-turbo-evaluation",
    code="code/openai_text_summarization.py",
    processor=processor5,
    # The local sample dataset is made available in the container under
    # /opt/ml/processing/input/data/.
    inputs=[
        ProcessingInput(
            source="data/xsum_sample.jsonl",
            destination="/opt/ml/processing/input/data/",
        ),
    ],
    # Whatever the script writes to this container directory is collected as the step output.
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/output/evaluation")
    ],
    # Command-line arguments forwarded to the script; values come from the pipeline parameters.
    job_arguments=[
        '--input_data', input_data_param5,
        '--model_id', model_id_param5,
    ]
)

## ステップ 2: Parallel ステップの定義


In [29]:
# Current local time formatted down to milliseconds (the [:-3] slice drops the last
# three digits of the microsecond field).
# NOTE(review): not used in this cell; kept in case a later cell reads it.
timestamp_str = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')[:-3]

# Assemble the pipeline from the five evaluation steps and their parameters.
# None of the steps references another step's output or declares a dependency,
# so the service is free to schedule them concurrently.
pipeline = Pipeline(
    name="fmeval-pipeline",
    parameters=[
        input_data_param1, model_id_param1,
        input_data_param2, model_id_param2,
        input_data_param3, model_id_param3,
        input_data_param4, model_id_param4,
        input_data_param5, model_id_param5,
    ],
    steps=[
        step_process1,
        step_process2,
        step_process3,
        step_process4,
        step_process5,
    ],
)

# upsert() creates the pipeline on the first run and updates the existing definition
# on later runs, so this cell can be re-executed (e.g. Restart & Run All) without
# failing because "fmeval-pipeline" already exists.
# MaxParallelExecutionSteps caps how many steps may run at the same time.
pipeline.upsert(role, parallelism_config={"MaxParallelExecutionSteps": 50})
      



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:820974724107:pipeline/fmeval-pipeline',
 'ResponseMetadata': {'RequestId': '5b04f5d7-b6ab-42d9-8425-b8d8b38aa174',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '5b04f5d7-b6ab-42d9-8425-b8d8b38aa174',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '83',
   'date': 'Thu, 11 Apr 2024 13:34:11 GMT'},
  'RetryAttempts': 0}}