# ML Pipeline for "Forecasting Air Quality with Amazon SageMaker DeepAR

In this example, we are going to build a ML Pipeline to automate air quality forecasting application with [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io). 

## ML Pipeline

### Outcome
* Create the flow for ML process for air quality forcasting build/train/deploy
* Create simple retrain flow

### Design
* Use Step Functions Data Science SDK to orchestrate the ML flow
* Use SageMaker Processing to do data preprocessing, especially,
 * A common Docker image will be build for data retrieving (interact with Amazon Athena) and data/feature engineering
* Use SageMaker Processing to do Model Evaluation
* A scheduled job mechanism will be used to do model retraining.

### Implementation

#### Initialize

In [1]:
%load_ext autoreload
%autoreload 2

In [None]:
!pip install --upgrade sagemaker

In [2]:
import boto3
import sagemaker
from sagemaker import get_execution_role

region = boto3.session.Session().region_name
role = get_execution_role()

#### Create Docker Image for SageMaker Processing

Define your own processing container and install related dependencies.

Below, you talk through how to create a processing container, and how to use a `ScriptProcessor` to run your own code within a container. Create a container support data preprocessing, feature engineering and model evaluation. 

In [3]:
# create a subfolder for docker 
!mkdir -p docker
!cp -r ../sql ./docker/

Below is the Dockerfile to create processing container. Install PyAthena, pandas and GeoPandas into it. You can install your own dependencies.

In [4]:
%%writefile docker/Dockerfile

FROM python:3.7-slim-buster
    
COPY ./sql /opt/ml/processing/
    
RUN pip install PyAthena[Pandas] geopandas scikit-learn
ENV PYTHONUNBUFFERED=TRUE

ENTRYPOINT ["python3"]

Overwriting docker/Dockerfile


This block of code buils the container using the docker command, creates an Amazon Elastic Container Registry (Amazon ECR) repository, and pushes the image to Amazon ECR

In [5]:
import boto3

account_id = boto3.client('sts').get_caller_identity().get('Account')
ecr_repository = 'aq-forecasting-processing-container'
tag = ':latest'

uri_suffix = 'amazonaws.com'
if region in ['cn-north-1', 'cn-northwest-1']:
    uri_suffix = 'amazonaws.com.cn'
processing_repository_uri = f'{account_id}.dkr.ecr.{region}.{uri_suffix}/{ecr_repository + tag}'


In [6]:
processing_repository_uri

'593380422482.dkr.ecr.us-east-1.amazonaws.com/aq-forecasting-processing-container:latest'

In [None]:
# @todo consider using CFN template to create ECR repo and only manage the docker image build and push.
!docker build -t $ecr_repository docker


In [None]:
!$(aws ecr get-login --region $region --registry-ids $account_id --no-include-email)
!aws ecr create-repository --repository-name $ecr_repository
!docker tag {ecr_repository + tag} $processing_repository_uri
!docker push $processing_repository_uri

Below cell writes a file `preprocessing.py`, which contains the pre-processing script. You can update the script, and rerun the cell to overwrite `preprocessing.py`. You run this as a processing job in the next cell. In this script, the actions will be done:

* Create Athena table with external source - OpenAQ
* Query OpenAQ data 
* Feature engineering on the dataset
* Split and store the data on S3 buckets.

In [7]:
import argparse
import os
import warnings

import boto3, time, s3fs, json, warnings, os
import urllib.request
from datetime import date, timedelta
import numpy as np
import pandas as pd
import geopandas as gpd
from multiprocessing import Pool

# the train test split date is used to split each time series into train and test sets
train_test_split_date = date.today() - timedelta(days = 30)

# the sampling frequency determines the number of hours per sample
# and is used for aggregating and filling missing values
frequency = '1'

# prediction length is how many hours into future to predict values for
prediction_length = 48

# context length is how many prior time steps the predictor needs to make a prediction
context_length = 3

warnings.filterwarnings('ignore')

session = boto3.Session()
sagemaker_session = sagemaker.Session()
region = session.region_name
account = session.client('sts').get_caller_identity().get('Account')
bucket_name = f"{account_id}-openaq-lab"
athena_s3_staging_dir = f's3://{bucket_name}/athena/results/'

