Skip to content
This repository has been archived by the owner on Nov 23, 2020. It is now read-only.

Commit

Permalink
need to replace current header dictionary
Browse files Browse the repository at this point in the history
  • Loading branch information
lsbardel committed Jan 5, 2017
1 parent f607398 commit 1d5ea68
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 64 deletions.
3 changes: 1 addition & 2 deletions pulsar/apps/wsgi/__init__.py
Expand Up @@ -66,7 +66,7 @@ def hello(environ, start_response):
from .middleware import (clean_path_middleware, authorization_middleware,
wait_for_body_middleware, middleware_in_executor)
from .response import AccessControl, GZipMiddleware
from .wrappers import EnvironMixin, WsgiResponse, WsgiRequest, cached_property
from .wrappers import WsgiResponse, WsgiRequest, cached_property
from .server import HttpServerResponse, test_wsgi_environ, AbortWsgi
from .route import route, Route
from .handlers import WsgiHandler, LazyWsgi
Expand Down Expand Up @@ -106,7 +106,6 @@ def hello(environ, start_response):
'GZipMiddleware',
#
# WSGI Wrappers
'EnvironMixin',
'WsgiResponse',
'WsgiRequest',
'cached_property',
Expand Down
3 changes: 1 addition & 2 deletions pulsar/apps/wsgi/formdata.py
Expand Up @@ -157,8 +157,7 @@ def parse(self):
self.buffer = bytearray()

if isinstance(inp, HttpBodyReader):
return ensure_future(self._consume(inp, boundary),
loop=inp.reader._loop)
return inp.reader._loop.create_task(self._consume(inp, boundary))
else:
producer = BytesProducer(inp)
return producer(self._consume, boundary)
Expand Down
5 changes: 2 additions & 3 deletions pulsar/apps/wsgi/server.py
Expand Up @@ -16,7 +16,7 @@
import time
import os
import io
from asyncio import wait_for, ensure_future, sleep
from asyncio import wait_for, sleep
from wsgiref.handlers import format_date_time
from urllib.parse import urlparse, unquote

Expand Down Expand Up @@ -248,8 +248,7 @@ def data_received(self, data):
self.transport,
self.cfg.stream_buffer,
loop=self._loop)
ensure_future(self._response(self.wsgi_environ()),
loop=self._loop)
self._loop.create_task(self._response(self.wsgi_environ()))
body = parser.recv_body()
if body:
self._body_reader.feed_data(body)
Expand Down
72 changes: 20 additions & 52 deletions pulsar/apps/wsgi/wrappers.py
Expand Up @@ -13,14 +13,6 @@
several utility methods for manipulating headers and asynchronous content.
Environ Mixin
=====================
.. autoclass:: EnvironMixin
:members:
:member-order: bysource
.. _app-wsgi-request:
Wsgi Request
Expand Down Expand Up @@ -240,7 +232,7 @@ def is_streamed(self):
This is usually `True` if a generator is passed to the response object.
"""
try:
len(self.content)
len(self._content)
except TypeError:
return True
return False
Expand Down Expand Up @@ -332,27 +324,26 @@ def __getitem__(self, header):
return self.headers[header]


class EnvironMixin:
"""A wrapper around a WSGI_ environ.
Instances of this class have the :attr:`environ` attribute as their
only private data. Every other attribute is stored in the :attr:`environ`
itself at the ``pulsar.cache`` wsgi-extension key.
.. attribute:: environ
WSGI_ environ dictionary
class WsgiRequest:
"""A wsgi request
"""
__slots__ = ('environ',)

def __init__(self, environ, name=None):
def __init__(self, environ, app_handler=None, urlargs=None):
self.environ = environ
if pulsar_cache not in environ:
environ[pulsar_cache] = AttributeDictionary()
self.cache.mixins = {}
self.cache.logger = LOGGER
if name:
self.cache.mixins[name] = self
self.cache.cfg = environ.get('pulsar.cfg', {})
if app_handler:
self.cache.app_handler = app_handler
self.cache.urlargs = urlargs

def __repr__(self):
return self.path

def __str__(self):
return self.__repr__()

