Open
Description
Problem
The current get_or_create_schema
method has a race condition between the initial cache check and cache insertion:
// Check cache first (read lock released here)
if let Some((schema, schema_md5, field_info)) =
self.schema_cache.read().unwrap().get(&schema_id) {
return (/* cached result */);
}
// RACE CONDITION: Multiple threads can reach here simultaneously
// All will perform expensive schema creation operations
let schema = BondEncodedSchema::from_fields(/* expensive */);
let schema_md5 = md5::compute(schema_bytes).0; // expensive
{
let mut cache = self.schema_cache.write().unwrap();
cache.insert(schema_id, (schema.clone(), schema_md5, field_info.clone()));
}
Impact:
- Multiple threads with the same schema_id bypass the initial cache check
- All perform expensive BondEncodedSchema::from_fields operations redundantly
- Cache entries get overwritten with identical data
- Wasted CPU cycles in high-throughput scenarios
Solution:
Implement double-checked locking pattern:
fn get_or_create_schema(
&self,
schema_id: u64,
field_info: Vec<FieldDef>,
) -> (CentralSchemaEntry, Vec<FieldDef>) {
// First check (read lock)
if let Some((schema, schema_md5, field_info)) =
self.schema_cache.read().unwrap().get(&schema_id) {
return (/* return cached */);
}
// Expensive operations outside locks
let schema = BondEncodedSchema::from_fields("OtlpLogRecord", "telemetry", field_info.clone());
let schema_bytes = schema.as_bytes();
let schema_md5 = md5::compute(schema_bytes).0;
{
let mut cache = self.schema_cache.write().unwrap();
// Second check under write lock
if let Some((existing_schema, existing_md5, existing_field_info)) = cache.get(&schema_id) {
return (/* return existing */);
}
// Safe to insert
cache.insert(schema_id, (schema.clone(), schema_md5, field_info.clone()));
}
(/* return new schema */)
}
### Benefits
- Eliminates redundant schema creation under concurrent access
- Maintains thread safety without unnecessary performance overhead
- Standard pattern for this type of lazy initialization scenario