In [1]:
%load_ext autoreload
%autoreload 2

from tqdm.auto import tqdm
import itertools

import os
import time
import traceback
from datetime import datetime
import pytz

# for processing
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# my imports
from helpers import kube
from helpers import workload
from helpers import util
from helpers import request_funcs

# config
my_timezone = os.getenv('PY_TZ', 'America/Toronto')

# small funcs
def from_js_timestamp(x):
    """Convert a JavaScript epoch timestamp (milliseconds) to an aware datetime in `my_timezone`.

    `datetime.fromtimestamp(..., tz=...)` converts the epoch value straight
    into the target zone. The earlier form,
    `utcfromtimestamp(x).astimezone(tz)`, built a *naive* UTC datetime, and
    `astimezone` interprets naive values as host-local time. That shifted
    every timestamp on hosts whose local zone is not UTC.
    """
    return datetime.fromtimestamp(x / 1000, tz=pytz.timezone(my_timezone))


def get_time_with_tz():
    """Return the current time as an aware datetime in `my_timezone`."""
    return datetime.now().astimezone(pytz.timezone(my_timezone))

fetching imagenet v2
resizing images


  0%|          | 0/100 [00:00<?, ?it/s]

converting to bentoml files


  0%|          | 0/100 [00:00<?, ?it/s]

extracting base64 files


  0%|          | 0/100 [00:00<?, ?it/s]

preprocessing for mobilenet


  0%|          | 0/100 [00:00<?, ?it/s]

In [2]:
# Experiment configuration.
# Service under test. The other services listed in `config_names` can be
# selected the same way: 'bentoml-onnx-resnet50', 'bentoml-iris',
# 'tfserving-mobilenetv1'.
service_name = 'tfserving-resnetv2'

# requests are sent to the local proxy endpoint for this service
override_url = 'http://localhost:3000/proxy/' + service_name

# CPU in millicores and memory in MiB; used for both --request and --limit
cpu_m = 1000
ram_mb = 1000

# target requests-per-second for each step of the experiment
# (each entry is applied for one minute, so 60 entries is about an hour)
rps_list = [5 for _ in range(60)]

In [3]:
config_names = [
    'bentoml-iris-250m-512mb',
    'bentoml-onnx-resnet50-250m-512mb',
    'tfserving-resnetv2-250m-512mb',
    'tfserving-mobilenetv1-250m-512mb',
]


def _load_workload_configs(names):
    """Load `deployments/<name>.json` for each name and index the specs by their `'name'` field.

    Raises ValueError if two files declare the same `'name'`, instead of
    letting the later file silently overwrite the earlier one. Running the
    loop inside a function also keeps its loop variables out of the
    notebook's global namespace.
    """
    configs = {}
    for config_name in names:
        config_path = f"deployments/{config_name}.json"
        spec = util.load_json_file(config_path)
        spec_name = spec['name']
        if spec_name in configs:
            raise ValueError(f"duplicate workload name {spec_name!r} in {config_path}")
        configs[spec_name] = spec
    return configs


workload_configs = _load_workload_configs(config_names)

In [4]:
# deploy the function
request_limit_override = f"'cpu={cpu_m}m,memory={ram_mb}Mi'"
print('Request Limit Overrid:', request_limit_override)

workload_spec = workload_configs[service_name]
# override request and limit values
workload_spec['opts']['--request'] = request_limit_override
workload_spec['opts']['--limit'] = request_limit_override
kn_command = kube.get_kn_command(**workload_spec)
print(kn_command)
!{kn_command}
print('waiting for settings to converge')
time.sleep(10)

Request Limit Overrid: 'cpu=1000m,memory=1000Mi'
kn service apply tfserving-resnetv2 --image ghcr.io/nimamahmoudi/tfserving-resnet:20210429213000 \
  --limit 'cpu=1000m,memory=1000Mi' \
  --request 'cpu=1000m,memory=1000Mi' \
  --port 5000 \
  -a autoscaling.knative.dev/target=1 \
  -a autoscaling.knative.dev/metric=concurrency
No changes to apply to service 'tfserving-resnetv2'.
Service 'tfserving-resnetv2' with latest revision 'tfserving-resnetv2-00079' (unchanged) is available at URL:
http://tfserving-resnetv2.default.kn.nima-dev.com
waiting for settings to converge


In [5]:
# call the request function with proper arguments
def call_request_func():
    """Send one request for `service_name` to `override_url` and summarise the result.

    Returns a dict containing:
    - the client-side `response_time_ms` from the request helper;
    - the `X-Request-Id` header;
    - the `X-SmartProxy-*` headers converted to int. The receivedAt and
      responseAt headers are JS millisecond timestamps and are turned into
      datetimes with `from_js_timestamp`.
    """
    send = request_funcs.workload_funcs[service_name]
    resp = send(url=override_url)

    summary = {'response_time_ms': resp['response_time_ms']}
    hdr = resp['headers']

    def as_int(key):
        return int(hdr[key])

    summary['request_id'] = hdr['X-Request-Id']
    summary['queue_position'] = as_int('X-SmartProxy-queuePosition')
    summary['received_at'] = from_js_timestamp(as_int('X-SmartProxy-receivedAt'))
    summary['response_at'] = from_js_timestamp(as_int('X-SmartProxy-responseAt'))
    summary['upstream_response_time'] = as_int('X-SmartProxy-upstreamResponseTime')
    summary['upstream_request_count'] = as_int('X-SmartProxy-upstreamRequestCount')
    summary['response_time_ms_server'] = as_int('X-SmartProxy-responseTime')
    summary['queue_time_ms'] = as_int('X-SmartProxy-queueTime')
    return summary

call_request_func()

{'response_time_ms': 3579.294,
 'request_id': '2cda9c33-0a59-4ad8-9f99-fb11a7f09b7e',
 'queue_position': 0,
 'received_at': datetime.datetime(2021, 5, 20, 12, 2, 4, 330000, tzinfo=<DstTzInfo 'America/Toronto' EDT-1 day, 20:00:00 DST>),
 'response_at': datetime.datetime(2021, 5, 20, 12, 2, 7, 896000, tzinfo=<DstTzInfo 'America/Toronto' EDT-1 day, 20:00:00 DST>),
 'upstream_response_time': 2564,
 'upstream_request_count': 1,
 'response_time_ms_server': 3566,
 'queue_time_ms': 1002}

In [6]:
# Smoke-test the endpoint with a few sequential requests before the load test.
# call_request_func either returns a dict or raises, so there are no None
# results to filter out. Any failure here surfaces immediately.
n_sample_reqs = 10
sample_reqs = [call_request_func() for _ in range(n_sample_reqs)]
pd.DataFrame(data=sample_reqs)

