## Part X.2 - Configure CI/CD Pipeline for DeepAR 

University of San Diego - MS Applied AI

AAI-540 Team 5

October 21, 2024

In [1]:
# setup environment: execute the shared setup notebook in this kernel so the
# names it defines are available to the cells below
# NOTE(review): presumably this is where `bucket`, `region`, `role`, `sm`,
# `account_id` and the feature-store helpers come from - confirm in that notebook
%run 0-Environment_Setup.ipynb

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
Stored 's3_datalake_path_csv' (str)
Stored 'local_data_path_csv' (str)
Stored 's3_datalake_path_parquet' (str)


In [3]:
# import necessary libraries
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

In [4]:
# init pipeline session; this session object is handed to every processor,
# estimator and model below via `sagemaker_session=pipeline_session`
pipeline_session = PipelineSession()

In [5]:
# S3 locations used by the pipeline: raw input, batch output and trained model artifacts
input_data_path = f"s3://{bucket}/store-sales-forecasting/deepar/pipelines/train/input"
batch_data_path = f"s3://{bucket}/store-sales-forecasting/deepar/pipelines/train/output"
train_job_output_path = f"s3://{bucket}/store-sales-forecasting/deepar/pipelines/train/model"

In [6]:
# get latest feature store and write local
import os

input_local_file = './test-data/input_data.csv'

# create the local staging directory on a fresh checkout; to_csv does not create it
os.makedirs(os.path.dirname(input_local_file), exist_ok=True)

sales_features_store = get_store_dataset_from_offline_feature_group(store_sales_feature_group)
sales_features_store.to_csv(input_local_file)

# set destination path in S3
input_data_file = "{}/input_data.csv".format(input_data_path)

INFO:sagemaker:Query f430db85-c9fa-48c5-9b03-8db93dc55f6f is being executed.


Running 
    SELECT *
    FROM
        "store_sales_feature_group_offline_1728336748"
    ORDER BY
        store_nbr ASC, date ASC
    


INFO:sagemaker:Query f430db85-c9fa-48c5-9b03-8db93dc55f6f successfully executed.


In [7]:
# upload the locally written input dataset to the pipeline's S3 input location
!aws s3 cp $input_local_file $input_data_file

upload: test-data/input_data.csv to s3://sagemaker-us-east-1-343218227212/store-sales-forecasting/deepar/pipelines/train/input/input_data.csv


In [8]:
# Load the best model information from our tuning job
tuning_job_name = "deepar-hyperparamete-241007-2220"
tuning_job_result = sm.describe_hyper_parameter_tuning_job(
    HyperParameterTuningJobName=tuning_job_name
)

# get model details from best training job
best_training_job_name = tuning_job_result["BestTrainingJob"]["TrainingJobName"]
best_training_job = sm.describe_training_job(TrainingJobName=best_training_job_name)

# get the best RMSE score to use in pipeline
# start from None (not 0) so a missing metric is detected instead of silently
# producing an RMSE threshold of 0
best_rmse_metric = None
for metric in best_training_job['FinalMetricDataList']:
    if metric['MetricName'] == 'test:RMSE':
        best_rmse_metric = metric['Value']

if best_rmse_metric is None:
    raise ValueError(
        "Training job {} did not report a test:RMSE metric".format(best_training_job_name)
    )

print("Best model test:RMSE score: {}".format(best_rmse_metric))

Best model test:RMSE score: 3434.864990234375


In [9]:
# define pipeline parameters (each can be overridden per pipeline execution)
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)
instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.2xlarge",
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)
input_data = ParameterString(name="InputData", default_value=input_data_path)
batch_data = ParameterString(name="BatchData", default_value=batch_data_path)

# RMSE threshold parameter; defaults to the test:RMSE of the best tuning-job model
rmse_threshold = ParameterFloat(
    name="RmseThreshold",
    default_value=best_rmse_metric,
)

### Setup Docker Container for Custom Processing

Fetch the latest data from the offline feature store (past 1 week).
Transform it into the JSONL test format.


In [10]:
!mkdir -p code
!mkdir docker
!pip install sagemaker-studio-image-build

mkdir: cannot create directory ‘docker’: File exists


In [11]:
%%writefile docker/Dockerfile
FROM python:3.7-slim-buster

