# BYOA Tutorial - Prophet Forecasting en Sagemaker
La siguiente notebook muestra como integrar algoritmos propios a Amazon Sagemaker.
Vamos a recorrer el camino para armar un pipeline de inferencia sobre el algoritmo Prophet para series de tiempo.
El algoritmo se instala en un container de docker y luego nos sirve para hacer entrenamientos del modelo e inferencias en un endpoint.


## Paso 1: Armado del dataset
Vamos a trabajar con un dataset publico que debemos bajar de Kaggle.
Dicho dataset se denomina:
_Avocado Prices: Historical data on avocado prices and sales volume in multiple US markets_
y puede bajarse de: https://www.kaggle.com/neuromusic/avocado-prices/download
Una vez bajado, debemos subirlo al mismo directorio donde estamos ejecutando esta notebook.
El siguiente codigo prepara el dataset para que pueda entenderlo Prophet:

In [1]:
import pandas as pd

# Nos quedamos solo con la fecha y las ventas
df = pd.read_csv('avocado.csv')
df = df[['Date', 'AveragePrice']].dropna()

df['Date'] = pd.to_datetime(df['Date'])
df = df.set_index('Date')

# Dejamos 1 solo registro por día con el promedio de ventas
daily_df = df.resample('D').mean()
d_df = daily_df.reset_index().dropna()

# Formateamos los nombre de columnas como los espera Prophet
d_df = d_df[['Date', 'AveragePrice']]
d_df.columns = ['ds', 'y']
d_df.head()

# Guardamos el dataset resultante como avocado_daily.csv
d_df.to_csv("avocado_daily.csv",index = False , columns = ['ds', 'y'] )

# Paso 2: Empaquetar y subir el algoritmo para usarlo con Amazon SageMaker

### Una visión general de Docker

Docker proporciona una forma simple de empaquetar código en un _image_ que es totalmente autónomo. Una vez que tenga una imagen, puede usar Docker para ejecutar un _container_ basado en esa imagen. Ejecutar un contenedor es igual que ejecutar un programa en la máquina, excepto que el contenedor crea un entorno totalmente autónomo para que el programa se ejecute. Los contenedores están aislados entre sí y del entorno host, por lo que la forma en que configura el programa es la forma en que se ejecuta, sin importar dónde lo ejecute.

Docker es más poderoso que los administradores de entorno como conda o virtualenv porque (a) es completamente independiente del lenguaje y (b) comprende todo su entorno operativo, incluidos los comandos de inicio, las variables de entorno, etc.

De alguna manera, un contenedor Docker es como una máquina virtual, pero es mucho más ligero. Por ejemplo, un programa que se ejecuta en un contenedor puede iniciarse en menos de un segundo y muchos contenedores pueden ejecutarse en la misma máquina física o instancia de máquina virtual.

Docker utiliza un archivo simple llamado `Dockerfile` para especificar cómo se ensambla la imagen.
Amazon SagMaker utiliza Docker para permitir a los usuarios entrenar e implementar algoritmos.

En Amazon SageMaker, los contenedores Docker se invocan de cierta manera para el entrenamiento y de una forma ligeramente diferente para el hosting. En las siguientes secciones se describe cómo crear contenedores para el entorno de SageMaker.


### Cómo Amazon SageMaker ejecuta el contenedor Docker

Debido a que puede ejecutar la misma imagen en formación o hosting, Amazon SageMaker ejecuta el contenedor con el argumento `train` o `serve`. La forma en que su contenedor procesa este argumento depende del contenedor:

* En el ejemplo aquí, no definimos un `ENTRYPOINT `en el Dockerfile para que Docker ejecute el comando `train` en tiempo de entrenamiento y `serve` en tiempo de servicio. En este ejemplo, los definimos como scripts ejecutables de Python, pero podrían ser cualquier programa que queramos iniciar en ese entorno.
* Si especifica un programa como «ENTRYPOINT» en el archivo Dockerfile, ese programa se ejecutará al inicio y su primer argumento será `train` o `serve`. El programa puede entonces examinar ese argumento y decidir qué hacer.
* Si está construyendo contenedores separados para entrenamiento y hosting (o construyendo solo para uno u otro), puede definir un programa como «ENTRYPOINT» en el archivo Dockerfile e ignorar (o verificar) el primer argumento pasado. 

#### Ejecutar el contenedor durante el entrenamiento

