Skip to content

Commit

Permalink
perf(db): Add "contains" clause for get_logs (#1384)
Browse files Browse the repository at this point in the history
## What ❔

Add a "contains" (`@>`) clause to the `get_logs` query, and a GIN index
that this clause can use.

## Why ❔

Improve query performance

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
- [ ] Spellcheck has been run via `zk spellcheck`.
- [ ] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
perekopskiy committed Mar 8, 2024
1 parent 58576d1 commit e62ae32
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 81 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Removes the `events_gin_index` index; `IF EXISTS` makes this a no-op when the index is absent.
-- NOTE(review): presumably the down migration paired with the `CREATE INDEX` for the same index — confirm.
DROP INDEX IF EXISTS events_gin_index;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- GIN index on `events` over an array built from `address` and `topic1`..`topic4`.
-- Each topic is prefixed with a distinct byte (`\x01`..`\x04`) that matches its position;
-- presumably so that the same value in different topic slots yields different array elements.
-- NOTE(review): the `get_logs` "contains" (`@>`) filter builds the same array expression on its
-- left-hand side; keep the two in sync, since an expression index is only considered when the
-- query expression matches the indexed one — confirm against the query builder.
CREATE INDEX IF NOT EXISTS events_gin_index on events using gin ((array [
address,
'\x01'::bytea || topic1,
'\x02'::bytea || topic2,
'\x03'::bytea || topic3,
'\x04'::bytea || topic4
]));
228 changes: 147 additions & 81 deletions core/lib/dal/src/events_web3_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl EventsWeb3Dal<'_, '_> {
offset: usize,
) -> Result<Option<MiniblockNumber>, SqlxError> {
{
let (where_sql, arg_index) = self.build_get_logs_where_clause(filter);
let (where_sql, arg_index) = Self::build_get_logs_where_clause(filter);

let query = format!(
r#"
Expand Down Expand Up @@ -74,7 +74,7 @@ impl EventsWeb3Dal<'_, '_> {
limit: usize,
) -> Result<Vec<Log>, SqlxError> {
{
let (where_sql, arg_index) = self.build_get_logs_where_clause(&filter);
let (where_sql, arg_index) = Self::build_get_logs_where_clause(&filter);

let query = format!(
r#"
Expand Down Expand Up @@ -124,7 +124,7 @@ impl EventsWeb3Dal<'_, '_> {
}
}

fn build_get_logs_where_clause(&self, filter: &GetLogsFilter) -> (String, u8) {
fn build_get_logs_where_clause(filter: &GetLogsFilter) -> (String, u8) {
let mut arg_index = 1;

let mut where_sql = format!("(miniblock_number >= {})", filter.from_block.0 as i64);
Expand All @@ -133,15 +133,15 @@ impl EventsWeb3Dal<'_, '_> {

// Add filters for address (like `address = ANY($1)` or `address = $1`)
if let Some(filter_sql) =
Self::build_sql_filter(filter.addresses.len() as u32, "address", arg_index)
EventsWeb3Dal::build_sql_filter(filter.addresses.len() as u32, "address", arg_index)
{
where_sql += &filter_sql;
arg_index += 1;
}

// Add filters for topics (like `topic0 = ANY($2)`)
// Add filters for topics (like `topic1 = ANY($2)`)
for (topic_index, topics) in filter.topics.iter() {
if let Some(filter_sql) = Self::build_sql_filter(
if let Some(filter_sql) = EventsWeb3Dal::build_sql_filter(
topics.len() as u32,
&format!("topic{}", topic_index),
arg_index,
Expand All @@ -151,6 +151,34 @@ impl EventsWeb3Dal<'_, '_> {
}
}

let address_topics_variants = std::iter::once(filter.addresses.len())
.chain(filter.topics.iter().map(|(_, topics)| topics.len()));
let no_multiple_variants = address_topics_variants.clone().all(|x| x <= 1);
let at_least_one_is_not_empty = address_topics_variants.into_iter().any(|x| x > 0);

// Add the "contains" condition. It only makes sense when neither the address nor any topic has more than one variant.
// It also doesn't make sense for queries that filter by neither address nor topics.
if no_multiple_variants && at_least_one_is_not_empty {
// The condition uses the `@>` operator, which can use the GIN index and speed up query execution in some cases.
let mut contains_sql = " AND (array [address, '\\x01'::bytea || topic1, '\\x02'::bytea || topic2, '\\x03'::bytea || topic3, '\\x04'::bytea || topic4] @> array [".to_string();
let mut array_to_contain = Vec::new();
if let Some(address) = filter.addresses.first() {
array_to_contain.push(format!("'\\x{}'::bytea", hex::encode(address.as_bytes())));
}
for (topic_index, topics) in filter.topics.iter() {
if let Some(topic) = topics.first() {
array_to_contain.push(format!(
"'\\x0{topic_index}{}'::bytea",
hex::encode(topic.as_bytes())
));
}
}
contains_sql += array_to_contain.join(", ").as_str();
contains_sql += "])";

where_sql += contains_sql.as_str();
}

(where_sql, arg_index)
}

Expand Down Expand Up @@ -260,82 +288,120 @@ mod tests {
use zksync_types::{Address, H256};

use super::*;
use crate::connection::ConnectionPool;

#[tokio::test]
async fn test_build_get_logs_where_clause() {
let connection_pool = ConnectionPool::test_pool().await;
let storage = &mut connection_pool.access_storage().await.unwrap();
let events_web3_dal = EventsWeb3Dal { storage };
let filter = GetLogsFilter {
from_block: MiniblockNumber(100),
to_block: MiniblockNumber(200),
addresses: vec![Address::from_low_u64_be(123)],
topics: vec![(0, vec![H256::from_low_u64_be(456)])],
};

let expected_sql = "(miniblock_number >= 100) AND (miniblock_number <= 200) AND (address = $1) AND (topic0 = $2)";
let expected_arg_index = 3;

let (actual_sql, actual_arg_index) = events_web3_dal.build_get_logs_where_clause(&filter);

assert_eq!(actual_sql, expected_sql);
assert_eq!(actual_arg_index, expected_arg_index);
}

#[tokio::test]
async fn test_build_get_logs_with_multiple_topics_where_clause() {
let connection_pool = ConnectionPool::test_pool().await;
let storage = &mut connection_pool.access_storage().await.unwrap();
let events_web3_dal = EventsWeb3Dal { storage };
let filter = GetLogsFilter {
from_block: MiniblockNumber(10),
to_block: MiniblockNumber(400),
addresses: vec![
Address::from_low_u64_be(123),
Address::from_low_u64_be(1233),
],
topics: vec![
(
0,
vec![
H256::from_low_u64_be(456),
H256::from_low_u64_be(789),
H256::from_low_u64_be(101),
#[test]
fn test_build_get_logs_where_clause() {
let cases = [
(
GetLogsFilter {
from_block: MiniblockNumber(100),
to_block: MiniblockNumber(200),
addresses: vec![Address::from_low_u64_be(123)],
topics: vec![(1, vec![H256::from_low_u64_be(456)])],
},
"(miniblock_number >= 100) AND (miniblock_number <= 200) AND (address = $1) AND (topic1 = $2)",
true,
3,
"single_address_and_topic"
),
(
GetLogsFilter {
from_block: MiniblockNumber(10),
to_block: MiniblockNumber(400),
addresses: vec![
Address::from_low_u64_be(123),
Address::from_low_u64_be(1233),
],
),
(2, vec![H256::from_low_u64_be(789)]),
],
};

let expected_sql = "(miniblock_number >= 10) AND (miniblock_number <= 400) AND (address = ANY($1)) AND (topic0 = ANY($2)) AND (topic2 = $3)";
let expected_arg_index = 4;

let (actual_sql, actual_arg_index) = events_web3_dal.build_get_logs_where_clause(&filter);

assert_eq!(actual_sql, expected_sql);
assert_eq!(actual_arg_index, expected_arg_index);
}

#[tokio::test]
async fn test_build_get_logs_with_no_address_where_clause() {
let connection_pool = ConnectionPool::test_pool().await;
let storage = &mut connection_pool.access_storage().await.unwrap();
let events_web3_dal = EventsWeb3Dal { storage };
let filter = GetLogsFilter {
from_block: MiniblockNumber(10),
to_block: MiniblockNumber(400),
addresses: vec![],
topics: vec![(2, vec![H256::from_low_u64_be(789)])],
};

let expected_sql =
"(miniblock_number >= 10) AND (miniblock_number <= 400) AND (topic2 = $1)";
let expected_arg_index = 2;

let (actual_sql, actual_arg_index) = events_web3_dal.build_get_logs_where_clause(&filter);

assert_eq!(actual_sql, expected_sql);
assert_eq!(actual_arg_index, expected_arg_index);
topics: vec![
(
1,
vec![
H256::from_low_u64_be(456),
H256::from_low_u64_be(789),
H256::from_low_u64_be(101),
],
),
(2, vec![H256::from_low_u64_be(789)]),
],
},
"(miniblock_number >= 10) AND (miniblock_number <= 400) AND (address = ANY($1)) AND (topic1 = ANY($2)) AND (topic2 = $3)",
false,
4,
"multiple_addresses_and_topics"
),
(
GetLogsFilter {
from_block: MiniblockNumber(10),
to_block: MiniblockNumber(400),
addresses: vec![],
topics: vec![(2, vec![H256::from_low_u64_be(789)])],
},
"(miniblock_number >= 10) AND (miniblock_number <= 400) AND (topic2 = $1)",
true,
2,
"single_topic"
),
(
GetLogsFilter {
from_block: MiniblockNumber(10),
to_block: MiniblockNumber(400),
addresses: vec![],
topics: vec![],
},
"(miniblock_number >= 10) AND (miniblock_number <= 400)",
false,
1,
"no_addresses_and_topics"
)
];

for (
filter,
expected_sql_without_contains,
include_contains,
expected_arg_index,
case_name,
) in cases
{
let (actual_sql, actual_arg_index) =
EventsWeb3Dal::build_get_logs_where_clause(&filter);
let expected_sql = if !include_contains {
expected_sql_without_contains.to_string()
} else {
let start = " AND (array [address, '\\x01'::bytea || topic1, '\\x02'::bytea || topic2, '\\x03'::bytea || topic3, '\\x04'::bytea || topic4] @> array [";
let elems: Vec<_> = filter
.addresses
.iter()
.map(|x| x.0.to_vec())
.chain(filter.topics.iter().flat_map(|(i, topics)| {
let topics: Vec<_> = topics
.iter()
.map(|topic| {
let mut vec = topic.0.to_vec();
vec.insert(0, *i as u8);
vec
})
.collect();
topics
}))
.map(|vec| format!("'\\x{}'::bytea", hex::encode(vec)))
.collect();
let elems = elems.join(", ");
let end = "])";

format!("{expected_sql_without_contains}{start}{elems}{end}")
};

assert_eq!(
actual_sql, expected_sql,
"SQL is not expected for {}",
case_name
);
assert_eq!(
actual_arg_index, expected_arg_index,
"arg index is not expected for {}",
case_name
);
}
}
}

0 comments on commit e62ae32

Please sign in to comment.