Skip to content

Commit

Permalink
Check for errors in streamed chunks.
Browse files Browse the repository at this point in the history
  • Loading branch information
jerith committed May 9, 2014
1 parent 284ddff commit 7a0e9b6
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
6 changes: 4 additions & 2 deletions vumi/persist/riak_manager.py
Expand Up @@ -173,7 +173,7 @@ def decode_chunked_response(self, headers, body):
from email import message_from_string
msg = message_from_string(fake_email)
if msg.is_multipart():
return self.decode_chunks(msg.get_payload())
return self.decode_chunks(msg.get_payload(), headers, body)

payload = msg.get_payload()
if not payload.strip():
Expand All @@ -185,10 +185,12 @@ def decode_chunked_response(self, headers, body):

return result

def decode_chunks(self, chunks):
def decode_chunks(self, chunks, headers, body):
phase_results = {}
for chunk in chunks:
part = json.loads(chunk.get_payload())
if 'error' in part:
self.raise_mapred_error(headers, body)
phase_results.setdefault(part['phase'], []).extend(part['data'])
# NOTE: We discard all but the last phase received.
return phase_results[max(phase_results.keys())]
6 changes: 4 additions & 2 deletions vumi/persist/txriak_manager.py
Expand Up @@ -206,7 +206,7 @@ def decode_chunked_response(self, headers, body):
from email import message_from_string
msg = message_from_string(fake_email)
if msg.is_multipart():
return self.decode_chunks(msg.get_payload())
return self.decode_chunks(msg.get_payload(), headers, body)

payload = msg.get_payload()
if not payload.strip():
Expand All @@ -218,10 +218,12 @@ def decode_chunked_response(self, headers, body):

return result

def decode_chunks(self, chunks):
def decode_chunks(self, chunks, headers, body):
phase_results = {}
for chunk in chunks:
part = self.decodeJson(chunk.get_payload())
if 'error' in part:
self.raise_mapred_error(headers, body)
phase_results.setdefault(part['phase'], []).extend(part['data'])
# NOTE: We discard all but the last phase received.
return phase_results[max(phase_results.keys())]

0 comments on commit 7a0e9b6

Please sign in to comment.