In [1]:
import json
import os
import glob
import pandas
import statistics

## The path to the env dir of the experimental campaign

In [2]:
RESULT_PATH = "../test_case_1"

## Inserting some ombt code (this could be removed if used as a library)

In [3]:
import math

class Stats(object):
    """Manage a single statistic"""
    def __init__(self, min=None, max=None, total=0, count=0,
                 sum_of_squares=0, distribution=None):
        self.min = min
        self.max = max
        self.total = total
        self.count = count
        self.sum_of_squares = sum_of_squares
        # distribution of values grouped by powers of 10
        self.distribution = distribution or dict()

    @classmethod
    def from_dict(cls, values):
        if 'distribution' in values:
            # hack alert!
            # when a Stats is passed via an RPC call it appears as if the
            # distribution map's keys are converted from int to str.
            # Fix that by re-indexing the distribution map:
            new_dict = dict()
            old_dict = values['distribution']
            for k in old_dict.keys():
                new_dict[int(k)] = old_dict[k];
            values['distribution'] = new_dict
        return Stats(**values)

    def to_dict(self):
        new_dict = dict()
        for a in ["min", "max", "total", "count", "sum_of_squares"]:
            new_dict[a] = getattr(self, a)
        new_dict["distribution"] = self.distribution.copy()
        return new_dict

    def update(self, value):
        self.total += value
        self.count += 1
        self.sum_of_squares += value**2
        self.min = min(self.min, value) if self.min else value
        self.max = max(self.max, value) if self.max else value
        log = int(math.log10(value)) if value >= 1.0 else 0
        base = 10**log
        index = int(value/base)  # 0..9
        if log not in self.distribution:
            self.distribution[log] = [0 for i in range(10)]
        self.distribution[log][index] += 1

    def reset(self):
        self.__init__()

    def average(self):
        return (self.total / float(self.count)) if self.count else 0

    def std_deviation(self):
        return math.sqrt((self.sum_of_squares / float(self.count))
                         - (self.average() ** 2)) if self.count else -1

    def merge(self, stats):
        if stats.min is not None and self.min is not None:
            self.min = min(self.min, stats.min)
        else:
            self.min = self.min or stats.min
        if stats.max is not None and self.max is not None:
            self.max = max(self.max, stats.max)
        else:
            self.max = self.max or stats.max

        self.total += stats.total
        self.count += stats.count
        self.sum_of_squares += stats.sum_of_squares
        for k in stats.distribution.keys():
            if k in self.distribution:
                self.distribution[k] = [z for z in map(lambda a, b: a + b,
                                                       stats.distribution[k],
                                                       self.distribution[k])]
            else:
                self.distribution[k] = stats.distribution[k]

    def __str__(self):
        return "min=%i, max=%i, avg=%f, std-dev=%f" % (self.min, self.max,
                                                       self.average(),
                                                       self.std_deviation())

    def print_distribution(self):
        keys = list(self.distribution.keys())
        keys.sort()
        for order in keys:
            row = self.distribution[order]
            # order=0, index=0 is special case as it is < 1.0, for all orders >
            # 0, index 0 is ignored since everthing < 10^order is accounted for
            # in index 9 of the (order - 1) row
            index = 0 if order == 0 else 1
            while index < len(row):
                print("[%d..<%d):  %d" %
                      ((10 ** int(order)) * index,
                       (10 ** int(order)) * (index + 1),
                       row[index]))
                index += 1