Cuando Amazon SageMaker ejecuta el entrenamiento, el script `train` se ejecuta como un programa normal de Python. Una serie de archivos están dispuestos para su uso, bajo el directorio `/opt/ml`:

    /opt/ml
    ├── input
    │   ├── config
    │   │   ├── hyperparameters.json
    │   │   └── resourceConfig.json
    │   └── data
    │       └── <channel_name>
    │           └── <input data>
    ├── model
    │   └── <model files>
    └── output
        └── failure

##### La entrada

* `/opt/ml/input/config` contiene información para controlar cómo se ejecuta el programa. `hyperparameters.json` es un diccionario con formato JSON de nombres de hiperparámetros a valores. Estos valores siempre serán cadenas, por lo que es posible que deba convertirlos. `ResourceConfig.json` es un archivo con formato JSON que describe el diseño de red utilizado para la formación distribuida. Dado que scikit-learn no admite entrenamiento distribuido, lo ignoraremos aquí.
* `/opt/ml/input/data/<channel_name>/` (para el modo Archivo) contiene los datos de entrada para ese canal. Los canales se crean en función de la llamada a CreateTrainingJob, pero generalmente es importante que los canales coincidan con lo que el algoritmo espera. Los archivos de cada canal se copiarán de S3 a este directorio, preservando la estructura de árbol indicada por la estructura de clave S3. 
* `/opt/ml/input/data/<channel_name>_<epoch_number>`(para el modo Pipe) es la tubería para una época determinada. Las épocas comienzan en cero y suben por uno cada vez que las lees. No hay límite en el número de épocas que puede ejecutar, pero debe cerrar cada tubería antes de leer la siguiente época.
    
##### La salida

* `/opt/ml/model/` es el directorio donde se escribe el modelo que genera su algoritmo. Su modelo puede estar en cualquier formato que desee. Puede ser un solo archivo o un árbol de directorios completo. SagMaker empaquetará cualquier archivo de este directorio en un archivo comprimido tar. Este archivo estará disponible en la ubicación S3 devuelta en el resultado `DescribeTrainingJob`.
* `/opt/ml/output` es un directorio donde el algoritmo puede escribir un archivo `failure` que describe por qué el trabajo falló. El contenido de este archivo se devolverá en el campo `FailureReason` del resultado `DescribeTrainingJob`. Para los trabajos que tienen éxito, no hay razón para escribir este archivo, ya que se ignorará.

#### Ejecutando el contenedor durante el hosting

El hosting tiene un modelo muy diferente al de entrenamiento porque debe responder a las solicitudes de inferencia que llegan a través de HTTP. En este ejemplo, utilizamos codigo escrito en Python recomendado para proporcionar un servicio robusto y escalable de solicitudes de inferencia:

Amazon SagMaker utiliza dos URL en el contenedor:

* `/ping` recibirá solicitudes `GET` de la infraestructura. Devuelve 200 si el contenedor está abierto y acepta solicitudes.
* `/invocations` es el punto final que recibe solicitudes `POST` de inferencia del cliente. El formato de la solicitud y la respuesta depende del algoritmo. Si el cliente suministró los encabezados `ContentType` y `Accept`, éstos también se pasarán. 

El contenedor tendrá los archivos de modelo en el mismo lugar en el que se escribieron durante el entrenamiento:

    /opt/ml
    └── model
        └── <model files>


### Partes del Container

En el directorio `container` están todos los componentes que necesita para empaquetar el algoritmo de muestra para Amazon SageManager:

    .
    ├── Dockerfile
    ├── build_and_push.sh
    └── decision_trees
        ├── nginx.conf
        ├── predictor.py
        ├── serve
        ├── train
        └── wsgi.py


Vamos a ver cada uno:

* __`Dockerfile`__ describe cómo construir la imagen de contenedor Docker. Más detalles a continuación.
* __`build_and_push.sh`__ es un script que utiliza Dockerfile para construir sus imágenes de contenedor y luego lo publica (push) a ECR. Invocaremos los comandos directamente más adelante en este notebook, pero se puede copiar y ejecutar el script para otros algoritmos.
* __`prophet`__ es el directorio que contiene los archivos que se instalarán en el contenedor.
* __`local_test`__ es un directorio que muestra cómo probar el nuevo contenedor en cualquier equipo que pueda ejecutar Docker, incluida una Notebook Instance de Amazon SageMaker. Con este método, puede iterar rápidamente utilizando pequeños conjuntos de datos para eliminar cualquier error estructural antes de utilizar el contenedor con Amazon SageMaker. 