@property
def cache(self):
Expand All @@ -376,33 +367,6 @@ def _loop(self):
if c:
return c._loop

def __getattr__(self, name):
mixin = self.cache.mixins.get(name)
if mixin is None:
raise AttributeError("'%s' object has no attribute '%s'" %
(self.__class__.__name__, name))
return mixin

def get(self, key, default=None):
"""Shortcut to the :attr:`environ` get method."""
return self.environ.get(key, default)


class WsgiRequest(EnvironMixin):
"""An :class:`EnvironMixin` for wsgi requests."""
def __init__(self, environ, app_handler=None, urlargs=None):
super().__init__(environ)
self.cache.cfg = environ.get('pulsar.cfg', {})
if app_handler:
self.cache.app_handler = app_handler
self.cache.urlargs = urlargs

def __repr__(self):
return self.path

def __str__(self):
return self.__repr__()

@cached_property
def first_line(self):
env = self.environ
Expand Down Expand Up @@ -514,7 +478,7 @@ def is_secure(self):

@property
def path(self):
"""Shortcut to the :attr:`~EnvironMixin.environ` ``PATH_INFO`` value.
"""Shortcut to the :attr:`~.environ` ``PATH_INFO`` value.
"""
return self.environ.get('PATH_INFO', '/')

Expand All @@ -539,6 +503,10 @@ def content_type_options(self):
else:
return None, {}

def get(self, key, default=None):
"""Shortcut to the :attr:`environ` get method."""
return self.environ.get(key, default)

def data_and_files(self, data=True, files=True, stream=None):
"""Retrieve body data.
Expand Down Expand Up @@ -588,7 +556,7 @@ def _data_and_files(self, data=True, files=True, stream=None, future=None):
@cached_property
def url_data(self):
"""A (cached) dictionary containing data from the ``QUERY_STRING``
in :attr:`~.EnvironMixin.environ`.
in :attr:`~.environ`.
"""
return query_dict(self.environ.get('QUERY_STRING', ''),
encoding=self.encoding)
Expand Down
1 change: 1 addition & 0 deletions pulsar/async/access.py
Expand Up @@ -88,6 +88,7 @@ def loop_factory():
try: # add uvloop if available
import uvloop
EVENT_LOOPS['uv'] = uvloop.Loop
# Future = uvloop.Future
except Exception: # pragma nocover
pass

Expand Down
4 changes: 2 additions & 2 deletions pulsar/async/mailbox.py
Expand Up @@ -59,7 +59,7 @@ async def example():
from pulsar.utils.websocket import frame_parser
from pulsar.utils.string import gen_unique_id

from .access import get_actor, isawaitable, create_future, ensure_future
from .access import get_actor, isawaitable, create_future
from .futures import task
from .proxy import actor_identity, get_proxy, get_command, ActorProxy
from .protocols import Protocol
Expand Down Expand Up @@ -171,7 +171,7 @@ def data_received(self, data):
message = pickle.loads(msg.body)
except Exception as e:
raise ProtocolError('Could not decode message body: %s' % e)
ensure_future(self._on_message(message), loop=self._loop)
self._loop.create_task(self._on_message(message))
msg = self._parser.decode()

########################################################################
Expand Down
6 changes: 3 additions & 3 deletions pulsar/async/protocols.py
Expand Up @@ -3,7 +3,7 @@

from pulsar.utils.internet import nice_address, format_address

from .futures import task, Future, ensure_future
from .futures import task, Future
from .events import EventHandler, AbortEvent
from .mixins import FlowControl, Timeout

Expand Down Expand Up @@ -165,7 +165,7 @@ def start(self, request=None):
conn._producer._requests_processed = p + 1
self.bind_event('post_request', self._finished)
self._request = request
return ensure_future(self._start(), loop=self._loop)
return self._loop.create_task(self._start())

def abort_request(self):
"""Abort the request.
Expand Down Expand Up @@ -350,7 +350,7 @@ def close(self):
self._transport.close()
except Exception:
pass
self._closed = ensure_future(self._close())
self._closed = self._loop.create_task(self._close())
return self.event('connection_lost')

def abort(self):
Expand Down

0 comments on commit 1d5ea68

Please sign in to comment.