In [1]:
%%capture
import requests
from azureml.core.model import Model
from azureml.core.workspace import Workspace
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.environment import Environment
from azureml.core.model import InferenceConfig
from azureml.core.webservice import LocalWebservice
from azureml.core.webservice import AciWebservice, Webservice
from azureml.exceptions import WebserviceException
from azureml.core.authentication import InteractiveLoginAuthentication

In [48]:
%%writefile scoring_service.py
import json
import psycopg2
import joblib
import tensorflow as tf
from tensorflow.python import keras
import numpy as np
import pandas as pd
from datetime import timedelta
from dateutil import tz
from dateutil.parser import parse
from sklearn.preprocessing import StandardScaler
from tensorflow.python.keras.models import load_model
from azureml.core.model import Model
from azureml.contrib.services.aml_response import AMLResponse


HOST = '34.221.143.98'
DATABASE = 'azure_ai'
USERNAME = 'postgres'
PASSWORD = 'postgres'
TIME_SHIFT_STEPS = 24 * 4
CAMERA_ID = 76
PREDICTION_STEPS = 24 * 4  # one day 15 min intervals


def init():
    global model,scaler,database,prediction_serivce 
    model_path = Model.get_model_path('traffic-prediction-cam-76' ,version=3)
    model = load_model(model_path + '/model.h5',custom_objects={"adam": tf.keras.optimizers.Adam ,"mae":tf.keras.losses.mean_absolute_error})
    model.compile(optimizer='adam' ,loss='mae')
    scaler = joblib.load(model_path + '/scaler.joblib')
    database = Database(HOST, DATABASE, USERNAME, PASSWORD)
    prediction_serivce = Prediction(database,model,scaler, TIME_SHIFT_STEPS)
  

        # Handle requests to the service
def run(data):
    try:
        data=json.loads(data)

        predictions = prediction_serivce.predict(data['cameraId'], data['steps'])
        response=AMLResponse(predictions,200)
        response.headers['Access-Control-Allow-Origin']='*'
        return predictions
    except Exception as e:
        error = str(e)
        return error
    

class Database:
    def __init__(self, host, database, username, password):
        try:
            conn = psycopg2.connect(host=host, database=database, user=username, password=password)
            self.cursor = conn.cursor()
        except Exception as err:
            print(f'Failed to connect to Postgres database: {err}')

    def fetch_recent_history(self, camera_id, number_of_records, aggregation_minutes_interval=15):
        """Fetches the  top {} most recent records for the {entity} from the database"""
        query = f"SELECT   date_trunc('hour', time) + (((date_part('minute', time)::integer / {aggregation_minutes_interval}::integer) * {aggregation_minutes_interval}::integer) || ' minutes')::interval AS time_interval, " \
                f"avg(count) FROM public.count where camera_id={camera_id} and label in ('car','bus','truck')  GROUP BY time_interval order by time_interval desc   limit {number_of_records};"
        self.cursor.execute(query)
        return self.cursor.fetchall()


