Skip to content
This repository has been archived by the owner on Jan 21, 2021. It is now read-only.

Commit

Permalink
Merge pull request #6 from zopefoundation/collapse-writes
Browse files Browse the repository at this point in the history
Collapse writes and disable nagle's algorithm
  • Loading branch information
freddrake committed Jun 4, 2014
2 parents 4ebde07 + 995bb6c commit 1a041df
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 17 deletions.
2 changes: 1 addition & 1 deletion buildout.cfg
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[buildout]
extends = versions.cfg
develop = .
parts = py ctl
parts = py

[zookeeper]
recipe = zc.zookeeperrecipes:devtree
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
name, version = 'zc.resumelb', '0'
name, version = 'zc.resumelb', '0.7.2'

install_requires = [
'setuptools', 'gevent >=1.0b1', 'WebOb', 'zc.thread', 'zc.parse_addr',
Expand Down
8 changes: 8 additions & 0 deletions src/zc/resumelb/README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,17 @@ maintaining ZooKeeper trees.
Change History
==============

- Added some optimizations to reduce latency between load balancers
and workers.

0.7.2 (2014-06-02)
------------------

- Added keep-alive messages from load balancers to workers to detect
workers that have gone away uncleanly.

(Note that workers don't have to be updated.)

0.7.1 (2012-10-17)
------------------

Expand Down
9 changes: 6 additions & 3 deletions src/zc/resumelb/lb.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import llist
import logging
import re
import socket
import sys
import time
import webob
Expand Down Expand Up @@ -56,11 +57,13 @@ def connect(self, addr, workletts, version):
try:
while addr in self.worker_addrs:
try:
socket = gevent.socket.create_connection(addr)
Worker(self.pool, socket, addr, version)
sock = gevent.socket.create_connection(addr)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

Worker(self.pool, sock, addr, version)
except gevent.GreenletExit, v:
try:
socket.close()
sock.close()
except:
pass
raise
Expand Down
45 changes: 37 additions & 8 deletions src/zc/resumelb/util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from struct import pack, unpack
from marshal import loads, dumps, dump, load

import errno
import gevent.queue
import logging
Expand Down Expand Up @@ -62,21 +63,49 @@ def write_message(sock, rno, *a):
raise
data = data[sent:]

SEND_SIZE = 1 << 16
KEEP_ALIVE = pack(">II", 0, 1) + 'N'
def writer(writeq, sock, multiplexer):
get = writeq.get
write_message_ = write_message
timeout = multiplexer.write_keepalive_interval
Empty = gevent.queue.Empty
send = sock.send
dumps_ = dumps
pack_ = pack
while 1:
try:
rno, data = get(True, timeout)
except gevent.queue.Empty:
rno = 0
data = None
try:
write_message_(sock, rno, data)
except Disconnected:
multiplexer.disconnected()
return
except Empty:
data = KEEP_ALIVE
else:
to_send = []
append = to_send.append
nsend = 0
while 1:
data = dumps_(data)
append(pack_(">II", rno, len(data)))
append(data)
nsend += len(data) + 8
if nsend > SEND_SIZE:
break
try:
rno, data = get(False)
except Empty:
break
data = ''.join(to_send)

while data:
try:
sent = send(data)
except socket.error, err:
if err.args[0] in disconnected_errors or sock.closed:
logger.debug("write_message disconnected %s", sock)
multiplexer.disconnected()
return
else:
raise
data = data[sent:]


queue_size_bytes = 99999
Expand Down
6 changes: 3 additions & 3 deletions src/zc/resumelb/zk.test
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ Then we'll make a simpler GET request:
... ''')

>>> print sock.recv(9999) # doctest: +ELLIPSIS
HTTP/1.0 200 OK...
HTTP/1.1 200 OK...

If we create another worker, it will be seen by the load
balancer. This time, we're not going to run the worker in test mode.
Expand Down Expand Up @@ -418,7 +418,7 @@ Let's do a request.
... \r
... ''')
>>> print sock.recv(9999) # doctest: +ELLIPSIS
HTTP/1.0 200 OK...
HTTP/1.1 200 OK...

>>> sock.close()

Expand Down Expand Up @@ -492,7 +492,7 @@ Finally, let's test that:
... \r
... ''')
>>> print sock.recv(9999) # doctest: +ELLIPSIS
HTTP/1.0 200 OK...
HTTP/1.1 200 OK...

>>> lb_greenlet.kill()

Expand Down
2 changes: 1 addition & 1 deletion versions.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ versions = versions
#allow-picked-versions = false

[versions]
gevent = 1.0b2zc1
gevent = 1.0.1
greenlet = 0.4.0
llist = 0.3
manuel = 1.6.0
Expand Down

0 comments on commit 1a041df

Please sign in to comment.