# Data Collector

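This Jupyter notebook collects performance data for the images listed in the profile table (`ProfileTable_<mode>.csv`) while the server runs under different background workloads. Results are checkpointed to timestamped CSV files in the `Data/` directory.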

In [1]:
# Imports
import datetime
import importlib
import logging
import os
import pprint
import random
import time
import traceback
# Shared pretty-printer used by later cells to show worker stats and test states.
pp = pprint.PrettyPrinter()

# Project-local modules.
import RabbitServerInfo as info
import DockerRemoteAPI as api
# NOTE(review): `metricbeat` is not referenced in the visible cells — confirm before removing.
from ELSbeat import metricbeat
import LoadTester
import Manager

# for jupyter problems
# Re-import the project modules so edits to their .py files take effect without
# restarting the kernel. Manager is reloaded last, and the `from Manager import ...`
# line below must stay after the reload so the names bind to the reloaded classes.
# NOTE(review): `%load_ext autoreload` + `%autoreload 2` is the usual alternative.
importlib.reload(LoadTester)
importlib.reload(api)
importlib.reload(info)
importlib.reload(Manager)

from Manager import PackageInfo, ApiServer

# logging
# logging.FATAL is an alias of logging.CRITICAL, so the root logger suppresses
# everything below CRITICAL (including warnings from the project modules).
logging.basicConfig(level=logging.FATAL)
# Root logger (getLogger() with no name).
LOGGER = logging.getLogger()

# Data Analysis
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# Render matplotlib figures inline in the notebook.
%matplotlib inline

In [2]:
# Docker hosts reached through the remote API. The profiler host is currently also
# the only worker; each role still gets its own ApiServer instance.
_HOST_PROFILER1 = dict(name='profiler1', ip='10.2.6.177', api_port='7999', id='hkg-profiler-1')
profiler = ApiServer(**_HOST_PROFILER1)
workers = [ApiServer(**_HOST_PROFILER1)]


def _automated_package(image_name, test_url, post_delay):
    """Build a PackageInfo with the resource limits shared by every benchmark image.

    Memory is capped at 512m, CPU at 50 percent, and /dev/sda read/write at the
    same byte rate. Fresh device-limit lists are created on each call, so no list
    object is shared between packages.
    """
    return PackageInfo(image_name=image_name,
                       test_url=test_url,
                       mem_limit="512m",
                       cpu_percent_limit=50,
                       post_delay=post_delay,
                       device_read_bps=[{"Path": "/dev/sda", "Rate": 7497318}],
                       device_write_bps=[{"Path": "/dev/sda", "Rate": 7497318}])


# Benchmark images (replace YOUR_HUB_ID with the Docker Hub account that hosts them).
package_oltp = _automated_package('YOUR_HUB_ID/oltp-automated', 'http://10.2.6.171/oltp', post_delay=300)
package_fileio = _automated_package('YOUR_HUB_ID/fileio-automated', 'http://10.2.6.171/fileio', post_delay=60)
package_cpu = _automated_package('YOUR_HUB_ID/cpu-automated', 'http://10.2.6.171/linpack', post_delay=30)

# Coordinates the profiler and worker hosts; presumably reads container metrics
# from the Elasticsearch server given here — confirm in Manager.
manager = Manager.Manager(profiler=profiler, workers=workers,
                          elastic_server_ip='10.2.6.181', elastic_server_port='9200')

In [3]:
# Durations in the string format understood by LoadTester.convert_to_seconds.
warmup_time = '10s'
test_time = '1m'
accepted_stats = ['avg']

# "RPS": background workloads are described by target request rates.
# "Interactive": background workloads are described by numbers of users.
test_mode = "Interactive"

# NOTE(review): presumably lets prepare_worker start with no pre-existing containers — confirm in Manager.
manager.MIN_PRE_CONTAINER_COUNT = 0

# Background workloads that prepare_worker chooses from before each measured test.
if test_mode == "RPS":
    tests_targets = [
        {
            'package': package_cpu,
            'target_rps_list': [1, 1.5, 2],
            'counts': [0, 1, 2, 3],
        },
        {
            'package': package_fileio,
            'target_rps_list': [.5, 1, 1.5],
            'counts': [0, 1, 2],
        },
        {
            'package': package_oltp,
            'target_rps_list': [.5, 1, 2],
            'counts': [0, 1, 2, 3],
        }
    ]
elif test_mode == "Interactive":
    tests_targets = [
        {
            'package': package_cpu,
            'target_users': [1,2,3],
            'counts': [0,1,2,3],
        },
        {
            'package': package_fileio,
            'target_users': [1,2,3],
            'counts': [0,1,2,3],
        },
        {
            'package': package_oltp,
            'target_users': [1,2,3],
            'counts': [0,1,2,3,4],
        }
    ]
else:
    # Fail here instead of with a NameError on `tests_targets` deep inside the collection loop.
    raise ValueError('Unknown test_mode %r; expected "RPS" or "Interactive"' % (test_mode,))

In [None]:
# Seconds versions of the durations, used for time.sleep(); the Manager APIs keep
# receiving the original string forms.
test_time_secs, warmup_time_secs = (LoadTester.convert_to_seconds(duration)
                                    for duration in (test_time, warmup_time))
# Sanity check: show the workers' current statistics before the collection run starts.
pp.pprint(manager.get_worker_stats(test_time))

In [None]:
# Number of outer rounds; all results collected so far are checkpointed to a CSV after each round.
NUM_ROUNDS = 100
RESULTS_DIR = 'Data'

# Settings that differed between the RPS and Interactive collection loops.
# The post-load wait times are kept exactly as they were in each loop.
MODE_SETTINGS = {
    "RPS": {
        'tests_per_round': 3,
        'target_key': 'target_rps',
        'post_wait_secs': test_time_secs + warmup_time_secs / 2,
    },
    "Interactive": {
        'tests_per_round': 10,
        'target_key': 'num_of_clients',
        'post_wait_secs': test_time_secs + warmup_time_secs - 1,
    },
}


def _load_profiles(mode):
    """Read `ProfileTable_<mode>.csv`, indexed by its unnamed first column.

    Raises ValueError if the table has no rows: otherwise every random pick would
    fail, and the loop would sleep and retry for the entire run.
    """
    path = 'ProfileTable_' + mode + '.csv'
    table = pd.read_csv(path).set_index('Unnamed: 0')
    if table.empty:
        raise ValueError('Profile table %s has no rows' % path)
    return table


def _build_target_profile(profile_row, target_key):
    """Build the target-profile dict for manager.prepare_worker_test from one table row.

    `target_key` ('target_rps' or 'num_of_clients') names the column copied into
    the dict next to the package.
    """
    # SECURITY NOTE(review): eval() executes whatever text is stored in these CSV cells.
    # The table is assumed to be locally generated and trusted; if the cells only hold
    # Python literals (lists of dicts, None), ast.literal_eval is a safer drop-in.
    package = PackageInfo(image_name=profile_row['package_name'],
                          test_url=profile_row['package_test_url'] + "test",
                          mem_limit=profile_row['package_mem_limit'],
                          device_read_bps=eval(profile_row['package_device_read_bps']),
                          device_read_iops=eval(profile_row['package_device_read_iops']),
                          device_write_bps=eval(profile_row['package_device_write_bps']),
                          device_write_iops=eval(profile_row['package_device_write_iops']),
                          # Convert the pandas/numpy scalar to a builtin int in both modes
                          # (previously only the Interactive loop did this).
                          cpu_percent_limit=int(profile_row['package_cpu_percent_limit']),
                          post_delay=profile_row['package_post_delay'],
                          environment=['postfix=test'])
    return {'package': package, target_key: profile_row[target_key]}


def _run_single_test(profiles, mode, settings):
    """Measure one randomly chosen profile against a freshly prepared background load.

    Returns one flat record: profile columns (suffix `_profile`), worker stats before
    (`_pre`) and after (`_post`) adding the target load, load-test results, and the
    background state (`test_state`).
    """
    profile_row = profiles.iloc[random.randint(0, profiles.shape[0] - 1), :]
    target_profile = _build_target_profile(profile_row, settings['target_key'])

    record = {key + '_profile': value for key, value in profile_row.to_dict().items()}

    print(datetime.datetime.now())
    print(target_profile)
    # NOTE(review): the RPS loop used to omit test_mode here; it is now passed in both
    # modes, as prepare_worker_test already was — confirm prepare_worker accepts "RPS".
    test_states = manager.prepare_worker(tests_targets, test_mode=mode)
    pp.pprint(test_states)

    # Get statistics before adding the new one
    time.sleep(test_time_secs)
    stats = manager.get_worker_stats(test_time)
    for key in stats:
        record[key + "_pre"] = stats[key]

    load_tester = manager.prepare_worker_test(target_profile, test_time=test_time,
                                              warmup_time=warmup_time, test_mode=mode)
    try:
        # Get statistics after adding the new one
        time.sleep(settings['post_wait_secs'])
        stats = manager.get_worker_stats(test_time)
        for key in stats:
            record[key + "_post"] = stats[key]

        load_tester.wait_for_test_results()
        test_results = load_tester.results()
        print('++++++++++++++')
        print(load_tester.url)
        load_tester.print_results()
        # NOTE(review): results share the "_post" suffix with the worker stats above, so a
        # key present in both overwrites the stats value. Kept for dataset compatibility.
        for key in test_results:
            record[key + "_post"] = test_results[key]
    finally:
        # Stop the test load even when collecting results fails, so it cannot
        # distort the measurements of the next test.
        manager.stop_test_load()
    print(datetime.datetime.now())

    record['test_state'] = str(test_states)
    return record


if test_mode not in MODE_SETTINGS:
    raise ValueError('Unknown test_mode %r; expected one of %s' % (test_mode, sorted(MODE_SETTINGS)))
mode_settings = MODE_SETTINGS[test_mode]

# The profile table is read once for the whole run (the RPS loop used to re-read it per test).
profiles = _load_profiles(test_mode)
# to_csv runs outside the per-test error handling, so make sure its directory exists.
os.makedirs(RESULTS_DIR, exist_ok=True)

all_test_results = []
for test_counter in range(NUM_ROUNDS):
    print('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA')
    print("Test Counter: ", test_counter)
    for i in range(mode_settings['tests_per_round']):
        try:
            print('===========================')
            print("Performing test #", i + 1)
            all_test_results.append(_run_single_test(profiles, test_mode, mode_settings))
        except Exception as e:
            # Best effort: report the failure, let the workers settle, and carry on.
            print("Error: ", str(e))
            traceback.print_exc()
            time.sleep(10)

    # Write results to CSV file (cumulative checkpoint, one new timestamped file per round)
    test_res = pd.DataFrame(data=all_test_results)
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')
    test_res.to_csv(os.path.join(RESULTS_DIR, test_mode + 'Dataset_' + timestamp + '.csv'))

In [None]:
# Check the results: `test_res` holds every record collected so far, i.e. the contents
# of the most recent checkpoint CSV written by the collection cell above.
test_res