In [None]:
!pip install -U sagemaker

In [None]:
import sys
from datetime import datetime
import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
from typing import List, Tuple
import pandas as pd
import numpy as np
from datetime import datetime
from gluonts.dataset.common import ListDataset
from gluonts.mx.trainer import Trainer
from gluonts.mx.model.deepar import DeepAREstimator
from gluonts.evaluation import Evaluator
from gluonts.model.seasonal_naive import SeasonalNaivePredictor
from gluonts.model.forecast import SampleForecast
import matplotlib.pyplot as plt
from itertools import islice
from sagemaker.sklearn.processing import ScriptProcessor
from gluonts.evaluation.backtest import make_evaluation_predictions
from sagemaker.workflow.pipeline_context import LocalPipelineSession
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from pathlib import Path
# Set the container URI for the MXNet container
from sagemaker.mxnet.estimator import MXNet
from sagemaker.workflow.steps import TrainingStep
from sagemaker.inputs import TrainingInput
from sagemaker.estimator import Estimator
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CacheConfig

In [None]:
%env AWS_PROFILE = weather-pipeline

In [None]:
# https://github.com/Ransaka/Sagemaker-Local-Mode-Example/blob/master/preprocess/preprocess.py

In [None]:
!mkdir -p data

In [None]:
def get_most_recent_data_version(bucket: str):
    s3 = boto3.client('s3')
    objects = s3.list_objects_v2(Bucket=bucket)['Contents']
    objects.sort(key=lambda obj: obj['LastModified'], reverse=True)
    most_recent_file_key = objects[0]['Key']
    return most_recent_file_key

In [None]:
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
pipeline_session = PipelineSession()
role = "arn:aws:iam::371410071971:role/service-role/AmazonSageMaker-ExecutionRole-20200731T092838"
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"WeatherForecastModelPackageGroupName"

In [None]:
default_bucket

In [None]:
# get the current aws user
sts = boto3.client("sts")
current_user = sts.get_caller_identity()["Arn"].split("/")[-1]
print(current_user)



In [None]:
input_data_bucket = "clean-city-weather-data-1"
most_recent_file_key = get_most_recent_data_version(bucket="clean-city-weather-data-1")

In [None]:
local_path = Path.cwd() / "data" / "weather.csv"
s3 = boto3.resource("s3")
s3.Bucket(input_data_bucket).download_file(
    most_recent_file_key, local_path
)

base_uri = f"s3://{default_bucket}/recent-weather"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)


In [441]:
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

instance_type = ParameterString(name="TrainingInstanceType", 
                                default_value="ml.m5.xlarge"
                                )
model_approval_status = ParameterString(
                                name="ModelApprovalStatus", 
                                default_value="PendingManualApproval"
                                )
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)

In [442]:
input_data

