# Load Data
A Lambda function runs once per week on a cron schedule: it requests data from Google's Air Quality history endpoint, processes it, and uploads the resulting train.json and test.json files to S3. Each week these files contain progressively longer time series, which should enable our DeepAR model to make more accurate predictions.

# Define Pipeline

In [1]:
import sagemaker

from sagemaker import image_uris

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.workflow.steps import ProcessingStep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput
)

from sagemaker.inputs import TrainingInput

from sagemaker.workflow.steps import TrainingStep, CreateModelStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.condition_step import ConditionStep

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo, ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import Join

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
    ParameterBoolean
)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
# additional imports
from sagemaker.workflow.steps import TuningStep
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.functions import JsonGet
from sagemaker.tuner import (
    IntegerParameter,
    ContinuousParameter,
    HyperparameterTuner
)

Define pipeline parameters

In [3]:
# Notebook-wide handles: project bucket, execution role, and SageMaker sessions.
bucket = 'sagemaker-aqi-tool-pipeline'
role = sagemaker.get_execution_role()

session = sagemaker.Session()
pipeline_session = PipelineSession()  # session type used by the pipeline-aware Model below

Define processed input data

In [4]:
# S3 prefixes of the training and test datasets, wrapped as training channels.
train_key = 'data/train/data'
test_key = 'data/test/data'

s3_input_train = TrainingInput(f"s3://{bucket}/{train_key}")
s3_input_test = TrainingInput(f"s3://{bucket}/{test_key}")

Define DeepAR model parameters

In [5]:
# DeepAR time-series settings; all three are forwarded to the estimator as
# hyperparameters ("time_freq", "context_length", "prediction_length").
freq = "1H"  # one observation per hour
context_length = 24 # 24 hours
prediction_length = 24 # 24 hours

Define estimator & hyperparameters

In [6]:
# Where trained model artifacts are written.
s3_model_path = f"s3://{bucket}/model"

# Built-in DeepAR forecasting image for the region the pipeline runs in.
deepar_container = image_uris.retrieve(
    framework='forecasting-deepar',
    region='eu-north-1',
)

# Static hyperparameters; learning_rate is intentionally absent because it is
# supplied by the hyperparameter tuner.
hyperparameters = dict(
    time_freq=freq,
    epochs="50",
    early_stopping_patience="10",
    mini_batch_size="64",
    prediction_length=str(prediction_length),
    context_length=str(context_length),
)

# DeepAR estimator that the tuning step trains.
estimator = sagemaker.estimator.Estimator(
    image_uri=deepar_container,
    role=role,
    instance_count=1,
    instance_type="ml.c5.2xlarge",
    hyperparameters=hyperparameters,
    output_path=s3_model_path,
    sagemaker_session=sagemaker.Session(),
)

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


Tuning step

In [7]:
# Search space for the tuner. Only the learning rate is tuned for now.
# (Disabled candidate: 'context_length': IntegerParameter(24, 168, scaling_type='Linear'),
#  i.e. context windows between 1 day and 1 week.)
hyperparameter_ranges = dict(
    learning_rate=ContinuousParameter(5e-4, 5e-2, scaling_type='Logarithmic'),
)

# Bayesian search that minimises RMSE on the test channel:
# 6 training jobs in total, at most 2 running at once.
hyperparameter_tuner = HyperparameterTuner(
    estimator=estimator,
    hyperparameter_ranges=hyperparameter_ranges,
    objective_metric_name='test:RMSE',
    objective_type='Minimize',
    strategy='Bayesian',
    max_jobs=6,
    max_parallel_jobs=2,
)

# Pipeline step that runs the tuning job on the train/test channels.
tuning_step = TuningStep(
    name='TuningStep',
    tuner=hyperparameter_tuner,
    inputs={'train': s3_input_train, 'test': s3_input_test},
)

Register model step

In [8]:
# Build a custom model name for the registration step from the number of weeks
# of history currently present in the training data (e.g. "deepar12w").
import boto3
import json

sm = boto3.client("sagemaker")
s3 = boto3.client("s3")

# Exact object key of the JSON Lines training file. Kept under its own name so
# the `train_key` prefix defined for the training channel earlier in the
# notebook is not silently overwritten; `bucket` is reused from the setup cell.
training_data_key = 'data/train/data.json'

