feat: ReplayBuffer example (#441)
Conversation
Signed-off-by: Klaus Ma <klausm@nvidia.com>
Code Review
This pull request introduces a distributed reinforcement learning replay buffer example that leverages Flame's patch_object API for efficient data synchronization. It includes a collector service, a replay buffer implementation with custom deserialization, and updated gRPC configurations in both the Rust cache server and Python SDK to support larger message sizes. Review feedback identified potential security risks in setting unlimited message sizes, suggesting a 2GB cap instead to prevent resource exhaustion, and pointed out a mathematical inaccuracy in the average reward calculation within the example script.
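As a rough orientation, here is a minimal, framework-agnostic sketch of the collector → replay buffer flow the example revolves around. The class and method names (`ReplayBuffer`, `add`, `sample`, `state`) are illustrative assumptions and not Flame's `patch_object` API or the code in this PR.

```python
# Minimal replay buffer sketch (illustrative only, not Flame's API).
import random
from dataclasses import dataclass, field


@dataclass
class ReplayBuffer:
    capacity: int = 10_000
    items: list = field(default_factory=list)
    total_added: int = 0

    def add(self, transition: dict) -> None:
        # Drop the oldest transition once capacity is reached (FIFO eviction).
        if len(self.items) >= self.capacity:
            self.items.pop(0)
        self.items.append(transition)
        self.total_added += 1

    def sample(self, batch_size: int) -> list:
        # Uniform random sampling over whatever is currently stored.
        return random.sample(self.items, min(batch_size, len(self.items)))

    def state(self) -> dict:
        return {"size": len(self.items), "total_added": self.total_added}


# A collector pushes transitions gathered from the environment:
buffer = ReplayBuffer(capacity=1_000)
for step in range(5):
    buffer.add({"obs": step, "action": 0, "reward": 1.0})
print(buffer.state())  # {'size': 5, 'total_added': 5}
```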
```rust
.add_service(
    FlightServiceServer::new(server)
        .max_decoding_message_size(usize::MAX)
        .max_encoding_message_size(usize::MAX),
)
```
Setting the maximum message size to usize::MAX effectively disables all size limits. While necessary for large reinforcement learning buffers, using an unlimited value can expose the server to Out-Of-Memory (OOM) issues or denial-of-service attacks if malformed or excessively large packets are received. It is safer to use a very large but finite limit (e.g., 2GB).
Suggested change:

```diff
 .add_service(
     FlightServiceServer::new(server)
-        .max_decoding_message_size(usize::MAX)
-        .max_encoding_message_size(usize::MAX),
+        .max_decoding_message_size(2 * 1024 * 1024 * 1024)
+        .max_encoding_message_size(2 * 1024 * 1024 * 1024),
 )
```
```python
GRPC_OPTIONS = [
    ("grpc.max_send_message_length", -1),
    ("grpc.max_receive_message_length", -1),
]
```
Using -1 for gRPC message length options disables the limits entirely. Similar to the server-side change, it is recommended to use a large but bounded value to prevent potential stability issues or memory exhaustion.
Suggested change:

```diff
 GRPC_OPTIONS = [
-    ("grpc.max_send_message_length", -1),
-    ("grpc.max_receive_message_length", -1),
+    ("grpc.max_send_message_length", 2 * 1024 * 1024 * 1024),
+    ("grpc.max_receive_message_length", 2 * 1024 * 1024 * 1024),
 ]
```
```python
stats = buffer_svc.state().get()
total_size = stats["size"]
total_added = stats["total_added"]
avg_reward = sum(r["avg_reward"] for r in collect_results) / num_collections
```
This line calculates an average of averages, which can be mathematically incorrect if the number of episodes completed by each collector varies. A more accurate approach is to calculate a weighted average based on the total episodes seen by each collector.
| avg_reward = sum(r["avg_reward"] for r in collect_results) / num_collections | |
| total_episodes = sum(r["episode_count"] for r in collect_results) | |
| avg_reward = sum(r["avg_reward"] * r["episode_count"] for r in collect_results) / max(1, total_episodes) |
Codecov Report: ❌ Patch coverage is …
- Use weighted average for avg_reward calculation based on episode count
- Add codecov.yml to disable patch coverage check while keeping project coverage