Skip to content


connection class completely refactored. encoding and command packing …
Browse files Browse the repository at this point in the history
…moved from client to connection. introduced concept of protocol parsers and implemented both a PythonParse and a hiredis parser. the parser class can be overridden in the __init__ of the connection if desired.
  • Loading branch information
andymccurdy committed May 12, 2011
1 parent 43aa231 commit 4d6320a
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 224 deletions.
91 changes: 33 additions & 58 deletions redis/
Expand Up @@ -2,7 +2,7 @@
import threading
import time
import warnings
from itertools import chain, imap, islice, izip
from itertools import chain, imap, islice, izip, starmap
from redis.connection import ConnectionPool, Connection
from redis.exceptions import (
Expand Down Expand Up @@ -227,49 +227,31 @@ def lock(self, name, timeout=None, sleep=0.1):
return Lock(self, name, timeout=timeout, sleep=sleep)

def _execute_command(self, command_name, command, **options):
def execute_command(self, *args, **options):
command_name = options.get('command_name', args[0])
subscription_command = command_name in self.SUBSCRIPTION_COMMANDS
if self.subscribed and not subscription_command:
raise PubSubError("Cannot issue commands other than SUBSCRIBE and "
"UNSUBSCRIBE while channels are open")
self.connection.send(command, self)
if subscription_command:
return None
return self.parse_response(command_name, **options)
except ConnectionError:
self.connection.send(command, self)
if subscription_command:
return None
return self.parse_response(command_name, **options)

def execute_command(self, *args, **options):
"Sends the command to the redis server and returns it's response"
cmds = ['$%s\r\n%s\r\n' % (len(enc_value), enc_value)
for enc_value in imap(self.encode, args)]
return self._execute_command(
'*%s\r\n%s' % (len(cmds), ''.join(cmds)),

def parse_response(self, command_name, catch_errors=False, **options):
def parse_response(self, command_name, **options):
"Parses a response from the Redis server"
response = self.connection.read_response(command_name, catch_errors)
response = self.connection.read_response()
if command_name in self.RESPONSE_CALLBACKS:
return self.RESPONSE_CALLBACKS[command_name](response, **options)
return response

def encode(self, value):
"Encode ``value`` using the instance's charset"
if isinstance(value, str):
return value
if isinstance(value, unicode):
return value.encode(self.encoding, self.errors)
# not a string or unicode, attempt to convert to a string
return str(value)

def get_connection(self, host, port, db, password, socket_timeout):
"Returns a connection object"
Expand Down Expand Up @@ -1274,8 +1256,10 @@ def __init__(self, connection, transaction, charset, errors):

def reset(self):
self.command_stack = []
if self.transaction:

def _execute_command(self, command_name, command, **options):
def execute_command(self, *args, **options):
Stage a command to be executed when execute() is next called
Expand All @@ -1287,66 +1271,57 @@ def _execute_command(self, command_name, command, **options):
At some other point, you can then run: pipe.execute(),
which will execute all commands queued in the pipe.
# if the command_name is 'AUTH' or 'SELECT', then this command
# must have originated after a socket connection and a call to
# _setup_connection(). run these commands immediately without
# buffering them.
if command_name in ('AUTH', 'SELECT'):
return super(Pipeline, self)._execute_command(
command_name, command, **options)
self.command_stack.append((command_name, command, options))
self.command_stack.append((args, options))
return self

def _execute_transaction(self, commands):
# wrap the commands in MULTI ... EXEC statements to indicate an
# atomic operation
all_cmds = ''.join([c for _1, c, _2 in chain(
(('', 'MULTI\r\n', ''),),
(('', 'EXEC\r\n', ''),)
self.connection.send(all_cmds, self)
all_cmds = ''.join(starmap(self.connection.pack_command,
[args for args, options in commands]))
# we don't care about the multi/exec any longer
commands = commands[1:-1]
# parse off the response for MULTI and all commands prior to EXEC
# the only data we care about is the response the EXEC, the last command
for i in range(len(commands)+1):
_ = self.parse_response('_')
# parse the EXEC. we want errors returned as items in the response
response = self.parse_response('_', catch_errors=True)
# parse the EXEC.
response = self.parse_response('_')

if response is None:
raise WatchError("Watched variable changed.")

if len(response) != len(commands):
raise ResponseError("Wrong number of response items from "
"pipeline execution")
# Run any callbacks for the commands run in the pipeline
# We have to run response callbacks manually
data = []
for r, cmd in izip(response, commands):
if not isinstance(r, Exception):
if cmd[0] in self.RESPONSE_CALLBACKS:
r = self.RESPONSE_CALLBACKS[cmd[0]](r, **cmd[2])
args, options = cmd
command_name = options.get('command_name', args[0])
if command_name in self.RESPONSE_CALLBACKS:
r = self.RESPONSE_CALLBACKS[command_name](r, **options)
return data

def _execute_pipeline(self, commands):
# build up all commands into a single request to increase network perf
all_cmds = ''.join([c for _1, c, _2 in commands])
self.connection.send(all_cmds, self)
data = []
for command_name, _, options in commands:
self.parse_response(command_name, catch_errors=True, **options)
return data
all_cmds = ''.join(starmap(self.connection.pack_command,
[args for args, options in commands]))
return [self.parse_response(
options.get('command_name', args[0]), **options)
for args, options in commands]

def execute(self):
"Execute all the commands in the current pipeline"
stack = self.command_stack
if self.transaction:
execute = self._execute_transaction
execute = self._execute_pipeline
stack = self.command_stack
return execute(stack)
except ConnectionError:
Expand Down

0 comments on commit 4d6320a

Please sign in to comment.