Permalink
Browse files

new redis client

  • Loading branch information...
1 parent cc1a418 commit eced9b90a30bf4023668ff30204c0009f535b5d1 @fiorix fiorix committed Feb 17, 2012
Showing with 22 additions and 20 deletions.
  1. +6 −1 cyclone/redis.py
  2. +1 −1 demos/httpauth/httpauthdemo_redis.py
  3. +15 −18 demos/redis/redisdemo.py
View
@@ -1062,7 +1062,7 @@ def messageReceived(self, pattern, channel, message):
pass
def replyReceived(self, reply):
- if type(reply) is types.ListType:
+ if isinstance(reply, list):
if reply[-3] == u"message":
self.messageReceived(None, *reply[-2:])
else:
@@ -1088,6 +1088,11 @@ def punsubscribe(self, patterns):
patterns = [patterns]
return self.execute_command("PUNSUBSCRIBE", *patterns)
+class SubscriberFactory(protocol.ReconnectingClientFactory):
+ maxDelay = 120
+ continueTrying = True
+ protocol = SubscriberProtocol
+
class ConnectionHandler(object):
def __init__(self, factory):
@@ -29,7 +29,7 @@
class Application(cyclone.web.Application):
def __init__(self):
# Defaults to localhost:6379, dbid=0
- redisdb = cyclone.redis.lazyRedisConnectionPool()
+ redisdb = cyclone.redis.lazyConnectionPool()
handlers = [
(r"/", IndexHandler, dict(redisdb=redisdb)),
View
@@ -23,16 +23,13 @@
import cyclone.web
import cyclone.redis
-from cyclone.redis.protocol import SubscriberProtocol
from twisted.python import log
from twisted.internet import defer, reactor
class Application(cyclone.web.Application):
def __init__(self):
- RedisMixin().makeItFunk("127.0.0.1", 6379, 0, 10)
-
handlers = [
(r"/text/(.+)", TextHandler),
(r"/queue/(.+)", QueueHandler),
@@ -42,6 +39,7 @@ def __init__(self):
static_path="./frontend/static",
template_path="./frontend/template",
)
+ RedisMixin.setup("127.0.0.1", 6379, 0, 10)
cyclone.web.Application.__init__(self, handlers, **settings)
@@ -50,19 +48,17 @@ class RedisMixin(object):
psconn = None
channels = collections.defaultdict(lambda: [])
- def makeItFunk(self, host, port, dbid, pool_size):
- cls = RedisMixin
-
+ @classmethod
+ def setup(self, host, port, dbid, poolsize):
# PubSub client connection
qf = cyclone.redis.SubscriberFactory()
qf.maxDelay = 20
- qf.continueTrying = True # Auto-reconnect
qf.protocol = QueueProtocol
reactor.connectTCP(host, port, qf)
# Normal client connection
- cls.dbconn = cyclone.redis.lazyRedisConnectionPool(host, port, db=dbid,
- pool_size=pool_size)
+ RedisMixin.dbconn = cyclone.redis.lazyConnectionPool(host, port,
+ dbid, poolsize)
def subscribe(self, channel):
if RedisMixin.psconn is None:
@@ -99,7 +95,9 @@ def unsubscribe_all(self, ign):
RedisMixin.psconn.unsubscribe(channel)
def broadcast(self, pattern, channel, message):
- peers = RedisMixin.channels.get(pattern or channel)
+ peers = self.channels.get(pattern or channel)
+ if not peers:
+ return
# Broadcast the message to all peers in channel
for peer in peers:
@@ -113,7 +111,7 @@ class TextHandler(cyclone.web.RequestHandler, RedisMixin):
@defer.inlineCallbacks
def get(self, key):
try:
- value = yield RedisMixin.dbconn.get(key)
+ value = yield self.dbconn.get(key)
except Exception, e:
log.err("Redis failed to get('%s'): %s" % (key, str(e)))
raise cyclone.web.HTTPError(503)
@@ -125,7 +123,7 @@ def get(self, key):
def post(self, key):
value = self.get_argument("value")
try:
- yield RedisMixin.dbconn.set(key, value)
+ yield self.dbconn.set(key, value)
except Exception, e:
log.err("Redis failed to set('%s', '%s'): %s" % (key, value, str(e)))
raise cyclone.web.HTTPError(503)
@@ -136,7 +134,7 @@ def post(self, key):
@defer.inlineCallbacks
def delete(self, key):
try:
- n = yield RedisMixin.dbconn.delete(key)
+ n = yield self.dbconn.delete(key)
except Exception, e:
log.err("Redis failed to del('%s'): %s" % (key, str(e)))
raise cyclone.web.HTTPError(503)
@@ -161,7 +159,7 @@ def get(self, channels):
functools.partial(RedisMixin.unsubscribe_all, self))
for channel in channels:
- RedisMixin.subscribe(self, channel)
+ self.subscribe(channel)
self.write("subscribed to %s\r\n" % channel)
self.flush()
@@ -170,8 +168,7 @@ def post(self, channel):
message = self.get_argument("message")
try:
- n = yield RedisMixin.dbconn.publish(channel,
- message.encode("utf-8"))
+ n = yield self.dbconn.publish(channel, message.encode("utf-8"))
except Exception, e:
log.msg("Redis failed to publish('%s', '%s'): %s" % \
(channel, repr(message), str(e)))
@@ -181,7 +178,7 @@ def post(self, channel):
self.finish("OK %d\r\n" % n)
-class QueueProtocol(SubscriberProtocol, RedisMixin):
+class QueueProtocol(cyclone.redis.SubscriberProtocol, RedisMixin):
def messageReceived(self, pattern, channel, message):
# When new messages are published to Redis channels or patterns,
# they are broadcasted to all HTTP clients subscribed to those
@@ -193,7 +190,7 @@ def connectionMade(self):
# If we lost connection with Redis during operation, we
# re-subscribe to all channels once the connection is re-established.
- for channel in RedisMixin.channels:
+ for channel in self.channels:
if "*" in channel:
self.psubscribe(channel)
else:

0 comments on commit eced9b9

Please sign in to comment.