@LucasLLC (Contributor) commented on Aug 29, 2025

Adds RDMA support and a generic transport interface.

Interface

Introduces "pipe", "message", & "transport buffer". Should be fairly generic enough for now for us to support shared memory.

Gotchas (read before using)

Uses the monarch nightly from 8.2. On the latest monarch, we need D81395338 for RDMA support; disable RDMA via TORCHSTORE_RDMA_ENABLED as a workaround.

On HF models, for reasons we don't fully understand yet, certain tensors cause RDMA to fail unless the chunk size is 1 MB. We're still investigating, but this workaround costs us most of the perf win.

Benchmarks

TestStore.test_large_tensors

[benchmark screenshots]

TestHFModel.test_basic (qwen 1 -> 1)

  • rdma_chunk_s=1: put 21.79s, 21.49s
  • rdma_chunk_s=512: (broken :/)
  • rdma_enabled=False: put 40.52s, get 44.731s

New Env Vars

  • Sets a default value for HYPERACTOR_CODEC_MAX_FRAME_LENGTH, whose stock limit otherwise prevents us from sending large messages via monarch RPC.
  • TORCHSTORE_RDMDA_CHUNK_SIZE_MB: chunk size for RDMA buffers. Currently defaults to 1 MB as a workaround for an issue in tensor_engine.RDMABuffer.
  • TORCHSTORE_RDMA_ENABLED: toggles RDMA on and off; helpful for swapping back and forth (see the example after this list).
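A hedged example of how these knobs might be set from Python before torchstore is imported; only the variable names come from this PR (including the RDMDA spelling, which a reviewer flags below), and the specific values are illustrative assumptions, not the real defaults:

```python
import os

# Illustrative values only; the actual defaults live in torchstore itself.
os.environ.setdefault("HYPERACTOR_CODEC_MAX_FRAME_LENGTH", str(1024 ** 3))  # allow ~1 GiB frames (assumed value)
os.environ.setdefault("TORCHSTORE_RDMDA_CHUNK_SIZE_MB", "1")  # 1 MB chunking workaround from this PR
os.environ.setdefault("TORCHSTORE_RDMA_ENABLED", "1")         # flip to "0" to fall back to the RPC path (assumed semantics)
```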

@meta-cla bot added the "CLA Signed" label on Aug 29, 2025.
@LucasLLC changed the title from "[WIP] Not ready for review!! RDMA Support + an interface for transportation methods" to "Adds RDMA Support & generic transport interface" on Aug 31, 2025.
@LucasLLC marked this pull request as ready for review on August 31, 2025, 21:56.
@allenwang28 (Contributor) left a comment:

this is awesome :)

```python
# but setting this chunk size works around the issue until we can fix it
# N.B. from benchmarking, we know the ideal size is any size >256mb.
RDMDA_CHUNK_SIZE_MB = int(
    os.environ.get("TORCHSTORE_RDMDA_CHUNK_SIZE_MB", "1")
)
```
Contributor suggested a change (fix the env var name):

```diff
-    os.environ.get("TORCHSTORE_RDMDA_CHUNK_SIZE_MB", "1")
+    os.environ.get("TORCHSTORE_RDMA_CHUNK_SIZE_MB", "1")
```

```python
# we should consider turning this into a "PendingTensor" class,
# and having these functions defined there instead.
# should also totally simplify the logic here
# TODO: Utility fucntions may make more sense in a
```
Contributor suggested a change (fix the typo):

```diff
-# TODO: Utility fucntions may make more sense in a
+# TODO: Utility functions may make more sense in a
```
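For what it's worth, a rough sketch of what the "PendingTensor" idea in that comment could look like; the class name comes from the comment itself, while the fields and methods below are purely hypothetical:

```python
from dataclasses import dataclass, field
from typing import List

import torch


@dataclass
class PendingTensor:
    """Hypothetical holder for a tensor that is still being assembled from chunks."""

    shape: torch.Size
    dtype: torch.dtype
    chunks: List[torch.Tensor] = field(default_factory=list)

    def add_chunk(self, chunk: torch.Tensor) -> None:
        self.chunks.append(chunk)

    def materialize(self) -> torch.Tensor:
        # Stitch the received chunks back into one tensor of the expected shape.
        flat = torch.cat([c.flatten() for c in self.chunks])
        return flat.view(self.shape).to(self.dtype)
```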

@allenwang28 (Contributor) left a comment:

I mostly had naming-related nits, looking forward to the next PR!

```python
message = Message.from_any(inplace_tensor)

return inplace_tensor
fetched_tensor = await pipe.get_from_storage_volume(key, message)
```
Contributor:
can this API be simplified to just a get(...)?
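For context, a convenience wrapper along the lines the reviewer is asking about might look roughly like this, reusing the `Message` and `Pipe` names from the interface sketch earlier in this thread; the wrapper itself is hypothetical, not part of the PR:

```python
import torch


async def get(pipe, key: str, inplace_tensor: torch.Tensor) -> torch.Tensor:
    # Hypothetical sugar: build the Message internally so callers only pass
    # a key and a destination tensor.
    message = Message.from_any(inplace_tensor)
    return await pipe.get_from_storage_volume(key, message)
```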

@casteryh (Contributor) left a comment:
This looks incredible! Left some noob questions.


```python
# recv
async def write_from(self, tensor):
    self.tensor = tensor
```
Contributor:
The contract of write_from is that the caller is responsible for maintaining the integrity of the passed-in tensor until it's read (for example, not modifying it unless intended), correct? Hence storing a reference to the tensor?

@LucasLLC (Author) replied:
Yes, that's the contract around monarch.tensor_engine.RDMABuffer. There are also some other gotchas that force us to create copies locally, the main one being no support for non-contiguous tensors.
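A small illustration of the kind of local copy this forces; `is_contiguous()`/`contiguous()` are standard PyTorch, but treating this helper as what torchstore actually does internally is an assumption:

```python
import torch


def ensure_rdma_friendly(tensor: torch.Tensor) -> torch.Tensor:
    # Per the discussion above, RDMABuffer can't register non-contiguous memory,
    # so materialize a contiguous local copy first when needed.
    return tensor if tensor.is_contiguous() else tensor.contiguous()
```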

@LucasLLC (Author) replied:
Ah sorry, as it relates to the monarch buffer, that is also the current state, yes. In the future when we provide mechanisms for asynchronous puts/gets, we'll have to think about this a little more

```python
# else: we are in the remote case (in a different process), and must read from
# the rdma buffer
try:
    for idx, chunk in enumerate(chunked_byte_view):
```
Contributor:
Should we gather instead of sequentially read?

@LucasLLC (Author) replied:
This is a good idea. I think we can follow up in a future diff, but I'll add a TODO.

@LucasLLC (Author) replied:
The reason I didn't do this from the start is that I recalled running into some issues with rdma buffer when running async in the past.

Contributor:
Got it! That makes perfect sense. My initial thought was that if we can get concurrent RDMA buffer reads working, performance for smaller chunk sizes (say 1 MB) might be better. But I agree this can be added later, assuming it works.
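For reference, a minimal sketch of the concurrent-read variant being discussed, assuming each chunk has a matching buffer object with an awaitable read call; the names here are illustrative, not the actual torchstore/monarch API:

```python
import asyncio


async def read_chunks_concurrently(rdma_buffers, chunked_byte_view):
    # Issue all chunk reads at once instead of awaiting them one by one;
    # asyncio.gather waits for the whole batch and raises if any read fails.
    await asyncio.gather(
        *(buf.read_into(chunk) for buf, chunk in zip(rdma_buffers, chunked_byte_view))
    )
```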

"tensor": tensor
}

async def put(self, key: str, transport_buffer: torch.Tensor, message: Message):
Contributor suggested a change (annotate with the actual buffer type):

```diff
-    async def put(self, key: str, transport_buffer: torch.Tensor, message: Message):
+    async def put(self, key: str, transport_buffer: TransportBuffer, message: Message):
```


```python
        self.kv[key] = tensor

    async def get(self, key: str, transport_buffer: torch.Tensor, message: Message):
```
Contributor suggested a change (same fix for the getter):

```diff
-    async def get(self, key: str, transport_buffer: torch.Tensor, message: Message):
+    async def get(self, key: str, transport_buffer: TransportBuffer, message: Message):
```

@LucasLLC merged commit 751f97b into main on Sep 3, 2025 (1 check passed).
@LucasLLC deleted the luca/add_rdma_support branch on September 3, 2025, 17:22.