# Abalone CI/CD Model Example

>__NOTE:__ This Notebook uses the _Python 3 (Data Science)_ Kernel.

## Configuring the Model Preprocessing, Training and Evaluation Script

### Setup

In [None]:
%%writefile model.py
import os
import sys
import json
import re
import traceback
import pathlib
import tarfile
import tensorflow as tf
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from sklearn import preprocessing
from sklearn.metrics import mean_squared_error
# Only emit TensorFlow log records at ERROR level.
tf.get_logger().setLevel("ERROR")


# Filesystem layout used by this module; every path hangs off `prefix`.
prefix = "/opt/ml"
processing_path = os.path.join(prefix, "processing")
# preprocess() reads the raw "abalone.data" file from this directory.
preprocessing_input_path = os.path.join(processing_path, "input/data")
# preprocess() writes its "training/" and "testing/" CSV files below this directory.
preprocessing_output_path = os.path.join(processing_path, "output")
# train() reads the "training" channel directory from below this path.
training_input_path = os.path.join(prefix, "input/data")
# evaluate() reads "model/model.tar.gz" and "data/testing.csv" from below this path.
evaluation_input_path = os.path.join(processing_path, "input")
# save_report() writes "evaluation.json" into this directory.
evaluation_output_path = os.path.join(processing_path, "output/evaluation")
# The "failure" file is written here when preprocess/train/evaluate raise.
output_path = os.path.join(prefix, "output")
# train() saves "model.h5" into this directory.
model_path = os.path.join(prefix, "model")
# JSON file that train() loads its hyperparameters from.
param_path = os.path.join(prefix, "input/config/hyperparameters.json")

### Preprocessing Function

In [None]:
%%writefile -a model.py