ParameterString(name='InputData', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://sagemaker-us-west-2-371410071971/recent-weather/weather.csv')

### Preprocess

In [443]:
%%writefile code/preprocessing.py

import boto3
import pandas as pd
import os
import logging

base_dir = "/opt/ml/processing"  

    
if __name__ == "__main__":

    target_col = os.environ.get('target_col')
    id_col = os.environ.get('id_col')
    time_col = os.environ.get('time_col')
    frequency = os.environ.get('frequency')
    forecasting_horizon = int(os.environ.get('forecasting_horizon'))

    weather_df = pd.read_csv(f"{base_dir}/input/weather.csv",parse_dates=True)
    weather_df = weather_df.sort_values(by=[id_col,time_col]).reset_index(drop=True)
    weather_df[time_col] = pd.to_datetime(weather_df[time_col])
    weather_df = weather_df.set_index(time_col)
    
    # benchmark our forecast against a baseline
    backtest_split_point = weather_df.index[-1] - pd.Timedelta(hours=forecasting_horizon)
    backtest_train_df = weather_df[weather_df.index <= backtest_split_point]
    backtest_train_df = backtest_train_df.reset_index()
    backtest_train_df = backtest_train_df.rename({"index": time_col})
    backtest_validation_df = weather_df.copy()
    backtest_validation_df = backtest_validation_df.reset_index()
    backtest_validation_df = backtest_validation_df.rename({"index": time_col})
    
    # write backtest data to s3
    backtest_train_df.to_csv(f"{base_dir}/train/train.csv", index=False)
    backtest_validation_df.to_csv(f"{base_dir}/validation/validation.csv", index=False)


Overwriting code/preprocessing.py


In [455]:
from sagemaker.sklearn.processing import SKLearnProcessor
sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-weather-process",
    role=role,
    sagemaker_session=pipeline_session,
    env={"target_col": "temp_farenheit",
         "id_col": "ts_id",
            "time_col": "time",
            "frequency": "H",
            "forecasting_horizon": "72"
         }
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [456]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
step_process = ProcessingStep(name="WeatherProcess", 
                              processor=sklearn_processor,
                              inputs=[
                ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
            ],
            outputs=[
                ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
                ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
            ],
    code="code/preprocessing.py",
    cache_config=cache_config
                              )
# this will create a directory in S3 like this: 
# s3://sagemaker-us-west-2-371410071971/WeatherPipeline-Local-1/0520x0r4fs5b/WeatherProcess/output/train/train.csv
# s3://sagemaker-us-west-2-371410071971/WeatherPipeline-Local-1/0520x0r4fs5b/WeatherProcess/output/validation/validation.csv

In [457]:
# OutputDataConfig.S3OutputPath
# s3://sagemaker-us-west-2-371410071971/


### train step

In [458]:
%%writefile code/train.py

from typing import List
import os
import joblib
import json
import pandas as pd
from pathlib import Path
import argparse
from gluonts.dataset.common import ListDataset
from gluonts.mx.model.deepar import DeepAREstimator
from gluonts.model.seasonal_naive import SeasonalNaivePredictor
from gluonts.mx.trainer import Trainer

def df_to_list_dataset(df: pd.DataFrame, id_col: str, target_col: str) -> List[dict]:
    # raise an exception if there is no index or the index is not datetime
    if not isinstance(df.index, pd.DatetimeIndex):
        raise ValueError("Dataframe must have a DatetimeIndex")
    list_dataset = list()
    for tsid in df[id_col].unique():
        tmp_df = df[df[id_col] == tsid]
        ts_dict = {
            "start": tmp_df.index[0], "target": tmp_df[target_col].values, "item_id": tsid}
        list_dataset.append(ts_dict)
    return list_dataset


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # input
    parser.add_argument('--target-col', type=str, default=os.environ.get('SM_HP_TARGET_COL'))
    parser.add_argument('--id-col', type=str, default=os.environ.get('SM_HP_ID_COL'))
    parser.add_argument('--time-col', type=str, default=os.environ.get('SM_HP_TIME_COL'))
    parser.add_argument('--frequency', type=str, default=os.environ.get('SM_HP_FREQUENCY'))
    parser.add_argument('--forecasting-horizon', type=int, default=os.environ.get('SM_HP_FORECASTING_HORIZON'))
    parser.add_argument('--train-dir', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
    parser.add_argument('--train-file', type=str, default='train.csv')
    # output
    parser.add_argument('--model-dir', type=str, default=os.environ.get('SM_MODEL_DIR')) # /opt/ml/model - this is where you save the models

    args, _ = parser.parse_known_args()

    target_col = args.target_col
    id_col = args.id_col
    time_col = args.time_col
    frequency = args.frequency
    forecasting_horizon = args.forecasting_horizon
    # join the args.train_dir and args.train_file with pathlib
    train_df = pd.read_csv(Path(args.train_dir, args.train_file), parse_dates=True)     
    train_df = train_df.sort_values(
        by=[id_col, time_col]).reset_index(drop=True)
    train_df[time_col] = pd.to_datetime(train_df[time_col])
    train_df = train_df.set_index(time_col)

    train_lst = df_to_list_dataset(train_df, id_col, target_col)
    train_lds = ListDataset(train_lst, freq=frequency)
    # fit the model
    deep_ar_estimator = DeepAREstimator(
        freq=frequency,
        prediction_length=forecasting_horizon,
        context_length=forecasting_horizon,
        trainer=Trainer(epochs=1,
                        learning_rate=1e-3
                        )
        
    )
    # fit the benchmark model
    snaive_predictor = SeasonalNaivePredictor(freq=frequency,
                                            prediction_length=forecasting_horizon
                                            )
    deep_ar_predictor = deep_ar_estimator.train(train_lds)    
    Path(args.model_dir, "deep_ar_model").mkdir(parents=True, exist_ok=True)
    Path(args.model_dir, "snaive_model").mkdir(parents=True, exist_ok=True)
    joblib.dump(deep_ar_predictor, Path(args.model_dir, "deep_ar_model" ,"deep_ar_model.pkl"))
    joblib.dump(snaive_predictor, Path(args.model_dir, "snaive_model" ,"snaive_model.pkl"))
    print("model saved successfully")

Overwriting code/train.py


In [459]:
%%writefile code/requirements.txt

gluonts==0.11.9
pandas==1.1.5
joblib==1.1.1

Overwriting code/requirements.txt


In [462]:
#s3://sagemaker-us-west-2-371410071971/WeatherPipeline-Local-90/vnp5is5hlp22/WeatherProcess/output/train
# https://sagemaker.readthedocs.io/en/stable/frameworks/mxnet/using_mxnet.html#create-an-estimator
mxnet_estimator = MXNet(
    entry_point="train.py",
    source_dir="code",
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    framework_version="1.6.0",
    py_version="py3",
    hyperparameters={"target_col": "temp_farenheit",
         "id_col": "ts_id",
            "time_col": "time",
            "frequency": "H",
            "forecasting_horizon": 72
         },
)


step_train = TrainingStep(
    name="WeatherTrain",
    estimator=mxnet_estimator, # training algo + compute resources
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),        
    },
    cache_config=cache_config,
    depends_on=[step_process],
)