# json and time ship with the Python standard library, so they are not pip-installed
RUN pip3 install pandas==0.25.3 boto3 sagemaker awswrangler pyathena
# unbuffered stdout/stderr
ENV PYTHONUNBUFFERED=TRUE

ENTRYPOINT ["python3"]

Writing docker/Dockerfile


In [14]:
# ECR repository name, tag suffix and the fully qualified image URI
ecr_repository = 'deepar-processing-container'
tag = ':latest'
processing_repository_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/{ecr_repository + tag}"

In [21]:
# build container
!sm-docker build -t $ecr_repository docker

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
Created ECR repository sagemaker-studio
...................[Container] 2024/10/15 21:16:28.146560 Running on CodeBuild On-demand

[Container] 2024/10/15 21:16:28.146574 Waiting for agent ping
[Container] 2024/10/15 21:16:28.247523 Waiting for DOWNLOAD_SOURCE
[Container] 2024/10/15 21:16:30.072083 Phase is DOWNLOAD_SOURCE
[Container] 2024/10/15 21:16:30.112993 CODEBUILD_SRC_DIR=/codebuild/output/src1727034112/src
[Container] 2024/10/15 21:16:30.113572 YAML location is /codebuild/output/src1727034112/src/buildspec.yml
[Container] 2024/10/15 21:16:30.116997 Setting HTTP client timeout to higher timeout for S3 source
[Container] 2024/10/15 21:16:30.117137 Processing environment variables
[Container] 2024/10/15 21:16:30.161892 No runtime version selected in buildspec.
[Container] 20

In [22]:
# Create ECR repository
!aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {account_id}.dkr.ecr.{region}.amazonaws.com
!aws ecr create-repository --repository-name $ecr_repository

/bin/bash: line 1: docker: command not found


Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>
BrokenPipeError: [Errno 32] Broken pipe

An error occurred (RepositoryAlreadyExistsException) when calling the CreateRepository operation: The repository with name 'deepar-processing-container' already exists in the registry with id '343218227212'


In [24]:
# push docker image
!sm-docker tag {ecr_repository + tag} $processing_repository_uri
!sm-docker push $processing_repository_uri

usage: sm-docker [-h] {build} ...
sm-docker: error: argument subcommand: invalid choice: 'tag' (choose from 'build')
usage: sm-docker [-h] {build} ...
sm-docker: error: argument subcommand: invalid choice: 'push' (choose from 'build')


### 1. Setup Data Pre-Processing Step

In [10]:
%%writefile code/preprocessing.py
import os
import json
from time import gmtime, strftime
import datetime
import pandas as pd

# helper function to build the target time series for each store
def build_store_timeseries(store_sales, target_col):
    """Build one daily time series of ``target_col`` per store.

    Args:
        store_sales: DataFrame with at least the columns ``store_nbr``,
            ``date`` and ``target_col``.
        target_col: name of the column to turn into a series.

    Returns:
        A list of pandas Series (one per store, in order of first appearance of
        ``store_nbr``), indexed by day. Values falling on the same day are
        summed; days with no rows are filled by the resample sum.
    """
    store_timeseries = []
    for store_nbr in store_sales['store_nbr'].unique():
        # select rows and columns in a single .loc call so we work on an
        # independent frame (avoids SettingWithCopyWarning on assignment)
        store_data = store_sales.loc[
            store_sales['store_nbr'] == store_nbr, ['date', target_col]
        ]

        # convert to datetime without mutating the caller's frame
        store_data = store_data.assign(date=pd.to_datetime(store_data['date']))

        # index by date and resample to a 1-day timestep
        daily = store_data.set_index('date').resample('D').sum()

        # keep only the target column as a Series
        store_timeseries.append(daily.iloc[:, 0])
    return store_timeseries

# helper function to serialize records to a JSON Lines file
def write_dicts_to_file(path, data):
    """Write each item of ``data`` to ``path`` as one UTF-8 JSON document per line.

    The file is opened in binary mode and overwritten if it already exists.
    """
    line_break = "\n".encode("utf-8")
    with open(path, "wb") as out_file:
        for record in data:
            encoded_record = json.dumps(record).encode("utf-8")
            out_file.write(encoded_record + line_break)


