feature(zk-dump): add support for profiling latencies by path/type
This adds two new flags to zk-dump:

* --measure-latency to collect & report per path/type latencies
* --aggregation-depth to do this per aggregated path

This is quite useful for finding slow operations, especially
in cross-DC clusters.

Signed-off-by: Raul Gutierrez S <rgs@itevenworks.net>
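The --aggregation-depth flag groups paths by a fixed-depth prefix before counting or timing them, so sibling znodes under the same subtree show up as a single row in the summary. A minimal sketch of the idea; the aggregate_path helper below is illustrative only and is not the implementation in this commit:

def aggregate_path(path, depth):
    # Truncate a ZooKeeper path to its first `depth` components, e.g. ("/a/b/c", 2) -> "/a/b".
    if depth <= 0:
        return path
    parts = [p for p in path.split("/") if p]
    return "/" + "/".join(parts[:depth])

print(aggregate_path("/services/foo/bar/member_0001", 2))  # -> /services/foo

With that grouping, an invocation along the lines of zk-dump --measure-latency 1000 --group-by path --aggregation-depth 2 (other flags, such as the capture interface, omitted here) would report latencies per two-level path prefix.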
rgs1 committed Jun 19, 2015
1 parent ebd85c2 commit a5d9a4e
Showing 4 changed files with 121 additions and 11 deletions.
2 changes: 1 addition & 1 deletion CHANGES.md
@@ -12,7 +12,7 @@ Bug Handling

Features
********
-
- zk-dump: add support for profiling latencies by path/type

Breaking changes
****************
1 change: 1 addition & 0 deletions requirements.txt
@@ -3,6 +3,7 @@ mock
nose
psutil>=2.1.0
scapy==2.3.1
tabulate
twitter.common.app==0.3.3
twitter.common.collections==0.3.3
twitter.common.decorators==0.3.3
1 change: 1 addition & 0 deletions setup.py
@@ -66,6 +66,7 @@ def get_version():
'hexdump',
'psutil>=2.1.0',
'scapy==2.3.1',
'tabulate',
'twitter.common.app==0.3.3',
'twitter.common.collections==0.3.3',
'twitter.common.decorators==0.3.3',
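tabulate, added above to both requirements.txt and setup.py, is the library that renders the latency summary table in the new report() method below. A minimal sketch of that call style, with made-up paths and numbers:

from tabulate import tabulate

# Hypothetical sample: average latency in seconds per aggregated path.
headers = ["path", "avg"]
data = [("/services/foo", 0.0124), ("/locks/bar", 0.0031)]

print(tabulate(data, headers=headers))

This prints a plain-text table with one row per path, which is all report() needs for its summary.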
128 changes: 118 additions & 10 deletions zktraffic/cli/zk.py
@@ -30,6 +30,8 @@
from zktraffic.base.zookeeper import OpCodes

import colors

from tabulate import tabulate
from twitter.common import app
from twitter.common.log.options import LogOptions

@@ -67,8 +69,12 @@ def setup():
help='If unable to deserialize a packet, print it out')
app.add_option('--count-requests', default=0, type=int,
help='Count N requests and report a summary (default: group by path)')
app.add_option('--measure-latency', default=0, type=int,
help='Measure latency of N pairs of requests and replies (default: group by path)')
app.add_option('--group-by', default='path', type=str,
help='Only makes sense with --count-requests. Possible values: path or type')
help='Used with --count-requests or --measure-latency. Possible values: path or type')
app.add_option("--aggregation-depth", default=0, type=int,
help="Aggregate paths up to a certain depth. Used with --count-requests or --measure-latency")
app.add_option('--version', default=False, action='store_true')


@@ -125,6 +131,10 @@ def simple_write(self, *msgs):
sys.stdout.write("%s%s %s" % (right_arrow(i), format_timestamp(m.timestamp), m))
sys.stdout.flush()

def cancel(self):
""" will be called on KeyboardInterrupt """
pass


class DefaultPrinter(BasePrinter):
def __init__(self, colors, loopback):
@@ -196,9 +206,9 @@ def event_handler(self, evt):

class CountPrinter(BasePrinter):
""" use to accumulate up to N requests and then print a summary """
def __init__(self, count, group_by, loopback):
def __init__(self, count, group_by, loopback, aggregation_depth):
super(CountPrinter, self).__init__(False, loopback)
self.count, self.group_by = count, group_by
self.count, self.group_by, self.aggregation_depth = count, group_by, aggregation_depth
self.seen = 0
self.requests = defaultdict(int)

@@ -227,11 +237,98 @@ def _add(self, msg):

# eventually we should grab a lock here, but as of now
# this is only called from a single thread.
key = msg.path if self.group_by == "path" else msg.name
if self.group_by == "path":
key = msg.path
else:
key = msg.name
self.requests[key] += 1
self.seen += 1


class LatencyPrinter(BasePrinter):
""" measures latencies between requests and replies """
def __init__(self, count, group_by, loopback, aggregation_depth):
super(LatencyPrinter, self).__init__(False, loopback)
self._count, self._group_by, self._aggregation_depth = count, group_by, aggregation_depth
self._seen = 0
self._latencies_by_group = defaultdict(list)
self._requests_by_client = defaultdict(Requests)
self._replies = deque()
self._report_done = False

def run(self):
self.wait_for_requests()
self.report()

def wait_for_requests(self):
""" spin until we've collected all requests """
while self._seen < self._count:
try:
rep = self._replies.popleft()
except IndexError:
time.sleep(0.001)
continue

reqs = self._requests_by_client[rep.client].pop(rep.xid)
if not reqs:
continue

req = reqs[0]
key = req.path if self._group_by == "path" else req.name
latency = rep.timestamp - req.timestamp
self._latencies_by_group[key].append(latency)
self._seen += 1

# update status
sys.stdout.write("\rCollecting (%d/%d)" % (self._seen, self._count))
sys.stdout.flush()

def report(self):
""" calculate & display latencies """

# TODO: this should be protected by a lock
if self._report_done:
return
if self._seen < self._count:  # let wait_for_requests() exit its loop
self._seen = self._count
self._report_done = True

# clear the line
sys.stdout.write("\r")

results = {}
for key, latencies in self._latencies_by_group.items():
result = {}
result["avg"] = sum(latencies) / len(latencies)
results[key] = result

headers = [self._group_by, "avg"]
data = []

# sort by avg latency
for key, result in sorted(results.items(), key=lambda it: it[1]["avg"], reverse=True):
data.append(tuple([key, result["avg"]]))

sys.stdout.write("%s\n" % tabulate(data, headers=headers))
sys.stdout.flush()

def cancel(self):
""" if we were interrupted, but haven't reported; do it now """
self.report()

def request_handler(self, req):
# close requests don't have a reply, so ignore
if req.opcode != OpCodes.CLOSE:
self._requests_by_client[req.client].add(req)

def reply_handler(self, rep):
self._replies.append(rep)

def event_handler(self, evt):
""" events are asynchronously generated by the server, so we can't measure latency """
pass


def expand_hosts(hosts):
""" given a list of hosts, expand to its IPs """
ips = set()
@@ -259,6 +356,12 @@ def get_ips(host, port=0):
return ips


def validate_group_by(group_by):
if group_by not in ["path", "type"]:
sys.stderr.write("Unknown value for --group-by, use 'path' or 'type'.\n")
sys.exit(1)


def main(_, options):

if options.version:
@@ -289,12 +392,17 @@ def main(_, options):

loopback = options.iface in ["lo", "lo0"]

if options.count_requests > 0:
if options.group_by not in ["path", "type"]:
sys.stderr.write("Unknown value for --group-by, use 'path' or 'type'.\n")
sys.exit(1)
if options.count_requests > 0 and options.measure_latency > 0:
sys.stderr.write("The flags --count-requests and --measure-latency can't be mixed.\n")
sys.exit(1)

p = CountPrinter(options.count_requests, options.group_by, loopback)
if options.count_requests > 0:
validate_group_by(options.group_by)
p = CountPrinter(options.count_requests, options.group_by, loopback, options.aggregation_depth)
elif options.measure_latency > 0:
validate_group_by(options.group_by)
p = LatencyPrinter(
options.measure_latency, options.group_by, loopback, options.aggregation_depth)
elif options.unpaired:
p = UnpairedPrinter(options.colors, loopback)
else:
@@ -314,7 +422,7 @@ def main(_, options):
while p.isAlive():
time.sleep(0.5)
except (KeyboardInterrupt, SystemExit):
pass
p.cancel()

# shutdown sniffer
sniffer.stop()
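To summarize the new LatencyPrinter in isolation: replies are matched to their requests by (client, xid), and the latency is the difference between the two capture timestamps, grouped by path or type. The sketch below restates that logic with simplified stand-in tuples rather than zktraffic's actual message classes:

from collections import namedtuple

# Simplified stand-ins for zktraffic's request/reply messages (hypothetical).
Request = namedtuple("Request", "client xid path timestamp")
Reply = namedtuple("Reply", "client xid timestamp")

def pair_latencies(requests, replies):
    # Yield (path, latency) for each reply that matches a request by (client, xid).
    pending = {(r.client, r.xid): r for r in requests}
    for rep in replies:
        req = pending.pop((rep.client, rep.xid), None)
        if req is not None:
            yield req.path, rep.timestamp - req.timestamp

reqs = [Request("10.0.0.1:55555", 1, "/services/foo", 100.000)]
reps = [Reply("10.0.0.1:55555", 1, 100.012)]
print(list(pair_latencies(reqs, reps)))  # [('/services/foo', 0.012...)]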