class Prediction:
    def __init__(self, database,model,scaler, history_time_steps):
        self.database = database
        self.model = model
        self.scaler = scaler
        self.TIME_SHIFT_STEPS = history_time_steps
        self.TIME_INTERVALS = timedelta(minutes=15)
        self.TIME_ZONES = 'America/Edmonton'

    def predict(self, camera_id, prediction_steps):
        inputs, historical_dates = self._prepare_inputs(camera_id, self.TIME_SHIFT_STEPS)
        X_train_hourOfDay_pred, X_train_dayOfWeek_pred, X_train_dayOfYear_pred, X_train_pred = inputs
        prediction_dates = self.create_dates(historical_dates, prediction_steps)
        predictions = []
        for i in range(prediction_steps):
            prediction = self.model.predict([X_train_hourOfDay_pred, X_train_dayOfWeek_pred, X_train_dayOfYear_pred,
                                             X_train_pred[:, :, :TIME_SHIFT_STEPS]])
            predictions.append(self.scaler.inverse_transform(prediction)[0][0])
            X_train_pred = np.insert(X_train_pred, 0, prediction[0][0], axis=2)
        
        pred_response = []
        for date, prediction in zip(prediction_dates, predictions):
            pred_response.append({"time": date.isoformat(), "count": float(prediction)})
        historical_hourly = self.get_hourly_historical(camera_id, 24)
        return {"prediction": pred_response, "historical": historical_hourly}

    def _prepare_inputs(self, camera_id, steps):
        records = self.database.fetch_recent_history(camera_id, 2 * self.TIME_SHIFT_STEPS)
        historical = pd.DataFrame(records, columns=['phenomenonTime', 'result'])
        historical['result'] = historical['result'].astype(float)
        historical.set_index(pd.DatetimeIndex(historical['phenomenonTime']), inplace=True)
        historical.drop(['phenomenonTime'], inplace=True, axis=1)

        historical[['result']] = self.scaler.transform(historical['result'].values.reshape(-1, 1))
        print(historical.head())
        train_pred, y_train_pred = self.get_timeseries(historical.copy(), self.TIME_SHIFT_STEPS, 'result')
       
        # Training
        data = historical.copy()
        data.drop(['result'], inplace=True, axis=1)
        data['hourOfDay'] = data.index.hour
        data['dayOfWeek'] = data.index.dayofweek
        data['dayOfYear'] = data.index.dayofyear
        train_cat_pred = self.get_categorical(data, self.TIME_SHIFT_STEPS)
        x_train_pred = np.array(train_pred['result_merged'].values.tolist()[0]).reshape(-1, 1, self.TIME_SHIFT_STEPS)
        x_train_hourOfDay_pred = np.array(train_cat_pred['hourOfDay_merged'].values.tolist()[0]).reshape(-1,
                                                                                                         self.TIME_SHIFT_STEPS)
        x_train_dayOfWeek_pred = np.array(train_cat_pred['dayOfWeek_merged'].values.tolist()[0]).reshape(-1,
                                                                                                         self.TIME_SHIFT_STEPS)
        x_train_dayOfYear_pred = np.array(train_cat_pred['dayOfYear_merged'].values.tolist()[0]).reshape(-1,
                                                                                                         self.TIME_SHIFT_STEPS)
        return [x_train_hourOfDay_pred, x_train_dayOfWeek_pred, x_train_dayOfYear_pred, x_train_pred], train_pred.index

    def get_timeseries(self, data, steps, target_column):
        y = None
        df = None
        for column in data.columns.tolist():
            df = data[[column]].copy()
            for i in range(1, steps + 1):
                df[f'{column}{i}'] = df[column].shift(-i)
            df.dropna(inplace=True, axis=0)

            if column == target_column:
                y = df[f'{column}{steps}'].values
            df.drop([f'{column}{steps}'], inplace=True, axis=1)
            df[f'{column}_merged'] = df.values.tolist()
            df.drop([f'{column}'], inplace=True, axis=1)
            for i in range(1, steps):
                df.drop([f'{column}{i}'], inplace=True, axis=1)

        return df, y

    def get_categorical(self, data, steps):
        merged = None
        for column in data.columns.tolist():
            df = data[[column]]
            for i in range(1, steps + 1):
                df[f'{column}{i}'] = df[column].shift(-i)
            df.dropna(inplace=True, axis=0)
            df.drop([f'{column}{steps}'], inplace=True, axis=1)
            df[f'{column}_merged'] = df.values.tolist()
            df.drop([f'{column}'], inplace=True, axis=1)
            for i in range(1, steps):
                df.drop([f'{column}{i}'], inplace=True, axis=1)

            if merged is None:
                merged = df.copy()
            else:
                merged[f'{column}_merged'] = df[[f'{column}_merged']]

        return merged

    def create_dates(self, hist_dates, prediction_steps):
        most_recent_date = hist_dates[0]
        most_recent_date = most_recent_date.replace(tzinfo=tz.gettz(self.TIME_ZONES))
        pred_dates = []
        for i in range(1, prediction_steps + 1):
            pred_dates.append(most_recent_date + i * self.TIME_INTERVALS)
        return pred_dates 
    
    def get_hourly_historical(self, camera_id, limit=24):
        records = self.database.fetch_recent_history(camera_id=camera_id, number_of_records=limit,
                                                     aggregation_minutes_interval=60)
        records = [
            {'time': record[0].replace(tzinfo=tz.gettz(self.TIME_ZONES)).isoformat(), 'count': float(record[1])}
            for record in records]
        return records

Overwriting scoring_service.py


In [49]:
def initialize():
    subscription_id = "979884b7-8494-4a3d-abd7-e9e63d1f5d90"  
    resource_group = "azure-ai-hackathon-ml"  
    workspace_name = "azure-ai-hackathon-ws"  
    workspace_region = "West US 2"  
   
    interactive_auth = InteractiveLoginAuthentication()
    ws = Workspace.get(
        name=workspace_name,
        subscription_id=subscription_id,
        resource_group=resource_group,
        auth=interactive_auth
    )
    return ws

