Skip to content
This repository has been archived by the owner on Jan 21, 2021. It is now read-only.

Commit

Permalink
Fixed: Applications or middleware that didn't call the WSGI
Browse files Browse the repository at this point in the history
start_response function before returning an iterator weren't handled
properly.
  • Loading branch information
Jim Fulton committed Oct 27, 2014
1 parent 21cf1d8 commit 3a2b3b0
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 8 deletions.
4 changes: 4 additions & 0 deletions src/zc/resumelb/README.txt
Expand Up @@ -242,6 +242,10 @@ maintaining ZooKeeper trees.
Change History
==============

- Fixed: Applications or middleware that didn't call the WSGI
start_response function before returning an iterator weren't handled
properly.

0.7.3 (2014-06-04)
------------------

Expand Down
38 changes: 37 additions & 1 deletion src/zc/resumelb/tests.py
Expand Up @@ -25,6 +25,7 @@
import os
import pprint
import re
import sys
import time
import traceback
import unittest
Expand Down Expand Up @@ -89,6 +90,16 @@ def gsleep(dur=0):
def app():
return bobo.Application(bobo_resources=__name__)

def generator_app(env, start):
start('200 OK', [('Content-type', 'text/plain'), ('Content-length', '2')])
yield 'h'
yield 'i'

def generator_app0(env, start):
start('200 OK', [('Content-type', 'text/plain'), ('Content-length', '0')])
if False:
yield 'hi'

#
###############################################################################

Expand Down Expand Up @@ -122,7 +133,7 @@ def print_response(worker_socket, rno, size_only=False):
if size_only:
size += len(data)
else:
print data,
sys.stdout.write(data)
else:
break
if size_only:
Expand Down Expand Up @@ -401,6 +412,31 @@ def dont_reset_worker_backlogs_if_they_already_have_them():
9
"""

def generator_apps():
r"""PEP 333 allows WSGI apps to be implemented as generators.
This means that start_response may not becalled until after an app
has returned it's iterator (but before the first value is yielded.
Let's make sure workers handle this correctly.
>>> worker = zc.resumelb.worker.Worker(
... zc.resumelb.tests.generator_app, ('127.0.0.1', 0))
>>> worker_socket = gevent.socket.create_connection(worker.addr)
>>> from zc.resumelb.util import read_message, write_message
>>> read_message(worker_socket)
(0, {})
>>> env = newenv('', '/')
>>> write_message(worker_socket, 1, env, '')
>>> print_response(worker_socket, 1)
1 200 OK
Content-length: 2
Content-type: text/plain
<BLANKLINE>
hi
"""

def test_classifier(env):
return "yup, it's a test"

Expand Down
29 changes: 22 additions & 7 deletions src/zc/resumelb/worker.py
Expand Up @@ -68,14 +68,25 @@ def start_response(status, headers, exc_info=None):
response[0] = (status, headers)

try:
body = app(env, start_response)
body = iter(app(env, start_response))
try:
first = body.next()
except StopIteration:
first = ''
if hasattr(body, 'close'):
body.close()
body = ()
except Exception:
error("Uncaught application exception for %s" % trno)
import webob
body = webob.Response(
'A system error occurred', status=500)(env, start_response)
body = iter(
webob.Response(
'A system error occurred', status=500
)(env, start_response)
)
first = body.next()

return response[0], body
return response[0], first, body

if tracelog:
info = logging.getLogger(tracelog).info
Expand All @@ -101,7 +112,7 @@ def log(self, msg=None, code='-'):
def call_app_w_tracelog(trno, env):
log(trno, 'C')
env[tracelog_key] = ApplicationTraceLog(trno)
response, body = call_app(trno, env)
response, first, body = call_app(trno, env)
content_length = [v for (h, v) in response[1]
if h.lower() == 'content-length']
content_length = content_length[-1] if content_length else '?'
Expand All @@ -114,7 +125,7 @@ def body_iter():
if hasattr(body, 'close'):
body.close()
log(trno, 'E')
return response, body_iter()
return response, first, body_iter()

if threads:
def call_app_w_threads(trno, env):
Expand Down Expand Up @@ -232,12 +243,16 @@ def handle(self, conn, rno, get, env):
if tracelog:
tracelog(trno, 'I', env.get('CONTENT_LENGTH') or '0')

response, body = self.call_app(trno, env)
response, first, body = self.call_app(trno, env)
try:
requests = conn.readers
if rno not in requests:
return # cancelled
conn.put((rno, response))
if rno not in requests:
return # cancelled
if first:
conn.put((rno, first))
for data in body:
if rno not in requests:
return # cancelled
Expand Down

0 comments on commit 3a2b3b0

Please sign in to comment.