[lib] fix bug in handle_klio decorator with thread-limiting #209
6e96bcb to e840fb1
-    return wrapper(*args, **wrapper_kwargs, **kwargs)
+    result = wrapper(*args, **wrapper_kwargs, **kwargs)
+    yield from result
Have you confirmed that this still works when @handle_klio decorates functions and non-generator methods? I know that whenever you put a yield in a function, it makes it a generator, even if you never hit the yield branch. I would assume the same is true for yield from. This is why the existing yield'ing and yield from'ing are in their own functions.
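The generator behaviour described in this comment can be checked with the standard library alone. The snippet below is a minimal sketch, and the function names in it are made up for illustration (they are not part of klio):

```python
import inspect


def returns_value(item, emit=False):
    # The yield branch is never reached when emit is False, but its mere
    # presence in the body makes this a generator function.
    if emit:
        yield item
    return item


def delegates(items):
    # yield from has the same effect on the enclosing function as yield.
    yield from items


result = returns_value(1)
is_gen_func = inspect.isgeneratorfunction(returns_value)
is_gen_obj = inspect.isgenerator(result)
delegates_is_gen_func = inspect.isgeneratorfunction(delegates)

# The "returned" value is not handed to the caller directly; it only
# surfaces as the value attribute of StopIteration.
try:
    next(result)
    returned = None
except StopIteration as stop:
    returned = stop.value
```

Calling `returns_value(1)` therefore hands back a generator object rather than `1`, which is the concern raised here about putting `yield from` directly in the shared wrapper.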
Decorating functions is unaffected since the other wrapper handles that (also tested and it seems fine). For methods though, what other methods besides process would we want to use this decorator on?
Wouldn't decorated functions be affected though, since this is _handle_klio? And adding yield from to _handle_klio makes it a generator?
To answer your question, handle_klio/_handle_klio is meant to work with any method or function. It just makes the simplistic assumption that DoFn.process is the only method that's a generator. It's meant to allow folks to have classes - if they so please - that are not necessarily transforms.
Ah okay, sorry, I see now - I just finally properly tested it - and you're right re: if it's just a function. My mistake - so sorry.
But yeah, this won't work if a class method is not a generator. As I said, someone could have a non-transform class that has a method decorated with handle_klio. But also, I just remembered, folks can have composite transforms.
Some cases where yield from does not work:
# Imports assumed; the original comment omitted them.
import apache_beam as beam
from klio.transforms import decorators

# Case 1: a non-generator method on a class that is not a transform
class MyClass:
    @decorators.handle_klio
    def my_method(self, item):
        print(f"MyClass.my_method got item: {item}")
        return item

my_class = MyClass()

def my_map_func(item):
    item = my_class.my_method(item)
    return item

def run(input_pcol, config):
    return input_pcol | beam.Map(my_map_func)

# Case 2: the expand method of a composite transform
def my_map_func(item):
    print(f"my_map_func got item: {item}")
    return item

class MyComposite(beam.PTransform):
    @decorators.handle_klio
    def expand(self, pcoll):
        return (
            pcoll | beam.Map(my_map_func)
        )

def run(input_pcol, config):
    return input_pcol | MyComposite()
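The failure mode in these cases can be reproduced without Beam or klio. The sketch below uses a hypothetical decorator, `naive_wrap`, that stands in for a wrapper with an unconditional `yield from`; it is not klio's actual `_handle_klio`, only an illustration of the mechanism:

```python
import functools
import inspect


def naive_wrap(method):
    # Hypothetical wrapper: the unconditional yield from makes `wrapper`
    # a generator function no matter what `method` itself does.
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        result = method(self, *args, **kwargs)
        yield from result

    return wrapper


class MyClass:
    def __init__(self):
        self.calls = []

    @naive_wrap
    def my_method(self, item):
        self.calls.append(item)
        return item


obj = MyClass()
out = obj.my_method(42)

# The caller receives a generator object instead of 42 ...
got_generator = inspect.isgenerator(out)
# ... and the method body has not run yet.
calls_before = list(obj.calls)

# Iterating finally runs the body, but yield from then fails because
# the returned int is not iterable.
try:
    list(out)
    error = None
except TypeError as exc:
    error = exc
```

With a wrapper like this, a non-generator method would hand its caller a generator object rather than the item, and a composite transform's expand would presumably return a generator where Beam expects a PCollection.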
e840fb1 to 6e9dcc7
6e9dcc7 to 678458d
Nicely done 👏🏻 just one thought that you can ignore if you'd like!
@@ -49,6 +49,40 @@
 )


+class ThreadLimitGenerator(object):
Optional thought: would tucking this class in with the ThreadLimiter class et al be a better fit? I could see a reason for either here or there so your call.
Yeah I debated that too. I ultimately put it in decorators.py because at least for now that's all that uses it and I don't immediately foresee any other potential use. If we ever find another use or a need to generalize it for reusability then we'd probably want to move it.
This fixes a bug with the ThreadLimiter not properly wrapping user code in DoFn process methods.
Basically, the current behavior is that the thread limiter acquires and releases the lock before execution enters the process method, resulting in effectively no thread limiting around user code. This happens because:
- In the handle_klio decorator, the thread limiter is used as a context manager.
- The process method always returns a generator. This is regardless of whether the user code uses return or yield.
As a result, the context manager exits as soon as the generator object is returned, before any of its body has run.
This behavior can be clearly seen when logging is turned on.
I tested this extensively on direct runner and Dataflow. Especially on Dataflow it is clear that before the fix, multiple messages are processed concurrently even with max_thread_count=1, but after the fix only one message is processed at a time.
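The ordering problem described above can be sketched in plain Python. Everything here is an assumption made for illustration: `LoggingLimiter` is a stand-in for klio's ThreadLimiter, and `limit_iteration` is just one simple way to keep the limiter held while the generator runs. It is not the ThreadLimitGenerator class added in this PR, whose implementation is not shown in this conversation.

```python
import threading

events = []


class LoggingLimiter:
    """Stand-in limiter that records when it is acquired and released."""

    def __init__(self, max_threads=1):
        self._sem = threading.BoundedSemaphore(max_threads)

    def __enter__(self):
        self._sem.acquire()
        events.append("acquire")
        return self

    def __exit__(self, *exc_info):
        events.append("release")
        self._sem.release()
        return False


def process(item):
    # Plays the role of user code in a generator-style process method.
    events.append("user code")
    yield item


def limit_call_only(func, limiter):
    # The with block only covers creating the generator object, so the
    # limiter is released before the generator body ever runs.
    def wrapper(*args, **kwargs):
        with limiter:
            return func(*args, **kwargs)

    return wrapper


def limit_iteration(func, limiter):
    # The wrapper is itself a generator, so the with block stays open
    # until the wrapped generator is exhausted.
    def wrapper(*args, **kwargs):
        with limiter:
            yield from func(*args, **kwargs)

    return wrapper


limiter = LoggingLimiter()

list(limit_call_only(process, limiter)("a"))
call_only_events = list(events)

events.clear()
list(limit_iteration(process, limiter)("a"))
iteration_events = list(events)
```

In the first run the recorded order is acquire, release, user code; in the second it is acquire, user code, release. One trade-off of the generator-based variant is that the limiter stays held until the consumer exhausts or closes the generator, which may be part of why a dedicated wrapper class is useful.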