s3 = boto3.client('s3')

# @todo to evaluate whether we should store existing model.tar.gz onto s3 bucket.

# processing Athena
def athena_query_table(query_file, wait=None):
    results_uri = athena_execute(query_file, 'csv', wait)
    return results_uri

def athena_execute(query_file, ext, wait):
    with open(query_file) as f:
        query_str = f.read()  
        
    athena = boto3.client('athena')
    s3_dest = athena_s3_staging_dir
    query_id = athena.start_query_execution(
        QueryString= query_str, 
         ResultConfiguration={'OutputLocation': athena_s3_staging_dir}
    )['QueryExecutionId']
        
    results_uri = f'{athena_s3_staging_dir}{query_id}.{ext}'
        
    start = time.time()
    while wait == None or wait == 0 or time.time() - start < wait:
        result = athena.get_query_execution(QueryExecutionId=query_id)
        status = result['QueryExecution']['Status']['State']
        if wait == 0 or status == 'SUCCEEDED':
            break
        elif status in ['QUEUED','RUNNING']:
            continue
        else:
            raise Exception(f'query {query_id} failed with status {status}')

            time.sleep(3) 

    return results_uri       

def get_sydney_openaq_data(sql_query_file_path = "/opt/ml/processing/sql/sydney.dml"):
    query_results_uri = athena_query_table(sql_query_file_path)
    print (f'reading {query_results_uri}')
    raw = pd.read_csv(query_results_uri, parse_dates=['timestamp'])
    return raw

raw = get_sydney_openaq_data()


reading s3://593380422482-openaq-lab/athena/results/cdd9bf67-f051-4f6b-8efe-509915d9cc84.csv


In [11]:
def featurize(raw):

    def fill_missing_hours(df):
        df = df.reset_index(level=categorical_levels, drop=True)                                    
        index = pd.date_range(df.index.min(), df.index.max(), freq='1H')
        return df.reindex(pd.Index(index, name='timestamp'))

    # Sort and index by location and time
    categorical_levels = ['country', 'city', 'location', 'parameter']
    index_levels = categorical_levels + ['timestamp']
    indexed = raw.sort_values(index_levels, ascending=True)
    indexed = indexed.set_index(index_levels)
    # indexed.head()    
    
    # Downsample to hourly samples by maximum value
    downsampled = indexed.groupby(categorical_levels + [pd.Grouper(level='timestamp', freq='1H')]).max()

    # Back fill missing values
    filled = downsampled.groupby(level=categorical_levels).apply(fill_missing_hours)
    filled[filled['value'].isnull()].groupby('location').count().describe()
    
    filled['value'] = filled['value'].interpolate().round(2)
    filled['point_latitude'] = filled['point_latitude'].fillna(method='pad')
    filled['point_longitude'] = filled['point_longitude'].fillna(method='pad')

    # Create Features
    aggregated = filled.reset_index(level=4)\
        .groupby(level=categorical_levels)\
        .agg(dict(timestamp='first', value=list, point_latitude='first', point_longitude='first'))\
        .rename(columns=dict(timestamp='start', value='target'))    
    aggregated['id'] = np.arange(len(aggregated))
    aggregated.reset_index(inplace=True)
    aggregated.set_index(['id']+categorical_levels, inplace=True)
    
    metadata = gpd.GeoDataFrame(
        aggregated.drop(columns=['target','start']), 
        geometry=gpd.points_from_xy(aggregated.point_longitude, aggregated.point_latitude), 
        crs={"init":"EPSG:4326"}
    )
    metadata.drop(columns=['point_longitude', 'point_latitude'], inplace=True)
    # set geometry index
    metadata.set_geometry('geometry')

    # Add Categorical features
    level_ids = [level+'_id' for level in categorical_levels]
    for l in level_ids:
        aggregated[l], index = pd.factorize(aggregated.index.get_level_values(l[:-3]))

    aggregated['cat'] = aggregated.apply(lambda columns: [columns[l] for l in level_ids], axis=1)
    features = aggregated.drop(columns=level_ids+ ['point_longitude', 'point_latitude'])
    features.reset_index(level=categorical_levels, inplace=True, drop=True)
    
    return features