if __name__ == "__main__":
    # Script entry point: read the input CSV, split it chronologically into
    # train / validation / test windows and write each set as JSON Lines.
    # Everything (including test-set generation and the file writes) lives
    # inside this guard so importing the module has no side effects.

    base_dir = "/opt/ml/processing"

    # load input data
    input_data_path = os.path.join(base_dir, "input", "input_data.csv")
    sales_features_store = pd.read_csv(input_data_path)

    # calculate the total days in the date range so we can split at 80% mark
    series_start_date = pd.to_datetime(sales_features_store['date'].min())
    series_end_date = pd.to_datetime(sales_features_store['date'].max())
    delta = series_end_date - series_start_date

    # set training cutoff parameters
    training_series_day_count = int(delta.days * .8)
    start_training = series_start_date
    end_training = series_start_date + datetime.timedelta(days=training_series_day_count)

    # the remaining whole weeks are split in half: the first half is the
    # validation range, the second half is used for the rolling test windows
    start_test = end_training + datetime.timedelta(days=1)
    holdout_weeks = int((delta.days - training_series_day_count) / 7)
    val_weeks = int(holdout_weeks / 2)
    test_weeks = val_weeks

    # build the target and dynamic-feature time series (one per store)
    timeseries_stores_sales = build_store_timeseries(sales_features_store, 'sales')
    timeseries_stores_oil = build_store_timeseries(sales_features_store, 'oil')
    timeseries_stores_holidays = build_store_timeseries(sales_features_store, 'is_holiday')
    timeseries_stores_promotions = build_store_timeseries(sales_features_store, 'onpromotion')
    feature_series = [
        timeseries_stores_oil,
        timeseries_stores_holidays,
        timeseries_stores_promotions,
    ]

    def slice_dynamic_features(store_idx, start, end):
        # one list per dynamic feature, in the order oil, holidays, promotions
        return [series[store_idx][start:end].tolist() for series in feature_series]

    # capture the unique stores
    unique_store_nbrs = sales_features_store['store_nbr'].unique()
    deepar_prediction_length = 7

    # generate training data
    training_data = [
        {
            "start": str(start_training),
            "target": ts[start_training:end_training].tolist(),
            # store numbers are shifted by one to produce the category value
            "cat": [int(unique_store_nbrs[i]) - 1],
            "dynamic_feat": slice_dynamic_features(i, start_training, end_training),
        }
        for i, ts in enumerate(timeseries_stores_sales)
    ]
    print(len(training_data))

    # generate validation data
    val_end = start_test + datetime.timedelta(days=(val_weeks * 7))
    val_data = [
        {
            "start": str(start_test),
            "target": ts[start_test:val_end].tolist(),
            "cat": [int(unique_store_nbrs[i]) - 1],
            "dynamic_feat": slice_dynamic_features(i, start_test, val_end),
        }
        for i, ts in enumerate(timeseries_stores_sales)
    ]
    print(len(val_data))

    # generate test data: consecutive windows of `cw` days starting where the
    # validation range ends
    test_windows = test_weeks - 2
    gen_test_start = start_test + datetime.timedelta(days=(val_weeks * 7))
    cw = 7

    test_data = []
    for k in range(1, test_windows + 1):
        window_start = gen_test_start + datetime.timedelta(days=((k - 1) * cw))
        # label slicing is inclusive, so the target covers exactly `cw` days
        target_end = gen_test_start + datetime.timedelta(days=((k * cw) - 1))
        # dynamic features extend a further prediction_length days past the target
        feature_end = gen_test_start + datetime.timedelta(
            days=((k * cw) + deepar_prediction_length - 1)
        )
        for i, ts in enumerate(timeseries_stores_sales):
            test_data.append(
                {
                    "start": str(window_start),
                    "target": ts[window_start:target_end].tolist(),
                    "cat": [int(unique_store_nbrs[i]) - 1],
                    "dynamic_feat": slice_dynamic_features(i, window_start, feature_end),
                }
            )

    print(len(test_data))

    # write datasets to json files (create the output directories if missing)
    for split_name, records in (("train", training_data), ("val", val_data), ("test", test_data)):
        split_dir = os.path.join(base_dir, split_name)
        os.makedirs(split_dir, exist_ok=True)
        write_dicts_to_file(os.path.join(split_dir, "{}.json".format(split_name)), records)

    

Overwriting code/preprocessing.py


