Thespian + RabbitMQ (Pika) #47

Closed
htarnacki opened this issue Sep 22, 2019 · 14 comments

Comments

@htarnacki

In my actor-based application I want to use Thespian actors together with Pika to talk to a RabbitMQ instance. What is the best pattern for this integration? In other words: how can one Thespian actor read the RabbitMQ messages that Pika delivers, in a non-blocking and ideally asynchronous way?

@kquick
Owner

kquick commented Sep 23, 2019

Both Thespian and Pika provide scheduling capabilities, so it can be difficult to combine different frameworks that are attempting to implement the same feature.

I think the best approach here (I haven't done this myself, so this is based on a high-level assessment of Pika) would be to combine the Thespian Watch functionality (https://thespianpy.com/doc/using.html#hH-94edef18-154e-4847-864f-027dff4f6e0a) with the Pika SelectConnection and a custom Pika poller based on Pika's _PollerBase (https://github.com/pika/pika/blob/master/pika/adapters/select_connection.py#L579).

At the core, Thespian implements a select()-style scheduling framework (similar to Pika's SelectPoller at https://github.com/pika/pika/blob/master/pika/adapters/select_connection.py#L930), and it can be provided with an additional set of file descriptors that it will select on; when activity is available on one of those file descriptors, it will deliver a WatchMessage with those file descriptors.

Roughly, this would mean creating a custom poller that records every file descriptor Pika asks it to watch. Register that poller with Pika, store it in the Actor's local storage, and then wait for incoming messages. When you get a WatchMessage for a file descriptor managed by the poller, call _PollerBase._dispatch_fd_events to handle those events (like https://github.com/pika/pika/blob/master/pika/adapters/select_connection.py#L975). When Pika calls back into poll, return to the receiveMessage processing that handled the WatchMessage (perhaps via a coroutine yield, depending on how you structure your core flow), which then returns a ThespianWatch with all of the fds currently registered with the poller. In this way, Thespian provides the select() call for both Thespian and Pika file descriptors, and the WatchMessage lets you forward the ready file descriptors to Pika for handling.
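The bookkeeping half of this idea can be sketched without Pika or Thespian. In the sketch below, `RecordingPoller`, `watched_fds` and `dispatch` are hypothetical names that belong to neither library, and a plain `select.select` call stands in for the point where the actor would return a ThespianWatch and later receive a WatchMessage. It only illustrates the division of labour: the poller records interest and dispatches, and something else does the waiting.

```python
import select
import socket


class RecordingPoller:
    """Hypothetical poller: remembers which fds to watch, never blocks itself."""

    def __init__(self):
        self._handlers = {}            # fd -> callback(fd)

    def add_handler(self, fd, callback):
        self._handlers[fd] = callback

    def remove_handler(self, fd):
        self._handlers.pop(fd, None)

    def watched_fds(self):
        """The list an actor would hand to ThespianWatch(...)."""
        return list(self._handlers)

    def dispatch(self, ready_fds):
        """What an actor would call with the ready fds from a WatchMessage."""
        for fd in ready_fds:
            handler = self._handlers.get(fd)
            if handler is not None:    # skip fds unregistered in the meantime
                handler(fd)


def demo():
    a, b = socket.socketpair()
    received = []
    poller = RecordingPoller()
    poller.add_handler(a.fileno(), lambda fd: received.append(a.recv(16)))
    b.sendall(b"ping")
    # Stand-in for Thespian's own select(): the external scheduler does the waiting.
    ready, _, _ = select.select(poller.watched_fds(), [], [], 1.0)
    poller.dispatch(ready)
    a.close()
    b.close()
    return received
```

A real integration would additionally have to translate Pika's read/write/error event masks and timers, which this sketch leaves out.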

@htarnacki
Author

Thanks a lot for the in-depth clarification. I will try this solution, although it seems more complicated than I thought when asking the question ;)
Do you plan any simpler, generic integration between Thespian and frameworks from the asynchronous, event-loop world?
For example, a new kind of actor driven by an event loop that Thespian itself instantiates and manages? That Thespian event loop could be accessible to the actor and could then be plugged into frameworks such as Pika (event-loop-based frameworks and tools usually allow an external event loop instance to be plugged in). That way we would have one event loop allowing cooperative progress for both worlds: Thespian and tools like Pika.

@kquick
Owner

kquick commented Sep 25, 2019

To some extent, Thespian and an event bus exist in the same solution space, so the overlap can be a bit hard to resolve. However, Thespian is written to allow multiple transport layers to be created, so ideally a message bus transport layer could use RabbitMQ as the communications layer between Thespian Actors.

At present, I don't know of a lot of interest in a solution like this other than yours, so I'm hesitant to commit much time and effort to implementing this approach.

@rbotafogo

If such integration existed, I would probably use it. This certainly doesn't qualify as 'a lot of interest', but two is certainly better than one!

Thespian looks very promising for what I need. Great work!

@kquick
Owner

kquick commented Nov 22, 2019

I'd like to use this issue to gather more input on the usage model (and interest level) for using Thespian and RabbitMQ together. My assumption would be that there is an existing RabbitMQ network with clients and that a particular client would also wish to use Thespian to implement its functionality like below:

                                             Thespian Actor
          RabbitMQ Bus                       /
   ------------------------------> Client ------------ Thespian Actor
                                                 \
                                                  Thespian Actor

In this configuration, the "Client" would be capable of talking both to the RabbitMQ bus and to Thespian Actors, sending and receiving messages on each side. The "Client" would be the primary interface between the two otherwise independent environments.

Please let me know if the above represents the usage model needed or if there is other functionality or considerations that would need to be addressed.

To anyone else desiring this functionality: please up-vote this issue so that I can determine how much interest there is in having this implemented.

@rbotafogo

rbotafogo commented Nov 22, 2019 via email

Repository owner deleted a comment from rbotafogo Dec 12, 2019
@htarnacki
Author

That model is exactly what we need

@htarnacki
Author

I think the best approach here (I haven't done this myself, so this is based on a high-level assessment of Pika) would be to combine the Thespian Watch functionality (https://thespianpy.com/doc/using.html#hH-94edef18-154e-4847-864f-027dff4f6e0a) with the Pika SelectConnection and a custom Pika poller based on Pika's _PollerBase

OK, so I have tried to implement this, unfortunately without success. There are some major problems with that approach.

Problem number 1:

It only watches for "readable" events, not writable or exception events.

Pika uses all three types of events (read, write and error) for negotiating a connection and for sending and receiving messages. Thespian's watch functionality only allows a partial implementation of this, so we do not have a solution that covers all requirements and fits well with the actor life cycle and design pattern. Write and error events must be handled in some other way than read events, possibly blocking the actor and breaking the normal, desired nature of actor processing.

Problem number 2:

There is no timeout (use .wakeupAfter() for this).

Pika's event loop uses timeouts extensively, and it seems to me that wakeup messages do not work well in that case.
Let's look at the following code:

            self.wakeupAfter(timeout, payload='SelectTimeout')
            return ThespianWatch(file_descriptors)

For example:

  • Pika asks us to watch for read events on file descriptor 1
  • the timeout is 2s
  • we schedule a wakeup message with a 2s timeout and return a ThespianWatch for file descriptor 1
  • after 2s the actor receives the wakeup message and reports back to Pika that file descriptor 1 is not ready for reading
  • now Pika asks us to watch for read events on a different file descriptor, number 2

Does this mean that we are leaking resources here? Will Thespian still watch for events on both file descriptors 1 and 2, even though Pika no longer requires file descriptor 1 to be observed?

Another example for the same code:
Let's say that we just want to quickly check whether a file descriptor is ready for reading, without blocking or waiting. We want an immediate response, so the timeout variable equals 0:

            self.wakeupAfter(0, payload='SelectTimeout')
            return ThespianWatch(file_descriptors)

It looks like the wakeup message will always be delivered first, before the watch message. OK, we can simply use a slightly higher timeout (which in turn causes an undesired slowdown):

            self.wakeupAfter(1, payload='SelectTimeout')
            return ThespianWatch(file_descriptors)

But this still looks like an overcomplicated solution for something that could be implemented directly in the Thespian API, like this:

return ThespianWatch(read_fdrs, write_fdrs, error_fdrs, timeout)
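For reference, the signature proposed above mirrors the standard library's `select.select(rlist, wlist, xlist, timeout)`. The sketch below uses only that standard call (no Thespian involved) to show what the three lists and a zero timeout mean in practice; `poll_once` is a hypothetical helper name.

```python
import select
import socket


def poll_once(sock, timeout):
    """Return (readable, writable, errored) flags for one socket."""
    fd = sock.fileno()
    r, w, x = select.select([fd], [fd], [fd], timeout)
    return (fd in r, fd in w, fd in x)


def demo():
    a, b = socket.socketpair()
    try:
        before = poll_once(a, 0)       # timeout 0: non-blocking check
        b.sendall(b"x")
        after = poll_once(a, 1.0)      # returns as soon as any list has a ready fd
        return before, after
    finally:
        a.close()
        b.close()
```

On an idle connected socket the write list is usually ready immediately, so a combined wait that includes write interest returns at once. That is the "persistent condition" risk that comes with watching for writability.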

Problem number 3:
It looks like the Thespian watch sometimes gives me different results than the plain Python select API.

OK, so here is the code.

Script to start a local RabbitMQ instance (in Docker):
start.sh

#!/usr/bin/env bash

readonly CID="$(docker container ls -a --filter='name=rabbitmq-thespian' --format='{{.ID}}')"
readonly ID=$(id -u)


if [[ "$CID" ]]; then
	docker start "$CID"
else
	docker run \
		-d \
		--user $ID \
		--restart=always \
		--hostname YOUR_HOSTNAME \
		--name rabbitmq-thespian \
		-p 8080:15672 \
		-p 5672:5672 \
		-e RABBITMQ_DEFAULT_USER=admin \
		-e RABBITMQ_DEFAULT_PASS=123 \
		rabbitmq:3-management
fi

Custom SelectPoller and IOLoop:

import collections
import select
from pika.adapters.select_connection import PollEvents, _is_resumable, \
    _SELECT_ERRORS
from pika.adapters.select_connection import SelectPoller as PikaSelectPooler
from pika.adapters.select_connection import IOLoop as PikaIOLoop
from logging import info


class SelectPoller(PikaSelectPooler):

    def poll(self):
        while True:
            try:
                info('try read: ' + str(self._fd_events[PollEvents.READ]))
                info('try write: ' + str(self._fd_events[PollEvents.WRITE]))
                info('try error: ' + str(self._fd_events[PollEvents.ERROR]))
                # handle somehow write and error events (this blocks an actor)
                _, write, error = select.select(
                    [],
                    self._fd_events[PollEvents.WRITE],
                    self._fd_events[PollEvents.ERROR], self._get_max_wait())
                info(f'ready to write: {write}')
                info(f'ready to error: {error}')
                # handle read events by thespian watch
                read = (
                    yield (
                        self._fd_events[PollEvents.READ],
                        self._get_max_wait()))
                break
            except _SELECT_ERRORS as error:
                if _is_resumable(error):
                    continue
                else:
                    raise
        info(f'ready to read: {read}')
        fd_event_map = collections.defaultdict(int)
        for fd_set, evt in zip(
                (read, write, error),
                (PollEvents.READ, PollEvents.WRITE, PollEvents.ERROR)):
            for fileno in fd_set:
                fd_event_map[fileno] |= evt
        self._dispatch_fd_events(fd_event_map)

    def start(self):
        info('SelectPooler.start')
        if self._running:
            raise RuntimeError('IOLoop is not reentrant and is already running')
        self._running = True
        self.activate_poller()
        try:
            while not self._stopping:
                yield from self.poll()
                self._process_timeouts()
        finally:
            try:
                self.deactivate_poller()
            finally:
                self._stopping = False
                self._running = False


class IOLoop(PikaIOLoop):

    @staticmethod
    def _get_poller(get_wait_seconds, process_timeouts):
        kwargs = dict(
            get_wait_seconds=get_wait_seconds,
            process_timeouts=process_timeouts)
        return SelectPoller(**kwargs)

    def start(self):
        info('IOLoop.start')
        return self._poller.start()

an actor communicating with a rabbitmq instance:

from typing import Any, Dict, List
from logging import debug, info
from aurel.lib.Actors.protocols import IActor
from thespian.actors import ThespianWatch
from aurel.lib.RabbitMQ import IOLoop
import pika

credentials = pika.PlainCredentials('admin', '123')
parameters = pika.ConnectionParameters('YOUR_HOSTNAME',
                                       5672,
                                       '/',
                                       credentials)

# Listens on RabbitMQ queues for incoming messages


def on_connection_open(connection):
    info('ON_CONNECTION_OPEN')
    connection.channel(on_open_callback=on_channel_open)


def on_channel_open(channel):
    info('ON_CHANNEL_OPEN')


class RabbitMQListener(IActor):

    def _process_file_descriptors(
            self, file_descriptors: List[int], max_wait: int = None):
        self._file_descriptors = file_descriptors
        max_wait = max_wait or 0
        info(f'_process_file_descriptors.file_descriptors: {file_descriptors}')
        info(f'_process_file_descriptors.max_wait: {max_wait}')
        if len(file_descriptors) < 1:
            self.wakeupAfter(max_wait, payload='EmptyFileDescriptors')
        else:
            self.wakeupAfter(max_wait + 1, payload='EmptyFileDescriptors')
            return ThespianWatch(file_descriptors)

    def receiveMsg_StartActor(self, message: Any, sender: IActor):
        info(f'receiveMsg_StartActor: {message}')
        connection = pika.SelectConnection(
            parameters=parameters,
            custom_ioloop=IOLoop(),
            on_open_callback=on_connection_open)
        self._select_gen = connection.ioloop.start()
        file_descriptors, max_wait = next(self._select_gen)
        return self._process_file_descriptors(file_descriptors, max_wait)

    def receiveMsg_WakeupMessage(self, message: Any, sender: IActor):
        info(f'receiveMsg_WakeupMessage: {message}')
        if isinstance(message.payload, str):
            if message.payload == 'EmptyFileDescriptors':
                file_descriptors, max_wait = self._select_gen.send([])
                return self._process_file_descriptors(
                    file_descriptors, max_wait)

    def receiveMsg_WatchMessage(self, message: Any, sender: IActor):
        info(f'receiveMsg_WatchMessage: {message}')
        info(f'message.ready: {message.ready}')
        file_descriptors_to_send = [
            _ for _ in message.ready
            if _ in self._file_descriptors]
        info(f'file_descriptors_to_send: {file_descriptors_to_send}')
        if len(file_descriptors_to_send) > 0:
            file_descriptors, max_wait = self._select_gen.send(
                file_descriptors_to_send)
            return self._process_file_descriptors(
                file_descriptors, max_wait)

This code doesn't work: out of many attempts, only one connected successfully to the RabbitMQ instance (ON_CONNECTION_OPEN); about 99% of attempts failed.
So I started debugging and playing around with that code, and made the following change in SelectPoller, which gave me 100% successful connections:

    def poll(self):
        while True:
            try:
                info('try read: ' + str(self._fd_events[PollEvents.READ]))
                info('try write: ' + str(self._fd_events[PollEvents.WRITE]))
                info('try error: ' + str(self._fd_events[PollEvents.ERROR]))
                read, write, error = select.select(
                    self._fd_events[PollEvents.READ],
                    self._fd_events[PollEvents.WRITE],
                    self._fd_events[PollEvents.ERROR], self._get_max_wait())
                info(f'ready to read: {read}')
                info(f'ready to write: {write}')
                info(f'ready to error: {error}')
                read_thespian = (
                    yield (
                        self._fd_events[PollEvents.READ],
                        0))
                info(f'ready to read thespian: {read_thespian}')
                break
            except _SELECT_ERRORS as error:
                if _is_resumable(error):
                    continue
                else:
                    raise
        fd_event_map = collections.defaultdict(int)
        for fd_set, evt in zip(
                (read, write, error),
                (PollEvents.READ, PollEvents.WRITE, PollEvents.ERROR)):
            for fileno in fd_set:
                fd_event_map[fileno] |= evt
        self._dispatch_fd_events(fd_event_map)

As you can see, I started using the normal Python select API to watch for all events (read, write, error), and after that I additionally yield the file descriptors to Thespian to watch for read events, but with timeout 0. I expect the Thespian watch to give me the same results as the normal Python select API (I assume Pika is blocked in the meantime and nothing is reading from the file descriptors that are ready for reading). Most of the time the results are indeed the same, but not always:

2021-05-02 13:25:49,235 INFO  =>  try read: {18, 20}  [RabbitMQ.py:17] [ActorAddr-(T|:45357)]
2021-05-02 13:25:49,235 INFO  =>  try write: set()  [RabbitMQ.py:18] [ActorAddr-(T|:45357)]
2021-05-02 13:25:49,236 INFO  =>  try error: set()  [RabbitMQ.py:19] [ActorAddr-(T|:45357)]
2021-05-02 13:25:49,237 INFO  =>  ready to read: [18]  [RabbitMQ.py:25] [ActorAddr-(T|:45357)]
2021-05-02 13:25:49,238 INFO  =>  ready to write: []  [RabbitMQ.py:26] [ActorAddr-(T|:45357)]
2021-05-02 13:25:49,239 INFO  =>  ready to error: []  [RabbitMQ.py:27] [ActorAddr-(T|:45357)]
2021-05-02 13:25:49,239 INFO  =>  _process_file_descriptors.file_descriptors: {18, 20}  [RabbitMQListener.py:47] [ActorAddr-(T|:45357)]
2021-05-02 13:25:49,240 INFO  =>  _process_file_descriptors.max_wait: 0  [RabbitMQListener.py:48] [ActorAddr-(T|:45357)]
2021-05-02 13:25:49,241 INFO  =>  receiveMsg_WakeupMessage: WakeupMessage(1, EmptyFileDescriptors)  [RabbitMQListener.py:69] [ActorAddr-(T|:45357)]
2021-05-02 13:25:49,241 INFO  =>  ready to read thespian: []  [RabbitMQ.py:40] [ActorAddr-(T|:45357)]

After all of that, one more problem occurred. I noticed strange intervals of logging inactivity: logging regularly freezes for at least 4-5 minutes. I see normal info output, and suddenly the output stops for at least 4 minutes. Then a bunch of log lines is printed to the terminal at once, and logging stops again. If I change some of the "info" calls to plain "print", I can see that the code is still running while the remaining info logs are stuck somewhere at the logger actor:

    def poll(self):
        while True:
            try:
                print('try read: ' + str(self._fd_events[PollEvents.READ]))
                print('try write: ' + str(self._fd_events[PollEvents.WRITE]))
                print('try error: ' + str(self._fd_events[PollEvents.ERROR]))
                read, write, error = select.select(
                    self._fd_events[PollEvents.READ],
                    self._fd_events[PollEvents.WRITE],
                    self._fd_events[PollEvents.ERROR], self._get_max_wait())
                print(f'ready to read: {read}')
                print(f'ready to write: {write}')
                print(f'ready to error: {error}')
                read_thespian = (
                    yield (
                        self._fd_events[PollEvents.READ],
                        0))
                print(f'ready to read thespian: {read_thespian}')
                break
            except _SELECT_ERRORS as error:
                if _is_resumable(error):
                    continue
                else:
                    raise
        fd_event_map = collections.defaultdict(int)
        for fd_set, evt in zip(
                (read, write, error),
                (PollEvents.READ, PollEvents.WRITE, PollEvents.ERROR)):
            for fileno in fd_set:
                fd_event_map[fileno] |= evt
        self._dispatch_fd_events(fd_event_map)

Output:
2021-05-02 23:48:33,678 INFO  =>  _process_file_descriptors.ThespianWatch  [RabbitMQListener.py:53] [ActorAddr-(T|:46703)]
2021-05-02 23:48:33,683 INFO  =>  receiveMsg_WakeupMessage: WakeupMessage(2, EmptyFileDescriptors)  [RabbitMQListener.py:69] [ActorAddr-(T|:46703)]
2021-05-02 23:48:33,686 INFO  =>  message.payload: EmptyFileDescriptors  [RabbitMQListener.py:70] [ActorAddr-(T|:46703)]
2021-05-02 23:48:33,690 INFO  =>  message.payload is str  [RabbitMQListener.py:72] [ActorAddr-(T|:46703)]
2021-05-02 23:48:33,694 INFO  =>  EmptyFileDescriptors  [RabbitMQListener.py:74] [ActorAddr-(T|:46703)]
2021-05-02 23:48:33,698 INFO  =>  _process_file_descriptors.file_descriptors: {18, 20}  [RabbitMQListener.py:47] [ActorAddr-(T|:46703)]
2021-05-02 23:48:33,701 INFO  =>  _process_file_descriptors.max_wait: 0  [RabbitMQListener.py:48] [ActorAddr-(T|:46703)]
2021-05-02 23:48:33,702 INFO  =>  _process_file_descriptors.ThespianWatch  [RabbitMQListener.py:53] [ActorAddr-(T|:46703)]
2021-05-02 23:48:35,325 INFO  =>  receiveMsg_WakeupMessage: WakeupMessage(2, EmptyFileDescriptors)  [RabbitMQListener.py:69] [ActorAddr-(T|:46703)]
ready to read thespian: []
try read: {18, 20}
try write: set()
try error: set()
ready to read: []
ready to write: []
ready to error: []
ready to read thespian: []
try read: {18, 20}
try write: set()
try error: set()
ready to read: []
ready to write: []
ready to error: []
ready to read thespian: []
try read: {18, 20}
try write: set()
try error: set()
ready to read: []
ready to write: []
ready to error: []
ready to read thespian: []
try read: {18, 20}
try write: set()
try error: set()
ready to read: []
ready to write: []
ready to error: []
...
...
...
... (and after several minutes suddenly info comes back to life again)
2021-05-02 23:50:08,730 INFO  =>  _process_file_descriptors.file_descriptors: {18, 20}  [RabbitMQListener.py:47] [ActorAddr-(T|:46703)]
2021-05-02 23:50:08,732 INFO  =>  _process_file_descriptors.max_wait: 0  [RabbitMQListener.py:48] [ActorAddr-(T|:46703)]
2021-05-02 23:50:08,733 INFO  =>  _process_file_descriptors.ThespianWatch  [RabbitMQListener.py:53] [ActorAddr-(T|:46703)]
2021-05-02 23:50:08,734 INFO  =>  receiveMsg_WakeupMessage: WakeupMessage(2, EmptyFileDescriptors)  [RabbitMQListener.py:69] [ActorAddr-(T|:46703)]
2021-05-02 23:50:08,735 INFO  =>  message.payload: EmptyFileDescriptors  [RabbitMQListener.py:70] [ActorAddr-(T|:46703)]
2021-05-02 23:50:08,736 INFO  =>  message.payload is str  [RabbitMQListener.py:72] [ActorAddr-(T|:46703)]
2021-05-02 23:50:08,738 INFO  =>  EmptyFileDescriptors  [RabbitMQListener.py:74] [ActorAddr-(T|:46703)]
...

Any help appreciated ;-)

I use thespian==3.10.1

@kquick
Owner

kquick commented May 5, 2021

Hi @htarnacki . I just returned from vacation and wanted to let you know I've seen this report. There's a lot here, so I will need a day or so to digest this and be able to respond.

@kquick
Owner

kquick commented May 10, 2021

Does this mean that we are leaking resources here? Will Thespian still watch for events on both file descriptors 1 and 2, even though Pika no longer requires file descriptor 1 to be observed?

There is no resource leakage on the Thespian side for this. Thespian will only listen on file descriptors returned in the most recent ThespianWatch response, and any internal resources associated with that are discarded by the time the actor is invoked with the watch results.
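The behaviour described here can be pictured with a toy scheduler. This is only a model of the stated semantics, not Thespian's implementation: `watch_loop` and `handler` are hypothetical names, and `select.select` stands in for Thespian's internal wait. The point is that each round selects only on the list returned by the previous handler call.

```python
import select
import socket


def watch_loop(handler, first_watch, max_rounds=10, timeout=0.2):
    """Toy scheduler: only the most recently returned fd list is selected on."""
    watch = list(first_watch)
    for _ in range(max_rounds):
        if not watch:
            break
        ready, _, _ = select.select(watch, [], [], timeout)
        watch = handler(ready) or []   # the previous list is discarded here


def demo():
    a1, b1 = socket.socketpair()
    a2, b2 = socket.socketpair()
    seen = []

    def handler(ready):
        seen.append(sorted(ready))
        if len(seen) == 1:
            return [a2.fileno()]       # switch interest from a1 to a2
        return None                    # stop watching

    b1.sendall(b"1")                   # a1 stays readable the whole time
    b2.sendall(b"2")
    watch_loop(handler, [a1.fileno()])
    fds = (a1.fileno(), a2.fileno())
    for s in (a1, b1, a2, b2):
        s.close()
    return seen, fds
```

In the second round the first descriptor is still readable, but it is not reported because it is no longer in the watch list.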

@kquick
Owner

kquick commented May 10, 2021

But this still looks like an overcomplicated solution for something that could be implemented directly in the Thespian API, like this:

return ThespianWatch(read_fdrs, write_fdrs, error_fdrs, timeout)

I think there might be potential to extend ThespianWatch to include write_fdrs and error_fdrs. I think the timeout is an orthogonal concern (some users might only be interested in timeouts, not file descriptors), so I would still use wakeupAfter to implement that functionality.

I'm somewhat concerned that write and error conditions are local and can be persistent, so it's possible to get into situations where the use of these prevents even normal functionality, but since this is an issue with any use of select and this is not a mainline use case, it's probably OK to provide this with appropriate usage warnings.

If the ability to wait on write and error conditions still sounds useful, I can see about putting together some initial support for that which could be tested, but let me know if this is still the case after reading some of my other responses on this issue.

@kquick
Owner

kquick commented May 10, 2021

I noticed strange intervals of logging inactivity: logging regularly freezes for at least 4-5 minutes. I see normal info output, and suddenly the output stops for at least 4 minutes. Then a bunch of log lines is printed to the terminal at once, and logging stops again.

I think this is probably providing some useful insight into some of the otherwise invisible functionality going on. First some background though:

In Thespian, there is a flow control mechanism that will cause an actor to suspend incoming activity for a period of time if the number of queued outbound messages exceeds a threshold value. This is intended to let the actor complete the current processing and send out the corresponding results before accepting new work that will result in additional processing demands.

Also, Thespian modifies the logging facility to forward log messages to the Logger actor, routing those messages through the MultiProcAdmin. All messages are forwarded to a single Logger because the normal logging facility doesn't provide filesystem-level protections against multiple processes writing to the same logfiles, so only a single process (the Logger) should write to the file to ensure messages aren't lost. The other reason is that in a multi-system Actor convention, some log messages are forwarded to the central actor system for logging there as well as locally. This latter reason is why log messages are forwarded through the MultiProcAdmin; in addition, the MultiProcAdmin is the manager of the Logger actor and can start a new Logger if the previous Logger dies or exits.

Given this background, I believe you are seeing the logging delays because pika is starting up multiple threads and some of those threads are generating (perhaps large amounts of) logging at a low severity level (e.g. Debug). Those low-level messages are not visible in the logs, but they are forwarded from the generating Actor process to the Logger via the MultiProcAdmin; the threshold-based discard is managed by the Logger (since the level can be dynamically changed), so all logging must be forwarded. I believe there is enough generated logging that it exceeds the MultiProcAdmin outbound threshold, which causes it to stop receiving messages. Since logging messages continue to accumulate in the sending Actor processes, this creates a large backlog that must be processed when the MultiProcAdmin re-enables input, and this cycle may have to be repeated several times before log messages that are visibly logged are finally processed.
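To illustrate the difference between discarding at the destination and discarding at the source, here is a sketch using only the standard `logging` module. `ListHandler` is a hypothetical stand-in for whatever forwards records to the Logger actor; whether Thespian's forwarding is implemented as a handler is an assumption that this thread does not confirm, so treat this as an experiment to try rather than a verified fix.

```python
import logging


class ListHandler(logging.Handler):
    """Stand-in for whatever forwards records to the Logger actor."""

    def __init__(self):
        super().__init__(level=logging.DEBUG)
        self.records = []

    def emit(self, record):
        self.records.append(record.getMessage())


def demo():
    forwarder = ListHandler()
    root = logging.getLogger()
    pika_logger = logging.getLogger("pika")
    noisy = logging.getLogger("pika.demo")   # child of the "pika" logger
    old_root, old_pika = root.level, pika_logger.level
    root.addHandler(forwarder)
    root.setLevel(logging.DEBUG)
    try:
        noisy.debug("frame 1")               # reaches the forwarder
        pika_logger.setLevel(logging.WARNING)
        noisy.debug("frame 2")               # dropped before any handler runs
        noisy.warning("connection lost")     # still forwarded
    finally:
        pika_logger.setLevel(old_pika)
        root.setLevel(old_root)
        root.removeHandler(forwarder)
    return forwarder.records
```

A logger-level threshold is checked before any handler is called, so records below it never leave the emitting process.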

Based on the behavior, I believe that pika does have other threads running, which could also be affecting the difference in the select results (since the two select calls happen at different times and the pika threads could be reading from the sockets in the interim). You've done some very intricate and impressive work in trying to weave the two of these together, but I'm concerned about the level of complication and dependency on internals that is needed to support this.
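The timing effect mentioned here is easy to reproduce with the standard library alone. In this sketch the `recv` call stands in for another thread draining the socket between the two checks; `readable` is a hypothetical helper name.

```python
import select
import socket


def readable(sock):
    """Zero-timeout readiness check for a single socket."""
    ready, _, _ = select.select([sock], [], [], 0)
    return bool(ready)


def demo():
    a, b = socket.socketpair()
    try:
        b.sendall(b"frame")
        first = readable(a)            # data is pending
        a.recv(16)                     # stands in for another reader draining the socket
        second = readable(a)           # same fd, later moment, nothing left
        return first, second
    finally:
        a.close()
        b.close()
```

Two readiness checks on the same descriptor are only comparable if nothing consumes data between them.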

Since the model we described earlier in this thread is that of a Client representing the boundary between the pika-supported RabbitMQ message bus on one side and the Actor System Actors on the other side, I'm beginning to think that a somewhat different approach might be simpler and more stable:

  1. The creation of the 'ClientActor' creates an Actor, and that Actor uses the multiprocessing library to create a sub-process, using the Pipe method to connect those two processes.
  2. The sub-process (PikaClient) created by the ClientActor connects via pika to the RabbitMQ message bus, and uses standard pika scheduling, extended to support the ability to detect incoming messages on the Pipe and read those messages.
  3. All timing and selecting functionality in the PikaClient is handled natively by Pika; any messages it receives on the Pipe (from the ClientActor) are forwarded as appropriate to the message bus, and messages received from RabbitMQ can be written to the Pipe to send them to the ClientActor.
  4. The ClientActor simply maintains a ThespianWatch on the fd for the Pipe to the PikaClient; when messages are received from the PikaClient, they are handled by the actors as needed; actor messages outbound to the RabbitMQ are written to the Pipe by the ClientActor.
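The four steps above can be sketched with the standard library only. This is a minimal illustration under stated assumptions: `pika_client_stub` and `round_trip` are hypothetical names, the sub-process just echoes instead of talking to RabbitMQ, the `None` shutdown sentinel is invented for the example, the "fork" start method assumes a POSIX host, and `select.select` stands in for the place where a real ClientActor would return a ThespianWatch on the pipe's file descriptor.

```python
import multiprocessing as mp
import select


def pika_client_stub(conn):
    """Stand-in for the PikaClient sub-process: no RabbitMQ involved here."""
    while True:
        msg = conn.recv()              # real code would let Pika's loop watch conn
        if msg is None:                # hypothetical shutdown sentinel
            break
        conn.send(("from-bus", msg))   # pretend the broker answered
    conn.close()


def round_trip(payload, timeout=5.0):
    """Send one message through the pipe and wait for the reply via select()."""
    ctx = mp.get_context("fork")       # assumption: POSIX host
    parent_conn, child_conn = ctx.Pipe()
    proc = ctx.Process(target=pika_client_stub, args=(child_conn,))
    proc.start()
    child_conn.close()                 # the parent keeps only its own end
    try:
        parent_conn.send(payload)
        # An actor would instead return ThespianWatch([parent_conn.fileno()])
        # and read from the pipe when the WatchMessage arrives.
        ready, _, _ = select.select([parent_conn.fileno()], [], [], timeout)
        reply = parent_conn.recv() if ready else None
        parent_conn.send(None)         # ask the stub to exit
    finally:
        proc.join(timeout)
    return reply
```

Only read readiness on a single descriptor is needed on the actor side, which is what the existing watch facility already provides.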

This configuration separates all of the processing scheduling of Thespian and pika into two separate process domains where they don't conflict with each other or need to be tightly integrated. The Pipe communication should be pretty straightforward for both to interact with. The logging domains are also separate (although you might need to re-initialize logging in the PikaClient because it might already be set to Actor Logger forwarding, depending on how the multiprocessing sub-process is set up), so the logging delay issues you noted above should go away. Also, in this system, no extensions are needed to the ThespianWatch functionality (as described in one of my other responses).

Does this ClientActor/PikaClient approach sound reasonable for your needs?

@htarnacki
Author

Hi,
thanks for your answer. Recently I was looking through the Thespian documentation and its features, and I decided to implement the whole system without any internal RabbitMQ communication. I will give Thespian's built-in features a chance.
So I will close this issue and open another one where I will bother you with a lot of different questions about Thespian and how I use it in my work ;-)

@kquick
Owner

kquick commented May 19, 2021

OK @htarnacki , that sounds good. Good luck and let me know if you run into issues.
