Skip to content

Commit

Permalink
fix pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
vgol authored and kmerenkov committed Aug 17, 2010
1 parent b495a1b commit cde8a0c
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions brukva/client.py
Expand Up @@ -14,6 +14,15 @@ def __init__(self, kind, channel, body):
self.channel = channel
self.body = body

class CmdLine(object):
def __init__(self, cmd, *args, **kwargs):
self.cmd = cmd
self.args = args
self.kwargs = kwargs

def __repr__(self):
return self.cmd + '(' + str(self.args) + ',' + str(self.kwargs) + ')'

def string_keys_to_dict(key_string, callback):
return dict([(key, callback) for key in key_string.split()])

Expand Down Expand Up @@ -219,7 +228,7 @@ def execute_command(self, cmd, callbacks, *args, **kwargs):
error = Exception('todo')
else:
try:
error, response = yield self.process_data(data, cmd_line) # Fixme, what if error occured during process_data ?
error, response = yield self.process_data(data, cmd_line)
result = self.format_reply(cmd, response)
except Exception, e:
error, result = e, None
Expand Down Expand Up @@ -649,29 +658,23 @@ def on_unsubscribed(self, result):
def publish(self, channel, message, callbacks=None):
self.execute_command('PUBLISH', callbacks, channel, message)

@process
def listen(self, callbacks=None):
# 'LISTEN' is just for exception information, it is not actually sent anywhere
callbacks = callbacks or []
if not hasattr(callbacks, '__iter__'):
callbacks = [callbacks]
if self.on_message not in callbacks:
callbacks = list(callbacks) + [self.on_message]
self.schedule('LISTEN', callbacks)
self.try_to_loop()

def on_message(self, _result):
if self.subscribed:
self.schedule('LISTEN', self.current_task.callbacks)
self.try_to_loop()

class CmdLine(object):
def __init__(self, cmd, *args, **kwargs):
self.cmd = cmd
self.args = args
self.kwargs = kwargs
yield self.connection.queue_wait()
while self.subscribed:
data = yield async(self.connection.readline)()
try:
error, response = yield self.process_data(data, CmdLine('LISTEN'))
result = self.format_reply('LISTEN', response)
except Exception, e:
error, result = e, None

def __repr__(self):
return self.cmd + '(' + str(self.args) + ',' + str(self.kwargs) + ')'
self.call_callbacks(callbacks, (error, result) )

class RespLine(object):
def _init_(self, cmd, *args, **kwargs):
Expand All @@ -685,7 +688,7 @@ def __init__(self, command_stack = None, callbacks = None):
self.callbacks = callbacks or []
self.responses = []

def format_request(self): #FIXME transactional `MULTI` & `EXEC` add here maybe
def format_request(self):
return ''.join([format(c.cmd, *c.args, **c.kwargs) for c in self.command_stack])

class Pipeline(Client):
Expand All @@ -694,8 +697,8 @@ def __init__(self, *args, **kwargs):
self.pipe_task = None

def execute_command(self, cmd, callbacks, *args, **kwargs):
if cmd in ('AUTH', 'SELECT'):
raise Exception('not implntd')
if cmd in ('AUTH'):
raise Exception('403')
if not self.pipe_task:
self.pipe_task = PipeTask()
self.pipe_task.command_stack.append(CmdLine(cmd, *args, **kwargs))
Expand Down

0 comments on commit cde8a0c

Please sign in to comment.