Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

# Load Test deployed web application

In this notebook, we test the latency of the deployed web application by sending a number of duplicate questions as asynchronous requests.

In [2]:
import asyncio
import json
import random
import urllib.request
from timeit import default_timer
import aiohttp
import nest_asyncio
from tqdm import tqdm
import requests
import pandas as pd
from utilities import text_to_json
from azureml.core.workspace import Workspace
from azureml.core.image import Image
from azureml.core.webservice import AksWebservice
from dotenv import set_key, get_key, find_dotenv

In [3]:
print(aiohttp.__version__) #3.3.2

3.3.2


In [4]:
nest_asyncio.apply()

In [5]:
ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep="\n")

Found the config file in: /datadrive/MLAKSDeployAML/aml_config/config.json
fboyluamlsdkws
fboyluamlsdkrg
eastus2
edf507a2-6235-46c5-b560-fd463ba2e771


In [6]:
env_path = find_dotenv(raise_error_if_not_found=True)

Let's retrieve the web service.

In [7]:
aks_service_name = get_key(env_path, 'aks_service_name')
aks_service = AksWebservice(ws, name=aks_service_name)
aks_service.name

'lgbmservice'

We will test our deployed service with 100 calls, allowing at most 4 requests in flight at any time. Feel free to try different values and see how the service responds.

In [8]:
NUMBER_OF_REQUESTS = 100  # Total number of requests
CONCURRENT_REQUESTS = 4   # Number of requests at a time
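
To illustrate how a semaphore caps the number of in-flight requests, here is a minimal, self-contained sketch that replaces the real POST with `asyncio.sleep` and records the peak number of tasks running at once. The helpers `simulated_request`, `measure_peak_concurrency`, and `peak_concurrency` are hypothetical names introduced for illustration; they are not part of this notebook's code, and the sketch assumes Python 3.7 or later for `asyncio.run`.

```python
import asyncio


async def simulated_request(sem, state, delay=0.01):
    # Stand-in for a real POST request (assumption: a short sleep models the call).
    async with sem:
        # Only tasks holding the semaphore reach this point.
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(delay)
        state["active"] -= 1


async def measure_peak_concurrency(num_requests, num_concurrent):
    # The semaphore allows at most `num_concurrent` tasks inside the block above.
    sem = asyncio.Semaphore(num_concurrent)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(
        *(simulated_request(sem, state) for _ in range(num_requests))
    )
    return state["peak"]


def peak_concurrency(num_requests=20, num_concurrent=4):
    # Run the coroutine to completion and return the observed peak.
    return asyncio.run(measure_peak_concurrency(num_requests, num_concurrent))


peak = peak_concurrency(num_requests=20, num_concurrent=4)
print("Peak concurrent requests:", peak)
```

Inside a notebook, `asyncio.run` relies on `nest_asyncio.apply()` having been called first, as is done in an earlier cell.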

Get the scoring URL and API key of the service.

In [9]:
scoring_url = aks_service.scoring_uri
api_key = aks_service.get_keys()[0]

In [10]:
dupes_test_path = './data_folder/dupes_test.tsv'
dupes_test = pd.read_csv(dupes_test_path, sep='\t', encoding='latin1')
dupes_to_score = dupes_test.iloc[:NUMBER_OF_REQUESTS,4]

In [11]:
url_list = [[scoring_url, jsontext] for jsontext in dupes_to_score.apply(text_to_json)]

In [12]:
def decode(result):
    return json.loads(result.decode("utf-8"))

In [13]:
async def fetch(url, session, data, headers):
    start_time = default_timer()
    async with session.request("post", url, data=data, headers=headers) as response:
        resp = await response.read()
        elapsed = default_timer() - start_time
        return resp, elapsed

In [14]:
async def bound_fetch(sem, url, session, data, headers):
    # Getter function with semaphore.
    async with sem:
        return await fetch(url, session, data, headers)


In [15]:
async def await_with_progress(coros):
    results = []
    for f in tqdm(asyncio.as_completed(coros), total=len(coros)):
        result = await f
        results.append((decode(result[0]), result[1]))
    return results


In [16]:
async def run(url_list, num_concurrent=CONCURRENT_REQUESTS):
    headers = {
        "content-type": "application/json",
        "Authorization": ("Bearer " + api_key),
    }
    tasks = []
    # create instance of Semaphore
    sem = asyncio.Semaphore(num_concurrent)

    # Create a client session so that we don't open a new connection
    # for each request.
    async with aiohttp.ClientSession() as session:
        for url, data in url_list:
            # pass Semaphore and session to every POST request
            task = asyncio.ensure_future(bound_fetch(sem, url, session, data, headers))
            tasks.append(task)
        return await await_with_progress(tasks)


Below we run the 100 requests against our deployed service.

In [17]:
loop = asyncio.get_event_loop()
start_time = default_timer()
complete_responses = loop.run_until_complete(
    asyncio.ensure_future(run(url_list, num_concurrent=CONCURRENT_REQUESTS))
)
elapsed = default_timer() - start_time
print("Total Elapsed {}".format(elapsed))
print("Avg time taken {0:4.2f} ms".format(1000 * elapsed / len(url_list)))

100%|██████████| 100/100 [00:02<00:00, 33.36it/s]

Total Elapsed 3.001682119909674
Avg time taken 30.02 ms
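
The average printed above divides the total wall-clock time by the number of requests, so it reflects throughput under concurrency rather than the latency of an individual call. Each entry returned by `run` also carries its own elapsed time as the second tuple element, which can be summarized separately. The following is a minimal sketch under that assumption; `latency_summary` and the `sample` data are hypothetical and only illustrate the calculation.

```python
import math
import statistics


def latency_summary(responses):
    """Summarize per-request latency in ms from (body, elapsed_seconds) tuples."""
    latencies_ms = sorted(1000 * elapsed for _, elapsed in responses)
    # Nearest-rank 95th percentile: smallest value with at least 95% of samples at or below it.
    rank = max(1, math.ceil(0.95 * len(latencies_ms)))
    return {
        "mean_ms": statistics.mean(latencies_ms),
        "median_ms": statistics.median(latencies_ms),
        "p95_ms": latencies_ms[rank - 1],
        "max_ms": latencies_ms[-1],
    }


# Hypothetical sample shaped like `complete_responses`: (body, elapsed_seconds).
sample = [("body", 0.10), ("body", 0.12), ("body", 0.11), ("body", 0.30)]
summary = latency_summary(sample)
for name, value in summary.items():
    print("{}: {:.1f}".format(name, value))
```

With the real results, `latency_summary(complete_responses)` would report the per-request figures alongside the throughput-based average.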





In [18]:
# Example response
complete_responses[0]

('[[5223, 6700, 0.9999965959866246], [750486, 750506, 0.01531029604225485], [14220321, 14220323, 0.007132754207166135], [3127429, 3127440, 0.004683669164453595], [4057440, 4060176, 0.003948233337061824], [126100, 4889658, 0.0019431114176480726], [8495687, 8495740, 0.0017439976058191318], [7837456, 14853974, 0.0017182658391146225], [111102, 111111, 0.0011247467797070546], [359494, 359509, 0.0006558239961484626], [11922383, 11922384, 0.0002469030382323802], [1129216, 1129270, 0.00015594519383403985], [203198, 1207393, 0.00015279426484568245], [171251, 171256, 7.232940221429718e-05], [8228281, 8228308, 6.509957889736847e-05], [4425318, 4425359, 4.544665737094924e-05], [15141762, 15171030, 4.4550726355687584e-05], [20279484, 20279485, 4.448114291098699e-05], [1822350, 1822769, 4.352134230653638e-05], [1885557, 1885660, 2.422904311163449e-05], [850341, 850346, 1.9769889568170553e-05], [7486085, 7486130, 1.757786527921662e-05], [5767325, 5767357, 1.6630344078064335e-05], [262427, 262511, 1.5

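Let's use the number of original questions to count the successful responses: a response counts as successful if it contains as many scored entries as the first response, which has one entry per original question.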

In [19]:
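# Parse the response body with json.loads rather than eval, which would execute arbitrary text.
no_questions = len(json.loads(complete_responses[0][0]))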

In [20]:
num_successful = [len(json.loads(i[0])) for i in complete_responses].count(no_questions)
print("Successful {} out of {}".format(num_successful, len(url_list)))

Successful 100 out of 100
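
The count above assumes every response body parses cleanly; a failed request (for example an error page) would raise instead of being counted as a failure. A more defensive variant might look like the sketch below. `count_successful` and the `sample` data are hypothetical, and the sketch assumes each body is either a JSON string encoding a list or an already-parsed object.

```python
import json


def count_successful(responses, expected_len):
    """Count responses whose body parses to a list with `expected_len` entries."""
    successful = 0
    for body, _elapsed in responses:
        try:
            # Bodies are assumed to be JSON strings; pass through anything already parsed.
            parsed = json.loads(body) if isinstance(body, str) else body
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError: treat unparsable bodies as failures.
            continue
        if isinstance(parsed, list) and len(parsed) == expected_len:
            successful += 1
    return successful


# Hypothetical sample: one good response, one error page, one short response.
sample = [
    ("[[1, 2, 0.9], [3, 4, 0.1]]", 0.10),
    ("Internal Server Error", 0.20),
    ("[[1, 2, 0.9]]", 0.10),
]
good = count_successful(sample, expected_len=2)
print("Successful {} out of {}".format(good, len(sample)))
```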


Next, we will explore the real-time scoring in an [iPyWidget app](06_Real_Time_Scoring.ipynb).