Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): call may_exist when insert cache miss in join executor #7957

Merged
merged 4 commits into from
Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion grafana/risingwave-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1329,7 +1329,11 @@ def section_streaming_actors(outer_panels):
),
panels.target(
f"rate({metric('stream_join_insert_cache_miss_count')}[$__rate_interval])",
"cache miss when insert{{actor_id}} {{side}}",
"cache miss when insert {{actor_id}} {{side}}",
),
panels.target(
f"rate({metric('stream_join_may_exist_true_count')}[$__rate_interval])",
"may_exist true when insert {{actor_id}} {{side}}",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

30 changes: 29 additions & 1 deletion src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,31 @@ impl<S: StateStore, W: WatermarkBufferStrategy> StateTable<S, W> {
Distribution::fallback(),
None,
false,
0,
)
.await
}

/// Create a state table without distribution, with given `prefix_hint_len`, used for unit
/// tests.
pub async fn new_without_distribution_with_prefix_hint_len(
store: S,
table_id: TableId,
columns: Vec<ColumnDesc>,
order_types: Vec<OrderType>,
pk_indices: Vec<usize>,
prefix_hint_len: usize,
) -> Self {
Self::new_with_distribution_inner(
store,
table_id,
columns,
order_types,
pk_indices,
Distribution::fallback(),
None,
true,
prefix_hint_len,
)
.await
}
Expand All @@ -331,6 +356,7 @@ impl<S: StateStore, W: WatermarkBufferStrategy> StateTable<S, W> {
distribution,
value_indices,
true,
0,
)
.await
}
Expand All @@ -353,6 +379,7 @@ impl<S: StateStore, W: WatermarkBufferStrategy> StateTable<S, W> {
distribution,
value_indices,
false,
0,
)
.await
}
Expand All @@ -370,6 +397,7 @@ impl<S: StateStore, W: WatermarkBufferStrategy> StateTable<S, W> {
}: Distribution,
value_indices: Option<Vec<usize>>,
is_consistent_op: bool,
prefix_hint_len: usize,
) -> Self {
let local_state_store = store.new_local(table_id).await;

Expand All @@ -396,7 +424,7 @@ impl<S: StateStore, W: WatermarkBufferStrategy> StateTable<S, W> {
pk_indices,
dist_key_indices,
dist_key_in_pk_indices,
prefix_hint_len: 0,
prefix_hint_len,
vnodes,
table_option: Default::default(),
is_consistent_op,
Expand Down
18 changes: 13 additions & 5 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1026,18 +1026,20 @@ mod tests {
order_types: &[OrderType],
pk_indices: &[usize],
table_id: u32,
prefix_hint_len: usize,
) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
let column_descs = data_types
.iter()
.enumerate()
.map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
.collect_vec();
let state_table = StateTable::new_without_distribution(
let state_table = StateTable::new_without_distribution_with_prefix_hint_len(
mem_state.clone(),
TableId::new(table_id),
column_descs,
order_types.to_vec(),
pk_indices.to_vec(),
prefix_hint_len,
)
.await;

Expand Down Expand Up @@ -1088,8 +1090,9 @@ mod tests {
};
let (tx_l, source_l) = MockSource::channel(schema.clone(), vec![1]);
let (tx_r, source_r) = MockSource::channel(schema, vec![1]);
let params_l = JoinParams::new(vec![0], vec![1]);
let params_r = JoinParams::new(vec![0], vec![1]);
let join_key_indices = vec![0];
let params_l = JoinParams::new(join_key_indices.clone(), vec![1]);
let params_r = JoinParams::new(join_key_indices.clone(), vec![1]);
let cond = with_condition.then(create_cond);

let mem_state = MemoryStateStore::new();
Expand All @@ -1100,6 +1103,7 @@ mod tests {
&[OrderType::Ascending, OrderType::Ascending],
&[0, 1],
0,
join_key_indices.len(),
)
.await;

Expand All @@ -1109,6 +1113,7 @@ mod tests {
&[OrderType::Ascending, OrderType::Ascending],
&[0, 1],
2,
join_key_indices.len(),
)
.await;

Expand Down Expand Up @@ -1154,8 +1159,9 @@ mod tests {
};
let (tx_l, source_l) = MockSource::channel(schema.clone(), vec![0]);
let (tx_r, source_r) = MockSource::channel(schema, vec![0]);
let params_l = JoinParams::new(vec![0, 1], vec![]);
let params_r = JoinParams::new(vec![0, 1], vec![]);
let join_key_indices = vec![0, 1];
let params_l = JoinParams::new(join_key_indices.clone(), vec![]);
let params_r = JoinParams::new(join_key_indices.clone(), vec![]);
let cond = with_condition.then(create_cond);

let mem_state = MemoryStateStore::new();
Expand All @@ -1170,6 +1176,7 @@ mod tests {
],
&[0, 1, 0],
0,
join_key_indices.len(),
)
.await;

Expand All @@ -1183,6 +1190,7 @@ mod tests {
],
&[0, 1, 1],
0,
join_key_indices.len(),
)
.await;
let schema_len = match T {
Expand Down
34 changes: 31 additions & 3 deletions src/stream/src/executor/managed_state/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ pub struct JoinHashMapMetrics {
total_lookup_count: usize,
/// How many times have we miss the cache when insert row
insert_cache_miss_count: usize,
may_exist_true_count: usize,
}

impl JoinHashMapMetrics {
Expand All @@ -171,6 +172,7 @@ impl JoinHashMapMetrics {
lookup_miss_count: 0,
total_lookup_count: 0,
insert_cache_miss_count: 0,
may_exist_true_count: 0,
}
}

Expand All @@ -187,9 +189,14 @@ impl JoinHashMapMetrics {
.join_insert_cache_miss_count
.with_label_values(&[&self.actor_id, self.side])
.inc_by(self.insert_cache_miss_count as u64);
self.metrics
.join_may_exist_true_count
.with_label_values(&[&self.actor_id, self.side])
.inc_by(self.may_exist_true_count as u64);
self.total_lookup_count = 0;
self.lookup_miss_count = 0;
self.insert_cache_miss_count = 0;
self.may_exist_true_count = 0;
}
}

Expand Down Expand Up @@ -417,11 +424,22 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
// Update cache
entry.insert(pk, value.encode());
} else if self.pk_contained_in_jk {
// Refill cache when the join key exist in neither cache or storage.
// Refill cache when the join key contains primary key.
self.metrics.insert_cache_miss_count += 1;
let mut state = JoinEntryState::default();
state.insert(pk, value.encode());
self.update_state(key, state.into());
} else {
let prefix = key.deserialize(&self.join_key_data_types)?;
self.metrics.insert_cache_miss_count += 1;
// Refill cache when the join key exists in neither cache or storage.
if !self.state.table.may_exist(&prefix).await? {
let mut state = JoinEntryState::default();
state.insert(pk, value.encode());
self.update_state(key, state.into());
} else {
self.metrics.may_exist_true_count += 1;
}
}