Los archivos que vamos a poner en el contenedor son:

* __`nginx.conf`__ es el archivo de configuración para el front-end nginx. Generalmente, debería poder tomar este archivo tal como está.
* __`predictor.py`__ es el programa que realmente implementa el servidor web Flask y las predicciones de Prophet para esta aplicación. 
* __`serve`__ es el programa iniciado cuando se inicia el contenedor para hosting. Simplemente lanza el servidor gunicorn que ejecuta múltiples instancias de la aplicación Flask definida en `predictor.py`. Debería poder tomar este archivo tal como está.
* __`train`__ es el programa que se invoca cuando se ejecuta el contenedor para el entrenamiento. 
* __`wsgi.py`__ es un pequeño envoltorio utilizado para invocar la aplicación Flask. Debería poder tomar este archivo tal como está.

En resumen, los dos archivos con codigo especifico de Prophet son `train` y `predictor.py`.

### El archivo Dockerfile

El archivo Dockerfile describe la imagen que queremos crear. Es una descripción de la instalación completa del sistema operativo del sistema que desea ejecutar. Un contenedor Docker que se ejecuta es bastante más ligero que un sistema operativo completo, sin embargo, porque aprovecha Linux en la máquina host para las operaciones básicas. 

Para este ejemplo, comenzaremos desde una instalación estándar de Ubuntu y ejecutaremos las herramientas normales para instalar las cosas que necesita Prophet. Finalmente, agregamos el código que implementa Prophet al contenedor y configuramos el entorno correcto para que se ejecute correctamente.

El siguiente es el Dockerfile:

In [2]:
!cat container/Dockerfile

# Build an image that can do training and inference in SageMaker
# This is a Python 3 image that uses the nginx, gunicorn, flask stack
# for serving inferences in a stable way.

FROM ubuntu:16.04

MAINTAINER Amazon AI <sage-learner@amazon.com>

