# Load Test deployed web application

This notebook pulls some questions from the test set and scores them against the deployed web application. We submit requests asynchronously, 
which should reduce the contribution of latency.

In [1]:
from urllib.parse import urlparse
import pandas as pd

from azure_utils.configuration.project_configuration import ProjectConfiguration
from azure_utils.machine_learning.utils import get_workspace_from_config
from azureml.core.webservice import AksWebservice

If you run your code in unattended mode, i.e., where you can't give a user input, then we recommend to use ServicePrincipalAuthentication or MsiAuthentication.
Please refer to aka.ms/aml-notebook-auth for different authentication mechanisms in azureml-sdk.


In [2]:
#ws = get_workspace_from_config()
from azureml.core.authentication import MsiAuthentication
from azureml.core import Workspace

msi_auth = MsiAuthentication()
ws = Workspace(subscription_id="109e56d8-d599-4905-ab89-be3f6c7e1662",
               resource_group="trial2",
               workspace_name="experiment2ml",
               auth=msi_auth)
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep="\n")

experiment2ml
trial2
eastus
109e56d8-d599-4905-ab89-be3f6c7e1662


Let's retrieve the web service.

In [17]:
from azureml.core.compute import AksCompute, ComputeTarget
# Set the resource group that contains the AKS cluster and the cluster name
resource_group = 'trial2'
cluster_name = 'aks'

# Attach the cluster to your workspace. If the cluster has fewer than 12 virtual CPUs, use the DEV_TEST configuration:
attach_config = AksCompute.attach_configuration(resource_group = resource_group,
                                        cluster_name = cluster_name,
                                        cluster_purpose = AksCompute.ClusterPurpose.DEV_TEST)
# attach_config = AksCompute.attach_configuration(resource_group = resource_group,
#                                          cluster_name = cluster_name)
aks_target = ComputeTarget.attach(ws, 'myaks2', attach_config)

In [10]:
from azureml.core.compute import AksCompute, ComputeTarget

# Use the default configuration (you can also provide parameters to customize this).
# For example, to create a dev/test cluster, use:
prov_config = AksCompute.provisioning_configuration(cluster_purpose = AksCompute.ClusterPurpose.DEV_TEST)

aks_name = 'myaks2'
# Create the cluster
# aks_target = ComputeTarget.create(workspace = ws,
#                                     name = aks_name,
#                                     provisioning_configuration = prov_config)
aks_target = ComputeTarget(workspace = ws,name = aks_name)
# Wait for the create process to complete
#aks_target.wait_for_completion(show_output = True)

In [11]:
AksWebservice??

In [None]:
# from azureml.core.webservice import AksWebservice, Webservice
# from azureml.core.model import Model

# aks_target = AksCompute(ws,"myaks2")
# If deploying to a cluster configured for dev/test, ensure that it was created with enough
# cores and memory to handle this deployment configuration. Note that memory is also used by
# things such as dependencies and AML components.
# deployment_config = AksWebservice.deploy_configuration(cpu_cores = 1, memory_gb = 1)
# model = Model(ws, name='question_match_model')
#service = Model.deploy(ws, "myservice", [model], inference_config, deployment_config, aks_target)
# service.wait_for_deployment(show_output = True)
# print(service.state)
# print(service.get_logs())

In [3]:
#project_configuration = ProjectConfiguration("project.yml")
#aks_service_name = project_configuration.get_settings('aks_service_name')
aks_service = AksWebservice(ws, name="aksservice")

We will test our service with concurrent requests, limited to 4 at any time. We have deployed only one pod 
on one node, so increasing the number of concurrent calls does not really increase throughput. Feel free to try 
different values and see how the service responds.

In [4]:
concurrent_requests = 4  # Number of requests at a time
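
One way to see why extra concurrency stops helping is a rough queueing sketch. It assumes, hypothetically, that the single pod scores one request at a time with a fixed service time, so throughput is capped at `1 / service_s` and additional concurrent requests only wait longer. The 50 ms service time and 20 ms network round trip are made-up figures, not measurements from this deployment.

```python
def estimate(concurrency, service_s=0.050, network_s=0.020, workers=1):
    """Closed-system estimate: each client sends, waits for the reply, repeats."""
    # Without queueing, one client completes a request every (network + service) seconds.
    unloaded_latency = network_s + service_s
    offered = concurrency / unloaded_latency
    # The pod cannot finish more than workers / service_s requests per second.
    capacity = workers / service_s
    throughput = min(offered, capacity)
    # Little's law: concurrency = throughput * latency.
    latency = concurrency / throughput
    return throughput, latency


for n in (1, 2, 4, 8):
    rps, latency = estimate(n)
    print(f"{n} concurrent: ~{rps:.1f} req/s, ~{latency * 1000:.0f} ms per request")
```

Under these assumptions throughput flattens once the pod is saturated while per-request latency keeps growing, which is the pattern to look for when trying different values of `concurrent_requests`.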

Get the scoring URL and API key of the service.

In [5]:
scoring_url = aks_service.scoring_uri
api_key = aks_service.get_keys()[0]
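
Before load testing, it can help to see what a single authenticated request looks like. The sketch below builds (but does not send) a POST request with the same `Bearer` header the locustfile uses. The helper name `build_score_request`, the placeholder URL and key, and the `{"input": ...}` payload shape are assumptions for illustration; the real payload comes from `text_to_json` in the project's `utilities` module.

```python
import json
import urllib.request


def build_score_request(scoring_url, api_key, payload):
    """Build (but do not send) an authenticated POST request for the scoring endpoint."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        scoring_url,
        data=body,
        headers={
            "Content-Type": "application/json",
            "Authorization": "Bearer {}".format(api_key),
        },
        method="POST",
    )


# Placeholder values for illustration; in the notebook pass scoring_url and api_key.
request = build_score_request(
    "http://localhost/score", "<api-key>", {"input": "example question"}
)
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request, timeout=30)
```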

Below we are going to use [Locust](https://locust.io/) to load test our deployed model. First we need to write the 
locustfile.

In [17]:
urlparse(scoring_url)

ParseResult(scheme='http', netloc='52.177.140.64:80', path='/api/v1/service/aksservice/score', params='', query='', fragment='')

In [11]:
! ls ./data_folder

balanced_pairs_test.tsv   dupes_test.tsv
balanced_pairs_train.tsv  questions.tsv


In [12]:
%%writefile locustfile.py
from locust import HttpLocust, TaskSet, task
import os
import pandas as pd
from utilities import text_to_json
from itertools import cycle

# os.getenv returns a string when the variable is set, so convert before slicing
_NUMBER_OF_REQUESTS = int(os.getenv('NUMBER_OF_REQUESTS', 100))
dupes_test_path = './data_folder/dupes_test.tsv'
dupes_test = pd.read_csv(dupes_test_path, sep='\t', encoding='latin1')
dupes_to_score = dupes_test.iloc[:_NUMBER_OF_REQUESTS, 4]
_SCORE_PATH = os.getenv('SCORE_PATH', "/score")
_API_KEY = os.getenv('API_KEY')


class UserBehavior(TaskSet):
    def on_start(self):
        print('Running setup')
        self._text_generator = cycle(dupes_to_score.apply(text_to_json))
        self._headers = {
            "content-type": "application/json",
            'Authorization': ('Bearer {}'.format(_API_KEY))
        }

    @task
    def score(self):
        self.client.post(_SCORE_PATH,
                         data=next(self._text_generator),
                         headers=self._headers)


class WebsiteUser(HttpLocust):
    task_set = UserBehavior
    # min and max time to wait before repeating task
    min_wait = 10
    max_wait = 200

Overwriting locustfile.py
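
The locustfile wraps the payloads in `itertools.cycle` so that each simulated user can keep posting after it has sent every row once. A minimal sketch of that behaviour, using hypothetical stand-in strings in place of the output of `dupes_to_score.apply(text_to_json)`:

```python
from itertools import cycle, islice

# Hypothetical payloads standing in for the JSON strings produced by text_to_json.
payloads = ['{"input": "q%d"}' % i for i in range(3)]

# cycle() restarts from the first payload once the last one has been used,
# so next() never raises StopIteration during the test.
generator = cycle(payloads)
first_pass_and_more = list(islice(generator, len(payloads) + 2))
print(first_pass_and_more)
```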


Below we define the Locust command we want to run. We use a hatch rate of 10 users per second and the whole test will 
last 1 minute. Feel free to adjust the parameters below and see how the results differ. The results of the test will 
be saved to two CSV files, **modeltest_requests.csv** and **modeltest_distribution.csv**.

In [13]:
parsed_url = urlparse(scoring_url)
cmd = "locust -H {host} --no-web -c {users} -r {rate} -t {duration} --csv=modeltest --only-summary".format(
    host="{url.scheme}://{url.netloc}".format(url=parsed_url), 
    users=concurrent_requests,  # concurrent users
    rate=10,  # hatch rate (users / second)
    duration='1m',  # test duration
)
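
The same command can also be assembled as an argument list, which avoids quoting problems when it is later handed to a process. This is a sketch only: `build_locust_args` is a hypothetical helper, and the flags are exactly the ones used in the cell above. The example URL is the scoring URI shown earlier in this notebook.

```python
import shlex
from urllib.parse import urlparse


def build_locust_args(scoring_url, users, rate=10, duration="1m", csv_prefix="modeltest"):
    """Return the locust command as an argument list plus the path to score against."""
    url = urlparse(scoring_url)
    # Locust takes scheme and host as -H; the path is passed separately via SCORE_PATH.
    host = "{}://{}".format(url.scheme, url.netloc)
    args = [
        "locust", "-H", host, "--no-web",
        "-c", str(users),        # concurrent users
        "-r", str(rate),         # hatch rate (users / second)
        "-t", duration,          # test duration
        "--csv={}".format(csv_prefix),
        "--only-summary",
    ]
    return args, url.path


args, score_path = build_locust_args(
    "http://52.177.140.64:80/api/v1/service/aksservice/score", users=4
)
print(" ".join(shlex.quote(a) for a in args))
print(score_path)
```

These flags match the Locust version this notebook was written for. Later Locust releases (1.0 and up) renamed `--no-web` to `--headless` and `-c` to `-u`, so check `locust --help` for the installed version.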

In [18]:
! #!/bin/bash 
! API_KEY={api_key} SCORE_PATH={parsed_url.path} {cmd}

/bin/sh: 1: locust: not found


In [17]:
! #!/bin/bash 
! API_KEY={api_key} SCORE_PATH={parsed_url.path} PYTHONPATH={os.path.abspath('../')} {cmd}

/bin/sh: 1: Syntax error: "(" unexpected
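
The two failures above likely have different causes. `locust: not found` means the `locust` executable is not on the shell's `PATH`, for example because Locust is not installed in this environment. The `"(" unexpected` error is consistent with IPython handing `{os.path.abspath('../')}` to the shell unexpanded, which happens when the expression cannot be evaluated, for instance because `os` has not been imported yet in the session. A sketch that sidesteps shell interpolation by passing the environment explicitly to `subprocess` follows; `locust_env` and `run_locust` are hypothetical helper names, and the placeholder key is not a real credential.

```python
import os
import shutil
import subprocess


def locust_env(api_key, score_path, project_root):
    """Copy the current environment and add the variables locustfile.py reads."""
    env = dict(os.environ)
    env["API_KEY"] = api_key
    env["SCORE_PATH"] = score_path
    # Presumably needed so locustfile.py can import the project's `utilities` module.
    env["PYTHONPATH"] = os.path.abspath(project_root)
    return env


def run_locust(args, env):
    """Run the command if its executable is installed; return the exit code or None."""
    if shutil.which(args[0]) is None:
        print("'{}' was not found on PATH; install it in this environment first".format(args[0]))
        return None
    return subprocess.run(args, env=env).returncode


env = locust_env("<api-key>", "/score", "..")
print(env["SCORE_PATH"], env["PYTHONPATH"])
# In the notebook: run_locust(cmd.split(), locust_env(api_key, parsed_url.path, '../'))
```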


Here are the summary results of our test and, below that, the distribution information for those tests.

In [31]:
import os
os.path.abspath('../')

'/home/riversand/notebooks/az-ml-realtime-score'

In [27]:

! PYTHONPATH={os.path.abspath('./')}

In [15]:
pd.read_csv("modeltest_requests.csv")

FileNotFoundError: [Errno 2] File modeltest_requests.csv does not exist: 'modeltest_requests.csv'

In [None]:
pd.read_csv("modeltest_distribution.csv")
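
The `FileNotFoundError` above is what `pd.read_csv` raises when the load test has not produced its output. A small guard, sketched here with a hypothetical helper `missing_results`, reports which of the two expected files are absent before trying to load them. The file names follow the `--csv=modeltest` prefix used in the Locust command.

```python
from pathlib import Path


def missing_results(prefix="modeltest", directory="."):
    """Return the expected Locust CSV files that do not exist yet."""
    expected = [
        Path(directory) / "{}_requests.csv".format(prefix),
        Path(directory) / "{}_distribution.csv".format(prefix),
    ]
    return [path for path in expected if not path.is_file()]


missing = missing_results()
if missing:
    print("Run the load test first; missing:", ", ".join(str(p) for p in missing))
else:
    print("Both result files are present; pd.read_csv can load them.")
```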

To tear down the cluster and all related resources go to the [tear down the cluster](07_TearDown.ipynb) notebook.