# model artifiact gets sent to: s3://sagemaker-us-west-2-371410071971/pipelines-r7ur1p9auifq-WeatherTrain-xIYAziWhaD/output/model.tar.gz


### Evaluation Step


In [475]:
%%writefile code/evaluation.py


import sys
import subprocess

# use subprocess to pip install dependencies
dependencies = ["gluonts==0.11.9", "pandas==1.1.5", "joblib==1.1.1"]
subprocess.check_call([sys.executable, "-m", "pip", "install", *dependencies])

import os
import tarfile
import joblib
from pathlib import Path
from typing import List
import argparse
import pandas as pd
import numpy as np
from datetime import datetime
from gluonts.model.predictor import Predictor
from gluonts.model.forecast import SampleForecast
from gluonts.dataset.common import ListDataset
from gluonts.evaluation import Evaluator
from gluonts.evaluation.backtest import make_evaluation_predictions



def df_to_list_dataset(df: pd.DataFrame, id_col: str, target_col: str) -> List[dict]:
    # raise an exception if there is no index or the index is not datetime
    if not isinstance(df.index, pd.DatetimeIndex):
        raise ValueError("Dataframe must have a DatetimeIndex")
    list_dataset = list()
    for tsid in df[id_col].unique():
        tmp_df = df[df[id_col] == tsid]
        ts_dict = {
            "start": tmp_df.index[0], "target": tmp_df[target_col].values, "item_id": tsid}
        list_dataset.append(ts_dict)
    return list_dataset

def create_metrics_df(tgt_metrics=["MASE", "sMAPE", "RMSE"], **kwargs) -> pd.DataFrame:
    df_metrics = pd.DataFrame()
    for model_name, metrics_dict in kwargs.items():
        df_metrics = pd.DataFrame.join(df_metrics, pd.DataFrame.from_dict(metrics_dict, orient="index", columns=[model_name]), how="outer")
    df_metrics = df_metrics.round(2)
    df_metrics = df_metrics.reset_index()
    df_metrics = df_metrics.rename(columns={"index": "metric"})
    df_metrics = df_metrics[df_metrics["metric"].isin(tgt_metrics)]    
    return df_metrics     

def create_actuals_vs_forecast_df(forecasts: List[SampleForecast], tss: List[pd.DataFrame], backtest_split_point: pd.Timestamp, model: str) -> pd.DataFrame:
    # TO DO: REFACTOR THIS FUNCTION
    backtest_fcast_vs_actuals_df = pd.DataFrame()
    for forecast, ts_df in zip(forecasts, tss):    
        ts_df.index = ts_df.index.to_timestamp()
        ts_df = ts_df[ts_df.index > backtest_split_point]
        ts_df.columns = ['actual']
        ts_df['p50'] = forecast.quantile(0.5)
        ts_df['p90'] = forecast.quantile(0.9)
        ts_df['p10'] = forecast.quantile(0.1)
        ts_df['in_interval'] = np.where((ts_df['actual'] >= ts_df['p10']) &
                                            (ts_df['actual'] <= ts_df['p90']), 1, 0)
        ts_df['item_id'] = forecast.item_id    
        ts_df['model'] = model
        backtest_fcast_vs_actuals_df = pd.concat([backtest_fcast_vs_actuals_df, ts_df])
    return backtest_fcast_vs_actuals_df    