In [11]:
# setup script processor
from sagemaker.processing import ScriptProcessor
script_processor = ScriptProcessor(
    # use the image URI derived from account_id/region instead of a hard-coded
    # account-specific URI so the notebook runs in other accounts and regions
    image_uri=processing_repository_uri,
    role=role,
    # wire in the ProcessingInstanceCount pipeline parameter (default 1)
    instance_count=processing_instance_count,
    instance_type='ml.m5.2xlarge',
    command=['python3'],
    base_job_name="deepar-feature-process",
    sagemaker_session=pipeline_session,
)

In [12]:
# the input CSV is mounted where code/preprocessing.py reads it
processing_inputs = [
    ProcessingInput(source=input_data_file, destination="/opt/ml/processing/input"),
]

# one output channel per dataset written by code/preprocessing.py
processing_outputs = [
    ProcessingOutput(output_name=split_name, source="/opt/ml/processing/{}".format(split_name))
    for split_name in ("train", "val", "test")
]

processor_args = script_processor.run(
    inputs=processing_inputs,
    outputs=processing_outputs,
    code="code/preprocessing.py",
)

step_process = ProcessingStep(name="DeepARFeatureProcess", step_args=processor_args)



### 2. Train DeepAR model with Tuned Hyperparameters

In [13]:
from sagemaker.inputs import TrainingInput

# look up the DeepAR algorithm image for this region
image_name = sagemaker.image_uris.retrieve("forecasting-deepar", region)

# outputs of the feature-processing step, resolved when the pipeline runs
processed_outputs = step_process.properties.ProcessingOutputConfig.Outputs

train_channel = TrainingInput(
    s3_data=processed_outputs["train"].S3Output.S3Uri,
    content_type="json",
)
# the training job's "test" channel is fed the processing step's "val" output
test_channel = TrainingInput(
    s3_data=processed_outputs["val"].S3Output.S3Uri,
    content_type="json",
)

data_channels = {"train": train_channel, "test": test_channel}

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: 1.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


In [14]:
# daily time-series frequency
deepar_freq = "1D"

# forecast horizon: 7 days
deepar_prediction_length = 7

# context window: 15 days
deepar_context_length = 15

In [15]:
# initialize estimator
# SageMaker Python SDK v2 renamed train_instance_count/train_instance_type to
# instance_count/instance_type; the old names trigger deprecation warnings.
estimator = sagemaker.estimator.Estimator(
    image_uri=image_name,
    sagemaker_session=pipeline_session,
    role=role,
    instance_count=1,
    # TrainingInstanceType pipeline parameter (default "ml.m5.2xlarge")
    instance_type=instance_type,
    base_job_name="deepar-pipeline-train",
    output_path=train_job_output_path,
)

See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


In [16]:
# hyperparameters for the DeepAR training job (all literals are passed as strings)
# NOTE(review): context_length is the literal "3" here while deepar_context_length
# is set to 15 above and not used - confirm which value is intended
hyperparameters = dict(
    time_freq=deepar_freq,
    epochs="101",
    num_cells="97",
    early_stopping_patience="40",
    mini_batch_size="1024",
    learning_rate="0.0006294407061415784",
    context_length="3",
    prediction_length=str(deepar_prediction_length),
)

# attach the hyperparameters to the estimator
estimator.set_hyperparameters(**hyperparameters)

In [17]:
# setup the model to train: the estimator is bound to the PipelineSession, and
# the value returned by fit() is handed to TrainingStep as `step_args` below
train_args = estimator.fit(inputs=data_channels)

In [18]:
# setup train step: wraps the training arguments built above as a pipeline step
from sagemaker.workflow.steps import TrainingStep
step_train = TrainingStep(
    name="DeepARPipelineTrain",
    step_args=train_args,
)

### 3. Create Temporary Model to Use to Generate Predictions for Eval Step

In [19]:
# model definition backed by the artifacts the training step will produce
from sagemaker.model import Model

