Skip to content

Commit

Permalink
Move server's logic from executors
Browse files Browse the repository at this point in the history
Now we have situation when openstack projects like Mistral needs
extra oslo.messaging functionality.

 But it is too complicated now to to implement something new and
 integrate it with current code because there is a little bit mess.
 1) Executor should be responsible for how to run jobs
      (but now also has code with server logic)
 2) Dispatcher should be responsible for routing message to the
     target endpoint for processing (but it also has serialisation, sending replies,
     executing some executor's callbacks etc)
 3) Server should do all server specific logic, we need to have different
      implementation of servers for RPC and notification, not different implementations
      of dispatchers

 This patch fixes 1-st point

Change-Id: Ib6408f408889bb7b7056722be636a5547b1a780d
  • Loading branch information
dukhlov committed Feb 23, 2016
1 parent 92c4f76 commit 1482687
Show file tree
Hide file tree
Showing 18 changed files with 190 additions and 623 deletions.
4 changes: 2 additions & 2 deletions oslo_messaging/_cmd/zmq_broker.py
Expand Up @@ -20,11 +20,11 @@

from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver.broker import zmq_broker
from oslo_messaging._executors import impl_pooledexecutor
from oslo_messaging import server

CONF = cfg.CONF
CONF.register_opts(impl_zmq.zmq_opts)
CONF.register_opts(impl_pooledexecutor._pool_opts)
CONF.register_opts(server._pool_opts)
CONF.rpc_zmq_native = True


Expand Down
4 changes: 2 additions & 2 deletions oslo_messaging/_drivers/impl_zmq.py
Expand Up @@ -24,8 +24,8 @@
from oslo_messaging._drivers.zmq_driver.client import zmq_client
from oslo_messaging._drivers.zmq_driver.server import zmq_server
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._executors import impl_pooledexecutor
from oslo_messaging._i18n import _LE
from oslo_messaging import server


RPCException = rpc_common.RPCException
Expand Down Expand Up @@ -160,7 +160,7 @@ def __init__(self, conf, url, default_exchange=None,
raise ImportError(_LE("ZeroMQ is not available!"))

conf.register_opts(zmq_opts)
conf.register_opts(impl_pooledexecutor._pool_opts)
conf.register_opts(server._pool_opts)
conf.register_opts(base.base_opts)
self.conf = conf
self.allowed_remote_exmods = allowed_remote_exmods
Expand Down
44 changes: 38 additions & 6 deletions oslo_messaging/_drivers/protocols/amqp/driver.py
Expand Up @@ -20,14 +20,15 @@
'tasks' that are performed on its behalf via the controller module.
"""

import collections
import logging
import os
import threading
import time

from oslo_serialization import jsonutils
from oslo_utils import importutils
from six import moves
from oslo_utils import timeutils

from oslo_messaging._drivers import base
from oslo_messaging._drivers import common
Expand Down Expand Up @@ -114,18 +115,49 @@ def requeue(self):
pass


class Queue(object):
def __init__(self):
self._queue = collections.deque()
self._lock = threading.Lock()
self._pop_wake_condition = threading.Condition(self._lock)
self._started = True

def put(self, item):
with self._lock:
self._queue.appendleft(item)
self._pop_wake_condition.notify()

def pop(self, timeout):
with timeutils.StopWatch(timeout) as stop_watcher:
with self._lock:
while len(self._queue) == 0:
if stop_watcher.expired() or not self._started:
return None
self._pop_wake_condition.wait(
stop_watcher.leftover(return_none=True)
)
return self._queue.pop()

def stop(self):
with self._lock:
self._started = False
self._pop_wake_condition.notify_all()


class ProtonListener(base.Listener):
def __init__(self, driver):
super(ProtonListener, self).__init__(driver.prefetch_size)
self.driver = driver
self.incoming = moves.queue.Queue()
self.incoming = Queue()

def stop(self):
self.incoming.stop()

@base.batch_poll_helper
def poll(self, timeout=None):
try:
message = self.incoming.get(True, timeout)
except moves.queue.Empty:
return
message = self.incoming.pop(timeout)
if message is None:
return None
request, ctxt = unmarshal_request(message)
LOG.debug("Returning incoming message")
return ProtonIncomingMessage(self, ctxt, request, message)
Expand Down
Empty file.
44 changes: 0 additions & 44 deletions oslo_messaging/_executors/base.py

This file was deleted.

102 changes: 0 additions & 102 deletions oslo_messaging/_executors/impl_blocking.py

This file was deleted.

43 changes: 0 additions & 43 deletions oslo_messaging/_executors/impl_eventlet.py

This file was deleted.

0 comments on commit 1482687

Please sign in to comment.