Skip to content
This repository has been archived by the owner on Nov 23, 2020. It is now read-only.

Commit

Permalink
trollius debug mode and threadsafe fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
lsbardel committed Jul 25, 2014
1 parent dec3fcc commit 341d490
Show file tree
Hide file tree
Showing 55 changed files with 778 additions and 634 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.rst
@@ -1,5 +1,6 @@
Ver. 0.8.5 - Development
Ver. 0.9.0 - Development
===========================
* Works with ``trollius`` 1.0
* Added :class:`.FlowControl` and :class:`.Timeout` protocol mixins,
:class:`.PulsarProtocol` now inherits from :class:`.FlowControl`
* Better handling of streaming responses by the wsgi server
Expand Down Expand Up @@ -30,7 +31,6 @@ Ver. 0.8.5 - Development
* Better :func:`.slugify` function
* :class:`.EventHandler` requires a valid :ref:`event loop <asyncio-event-loop>`
during initialisation
* Works with ``trollius`` 1.0

Ver. 0.8.4 - 2014-Jul-07
===========================
Expand Down
3 changes: 1 addition & 2 deletions MANIFEST.in
Expand Up @@ -3,8 +3,7 @@ include LICENSE
include README.rst
include docs/make.bat
include docs/Makefile
recursive-include pulsar *.json
recursive-include examples *.py *.js *.css *.html *.json
recursive-include examples *.py *.js *.css *.html
recursive-include pulsar/apps/test/plugins/htmlfiles *
recursive-include docs/source *
recursive-include extensions *.py *.pxd *.pyx *.h
Binary file modified examples/httpbin/assets/favicon.ico
Binary file not shown.
9 changes: 5 additions & 4 deletions examples/httpbin/manage.py
Expand Up @@ -85,6 +85,7 @@ def get(self, request):
html = request.html_document
html.head.title = title
html.head.links.append('httpbin.css')
html.head.links.append('favicon.ico', rel="icon", type='image/x-icon')
html.head.scripts.append('jquery')
html.head.scripts.append('httpbin.js')
ul = ul.render(request)
Expand Down Expand Up @@ -132,7 +133,7 @@ def getsize(self, request):

@route(title='Returns gzip encoded data')
def gzip(self, request):
response = yield self.info_data_response(request, gzipped=True)
response = self.info_data_response(request, gzipped=True)
coroutine_return(GZipMiddleware(10)(request.environ, response))

@route(title='Returns cookie data')
Expand Down Expand Up @@ -275,7 +276,7 @@ def info_data(self, request, **params):
if request.method in ENCODE_URL_METHODS:
data['args'] = dict(request.url_data)
else:
args, files = yield request.data_and_files()
args, files = request.data_and_files()
jfiles = MultiValueDict()
for name, parts in files.lists():
for part in parts:
Expand Down Expand Up @@ -310,9 +311,9 @@ class Site(wsgi.LazyWsgi):