def filter_dates(df, min_time, max_time, frequency):
    min_time = None if min_time is None else pd.to_datetime(min_time)
    max_time = None if max_time is None else pd.to_datetime(max_time)
    interval = pd.Timedelta(frequency)
    
    def _filter_dates(r): 
        if min_time is not None and r['start'] < min_time:
            start_idx = int((min_time - r['start']) / interval)
            r['target'] = r['target'][start_idx:]
            r['start'] = min_time
        
        end_time = r['start'] + len(r['target']) * interval
        if max_time is not None and end_time > max_time:
            end_idx = int((end_time - max_time) / interval)
            r['target'] = r['target'][:-end_idx]
            
        return r
    
    filtered = df.apply(_filter_dates, axis=1) 
    filtered = filtered[filtered['target'].str.len() > 0]
    return filtered

def split_train_test_data(features, days = 30):
    train_test_split_date = date.today() - timedelta(days = days)
    train = filter_dates(features, None, train_test_split_date, '1H')
    test = filter_dates(features, train_test_split_date, None, '1H')
    return train, test

features = featurize(raw)
train, test = split_train_test_data(features)

# upload dataset to S3.
local_data_path = 'data'
os.makedirs(local_data_path, exist_ok = True)
train.to_json(f'{local_data_path}/train.json', orient='records', lines = True)
train_data_uri = sagemaker_session.upload_data(path=f'{local_data_path}/train.json', key_prefix = 'preprocessing_data')

test.to_json(f'{local_data_path}/test.json', orient='records', lines = True) 
test_data_uri =  sagemaker_session.upload_data(path=f'{local_data_path}/test.json', key_prefix = 'preprocessing_data')

features.to_json(f'{local_data_path}/all_data.json', orient='records', lines = True)
all_data_uri = sagemaker_session.upload_data(path=f'{local_data_path}/all_data.json', key_prefix = 'preprocessing_data')


In [12]:
features.head()

Unnamed: 0_level_0,start,target,cat
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,2016-04-09 20:00:00,"[12.2, 12.3, 12.4, 12.5, 12.8, 13.3, 13.3, 13....","[0, 0, 0, 0]"
1,2019-09-08 21:00:00,"[5.2, 5.4, 5.6, 5.8, 6.0, 6.1, 6.4, 6.7, 7.2, ...","[0, 0, 1, 0]"
2,2016-04-09 20:00:00,"[11.2, 11.3, 11.5, 11.6, 11.4, 11.3, 11.2, 11....","[0, 0, 2, 0]"
3,2017-08-31 00:00:00,"[5.5, 5.3, 5.3, 5.3, 5.2, 4.9, 4.8, 4.8, 4.8, ...","[0, 0, 3, 0]"
4,2016-04-09 20:00:00,"[20.2, 20.55, 20.9, 20.2, 19.1, 18.2, 17.8, 18...","[0, 0, 4, 0]"


In [13]:
features.count()

start     18
target    18
cat       18
dtype: int64

In [14]:
train.head()

Unnamed: 0_level_0,start,target,cat
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,2016-04-09 20:00:00,"[12.2, 12.3, 12.4, 12.5, 12.8, 13.3, 13.3, 13....","[0, 0, 0, 0]"
1,2019-09-08 21:00:00,"[5.2, 5.4, 5.6, 5.8, 6.0, 6.1, 6.4, 6.7, 7.2, ...","[0, 0, 1, 0]"
2,2016-04-09 20:00:00,"[11.2, 11.3, 11.5, 11.6, 11.4, 11.3, 11.2, 11....","[0, 0, 2, 0]"
3,2017-08-31 00:00:00,"[5.5, 5.3, 5.3, 5.3, 5.2, 4.9, 4.8, 4.8, 4.8, ...","[0, 0, 3, 0]"
4,2016-04-09 20:00:00,"[20.2, 20.55, 20.9, 20.2, 19.1, 18.2, 17.8, 18...","[0, 0, 4, 0]"


In [15]:
test.head()

