diff --git a/brukva/client.py b/brukva/client.py index c8414c3..1c0caa7 100644 --- a/brukva/client.py +++ b/brukva/client.py @@ -14,6 +14,15 @@ def __init__(self, kind, channel, body): self.channel = channel self.body = body +class CmdLine(object): + def __init__(self, cmd, *args, **kwargs): + self.cmd = cmd + self.args = args + self.kwargs = kwargs + + def __repr__(self): + return self.cmd + '(' + str(self.args) + ',' + str(self.kwargs) + ')' + def string_keys_to_dict(key_string, callback): return dict([(key, callback) for key in key_string.split()]) @@ -219,7 +228,7 @@ def execute_command(self, cmd, callbacks, *args, **kwargs): error = Exception('todo') else: try: - error, response = yield self.process_data(data, cmd_line) # Fixme, what if error occured during process_data ? + error, response = yield self.process_data(data, cmd_line) result = self.format_reply(cmd, response) except Exception, e: error, result = e, None @@ -649,29 +658,23 @@ def on_unsubscribed(self, result): def publish(self, channel, message, callbacks=None): self.execute_command('PUBLISH', callbacks, channel, message) + @process def listen(self, callbacks=None): # 'LISTEN' is just for exception information, it is not actually sent anywhere callbacks = callbacks or [] if not hasattr(callbacks, '__iter__'): callbacks = [callbacks] - if self.on_message not in callbacks: - callbacks = list(callbacks) + [self.on_message] - self.schedule('LISTEN', callbacks) - self.try_to_loop() - - def on_message(self, _result): - if self.subscribed: - self.schedule('LISTEN', self.current_task.callbacks) - self.try_to_loop() -class CmdLine(object): - def __init__(self, cmd, *args, **kwargs): - self.cmd = cmd - self.args = args - self.kwargs = kwargs + yield self.connection.queue_wait() + while self.subscribed: + data = yield async(self.connection.readline)() + try: + error, response = yield self.process_data(data, CmdLine('LISTEN')) + result = self.format_reply('LISTEN', response) + except Exception, e: + error, result = e, None - def __repr__(self): - return self.cmd + '(' + str(self.args) + ',' + str(self.kwargs) + ')' + self.call_callbacks(callbacks, (error, result) ) class RespLine(object): def _init_(self, cmd, *args, **kwargs): @@ -685,7 +688,7 @@ def __init__(self, command_stack = None, callbacks = None): self.callbacks = callbacks or [] self.responses = [] - def format_request(self): #FIXME transactional `MULTI` & `EXEC` add here maybe + def format_request(self): return ''.join([format(c.cmd, *c.args, **c.kwargs) for c in self.command_stack]) class Pipeline(Client): @@ -694,8 +697,8 @@ def __init__(self, *args, **kwargs): self.pipe_task = None def execute_command(self, cmd, callbacks, *args, **kwargs): - if cmd in ('AUTH', 'SELECT'): - raise Exception('not implntd') + if cmd in ('AUTH'): + raise Exception('403') if not self.pipe_task: self.pipe_task = PipeTask() self.pipe_task.command_stack.append(CmdLine(cmd, *args, **kwargs))