Skip to content

Commit

Permalink
Revert "feat(streaming): call may_exist when insert cache miss in join executor (#7957)"
Browse files Browse the repository at this point in the history

This reverts commit 6d3dba6.
  • Loading branch information
lmatz committed Mar 13, 2023
1 parent 428354d commit 7f9e782
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 92 deletions.
6 changes: 1 addition & 5 deletions grafana/risingwave-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1362,11 +1362,7 @@ def section_streaming_actors(outer_panels):
),
panels.target(
f"rate({metric('stream_join_insert_cache_miss_count')}[$__rate_interval])",
"cache miss when insert {{actor_id}} {{side}}",
),
panels.target(
f"rate({metric('stream_join_may_exist_true_count')}[$__rate_interval])",
"may_exist true when insert {{actor_id}} {{side}}",
"cache miss when insert{{actor_id}} {{side}}",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

30 changes: 1 addition & 29 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,31 +321,6 @@ where
Distribution::fallback(),
None,
false,
0,
)
.await
}

/// Create a state table without distribution, with given `prefix_hint_len`, used for unit
/// tests.
pub async fn new_without_distribution_with_prefix_hint_len(
store: S,
table_id: TableId,
columns: Vec<ColumnDesc>,
order_types: Vec<OrderType>,
pk_indices: Vec<usize>,
prefix_hint_len: usize,
) -> Self {
Self::new_with_distribution_inner(
store,
table_id,
columns,
order_types,
pk_indices,
Distribution::fallback(),
None,
true,
prefix_hint_len,
)
.await
}
Expand All @@ -370,7 +345,6 @@ where
distribution,
value_indices,
true,
0,
)
.await
}
Expand All @@ -393,7 +367,6 @@ where
distribution,
value_indices,
false,
0,
)
.await
}
Expand All @@ -411,7 +384,6 @@ where
}: Distribution,
value_indices: Option<Vec<usize>>,
is_consistent_op: bool,
prefix_hint_len: usize,
) -> Self {
let local_state_store = store
.new_local(NewLocalOptions {
Expand Down Expand Up @@ -454,7 +426,7 @@ where
pk_indices,
dist_key_indices,
dist_key_in_pk_indices,
prefix_hint_len,
prefix_hint_len: 0,
vnodes,
table_option: Default::default(),
vnode_col_idx_in_pk: None,
Expand Down
18 changes: 5 additions & 13 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1027,20 +1027,18 @@ mod tests {
order_types: &[OrderType],
pk_indices: &[usize],
table_id: u32,
prefix_hint_len: usize,
) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
let column_descs = data_types
.iter()
.enumerate()
.map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
.collect_vec();
let state_table = StateTable::new_without_distribution_with_prefix_hint_len(
let state_table = StateTable::new_without_distribution(
mem_state.clone(),
TableId::new(table_id),
column_descs,
order_types.to_vec(),
pk_indices.to_vec(),
prefix_hint_len,
)
.await;

Expand Down Expand Up @@ -1091,9 +1089,8 @@ mod tests {
};
let (tx_l, source_l) = MockSource::channel(schema.clone(), vec![1]);
let (tx_r, source_r) = MockSource::channel(schema, vec![1]);
let join_key_indices = vec![0];
let params_l = JoinParams::new(join_key_indices.clone(), vec![1]);
let params_r = JoinParams::new(join_key_indices.clone(), vec![1]);
let params_l = JoinParams::new(vec![0], vec![1]);
let params_r = JoinParams::new(vec![0], vec![1]);
let cond = with_condition.then(create_cond);

let mem_state = MemoryStateStore::new();
Expand All @@ -1104,7 +1101,6 @@ mod tests {
&[OrderType::ascending(), OrderType::ascending()],
&[0, 1],
0,
join_key_indices.len(),
)
.await;

Expand All @@ -1114,7 +1110,6 @@ mod tests {
&[OrderType::ascending(), OrderType::ascending()],
&[0, 1],
2,
join_key_indices.len(),
)
.await;

Expand Down Expand Up @@ -1160,9 +1155,8 @@ mod tests {
};
let (tx_l, source_l) = MockSource::channel(schema.clone(), vec![0]);
let (tx_r, source_r) = MockSource::channel(schema, vec![0]);
let join_key_indices = vec![0, 1];
let params_l = JoinParams::new(join_key_indices.clone(), vec![]);
let params_r = JoinParams::new(join_key_indices.clone(), vec![]);
let params_l = JoinParams::new(vec![0, 1], vec![]);
let params_r = JoinParams::new(vec![0, 1], vec![]);
let cond = with_condition.then(create_cond);

let mem_state = MemoryStateStore::new();
Expand All @@ -1177,7 +1171,6 @@ mod tests {
],
&[0, 1, 0],
0,
join_key_indices.len(),
)
.await;

Expand All @@ -1191,7 +1184,6 @@ mod tests {
],
&[0, 1, 1],
0,
join_key_indices.len(),
)
.await;
let schema_len = match T {
Expand Down
34 changes: 3 additions & 31 deletions src/stream/src/executor/managed_state/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ pub struct JoinHashMapMetrics {
total_lookup_count: usize,
/// How many times have we miss the cache when insert row
insert_cache_miss_count: usize,
may_exist_true_count: usize,
}

impl JoinHashMapMetrics {
Expand All @@ -173,7 +172,6 @@ impl JoinHashMapMetrics {
lookup_miss_count: 0,
total_lookup_count: 0,
insert_cache_miss_count: 0,
may_exist_true_count: 0,
}
}

Expand All @@ -190,14 +188,9 @@ impl JoinHashMapMetrics {
.join_insert_cache_miss_count
.with_label_values(&[&self.actor_id, self.side])
.inc_by(self.insert_cache_miss_count as u64);
self.metrics
.join_may_exist_true_count
.with_label_values(&[&self.actor_id, self.side])
.inc_by(self.may_exist_true_count as u64);
self.total_lookup_count = 0;
self.lookup_miss_count = 0;
self.insert_cache_miss_count = 0;
self.may_exist_true_count = 0;
}
}

Expand Down Expand Up @@ -432,22 +425,11 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
// Update cache
entry.insert(pk, value.encode());
} else if self.pk_contained_in_jk {
// Refill cache when the join key contains primary key.
// Refill cache when the join key exist in neither cache or storage.
self.metrics.insert_cache_miss_count += 1;
let mut state = JoinEntryState::default();
state.insert(pk, value.encode());
self.update_state(key, state.into());
} else {
let prefix = key.deserialize(&self.join_key_data_types)?;
self.metrics.insert_cache_miss_count += 1;
// Refill cache when the join key exists in neither cache or storage.
if !self.state.table.may_exist(&prefix).await? {
let mut state = JoinEntryState::default();
state.insert(pk, value.encode());
self.update_state(key, state.into());
} else {
self.metrics.may_exist_true_count += 1;
}
}

// Update the flush buffer.
Expand All @@ -459,6 +441,7 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {

/// Insert a row.
/// Used when the side does not need to update degree.
#[allow(clippy::unused_async)]
pub async fn insert_row(&mut self, key: &K, value: impl Row) -> StreamExecutorResult<()> {
let join_row = JoinRow::new(&value, 0);
let pk = (&value)
Expand All @@ -468,22 +451,11 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
// Update cache
entry.insert(pk, join_row.encode());
} else if self.pk_contained_in_jk {
// Refill cache when the join key contains primary key.
// Refill cache when the join key exist in neither cache or storage.
self.metrics.insert_cache_miss_count += 1;
let mut state = JoinEntryState::default();
state.insert(pk, join_row.encode());
self.update_state(key, state.into());
} else {
let prefix = key.deserialize(&self.join_key_data_types)?;
self.metrics.insert_cache_miss_count += 1;
// Refill cache when the join key exists in neither cache or storage.
if !self.state.table.may_exist(&prefix).await? {
let mut state = JoinEntryState::default();
state.insert(pk, join_row.encode());
self.update_state(key, state.into());
} else {
self.metrics.may_exist_true_count += 1;
}
}

// Update the flush buffer.
Expand Down
16 changes: 3 additions & 13 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ pub struct StreamingMetrics {
pub join_lookup_miss_count: GenericCounterVec<AtomicU64>,
pub join_total_lookup_count: GenericCounterVec<AtomicU64>,
pub join_insert_cache_miss_count: GenericCounterVec<AtomicU64>,
pub join_may_exist_true_count: GenericCounterVec<AtomicU64>,
pub join_actor_input_waiting_duration_ns: GenericCounterVec<AtomicU64>,
pub join_match_duration_ns: GenericCounterVec<AtomicU64>,
pub join_barrier_align_duration: HistogramVec,
Expand Down Expand Up @@ -275,31 +274,23 @@ impl StreamingMetrics {

let join_lookup_miss_count = register_int_counter_vec_with_registry!(
"stream_join_lookup_miss_count",
"Join executor lookup miss count",
"Join executor lookup miss duration",
&["actor_id", "side"],
registry
)
.unwrap();

let join_total_lookup_count = register_int_counter_vec_with_registry!(
"stream_join_lookup_total_count",
"Join executor lookup total count",
"Join executor lookup total operation",
&["actor_id", "side"],
registry
)
.unwrap();

let join_insert_cache_miss_count = register_int_counter_vec_with_registry!(
"stream_join_insert_cache_miss_count",
"Count of cache miss when insert rows in join executor",
&["actor_id", "side"],
registry
)
.unwrap();

let join_may_exist_true_count = register_int_counter_vec_with_registry!(
"stream_join_may_exist_true_count",
"Count of may_exist's true returns of when insert rows in join executor",
"Join executor cache miss when insert operation",
&["actor_id", "side"],
registry
)
Expand Down Expand Up @@ -486,7 +477,6 @@ impl StreamingMetrics {
join_lookup_miss_count,
join_total_lookup_count,
join_insert_cache_miss_count,
join_may_exist_true_count,
join_actor_input_waiting_duration_ns,
join_match_duration_ns,
join_barrier_align_duration,
Expand Down

0 comments on commit 7f9e782

Please sign in to comment.