Unnamed: 0_level_0,start,target,cat
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,2020-08-15,"[3.4, 0.5, 14.8, 0.8, 1.0, 3.0, 3.9, 6.2, 6.2,...","[0, 0, 0, 0]"
1,2020-08-15,"[5.9, 9.1, 12.9, 15.3, 10.6, 5.5, 7.3, 17.6, 1...","[0, 0, 1, 0]"
2,2020-08-15,"[5.1, 4.9, 8.2, 3.1, 3.2, 3.7, 3.8, 5.0, 6.1, ...","[0, 0, 2, 0]"
3,2020-08-15,"[0.0, 1.5, 10.3, 0.0, 0.0, 0.9, 1.0, 3.4, 3.7,...","[0, 0, 3, 0]"
4,2020-08-15,"[3.7, 2.0, 5.4, 1.2, 2.6, 3.6, 3.5, 5.4, 4.2, ...","[0, 0, 4, 0]"


In [None]:
%%writefile preprocessing.py

import argparse
import os
import warnings

import boto3, time, s3fs, json, warnings, os
import urllib.request
from datetime import date, timedelta
import numpy as np
import pandas as pd
import geopandas as gpd
from multiprocessing import Pool

# the train test split date is used to split each time series into train and test sets
train_test_split_date = date.today() - timedelta(days = 30)

# the sampling frequency determines the number of hours per sample
# and is used for aggregating and filling missing values
frequency = '1'

# prediction length is how many hours into future to predict values for
prediction_length = 48

# context length is how many prior time steps the predictor needs to make a prediction
context_length = 3

warnings.filterwarnings('ignore')

session = boto3.Session()
sagemaker_session = sagemaker.Session()
region = session.region_name
account = session.client('sts').get_caller_identity().get('Account')
bucket_name = f"{account_id}-openaq-lab"
athena_s3_staging_dir = f's3://{bucket_name}/athena/results/'

s3 = boto3.client('s3')

# @todo to evaluate whether we should store existing model.tar.gz onto s3 bucket.

# processing Athena
def athena_query_table(query_file, wait=None):
    results_uri = athena_execute(query_file, 'csv', wait)
    return results_uri

def athena_execute(query_file, ext, wait):
    with open(query_file) as f:
        query_str = f.read()  
        
    athena = boto3.client('athena')
    s3_dest = athena_s3_staging_dir
    query_id = athena.start_query_execution(
        QueryString= query_str, 
         ResultConfiguration={'OutputLocation': athena_s3_staging_dir}
    )['QueryExecutionId']
        
    results_uri = f'{athena_s3_staging_dir}{query_id}.{ext}'
        
    start = time.time()
    while wait == None or wait == 0 or time.time() - start < wait:
        result = athena.get_query_execution(QueryExecutionId=query_id)
        status = result['QueryExecution']['Status']['State']
        if wait == 0 or status == 'SUCCEEDED':
            break
        elif status in ['QUEUED','RUNNING']:
            continue
        else:
            raise Exception(f'query {query_id} failed with status {status}')

            time.sleep(3) 

    return results_uri       

def get_sydney_openaq_data(sql_query_file_path = "/opt/ml/processing/sql/sydney.dml"):
    query_results_uri = athena_query_table(sql_query_file_path)
    print (f'reading {query_results_uri}')
    raw = pd.read_csv(query_results_uri, parse_dates=['timestamp'])
    return raw