def setup(self, environ):
router = HttpBin('/')
return wsgi.WsgiHandler([wsgi.clean_path_middleware,
return wsgi.WsgiHandler([wsgi.wait_for_body_middleware,
wsgi.clean_path_middleware,
wsgi.authorization_middleware,
wsgi.FileRouter('/favicon.ico', FAVICON),
wsgi.MediaRouter('media', ASSET_DIR,
show_indexes=True),
ws.WebSocket('/graph-data', Graph()),
Expand Down
1 change: 1 addition & 0 deletions examples/websocket/manage.py
Expand Up @@ -26,6 +26,7 @@ def on_open(self, websocket):
self.on_message(websocket, '')

def on_message(self, websocket, msg):
websocket.logger.debug('writing random data')
websocket.write(json.dumps([(i, random()) for i in range(100)]))


Expand Down
25 changes: 25 additions & 0 deletions examples/websocket/tests.py
Expand Up @@ -50,6 +50,15 @@ def tearDownClass(cls):
if cls.app_cfg is not None:
yield send('arbiter', 'kill_actor', cls.app_cfg.name)

def test_graph(self):
c = HttpClient()
handler = Echo(loop=self._loop)
ws = yield c.get(self.ws_uri, websocket_handler=handler)
self.assertEqual(ws.event('post_request').fired(), 0)
message = yield handler.get()
self.assertTrue(message)

class d:
def testHyBiKey(self):
w = WebSocket('/', None)
v = w.challenge_response('dGhlIHNhbXBsZSBub25jZQ==')
Expand Down Expand Up @@ -136,6 +145,22 @@ def test_close_sync(self):
self.assertEqual(ws.close_reason[0], 1001)
self.assertTrue(ws._connection.closed)

def test_home(self):
c = HttpClient()
response = yield c.get(self.uri)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.headers['content-type'],
'text/html; charset=utf-8')

def test_graph(self):
c = HttpClient()
handler = Echo(loop=self._loop)
fut = handler.get()
ws = yield c.get(self.ws_uri, websocket_handler=handler)
self.assertEqual(ws.event('post_request').fired(), 0)
message = yield fut
self.assertTrue(message)


@dont_run_with_thread
class TestWebSocketProcess(TestWebSocketThread):
Expand Down
18 changes: 9 additions & 9 deletions pulsar/apps/__init__.py
Expand Up @@ -65,7 +65,7 @@ def myhook(monitor):
from collections import namedtuple

import pulsar
from pulsar import (get_actor, coroutine_return, Config,
from pulsar import (get_actor, coroutine_return, Config, From,
multi_async, Future, ImproperlyConfigured)
from pulsar.utils.structures import OrderedDict

Expand Down Expand Up @@ -93,14 +93,14 @@ def get_application(name):


def _get_remote_app(actor, name):
cfg = yield actor.send('arbiter', 'run', _get_app, name)
cfg = yield From(actor.send('arbiter', 'run', _get_app, name))
coroutine_return(cfg.app() if cfg else None)


def _get_app(arbiter, name, safe=True):
monitor = arbiter.get_actor(name)
if monitor:
cfg = yield monitor.start_event
cfg = yield From(monitor.start_event)
if safe:
coroutine_return(cfg)
else:
Expand All @@ -118,23 +118,23 @@ def monitor_start(self, exc=None):
self.bind_event('on_info', monitor_info)
self.bind_event('stopping', monitor_stopping)
for callback in when_monitor_start:
yield callback(self)
yield From(callback(self))
self.monitor_task = lambda: app.monitor_task(self)
yield app.monitor_start(self)
yield From(app.monitor_start(self))
if not self.cfg.workers:
yield app.worker_start(self)
yield From(app.worker_start(self))
result = self.cfg
except Exception as exc:
yield self.stop(exc)
yield From(self.stop(exc))
start_event.set_result(None)
else:
start_event.set_result(result)


def monitor_stopping(self, exc=None):
if not self.cfg.workers:
yield self.app.worker_stopping(self)
yield self.app.monitor_stopping(self)
yield From(self.app.worker_stopping(self))
yield From(self.app.monitor_stopping(self))
coroutine_return(self)


Expand Down
4 changes: 2 additions & 2 deletions pulsar/apps/data/odm/manager.py
@@ -1,4 +1,4 @@
from pulsar import Event, wait_complete, chain_future, add_callback, task
from pulsar import Event, chain_future, add_callback, task

from .query import AbstractQuery, Query, QueryError, ModelNotFound
from ..store import Command
Expand Down Expand Up @@ -131,7 +131,7 @@ def query(self):
'''
return self.query_class(self)

@wait_complete
@task
def get(self, *args, **kw):
'''Get a single model
'''
Expand Down
8 changes: 4 additions & 4 deletions pulsar/apps/data/odm/mapper.py
@@ -1,6 +1,6 @@
from inspect import ismodule

from pulsar import EventHandler, multi_async, wait_complete
from pulsar import EventHandler, multi_async, task
from pulsar.utils.pep import native_str
from pulsar.utils.importer import import_module

Expand Down Expand Up @@ -254,11 +254,11 @@ def register_applications(self, applications, models=None, stores=None):
return list(self._register_applications(applications, models,
stores))

@wait_complete
@task
def search(self, *kw):
raise NotImplementedError

@wait_complete
@task
def create_tables(self, remove_existing=False):
'''Loop though :attr:`registered_models` and issue the
:meth:`.Manager.create_table` method.'''
Expand All @@ -267,7 +267,7 @@ def create_tables(self, remove_existing=False):
executed.append(manager.create_table(remove_existing))
return multi_async(executed, loop=self._loop)

@wait_complete
@task
def drop_tables(self):
'''Loop though :attr:`registered_models` and issue the
:meth:`.Manager.drop_table` method.'''
Expand Down
8 changes: 4 additions & 4 deletions pulsar/apps/data/odm/query.py
@@ -1,6 +1,6 @@
from collections import namedtuple

from pulsar import wait_complete
from pulsar import task
from pulsar.utils.pep import to_string, iteritems


Expand Down Expand Up @@ -214,7 +214,7 @@ def load_related(self, related, *fields):
follows the foreign-key relationship ``related``'''
return self

@wait_complete
@task
def count(self):
'''Count the number of objects selected by this :class:`Query`.
Expand All @@ -223,13 +223,13 @@ def count(self):
matched elements.'''
return self.compiled().count()

@wait_complete
@task
def all(self):
'''All objects selected by this :class:`Query`.
'''
return self.compiled().all()

@wait_complete
@task
def delete(self):
'''Delete all objects selected by this :class:`.Query`.
'''
Expand Down
26 changes: 15 additions & 11 deletions pulsar/apps/data/odm/transaction.py
@@ -1,4 +1,5 @@
from pulsar import EventHandler, InvalidOperation, chain_future, multi_async
from pulsar import (EventHandler, InvalidOperation, chain_future, multi_async,
in_loop)
from pulsar.utils.pep import iteritems

from .model import Model
Expand Down Expand Up @@ -137,10 +138,7 @@ def commit(self):
of transaction
'''
if self._executed is None:
executed = dict(((store, store.execute_transaction(commands)) for
store, commands in iteritems(self._commands)))
self._executed = multi_async(executed, loop=self._loop)
return self._executed
self._executed = self._commit()
else:
raise InvalidOperation('Transaction already executed.')

Expand All @@ -152,9 +150,15 @@ def wait(self, callback=None):
transaction.
:return: a :class:`~asyncio.Future`
'''
if self._executed is None:
self.commit()
if callback:
return chain_future(self._executed, callback)
else:
return self._executed
executed = self._executed
if executed is None:
executed = self.commit()
return chain_future(executed, callback) if callback else executed

# INTERNAL COMMIT METHOD
@in_loop
def _commit(self):
# Run this method in the event loop so that it is thread safe
executed = dict(((store, store.execute_transaction(commands)) for
store, commands in iteritems(self._commands)))
return multi_async(executed, loop=self._loop)
8 changes: 4 additions & 4 deletions pulsar/apps/data/stores/couchdb/store.py
Expand Up @@ -33,7 +33,7 @@
'''
from base64 import b64encode, b64decode

from pulsar import asyncio, coroutine_return, wait_complete, multi_async
from pulsar import asyncio, coroutine_return, task, multi_async
from pulsar.utils.system import json
from pulsar.apps.data import Store, Command, register_store
from pulsar.utils.pep import zip
Expand Down Expand Up @@ -90,7 +90,7 @@ def design_create(self, name, views, language=None, **kwargs):
views=views,
language=language or 'javascript', **kwargs)

@wait_complete
@task
def design_delete(self, name):
'''Delete an existing design document at ``name``.
'''
Expand Down Expand Up @@ -171,7 +171,7 @@ def execute_transaction(self, transaction):
raise errors[0]
coroutine_return(models)

@wait_complete
@task
def get_model(self, manager, pkvalue):
try:
data = yield self.request('get', self._database, pkvalue)
Expand Down Expand Up @@ -218,7 +218,7 @@ def build_model(self, manager, *args, **kwargs):
return super(CouchDBStore, self).build_model(manager, *args, **kwargs)

# INTERNALS
@wait_complete
@task
def request(self, method, *bits, **kwargs):
'''Execute the HTTP request'''
if self._password:
Expand Down
7 changes: 5 additions & 2 deletions pulsar/apps/data/stores/pulsards/startds.py
@@ -1,16 +1,17 @@
import pulsar
from pulsar import asyncio
from pulsar.async import asyncio

if pulsar.appengine:
def start_store(url, workers=0, **kw):
raise RuntimeError('Cannot start datastore in google appengine')

else:
from pulsar import (when_monitor_start, coroutine_return, get_application,
send)
task, send)
from pulsar.apps.data import create_store
from pulsar.apps.ds import PulsarDS

@task
def start_pulsar_ds(arbiter, host, workers=0):
lock = getattr(arbiter, 'lock', None)
if lock is None:
Expand All @@ -27,6 +28,7 @@ def start_pulsar_ds(arbiter, host, workers=0):
finally:
lock.release()

@task
def start_store(url, workers=0, **kw):
'''Equivalent to :func:`.create_store` for most cases excepts when the
``url`` is for a pulsar store not yet started.
Expand Down Expand Up @@ -55,6 +57,7 @@ def localhost(host):
else:
return host

@task
def _start_store(monitor):
app = monitor.app
if not isinstance(app, PulsarDS):
Expand Down
4 changes: 2 additions & 2 deletions pulsar/apps/data/stores/redis/pubsub.py
@@ -1,6 +1,6 @@
from functools import partial

from pulsar import task, in_loop, Protocol
from pulsar import task, in_loop, Protocol, From
from pulsar.apps import data


Expand Down Expand Up @@ -86,5 +86,5 @@ def _subscribe(self, *args):
if not self._connection:
protocol_factory = partial(PubsubProtocol, self,
producer=self.store)
self._connection = yield self.store.connect(protocol_factory)
self._connection = yield From(self.store.connect(protocol_factory))
self._connection.execute(*args)

0 comments on commit 341d490

Please sign in to comment.