def preprocess():
    """Split the raw Abalone data into training, validation and testing CSVs.

    Reads ``abalone.data`` from ``preprocessing_input_path``, moves the
    ``rings`` target into the first column, one-hot encodes ``sex`` and writes
    an 80% / 15% / 5% split (in file order, without shuffling) as header-less
    CSV files to ``training/training.csv``, ``training/validation.csv`` and
    ``testing/testing.csv`` below ``preprocessing_output_path``.

    On any error the message and traceback are printed to stderr and written
    to the ``failure`` file in ``output_path``; the process then exits with
    status 255.
    """
    print("Preprocessing mode")
    column_names = ["sex", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", "rings"]
    try:
        print("Loading 'raw' data")
        abalone_data = pd.read_csv(os.path.join(preprocessing_input_path, "abalone.data"), names=column_names)
        y = abalone_data["rings"].to_numpy().reshape(-1, 1)
        # drop() returns a new frame, so nothing is deleted from a slice of
        # the frame that was just loaded.
        features = abalone_data.drop(columns=["rings"])
        # Pin the indicator dtype: newer pandas releases default to bool, which
        # would turn the matrix into object dtype and write "True"/"False".
        features = pd.get_dummies(features, dtype=np.uint8).to_numpy()
        X = np.concatenate((y, features), axis=1)
        training, validation, testing = np.split(X, [int(.8*len(X)), int(.95*len(X))])
        splits = {
            "training/training.csv": training,
            "training/validation.csv": validation,
            "testing/testing.csv": testing,
        }
        for relative_path, rows in splits.items():
            destination = os.path.join(preprocessing_output_path, relative_path)
            # The sub-directories are not guaranteed to exist yet.
            os.makedirs(os.path.dirname(destination), exist_ok=True)
            pd.DataFrame(rows).to_csv(destination, header=False, index=False)

    except Exception as e:
        message = "Exception during preprocessing: {}".format(str(e) + "\n" + traceback.format_exc())
        # Report on stderr first so the trace survives even if the failure
        # file cannot be written.
        print(message, file=sys.stderr)
        try:
            os.makedirs(output_path, exist_ok=True)
            with open(os.path.join(output_path, "failure"), "w") as f:
                f.write(message)
        except OSError as write_error:
            print(f"Unable to write failure file: {write_error}", file=sys.stderr)
        sys.exit(255)

### Training Function

In [None]:
%%writefile -a model.py


def train():
    """Train the Abalone regression network and save it as ``model.h5``.

    Loads hyperparameters from ``param_path``, reads ``training.csv`` and
    ``validation.csv`` from the ``training`` channel below
    ``training_input_path``, normalizes the feature rows, fits a three-layer
    dense network and saves it (without optimizer state) to ``model_path``.

    Recognized hyperparameters: ``batch_size`` (Keras default when absent)
    and ``epochs`` (1 when absent).

    On any error the message and traceback are printed to stderr and written
    to the ``failure`` file in ``output_path``; the process then exits with
    status 255.
    """
    print("Training mode")
    try:
        channel_name = "training"
        training_path = os.path.join(training_input_path, channel_name)

        # Numeric-looking strings are converted to float/int; every other
        # value (including non-string JSON values) is passed through unchanged.
        params = {}
        is_float = re.compile(r'^\d+(?:\.\d+)$')
        is_integer = re.compile(r'^\d+$')
        with open(param_path, "r") as f:
            for key, value in json.load(f).items():
                if isinstance(value, str):
                    if is_float.match(value) is not None:
                        value = float(value)
                    elif is_integer.match(value) is not None:
                        value = int(value)
                params[key] = value

        if not os.listdir(training_path):
            raise ValueError(
                f"There are no files in {training_path}.\n"
                f"This usually indicates that the channel ({channel_name}) was incorrectly specified,\n"
                "the data specification in S3 was incorrectly specified or the role specified\n"
                "does not have permission to access the data."
            )

        column_names = ["rings", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", "sex_F", "sex_I", "sex_M"]
        train_data = pd.read_csv(os.path.join(training_path, "training.csv"), sep=',', names=column_names)
        val_data = pd.read_csv(os.path.join(training_path, "validation.csv"), sep=',', names=column_names)
        train_y = train_data["rings"].to_numpy()
        train_X = train_data.drop(["rings"], axis=1).to_numpy()
        val_y = val_data["rings"].to_numpy()
        val_X = val_data.drop(["rings"], axis=1).to_numpy()
        train_X = preprocessing.normalize(train_X)
        val_X = preprocessing.normalize(val_X)

        network_layers = [
            # The input width follows the data (10 feature columns here).
            Dense(64, activation="relu", kernel_initializer="normal", input_dim=train_X.shape[1]),
            Dense(64, activation="relu"),
            Dense(1, activation="linear")
        ]
        model = Sequential(network_layers)
        # NOTE(review): "accuracy" is kept so the training log output is
        # unchanged; it is unlikely to be meaningful for a regression target.
        model.compile(optimizer="adam", loss="mse", metrics=["mae", "accuracy"])
        model.summary()
        model.fit(train_X, train_y, validation_data=(val_X, val_y),
                  batch_size=params.get("batch_size"),
                  # Fall back to a single epoch instead of passing None to fit().
                  epochs=params.get("epochs", 1),
                  shuffle=True, verbose=1
        )
        print("Saving Model")
        model.save(filepath=os.path.join(model_path, "model.h5"), overwrite=True, include_optimizer=False, save_format="h5")

    except Exception as e:
        message = "Exception during training: {}".format(str(e) + "\n" + traceback.format_exc())
        # Report on stderr first so the trace survives even if the failure
        # file cannot be written.
        print(message, file=sys.stderr)
        try:
            os.makedirs(output_path, exist_ok=True)
            with open(os.path.join(output_path, "failure"), "w") as f:
                f.write(message)
        except OSError as write_error:
            print(f"Unable to write failure file: {write_error}", file=sys.stderr)
        sys.exit(255)

### Evaluation Function

In [None]:
%%writefile -a model.py


def load_model():
    """Unpack the trained model archive and return the compiled Keras model.

    Extracts ``model/model.tar.gz`` (below ``evaluation_input_path``) into the
    current working directory, loads ``model.h5`` from there and compiles it
    with the Adam optimizer and an MSE loss.
    """
    print("Load Pre-Trained Model")
    archive = os.path.join(evaluation_input_path, "model/model.tar.gz")
    # NOTE(review): extractall() trusts the member paths stored in the archive.
    with tarfile.open(archive) as bundle:
        bundle.extractall(".")
    restored = tf.keras.models.load_model("model.h5")
    restored.compile(optimizer="adam", loss="mse")
    return restored


def save_report(rmse, mse):
    """Write the regression metrics to ``evaluation.json``.

    Args:
        rmse: Root mean squared error, stored under ``regression_metrics.rmse.value``.
        mse: Mean squared error, stored under ``regression_metrics.mse.value``.

    The target directory ``evaluation_output_path`` is created when missing.
    """
    print("Saving Evaluation Report")
    metrics = {
        'rmse': {'value': rmse},
        'mse': {'value': mse},
    }
    report = {'regression_metrics': metrics}

    pathlib.Path(evaluation_output_path).mkdir(parents=True, exist_ok=True)
    with open(f"{evaluation_output_path}/evaluation.json", "w") as report_file:
        serialized = json.dumps(report)
        report_file.write(serialized)


def evaluate():
    """Score the trained model on the testing split and save the report.

    Loads the model via ``load_model()``, reads ``data/testing.csv`` below
    ``evaluation_input_path``, normalizes the feature rows, predicts every row
    and passes the resulting MSE and RMSE to ``save_report()``.

    On any error the message and traceback are printed to stderr and written
    to the ``failure`` file in ``output_path``; the process then exits with
    status 255.
    """
    try:
        print("Evaluation mode")
        model = load_model()
        column_names = ["rings", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", "sex_F", "sex_I", "sex_M"]
        data_path = os.path.join(evaluation_input_path, "data/testing.csv")
        data = pd.read_csv(data_path, names=column_names)
        y = data["rings"].to_numpy()
        X = data.drop(["rings"], axis=1).to_numpy()
        X = preprocessing.normalize(X)

        # One batched call replaces a separate predict() invocation per row.
        results = model.predict(X)
        for value in results[:, 0]:
            print(f"Result: {value}")
        predictions = [float(value) for value in results[:, 0]]
        truths = [float(value) for value in y]

        mse = mean_squared_error(truths, predictions)
        print(f"Mean Squared Error: {mse}")
        # Derive the RMSE from the MSE rather than relying on the `squared`
        # keyword, which newer scikit-learn releases no longer accept.
        rmse = float(np.sqrt(mse))
        print(f"Root Mean Squared Error: {rmse}")
        save_report(rmse, mse)

    except Exception as e:
        message = "Exception during evaluation: {}".format(str(e) + "\n" + traceback.format_exc())
        # Report on stderr first so the trace survives even if the failure
        # file cannot be written.
        print(message, file=sys.stderr)
        try:
            os.makedirs(output_path, exist_ok=True)
            with open(os.path.join(output_path, "failure"), "w") as f:
                f.write(message)
        except OSError as write_error:
            print(f"Unable to write failure file: {write_error}", file=sys.stderr)
        sys.exit(255)

---

## Create the Application

### Container entrypoint

In [None]:
%%writefile app.py
#!/usr/bin/env python

import json
import io
import sys
import os
import signal
import traceback
import flask
import multiprocessing
import subprocess
import tarfile
import model
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from sklearn import preprocessing


# Directory that load_model() reads "model.h5" from.
prefix = "/opt/ml"
model_path = os.path.join(prefix, "model")
# Put the model directory first on the module search path.
sys.path.insert(0,model_path)
# NOTE(review): not referenced anywhere in the visible part of this file.
model_cache = {}

class PredictionService(object):
    """Holds one lazily loaded model shared by all callers of this class."""

    # Cached model; stays None until get_model() is first called.
    tf_model = None

    @classmethod
    def get_model(cls):
        """Return the cached model, loading it via load_model() on first use."""
        if cls.tf_model is not None:
            return cls.tf_model
        cls.tf_model = load_model()
        return cls.tf_model

    @classmethod
    def predict(cls, input):
        """Return the model's prediction for ``input``."""
        return cls.get_model().predict(input)

def load_model():
    """Load ``model.h5`` from ``model_path`` and compile it (Adam, MSE loss)."""
    saved_model_file = os.path.join(model_path, "model.h5")
    network = tf.keras.models.load_model(saved_model_file)
    network.compile(optimizer="adam", loss="mse")
    return network

def sigterm_handler(nginx_pid, gunicorn_pid):
    """Signal both server processes to stop, then exit with status 0.

    Args:
        nginx_pid: Process id that receives SIGQUIT.
        gunicorn_pid: Process id that receives SIGTERM.

    An ``OSError`` raised while signalling either process is ignored.
    """
    shutdown_signals = (
        (nginx_pid, signal.SIGQUIT),
        (gunicorn_pid, signal.SIGTERM),
    )
    for target_pid, stop_signal in shutdown_signals:
        try:
            os.kill(target_pid, stop_signal)
        except OSError:
            # Deliberately best-effort: a failed signal must not block the exit.
            pass

    sys.exit(0)

def start_server(timeout, workers):
    """Start nginx and gunicorn and block until one of them exits.

    Args:
        timeout: Value passed to gunicorn's ``--timeout`` option.
        workers: Number of gunicorn workers (``-w``).

    The nginx access and error logs are linked to stdout/stderr. When either
    child process exits, or this process receives SIGTERM, both children are
    signalled through ``sigterm_handler`` and the process exits.
    """
    # Log the argument itself, not a global that only exists when this module
    # runs as a script.
    print(f"Starting the inference server with {workers} workers")
    subprocess.check_call(["ln", "-sf", "/dev/stdout", "/var/log/nginx/access.log"])
    subprocess.check_call(["ln", "-sf", "/dev/stderr", "/var/log/nginx/error.log"])
    nginx = subprocess.Popen(["nginx", "-c", "/opt/program/nginx.conf"])
    gunicorn = subprocess.Popen(["gunicorn",
                                 "--timeout", str(timeout),
                                 "-k", "gevent",
                                 "-b", "unix:/tmp/gunicorn.sock",
                                 "-w", str(workers),
                                 "wsgi:app"])

    signal.signal(signal.SIGTERM, lambda a, b: sigterm_handler(nginx.pid, gunicorn.pid))
    pids = {nginx.pid, gunicorn.pid}
    # Reap children until one of the two servers has exited.
    while True:
        pid, _ = os.wait()
        if pid in pids:
            break
    # sigterm_handler() ends with sys.exit(), so log before calling it.
    print("Inference server exiting")
    sigterm_handler(nginx.pid, gunicorn.pid)


app = flask.Flask(__name__)


@app.route("/ping", methods=["GET"])
def ping():
    """Health check: respond 200 when a model is available, otherwise 404."""
    if PredictionService.get_model() is not None:
        status = 200
    else:
        status = 404
    return flask.Response(response="\n", status=status, mimetype="application/json")


@app.route("/invocations", methods=["POST"])
def invoke():
    """Run a prediction for one comma-separated row of feature values.

    The request body must be ``text/csv``; any other content type is answered
    with status 415. All parsed values are treated as a single sample. The
    response body is the prediction as CSV with no header and no index.
    """
    # `mimetype` is the content type without parameters, so a header such as
    # "text/csv; charset=utf-8" is accepted as well.
    if flask.request.mimetype != "text/csv":
        return flask.Response(response="Invalid request data type, only 'text/csv' is supported.", status=415, mimetype="text/plain")

    payload = np.fromstring(flask.request.data.decode('utf-8'), sep=",")
    # The model expects a 2-D batch; the request is a single row.
    data = payload.reshape(1, -1)
    predictions = PredictionService.predict(data)
    out = io.StringIO()
    pd.DataFrame({"results": predictions.flatten()}).to_csv(out, header=False, index=False)
    result = out.getvalue()
    print(f"Prediction Result: {result}")
    return flask.Response(response=result, status=200, mimetype="text/csv")


if __name__ == "__main__":
    print(f"Tensorflow Version: {tf.__version__}")
    # The first command-line argument selects the container mode.
    if len(sys.argv) < 2 or sys.argv[1] not in ("serve", "train", "preprocess", "evaluate"):
        raise Exception("Invalid argument: you must specify 'train' for training mode, 'serve' for predicting mode, 'preprocess' for preprocessing mode or 'evaluate' for evaluation mode.")
    mode = sys.argv[1]
    if mode == "preprocess":
        model.preprocess()
    elif mode == "train":
        model.train()
    elif mode == "evaluate":
        model.evaluate()
    else:
        # "serve": timeout and worker count come from the environment, with
        # 60 and the CPU count as the respective fallbacks.
        cpu_count = multiprocessing.cpu_count()
        model_server_timeout = os.environ.get('MODEL_SERVER_TIMEOUT', 60)
        model_server_workers = int(os.environ.get('MODEL_SERVER_WORKERS', cpu_count))
        start_server(model_server_timeout, model_server_workers)

### Nginx Configuration

In [None]:
%%writefile nginx.conf
# One worker process, kept in the foreground (not daemonized).
worker_processes 1;
daemon off;

pid /tmp/nginx.pid;
error_log /var/log/nginx/error.log;

events {

}

http {
  include /etc/nginx/mime.types;
  default_type application/octet-stream;
  access_log /var/log/nginx/access.log combined;
  
  # Requests are handed to gunicorn over a local Unix socket.
  upstream gunicorn {
    server unix:/tmp/gunicorn.sock;
  }

  server {

    listen 8080 deferred;
    # Request bodies larger than 5 MB are rejected.
    client_max_body_size 5m;

    keepalive_timeout 5;

    # Only paths starting with /ping or /invocations are proxied to gunicorn.
    location ~ ^/(ping|invocations) {
      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
      proxy_set_header Host $http_host;
      proxy_redirect off;
      proxy_pass http://gunicorn;
    }

    # Every other path is answered with 404 and an empty JSON object.
    location / {
      return 404 "{}";
    }
  }
}

### Web Server Gateway Interface (WSGI) Application

In [None]:
%%writefile wsgi.py
# Expose the Flask application object under the module-level name `app`;
# gunicorn is started with the target "wsgi:app".
import app as myapp
app = myapp.app

---

## Create the Dockerfile

In [None]:
%%writefile Dockerfile
# REGION is a build argument; it selects the regional registry host in FROM.
ARG REGION
FROM 763104351884.dkr.ecr.${REGION}.amazonaws.com/tensorflow-training:2.5.0-cpu-py37-ubuntu18.04
# Install nginx and remove the apt package lists afterwards.
RUN apt-get update && apt-get install -y --no-install-recommends \
    nginx &&\
    rm -rf /var/lib/apt/lists/*
RUN pip install --upgrade pip
# Python packages for the web serving stack.
RUN pip install --no-cache-dir --upgrade \
    flask \
    gevent \
    gunicorn
RUN mkdir -p /opt/program
RUN mkdir -p /opt/ml
# Application code and the nginx configuration live in /opt/program.
COPY app.py /opt/program
COPY model.py /opt/program
COPY nginx.conf /opt/program
COPY wsgi.py /opt/program
WORKDIR /opt/program
# Port that nginx listens on (see nginx.conf).
EXPOSE 8080
# The mode (serve, train, preprocess or evaluate) is passed as the first argument to app.py.
ENTRYPOINT ["python", "app.py"]