RUN apt-get -y update && apt-get install -y --no-install-recommends \
         wget \
         curl \
         python-dev \
         build-essential libssl-dev libffi-dev \
         libxml2-dev libxslt1-dev zlib1g-dev \
         nginx \
         ca-certificates \
    && rm -rf /var/lib/apt/lists/*

RUN curl -fSsL -O https://bootstrap.pypa.io/get-pip.py && \
    python get-pip.py && \
    rm get-pip.py
 
RUN pip --no-cache-dir install \
        numpy \
        scipy \
        sklearn \
        pandas \
        flask \
        gevent \
        gunicorn \
        pystan 

RUN pip --no-cache-dir install \
        fbprophet 
        
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
ENV PATH="/opt/program:${PATH}"

# Set up the program in th

### El archivo train

El archivo train describe la forma en la que vamos a realizar el entrenamiento.
El archivo Prophet-Docker/container/prophet/train contiene el codigo especifico de entrenamiento para Prophet.
Debemos modificar la funcion train() de la siguiente manera:

    def train():
        print('Starting the training.')
        try:
            # Read in any hyperparameters that the user passed with the training job
            with open(param_path, 'r') as tc:
                trainingParams = json.load(tc)
            # Take the set of files and read them all into a single pandas dataframe
            input_files = [ os.path.join(training_path, file) for file in os.listdir(training_path) ]
            if len(input_files) == 0:
                raise ValueError(('There are no files in {}.\n' +
                                  'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                                  'the data specification in S3 was incorrectly specified or the role specified\n' +
                                  'does not have permission to access the data.').format(training_path, channel_name))
            raw_data = [ pd.read_csv(file, error_bad_lines=False ) for file in input_files ]
            train_data = pd.concat(raw_data)
            train_data.columns = ['ds', 'y']

            # Usamos Prophet para entrenar el modelo.
            clf = Prophet()
            clf = clf.fit(train_data)

            # save the model
            with open(os.path.join(model_path, 'prophet-model.pkl'), 'w') as out:
                pickle.dump(clf, out)
            print('Training complete.')


### El archivo predictor.py

El archivo predictor.py describe la forma en la que vamos a realizar las predicciones.
El archivo Prophet-Docker/container/prophet/predictor.py contiene el codigo especifico de prediccion para Prophet.
Debemos modificar la funcion predict() de la siguiente manera:

    def predict(cls, input):
        """For the input, do the predictions and return them.

        Args:
            input (a pandas dataframe): The data on which to do the predictions. There will be
                one prediction per row in the dataframe"""
        clf = cls.get_model()
        future = clf.make_future_dataframe(periods=int(input.iloc[0]))
        print(int(input.iloc[0]))
        print(input)
        forecast = clf.predict(future)
              
        return forecast.tail(int(input.iloc[0]))


Y luego la funcion transformation() de la siguiente manera: 

    def transformation():
        """Do an inference on a single batch of data. In this sample server, we take data as CSV, convert
        it to a pandas data frame for internal use and then convert the predictions back to CSV (which really
        just means one prediction per line, since there's a single column.
        """
        data = None

        # Convert from CSV to pandas
        if flask.request.content_type == 'text/csv':
            data = flask.request.data.decode('utf-8')
            s = StringIO.StringIO(data)
            data = pd.read_csv(s, header=None)
        else:
            return flask.Response(response='This predictor only supports CSV data', status=415, mimetype='text/plain')

        print('Invoked with {} records'.format(data.shape[0]))

        # Do the prediction
        predictions = ScoringService.predict(data)

        # Convert from numpy back to CSV
        out = StringIO.StringIO()
        pd.DataFrame({'results':[predictions]}, index=[0]).to_csv(out, header=False, index=False)
        result = out.getvalue()

        return flask.Response(response=result, status=200, mimetype='text/csv')
 

Basicamente modificamos la linea:

        pd.DataFrame({'results':predictions}).to_csv(out, header=False, index=False)
 
Por la linea:

        pd.DataFrame({'results':[predictions]}, index=[0]).to_csv(out, header=False, index=False)


# Parte 3: Uso de Prophet en Amazon SageMaker
Ahora que tenemos todos los archivos creados, vamos a utilizar Prophet en Sagemaker

## Armado del Container
Empezamos construyendo y registrando el container

In [3]:
%%time
%%sh

# The name of our algorithm
algorithm_name=sagemaker-prophet

cd container

chmod +x prophet/train
chmod +x prophet/serve

account=$(aws sts get-caller-identity --query Account --output text)

# Get the region defined in the current configuration (default to us-west-2 if none defined)
region=$(aws configure get region)
region=${region:-us-west-2}

fullname="${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:latest"

# If the repository doesn't exist in ECR, create it.
aws ecr describe-repositories --repository-names "${algorithm_name}" > /dev/null 2>&1

if [ $? -ne 0 ]
then
    aws ecr create-repository --repository-name "${algorithm_name}" > /dev/null
fi

# Get the login command from ECR and execute it directly
$(aws ecr get-login --region ${region} --no-include-email)

# Build the docker image locally with the image name and then push it to ECR
# with the full name.

docker build  -t ${algorithm_name} .
docker tag ${algorithm_name} ${fullname}

docker push ${fullname}

Login Succeeded
Sending build context to Docker daemon  63.49kB
Step 1/11 : FROM ubuntu:16.04
 ---> c6a43cd4801e
Step 2/11 : MAINTAINER Amazon AI <sage-learner@amazon.com>
 ---> Using cache
 ---> c0ea7ed783e7
Step 3/11 : RUN apt-get -y update && apt-get install -y --no-install-recommends          wget          curl          python-dev          build-essential libssl-dev libffi-dev          libxml2-dev libxslt1-dev zlib1g-dev          nginx          ca-certificates     && rm -rf /var/lib/apt/lists/*
 ---> Using cache
 ---> 17bd5ae1900b
Step 4/11 : RUN curl -fSsL -O https://bootstrap.pypa.io/get-pip.py &&     python get-pip.py &&     rm get-pip.py
 ---> Using cache
 ---> e1f1939e31e1
Step 5/11 : RUN pip --no-cache-dir install         numpy         scipy         sklearn         pandas         flask         gevent         gunicorn         pystan
 ---> Using cache
 ---> 8ff73a969fc2
Step 6/11 : RUN pip --no-cache-dir install         fbprophet
 ---> Using cache
 ---> 815dc3862860
Step 7/11 :

https://docs.docker.com/engine/reference/commandline/login/#credentials-store



CPU times: user 9.27 ms, sys: 594 µs, total: 9.87 ms
Wall time: 2.64 s


## Armado del Entorno de Entrenamiento
Inicializamos la sesion, rol de ejecucion. 

In [4]:
%%time
import boto3
import re

import os
import numpy as np
import pandas as pd
from sagemaker import get_execution_role

import sagemaker as sage
from time import gmtime, strftime


prefix = 'DEMO-prophet-byo'
role = get_execution_role()
sess = sage.Session()


CPU times: user 408 ms, sys: 40.3 ms, total: 448 ms
Wall time: 503 ms


# Subimos los datos a S3

In [5]:
WORK_DIRECTORY = 'data'
data_location = sess.upload_data(WORK_DIRECTORY, key_prefix=prefix)

## Entrenamos el modelo
Utilizando los datos subidos a S3, entrenamos el modelo levantando una instancia ml.c4.2xlarge. 
Sagemaker va a dejar el modelo entrenado en el directorio /output

In [6]:
%%time

account = sess.boto_session.client('sts').get_caller_identity()['Account']
region = sess.boto_session.region_name
image = '{}.dkr.ecr.{}.amazonaws.com/sagemaker-prophet:latest'.format(account, region)

tseries = sage.estimator.Estimator(image,
                       role, 
                        1, 
                        'ml.c4.2xlarge',
                       output_path="s3://{}/output".format(sess.default_bucket()),
                       sagemaker_session=sess)

tseries.fit(data_location)

2019-12-27 16:00:08 Starting - Starting the training job...
2019-12-27 16:00:09 Starting - Launching requested ML instances......
2019-12-27 16:01:13 Starting - Preparing the instances for training...
2019-12-27 16:01:58 Downloading - Downloading input data
2019-12-27 16:01:58 Training - Downloading the training image...
2019-12-27 16:02:34 Training - Training image download completed. Training in progress..[34mINFO:matplotlib.font_manager:font search path ['/usr/local/lib/python2.7/dist-packages/matplotlib/mpl-data/fonts/ttf', '/usr/local/lib/python2.7/dist-packages/matplotlib/mpl-data/fonts/afm', '/usr/local/lib/python2.7/dist-packages/matplotlib/mpl-data/fonts/pdfcorefonts'][0m
[34mINFO:matplotlib.font_manager:generated new fontManager[0m
[34mERROR:fbprophet:Importing matplotlib failed. Plotting will not work.[0m
[34mERROR:fbprophet:Importing plotly failed. Interactive plots will not work.[0m
[34mStarting the training.[0m
[34mINFO:fbprophet:Disabling weekly seasonality. R

## Armado de endpoint para inferencia
Utilizando el modelo recien entrenado, creamos un endpoint para inferencia hosteado en una instancia ml.c4.2xlarge

In [7]:
%%time

from sagemaker.predictor import csv_serializer
predictor = tseries.deploy(1, 'ml.m4.xlarge', serializer=csv_serializer)

---------------------------------------------------------------------------------------------------------------!CPU times: user 518 ms, sys: 39.5 ms, total: 557 ms
Wall time: 9min 20s


## Prueba de inferencia
Finalmente le pedimos al modelo que nos prediga las ventas de los proximos 30 dias.

In [8]:
%%time
p = predictor.predict("30")
print(p)

b'"            ds     trend  trend_lower  ...  yearly_lower  yearly_upper      yhat\n169 2018-03-26  1.473312     1.473312  ...     -0.076117     -0.076117  1.397195\n170 2018-03-27  1.472971     1.472971  ...     -0.072531     -0.072531  1.400440\n171 2018-03-28  1.472631     1.472631  ...     -0.068829     -0.068829  1.403802\n172 2018-03-29  1.472291     1.472291  ...     -0.065070     -0.065070  1.407221\n173 2018-03-30  1.471950     1.471950  ...     -0.061313     -0.061313  1.410637\n174 2018-03-31  1.471610     1.471610  ...     -0.057619     -0.057619  1.413991\n175 2018-04-01  1.471270     1.471270  ...     -0.054048     -0.054048  1.417222\n176 2018-04-02  1.470929     1.470929  ...     -0.050657     -0.050657  1.420273\n177 2018-04-03  1.470589     1.470589  ...     -0.047500     -0.047500  1.423089\n178 2018-04-04  1.470248     1.470241  ...     -0.044627     -0.044627  1.425622\n179 2018-04-05  1.469908     1.469861  ...     -0.042080     -0.042080  1.427828\n180 2018-04-0