def featurize(raw):

    def fill_missing_hours(df):
        df = df.reset_index(level=categorical_levels, drop=True)                                    
        index = pd.date_range(df.index.min(), df.index.max(), freq='1H')
        return df.reindex(pd.Index(index, name='timestamp'))

    # Sort and index by location and time
    categorical_levels = ['country', 'city', 'location', 'parameter']
    index_levels = categorical_levels + ['timestamp']
    indexed = raw.sort_values(index_levels, ascending=True)
    indexed = indexed.set_index(index_levels)
    # indexed.head()    
    
    # Downsample to hourly samples by maximum value
    downsampled = indexed.groupby(categorical_levels + [pd.Grouper(level='timestamp', freq='1H')]).max()

    # Back fill missing values
    filled = downsampled.groupby(level=categorical_levels).apply(fill_missing_hours)
    filled[filled['value'].isnull()].groupby('location').count().describe()
    
    filled['value'] = filled['value'].interpolate().round(2)
    filled['point_latitude'] = filled['point_latitude'].fillna(method='pad')
    filled['point_longitude'] = filled['point_longitude'].fillna(method='pad')

    # Create Features
    aggregated = filled.reset_index(level=4)\
        .groupby(level=categorical_levels)\
        .agg(dict(timestamp='first', value=list, point_latitude='first', point_longitude='first'))\
        .rename(columns=dict(timestamp='start', value='target'))    
    aggregated['id'] = np.arange(len(aggregated))
    aggregated.reset_index(inplace=True)
    aggregated.set_index(['id']+categorical_levels, inplace=True)
    
    metadata = gpd.GeoDataFrame(
        aggregated.drop(columns=['target','start']), 
        geometry=gpd.points_from_xy(aggregated.point_longitude, aggregated.point_latitude), 
        crs={"init":"EPSG:4326"}
    )
    metadata.drop(columns=['point_longitude', 'point_latitude'], inplace=True)
    # set geometry index
    metadata.set_geometry('geometry')

    # Add Categorical features
    level_ids = [level+'_id' for level in categorical_levels]
    for l in level_ids:
        aggregated[l], index = pd.factorize(aggregated.index.get_level_values(l[:-3]))

    aggregated['cat'] = aggregated.apply(lambda columns: [columns[l] for l in level_ids], axis=1)
    features = aggregated.drop(columns=level_ids+ ['point_longitude', 'point_latitude'])
    features.reset_index(level=categorical_levels, inplace=True, drop=True)
    
    return features

def filter_dates(df, min_time, max_time, frequency):
    min_time = None if min_time is None else pd.to_datetime(min_time)
    max_time = None if max_time is None else pd.to_datetime(max_time)
    interval = pd.Timedelta(frequency)
    
    def _filter_dates(r): 
        if min_time is not None and r['start'] < min_time:
            start_idx = int((min_time - r['start']) / interval)
            r['target'] = r['target'][start_idx:]
            r['start'] = min_time
        
        end_time = r['start'] + len(r['target']) * interval
        if max_time is not None and end_time > max_time:
            end_idx = int((end_time - max_time) / interval)
            r['target'] = r['target'][:-end_idx]
            
        return r
    
    filtered = df.apply(_filter_dates, axis=1) 
    filtered = filtered[filtered['target'].str.len() > 0]
    return filtered

def split_train_test_data(features, days = 30):
    train_test_split_date = date.today() - timedelta(days = days)
    train = filter_dates(features, None, train_test_split_date, '1H')
    test = filter_dates(features, train_test_split_date, None, '1H')
    return train, test

raw = get_sydney_openaq_data()
features = featurize(raw)
train, test = split_train_test_data(features)

# upload dataset to S3.
local_data_path = 'data'
os.makedirs(local_data_path, exist_ok = True)
train.to_json(f'{local_data_path}/train.json', orient='records', lines = True)
train_data_uri = sagemaker_session.upload_data(path=f'{local_data_path}/train.json', key_prefix = 'preprocessing_data')

test.to_json(f'{local_data_path}/test.json', orient='records', lines = True) 
test_data_uri =  sagemaker_session.upload_data(path=f'{local_data_path}/test.json', key_prefix = 'preprocessing_data')

features.to_json(f'{local_data_path}/all_data.json', orient='records', lines = True)
all_data_uri = sagemaker_session.upload_data(path=f'{local_data_path}/all_data.json', key_prefix = 'preprocessing_data')


Upload the pre processing script.

In [16]:
PREPROCESSING_SCRIPT_LOCATION = "preprocessing.py"

input_code = sagemaker_session.upload_data(
    PREPROCESSING_SCRIPT_LOCATION,
    bucket = sagemaker_session.default_bucket(),
    key_prefix = "processing/code"
)

S3 locations of preprocessing output and training data.

The `ScriptProcessor` class lets you run a command inside the container, which you can use to run your own script.

In [None]:
from sagemaker.processing import ScriptProcessor

preprocessing_processor = ScriptProcessor(
    command = ['python3'],
    image_uri = processing_repository_uri,
    role = role,
    instance_count = 1,
    instance_type = 'ml.m5.xlarge'
)