trained_model_artifacts = step_train.properties.ModelArtifacts.S3ModelArtifacts
model = Model(
    image_uri=image_name,
    model_data=trained_model_artifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

In [20]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep

# arguments for registering the trained artifacts as a SageMaker model
# NOTE(review): the batch transform below runs on ml.m5.2xlarge; confirm that
# the accelerator_type requested here is still needed
create_eval_model_args = model.create(
    instance_type="ml.m5.large",
    accelerator_type="ml.eia1.medium",
)

step_create_eval_model = ModelStep(
    name="DeepARPipelineCreateEvalModel",
    step_args=create_eval_model_args,
)

### 4. Create Batch Transform Step to Generate Eval Data

In [29]:
# batch transformer that runs the eval model and writes results under the batch output prefix
from sagemaker.transformer import Transformer

eval_transform_output_path = "{}/eval_transorm".format(batch_data_path)
transformer = Transformer(
    model_name=step_create_eval_model.properties.ModelName,
    instance_type="ml.m5.2xlarge",
    instance_count=1,
    output_path=eval_transform_output_path,
)

In [30]:
# transform step: score the "test" output of the processing step, one record per line
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

test_dataset_uri = step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri
transform_input = TransformInput(data=test_dataset_uri, split_type="Line")

step_transform_eval = TransformStep(
    name="DeepARPipelineBatchEval",
    transformer=transformer,
    inputs=transform_input,
)

### 5. Create Processing Step to Evaluate Model Predictions on Test Data

In [44]:
%%writefile code/evaluation.py
import json
import pathlib

import numpy as np
import pandas as pd

# Container paths this script reads and writes. The processing job must mount
# the ground-truth test records and the batch-transform output at these paths.
TEST_INPUT_PATH = "/opt/ml/processing/test/test.json"
TRANSFORM_OUTPUT_PATH = "/opt/ml/processing/eval_transform/test.json.out"
OUTPUT_DIR = "/opt/ml/processing/evaluation"


# helper function to load json
def load_json_by_line(file_path):
    """Read a JSON Lines file and return its records as a list (blank lines skipped)."""
    with open(file_path) as f:
        return [json.loads(line) for line in f if line.strip()]


def build_predictions_frame(transform_input, transform_results):
    """Pair every forecast with the actual sales it was predicting.

    Args:
        transform_input: records sent to batch transform; each needs ``start``,
            ``target`` and ``cat`` (``cat[0]`` is the store number minus one).
        transform_results: one result per input record, in the same order,
            each containing a ``mean`` list of forecast values.

    Returns:
        DataFrame with columns ``date``, ``store_nbr``, ``true_sales`` and
        ``predicted_sales``; only dates with both a forecast and an actual
        value are included.

    Raises:
        ValueError: if the two lists differ in length.

    A forecast is assumed to cover the days immediately after the record's
    ``target`` (standard DeepAR behavior - confirm against the model's
    inference configuration). The actual values for that period are therefore
    taken from the record of the same store whose ``start`` equals the
    forecast start, rather than from the record that produced the forecast.
    """
    if len(transform_input) != len(transform_results):
        raise ValueError(
            "Got {} input records but {} prediction records".format(
                len(transform_input), len(transform_results)
            )
        )

    # index the actual targets by (store, window start) for the lookup below
    actual_by_window = {}
    for record in transform_input:
        store_nbr = int(record['cat'][0]) + 1
        actual_by_window[(store_nbr, pd.to_datetime(record['start']))] = record['target']

    predictions_data = []
    for record, result in zip(transform_input, transform_results):
        store_nbr = int(record['cat'][0]) + 1
        forecast_start = pd.to_datetime(record['start']) + pd.Timedelta(days=len(record['target']))
        targets = actual_by_window.get((store_nbr, forecast_start))
        if targets is None:
            # no ground truth for this horizon (e.g. the last window)
            continue
        predictions = result['mean']
        for j in range(min(len(targets), len(predictions))):
            target_date = forecast_start + pd.Timedelta(days=j)
            predictions_data.append([target_date, store_nbr, targets[j], predictions[j]])

    predictions_df = pd.DataFrame(
        columns=['date', 'store_nbr', 'true_sales', 'predicted_sales'], data=predictions_data
    )
    predictions_df['date'] = pd.to_datetime(predictions_df['date'])
    return predictions_df


def compute_regression_report(predictions_df):
    """Compute MSE/RMSE and the residual standard deviation.

    RMSE is computed with numpy so the script does not depend on a recent
    scikit-learn release. The ``mse`` key is kept for consumers of the original
    report layout; ``rmse`` is added alongside it.

    Raises:
        ValueError: if ``predictions_df`` has no rows.
    """
    if predictions_df.empty:
        raise ValueError("No overlapping forecasts and actual values to evaluate")

    residuals = (
        predictions_df['true_sales'].to_numpy(dtype=float)
        - predictions_df['predicted_sales'].to_numpy(dtype=float)
    )
    mse = float(np.mean(np.square(residuals)))
    rmse = float(np.sqrt(mse))
    std = float(np.std(residuals))
    return {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
            "rmse": {"value": rmse, "standard_deviation": std},
        },
    }


