Enable taking Python Streams from Context #596

Merged
rapids-bot[bot] merged 10 commits into rapidsai:main from nirandaperera:stream-from-python-context on Oct 23, 2025

@nirandaperera
Contributor

This PR enables taking Python Streams from the stream pool in the buffer resource object.
This is achieved by calling Stream._from_cudaStream_t(stream_view.value(), self). The Context Python object is passed as the owner of the Stream so that it outlives the returned Stream object.
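
The owner argument described above can be illustrated with a minimal pure-Python sketch. It does not use rmm or rapidsmpf: `FakePool` and `FakeStream` are hypothetical stand-ins, and integers stand in for raw `cudaStream_t` handles. The only point is that a non-owning wrapper holding a strong reference to its owner keeps that owner alive.

```python
import gc
import weakref


class FakePool:
    """Hypothetical stand-in for the object that owns the raw streams."""

    def __init__(self, size):
        # Integers stand in for raw cudaStream_t handles (assumption).
        self._handles = list(range(100, 100 + size))
        self._next = 0

    def get_raw(self):
        handle = self._handles[self._next % len(self._handles)]
        self._next += 1
        return handle


class FakeStream:
    """Hypothetical non-owning wrapper around a raw handle."""

    def __init__(self, raw, owner):
        self.raw = raw
        # The strong reference is what keeps the owner alive.
        self._owner = owner


def demo():
    pool = FakePool(2)
    pool_ref = weakref.ref(pool)
    stream = FakeStream(pool.get_raw(), owner=pool)

    # Drop the only other reference to the pool.
    del pool
    gc.collect()
    alive_while_stream_exists = pool_ref() is not None

    # Once the wrapper goes away, nothing holds the pool any more.
    del stream
    gc.collect()
    alive_after_stream_dropped = pool_ref() is not None
    return alive_while_stream_exists, alive_after_stream_dropped
```

In this sketch the pool survives for as long as the wrapper exists and is released once the wrapper is dropped, which is the guarantee the PR description asks of the owner argument.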

Fixes #592

Signed-off-by: niranda perera <niranda.perera@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
@nirandaperera nirandaperera requested a review from a team as a code owner October 21, 2025 21:41
@nirandaperera nirandaperera added improvement Improves an existing functionality non-breaking Introduces a non-breaking change labels Oct 21, 2025
Contributor

@TomAugspurger TomAugspurger left a comment

Thanks @nirandaperera. Seems reasonable enough, and it should be sufficient for cudf-polars.

I don't think cudf-polars has a preference for whether this lives on Context or BufferResource, since we'll only access it via a Context. Though I don't know enough Cython / C++ to say whether it being on BufferResource would make getting the stream_pool in the cython method any easier.

Comment on lines +94 to +96
# passing self as the owner of the stream so that it is kept alive for
# the lifetime of the Stream object
return Stream._from_cudaStream_t(stream_view.value(), self)
Contributor

I wonder if the lifetime should be self.br rather than self. My understanding is that the stream pool is attached to the (C++) BufferResource, not the context.

But the Context being alive implies that the context.br is alive, so it ends up not mattering, maybe?

Contributor Author

I think you are right. The Context accepts a BufferResource, so it's best to pass br here.
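
The difference between the two owner choices discussed in this thread can be sketched in plain Python. All classes here are hypothetical stand-ins, not the rapidsmpf types, and the sketch only models Python-level references; it makes no claim about the merged code. With the context as owner, both the context and its buffer resource stay alive; with the buffer resource as owner, only the buffer resource does.

```python
import gc
import weakref


class FakeBufferResource:
    """Hypothetical stand-in for the object that owns the stream pool."""


class FakeContext:
    """Hypothetical stand-in for a context that holds a buffer resource."""

    def __init__(self, br):
        self.br = br


class FakeStream:
    """Hypothetical wrapper that keeps its owner alive."""

    def __init__(self, owner):
        self._owner = owner


def survivors(pick_owner):
    """Return (br_alive, ctx_alive) while only the stream is referenced."""
    br = FakeBufferResource()
    ctx = FakeContext(br)
    br_ref, ctx_ref = weakref.ref(br), weakref.ref(ctx)

    stream = FakeStream(pick_owner(ctx))

    # Drop every reference except the one held through the stream.
    del br, ctx
    gc.collect()
    result = (br_ref() is not None, ctx_ref() is not None)
    del stream
    return result
```

Calling `survivors(lambda ctx: ctx)` models passing `self`, and `survivors(lambda ctx: ctx.br)` models passing `self.br`. In both cases the buffer resource outlives the stream, which matches the observation that a live Context implies a live `context.br`.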

Comment thread python/rapidsmpf/rapidsmpf/tests/streaming/test_cuda_streams.py Outdated
nirandaperera and others added 5 commits October 21, 2025 15:07
Signed-off-by: niranda perera <niranda.perera@gmail.com>
Co-authored-by: Tom Augspurger <tom.augspurger88@gmail.com>
Signed-off-by: niranda perera <niranda.perera@gmail.com>
Comment thread python/rapidsmpf/rapidsmpf/streaming/core/context.pyx Outdated
Comment thread python/rapidsmpf/rapidsmpf/streaming/core/context.pyx Outdated
Comment thread python/rapidsmpf/rapidsmpf/tests/streaming/test_cuda_streams.py Outdated
Signed-off-by: niranda perera <niranda.perera@gmail.com>
@nirandaperera nirandaperera requested a review from wence- October 22, 2025 16:57
Comment thread python/rapidsmpf/rapidsmpf/buffer/resource.pxd
Contributor

@TomAugspurger TomAugspurger left a comment

Thanks @nirandaperera. I think things are good from the cudf-polars side. When the user configures the query to run with cudf-polars, we'll have some sort of bootstrapping to do (either explicitly or implicitly) to build the context:

import rapidsmpf.buffer.resource
import rapidsmpf.streaming.core.context
import rapidsmpf.communicator.single
import rapidsmpf.config
import rmm.mr

opts = rapidsmpf.config.Options()
comm = rapidsmpf.communicator.single.new_communicator(opts)
br = rapidsmpf.buffer.resource.BufferResource(rmm.mr.get_current_device_resource())
ctx = rapidsmpf.streaming.core.context.Context(comm, br, opts)

And then we'll get streams from the pool as needed (source IR nodes):

In [22]: stream = ctx.get_stream_from_pool()

In [23]: stream
Out[23]: <rmm.pylibrmm.stream.Stream at 0x7f3b024fdac0>

@nirandaperera
Contributor Author

I have addressed @wence-'s comments, so I'm merging this.

@nirandaperera
Contributor Author

/merge

@rapids-bot rapids-bot Bot merged commit b08c669 into rapidsai:main Oct 23, 2025
168 of 170 checks passed
Development

Successfully merging this pull request may close these issues.

Add a Python API for BufferResource::stream_pool

3 participants