Permalink
Browse files

fixed the redis protocol for big data

  • Loading branch information...
1 parent bf4e13c commit ea52e2d2826ccdec81717a990a03c3ee85ad1db2 @tarekziade tarekziade committed Dec 10, 2012
Showing with 38 additions and 14 deletions.
  1. +38 −14 vaurien/protocols/redis.py
@@ -17,38 +17,62 @@ class Redis(BaseProtocol):
"""
name = 'redis'
+ def _find(self, source, buffer, char, dest):
+ pos = buffer.find(char)
+ while pos == -1:
+ data = self._get_data(source)
+ if data == '':
+ return -1
+ dest.sendall(data)
+ buffer += data
+ pos = buffer.find(char)
+ return pos, buffer
+
def _handle(self, source, dest, to_backend):
""" see http://redis.io/topics/protocol
"""
- # Getting the request.
- buffer = self._get_data(source)
- if not buffer:
- self._abort_handling(to_backend, dest)
- return False
+ # grabbing data
+ bytepos, buffer = self._find(source, '', CRLF, dest)
+ num_args = int(buffer[1:bytepos])
+
+ for arg in range(num_args):
+ # next CRLF
+ buffer = buffer[bytepos + len(CRLF):]
+ bytepos, buffer = self._find(source, buffer, CRLF, dest)
+
+ # reading the number of bytes
+ num_bytes = int(buffer[1:bytepos])
+ data_start = bytepos + len(CRLF)
+ data_end = data_start + num_bytes
- # Sending the request to the backend.
- dest.sendall(buffer)
+ # reading the data (next CRLF)
+ buffer = buffer[data_start:]
+ __, buffer = self._find(source, buffer, CRLF, dest)
+ data = buffer[:num_bytes]
+ bytepos = num_bytes
# Getting the answer back and sending it over.
- buffer_size = self.option('buffer')
- buffer = dest.recv(buffer_size)
+ buffer = self._get_data(dest)
source.sendall(buffer)
- already_read = len(buffer)
if buffer[0] in ('+', '-', ':'):
# simple reply, we're good
return False # disconnect mode ?
+ buffer_size = self.option('buffer')
+ bytepos, buffer = self._find(dest, buffer, CRLF, source)
+
if buffer[0] == '$':
# bulk reply
- size = int(buffer[1:buffer.find(CRLF)])
- left_to_read = size - already_read
+ size = int(buffer[1:bytepos])
+ left_to_read = (size - len(buffer) + len(buffer[:bytepos]) +
+ len(CRLF) * 2)
if left_to_read > 0:
for chunk in chunked(left_to_read, buffer_size):
- data = source.recv(chunk)
+ data = self._get_data(dest, chunk)
buffer += data
- dest.sendall(data)
+ source.sendall(data)
return False # disconnect mode ?

0 comments on commit ea52e2d

Please sign in to comment.