Skip to content

RFE445: add incremental object get#446

Merged
k82cn merged 5 commits into
mainfrom
flm_445
May 12, 2026
Merged

RFE445: add incremental object get#446
k82cn merged 5 commits into
mainfrom
flm_445

Conversation

@k82cn
Copy link
Copy Markdown
Contributor

@k82cn k82cn commented May 11, 2026

Summary

  • add version-aware object cache get responses for full, patch-only, and not-modified reads
  • update flamepy cache handling for full, patch-only, and cached materialized responses
  • add RFE445 design doc, Rust/Python unit tests, E2E coverage, and replay-buffer benchmark flags

Validation

  • cargo fmt --all -- --check
  • cargo build -p flame-object-cache
  • cargo clippy -p flame-object-cache --all-targets -- -D warnings
  • cargo test -p flame-object-cache --quiet
  • cd sdk/python && ./.venv/bin/python -m pytest tests/test_cache.py -q
  • cd sdk/python && ./.venv/bin/ruff check src tests
  • cd sdk/python && ./.venv/bin/ruff format --check src tests
  • cd sdk/python && uv build
  • cd e2e && uv run -n ruff check src tests
  • cd e2e && uv run -n ruff format --check src tests
  • cd e2e && uv build

Note

  • Selected E2E pytest execution was attempted locally but blocked by local Flame config: current-context is not set.

@k82cn k82cn marked this pull request as ready for review May 11, 2026 23:25
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements incremental object retrieval by patch version, optimizing network transfer for patch-heavy objects like replay buffers. Key changes include updating the Arrow Flight protocol to support versioned patches, modifying the Rust server to stream partial updates, and enhancing the Python SDK to cache patches and materialized views. Review feedback highlights several performance optimizations in the Rust server, such as avoiding unnecessary data cloning and using a RwLock for concurrent reads. Additionally, a critical issue was identified in the Python SDK where using id() for caching materialized views could lead to memory leaks and cache misses when using bound methods.

Comment thread object_cache/src/cache.rs Outdated
Comment on lines +951 to +957
fn object_patches_to_flight_data_vec(patches: &[Object]) -> Result<Vec<FlightData>, FlameError> {
let rows = patches
.iter()
.map(|delta| (ObjectResponseKind::Patch, delta))
.collect();
object_rows_to_flight_data_vec(rows)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This helper function can be optimized to avoid unnecessary cloning of Object instances (and their underlying Vec<u8> data) by working with references. This is particularly important for large objects and high-concurrency scenarios.

Suggested change
fn object_patches_to_flight_data_vec(patches: &[Object]) -> Result<Vec<FlightData>, FlameError> {
let rows = patches
.iter()
.map(|delta| (ObjectResponseKind::Patch, delta))
.collect();
object_rows_to_flight_data_vec(rows)
}
fn object_patches_to_flight_data_vec(patches: Vec<&Object>) -> Result<Vec<FlightData>, FlameError> {
let rows = patches
.into_iter()
.map(|delta| (ObjectResponseKind::Patch, delta))
.collect();
object_rows_to_flight_data_vec(rows)
}

Comment thread object_cache/src/cache.rs
Comment on lines +1117 to +1123
} else if client_version != 0 && object.version <= client_version {
let needed_patches: Vec<Object> = object
.deltas
.iter()
.filter(|delta| delta.version > client_version)
.cloned()
.collect();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Cloning all patches before checking contiguity and encoding is inefficient. Using references avoids deep-copying the patch data.

        } else if client_version != 0 && object.version <= client_version {
            let needed_patches: Vec<&Object> = object
                .deltas
                .iter()
                .filter(|delta| delta.version > client_version)
                .collect();

Comment thread object_cache/src/cache.rs Outdated
client_version,
server_version
);
let flight_data_vec = object_patches_to_flight_data_vec(&needed_patches)?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Pass the vector of references directly to the optimized helper.

Suggested change
let flight_data_vec = object_patches_to_flight_data_vec(&needed_patches)?;
let flight_data_vec = object_patches_to_flight_data_vec(needed_patches)?;

Comment thread sdk/python/src/flamepy/core/cache.py Outdated


def _materialize_object(obj: Object, deserializer: Optional[Deserializer] = None) -> Any:
materialized_key = None if deserializer is None else id(deserializer)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using id(deserializer) as a cache key is problematic for bound methods (e.g., self._deserializer), as Python creates a new bound method object with a unique ID on every access. This results in a 0% cache hit rate and a memory leak as the materialized dictionary grows with every call. Using the deserializer object itself as the key is safe and efficient, as functions and methods are hashable and bound methods implement equality correctly.

Suggested change
materialized_key = None if deserializer is None else id(deserializer)
materialized_key = deserializer

Comment thread object_cache/src/cache.rs
Comment on lines +241 to +248
pub fn current_version(&self) -> u64 {
self.deltas
.iter()
.map(|delta| delta.version)
.max()
.unwrap_or(self.version)
.max(self.version)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The .max(self.version) call is redundant because unwrap_or(self.version) already ensures the result is at least self.version if the iterator is empty. If the iterator is not empty, the maximum delta version should naturally be greater than or equal to the base version.

    pub fn current_version(&self) -> u64 {
        self.deltas
            .iter()
            .map(|delta| delta.version)
            .max()
            .unwrap_or(self.version)
    }

Comment thread object_cache/src/cache.rs Outdated
.cache
.get_write_lock(&key_str)
.map_err(|e| Status::internal(format!("Lock error: {}", e)))?;
let _guard = write_lock.lock().await;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using an exclusive Mutex lock for do_get operations serializes all read requests for the same key, which can become a significant performance bottleneck. Since cache.get() is thread-safe and returns a cloned Object, the returned data is consistent. If synchronization against concurrent writes is required to avoid reading partial storage states, consider using a RwLock to allow multiple concurrent readers.

Comment thread sdk/python/src/flamepy/core/cache.py Outdated
version: int
data: Any
patches: List[Patch] = field(default_factory=list)
materialized: Dict[Optional[int], Any] = field(default_factory=dict)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The type hint for the materialized dictionary should reflect that it will store callables as keys to support stable caching of materialized views.

Suggested change
materialized: Dict[Optional[int], Any] = field(default_factory=dict)
materialized: Dict[Optional[Callable], Any] = field(default_factory=dict)

@k82cn k82cn merged commit 14a70c2 into main May 12, 2026
11 checks passed
@k82cn k82cn deleted the flm_445 branch May 12, 2026 02:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant