# Experimenting With APIs

In this notebook, we will be experimenting with the APIs and create helper functions.

In [1]:
# general imports
import time
import re
from collections import namedtuple, deque
import random
import copy
from datetime import datetime
import pytz
import os

my_timezone = os.getenv('PY_TZ', 'America/Toronto')

# library imports
import requests
from tqdm.auto import tqdm
import pandas as pd
import numpy as np

# my library imports
from pacswg.timer import TimerClass
import pacswg

# my imports
from helpers import kube
from helpers import workload

## Kubernetes API

We use the kubernetes API to fetch the current status of each deployment and see how many instances they have.

In [2]:
# start watch thread
kube.start_watch_thread()
# print details of a deployment
# print(kube.live_deployments['helloworld-go-00001-deployment'])
# fetch a list of all knative deployments
# print(kube.get_knative_deployments())
# fetch latest revisions only

def get_knative_watch_info(kn_deploy_name):
    kn_latest_revs = kube.get_kn_latest_revs()
    if kn_deploy_name not in kn_latest_revs:
        return None
    bench_deployment = kn_latest_revs[kn_deploy_name]
    return kube.live_deployments[bench_deployment.deployment]

get_knative_watch_info('bench1')

starting watch thread...


{'kind': 'Deployment',
 'replicas': 0,
 'ready_replicas': 0,
 'last_update': 1611411975.519756}

## Deploying On Knative

In this section, we will develop the functionality to deploy a given workload to **knative** using the `kn` CLI.
The other options were using [CRDs](https://stackoverflow.com/questions/61384188/how-to-deploy-a-knative-service-with-kubernetes-python-client-library),
or using [kubectl apply](https://developers.redhat.com/coderland/serverless/deploying-serverless-knative#invoking_your_service_from_the_command_line)
but `kn` seems to be more powerful.

In [3]:
# making sure kn is set up correctly
!kn service ls

NAME            URL                                            LATEST                AGE   CONDITIONS   READY   REASON
bench1          http://bench1.default.kn.nima-dev.com          bench1-00001          18h   3 OK / 3     True    
helloworld-go   http://helloworld-go.default.kn.nima-dev.com   helloworld-go-00001   45h   3 OK / 3     True    


In [4]:
workload_name = 'bench1'
image = 'ghcr.io/nimamahmoudi/conc-workloads-bench1:sha-5966a0e'
env = {
    'EXPERIMENT_NAME': 'TEST1',
    'REPORT_INTERVAL': '10',
    'SOCKETIO_SERVER': 'NO',
}
opts = {
    '--limit': "'cpu=250m,memory=256Mi'",
    '--concurrency-target': '1',
    # '--concurrency-limit': '10',
    # '--concurrency-utilization': '70',
    # '--autoscale-window': '60s',
}
annotations = {
    'autoscaling.knative.dev/panicThresholdPercentage': '1000',
}

workload_spec = {
    'name': workload_name,
    'image': image,
    'env': env,
    'opts': opts,
    'annotations': annotations,
}

def kn_change_opts_concurrency_target(new_target, workload_spec):
    workload_spec['opts']['--concurrency-target'] = new_target
    return opts

# to change options to have a new concurrency target
# kn_change_opts_concurrency_target(1, workload_spec)

kn_command = kube.get_kn_command(**workload_spec)
print(kn_command)
# to run the command, we can simply use:
# !{kn_command}

kn service apply bench1 --image ghcr.io/nimamahmoudi/conc-workloads-bench1:sha-5966a0e \
  --env EXPERIMENT_NAME=TEST1 \
  --env REPORT_INTERVAL=10 \
  --env SOCKETIO_SERVER=NO \
  --limit 'cpu=250m,memory=256Mi' \
  --concurrency-target 1 \
  -a autoscaling.knative.dev/panicThresholdPercentage=1000


# Workload Specification

In [5]:
# user defined workload function
def user_workload_func():
    http_path = "http://bench1.default.kn.nima-dev.com"

    cmds = {}
    cmds['sleep'] = 0
    cmds['sleep_till'] = 0
    cmds['stat'] = {"argv": 1}

    # cmds['cpu'] = {"n": 20000}

    cmds['sleep'] = 1000 + (random.random() * 200)

    # cmds['io'] = {"rd": 3, "size": "200K", "cnt": 5}
    # cmds['cpu'] = {"n": 10000}

    payload = {}
    payload['cmds'] = cmds

    res = requests.post(http_path, json=payload)
    if res.status_code >= 300:
        return False
    return True

# get ready count callback
get_ready_cb = lambda: get_knative_watch_info(workload_name)['ready_replicas']
print('ready callback:', get_ready_cb())
# create logger and check one execution of workload func
wlogger = workload.WorkloadLogger(get_ready_cb=get_ready_cb)
simple_worker_func = lambda: wlogger.worker_func(user_workload_func)
# add worker func to workload spec
workload_spec['simple_worker_func'] = simple_worker_func

simple_worker_func()

ready callback: 0


{'client_start_time': 1611411978.5687087,
 'client_end_time': 1611411984.2395618,
 'client_elapsed_time': 5.670853137969971,
 'start_conc': 1,
 'end_conc': 1,
 'success': True}

In [6]:
# wlogger.monitoring_thread.start()
# wlogger.record_conc_loop()
# wlogger.monitor_conc_loop()
wlogger.start_capturing()
time.sleep(7)
wlogger.stop_capturing()
wlogger.get_recorded_data()

starting threads
stopping threads...
Done.


{'ready_count': [1, 1, 1, 1],
 'total_conc': [0, 0, 0, 0],
 'conc_window_average': [0.0, 0.0, 0.0, 0.0],
 'time': [1611411984.6263804,
  1611411986.626806,
  1611411988.6311753,
  1611411990.6328735]}

# Specifying Single Experiment

In this section, we will build the foundation for running a single experiment. In a later section, we will
run several experiments to collect the necessary data.

In [7]:
def perform_experiment(rps, cc, base_workload_spec, exp_spec):
    rps_list = [rps] * exp_spec['time_mins']
    # get base workload
    workload_spec = copy.deepcopy(base_workload_spec)
    worker_func = workload_spec['simple_worker_func']
    del workload_spec['simple_worker_func']
    # change base workload cc
    kn_change_opts_concurrency_target(cc, workload_spec)
    # get kn command to change cc
    kn_command = kube.get_kn_command(**workload_spec)
    print('applying kn command:')
    print(kn_command)
    # apply the kn command
    !{kn_command}
    # wait for kn command to take effect
    time.sleep(10)
    print('kn apply done')

    # initialize
    wg = pacswg.WorkloadGenerator(worker_func=worker_func, rps=0, worker_thread_count=100)
    wg.start_workers()
    timer = TimerClass()
    # make sure that logger is stopped
    wlogger.stop_capturing()
    # start capturing
    wlogger.start_capturing()

    print("============ Experiment Started ============")
    print("Time Started:", datetime.now().astimezone(pytz.timezone(my_timezone)))

    for rps in tqdm(rps_list):
        wg.set_rps(rps)
        timer.tic()
        # apply each for one minute
        while timer.toc() < 60:
            wg.fire_wait()

    # get the results
    wg.stop_workers()
    all_res = wg.get_stats()
    print("Total Requests Made:", len(all_res))
    wlogger.stop_capturing()
    logger_data = wlogger.get_recorded_data()

    # collect the results
    df_res = pd.DataFrame(data=all_res)
    df_res['rps'] = rps
    df_logger = pd.DataFrame(data=logger_data)
    df_logger['rps'] = rps
    now = datetime.now().astimezone(pytz.timezone(my_timezone))
    res_name = now.strftime('res-%Y-%m-%d_%H-%M-%S')
    res_folder = f'results/{exp_spec["name"]}/'
    # make the directory and file names
    !mkdir -p {res_folder}
    requests_results_filename = f'{res_name}_reqs.csv'
    logger_results_filename = f'{res_name}_logger.csv'
    # save the results
    df_res.to_csv(os.path.join(res_folder, requests_results_filename))
    df_logger.to_csv(os.path.join(res_folder, logger_results_filename))

    print('Experiment Name:', exp_spec['name'])
    print('Results Name:', res_name)


# experiment specification
exp_spec = {
    'time_mins': 20,
    'name': 'bench1_sleep_1000_200'
}

# perform_experiment(rps=1, cc=1, base_workload_spec=workload_spec, exp_spec=exp_spec)

applying kn command:
kn service apply bench1 --image ghcr.io/nimamahmoudi/conc-workloads-bench1:sha-5966a0e \
  --env EXPERIMENT_NAME=TEST1 \
  --env REPORT_INTERVAL=10 \
  --env SOCKETIO_SERVER=NO \
  --limit 'cpu=250m,memory=256Mi' \
  --concurrency-target 1 \
  -a autoscaling.knative.dev/panicThresholdPercentage=1000
No changes to apply to service 'bench1'.
Service 'bench1' with latest revision 'bench1-00001' (unchanged) is available at URL:
http://bench1.default.kn.nima-dev.com
kn apply done
stopping threads...
Done.
starting threads
Time Started: 2021-01-23 09:26:44.028174-05:00


  0%|          | 0/3 [00:00<?, ?it/s]

Total Requests Made: 196
stopping threads...
Done.
Experiment Name: bench1_sleep_1000_200
Results Name: res-2021-01-23_09-29-48


# Performing A Series of Experiments

In [None]:
for cc in range(1,10,2):
    for rps in np.linspace(1,21,11):
        perform_experiment(rps=rps, cc=cc, base_workload_spec=workload_spec, exp_spec=exp_spec)