Skip to content

Commit

Permalink
Merge pull request #200 from davidszotten/web_redux
Browse files Browse the repository at this point in the history
Web redux
  • Loading branch information
davidszotten committed Feb 16, 2015
2 parents 3ba2868 + 2c6a401 commit cae5663
Show file tree
Hide file tree
Showing 19 changed files with 1,047 additions and 41 deletions.
2 changes: 2 additions & 0 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
-r requirements.txt
-r contrib_requirements.txt

websocket-client==0.23.0
requests==2.5.0
coverage==4.0a1
flake8==2.1.0
mccabe==0.3
Expand Down
4 changes: 3 additions & 1 deletion nameko/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,19 @@ def shutdown(signum, frame):
continue
raise
except KeyboardInterrupt:
print # looks nicer with the ^C e.g. bash prints in the terminal
try:
service_runner.stop()
except KeyboardInterrupt:
print # as above
service_runner.kill()
else:
# runner.wait completed
break


def main(args):
logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.INFO, format='%(message)s')

if '.' not in sys.path:
sys.path.insert(0, '.')
Expand Down
12 changes: 9 additions & 3 deletions nameko/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,10 @@ def stop(self):
self._kill_protected_threads()

self.started = False
self._died.send(None)

# if `kill` is called after `stop`, they race to send this
if not self._died.ready():
self._died.send(None)

def kill(self, exc_info=None):
""" Kill the container in a semi-graceful way.
Expand Down Expand Up @@ -282,7 +285,10 @@ def safely_kill_extensions(ext_set):
self._kill_protected_threads()

self.started = False
self._died.send(None, exc_info)

# if `kill` is called after `stop`, they race to send this
if not self._died.ready():
self._died.send(None, exc_info)

def wait(self):
""" Block until the container has been stopped.
Expand Down Expand Up @@ -451,7 +457,7 @@ def _handle_thread_exited(self, gt):
# we don't care much about threads killed by the container
# this can happen in stop() and kill() if extensions
# don't properly take care of their threads
_log.warning('%s thread killed by container', self)
_log.debug('%s thread killed by container', self)

except Exception:
_log.error('%s thread exited with error', self, exc_info=True)
Expand Down
14 changes: 11 additions & 3 deletions nameko/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,22 @@ def deserialize_to_instance(exc_type):
return exc_type


class BadRequest(Exception):
pass


@deserialize_to_instance
class MalformedRequest(Exception):
class MalformedRequest(BadRequest):
pass


@deserialize_to_instance
class MethodNotFound(Exception):
class MethodNotFound(BadRequest):
pass


@deserialize_to_instance
class IncorrectSignature(Exception):
class IncorrectSignature(BadRequest):
pass


Expand All @@ -156,3 +160,7 @@ def __str__(self):

class CommandError(Exception):
"""Raise from subcommands to report error back to the user"""


class ConnectionNotFound(BadRequest):
"""Unknown websocket connection id"""
13 changes: 12 additions & 1 deletion nameko/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

from functools import partial
import inspect
from logging import getLogger
import types
import weakref

from eventlet.event import Event

from logging import getLogger
from nameko.exceptions import IncorrectSignature

_log = getLogger(__name__)


Expand Down Expand Up @@ -265,6 +267,15 @@ def bind(self, container, method_name):
instance.method_name = method_name
return instance

def check_signature(self, args, kwargs):
service_cls = self.container.service_cls
fn = getattr(service_cls, self.method_name)
try:
service_instance = None # fn is unbound
inspect.getcallargs(fn, service_instance, *args, **kwargs)
except TypeError as exc:
raise IncorrectSignature(str(exc))

@classmethod
def decorator(cls, *args, **kwargs):

Expand Down
25 changes: 7 additions & 18 deletions nameko/rpc.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
from __future__ import absolute_import
from functools import partial
import inspect

import json
from logging import getLogger
import sys
import uuid
from functools import partial
from logging import getLogger

from eventlet.event import Event
from eventlet.queue import Empty
from kombu import Connection, Exchange, Queue
from kombu.pools import producers

