Copyright © 2022, SAS Institute Inc., Cary, NC, USA.  All Rights Reserved. 

# Scoring Data with Containerized Deployment over REST API
## Table of Contents
1. [Introduction](#Introduction)
1. [Gather Resources](#Gather-Resources)
1. [Find the Container Endpoint](#Find-the-Container-Endpoint)
1. [Prepare Data](#Prepare-Data)
1. [Make REST API Calls](#Make-REST-API-Calls)
    1. [Method 1: Sequential REST API Calls](#method1)
    1. [Method 2: Parallel REST API Calls](#method2)
    1. [Method 3: Asynchronous REST API Calls](#method3)
1. [Conclusion](#Conclusion)

## Introduction

This notebook, walks you through how to leverage the containerized deployment of a model or decision to score data. Data is passed to the container and results are returned using a REST API. You need a few things to make that REST API call. First, you need the container endpoint for the model or decision. This notebook goes over how to find that endpoint from the kubeconfig file and the published model or decision name. Next, your data must be in JSON format. This notebook also goes over how to take data from a CSV file and create a list of JSON-formatted transactions for each row of input data. With the endpoint and JSON-formatted data, you are ready to make the REST API call to score your data. The container scores a single row of data at a time, which is not an issue for on-demand scoring. For batch scoring, you can pass data to the container row-by-row, which can be a time consuming process. To speed up scoring batch data, you can make your requests in parallel or asynchronously. This notebook highlights all three methods so that you can use what works best for your use case. These methods are not specific to SAS or containerized deployments, but rather are methods for making multiple REST calls at once.

***
## Gather Resources 
First, import all the Python packages that you need to use. Depending on which parts of the notebook you are using, you might not need every package listed below. Thus, you can import only what you need to use. 

In [None]:
# Python Packages for Interacting with APIs and Input Data
import json
import requests
import pandas as pd
import getpass

In [None]:
# Python Packages for Using the Kubeconfig File to Find the Endpoint
from Kubernetes import client, config

In [None]:
# Python Packages for Making REST API Calls in Parallel
import multiprocessing
from multiprocessing import Pool

In [None]:
# Python Packages for Making REST API Calls Asynchronously 
import asyncio
import aiohttp
from aiohttp.client import ClientSession
import nest_asyncio
nest_asyncio.apply()

***
## Find the Container Endpoint
If you already know your container endpoint, you can run the first block of code and enter your container scoring endpoint. Otherwise, please create a directory in the same location as this notebook and name it _config_ and then upload your kubeconfig file into this directory and name it _kubeconfig_. [Click here](https://docs.microsoft.com/en-us/azure/aks/control-kubeconfig-access) for documentation about how to get the kubeconfig file from Azure Kubernetes Service. 
Next, run the second block of code and enter the kubeconfig context, the port number and the deployed model or decision container name. 

In [None]:
# Only run this block if you already know the scoring endpoint for your container.
scoredEndPoint = input("Container scoring endpoint: ")

In [None]:
# Only run this block if you are using the kubeconfig file to find the scoring endpoint for your container.
context = input("Kubeconfig context: ")
port_number = input("Service port number: ")
deployed_name = input("Deployed model or decision name: ")

config.load_kube_config('config/kubeconfig', context=context)
serviceName = deployed_name + "-service"

# Get the IP address of the container.
v1 = client.CoreV1Api()
pod_list = v1.list_namespaced_service("default")
for pod in pod_list.items:
    # print(pod.metadata.name)
    if pod.metadata.name == serviceName:
        selectedPod = pod
        break
    
podAddr = selectedPod.status.load_balancer.ingress[0].ip
    
containerUrl = "http://" + podAddr + ":" + port_number
r = requests.get(containerUrl + "/__env") 
for endpoint in r.json()['endpoints']:
    # print(endpoint)
    if (endpoint.endswith("/execute")):
        scoredEndPoint = containerUrl + endpoint

print("Container Scoring Endpoint: ", scoredEndPoint)

*** 
## Prepare Data
You now have your scoring endpoint. Next, you need to prepare your data in a JSON format. The following code block takes a CSV file and converts it into a JSON list, where each item in the list is one row of data from the CSV table. Please ensure that the column names in the CSV table match the input names that the deployed model or decision is expecting. 

In [None]:
csv_path = input("CSV File with Path: ")
df = pd.read_csv(csv_path)
jsonFile = df.to_json(orient='records', lines=True)
jsonList = jsonFile.split('\n')
del jsonList[len(jsonList)-1]

***
## Make REST API Calls
Your data is now correctly formatted, so you are ready to make the REST API call. As mentioned in the introduction to the notebook, you have a few options for how you can send the requests to the container. The first method sends each request one-at-a-time for each row/transaction and waits for a response back from the container before sending the next request. This method works well if you are looking to score just one or a few transactions. The next method uses parallelization to send multiple requests at once. This method makes a separate call from each CPU and can speed up processing time for larger data sets. The last method makes an asynchronous call, which makes all the requests one after the other, and then waits for the container to return the responses back. This method can drastically speed up processing time, but your requests can return out of order. Each option is included below so that you can determine which best works for your use case.

<a id="method1"></a>

### Method 1: Sequential REST API Calls

In [None]:
headers = {'Content-Type':'application/json'}
results = []
for x in jsonList:
    response = requests.post(scoredEndPoint, data=x, headers=headers)
    response = response.json()
    results.append(response)

Review the first response as a sanity check. 

In [None]:
results[0]

<a id="method2"></a>
### Method 2: Parallel REST API Calls

In [None]:
cpus = int(input("Number of CPUs: "))
cpus = int(cpus)

headers = {'Content-Type':'application/json'}
results = []

def score(data_json):  
    response = requests.post(scoredEndPoint, data=data_json, headers=headers)
    response = response.json()
    return response

with Pool(processes=4) as pool:
        for x in pool.imap_unordered(score, jsonList):
            results.append(x)

Review the first response as a sanity check. 

In [None]:
results[0]

<a id="method3"></a>
### Method 3: Asynchronous REST API Calls

In [None]:
tasks = []
async def postData(url:str, data:list, session:ClientSession):
    headers = {'Content-Type':'application/json'}
    async with session.post(url, data=data, headers=headers) as response:
        results= await response.json()
        return result

async def postAll(data:list):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for row in data:
            task = asyncio.ensure_future(postData(url=scoredEndPoint,
                                                  data=row,
                                                  session=session))
            tasks.append(task)
        allPostCalls = await asyncio.gather(*tasks, return_exceptions=True)
    return allPostCalls

tasks = asyncio.run(postAll(jsonList))

Review the first response as a sanity check. 

In [None]:
tasks[0]

***
## Conclusion
And just like that you can score data using a model or decision deployed into a container. Using the methods listed in this notebook, you can easily incorporate modeling and decisioning into applications and processes. 
***