def get_environment():
    environment = Environment("LocalDeploy")
    conda_dep = CondaDependencies()
    conda_dep.add_pip_package("absl-py")
    conda_dep.add_pip_package("astor")
    conda_dep.add_pip_package("cycler")
    conda_dep.add_pip_package("gast")
    conda_dep.add_pip_package("google-pasta")
    conda_dep.add_pip_package("grpcio")
    conda_dep.add_pip_package("h5py")
    conda_dep.add_pip_package("joblib")
    conda_dep.add_pip_package("Keras-Applications")
    conda_dep.add_pip_package("Keras-Preprocessing")
    conda_dep.add_pip_package("kiwisolver")
    conda_dep.add_pip_package("Markdown")
    conda_dep.add_pip_package("matplotlib")
    conda_dep.add_pip_package("numpy")
    conda_dep.add_pip_package("pandas")
    conda_dep.add_pip_package("protobuf")
    conda_dep.add_pip_package("pyparsing")
    conda_dep.add_pip_package("psycopg2-binary")
    conda_dep.add_pip_package("python-dateutil")
    conda_dep.add_pip_package("pytz")
    conda_dep.add_pip_package("scikit-learn")
    conda_dep.add_pip_package("scipy")
    conda_dep.add_pip_package("six")
    conda_dep.add_pip_package("tb-nightly==1.14.0a20190603")
    conda_dep.add_pip_package("tensorflow==2.0.0b1")
    conda_dep.add_pip_package("termcolor")
    conda_dep.add_pip_package("tf-estimator-nightly==1.14.0.dev2019060501")
    conda_dep.add_pip_package("Werkzeug")
    conda_dep.add_pip_package("wrapt")
    conda_dep.add_pip_package("azureml-core")
    conda_dep.add_pip_package("azureml-contrib-services")

    environment.python.conda_dependencies = conda_dep
    return environment

In [50]:
ws = initialize()
environment = get_environment()
inference_config = InferenceConfig(entry_script="score.py",
                                   environment=environment)

In [51]:
def deploy_local():
    deployment_config = LocalWebservice.deploy_configuration()
    model = Model(name='traffic-prediction-cam-76', workspace=ws, version=3)
    local_service = Model.deploy(ws, "local-deploy", [model], inference_config, deployment_config)
    local_service.wait_for_deployment()

def deploy_to_cloud():
    deployment_config = AciWebservice.deploy_configuration(cpu_cores=1, memory_gb=1)
    model = Model(name='traffic-prediction-cam-76', workspace=ws, version=3)
    aci_service_name = 'traffic-pred-cam-service'
    try:
        service = Webservice(ws, name=aci_service_name)
        if service:
            service.delete()
    except WebserviceException as err:
        print(err)

    service = Model.deploy(ws, aci_service_name, [model, tokenizer, label_encoder], inference_config, deployment_config)

    service.wait_for_deployment(True)
    print(service.state)

## Local deployment

In [53]:
deploy_local()

Downloading model traffic-prediction-cam-76:3 to /tmp/azureml_jxkq5b4p/traffic-prediction-cam-76/3
Generating Docker build context.
Package creation Succeeded
Logging into Docker registry azureaihacka6d64e0ee.azurecr.io
Logging into Docker registry azureaihacka6d64e0ee.azurecr.io
Building Docker image from Dockerfile...
Step 1/5 : FROM azureaihacka6d64e0ee.azurecr.io/azureml/azureml_81db9aab80b622dc2fe2baa6f684293d
 ---> f65b3ea99657
Step 2/5 : COPY azureml-app /var/azureml-app
 ---> 198f580a7b49
Step 3/5 : COPY model_config_map.json /var/azureml-app/model_config_map.json
 ---> d84de580db61
Step 4/5 : RUN mv '/var/azureml-app/tmpxjwe_u7v.py' /var/azureml-app/main.py
 ---> Running in 067973b15d39
 ---> 81ed400296c9
Step 5/5 : CMD ["runsvdir","/var/runit"]
 ---> Running in 875104d801ca
 ---> 6e79bb0569cc
Successfully built 6e79bb0569cc
Successfully tagged local-deploy:latest
Container has been successfully cleaned up.
Image sha256:a1a8876659b6e69971dd8dbedad87e8f815f0b206ce1856acae4ef325

## Deploy the Model as Webservice on Azure Container Instance

In [None]:
deploy_to_cloud()

## Test the service 

In [45]:
response=requests.post('http://localhost:32869/score',json={
    'cameraId':76,
	'steps':96
})
response.json()

'{"prediction": [{"time": "2019-09-09T22:30:00-06:00", "count": 11.187052726745605}, {"time": "2019-09-09T22:45:00-06:00", "count": 11.339231491088867}, {"time": "2019-09-09T23:00:00-06:00", "count": 12.544694900512695}, {"time": "2019-09-09T23:15:00-06:00", "count": 14.295623779296875}, {"time": "2019-09-09T23:30:00-06:00", "count": 13.605803489685059}, {"time": "2019-09-09T23:45:00-06:00", "count": 12.726211547851562}, {"time": "2019-09-10T00:00:00-06:00", "count": 13.35285758972168}, {"time": "2019-09-10T00:15:00-06:00", "count": 13.09186840057373}, {"time": "2019-09-10T00:30:00-06:00", "count": 12.472402572631836}, {"time": "2019-09-10T00:45:00-06:00", "count": 13.82763385772705}, {"time": "2019-09-10T01:00:00-06:00", "count": 13.456625938415527}, {"time": "2019-09-10T01:15:00-06:00", "count": 11.666662216186523}, {"time": "2019-09-10T01:30:00-06:00", "count": 11.363595962524414}, {"time": "2019-09-10T01:45:00-06:00", "count": 14.081496238708496}, {"time": "2019-09-10T02:00:00-06:0