This repository has been archived by the owner on Jan 21, 2021. It is now read-only.

Added keep-alive messages from load balancers to workers to detect workers that have gone away uncleanly.
Jim Fulton committed Jun 2, 2014
1 parent b8ae3ff commit ce84bad
Showing 7 changed files with 61 additions and 4 deletions.
3 changes: 3 additions & 0 deletions src/zc/resumelb/README.txt
@@ -242,6 +242,9 @@ maintaining ZooKeeper trees.
 Change History
 ==============

+- Added keep-alive messages from load balancers to workers to detect
+  workers that have gone away uncleanly.
+
 0.7.1 (2012-10-17)
 ------------------

7 changes: 7 additions & 0 deletions src/zc/resumelb/lb.py
@@ -437,9 +437,16 @@ def content():
del self.requests[rno]

     def disconnected(self):
+        # keep-alive messages sent after disconnecting can cause
+        # disconnected to be called a second time. To avoid this, we
+        # replace the method on the instance with a noop.
+        self.disconnected = already_disconnected
         self.pool.remove(self)
         zc.resumelb.util.Worker.disconnected(self)

+def already_disconnected():
+    pass
+
 def parse_addr(addr):
     host, port = addr.split(':')
     return host, int(port)
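
The instance-level override in `disconnected` can be illustrated in isolation. The following is a minimal sketch, not the lb's actual class: `Connection`, `_already_disconnected`, and the `set`-based pool are hypothetical stand-ins, and the code is written for Python 3 rather than the Python 2 used by this commit.

```python
class Connection:
    """Hypothetical stand-in for the lb's per-worker connection object."""

    def __init__(self, pool):
        self.pool = pool
        pool.add(self)

    def disconnected(self):
        # Shadow the class method with a no-op on this instance only, so a
        # second call (for example, after a late keep-alive write fails)
        # does nothing instead of removing the worker from the pool twice.
        self.disconnected = _already_disconnected
        self.pool.remove(self)


def _already_disconnected():
    # Takes no arguments: it is stored on the instance, so it is not bound.
    pass


pool = set()
conn = Connection(pool)
conn.disconnected()
conn.disconnected()  # no KeyError: the instance attribute is now the no-op
```

Because the no-op is assigned to the instance rather than the class, other connections keep the original `disconnected` behavior.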
19 changes: 18 additions & 1 deletion src/zc/resumelb/lb.test
@@ -35,7 +35,7 @@ to these addresses. Let's wait for it to do so:
 When the workers get connections, they send the lb their resumes:

 >>> worker1, worker2 = [w.socket for w in workers]
->>> from zc.resumelb.util import read_message, write_message
+>>> from zc.resumelb.util import write_message

 >>> write_message(worker1, 0, {'h1.com': 10.0})
 >>> write_message(worker2, 0, {'h2.com': 10.0})
@@ -50,6 +50,12 @@ supposed to go.
 >>> app1 = webtest.TestApp(lb.handle_wsgi)
 >>> g1 = spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])

+>>> import zc.resumelb.util
+>>> def read_message(sock): # filter out keep alives
+...     while 1: # we'll look at them later
+...         rno, data = zc.resumelb.util.read_message(sock)
+...         if rno != 0 or data is not None:
+...             return rno, data
 >>> rno, env1 = read_message(worker1)

 >>> rno
@@ -194,6 +200,17 @@ We'll interleave data for the 2 responses on worker1
 >>> g2.value.status, g2.value.body == '2'*10000
 ('200 OK', True)

+Keep-alive messages
+===================
+
+Periodically, the lb sends keep-alive messages to workers:
+
+>>> gevent.sleep(.15)
+>>> zc.resumelb.util.read_message(worker1)
+(0, None)
+
+Workers ignore these.
+
 Disconnecting clients
 =====================

7 changes: 6 additions & 1 deletion src/zc/resumelb/protocol.txt
@@ -8,7 +8,12 @@ consist of:
 - 4-byte request number
 - Marshalled data

-The marshalled data can be:
+If the request number is 0, the marshalled data can be:
+
+- a resume dict from worker->lb
+- None from lb->worker (a keep-alive, which the worker ignores)
+
+For request number > 0, the marshalled data can be:

 - environment dict from LB->worker
 - string part of HTTP message body
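
The request-number-0 cases described in this hunk can be sketched with the standard `struct` and `marshal` modules. This is an illustration under stated assumptions, not the `zc.resumelb.util` implementation: the hunk shows only the request number and marshalled data fields, so the 4-byte length prefix and the big-endian layout used here are assumptions, and `encode_message`, `decode_message`, and `is_keepalive` are hypothetical names. The code targets Python 3.

```python
import marshal
import struct

# Assumed header layout: 4-byte payload length, then 4-byte request number.
HEADER = struct.Struct(">II")


def encode_message(rno, data):
    """Frame one message as header + marshalled payload."""
    payload = marshal.dumps(data)
    return HEADER.pack(len(payload), rno) + payload


def decode_message(raw):
    """Inverse of encode_message for a single complete message."""
    length, rno = HEADER.unpack(raw[:HEADER.size])
    data = marshal.loads(raw[HEADER.size:HEADER.size + length])
    return rno, data


def is_keepalive(rno, data):
    # Request number 0 carries either a resume dict (worker -> lb)
    # or None (lb -> worker keep-alive).
    return rno == 0 and data is None


keepalive = decode_message(encode_message(0, None))
resume = decode_message(encode_message(0, {'h1.com': 10.0}))
```

Here `keepalive` satisfies `is_keepalive`, while `resume` shares request number 0 but carries a dict, so a receiver can tell the two apart by the payload alone.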
5 changes: 5 additions & 0 deletions src/zc/resumelb/tests.py
@@ -423,6 +423,11 @@ def setUp(test):
     test.globs['print_response'] = print_response
     test.globs['spawn'] = spawn
     test.globs['Worker'] = FauxWorker
+    old_write_keepalive_interval = zc.resumelb.util.LBWorker.write_keepalive_interval
+    zc.resumelb.util.LBWorker.write_keepalive_interval = 0.1
+    zope.testing.setupstack.register(
+        test, setattr, zc.resumelb.util.LBWorker,
+        'write_keepalive_interval', old_write_keepalive_interval)

 def zkSetUp(test):
     setUp(test)
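
The save, override, and restore pattern used in `setUp` can be sketched with the standard library alone. This is an analogy under assumptions, not the project's test setup: it uses `unittest.TestCase.addCleanup` in place of `zope.testing.setupstack.register`, and the `LBWorker` class below is a hypothetical stand-in with only the one attribute. The code targets Python 3.

```python
import unittest


class LBWorker:
    """Hypothetical stand-in for zc.resumelb.util.LBWorker."""
    write_keepalive_interval = 0.1


class KeepAliveIntervalTest(unittest.TestCase):

    def setUp(self):
        # Remember the class-level default, shorten it for the test, and
        # register a cleanup that puts the old value back afterwards.
        old = LBWorker.write_keepalive_interval
        LBWorker.write_keepalive_interval = 0.01
        self.addCleanup(setattr, LBWorker, 'write_keepalive_interval', old)

    def test_interval_is_overridden(self):
        self.assertEqual(LBWorker.write_keepalive_interval, 0.01)


result = unittest.TestResult()
KeepAliveIntervalTest('test_interval_is_overridden').run(result)
```

Registering the restore step at the point of the override keeps the class attribute from leaking into later tests even if the test body fails.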
13 changes: 11 additions & 2 deletions src/zc/resumelb/util.py
@@ -55,7 +55,7 @@ def write_message(sock, rno, *a):
         try:
             sent = sock.send(data)
         except socket.error, err:
-            if err.args[0] in disconnected_errors:
+            if err.args[0] in disconnected_errors or sock.closed:
                 logger.debug("write_message disconnected %s", sock)
                 raise Disconnected()
             else:
@@ -65,8 +65,13 @@ def write_message(sock, rno, *a):
 def writer(writeq, sock, multiplexer):
     get = writeq.get
     write_message_ = write_message
+    timeout = multiplexer.write_keepalive_interval
     while 1:
-        rno, data = get()
+        try:
+            rno, data = get(True, timeout)
+        except gevent.queue.Empty:
+            rno = 0
+            data = None
         try:
             write_message_(sock, rno, data)
         except Disconnected:
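
The timeout-driven keep-alive in `writer` can be sketched outside gevent. This is a minimal illustration assuming the standard library `queue.Queue` behaves like `gevent.queue.Queue` for this purpose; `next_message` and `KEEPALIVE` are hypothetical names, and the code targets Python 3.

```python
import queue

# The keep-alive message from this commit: request number 0, data None.
KEEPALIVE = (0, None)


def next_message(writeq, keepalive_interval):
    """Return the next queued (rno, data), or a keep-alive on timeout.

    A keepalive_interval of None blocks indefinitely, which corresponds
    to the base Worker default shown in this commit.
    """
    try:
        return writeq.get(True, keepalive_interval)
    except queue.Empty:
        return KEEPALIVE


writeq = queue.Queue()
writeq.put((1, "body"))
first = next_message(writeq, 0.01)   # real message is available
second = next_message(writeq, 0.01)  # queue is idle, so a keep-alive
```

Writing a keep-alive whenever the queue is idle means a dead connection surfaces as a write error within roughly one interval, instead of only when the next request is routed to that worker.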
@@ -191,6 +196,8 @@ class Worker:

     ReadQueue = gevent.queue.Queue

+    write_keepalive_interval = None
+
     def connected(self, socket, addr=None):
         if addr is None:
             addr = socket.getpeername()
@@ -230,6 +237,8 @@ def disconnected(self):

 class LBWorker(Worker):

+    write_keepalive_interval = 0.1
+
     ReadQueue = BufferedQueue

     def connected(self, socket, addr=None):
11 changes: 11 additions & 0 deletions src/zc/resumelb/worker.test
@@ -194,6 +194,17 @@ In this example, we've also requested a very large output.
 <BLANKLINE>
 1200000

+Keep-alive messages
+===================
+
+Occasionally, the server will send keep-alive messages to the worker.
+These are ignored. They're used by the server to detect that a worker
+has been disconnected uncleanly.
+
+>>> write_message(worker_socket, 0, None)
+
+These messages are ignored by the worker.
+
 Multiple connections (multiple load balancers)
 ==============================================

