Skip to content

Commit

Permalink
fix: set prefix_hint_len is join test
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Feb 23, 2023
1 parent b4177f9 commit b0bf45b
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 9 deletions.
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

30 changes: 29 additions & 1 deletion src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,31 @@ impl<S: StateStore, W: WatermarkBufferStrategy> StateTable<S, W> {
Distribution::fallback(),
None,
false,
0,
)
.await
}

/// Create a state table without distribution, with given `prefix_hint_len`, used for unit
/// tests.
pub async fn new_without_distribution_with_prefix_hint_len(
store: S,
table_id: TableId,
columns: Vec<ColumnDesc>,
order_types: Vec<OrderType>,
pk_indices: Vec<usize>,
prefix_hint_len: usize,
) -> Self {
Self::new_with_distribution_inner(
store,
table_id,
columns,
order_types,
pk_indices,
Distribution::fallback(),
None,
true,
prefix_hint_len,
)
.await
}
Expand All @@ -331,6 +356,7 @@ impl<S: StateStore, W: WatermarkBufferStrategy> StateTable<S, W> {
distribution,
value_indices,
true,
0,
)
.await
}
Expand All @@ -353,6 +379,7 @@ impl<S: StateStore, W: WatermarkBufferStrategy> StateTable<S, W> {
distribution,
value_indices,
false,
0,
)
.await
}
Expand All @@ -370,6 +397,7 @@ impl<S: StateStore, W: WatermarkBufferStrategy> StateTable<S, W> {
}: Distribution,
value_indices: Option<Vec<usize>>,
is_consistent_op: bool,
prefix_hint_len: usize,
) -> Self {
let local_state_store = store.new_local(table_id).await;

Expand All @@ -396,7 +424,7 @@ impl<S: StateStore, W: WatermarkBufferStrategy> StateTable<S, W> {
pk_indices,
dist_key_indices,
dist_key_in_pk_indices,
prefix_hint_len: 0,
prefix_hint_len,
vnodes,
table_option: Default::default(),
is_consistent_op,
Expand Down
18 changes: 13 additions & 5 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1026,18 +1026,20 @@ mod tests {
order_types: &[OrderType],
pk_indices: &[usize],
table_id: u32,
prefix_hint_len: usize,
) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
let column_descs = data_types
.iter()
.enumerate()
.map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
.collect_vec();
let state_table = StateTable::new_without_distribution(
let state_table = StateTable::new_without_distribution_with_prefix_hint_len(
mem_state.clone(),
TableId::new(table_id),
column_descs,
order_types.to_vec(),
pk_indices.to_vec(),
prefix_hint_len,
)
.await;

Expand Down Expand Up @@ -1088,8 +1090,9 @@ mod tests {
};
let (tx_l, source_l) = MockSource::channel(schema.clone(), vec![1]);
let (tx_r, source_r) = MockSource::channel(schema, vec![1]);
let params_l = JoinParams::new(vec![0], vec![1]);
let params_r = JoinParams::new(vec![0], vec![1]);
let join_key_indices = vec![0];
let params_l = JoinParams::new(join_key_indices.clone(), vec![1]);
let params_r = JoinParams::new(join_key_indices.clone(), vec![1]);
let cond = with_condition.then(create_cond);

let mem_state = MemoryStateStore::new();
Expand All @@ -1100,6 +1103,7 @@ mod tests {
&[OrderType::Ascending, OrderType::Ascending],
&[0, 1],
0,
join_key_indices.len(),
)
.await;

Expand All @@ -1109,6 +1113,7 @@ mod tests {
&[OrderType::Ascending, OrderType::Ascending],
&[0, 1],
2,
join_key_indices.len(),
)
.await;

Expand Down Expand Up @@ -1154,8 +1159,9 @@ mod tests {
};
let (tx_l, source_l) = MockSource::channel(schema.clone(), vec![0]);
let (tx_r, source_r) = MockSource::channel(schema, vec![0]);
let params_l = JoinParams::new(vec![0, 1], vec![]);
let params_r = JoinParams::new(vec![0, 1], vec![]);
let join_key_indices = vec![0, 1];
let params_l = JoinParams::new(join_key_indices.clone(), vec![]);
let params_r = JoinParams::new(join_key_indices.clone(), vec![]);
let cond = with_condition.then(create_cond);

let mem_state = MemoryStateStore::new();
Expand All @@ -1170,6 +1176,7 @@ mod tests {
],
&[0, 1, 0],
0,
join_key_indices.len(),
)
.await;

Expand All @@ -1183,6 +1190,7 @@ mod tests {
],
&[0, 1, 1],
0,
join_key_indices.len(),
)
.await;
let schema_len = match T {
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/managed_state/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
} else {
let prefix = key.deserialize(&self.join_key_data_types)?;
self.metrics.insert_cache_miss_count += 1;
// Refill cache when the join key exist in neither cache or storage.
// Refill cache when the join key exists in neither cache or storage.
if !self.state.table.may_exist(&prefix).await? {
let mut state = JoinEntryState::default();
state.insert(pk, value.encode());
Expand Down Expand Up @@ -468,7 +468,7 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
} else {
let prefix = key.deserialize(&self.join_key_data_types)?;
self.metrics.insert_cache_miss_count += 1;
// Refill cache when the join key exist in neither cache or storage.
// Refill cache when the join key exists in neither cache or storage.
if !self.state.table.may_exist(&prefix).await? {
let mut state = JoinEntryState::default();
state.insert(pk, join_row.encode());
Expand Down

0 comments on commit b0bf45b

Please sign in to comment.