Trying to develop an instrumentation for SimpleQueue (for threading/multiprocessing) #3105

Closed
kedare opened this issue Dec 30, 2022 · 4 comments

kedare commented Dec 30, 2022

Hello everyone.

We use a lot of multithreading and multiprocessing, so I am trying to find a way to cover all or most of those cases. I looked at the existing instrumentations (such as the Redis one), and I am trying to wrap the get/put methods of the SimpleQueue class, which the Python multiprocessing module uses everywhere to exchange data between threads and processes. So far, no luck.

It looks like some code paths are not using my overridden versions of the methods.
I am not very familiar with this kind of monkeypatching, so maybe what I am trying to do is not doable (restrictions on the stdlib, perhaps?).

This is what I have been trying (with temporary print statements):

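import logging
from dataclasses import dataclass

from opentelemetry import context as otel_context
from wrapt import wrap_function_wrapper

# Module-level logger used by instrument_get below.
log = logging.getLogger(__name__)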

@dataclass
class InstrumentedQueueItem:
    otel_context: otel_context.Context
    item: object

def instrument_simple_queue():
    """
    This function patches the SimpleQueue class used to communicate between processes.
    It allows opentelemetry context to be passed between the processes to allow traces to be continuous between
    the different processes.
    SimpleQueue is used internally by the multiprocessing library
    """

    def instrument_put(func, instance, args, kwargs):
        print(f"PUT ORIG {len(args)} {args} {kwargs}")
        if len(args) == 1:
            print("Patching ARGS")
            args = list(args)
            args[0] = InstrumentedQueueItem(otel_context=otel_context.get_current(), item=args[0])
            args = tuple(args)
        elif "item" in kwargs:
            print("Patching KWARGS")
            kwargs["item"] = InstrumentedQueueItem(otel_context=otel_context.get_current(), item=kwargs["item"])
        print(f"PUT PATCHED {args} {kwargs}")
        return func(*args, **kwargs)

    def instrument_get(func, instance, args, kwargs):
        wrapped_item = func(*args, **kwargs)
        print(f"GET {wrapped_item}")
        if type(wrapped_item) is InstrumentedQueueItem:
            otel_context.attach(wrapped_item.otel_context)
            return wrapped_item.item
        else:
            log.warning(f"Got an uninstrumented item from patched SimpleQueue: {wrapped_item}")
            return wrapped_item

    def instrument_handle_results(func, instance, args, kwargs):
        # Pool._handle_results receives the reader callable as its second
        # positional argument (the "get" parameter), so wrap that callable.
        args = list(args)
        old_get = args[1]

        def result_parser():
            wrapped_item = old_get()
            print(f"RESULT {wrapped_item}")
            # Unwrap instrumented items and restore the sender's context.
            if type(wrapped_item) is InstrumentedQueueItem:
                otel_context.attach(wrapped_item.otel_context)
                wrapped_item = wrapped_item.item
            return wrapped_item

        args[1] = result_parser

        return func(*args, **kwargs)

    wrap_function_wrapper('multiprocessing.queues', 'SimpleQueue.get', instrument_get)
    wrap_function_wrapper('multiprocessing.queues', 'SimpleQueue.put', instrument_put)
    wrap_function_wrapper('multiprocessing.pool', 'Pool._handle_results', instrument_handle_results)

And a simple test:

from multiprocessing import Pool
import requests

init_opentelemetry()

def f(x):
    return requests.get(x)

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, ["https://webhook.site/a256ce57-1b3f-4e15-b39c-7a564c001b9f"]))

This is the kind of error I am getting (a get happens before any put, and the item it reads is not instrumented; the corresponding put apparently ran before my patching, but how?):

GET (0, 0, <function mapstar at 0x7f6464fcde50>, ((<function f at 0x7f6464fcd700>, ('https://webhook.site/a256ce57-1b3f-4e15-b39c-7a564c001b9f',)),), {})
Got an uninstrumented item from patched SimpleQueue: (0, 0, <function mapstar at 0x7f6464fcde50>, ((<function f at 0x7f6464fcd700>, ('https://webhook.site/a256ce57-1b3f-4e15-b39c-7a564c001b9f',)),), {})
PUT ORIG 1 ((0, 0, (True, [<Response [200]>])),) {}
Patching ARGS
PUT PATCHED (InstrumentedQueueItem(otel_context={}, item=(0, 0, (True, [<Response [200]>]))),) {}
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 590, in _handle_results
    job, i, obj = task
TypeError: cannot unpack non-iterable InstrumentedQueueItem object

Thank you
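
A possible explanation for the uninstrumented item, offered as a reading of CPython's pool.py rather than something confirmed in this thread: Pool appears to keep direct references to the pipe ends of its queues (the private `_writer.send` and `_reader.recv` attributes) and uses them in the parent process, so tasks may never pass through `SimpleQueue.put`. The stdlib-only sketch below shows that a patched `put` does not see data written that way. The names `demo` and `traced_put` are hypothetical, and `_writer` / `_reader` are private attributes that may differ between Python versions.

```python
import multiprocessing
from multiprocessing.queues import SimpleQueue


def demo():
    """Patch SimpleQueue.put, then write once via put() and once via the raw pipe."""
    recorded = []
    original_put = SimpleQueue.put

    def traced_put(self, obj):
        # Record every object that goes through the patched method.
        recorded.append(obj)
        return original_put(self, obj)

    SimpleQueue.put = traced_put
    try:
        queue = multiprocessing.SimpleQueue()
        queue.put("via put")              # goes through the patch
        queue._writer.send("via writer")  # private pipe end: skips the patch
        # Both messages arrive on the same pipe, in order.
        received = [queue.get(), queue._reader.recv()]
    finally:
        # Always restore the original method.
        SimpleQueue.put = original_put
    return recorded, received
```

If this reading is right, wrapping `SimpleQueue.put` and `SimpleQueue.get` alone cannot cover the parent-side traffic of a Pool, which would match the unwrapped task tuple seen in the first GET line above.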

Author

kedare commented Dec 30, 2022

I made some progress fixing the first error. Some parts of the Pool do not use SimpleQueue.get but have a strange design where they are passed a reader directly (the get parameter, see https://github.com/python/cpython/blob/main/Lib/multiprocessing/pool.py#L574). I patched that, and this part now works fine on the small test script (the script in the issue has been updated).
However, on larger code with more intensive use of multiprocessing, I get a lot of errors like the one below. Apparently something is passing locks, but I don't understand why it worked fine before this change (that is, without the tracing). I don't add any locks in my code, and it cannot be the existing code either, or it would have failed even before my instrumentation.

PUT PATCHED (InstrumentedQueueItem(otel_context={'current-span-602ca248-6834-4594-83be-41dcfa9bd94a': _Span(name="Generating template", context=SpanContext(trace_id=0x5df8e443e5f3c48f37f94be578106968, span_id=0x82389d79279f7e30, trace_flags=0x01, trace_state=[], is_remote=False))}, item=(18, 0, (False, <MaybeEncodingError: Error sending result: '['/tmp/tmp3kgb1ec5/tiled_hrdp_0.png']'. Reason: 'TypeError("cannot pickle '_thread.lock' object")'>))),) {}
Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 131, in worker
    put((job, i, result))
  File "/app/xxx/common/opentelemetry.py", line 98, in instrument_put
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/multiprocessing/queues.py", line 362, in put
    obj = _ForkingPickler.dumps(obj)
  File "/usr/local/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/multiprocessing/pool.py", line 136, in worker
    put((job, i, (False, wrapped)))
  File "/app/xxx/common/opentelemetry.py", line 98, in instrument_put
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/multiprocessing/queues.py", line 362, in put
    obj = _ForkingPickler.dumps(obj)
  File "/usr/local/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object
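
One plausible reading of this traceback, not confirmed in the thread: the wrapper pickles the whole context, and the context holds a live span object that owns a `threading.Lock`, so the lock comes from the instrumentation itself rather than from application code. The stdlib-only sketch below reproduces that failure with a hypothetical `FakeSpan` class and shows that a plain dictionary of strings pickles fine. `FakeSpan`, `to_carrier` and `can_pickle` are illustrative names only. In a real setup, OpenTelemetry's propagation API (`opentelemetry.propagate.inject` and `extract`) may be a better fit than the hand-rolled carrier used here.

```python
import pickle
import threading


class FakeSpan:
    """Hypothetical stand-in for a live span that owns a lock."""

    def __init__(self, trace_id, span_id):
        self.trace_id = trace_id
        self.span_id = span_id
        self._lock = threading.Lock()  # locks cannot be pickled


def to_carrier(span):
    """Reduce a span to plain strings that can cross a process boundary."""
    return {
        "trace_id": format(span.trace_id, "032x"),
        "span_id": format(span.span_id, "016x"),
    }


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False
```

Under these assumptions, `can_pickle({"current-span": FakeSpan(1, 2)})` is false while `can_pickle(to_carrier(FakeSpan(1, 2)))` is true, which suggests sending a serialized carrier through the queue instead of the context object.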

Contributor

ocelotl commented May 24, 2023

Hello @kedare, sorry for the late reply. Did you make any more progress on this issue?

Author

kedare commented May 24, 2023

Hello.

No problem. Unfortunately I could not get any further. I did not find a way to make it work with the special case I described in my previous message: sometimes the multiprocessing code reads directly from the UNIX FIFO associated with the queue, completely bypassing the code I override.

I could not find any information on why this was done. There isn't much documentation about the internals of multiprocessing, and the code quality is unfortunately very low in this part.

I did not find any clean way to override this behavior to add instrumentation, short of forking the whole code.
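
An alternative that avoids patching the queues altogether, sketched here as an assumption and not something tried in this thread: wrap the task function so that it carries a picklable carrier with it, and restore that carrier in the worker before the real function runs. The example uses a `contextvars.ContextVar` named `current_carrier` as a hypothetical stand-in for the tracing context. `call_with_carrier`, `bind_carrier` and `describe` are illustrative names as well.

```python
import contextvars
from functools import partial

# Hypothetical stand-in for the tracing context; holds a picklable dict or None.
current_carrier = contextvars.ContextVar("current_carrier", default=None)


def call_with_carrier(carrier, func, *args, **kwargs):
    """Run func with the given carrier installed, then restore the old one."""
    token = current_carrier.set(carrier)
    try:
        return func(*args, **kwargs)
    finally:
        current_carrier.reset(token)


def bind_carrier(func):
    """Capture the caller's carrier now and return a callable that replays it."""
    return partial(call_with_carrier, current_carrier.get(), func)


def describe(value):
    """Example task: report the value together with the carrier it ran under."""
    return (value, current_carrier.get())
```

With a pool, something like `pool.map(bind_carrier(f), urls)` would ship the carrier inside each task, provided `f` is a module-level function and the carrier itself is picklable. This only covers work submitted through the wrapper, so it is narrower than a transparent instrumentation.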

Contributor

ocelotl commented Jun 13, 2023

Thanks for your reply, @kedare. Sad to read that you could not make any more progress. I'll be closing this issue for the time being; please reopen it if you make new progress ✌️

@ocelotl ocelotl closed this as completed Jun 13, 2023
@ocelotl ocelotl closed this as not planned Jun 13, 2023