class TestResults(object):
    """Client results of a test run.
    """
    def __init__(self, start_time=None, stop_time=None, latency=None,
                 msgs_ok=0, msgs_fail=0, errors=None):
        super(TestResults, self).__init__()
        self.start_time = start_time
        self.stop_time = stop_time
        self.latency = latency or Stats()
        self.msgs_ok = msgs_ok  # count of successful msg transfers
        self.msgs_fail = msgs_fail  # count of failed msg transfers
        self.errors = errors or dict()  # error msgs and counts

    @classmethod
    def from_dict(cls, values):
        if 'latency' in values:
            values['latency'] = Stats.from_dict(values['latency'])
        if 'errors' in values:
            values['errors'] = values['errors'].copy()
        return TestResults(**values)

    def to_dict(self):
        new_dict = dict()
        for a in ['start_time', 'stop_time', 'msgs_ok', 'msgs_fail']:
            new_dict[a] = getattr(self, a)
        new_dict['latency'] = self.latency.to_dict()
        new_dict['errors'] = self.errors.copy()
        return new_dict

    def error(self, reason):
        key = str(reason)
        self.errors[key] = self.errors.get(key, 0) + 1

    def reset(self):
        self.__init__()

    def merge(self, results):
        self.start_time = (min(self.start_time, results.start_time)
                           if self.start_time and results.start_time
                           else (self.start_time or results.start_time))
        self.stop_time = (max(self.stop_time, results.stop_time)
                              if self.stop_time and results.stop_time
                          else (self.stop_time or results.stop_time))
        self.msgs_ok += results.msgs_ok
        self.msgs_fail += results.msgs_fail
        self.latency.merge(results.latency)
        for err in results.errors:
            self.errors[err] = self.errors.get(err, 0) + results.errors[err]

    def print_results(self):
        if self.msgs_fail:
            print("Error: %d message transfers failed"
                  % self.msgs_fail)
        if self.errors:
            print("Error: errors detected:")
            for err in self.errors:
                print("  '%s' (occurred %d times)" % (err, self.errors[err]))

        total = self.msgs_ok + self.msgs_fail
        print("Total Messages: %d" % total)

        delta_time = self.stop_time - self.start_time
        print("Test Interval: %f - %f (%f secs)" % (self.start_time,
                                                    self.stop_time,
                                                    delta_time))

        if delta_time > 0.0:
            print("Aggregate throughput: %f msgs/sec" % (float(total)/delta_time))

        latency = self.latency
        if latency.count:
            print("Latency %d samples (msecs): Average %f StdDev %f"
                  " Min %f Max %f"
                  % (latency.count,
                     latency.average(), latency.std_deviation(),
                     latency.min, latency.max))
            print("Latency Distribution: ")
            latency.print_distribution()


In [4]:
def load_stats(param):
    """Loads the stats for the controller output file."""
    
    controller_docker = os.path.join(RESULT_PATH, param["backup_dir"], "*controller*.log")
    # beware of the files _docker.log that would also match
    # take [0] to get rid of them for now
    files = glob.glob(controller_docker)
    controller_log = files[0]
    a = []
    with open(controller_log) as f:
        a = f.readlines()
        return json.loads(a[0]), json.loads(a[1])


In [5]:
params = []
with open(os.path.join(RESULT_PATH, "./params.json")) as f:
    params = json.load(f)

In [6]:
def build_agg_results(results):
    agg = TestResults()
    for result in results:
        result["latency"] = Stats(**result["latency"])
        agg.merge(TestResults(**result))
        
    duration = agg.stop_time - agg.start_time
    total = agg.msgs_ok + agg.msgs_fail
    rate = float(total)/duration
    result = agg.to_dict()
    result["rate"] = rate
    return result

def build_msgs_stats(results, msg_type):
    # NOTE(msimonin): we don't expect a TestResult here
    msgs = [r[msg_type] for r in results]
    return {
        "mean": statistics.mean(msgs),
        #"stdev": statistics.stdev(msgs),
        "min": min(msgs),
        "max": max(msgs)
    }
        
    
for param in params:
    clients, servers = load_stats(param)
    # what has been seen by ombt
    param["_ombt_clients"] = len(clients.values())
    param["_ombt_servers"] = len(servers.values())
    param["_ombt_msgs_sent_ok"] = build_msgs_stats(clients.values(), "msgs_ok")
    param["_ombt_msgs_received_ok"] = build_msgs_stats(servers.values(), "msgs_ok")
    #param["_raw_servers_test_result"] = servers
    #param["_raw_clients_test_result"] = clients
    param["_agg_servers"] = build_agg_results(servers.values())
    param["_agg_clients"] = build_agg_results(clients.values())


In [7]:
with open("params_calculated.json", "w") as f:
    json.dump(params, f)

In [8]:
def augment(mydict, myparams, in_key, out_key=None):
    out_key = out_key or in_key
    mydict.update({out_key: [p[in_key] for p in myparams]})