// Update the flush buffer.
Expand All @@ -433,7 +451,6 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {

/// Insert a row.
/// Used when the side does not need to update degree.
#[allow(clippy::unused_async)]
pub async fn insert_row(&mut self, key: &K, value: impl Row) -> StreamExecutorResult<()> {
let join_row = JoinRow::new(&value, 0);
let pk = (&value)
Expand All @@ -443,11 +460,22 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
// Update cache
entry.insert(pk, join_row.encode());
} else if self.pk_contained_in_jk {
// Refill cache when the join key exist in neither cache or storage.
// Refill cache when the join key contains primary key.
self.metrics.insert_cache_miss_count += 1;
let mut state = JoinEntryState::default();
state.insert(pk, join_row.encode());
self.update_state(key, state.into());
} else {
let prefix = key.deserialize(&self.join_key_data_types)?;
self.metrics.insert_cache_miss_count += 1;
// Refill cache when the join key exists in neither cache or storage.
if !self.state.table.may_exist(&prefix).await? {
let mut state = JoinEntryState::default();
state.insert(pk, join_row.encode());
self.update_state(key, state.into());
} else {
self.metrics.may_exist_true_count += 1;
}
}

// Update the flush buffer.
Expand Down
16 changes: 13 additions & 3 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct StreamingMetrics {
pub join_lookup_miss_count: GenericCounterVec<AtomicU64>,
pub join_total_lookup_count: GenericCounterVec<AtomicU64>,
pub join_insert_cache_miss_count: GenericCounterVec<AtomicU64>,
pub join_may_exist_true_count: GenericCounterVec<AtomicU64>,
pub join_actor_input_waiting_duration_ns: GenericCounterVec<AtomicU64>,
pub join_match_duration_ns: GenericCounterVec<AtomicU64>,
pub join_barrier_align_duration: HistogramVec,
Expand Down Expand Up @@ -266,23 +267,31 @@ impl StreamingMetrics {

let join_lookup_miss_count = register_int_counter_vec_with_registry!(
"stream_join_lookup_miss_count",
"Join executor lookup miss duration",
"Join executor lookup miss count",
&["actor_id", "side"],
registry
)
.unwrap();

let join_total_lookup_count = register_int_counter_vec_with_registry!(
"stream_join_lookup_total_count",
"Join executor lookup total operation",
"Join executor lookup total count",
&["actor_id", "side"],
registry
)
.unwrap();

let join_insert_cache_miss_count = register_int_counter_vec_with_registry!(
"stream_join_insert_cache_miss_count",
"Join executor cache miss when insert operation",
"Count of cache miss when insert rows in join executor",
&["actor_id", "side"],
registry
)
.unwrap();

let join_may_exist_true_count = register_int_counter_vec_with_registry!(
"stream_join_may_exist_true_count",
"Count of may_exist's true returns of when insert rows in join executor",
&["actor_id", "side"],
registry
)
Expand Down Expand Up @@ -468,6 +477,7 @@ impl StreamingMetrics {
join_lookup_miss_count,
join_total_lookup_count,
join_insert_cache_miss_count,
join_may_exist_true_count,
join_actor_input_waiting_duration_ns,
join_match_duration_ns,
join_barrier_align_duration,
Expand Down