Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RDY handling on new connections #38

Merged
merged 1 commit into from
Jul 10, 2013
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions nsq/Reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def __init__(self, topic, channel, message_handler=None, name=None,
self.lookupd_poll_interval = lookupd_poll_interval
self.lookupd_poll_jitter = lookupd_poll_jitter
self.heartbeat_interval = int(heartbeat_interval * 1000)
self.random_rdy_ts = time.time()

self.backoff_timer = BackoffTimer.BackoffTimer(0, max_backoff_duration)
self.backoff_timeout = None
Expand Down Expand Up @@ -313,7 +314,19 @@ def _handle_message(self, conn, message):
self.total_rdy = max(self.total_rdy - 1, 0)
conn.in_flight += 1

self._maybe_update_rdy(conn)
rdy_conn = conn
if len(self.conns) > self.max_in_flight:
# if all connections aren't getting RDY
# occsionally randomize which connection gets RDY
time_since_random_rdy = time.time() - self.random_rdy_ts
if time_since_random_rdy > 30:
self.random_rdy_ts = time.time()
conns_with_no_rdy = [c for c in self.conns.itervalues() if not c.rdy]
rdy_conn = random.choice(conns_with_no_rdy)
if rdy_conn is not conn:
logging.info('[%s:%s] redistributing RDY to %s', conn.id, self.name, rdy_conn.id)

self._maybe_update_rdy(rdy_conn)

success = False
try:
Expand Down Expand Up @@ -383,7 +396,7 @@ def _start_backoff_block(self):

self.backoff_timeout = self.ioloop.add_timeout(time.time() + backoff_interval, self._finish_backoff_block)

def _rdy_disabled(self, conn, value):
def _rdy_retry(self, conn, value):
conn.rdy_timeout = None
self._send_rdy(conn, value)

Expand All @@ -394,14 +407,19 @@ def _send_rdy(self, conn, value):

if value and self.disabled():
logging.info('[%s:%s] disabled, delaying RDY state change', conn.id, self.name)
rdy_disabled_callback = functools.partial(self._rdy_disabled, conn, value)
conn.rdy_timeout = self.ioloop.add_timeout(time.time() + 15, rdy_disabled_callback)
rdy_retry_callback = functools.partial(self._rdy_retry, conn, value)
conn.rdy_timeout = self.ioloop.add_timeout(time.time() + 15, rdy_retry_callback)
return

if value > conn.max_rdy_count:
value = conn.max_rdy_count

if (self.total_rdy + value) > self.max_in_flight:
if not conn.rdy:
# if we're going from RDY 0 to non-0 and we couldn't because
# of the configured max in flight, try again
rdy_retry_callback = functools.partial(self._rdy_retry, conn, value)
conn.rdy_timeout = self.ioloop.add_timeout(time.time() + 5, rdy_retry_callback)
return

try:
Expand Down