Sharing instance of proxy object across processes results in pickle errors #101320
Actually, I am wondering the following: in the manager class, the handle_request method is spawned for each new request, and it will call whatever function was received in the request (https://github.com/python/cpython/blob/main/Lib/multiprocessing/managers.py#L202).
Here is a repro case:
Doing this just raises:
And here is an example where I have no problem at all.
This will run forever without issues.
Actually, I managed to create a more complex example where, even when the custom class is fully synced, I still get corrupted data. I've also just seen that pickle itself apparently isn't thread-safe, so I wonder how this Manager class can work in the general case, given that it processes incoming requests in threads without locking and that pickle (the default serializer) is not thread-safe.
I'm not a core developer, but IMHO it's not guaranteed that the multiprocessing API will work correctly with an explicit os.fork(); create your subprocesses with multiprocessing.Process instead.
In both examples I tried both to reuse the proxy object instance directly (inherited from the main process via fork) and to rebuild it from scratch from what we get from the call to __reduce__().
Hi @vincent-grosbois, I believe the superficial reason for this error is that the proxy object in the forked child reuses the same connection as the one in the parent process. The complete code is:

```python
import os
from multiprocessing.managers import SyncManager

if __name__ == '__main__':
    manager = SyncManager(authkey=b'test')
    manager.start()
    address = manager.address
    d = manager.dict()

    pickled_dict = d.__reduce__()
    pickled_dict[1][-1]["authkey"] = b"test"
    print(pickled_dict)

    for i in range(1000):
        d[i] = i

    child_id = os.fork()
    if child_id != 0:
        # in parent, do work on the proxy object forever
        i = 0
        while True:
            d[i % 1000] = i % 3434
            i += 1
    else:
        # in children
        # connect to manager process
        child_manager = SyncManager(address=address, authkey=b'test')
        child_manager.connect()
        # rebuild the dictionary proxy
        proxy_obj = pickled_dict[0](*pickled_dict[1])
        # create a new connection here
        proxy_obj._connect()
        # read on the proxy object forever
        while True:
            print(list(proxy_obj.values())[:10])
```
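As context for the `pickled_dict[1][-1]["authkey"]` line (my note, based on Lib/multiprocessing/managers.py; details may differ between Python versions): `BaseProxy.__reduce__()` returns a (rebuild function, arguments) pair, and the authkey is only included in the keyword arguments when the pickling happens as part of a multiprocessing spawn, which is why it has to be patched in by hand here. Continuing from the snippet above:

```python
# Rough shape of what d.__reduce__() returns (illustrative):
#   (RebuildProxy, (proxy_type_or_AutoProxy, Token(...), 'pickle', {}))
rebuild, args = d.__reduce__()
proxy_factory, token, serializer, kwds = args
kwds["authkey"] = b"test"   # __reduce__ omits the authkey outside a multiprocessing spawn
new_proxy = rebuild(proxy_factory, token, serializer, kwds)   # same as pickled_dict[0](*pickled_dict[1])
```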
And there's an important difference between the two snippets: you didn't assign data to the custom dict before forking in the second snippet. If you do, you should get the same result:

```python
import os
from multiprocessing.managers import SyncManager
from threading import RLock

class SyncedDictionary:
    def __init__(self):
        # store the data in the instance
        self.data = {}
        self.lock = RLock()
        print(f"init from {os.getpid()}")

    def add(self, k, v):
        with self.lock:
            self.data[k] = v

    def get_values(self):
        with self.lock:
            return list(self.data.values())

if __name__ == '__main__':
    # custom class
    SyncManager.register("custom", SyncedDictionary)
    manager = SyncManager(authkey=b'test')
    manager.start()
    address = manager.address
    print(f"from main pid {os.getpid()}")

    custom_dict = manager.custom()

    pickled_dict = custom_dict.__reduce__()
    pickled_dict[1][-1]["authkey"] = b"test"
    print(pickled_dict)

    # the assignment that was missing in your second snippet
    for i in range(1000):
        custom_dict.add(i, i)

    child_id = os.fork()
    if child_id != 0:
        # in parent, do work on the proxy object forever
        i = 0
        while True:
            custom_dict.add(i % 1000, i % 3434)
            i += 1
    else:
        for i in range(3):
            os.fork()  # even more child processes...
        print(os.getpid())
        # in children
        # connect to manager process
        child_manager = SyncManager(address=address, authkey=b'test')
        child_manager.connect()
        # rebuild the dictionary proxy
        proxy_obj = pickled_dict[0](*pickled_dict[1])
        # read on the proxy object forever
        while True:
            list(proxy_obj.get_values())[:10]
```

In this snippet, the proxy object in the subprocess will also reuse the same connection as the one in the parent process. The solution is still the same: call proxy_obj._connect() in the child to open a new connection.
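Concretely (my addition, mirroring the fix already shown in the first snippet), the children branch of this second example would become:

```python
# in the children branch of the snippet above:
proxy_obj = pickled_dict[0](*pickled_dict[1])
proxy_obj._connect()   # open a fresh connection instead of reusing the parent's
while True:
    list(proxy_obj.get_values())[:10]
```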
So what's the underlying reason? Let's go through some code snippets first. When a proxy is created, it tries to find an existing connection in thread-local storage (TLS): https://github.com/python/cpython/blob/main/Lib/multiprocessing/managers.py#L760. Then, when we invoke methods through this proxy by calling _callmethod, it reuses the connection stored in that TLS and only opens a new one if none is there. The TLS data is stored in a util.ForkAwareLocal, which registers an after-fork hook that clears its state. And the registered hooks are only run by util._run_after_forkers(), which multiprocessing calls when it bootstraps a child Process but which a bare os.fork() never triggers. So the underlying solution is, like @relent95 said, using Process to create your subprocess.

```python
import os
import logging
import multiprocessing as mp
from multiprocessing.managers import SyncManager

def worker(pickled_dict, address):
    child_manager = SyncManager(address=address, authkey=b'test')
    child_manager.connect()
    # rebuild the dictionary proxy
    proxy_obj = pickled_dict[0](*pickled_dict[1])
    # read on the proxy object forever
    while True:
        print(list(proxy_obj.values())[:10])

if __name__ == '__main__':
    mp.set_start_method('fork')
    mp.log_to_stderr(logging.DEBUG)

    manager = SyncManager(authkey=b'test')
    manager.start()
    address = manager.address
    d = manager.dict()

    pickled_dict = d.__reduce__()
    pickled_dict[1][-1]["authkey"] = b"test"
    print(pickled_dict)

    for i in range(1000):
        d[i] = i

    p = mp.Process(target=worker, args=(pickled_dict, address))
    p.start()

    i = 0
    while True:
        d[i % 1000] = i % 3434
        i += 1
    p.join()  # note: never reached, since the loop above runs forever
```
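To make the mechanism described above concrete, here is a small probe (my sketch; register_after_fork and _run_after_forkers are internal multiprocessing.util helpers): hooks registered with register_after_fork run in children started via multiprocessing.Process, but not in children created by a bare os.fork().

```python
import os
import multiprocessing as mp
from multiprocessing import util

class Probe:
    def __init__(self):
        # ask multiprocessing to run this callback in any child it bootstraps
        util.register_after_fork(self, lambda obj: print("after-fork hook ran in", os.getpid()))

def target(probe):
    pass  # the hook fires during Process bootstrap, before target() runs

if __name__ == '__main__':
    mp.set_start_method('fork')
    probe = Probe()

    p = mp.Process(target=target, args=(probe,))
    p.start()
    p.join()          # this child prints the after-fork message

    pid = os.fork()   # a bare fork: _run_after_forkers() is never called
    if pid == 0:
        os._exit(0)   # no message from this child
    os.waitpid(pid, 0)
```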
Thanks all, this thread is incredibly helpful. I just have a few questions on things I don't understand or that aren't covered above. I know forking and managers may not be an ideal combination, but sometimes you don't have a choice. For instance, see my example below using HTTPServer with ForkingMixIn: I can't control how new processes are created, but I need to share some objects between them.

Questions

Referring to the examples from @vincent-grosbois and @knwng above:
Revised example

My current understanding is that the last example above from @knwng could be rewritten as follows for the same effect. Am I missing something?
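The rewritten snippet itself did not survive in this copy of the thread; a plausible reconstruction (my guess, assuming the intended simplification was to pass the proxy itself to the worker instead of the __reduce__ tuple and the address) would be:

```python
import multiprocessing as mp
from multiprocessing.managers import SyncManager

def worker(d):
    # the proxy is carried into the child by multiprocessing itself
    while True:
        print(list(d.values())[:10])

if __name__ == '__main__':
    mp.set_start_method('fork')
    manager = SyncManager(authkey=b'test')
    manager.start()
    d = manager.dict()
    for i in range(1000):
        d[i] = i

    p = mp.Process(target=worker, args=(d,))  # pass the proxy directly
    p.start()

    i = 0
    while True:
        d[i % 1000] = i % 3434
        i += 1
```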
My Setup

I have a similar issue as the OP, with pickle errors on managed objects. In my case, I'm running a local web server with HTTPServer and ForkingMixIn. Every connection forks a new process to handle the request. A single-process server won't work because I'm serving several clients at once. A threading server won't work because of the GIL (many requests are compute-intensive, and threads would blow up response times); plus threads are harder to get right, trampling on global state in libraries and such.

My server has a cache object for quick return of already-computed responses. When a new response is needed, it's slow to compute, so cache updates are expensive. Cache updates made by one forked handler should be added to the cache for future requests in other handlers. This requires a shared cache object with a manager to handle updates from multiple processes.

I have this setup working with a SyncManager in the parent that creates the cache proxy. Forked handlers inherit the cache proxy object. All works fine for a while, but eventually there's a pickle error on a cache access from one of the children. After that, the manager stops responding and the server needs to be restarted.

I think it's the TLS connection issue @knwng mentioned above: children are reusing the same proxy object without resetting the connection. If I'm right, then I just have to figure out how to reset the TLS connection in the inherited proxy object. It looks like proxy objects have a method _after_fork; that sounds promising.

Sample code

Here's simplified code to illustrate my setup (nonstandard conventions, I know).
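The commenter's actual sample code was not preserved in this copy of the thread; a minimal sketch of the kind of setup described (my reconstruction; names like CacheHandler and slow_compute are hypothetical) could look like this:

```python
import os
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ForkingMixIn
from multiprocessing.managers import SyncManager

class ForkingHTTPServer(ForkingMixIn, HTTPServer):
    """Each incoming connection is handled in a freshly forked child."""

class CacheHandler(BaseHTTPRequestHandler):
    cache = None  # SyncManager dict proxy, set before serving; inherited by forked children

    def do_GET(self):
        key = self.path
        value = self.cache.get(key)        # proxy call into the manager process
        if value is None:
            value = slow_compute(key)      # hypothetical expensive computation
            self.cache[key] = value        # share the result with future handlers
        self.send_response(200)
        self.end_headers()
        self.wfile.write(str(value).encode())

def slow_compute(key):
    return len(key)  # stand-in for the real, expensive work

if __name__ == '__main__':
    manager = SyncManager(authkey=b'cache')
    manager.start()
    CacheHandler.cache = manager.dict()
    ForkingHTTPServer(('127.0.0.1', 8080), CacheHandler).serve_forever()
```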
Update

Well, calling _after_fork isn't enough. It seems to help a bit (the server lasts a little longer), but it still eventually hits a pickle error and the cache stops responding.

I finally managed to fix my problem. Here's the takeaway lesson: CALL _connect() ON THE INHERITED PROXY IN EACH FORKED CHILD. Here's how I got there; hope it helps someone else.

Data Collection

I built a test client to make my server fail reproducibly. The test client makes a process pool to send requests to the server, with a random 0-50 ms delay before each call. Here's what I observed:
Taken together, the above suggests the issue may be a race condition triggering a deadlock. I dug into my code to investigate further.

Deadlock

This is where things get strange. Let's start with my expectation. Based on the Python docs, I expect that a multiprocessing manager is a single-threaded, single-process server. Or rather, it may use concurrency to handle multiple concurrent requests on a managed object, but actual operations on that object will be performed sequentially. E.g., if two processes call dictproxy.update(other) at the same time, the manager will complete one update call on the underlying object before performing the second. The update calls may happen out of order, but only one update call will be done at a time. Based on my tests, that seems correct, but it's not the whole story.

First I examined debug logs from my server. When it hangs, there are several calls to cache.locate() that have started but never finished. So the hang is either inside the cache proxy object function, or in the manager process that services the call.

Next I turned on debug logs for the underlying cache object. By logging the pid and thread id on every call, I confirmed that the manager object is single-process, single-threaded for accessing the cache. Every log record on calls to the cache object shows the same pid and thread id, regardless of test client pool size and forked server connections.

This is the part I don't understand. My cache object runs fine, up to the point where locate() has the return value. Then it just gets stuck somewhere during the return. I'll illustrate with code:
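(The original illustration was not preserved here; a hypothetical reconstruction of the shape of it, with made-up names like Cache and locate, is:)

```python
import logging

log = logging.getLogger("cache")

class Cache:
    """Hypothetical reconstruction; the commenter's actual snippet was lost."""
    def __init__(self):
        self.index = {}

    def locate(self, key):
        ret = self.index.get(key)
        log.debug("locate(%r) -> %r", key, ret)  # this log line is reached and flushed...
        return ret                                # ...but the caller never sees the value
```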
No exception happens. The server just never sees the return value. I reached 'return ret' in the cache object inside the manager, but never got the return value in the calling code. I thought it might be a logging issue, but the logs were flushed and still nothing. The return value is given to the manager, but never sent back to the server. It seems the manager is stuck on something.

I looked at the code for BaseProxy and SyncManager. I couldn't find an obvious place where they get stuck, but I didn't want to spend a long time tracing through every line. I also looked at the proxy object to see how it connects to the manager. For a proxy object d, my system shows that ...

At this point I suspect the problem is a data race: two processes are reading from or writing to the socket at the same time, trampling on each other's data. Somehow this reaches a state where the manager has the return value but is unable to write it to the socket, so my program gets stuck waiting for a return value that never appears to the caller.

The Fix

Trying cache._after_fork() had little effect. Then I tried cache._connect() in each handler after the fork. Wow, did that fix it! Suddenly my server can respond to 50 requests from 5 concurrent clients almost instantly. OK, a bit of an exaggeration, but within a couple of seconds.

Takeaway lesson: CALL _connect() ON THE INHERITED PROXY IN EACH FORKED HANDLER.

Footnotes

[1] Not sure why _after_fork helps. Checking the stdlib code for ForkingMixIn, it calls os._exit() when done handling a request, so cleanup on proxy objects and decrementing the refcount shouldn't be triggered when a forked handler exits. Not sure why ...
Hello
I'm trying to understand whether SyncManager proxy object instances are supposed to be shareable across processes, and if so, how.
Bug report
See #101320 (comment) below
What I'm doing is the following:
Is this a bug, or something that should be possible to do in Python?
I assume that the objects in a SyncManager can be accessed from several processes; is that true?
If so, what is the proper way to do it? (I think that just using the same proxy object reference across forked processes is not supposed to work.)
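For reference, the pattern the answers above converge on, in its simplest form (a minimal sketch, not from the original issue): let multiprocessing itself carry the proxy into the child instead of relying on a bare os.fork().

```python
import multiprocessing as mp

def reader(shared):
    print(shared.get("greeting"))

if __name__ == '__main__':
    with mp.Manager() as manager:
        shared = manager.dict(greeting="hello")
        p = mp.Process(target=reader, args=(shared,))  # the proxy travels with the Process
        p.start()
        p.join()
```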
Your environment
python 3.9