# helpful input keys and values from eviroment variables
# ProcessingInputs.0.S3Input.LocalPath = /opt/ml/model
# ProcessingInputs.3.S3Input.S3Uri = s3://sagemaker-us-west-2-371410071971/WeatherPipeline-Local-666/code/69552bf494f141614c19e00214edad63/evaluation.py
# ProcessingOutputConfig.Outputs.0.S3Output.LocalPath = /opt/ml/evaluation

if __name__ == "__main__":
    # get the location of the model
    parser = argparse.ArgumentParser()
    parser.add_argument('--model-dir', type=str, default="/opt/ml/processing/model")
    # get the location of the validation data
    parser.add_argument('--validation-dir', type=str, default="/opt/ml/processing/validation")
    parser.add_argument('--validation-file', type=str, default='validation.csv')
    # get the location of the test data    
    parser.add_argument('--output_dir', type=str, default="/opt/ml/processing/evaluation")

    target_col = os.environ.get('target_col')
    id_col = os.environ.get('id_col')
    time_col = os.environ.get('time_col')
    frequency = os.environ.get('frequency')
    forecasting_horizon = int(os.environ.get('forecasting_horizon'))

    args, _ = parser.parse_known_args()
    model_dir = args.model_dir
    validation_dir = args.validation_dir
    validation_file = args.validation_file
    # print(f"SAGEMAKER ENVIRONMENT VARIABLES: {os.environ}")
    # 
    # print the directory structure of the operating system
    # print(f"Directory structure of the operating system: {os.listdir()}")

    # of the opt directory

    # for root, dirs, files in os.walk("/opt"):
    #     for name in files:
    #         print(os.path.join(root, name))
    #     for name in dirs:
    #         print(os.path.join(root, name))

    # unzip the directory containing the models
    with tarfile.open(Path(model_dir) / "model.tar.gz", "r:gz") as tar:
        tar.extractall(path=Path(model_dir))
        
    deep_ar_predictor = joblib.load(Path(model_dir) / "deep_ar_model" / "deep_ar_model.pkl")
    snaive_predictor = joblib.load(Path(model_dir) / "snaive_model" / "snaive_model.pkl")

    # load the data
    validation_df = pd.read_csv(Path(validation_dir, validation_file), parse_dates=True)
    validation_df = validation_df.set_index("time")
    eval_split_point = validation_df.index[-1] - pd.Timedelta(hours=forecasting_horizon)
    eval_validate_lst = df_to_list_dataset(validation_df, id_col, target_col)
    eval_validate_lds = ListDataset(eval_validate_lst, freq=frequency)

    
    deep_ar_forecast_it, deep_ar_tss_it = make_evaluation_predictions(
    dataset=eval_validate_lst,  # test dataset
    predictor=deep_ar_predictor,  # predictor
    num_samples=100,  # number of sample paths we want for evaluation
    )
    snaive_forecast_it, snaive_tss_it = make_evaluation_predictions(
        dataset=eval_validate_lst,  # test dataset
        predictor=snaive_predictor,  # predictor
        )

    # now we can convert these generators to lists
    deep_ar_forecasts = list(deep_ar_forecast_it)
    deep_ar_tss = list(deep_ar_tss_it)
    snaive_forecasts = list(snaive_forecast_it)
    snaive_tss = list(snaive_tss_it)

    evaluator = Evaluator(quantiles=[0.5])
    num_series = validation_df[id_col].unique().shape[0]

    deep_ar_agg_metrics, deep_ar_item_metrics = evaluator(deep_ar_tss, 
                                                      deep_ar_forecasts, 
                                                      num_series=num_series)

    snaive_agg_metrics, snaive_item_metrics = evaluator(snaive_tss, 
                                                        snaive_forecasts, 
                                                        num_series=num_series
                                                    )

    df_metrics = create_metrics_df(deep_ar=deep_ar_agg_metrics, snaive=snaive_agg_metrics)
    df_metrics['backtest_window_start'] = (eval_split_point + pd.Timedelta(hours=1)).strftime("%Y-%m-%d %H:%M:%S")
    df_metrics['backtest_window_end'] = validation_df.index[-1]
    df_metrics['run_dt'] = datetime.now().strftime("%Y-%m-%d")

    # now make the backtest
    df_backtest_deep_ar = create_actuals_vs_forecast_df(deep_ar_forecasts, deep_ar_tss, eval_split_point, 'deep_ar')
    df_backtest_snaive = create_actuals_vs_forecast_df(snaive_forecasts, snaive_tss, eval_split_point, 'snaive')
    df_backtest = pd.concat([df_backtest_deep_ar, df_backtest_snaive]).reset_index().rename(columns={'index':'time'})

    # save the metrics
    
    Path(args.output_dir).mkdir(parents=True, exist_ok=True)
    df_metrics.to_csv(Path(args.output_dir, 'metrics.csv'), index=False)
    df_backtest.to_csv(Path(args.output_dir, 'backtest.csv'), index=False)
    #
    print("Finished Evaluation")
    