In [9]:
extraction = {}
augment(extraction, params, "_ombt_clients")
augment(extraction, params, "_ombt_servers")
#augment(extraction, params, "executor")
augment(extraction, params, "call_type")
extraction.update({
    "server_rate": [p["_agg_servers"]["rate"] for p in params]
})
extraction.update({
    "server_ok": [p["_agg_servers"]["msgs_ok"] for p in params]
})
extraction.update({
    "server_fail": [p["_agg_servers"]["msgs_fail"] for p in params]
})

extraction.update({
    "client_rate": [p["_agg_clients"]["rate"] for p in params]
})
extraction.update({
    "client_fail": [p["_agg_servers"]["msgs_fail"] for p in params]
})

#extraction.update({
#    "msgs_received_ok_min": [p["_ombt_msgs_received_ok"]["min"] for p in params]
#})

In [10]:
df = pandas.DataFrame(extraction)
df 

Unnamed: 0,_ombt_clients,_ombt_servers,call_type,client_fail,client_rate,server_fail,server_ok,server_rate
0,1,2,rpc-cast,0,9.75867,0,600,9.774918
1,1,4,rpc-cast,0,9.634269,0,600,9.650028
2,1,2,rpc-call,0,9.259531,0,600,9.274885
3,1,1,rpc-call,0,9.241458,0,600,9.256776
4,1,4,rpc-call,0,9.217608,0,600,9.232568
5,1,1,rpc-cast,0,9.774943,0,600,9.791177
6,2,2,rpc-call,0,18.518315,0,1200,18.548778
7,2,4,rpc-cast,0,19.458665,0,1200,19.490836
8,2,1,rpc-call,0,18.595975,0,1200,18.626868
9,2,1,rpc-cast,0,19.487089,0,1200,19.520133


In [11]:
params

