Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Callbacks not served in background thread. #522

Closed
mawenzy opened this issue Jan 22, 2023 · 1 comment
Closed

Callbacks not served in background thread. #522

mawenzy opened this issue Jan 22, 2023 · 1 comment
Assignees
Labels
Done The issue discussion is exhausted and is closed w/ comment Question Issue is a result of unclear documentation or lack of documentation

Comments

@mawenzy
Copy link
Contributor

mawenzy commented Jan 22, 2023

I am trying to implement callbacks and I am having trouble making sure that these are always handled in a background thread.

Environment
  • rpyc version 5.3.0
  • Python 3.10.9
  • Fedora Linux 36 (Workstation Edition) x86_64
Minimal example

Server:

import threading
import time

import rpyc
from rpyc.utils.server import ThreadedServer

class CallbackService(rpyc.Service):
    def __init__(self) -> None:
        self.callbacks = []

        thread = threading.Thread(target=self._callback_thread)
        thread.start()

    def exposed_register_callback(self, fun):
        self.callbacks.append(fun)

    def _callback_thread(self):
        while True:
            time.sleep(0.5)

            for callback in self.callbacks:
                callback()

    def exposed_get(self):
        return 5

if __name__ == "__main__":
    ThreadedServer(CallbackService, port="12345").start()

Client:

import threading
import time

import rpyc
from rpyc import BgServingThread

conn = rpyc.connect("localhost", 12345)
bgsrv = BgServingThread(conn)

class Buffer():
    def __init__(self):
        self.lock = threading.Lock()
        conn.root.register_callback(self._callback)

    def _callback(self):
        if threading.current_thread().ident == threading.main_thread().ident:
            print("callback in main thread")

    def reload(self):
        r = conn.root.get()

b = Buffer()

for i in range(10):
    time.sleep(0.1)
    b.reload()

time.sleep(1)

conn.close()
Output
callback in main thread
callback in main thread
Expected output

Nothing :)

As far as I understand it, the problem is that, when calling b.reload() and with that conn.root.get(), the main thread serves the connection. If a callback comes in before the wanted response, the callback is somehow handled by the main thread instead of the BgServingThread.

So my question is how do I make sure that the callbacks are handled in the background thread and never in the main one? A workaround seems to be to use:

bgsrv = threading.Thread(target=conn.serve_all)
bgsrv.start()

but I do not understand why this seems to be working.

@comrumino comrumino self-assigned this Mar 10, 2023
@comrumino comrumino added the To Start Description reviewed and a maintainer needs "to start" triage label Mar 10, 2023
@comrumino comrumino added Question Issue is a result of unclear documentation or lack of documentation Done The issue discussion is exhausted and is closed w/ comment and removed To Start Description reviewed and a maintainer needs "to start" triage labels Mar 10, 2023
@comrumino
Copy link
Collaborator

So, I deleted my previous comment b/c it was inaccurate. It turns out you are looking to use some newer behavior which binds thread communication. The behavior you saw was b/c b.reload() was serving the callbacks when waiting for it's response.

I rewrote your test case to show use the newer experimental behavior. If you toggle {'bind_threads': True} to False, you will see it starts to execute in the main thread again.
client.py

import threading
import time

import rpyc
from rpyc import BgServingThread

class Buffer(rpyc.Service):
    def on_connect(self, conn):
        self.conn = conn
        self.conn.root.register_callback(self.exposed_callback)

    def exposed_callback(self):
        if threading.current_thread().ident == threading.main_thread().ident:
            print("callback in main thread")
        else:
            print("callback in background thread")

    def reload(self):
        return self.conn.root.get()

conn = rpyc.connect("localhost", 12345, service=Buffer, config={'bind_threads': True})
bgsrv = BgServingThread(conn)


for i in range(10):
    time.sleep(0.1)
    conn._local_root.reload()

time.sleep(1)
bgsrv.stop()
conn.close()

server.py

import threading
import time

import rpyc
from rpyc.utils.server import ThreadedServer

class CallbackService(rpyc.Service):
    def __init__(self) -> None:
        self.callbacks = []

        thread = threading.Thread(target=self._callback_thread)
        thread.start()

    def exposed_register_callback(self, fun):
        self.callbacks.append(fun)

    def _callback_thread(self):
        while True:
            time.sleep(0.5)
            while self.callbacks:
                callback = self.callbacks.pop()
                callback()

    def exposed_get(self):
        return 5

if __name__ == "__main__":
    ThreadedServer(CallbackService, port="12345", protocol_config={'bind_threads': True}).start()

comrumino added a commit that referenced this issue Mar 10, 2023
… is expected default behavior as a caveat.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Done The issue discussion is exhausted and is closed w/ comment Question Issue is a result of unclear documentation or lack of documentation
Projects
None yet
Development

No branches or pull requests

2 participants