from nameko.constants import DEFAULT_RETRY_POLICY, AMQP_URI_CONFIG_KEY
from nameko.constants import AMQP_URI_CONFIG_KEY, DEFAULT_RETRY_POLICY
from nameko.exceptions import (
MethodNotFound, UnknownService, UnserializableValueError,
MalformedRequest, RpcConnectionError, serialize, deserialize,
IncorrectSignature, ContainerBeingKilled)
ContainerBeingKilled, deserialize, MalformedRequest, MethodNotFound,
RpcConnectionError, serialize, UnknownService, UnserializableValueError)
from nameko.extensions import (
DependencyProvider, Entrypoint, ProviderCollector, SharedExtension)
from nameko.messaging import QueueConsumer, HeaderEncoder, HeaderDecoder
from nameko.messaging import HeaderDecoder, HeaderEncoder, QueueConsumer
from nameko.utils import repr_safe_str


_log = getLogger(__name__)


Expand Down Expand Up @@ -146,15 +144,6 @@ def setup(self):
def stop(self):
self.rpc_consumer.unregister_provider(self)

def check_signature(self, args, kwargs):
service_cls = self.container.service_cls
fn = getattr(service_cls, self.method_name)
try:
service_instance = None # fn is unbound
inspect.getcallargs(fn, service_instance, *args, **kwargs)
except TypeError as exc:
raise IncorrectSignature(str(exc))

def handle_message(self, body, message):
try:
args = body['args']
Expand Down
15 changes: 9 additions & 6 deletions nameko/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,31 +61,34 @@ class provided in the __init__ method.
All containers are started concurently and the method will block
until all have completed their startup routine.
"""
_log.info('starting services: %s', self.service_names)
service_names = ', '.join(self.service_names)
_log.info('starting services: %s', service_names)

SpawningProxy(self.containers).start()

_log.info('services started: %s', self.service_names)
_log.debug('services started: %s', service_names)

def stop(self):
""" Stop all running containers concurrently.
The method blocks until all containers have stopped.
"""
_log.info('stopping services: %s', self.service_names)
service_names = ', '.join(self.service_names)
_log.info('stopping services: %s', service_names)

SpawningProxy(self.containers).stop()

_log.info('services stopped: %s', self.service_names)
_log.debug('services stopped: %s', service_names)

def kill(self):
""" Kill all running containers concurrently.
The method will block until all containers have stopped.
"""
_log.info('killing services: %s', self.service_names)
service_names = ', '.join(self.service_names)
_log.info('killing services: %s', service_names)

SpawningProxy(self.containers).kill()

_log.info('services killed: %s ', self.service_names)
_log.debug('services killed: %s ', service_names)

def wait(self):
""" Wait for all running containers to stop.
Expand Down
78 changes: 78 additions & 0 deletions nameko/testing/websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from __future__ import absolute_import

import uuid
import json

from collections import defaultdict

from eventlet.event import Event
from eventlet.queue import Queue

from nameko.exceptions import deserialize


def make_virtual_socket(host, port, path='/ws'):
from websocket import WebSocketApp

result_handlers = {}

class Socket(object):

def __init__(self):
self._event_queues = defaultdict(Queue)

def get_event_queue(self, event_type):
return self._event_queues[event_type]

def wait_for_event(self, event_type):
return self.get_event_queue(event_type).get()

def rpc(self, _method, **data):
id = str(uuid.uuid4())
event = Event()
result_handlers[id] = event.send
ws_app.send(json.dumps({
'method': _method,
'data': data,
'correlation_id': id,
}))

rv = event.wait()
if rv['success']:
return rv['data']
raise deserialize(rv['error'])

sock = Socket()

def on_message(ws, message):
msg = json.loads(message)
if msg['type'] == 'event':
sock.get_event_queue(msg['event']).put((msg['event'], msg['data']))
elif msg['type'] == 'result':
result_id = msg['correlation_id']
handler = result_handlers.pop(result_id, None)
if handler is not None:
handler(msg)

ready_event = Event()

def on_open(ws):
ready_event.send(None)

def on_error(ws, err):
ready_event.send(err)

ws_app = WebSocketApp(
'ws://%s:%d%s' % (host, port, path),
on_message=on_message,
on_open=on_open,
on_error=on_error,
)

def connect_socket():
err = ready_event.wait()
if err is not None:
raise err
return sock

return ws_app, connect_socket
Empty file added nameko/web/__init__.py
Empty file.

0 comments on commit cae5663

Please sign in to comment.