Permalink
Browse files

import from github.com/bitly/nsq/pynsq v0.3

  • Loading branch information...
0 parents commit 4be09fd3eb358d69a8d84b7341a1b0b1011bb3c8 @mreiferson mreiferson committed Oct 31, 2012
Showing with 814 additions and 0 deletions.
  1. +2 −0 .gitignore
  2. +94 −0 README.md
  3. +68 −0 nsq/BackoffTimer.py
  4. +405 −0 nsq/NSQReader.py
  5. +12 −0 nsq/__init__.py
  6. +108 −0 nsq/async.py
  7. +59 −0 nsq/nsq.py
  8. +50 −0 nsq/sync.py
  9. +16 −0 setup.py
@@ -0,0 +1,2 @@
+dist
+*.pyc
@@ -0,0 +1,94 @@
+## pynsq
+
+`pynsq` is a Python NSQ client library.
+
+It provides a high-level reader library for building consumers and two low-level modules for both
+sync and async communication over the NSQ protocol (if you wanted to write your own high-level
+functionality).
+
+The async module is built on top of the [Tornado IOLoop][tornado] and as such requires `tornado` be
+installed:
+
+`$ pip install tornado`
+
+### Reader
+
+Reader provides high-level functionality for building robust NSQ consumers in Python on top of the
+async module.
+
+Multiple reader instances can be instantiated in a single process (to consume from multiple
+topics/channels at once). Each specifying a set of tasks that will be called for each message over
+that channel. Tasks are defined as a dictionary of string names -> callables passed as
+`all_tasks` during instantiation.
+
+`preprocess_method` defines an optional callable that can alter the message data before other task
+functions are called.
+
+`validate_method` defines an optional callable that returns a boolean as to weather or not this
+message should be processed.
+
+`async` determines whether handlers will do asynchronous processing. If set to True, handlers must
+accept a keyword argument called `finisher` that will be a callable used to signal message
+completion (with a boolean argument indicating success).
+
+The library handles backoff as well as maintaining a sufficient RDY count based on the # of
+producers and your configured `max_in_flight`.
+
+Here is an example that demonstrates synchronous message processing:
+
+```python
+import nsq
+
+def task1(message):
+ print message
+ return True
+
+def task2(message):
+ print message
+ return True
+
+all_tasks = {"task1": task1, "task2": task2}
+r = nsq.Reader(all_tasks, lookupd_http_addresses=['http://127.0.0.1:4161'],
+ topic="nsq_reader", channel="asdf")
+nsq.run()
+```
+
+And async:
+
+```python
+"""
+This is a simple example of async processing with nsq.Reader.
+
+It will print "deferring processing" twice, and then print
+the last 3 messages that it received.
+
+Note in particular that we pass the `async=True` argument to Reader(),
+and also that we cache a different finisher callable with
+each message, to be called when we have successfully finished
+processing it.
+"""
+import nsq
+
+buf = []
+
+def process_message(message, finisher):
+ global buf
+ # cache both the message and the finisher callable for later processing
+ buf.append((message, finisher))
+ if len(buf) >= 3:
+ print '****'
+ for msg, finish_fxn in buf:
+ print msg
+ finish_fxn(True) # use finish_fxn to tell NSQ of success
+ print '****'
+ buf = []
+ else:
+ print 'deferring processing'
+
+all_tasks = {"task1": process_message}
+r = nsq.Reader(all_tasks, lookupd_http_addresses=['http://127.0.0.1:4161'],
+ topic="nsq_reader", channel="async", async=True)
+nsq.run()
+```
+
+[tornado]: https://github.com/facebook/tornado
@@ -0,0 +1,68 @@
+from decimal import Decimal
+
+
+def _Decimal(v):
+ if not isinstance(v, Decimal):
+ return Decimal(str(v))
+ return v
+
+
+class BackoffTimer(object):
+ """
+ This is a timer that is smart about backing off exponentially when there are problems
+ """
+ def __init__(self, min_interval, max_interval, ratio=.25, short_length=10, long_length=250):
+ assert isinstance(min_interval, (int, float, Decimal))
+ assert isinstance(max_interval, (int, float, Decimal))
+
+ self.min_interval = _Decimal(min_interval)
+ self.max_interval = _Decimal(max_interval)
+
+ self.max_short_timer = (self.max_interval - self.min_interval) * _Decimal(ratio)
+ self.max_long_timer = (self.max_interval - self.min_interval) * (1 - _Decimal(ratio))
+ self.short_unit = self.max_short_timer / _Decimal(short_length)
+ self.long_unit = self.max_long_timer / _Decimal(long_length)
+
+ self.short_interval = Decimal(0)
+ self.long_interval = Decimal(0)
+
+ def success(self):
+ """Update the timer to reflect a successfull call"""
+ self.short_interval -= self.short_unit
+ self.long_interval -= self.long_unit
+ self.short_interval = max(self.short_interval, Decimal(0))
+ self.long_interval = max(self.long_interval, Decimal(0))
+
+ def failure(self):
+ """Update the timer to reflect a failed call"""
+ self.short_interval += self.short_unit
+ self.long_interval += self.long_unit
+ self.short_interval = min(self.short_interval, self.max_short_timer)
+ self.long_interval = min(self.long_interval, self.max_long_timer)
+
+ def get_interval(self):
+ return float(self.min_interval + self.short_interval + self.long_interval)
+
+
+def test_timer():
+ timer = BackoffTimer(.1, 120, long_length=1000)
+ assert timer.get_interval() == .1
+ timer.success()
+ assert timer.get_interval() == .1
+ timer.failure()
+ interval = '%0.2f' % timer.get_interval()
+ assert interval == '3.19'
+ assert timer.min_interval == Decimal('.1')
+ assert timer.short_interval == Decimal('2.9975')
+ assert timer.long_interval == Decimal('0.089925')
+
+ timer.failure()
+ interval = '%0.2f' % timer.get_interval()
+ assert interval == '6.27'
+ timer.success()
+ interval = '%0.2f' % timer.get_interval()
+ assert interval == '3.19'
+ for i in range(25):
+ timer.failure()
+ interval = '%0.2f' % timer.get_interval()
+ assert interval == '32.41'
Oops, something went wrong.

0 comments on commit 4be09fd

Please sign in to comment.