if __name__ == "__main__":
    # load ground truth and model predictions
    transform_input = load_json_by_line(TEST_INPUT_PATH)
    transform_results = load_json_by_line(TRANSFORM_OUTPUT_PATH)

    # compile predictions with inputs and calculate metrics
    predictions_df = build_predictions_frame(transform_input, transform_results)
    report_dict = compute_regression_report(predictions_df)

    pathlib.Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{OUTPUT_DIR}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting code/evaluation.py


In [46]:
from sagemaker.processing import ScriptProcessor

# container image used to run the evaluation script
sklearn_image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

model_eval = ScriptProcessor(
    image_uri=sklearn_image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="deepar-pipeline-model-eval",
    role=role,
    sagemaker_session=pipeline_session,
)

model_eval_args = model_eval.run(
    inputs=[
        # batch-transform predictions; the destination must match the
        # /opt/ml/processing/eval_transform path read by code/evaluation.py
        ProcessingInput(
            source=step_transform_eval.properties.TransformOutput.S3OutputPath,
            destination="/opt/ml/processing/eval_transform",
        ),
        # test records that were sent to batch transform, mounted so the
        # ground-truth targets are available inside the evaluation container
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluation.py",
)



In [47]:
# evaluation step; evaluation.json from the "evaluation" output is registered as a property file
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_model_eval = ProcessingStep(
    name="DeepARPipelineModelEval",
    step_args=model_eval_args,
    property_files=[evaluation_report],
)

### Deploy and Execute Pipeline

In [48]:
# assemble the pipeline from its parameters and steps
pipeline_name = "DeepARTrainDeployPipeline"

pipeline_parameters = [
    processing_instance_count,
    instance_type,
    model_approval_status,
    input_data,
    batch_data,
    rmse_threshold,
]

# process -> train -> create model -> batch transform -> evaluate
pipeline_steps = [
    step_process,
    step_train,
    step_create_eval_model,
    step_transform_eval,
    step_model_eval,
]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

In [49]:
# create the pipeline in SageMaker, or update its definition if it already exists
pipeline.upsert(role_arn=role)





{'PipelineArn': 'arn:aws:sagemaker:us-east-1:343218227212:pipeline/DeepARTrainDeployPipeline',
 'ResponseMetadata': {'RequestId': '84896cf5-b51c-4ae3-b82b-73356f007201',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '84896cf5-b51c-4ae3-b82b-73356f007201',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '93',
   'date': 'Wed, 16 Oct 2024 16:18:30 GMT'},
  'RetryAttempts': 0}}

In [50]:
# start a pipeline execution (no overrides passed, so parameter defaults apply)
execution = pipeline.start()

In [51]:
# show the execution's current status and metadata
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:343218227212:pipeline/DeepARTrainDeployPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:343218227212:pipeline/DeepARTrainDeployPipeline/execution/iy2y6v8cgawr',
 'PipelineExecutionDisplayName': 'execution-1729095514048',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'deepartraindeploypipeline',
  'TrialName': 'iy2y6v8cgawr'},
 'CreationTime': datetime.datetime(2024, 10, 16, 16, 18, 33, 984000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 10, 16, 16, 18, 33, 984000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:343218227212:user-profile/d-nzj1ohif3tlp/default-20241007T091581',
  'UserProfileName': 'default-20241007T091581',
  'DomainId': 'd-nzj1ohif3tlp',
  'IamIdentity': {'Arn': 'arn:aws:sts::343218227212:assumed-role/AmazonSageMaker-ExecutionRole-20241007T091581/SageMaker',
   'PrincipalId': 'AROAU72LGRAGGJLCV6WF3:SageMaker'}},
 

In [52]:
# list the steps of this execution with their individual status
execution.list_steps()

[{'StepName': 'DeepARFeatureProcess',
  'StartTime': datetime.datetime(2024, 10, 16, 16, 18, 35, 308000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:343218227212:processing-job/pipelines-iy2y6v8cgawr-DeepARFeatureProcess-UVWP18i9HP'}},
  'AttemptCount': 1}]