Skip to content

Commit

Permalink
bang bang bang...
Browse files Browse the repository at this point in the history
  • Loading branch information
vgol authored and kmerenkov committed Aug 17, 2010
1 parent 658bbc0 commit 79638df
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 51 deletions.
80 changes: 45 additions & 35 deletions brukva/client.py
Expand Up @@ -5,17 +5,21 @@
from adisp import async, process

from functools import partial
try:
from collections import namedtuple
except:
from namedtuple import namedtuple
from datetime import datetime
from brukva.exceptions import RedisError, ConnectionError, ResponseError, InvalidResponse

class Message(object):
def __init__(self, kind, channel, body):
self.kind = kind
self.channel = channel
self.body = body

Message = namedtuple('Message', 'kind channel body')
Task = namedtuple('Task', 'command callbacks command_args command_kwargs')

class Task(object):
def __init__(self, command, callbacks, command_args, command_kwargs):
self.command = command
self.callbacks = callbacks
self.command_args = command_args
self.command_kwargs = command_kwargs

def string_keys_to_dict(key_string, callback):
return dict([(key, callback) for key in key_string.split()])
Expand Down Expand Up @@ -71,6 +75,8 @@ def __init__(self, host, port, timeout=None, io_loop=None):

self.in_progress = False

self.read_queue = []

def connect(self):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
Expand Down Expand Up @@ -100,6 +106,10 @@ def read(self, length, callback):
def readline(self, callback):
self._stream.read_until('\r\n', callback)

def try_to_perform_read(self):
if not self.in_progress and self.read_queue:
self.in_progress = True
self._io_loop.add_callback(partial(self.read_queue.pop(0), None) )

class Client(object):
def __init__(self, host='localhost', port=6379, io_loop=None):
Expand Down Expand Up @@ -138,6 +148,8 @@ def __init__(self, host='localhost', port=6379, io_loop=None):

self._pipeline = None



def zset_score_pairs(self, response):
if not response or not 'WITHSCORES' in self.current_task.command_args:
return response
Expand Down Expand Up @@ -652,25 +664,25 @@ def on_message(self, _result):
self.schedule('LISTEN', self.current_task.callbacks)
self.try_to_loop()



class CmdLine(object):
def __init__(self, cmd, *args, **kwargs):
self.cmd = cmd
self.args = args
self.kwargs = kwargs

def __repr__(self):
return self.cmd + '(' + str(self.args) + ',' + str(self.kwargs) + ')'

class RespLine(object):
def _init_(self, cmd, *args, **kwargs):
self.cmd = cmd
self.args = args
self.kwargs = kwargs

class PipeTask(object):
def __init__(self, command_stack = [], callbacks = []):
self.command_stack = command_stack
self.callbacks = callbacks

def __init__(self, command_stack = None, callbacks = None):
self.command_stack = command_stack or []
self.callbacks = callbacks or []
self.responses = []

def format_request(self): #FIXME transactional `MULTI` & `EXEC` add here maybe
Expand All @@ -680,27 +692,31 @@ def format_request(self): #FIXME transactional `MULTI` & `EXEC` add here maybe
class Pipeline(Client):
def __init__(self, *args, **kwargs):
super(Pipeline, self).__init__(*args, **kwargs)
if 'transaction' in kwargs:
self.transaction = kwargs['transaction']
self.reset()

def reset(self):
self.pipe_task = None

def execute_command(self, cmd, callbacks, *args, **kwargs):
if cmd in ('AUTH', 'SELECT'):
raise Exception('not implntd')
if not self.pipe_task:
self.pipe_task = PipeTask()

self.pipe_task.command_stack.append(CmdLine(cmd, *args, **kwargs))

@async
def queue_wait(self, callback):
self.connection.read_queue.append(callback)
self.connection.try_to_perform_read()


def read_done(self):
self.connection.in_progress = False
self.connection.try_to_perform_read()

@process
def execute(self, callbacks):
while self.connection.in_progress:
time.sleep(0.1)
#import ipdb; ipdb.set_trace()
pipe_task = self.pipe_task
self.reset()
self.pipe_task = None

if callbacks is None:
callbacks = []
Expand All @@ -711,36 +727,30 @@ def execute(self, callbacks):
try:
self.connection.write(request)
except IOError:
self.reset()
self.pipe_task = None
self._sudden_disconnect(callbacks)
return

pipe_task.callbacks = callbacks

responses = []
num = 0
total = len(pipe_task.command_stack)
current = None

yield self.queue_wait()

self.connection.in_progress = True
while num < total:
while len(responses) < total:
data = yield async(self.connection.readline)()
if not data:
break

response = yield self.process_data(data)


responses.append(response)
num += 1

self.connection.in_progress = False
self.read_done()

result = []
for i in xrange(len(pipe_task.command_stack)):
res = self.format_reply(pipe_task.command_stack[i].cmd, responses[i])
result.append(res)
result = [
self.format_reply(cmd_line.cmd, resp)
for cmd_line, resp in zip(pipe_task.command_stack, responses)
]

self.call_callbacks(callbacks, result)

Expand Down
47 changes: 31 additions & 16 deletions test_pipe.py
@@ -1,8 +1,27 @@
#! /usr/bin/env python

from functools import partial
import time
import os

import brukva

c = brukva.Client()
c.connect()

c.set('gt', 'er')
def delayed(dt, cmd, *args, **kwargs):
c._io_loop.add_timeout(
time.time()+dt,
partial(cmd, *args, **kwargs)
)

def ac(cmd, *args, **kwargs):
c._io_loop.add_callback(
partial(cmd, *args, **kwargs)
)

# FIXME!
#c.set('gt', 'er')

p = c.pipeline()

Expand All @@ -13,23 +32,19 @@
p.smembers('zar')
p.scard('zar')


stt = time.time()
def on_resp(res):
from pprint import pprint
pprint(repr(res))
pprint(res)
#print "%d(2)" %
print (time.time() - stt)

from functools import partial
import time
ac( p.execute, [on_resp,])

delayed(0.1, p.set, 'aaa', '132')
delayed(0.1, p.set, 'bbb', 'eft')
delayed(0.1, p.mget, ('aaa', 'bbb'))
delayed(0.1, p.execute, [on_resp,])

c._io_loop.add_timeout(
time.time()+0.1,
partial( p.execute, [on_resp,] )
)
c._io_loop.add_timeout(
time.time()+2.0,
c._io_loop.stop
)
#c._io_loop.add_callback(
# partial( p.execute, [on_resp,] )
#)
delayed(0.2, os.sys.exit)
c.connection._stream.io_loop.start()

0 comments on commit 79638df

Please sign in to comment.