Skip to content

Commit

Permalink
hidden --profile option to enable profiling of warc writer thread and periodic logging of memory usage info; at shutdown, close stats db and unregister from service registry; logging improvements
Browse files
Browse the repository at this point in the history
  • Loading branch information
nlevitt committed Jan 27, 2016
1 parent 7eb82ab commit e3a5717
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 13 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ warcs
build
dist
.tox
out.*
17 changes: 10 additions & 7 deletions warcprox/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ def __init__(self, proxy=None, warc_writer_thread=None,

def debug_mem(self):
self.logger.info("self.proxy.recorded_url_q.qsize()=%s", self.proxy.recorded_url_q.qsize())
if self.proxy.stats_db and hasattr(self.proxy.stats_db, "_executor"):
self.logger.info("self.proxy.stats_db._executor._work_queue.qsize()=%s",
self.proxy.stats_db._executor._work_queue.qsize())
with open("/proc/self/status") as f:
for line in f:
fields = line.split()
Expand Down Expand Up @@ -118,6 +115,7 @@ def _service_heartbeat(self):
'port': self.options.port,
}
status_info['load'] = 1.0 * self.proxy.recorded_url_q.qsize() / (self.proxy.recorded_url_q.maxsize or 100)
status_info['queue_size'] = self.proxy.recorded_url_q.qsize()

self.status_info = self.service_registry.heartbeat(status_info)
self.logger.debug("status in service registry: %s", self.status_info)
Expand Down Expand Up @@ -154,9 +152,9 @@ def utcoffset(self, dt): return datetime.timedelta(0)
if self.service_registry and (not hasattr(self, "status_info") or (datetime.datetime.now(utc) - self.status_info["last_heartbeat"]).total_seconds() > self.HEARTBEAT_INTERVAL):
self._service_heartbeat()

# if (datetime.datetime.utcnow() - last_mem_dbg).total_seconds() > 60:
# self.debug_mem()
# last_mem_dbg = datetime.datetime.utcnow()
if self.options.profile and (datetime.datetime.utcnow() - last_mem_dbg).total_seconds() > 60:
self.debug_mem()
last_mem_dbg = datetime.datetime.utcnow()

time.sleep(0.5)
except:
Expand All @@ -176,10 +174,15 @@ def utcoffset(self, dt): return datetime.timedelta(0)
# wait for threads to finish
self.warc_writer_thread.join()

if self.warc_writer_thread.dedup_db is not None:
if self.proxy.stats_db:
self.proxy.stats_db.close()
if self.warc_writer_thread.dedup_db:
self.warc_writer_thread.dedup_db.close()

proxy_thread.join()
if self.playback_proxy is not None:
playback_proxy_thread.join()

if self.service_registry and hasattr(self, "status_info"):
self.service_registry.unregister(self.status_info["id"])

2 changes: 2 additions & 0 deletions warcprox/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ def _build_arg_parser(prog=os.path.basename(sys.argv[0])):
default=None, help='kafka capture feed topic')
arg_parser.add_argument('--queue-size', dest='queue_size', default=1000,
help=argparse.SUPPRESS)
arg_parser.add_argument('--profile', action='store_true', default=False,
help=argparse.SUPPRESS)
arg_parser.add_argument('--version', action='version',
version="warcprox {}".format(warcprox.__version__))
arg_parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
Expand Down
3 changes: 2 additions & 1 deletion warcprox/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import warcprox
import threading
import rethinkdb as r
import datetime

def _empty_bucket(bucket):
return {
Expand Down Expand Up @@ -160,7 +161,7 @@ def _update_batch(self):

if not self._stop.is_set():
self._timer = threading.Timer(0.5, self._update_batch)
self._timer.name = "RethinkCaptures-batch-insert-timer"
self._timer.name = "RethinkStats-batch-update-timer-%s" % datetime.datetime.utcnow().isoformat()
self._timer.start()
else:
self.logger.info("finished")
Expand Down
9 changes: 5 additions & 4 deletions warcprox/warcproxy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#!/usr/bin/env python
# vim:set sw=4 et:
#
"""
WARC writing MITM HTTP/S proxy
Expand Down Expand Up @@ -151,8 +150,8 @@ def _enforce_limits(self, warcprox_meta):
key, limit = item
bucket0, bucket1, bucket2 = key.rsplit(".", 2)
value = self.server.stats_db.value(bucket0, bucket1, bucket2)
# self.logger.debug("warcprox_meta['limits']=%s stats['%s']=%s recorded_url_q.qsize()=%s",
# warcprox_meta['limits'], key, value, self.server.recorded_url_q.qsize())
self.logger.debug("warcprox_meta['limits']=%s stats['%s']=%s recorded_url_q.qsize()=%s",
warcprox_meta['limits'], key, value, self.server.recorded_url_q.qsize())
if value and value >= limit:
body = "request rejected by warcprox: reached limit {}={}\n".format(key, limit).encode("utf-8")
self.send_response(420, "Reached limit")
Expand Down Expand Up @@ -369,7 +368,7 @@ def __init__(self, ca=None, recorded_url_q=None, stats_db=None, options=warcprox
if recorded_url_q is not None:
self.recorded_url_q = recorded_url_q
else:
self.recorded_url_q = queue.Queue()
self.recorded_url_q = queue.Queue(maxsize=options.queue_size or 1000)

self.stats_db = stats_db

Expand All @@ -383,6 +382,8 @@ def server_close(self):
self.logger.info('WarcProxy shutting down')
http_server.HTTPServer.server_close(self)

def handle_error(self, request, client_address):
    """Log a failure that occurred while processing a client request.

    Has the signature of the ``socketserver`` ``handle_error`` hook; the
    failure is reported through ``self.logger`` at WARNING level.

    Args:
        request: the request object that was being processed.
        client_address: address of the client that sent the request.
    """
    # Logger.warn() is a deprecated alias that was removed in Python 3.13;
    # Logger.warning() emits the same record. exc_info=True attaches the
    # traceback of the exception currently being handled, if any.
    self.logger.warning(
        "exception processing request %s from %s",
        request, client_address, exc_info=True)

class WarcProxy(socketserver.ThreadingMixIn, SingleThreadedWarcProxy):
    """SingleThreadedWarcProxy combined with socketserver.ThreadingMixIn.

    ThreadingMixIn (per the socketserver documentation) processes each
    request in a new thread. This class adds and overrides nothing itself;
    all behaviour comes from its two bases.
    """
10 changes: 9 additions & 1 deletion warcprox/writerthread.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,22 @@ def __init__(self, recorded_url_q=None, writer_pool=None, dedup_db=None, listene
self.idle = None

def run(self):
cProfile.runctx('self._run()', globals(), locals(), sort='cumulative')
if self.options.profile:
cProfile.runctx('self._run()', globals(), locals(), sort='cumulative')
else:
self._run()

def _run(self):
while not self.stop.is_set():
try:
self.name = 'WarcWriterThread(tid={})'.format(warcprox.gettid())
while True:
try:
if self.stop.is_set():
qsize = self.recorded_url_q.qsize()
if qsize % 50 == 0:
self.logger.info("%s urls left to write", qsize)

recorded_url = self.recorded_url_q.get(block=True, timeout=0.5)
self.idle = None
if self.dedup_db:
Expand Down

0 comments on commit e3a5717

Please sign in to comment.