feat: remove enum index migration (#1734)
## What ❔

Removes all functionality and tests relating to enum index migration.

The only remaining mention is a reserved field in the state keeper proto file.

## Why ❔

The migration is no longer needed, yet we keep having to thread it through new components just to preserve compatibility.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [ ] Linkcheck has been run via `zk linkcheck`.
itegulov committed Apr 19, 2024
1 parent 6611ee9 commit 13c0f52
Showing 14 changed files with 13 additions and 288 deletions.
7 changes: 0 additions & 7 deletions core/bin/external_node/src/config/mod.rs
@@ -289,9 +289,6 @@ pub(crate) struct OptionalENConfig {
     // Other config settings
     /// Port on which the Prometheus exporter server is listening.
     pub prometheus_port: Option<u16>,
-    /// Number of keys that is processed by enum_index migration in State Keeper each L1 batch.
-    #[serde(default = "OptionalENConfig::default_enum_index_migration_chunk_size")]
-    pub enum_index_migration_chunk_size: usize,
     /// Capacity of the queue for asynchronous miniblock sealing. Once this many miniblocks are queued,
     /// sealing will block until some of the miniblocks from the queue are processed.
     /// 0 means that sealing is synchronous; this is mostly useful for performance comparison, testing etc.
@@ -439,10 +436,6 @@ impl OptionalENConfig {
         10
     }
 
-    const fn default_enum_index_migration_chunk_size() -> usize {
-        5000
-    }
-
     const fn default_miniblock_seal_queue_capacity() -> usize {
         10
     }
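For context on what the external node loses here: the field used serde's default-function hook, so operators could omit it from the config entirely and still get a chunk size of 5000. A minimal, self-contained sketch of that mechanism (the `ChunkedConfig` type is illustrative, not from the codebase; assumes `serde` with the `derive` feature plus `serde_json`):

```rust
use serde::Deserialize;

/// Illustrative config mirroring the removed `OptionalENConfig` field: when the
/// key is absent from the input, serde calls the named function instead of
/// failing or falling back to `Default::default()`.
#[derive(Debug, Deserialize)]
struct ChunkedConfig {
    #[serde(default = "ChunkedConfig::default_chunk_size")]
    chunk_size: usize,
}

impl ChunkedConfig {
    const fn default_chunk_size() -> usize {
        5000 // same default the removed EN option used
    }
}

fn main() {
    // Key present: the provided value wins.
    let cfg: ChunkedConfig = serde_json::from_str(r#"{ "chunk_size": 100 }"#).unwrap();
    assert_eq!(cfg.chunk_size, 100);

    // Key absent: the default function supplies 5000.
    let cfg: ChunkedConfig = serde_json::from_str("{}").unwrap();
    assert_eq!(cfg.chunk_size, 5000);
}
```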
7 changes: 2 additions & 5 deletions core/bin/external_node/src/main.rs
@@ -90,11 +90,8 @@ async fn build_state_keeper(
     // We only need call traces on the external node if the `debug_` namespace is enabled.
     let save_call_traces = config.optional.api_namespaces().contains(&Namespace::Debug);
 
-    let (storage_factory, task) = AsyncRocksdbCache::new(
-        connection_pool.clone(),
-        state_keeper_db_path,
-        config.optional.enum_index_migration_chunk_size,
-    );
+    let (storage_factory, task) =
+        AsyncRocksdbCache::new(connection_pool.clone(), state_keeper_db_path);
     let mut stop_receiver_clone = stop_receiver.clone();
     task_handles.push(tokio::task::spawn(async move {
         let result = task.run(stop_receiver_clone.clone()).await;
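The new call site keeps the established shape: the constructor returns a storage factory plus a background task that runs until a `watch`-based stop signal fires. A stand-in sketch of that lifecycle under assumed names (`CacheTask` is illustrative; the real `AsyncRocksdbCache` API may differ beyond what the diff shows; assumes `tokio` with the `full` feature and `anyhow`):

```rust
use tokio::sync::watch;

/// Stand-in for the background task returned alongside the storage factory.
struct CacheTask;

impl CacheTask {
    /// Runs until the stop signal flips to `true`.
    async fn run(self, mut stop: watch::Receiver<bool>) -> anyhow::Result<()> {
        loop {
            if *stop.borrow() {
                return Ok(());
            }
            // ... one unit of cache maintenance work would go here ...
            tokio::select! {
                _ = stop.changed() => {}
                _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {}
            }
        }
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (stop_sender, stop_receiver) = watch::channel(false);
    let handle = tokio::spawn(CacheTask.run(stop_receiver));

    // ... node runs ...
    stop_sender.send(true)?; // request shutdown
    handle.await??;
    Ok(())
}
```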
8 changes: 0 additions & 8 deletions core/lib/config/src/configs/chain.rs
@@ -151,9 +151,6 @@ pub struct StateKeeperConfig {
     pub validation_computational_gas_limit: u32,
     pub save_call_traces: bool,
 
-    /// Number of keys that is processed by enum_index migration in State Keeper each L1 batch.
-    pub enum_index_migration_chunk_size: Option<usize>,
-
     /// The maximal number of circuits that a batch can support.
     /// Note, that this number corresponds to the "base layer" circuits, i.e. it does not include
     /// the recursion layers' circuits.
@@ -201,17 +198,12 @@ impl StateKeeperConfig {
             fee_model_version: FeeModelVersion::V2,
             validation_computational_gas_limit: 300000,
             save_call_traces: true,
-            enum_index_migration_chunk_size: None,
             max_circuits_per_batch: 24100,
             bootloader_hash: None,
             default_aa_hash: None,
             l1_batch_commit_data_generator_mode: L1BatchCommitDataGeneratorMode::Rollup,
         }
     }
-
-    pub fn enum_index_migration_chunk_size(&self) -> usize {
-        self.enum_index_migration_chunk_size.unwrap_or(1_000)
-    }
 }
 
 #[derive(Debug, Deserialize, Clone, PartialEq)]
1 change: 0 additions & 1 deletion core/lib/config/src/testonly.rs
@@ -169,7 +169,6 @@ impl Distribution<configs::chain::StateKeeperConfig> for EncodeDist {
             fee_model_version: self.sample(rng),
             validation_computational_gas_limit: self.sample(rng),
             save_call_traces: self.sample(rng),
-            enum_index_migration_chunk_size: self.sample(rng),
             max_circuits_per_batch: self.sample(rng),
             // These values are not involved into files serialization skip them
             fee_account_addr: None,
2 changes: 0 additions & 2 deletions core/lib/env_config/src/chain.rs
@@ -96,7 +96,6 @@ mod tests {
             fee_model_version: FeeModelVersion::V2,
             validation_computational_gas_limit: 10_000_000,
             save_call_traces: false,
-            enum_index_migration_chunk_size: Some(2_000),
             bootloader_hash: Some(hash(
                 "0x010007ede999d096c84553fb514d3d6ca76fbf39789dda76bfeda9f3ae06236e",
             )),
@@ -134,7 +133,6 @@
             CHAIN_STATE_KEEPER_FEE_MODEL_VERSION="V2"
             CHAIN_STATE_KEEPER_VALIDATION_COMPUTATIONAL_GAS_LIMIT="10000000"
             CHAIN_STATE_KEEPER_SAVE_CALL_TRACES="false"
-            CHAIN_STATE_KEEPER_ENUM_INDEX_MIGRATION_CHUNK_SIZE="2000"
             CHAIN_STATE_KEEPER_BOOTLOADER_HASH=0x010007ede999d096c84553fb514d3d6ca76fbf39789dda76bfeda9f3ae06236e
             CHAIN_STATE_KEEPER_DEFAULT_AA_HASH=0x0100055b041eb28aff6e3a6e0f37c31fd053fc9ef142683b05e5f0aee6934066
             CHAIN_STATE_KEEPER_L1_BATCH_COMMIT_DATA_GENERATOR_MODE="{l1_batch_commit_data_generator_mode}"
9 changes: 0 additions & 9 deletions core/lib/protobuf_config/src/chain.rs
@@ -75,11 +75,6 @@ impl ProtoRepr for proto::StateKeeper {
             validation_computational_gas_limit: *required(&self.validation_computational_gas_limit)
                 .context("validation_computational_gas_limit")?,
             save_call_traces: *required(&self.save_call_traces).context("save_call_traces")?,
-            enum_index_migration_chunk_size: self
-                .enum_index_migration_chunk_size
-                .map(|x| x.try_into())
-                .transpose()
-                .context("enum_index_migration_chunk_size")?,
             max_circuits_per_batch: required(&self.max_circuits_per_batch)
                 .and_then(|x| Ok((*x).try_into()?))
                 .context("max_circuits_per_batch")?,
@@ -119,10 +114,6 @@
             fee_model_version: Some(proto::FeeModelVersion::new(&this.fee_model_version).into()),
             validation_computational_gas_limit: Some(this.validation_computational_gas_limit),
             save_call_traces: Some(this.save_call_traces),
-            enum_index_migration_chunk_size: this
-                .enum_index_migration_chunk_size
-                .as_ref()
-                .map(|x| (*x).try_into().unwrap()),
             max_circuits_per_batch: Some(this.max_circuits_per_batch.try_into().unwrap()),
         }
     }
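The deleted branch above is a compact idiom for fallible conversion inside an `Option`: map the conversion, `transpose` the nesting, attach error context. A self-contained sketch of the same idiom (field name reused for flavor; `anyhow` assumed, as in the real code):

```rust
use anyhow::Context as _;

/// `Option<u64>` -> `Option<usize>`, where the inner conversion can fail.
fn convert(raw: Option<u64>) -> anyhow::Result<Option<usize>> {
    raw
        // Option<u64> -> Option<Result<usize, TryFromIntError>>
        .map(usize::try_from)
        // Option<Result<..>> -> Result<Option<..>, TryFromIntError>
        .transpose()
        .context("enum_index_migration_chunk_size")
}

fn main() -> anyhow::Result<()> {
    assert_eq!(convert(None)?, None);
    assert_eq!(convert(Some(5000))?, Some(5000));
    // On a 32-bit target, u64::MAX would fail here with the attached context.
    Ok(())
}
```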
2 changes: 1 addition & 1 deletion core/lib/protobuf_config/src/proto/config/chain.proto
@@ -31,11 +31,11 @@ message StateKeeper {
   optional FeeModelVersion fee_model_version = 20; // required
   optional uint32 validation_computational_gas_limit = 21; // required; wei?
   optional bool save_call_traces = 22; // required
-  optional uint64 enum_index_migration_chunk_size = 26; // optional
   optional uint64 max_circuits_per_batch = 27; // required
   optional uint64 miniblock_max_payload_size = 28; // required
   reserved 23; reserved "virtual_blocks_interval";
   reserved 24; reserved "virtual_blocks_per_miniblock";
+  reserved 26; reserved "enum_index_migration_chunk_size";
 }
 
 message OperationsManager {
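Reserving tag 26 and its name, rather than deleting the field outright, is standard protobuf hygiene (not zksync-specific): it prevents a future field from reusing the number and silently misdecoding configs written by older binaries. The general shape, as a generic sketch rather than a quote from this repo:

```proto
message Example {
  optional bool keep_me = 1;
  // Field 2 used to be `old_knob`. Never reuse its number or name:
  // payloads written by old binaries would decode into the wrong field.
  reserved 2;
  reserved "old_knob";
}
```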
136 changes: 1 addition & 135 deletions core/lib/state/src/rocksdb/mod.rs
@@ -11,9 +11,6 @@
 //! | Column       | Key                             | Value                           | Description                                |
 //! | ------------ | ------------------------------- | ------------------------------- | ------------------------------------------ |
 //! | State        | 'block_number'                  | serialized block number         | Last processed L1 batch number (u32)       |
-//! | State        | 'enum_index_migration_cursor'   | serialized hashed key or empty  | If key is not present it means that the migration hasn't started. |
-//! |              |                                 | bytes                           | If value is of length 32 then it represents hashed_key migration should start from. |
-//! |              |                                 |                                 | If value is empty then it means the migration has finished |
 //! | State        | hashed `StorageKey`             | 32 bytes value ++ 8 bytes index | State value for the given key              |
 //! |              |                                 | (big-endian)                    |                                            |
 //! | Contracts    | address (20 bytes)              | `Vec<u8>`                       | Contract contents                          |
@@ -32,8 +29,7 @@ use itertools::{Either, Itertools};
 use tokio::sync::watch;
 use zksync_dal::{Connection, Core, CoreDal, DalError};
 use zksync_storage::{db::NamedColumnFamily, RocksDB};
-use zksync_types::{L1BatchNumber, StorageKey, StorageValue, H256, U256};
-use zksync_utils::{h256_to_u256, u256_to_h256};
+use zksync_types::{L1BatchNumber, StorageKey, StorageValue, H256};
 
 use self::metrics::METRICS;
 #[cfg(test)]
@@ -131,7 +127,6 @@ impl From<anyhow::Error> for RocksdbSyncError {
 pub struct RocksdbStorage {
     db: RocksDB<StateKeeperColumnFamily>,
     pending_patch: InMemoryStorage,
-    enum_index_migration_chunk_size: usize,
     /// Test-only listeners to events produced by the storage.
     #[cfg(test)]
     listener: RocksdbStorageEventListener,
@@ -148,17 +143,11 @@ impl RocksdbStorageBuilder {
         RocksdbStorageBuilder(RocksdbStorage {
             db: value,
             pending_patch: InMemoryStorage::default(),
-            enum_index_migration_chunk_size: 100,
             #[cfg(test)]
             listener: RocksdbStorageEventListener::default(),
         })
     }
 
-    /// Enables enum indices migration.
-    pub fn enable_enum_index_migration(&mut self, chunk_size: usize) {
-        self.0.enum_index_migration_chunk_size = chunk_size;
-    }
-
     /// Returns the last processed l1 batch number + 1.
     ///
     /// # Panics
@@ -207,17 +196,12 @@
 
 impl RocksdbStorage {
     const L1_BATCH_NUMBER_KEY: &'static [u8] = b"block_number";
-    const ENUM_INDEX_MIGRATION_CURSOR: &'static [u8] = b"enum_index_migration_cursor";
 
     /// Desired size of log chunks loaded from Postgres during snapshot recovery.
     /// This is intentionally not configurable because chunks must be the same for the entire recovery
     /// (i.e., not changed after a node restart).
     const DESIRED_LOG_CHUNK_SIZE: u64 = 200_000;
 
-    fn is_special_key(key: &[u8]) -> bool {
-        key == Self::L1_BATCH_NUMBER_KEY || key == Self::ENUM_INDEX_MIGRATION_CURSOR
-    }
-
     /// Creates a new storage builder with the provided RocksDB `path`.
     ///
     /// # Errors
@@ -234,7 +218,6 @@ impl RocksdbStorage {
         Ok(Self {
             db: RocksDB::new(&path).context("failed initializing state keeper RocksDB")?,
             pending_patch: InMemoryStorage::default(),
-            enum_index_migration_chunk_size: 100,
             #[cfg(test)]
             listener: RocksdbStorageEventListener::default(),
         })
@@ -313,14 +296,6 @@ impl RocksdbStorage {
             "Secondary storage for L1 batch #{latest_l1_batch_number} initialized, size is {estimated_size}"
         );
 
-        assert!(self.enum_index_migration_chunk_size > 0);
-        // Enum indices must be at the storage. Run migration till the end.
-        while self.enum_migration_start_from().await.is_some() {
-            if *stop_receiver.borrow() {
-                return Err(RocksdbSyncError::Interrupted);
-            }
-            self.save_missing_enum_indices(storage).await?;
-        }
         Ok(())
     }
 
@@ -366,96 +341,6 @@ impl RocksdbStorage {
         Ok(())
     }
 
-    async fn save_missing_enum_indices(
-        &self,
-        storage: &mut Connection<'_, Core>,
-    ) -> anyhow::Result<()> {
-        let (true, Some(start_from)) = (
-            self.enum_index_migration_chunk_size > 0,
-            self.enum_migration_start_from().await,
-        ) else {
-            return Ok(());
-        };
-
-        let started_at = Instant::now();
-        tracing::info!(
-            "RocksDB enum index migration is not finished, starting from key {start_from:0>64x}"
-        );
-
-        let db = self.db.clone();
-        let enum_index_migration_chunk_size = self.enum_index_migration_chunk_size;
-        let (keys, values): (Vec<_>, Vec<_>) = tokio::task::spawn_blocking(move || {
-            db.from_iterator_cf(StateKeeperColumnFamily::State, start_from.as_bytes())
-                .filter_map(|(key, value)| {
-                    if Self::is_special_key(&key) {
-                        return None;
-                    }
-                    let state_value = StateValue::deserialize(&value);
-                    state_value
-                        .enum_index
-                        .is_none()
-                        .then(|| (H256::from_slice(&key), state_value.value))
-                })
-                .take(enum_index_migration_chunk_size)
-                .unzip()
-        })
-        .await
-        .unwrap();
-
-        let enum_indices_and_batches = storage
-            .storage_logs_dal()
-            .get_l1_batches_and_indices_for_initial_writes(&keys)
-            .await
-            .context("failed getting enumeration indices for storage logs")?;
-        assert_eq!(keys.len(), enum_indices_and_batches.len());
-        let key_count = keys.len();
-
-        let db = self.db.clone();
-        tokio::task::spawn_blocking(move || {
-            let mut write_batch = db.new_write_batch();
-            for (key, value) in keys.iter().zip(values) {
-                let index = enum_indices_and_batches[key].1;
-                write_batch.put_cf(
-                    StateKeeperColumnFamily::State,
-                    key.as_bytes(),
-                    &StateValue::new(value, Some(index)).serialize(),
-                );
-            }
-
-            let next_key = keys
-                .last()
-                .and_then(|last_key| h256_to_u256(*last_key).checked_add(U256::one()))
-                .map(u256_to_h256);
-            match (next_key, keys.len()) {
-                (Some(next_key), keys_len) if keys_len == enum_index_migration_chunk_size => {
-                    write_batch.put_cf(
-                        StateKeeperColumnFamily::State,
-                        Self::ENUM_INDEX_MIGRATION_CURSOR,
-                        next_key.as_bytes(),
-                    );
-                }
-                _ => {
-                    write_batch.put_cf(
-                        StateKeeperColumnFamily::State,
-                        Self::ENUM_INDEX_MIGRATION_CURSOR,
-                        &[],
-                    );
-                    tracing::info!("RocksDB enum index migration finished");
-                }
-            }
-            db.write(write_batch)
-                .context("failed saving enum indices to RocksDB")
-        })
-        .await
-        .context("panicked while saving enum indices to RocksDB")??;
-
-        tracing::info!(
-            "RocksDB enum index migration chunk took {:?}, migrated {key_count} keys",
-            started_at.elapsed()
-        );
-        Ok(())
-    }
-
     fn read_value_inner(&self, key: &StorageKey) -> Option<StorageValue> {
         Self::read_state_value(&self.db, key.hashed_key()).map(|state_value| state_value.value)
     }
@@ -625,25 +510,6 @@ impl RocksdbStorage {
             .estimated_number_of_entries(StateKeeperColumnFamily::State)
     }
 
-    async fn enum_migration_start_from(&self) -> Option<H256> {
-        let db = self.db.clone();
-        let value = tokio::task::spawn_blocking(move || {
-            db.get_cf(
-                StateKeeperColumnFamily::State,
-                Self::ENUM_INDEX_MIGRATION_CURSOR,
-            )
-            .expect("failed to read `ENUM_INDEX_MIGRATION_CURSOR`")
-        })
-        .await
-        .unwrap();
-
-        match value {
-            Some(value) if value.is_empty() => None,
-            Some(cursor) => Some(H256::from_slice(&cursor)),
-            None => Some(H256::zero()),
-        }
-    }
-
     /// Converts self into the underlying RocksDB primitive
     pub fn into_rocksdb(self) -> RocksDB<StateKeeperColumnFamily> {
         self.db
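For the record, the deleted machinery was a resumable chunked backfill driven by a three-state cursor: key absent means the migration never started (begin from key zero), a 32-byte value is the hashed key to resume from, and an empty value means it finished. A compact sketch of the same pattern over an in-memory ordered map (illustrative only; the real code walked RocksDB and fetched enum indices from Postgres):

```rust
use std::collections::BTreeMap;

/// Value plus the enum index the migration backfills (mirrors `StateValue`).
struct Entry {
    value: u64,
    enum_index: Option<u64>,
}

/// Three-state cursor, mirroring the deleted `enum_migration_start_from`:
/// absent => start from the lowest key; empty marker => finished.
fn start_from(cursor: &Option<Vec<u8>>) -> Option<u64> {
    match cursor {
        Some(bytes) if bytes.is_empty() => None, // finished
        Some(bytes) => Some(u64::from_be_bytes(bytes[..8].try_into().unwrap())),
        None => Some(0), // not started yet
    }
}

/// Processes at most `chunk_size` keys missing an index, then advances the
/// cursor past the last processed key, or marks the migration done.
fn migrate_chunk(
    state: &mut BTreeMap<u64, Entry>,
    cursor: &mut Option<Vec<u8>>,
    chunk_size: usize,
) {
    let Some(from) = start_from(cursor) else { return };
    let keys: Vec<u64> = state
        .range(from..)
        .filter(|(_, entry)| entry.enum_index.is_none())
        .take(chunk_size)
        .map(|(key, _)| *key)
        .collect();
    for (i, key) in keys.iter().enumerate() {
        // The real migration looked these indices up in Postgres instead.
        state.get_mut(key).unwrap().enum_index = Some(i as u64);
    }
    *cursor = if keys.len() == chunk_size {
        // Full chunk: store "last key + 1" so the next run resumes after it.
        Some((keys.last().unwrap() + 1).to_be_bytes().to_vec())
    } else {
        Some(Vec::new()) // short chunk: everything is migrated
    };
}

fn main() {
    let mut state: BTreeMap<u64, Entry> = (0..10)
        .map(|k| (k, Entry { value: k * 2, enum_index: None }))
        .collect();
    let mut cursor = None;
    while start_from(&cursor).is_some() {
        migrate_chunk(&mut state, &mut cursor, 4); // chunks of 4, like the configurable chunk size
    }
    assert!(state.values().all(|entry| entry.enum_index.is_some()));
    assert!(state.values().all(|entry| entry.value % 2 == 0));
}
```

The `while` loop mirrors the removed `ensure_ready` logic: keep migrating chunks until the cursor reports completion, which is exactly the loop this PR deletes.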