Unnamed: 0,response_time_ms,request_id,queue_position,received_at,response_at,upstream_response_time,upstream_request_count,response_time_ms_server,queue_time_ms
0,1231.654,64183de3-60fb-4ff4-8e6e-766fa804513b,0,2021-05-20 12:02:08.272000-04:00,2021-05-20 12:02:09.499000-04:00,224,1,1227,1003
1,1197.446,297f531f-418c-4f20-858e-90d5f646d85f,0,2021-05-20 12:02:09.507000-04:00,2021-05-20 12:02:10.700000-04:00,192,1,1193,1001
2,1183.854,5e573f7a-e483-4167-9ea3-cc2f3dec77fd,0,2021-05-20 12:02:10.708000-04:00,2021-05-20 12:02:11.887000-04:00,178,1,1179,1001
3,1295.762,42d2324c-ad16-4473-b9d2-de478b377b18,0,2021-05-20 12:02:11.896000-04:00,2021-05-20 12:02:13.187000-04:00,289,1,1291,1002
4,1211.55,ac81b5af-0c7b-4664-a972-495649fe9760,0,2021-05-20 12:02:13.195000-04:00,2021-05-20 12:02:14.401000-04:00,205,1,1206,1001
5,1186.048,675e33e0-e9fe-4f91-94cc-9e9dd2ee5b94,0,2021-05-20 12:02:14.409000-04:00,2021-05-20 12:02:15.591000-04:00,181,1,1182,1001
6,1208.725,0ca55bc5-1adf-40e5-9e74-0d0ca5bd483e,0,2021-05-20 12:02:15.597000-04:00,2021-05-20 12:02:16.801000-04:00,202,1,1204,1002
7,1173.017,83ca1cdf-a523-4c34-aaac-f35676fcc388,0,2021-05-20 12:02:16.810000-04:00,2021-05-20 12:02:17.978000-04:00,164,1,1168,1004
8,1202.773,39703455-460b-46a3-b1ea-90d2762b39df,0,2021-05-20 12:02:17.985000-04:00,2021-05-20 12:02:19.184000-04:00,197,1,1199,1002
9,1231.634,b13a7a6b-fdff-4288-a5ea-ed99e095bcf2,0,2021-05-20 12:02:19.190000-04:00,2021-05-20 12:02:20.417000-04:00,226,1,1227,1001


In [7]:
# adding exception handling to create worker func
def worker_func():
    """Run one request via call_request_func, returning None instead of raising on failure.

    Any Exception is reported and swallowed, so it does not propagate into
    the workload generator that calls this function. The experiment cell
    later drops the None results.

    The header and traceback are printed together in one call on stdout.
    Previously the header went to stdout via print, while
    traceback.print_exc wrote the traceback to stderr. With many worker
    threads, the two halves of one report could end up separated or
    interleaved with other reports.
    """
    try:
        return call_request_func()
    except Exception:
        # format_exc() already ends with a newline
        print('exception occurred:\n' + traceback.format_exc(), end='')
        return None

In [9]:
# my library imports
from pacswg.timer import TimerClass
import pacswg

# start workload generator
step_duration_s = 60  # how long each entry of rps_list is held, in seconds

wg = pacswg.WorkloadGenerator(worker_func=worker_func, rps=0, worker_thread_count=100)
timer = TimerClass()
wg.start_workers()
try:
    print("============ Experiment Started ============")
    print("Time Started:", get_time_with_tz())

    for rps in tqdm(rps_list):
        wg.set_rps(rps)
        timer.tic()
        # hold this rate for step_duration_s seconds
        while timer.toc() < step_duration_s:
            wg.fire_wait()
finally:
    # Always stop the workers started above, even if the loop raises or is
    # interrupted (e.g. KeyboardInterrupt). Otherwise they are left running.
    wg.stop_workers()

# get the results
all_res = wg.get_stats()
total_reqs = len(all_res)
# drop failed requests (worker_func returns None on failure)
all_res = [d for d in all_res if d is not None]
success_reqs = len(all_res)

print("Total Requests Made:", total_reqs)
print("Successful Requests Made:", success_reqs)

Time Started: 2021-05-20 12:02:21.783987-04:00


  0%|          | 0/3 [00:00<?, ?it/s]

Total Requests Made: 938
Successful Requests Made: 938


In [10]:
# collect the results
df_res = pd.DataFrame(data=all_res)
# save the results
now = get_time_with_tz()
res_name = now.strftime('res-%Y-%m-%d_%H-%M-%S')
res_folder = f'results/trace1/{service_name}'
# Create the folder from Python rather than `! mkdir -p`. The shell version
# needs a POSIX shell and splices the path into the command unquoted.
os.makedirs(res_folder, exist_ok=True)
requests_results_filename = f'{res_name}_reqs.csv'
df_res.to_csv(os.path.join(res_folder, requests_results_filename))
print('Results Name:', res_name)

Results Name: res-2021-05-20_12-05-42