[{'_agg_clients': {'errors': {},
   'latency': {'count': 600,
    'distribution': {'0': [0, 329, 220, 19, 11, 9, 3, 0, 1, 2],
     '1': [0, 5, 1, 0, 0, 0, 0, 0, 0, 0]},
    'max': 20.053863525390625,
    'min': 1.4388561248779297,
    'sum_of_squares': 4709.644185425077,
    'total': 1398.9717960357666},
   'msgs_fail': 0,
   'msgs_ok': 600,
   'rate': 9.758670380043927,
   'start_time': 1513767626.946077,
   'stop_time': 1513767688.429863},
  '_agg_servers': {'errors': {},
   'latency': {'count': 600,
    'distribution': {'0': [0, 0, 231, 290, 32, 15, 8, 8, 4, 3],
     '1': [0, 8, 1, 0, 0, 0, 0, 0, 0, 0]},
    'max': 20.769834518432617,
    'min': 2.418994903564453,
    'sum_of_squares': 9392.176728169943,
    'total': 2131.2007904052734},
   'msgs_fail': 0,
   'msgs_ok': 600,
   'rate': 9.774917681787988,
   'start_time': 1513767626.948911,
   'stop_time': 1513767688.330502},
  '_ombt_clients': 1,
  '_ombt_msgs_received_ok': {'max': 300, 'mean': 300, 'min': 300},
  '_ombt_msgs_sent_o

## recovering metrics from influxdb

### Initializing docker

In [12]:
import docker
from influxdb import InfluxDBClient

client = docker.from_env()

### Getting the interesting metrics from influx

In [44]:
import shutil
import tarfile
import time
from datetime import datetime


RABBITMQ_METRICS = {
    "rabbitmq_overview" : ["messages_delivered", "messages_published", "queues", "connections", "consumers", "exchanges"],
    "rabbitmq_node": ["mem_used"],
}


for param in params:
    # get experimentation boundaries
    start_time = max(param['_agg_clients']['start_time'], param['_agg_servers']['start_time'])
    stop_time = max(param['_agg_clients']['stop_time'], param['_agg_servers']['stop_time'])
    duration = stop_time - start_time
    start_utc = datetime.utcfromtimestamp(start_time)
    stop_utc = datetime.utcfromtimestamp(stop_time)
    print("start=%s, stop=%s" % (start_utc, stop_utc))
    # NOTE(msimonin): need to check big tar.gz behaviour
    tar = os.path.join(RESULT_PATH, param['backup_dir'], 'influxdb-data.tar.gz')
    tarfile.open(tar).extractall()
    #docker run --name influxdb -v $(pwd)/influxdb-data:/var/lib/influxdb -p 8083:8083 -p 8086:8086 -ti influxdb
    try:
        container = client.containers.run(
            'influxdb',
            detach=True,
            ports={'8086/tcp': 8086, '8083/tcp': 8083},
            volumes={os.path.join(os.getcwd(), 'influxdb-data'): {'bind': '/var/lib/influxdb', 'mode': 'rw'}}
        )    
        influx = InfluxDBClient(database='telegraf', timeout=600)
        # TODO(msimonin): make a tcp socket retry test on port 8083
        time.sleep(5)
        
        for serie, fields in RABBITMQ_METRICS.items():
            select = ["max(%s) as %s" % (field, field) for field in fields]
            select = ", ".join(select)
            query = "select %s from %s" % (select, serie)
            result = influx.query(query)
            # Put all metrics in params
            result = list(result.get_points())[0]
            param["_%s" % serie] = dict([[f, result[f]] for f in fields])
            
    except Exception as e:
        print(e)
    finally:
        container.remove(force=True)
        shutil.rmtree("influxdb-data")

start=2017-12-20 11:00:26.948911, stop=2017-12-20 11:01:28.429863
start=2017-12-20 11:03:51.740378, stop=2017-12-20 11:04:54.014989
start=2017-12-20 11:06:54.026870, stop=2017-12-20 11:07:58.820400
start=2017-12-20 11:09:49.252147, stop=2017-12-20 11:10:54.172407
start=2017-12-20 11:12:32.647069, stop=2017-12-20 11:13:37.737175
start=2017-12-20 11:15:21.114927, stop=2017-12-20 11:16:22.493403
start=2017-12-20 11:18:00.986661, stop=2017-12-20 11:19:05.783962
start=2017-12-20 11:20:58.598087, stop=2017-12-20 11:22:00.264299
start=2017-12-20 11:23:43.079623, stop=2017-12-20 11:24:47.605570
start=2017-12-20 11:27:55.789390, stop=2017-12-20 11:28:57.363517
start=2017-12-20 11:30:38.033616, stop=2017-12-20 11:31:43.016458
start=2017-12-20 11:33:29.161193, stop=2017-12-20 11:34:30.694697
start=2017-12-20 11:36:04.012677, stop=2017-12-20 11:37:06.006248
start=2017-12-20 11:38:50.939353, stop=2017-12-20 11:39:56.372391
start=2017-12-20 11:41:39.361536, stop=2017-12-20 11:42:44.289844
start=2017

In [48]:
extraction = {}
augment(extraction, params, "_ombt_clients")
augment(extraction, params, "_ombt_servers")
#augment(extraction, params, "executor")
augment(extraction, params, "call_type")
# put all rabbitmq stuffs
for serie, fields in RABBITMQ_METRICS.items():
    for field in fields:
        extraction.update({
            field : [p["_%s" % serie][field] for p in params]
        })


In [49]:
df = pandas.DataFrame(extraction)
df

Unnamed: 0,_ombt_clients,_ombt_servers,call_type,connections,consumers,exchanges,mem_used,messages_delivered,messages_published,queues
0,1,2,rpc-cast,5,7,11,72753152,602,602,6
1,1,4,rpc-cast,7,13,11,76439552,602,602,10
2,1,2,rpc-call,6,7,11,75595776,1202,1202,6
3,1,1,rpc-call,4,4,11,73408512,1186,1186,4
4,1,4,rpc-call,10,13,11,77037568,1202,1202,10
5,1,1,rpc-cast,4,4,11,73506816,547,547,4
6,2,2,rpc-call,9,8,12,77139968,2404,2404,7
7,2,4,rpc-cast,10,14,12,76935168,1204,1204,11
8,2,1,rpc-call,7,5,12,76660736,2199,2199,5
9,2,1,rpc-cast,7,5,12,74444800,1141,1141,5