Overwriting code/evaluation.py


In [476]:
# here is where you go to see the images - note the region
# https://docs.aws.amazon.com/sagemaker/latest/dg/ecr-us-west-2.html
from sagemaker.processing import ScriptProcessor
from sagemaker import image_uris

# get mxnet image
mxnet_img_uri = image_uris.retrieve(framework='mxnet',
                                    region='us-west-2',
                                    version='1.4.1',
                                    py_version='py3',
                                    image_scope='inference', 
                                    instance_type="ml.m5.xlarge")

script_eval = ScriptProcessor(
    command=["python3"],
    base_job_name="script-weather-eval",
    instance_type="ml.m5.xlarge",
    image_uri=mxnet_img_uri,
    instance_count=1,
    role=role,
    sagemaker_session=pipeline_session,
    env={"target_col": "temp_farenheit",
         "id_col": "ts_id",
            "time_col": "time",
            "frequency": "H",
            "forecasting_horizon": "72"
         }
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts, # location of the model artifact
            destination="/opt/ml/processing/model"), # where to put it in the container
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"].S3Output.S3Uri, # location of the validation data
            destination = "/opt/ml/processing/validation") # where to put it in the container               
                ],
    outputs=[ProcessingOutput(source="/opt/ml/processing/evaluation",)],
    code="code/evaluation.py",           
                )

step_evaluate = ProcessingStep(
    name="WeatherEval",
    step_args=eval_args,
    depends_on=[step_process],
    
)



In [477]:
# add in a step where we take the best model of the 2, retrain it on the full dataset, and then create a forecast
# during the eval phase, create a plots directory and save the plots there for manual review
# It's nice to visually inspect the plots to see if the model is doing a good job. 
# 


In [478]:

pipeline_name = f"WeatherPipeline-Local-10001"
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

instance_type = ParameterString(name="TrainingInstanceType", 
                                default_value="ml.m5.xlarge"
                                )
model_approval_status = ParameterString(
                                name="ModelApprovalStatus", 
                                default_value="PendingManualApproval"
                                )
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
# update the existing piipline
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,        
    ],
    steps=[step_process, 
           step_train,
           step_evaluate
           ],
)
client = boto3.client('sagemaker')
response = client.list_pipeline_executions(PipelineName=pipeline_name)
pipeline_arns = [x['PipelineExecutionArn'] for x in response['PipelineExecutionSummaries']]
pipeline_names = [x.split('/')[-3].lower() for x in pipeline_arns]
if pipeline_name.lower() in pipeline_names:
    print("Pipeline already exists, updating")
    pipeline.update(role_arn=role)
else:
    pipeline.create(role_arn=role)
    

Pipeline already exists, updating


INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


In [479]:
execution = pipeline.start()
steps = execution.list_steps()

In [None]:
# aws sagemaker delete-pipeline --pipeline-name WeatherPipeline-Local-87
steps

In [None]:
# describe the input and output of the pipeline
execution.describe()