get_object_response = s3.get_object(
    Bucket=bucket,
    Key=training_data_key,
)

# The file holds one JSON document per line; ignore blank lines (such as a
# trailing newline artefact) instead of letting json.loads fail on them.
training_data_json_strings = get_object_response['Body'].read().decode('utf-8').splitlines()
training_data = [json.loads(line) for line in training_data_json_strings if line.strip()]

# The London series is used as the reference for how much history exists.
london_obj = next((item for item in training_data if item.get('place') == 'London'), None)
if london_obj is None:
    raise ValueError(
        f"No series with place == 'London' found in s3://{bucket}/{training_data_key}; "
        "cannot derive the custom model name."
    )

# 7 * 24 target values per week, assuming hourly observations (matches freq = "1H").
no_weeks = round(len(london_obj['target']) / (7 * 24))
custom_model_name = f"deepar{no_weeks}w"


In [9]:
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep

# S3 location of the best model found by the tuning step (rank 0).
best_model_s3_uri = tuning_step.get_top_model_s3_uri(
    top_k=0,
    s3_bucket=bucket,
    prefix='model',
)

# Model built from the tuned artifact, served with the same DeepAR image.
model = Model(
    image_uri=estimator.image_uri,
    model_data=best_model_s3_uri,
    role=role,
    sagemaker_session=pipeline_session,
)

# Registration arguments for the model package group.
# approval_status is 'Approved' because Lambda can only deploy registered models
# whose status is "Approved"; the alternative is "PendingManualApproval".
register_args = model.register(
    content_types=['application/json'],
    response_types=['application/json'],
    inference_instances=['ml.m5.large'],
    transform_instances=['ml.m5.large'],
    model_package_group_name='deepar-aqi-model-group',
    approval_status='Approved',
    customer_metadata_properties={'model_name': custom_model_name},
)

register_model_step = ModelStep(name='AQIRegisterModel', step_args=register_args)



Create model step

In [10]:
# Creates the SageMaker model whose name is handed to the Lambda deploy step.
create_model_step = CreateModelStep(name='CreateModelforEndpoint', model=model)

Create Lambda step. This calls the Lambda function which will create a serverless endpoint to host the model.

In [11]:
from sagemaker.workflow.lambda_step import LambdaStep, LambdaOutput
from sagemaker.lambda_helper import Lambda

# Existing Lambda function that creates the serverless endpoint.
deploy_lambda = Lambda(
    function_arn="arn:aws:lambda:eu-north-1:263108256547:function:createForecastServerlessEndpointPipeline"
)

# Invoke the Lambda with the name of the model created by the previous step;
# the step declares a single output, 'endpoint_name'.
lambda_step = LambdaStep(
    name='CreateServerlessEndpoint',
    lambda_func=deploy_lambda,
    inputs={'model_name': create_model_step.properties.ModelName},
    outputs=[LambdaOutput(output_name='endpoint_name')],
)

In [12]:
# Steps in the order they are listed in the pipeline definition:
# tune -> register best model -> create model -> deploy via Lambda.
pipeline_steps = [
    tuning_step,
    register_model_step,
    create_model_step,
    lambda_step,
]

pipeline = Pipeline(name='aqi-pipeline', steps=pipeline_steps)

In [13]:
# Create the pipeline in SageMaker, or update its definition if it already
# exists, using the notebook's execution role. The response is shown as the cell output.
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:eu-north-1:263108256547:pipeline/aqi-pipeline',
 'ResponseMetadata': {'RequestId': '37ca3f95-6f8e-4661-8500-b5b93ca5772f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '37ca3f95-6f8e-4661-8500-b5b93ca5772f',
   'strict-transport-security': 'max-age=47304000; includeSubDomains',
   'x-frame-options': 'DENY',
   'content-security-policy': "frame-ancestors 'none'",
   'cache-control': 'no-cache, no-store, must-revalidate',
   'x-content-type-options': 'nosniff',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '103',
   'date': 'Wed, 29 Oct 2025 19:04:50 GMT'},
  'RetryAttempts': 0}}

In [14]:
# programmatically start the pipeline (uncomment to run)
# pipeline.start(
#     execution_display_name='conditional-model-registration',
#     execution_description='Starting from the SageMaker Studio'
# )