92 changes: 75 additions & 17 deletions beacon_node/beacon_chain/src/migrate.rs
@@ -3,12 +3,12 @@ use crate::errors::BeaconChainError;
use crate::head_tracker::{HeadTracker, SszHeadTracker};
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
use parking_lot::Mutex;
use slog::{debug, error, info, warn, Logger};
use slog::{debug, info, warn, Logger};
use std::collections::{HashMap, HashSet};
use std::mem;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::{migrate_database, HotColdDBError};
use store::iter::RootsIterator;
use store::{Error, ItemStore, StoreItem, StoreOp};
@@ -18,6 +18,13 @@ use types::{
SignedBeaconBlockHash, Slot,
};

/// Compact at least this frequently, finalization permitting (7 days).
const MAX_COMPACTION_PERIOD_SECONDS: u64 = 604800;
/// Compact at *most* this frequently, to prevent over-compaction during sync (2 hours).
const MIN_COMPACTION_PERIOD_SECONDS: u64 = 7200;
/// Compact after a finality gap of at least this many epochs (1024), provided `MIN_COMPACTION_PERIOD_SECONDS` has elapsed.
const COMPACTION_FINALITY_DISTANCE: u64 = 1024;

/// The background migrator runs a thread to perform pruning and migrate state from the hot
/// to the cold database.
pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
@@ -49,7 +56,10 @@ impl MigratorConfig {
/// Pruning can be successful, or in rare cases deferred to a later point.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruningOutcome {
Successful,
/// The pruning succeeded and updated the pruning checkpoint from `old_finalized_checkpoint`.
Successful {
old_finalized_checkpoint: Checkpoint,
},
DeferredConcurrentMutation,
}

@@ -159,7 +169,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
let finalized_state_root = notif.finalized_state_root;
let finalized_state = notif.finalized_state;

match Self::prune_abandoned_forks(
let old_finalized_checkpoint = match Self::prune_abandoned_forks(
db.clone(),
notif.head_tracker,
finalized_state_root,
@@ -168,7 +178,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
notif.genesis_block_root,
log,
) {
Ok(PruningOutcome::Successful) => {}
Ok(PruningOutcome::Successful {
old_finalized_checkpoint,
}) => old_finalized_checkpoint,
Ok(PruningOutcome::DeferredConcurrentMutation) => {
warn!(
log,
@@ -203,15 +215,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
};

// Finally, compact the database so that new free space is properly reclaimed.
debug!(log, "Starting database compaction");
if let Err(e) = db.compact() {
error!(
log,
"Database compaction failed";
"error" => format!("{:?}", e)
);
if let Err(e) = Self::run_compaction(
db,
old_finalized_checkpoint.epoch,
notif.finalized_checkpoint.epoch,
log,
) {
warn!(log, "Database compaction failed"; "error" => format!("{:?}", e));
}
debug!(log, "Database compaction complete");
}

/// Spawn a new child thread to run the migration process.
@@ -272,7 +283,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
.into());
}

info!(
debug!(
log,
"Starting database pruning";
"old_finalized_epoch" => old_finalized_checkpoint.epoch,
@@ -469,8 +480,55 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
kv_batch.push(store.pruning_checkpoint_store_op(new_finalized_checkpoint));

store.hot_db.do_atomically(kv_batch)?;
info!(log, "Database pruning complete");
debug!(log, "Database pruning complete");

Ok(PruningOutcome::Successful {
old_finalized_checkpoint,
})
}

Ok(PruningOutcome::Successful)
/// Compact the database if it has been more than `MAX_COMPACTION_PERIOD_SECONDS` since it
/// was last compacted, or sooner after a large finality gap.
pub fn run_compaction(
db: Arc<HotColdDB<E, Hot, Cold>>,
old_finalized_epoch: Epoch,
new_finalized_epoch: Epoch,
log: &Logger,
) -> Result<(), Error> {
if !db.compact_on_prune() {
return Ok(());
}

let last_compaction_timestamp = db
.load_compaction_timestamp()?
.unwrap_or_else(|| Duration::from_secs(0));
let start_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(last_compaction_timestamp);
let seconds_since_last_compaction = start_time
.checked_sub(last_compaction_timestamp)
.as_ref()
.map_or(0, Duration::as_secs);

if seconds_since_last_compaction > MAX_COMPACTION_PERIOD_SECONDS
|| (new_finalized_epoch - old_finalized_epoch > COMPACTION_FINALITY_DISTANCE
&& seconds_since_last_compaction > MIN_COMPACTION_PERIOD_SECONDS)
{
info!(
log,
"Starting database compaction";
"old_finalized_epoch" => old_finalized_epoch,
"new_finalized_epoch" => new_finalized_epoch,
);
db.compact()?;

let finish_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(start_time);
db.store_compaction_timestamp(finish_time)?;

info!(log, "Database compaction complete");
}
Ok(())
}
}
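
The `run_compaction` gate above combines two conditions: compact if more than `MAX_COMPACTION_PERIOD_SECONDS` have elapsed since the last compaction, or earlier when a large finality gap occurs and at least `MIN_COMPACTION_PERIOD_SECONDS` have passed. A minimal standalone sketch of that predicate (a hypothetical `should_compact` helper using plain `u64` epochs rather than the `Epoch` type; not part of the PR):

```rust
const MAX_COMPACTION_PERIOD_SECONDS: u64 = 604_800; // 7 days
const MIN_COMPACTION_PERIOD_SECONDS: u64 = 7_200; // 2 hours
const COMPACTION_FINALITY_DISTANCE: u64 = 1024; // epochs

/// Hypothetical standalone version of the trigger condition in `run_compaction`.
fn should_compact(
    seconds_since_last_compaction: u64,
    old_finalized_epoch: u64,
    new_finalized_epoch: u64,
) -> bool {
    seconds_since_last_compaction > MAX_COMPACTION_PERIOD_SECONDS
        || (new_finalized_epoch.saturating_sub(old_finalized_epoch) > COMPACTION_FINALITY_DISTANCE
            && seconds_since_last_compaction > MIN_COMPACTION_PERIOD_SECONDS)
}

fn main() {
    // Routine finalization shortly after a compaction: skip.
    assert!(!should_compact(3_600, 1_000, 1_001));
    // Finality gap larger than 1024 epochs and more than 2 hours elapsed: compact.
    assert!(should_compact(8_000, 1_000, 3_000));
    // More than 7 days since the last compaction: compact regardless of the gap.
    assert!(should_compact(700_000, 1_000, 1_001));
}
```
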
15 changes: 14 additions & 1 deletion beacon_node/src/cli.rs
@@ -297,13 +297,26 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
)

/*
* Purge.
* Database purging and compaction.
*/
.arg(
Arg::with_name("purge-db")
.long("purge-db")
.help("If present, the chain database will be deleted. Use with caution.")
)
.arg(
Arg::with_name("compact-db")
.long("compact-db")
.help("If present, apply compaction to the database on start-up. Use with caution. \
It is generally not recommended unless auto-compaction is disabled.")
)
.arg(
Arg::with_name("auto-compact-db")
.long("auto-compact-db")
.help("Enable or disable automatic compaction of the database on finalization.")
.takes_value(true)
.default_value("true")
)

/*
* Misc.
7 changes: 7 additions & 0 deletions beacon_node/src/config.rs
@@ -218,6 +218,13 @@ pub fn get_config<E: EthSpec>(
.map_err(|_| "block-cache-size is not a valid integer".to_string())?;
}

client_config.store.compact_on_init = cli_args.is_present("compact-db");
if let Some(compact_on_prune) = cli_args.value_of("auto-compact-db") {
client_config.store.compact_on_prune = compact_on_prune
.parse()
.map_err(|_| "auto-compact-db takes a boolean".to_string())?;
}

/*
* Zero-ports
*
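
The `auto-compact-db` value flows from the CLI into `client_config.store.compact_on_prune` through Rust's standard `bool` parsing, which accepts only the exact strings `"true"` and `"false"`. A small sketch of that conversion in isolation (a hypothetical `parse_auto_compact` helper, not the actual config code):

```rust
/// Standalone sketch of the `auto-compact-db` parsing above:
/// `str::parse::<bool>()` succeeds only for exactly "true" or "false".
fn parse_auto_compact(value: &str) -> Result<bool, String> {
    value
        .parse()
        .map_err(|_| "auto-compact-db takes a boolean".to_string())
}

fn main() {
    assert_eq!(parse_auto_compact("true"), Ok(true));
    assert_eq!(parse_auto_compact("false"), Ok(false));
    assert!(parse_auto_compact("yes").is_err());
}
```
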
30 changes: 27 additions & 3 deletions beacon_node/store/src/config.rs
@@ -8,12 +8,24 @@ pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5;

/// Database configuration parameters.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StoreConfig {
/// Number of slots to wait between storing restore points in the freezer database.
pub slots_per_restore_point: u64,
/// Maximum number of blocks to store in the in-memory block cache.
pub block_cache_size: usize,
/// Whether to compact the database on initialization.
pub compact_on_init: bool,
/// Whether to compact the database during database pruning.
pub compact_on_prune: bool,
}

/// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
pub struct OnDiskStoreConfig {
pub slots_per_restore_point: u64,
// NOTE: redundant, see https://github.com/sigp/lighthouse/issues/1784
pub _block_cache_size: usize,
}

#[derive(Debug, Clone)]
@@ -27,12 +39,24 @@ impl Default for StoreConfig {
// Safe default for tests, shouldn't ever be read by a CLI node.
slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64,
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
compact_on_init: false,
compact_on_prune: true,
}
}
}

impl StoreConfig {
pub fn check_compatibility(&self, on_disk_config: &Self) -> Result<(), StoreConfigError> {
pub fn as_disk_config(&self) -> OnDiskStoreConfig {
OnDiskStoreConfig {
slots_per_restore_point: self.slots_per_restore_point,
_block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
}
}

pub fn check_compatibility(
&self,
on_disk_config: &OnDiskStoreConfig,
) -> Result<(), StoreConfigError> {
if self.slots_per_restore_point != on_disk_config.slots_per_restore_point {
return Err(StoreConfigError::MismatchedSlotsPerRestorePoint {
config: self.slots_per_restore_point,
@@ -43,7 +67,7 @@ impl StoreConfig {
}
}

impl StoreItem for StoreConfig {
impl StoreItem for OnDiskStoreConfig {
fn db_column() -> DBColumn {
DBColumn::BeaconMeta
}
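
Splitting `StoreConfig` from `OnDiskStoreConfig` is what lets the mutable compaction flags change between runs while the immutable `slots_per_restore_point` is still checked against the value written to disk. A simplified sketch of that behaviour with stand-in types (not the real Lighthouse definitions, which also derive the SSZ `Encode`/`Decode` traits):

```rust
/// Stand-in types mirroring the shape of the config split above: only the
/// immutable `slots_per_restore_point` is persisted and checked, so the
/// compaction flags may change freely between runs.
#[derive(Debug, Clone, PartialEq)]
struct StoreConfig {
    slots_per_restore_point: u64,
    compact_on_init: bool,
    compact_on_prune: bool,
}

#[derive(Debug, Clone, PartialEq)]
struct OnDiskStoreConfig {
    slots_per_restore_point: u64,
}

#[derive(Debug, PartialEq)]
enum StoreConfigError {
    MismatchedSlotsPerRestorePoint { config: u64, on_disk: u64 },
}

impl StoreConfig {
    fn as_disk_config(&self) -> OnDiskStoreConfig {
        OnDiskStoreConfig {
            slots_per_restore_point: self.slots_per_restore_point,
        }
    }

    fn check_compatibility(&self, on_disk: &OnDiskStoreConfig) -> Result<(), StoreConfigError> {
        if self.slots_per_restore_point != on_disk.slots_per_restore_point {
            return Err(StoreConfigError::MismatchedSlotsPerRestorePoint {
                config: self.slots_per_restore_point,
                on_disk: on_disk.slots_per_restore_point,
            });
        }
        Ok(())
    }
}

fn main() {
    let config = StoreConfig {
        slots_per_restore_point: 2048,
        compact_on_init: false,
        compact_on_prune: true,
    };
    // What gets written to disk omits the mutable flags entirely.
    let on_disk = config.as_disk_config();

    // Toggling a compaction flag on a later run is still compatible...
    let toggled = StoreConfig { compact_on_prune: false, ..config.clone() };
    assert!(toggled.check_compatibility(&on_disk).is_ok());

    // ...whereas changing `slots_per_restore_point` is rejected.
    let incompatible = StoreConfig { slots_per_restore_point: 8192, ..toggled };
    assert!(incompatible.check_compatibility(&on_disk).is_err());
}
```
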
41 changes: 35 additions & 6 deletions beacon_node/store/src/hot_cold_store.rs
@@ -1,16 +1,16 @@
use crate::chunked_vector::{
store_updated_vector, BlockRoots, HistoricalRoots, RandaoMixes, StateRoots,
};
use crate::config::StoreConfig;
use crate::config::{OnDiskStoreConfig, StoreConfig};
use crate::forwards_iter::HybridForwardsBlockRootsIterator;
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
use crate::leveldb_store::BytesKey;
use crate::leveldb_store::LevelDB;
use crate::memory_store::MemoryStore;
use crate::metadata::{
PruningCheckpoint, SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION, PRUNING_CHECKPOINT_KEY,
SCHEMA_VERSION_KEY, SPLIT_KEY,
CompactionTimestamp, PruningCheckpoint, SchemaVersion, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY,
CURRENT_SCHEMA_VERSION, PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
};
use crate::metrics;
use crate::{
@@ -31,6 +31,7 @@ use std::convert::TryInto;
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use types::*;

/// Defines how blocks should be replayed on states.
@@ -187,9 +188,16 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
*db.split.write() = split;
}

// Finally, run a garbage collection pass.
// Run a garbage collection pass.
db.remove_garbage()?;

// If configured, run a foreground compaction pass.
if db.config.compact_on_init {
info!(db.log, "Running foreground compaction");
db.compact()?;
info!(db.log, "Foreground compaction complete");
}

Ok(db)
}

@@ -829,13 +837,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}

/// Load previously-stored config from disk.
fn load_config(&self) -> Result<Option<StoreConfig>, Error> {
fn load_config(&self) -> Result<Option<OnDiskStoreConfig>, Error> {
self.hot_db.get(&CONFIG_KEY)
}

/// Write the config to disk.
fn store_config(&self) -> Result<(), Error> {
self.hot_db.put(&CONFIG_KEY, &self.config)
self.hot_db.put(&CONFIG_KEY, &self.config.as_disk_config())
}

/// Load the split point from disk.
@@ -932,6 +940,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(())
}

/// Return `true` if compaction on finalization/pruning is enabled.
pub fn compact_on_prune(&self) -> bool {
self.config.compact_on_prune
}

/// Load the checkpoint to begin pruning from (the "old finalized checkpoint").
pub fn load_pruning_checkpoint(&self) -> Result<Option<Checkpoint>, Error> {
Ok(self
@@ -944,6 +957,22 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn pruning_checkpoint_store_op(&self, checkpoint: Checkpoint) -> KeyValueStoreOp {
PruningCheckpoint { checkpoint }.as_kv_store_op(PRUNING_CHECKPOINT_KEY)
}

/// Load the timestamp of the last compaction as a `Duration` since the UNIX epoch.
pub fn load_compaction_timestamp(&self) -> Result<Option<Duration>, Error> {
Ok(self
.hot_db
.get(&COMPACTION_TIMESTAMP_KEY)?
.map(|c: CompactionTimestamp| Duration::from_secs(c.0)))
}

/// Store the timestamp of the last compaction as a `Duration` since the UNIX epoch.
pub fn store_compaction_timestamp(&self, compaction_timestamp: Duration) -> Result<(), Error> {
self.hot_db.put(
&COMPACTION_TIMESTAMP_KEY,
&CompactionTimestamp(compaction_timestamp.as_secs()),
)
}
}

/// Advance the split point of the store, moving new finalized states to the freezer.
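
`load_compaction_timestamp` and `store_compaction_timestamp` persist the last compaction time as whole seconds since the UNIX epoch. A standard-library-only sketch of the elapsed-time arithmetic that `run_compaction` performs on these values (no store involved; the fallbacks mirror the code above):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

fn main() {
    // A previously stored compaction timestamp (seconds since the UNIX epoch),
    // defaulting to 0 when nothing has been stored yet.
    let last_compaction: Option<Duration> = None;
    let last_compaction_timestamp = last_compaction.unwrap_or_else(|| Duration::from_secs(0));

    // Current time as a Duration since the UNIX epoch; fall back to the stored
    // value if the system clock reports a time before the epoch.
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(last_compaction_timestamp);

    // Checked subtraction: a clock that went backwards yields 0 rather than a panic.
    let seconds_since_last_compaction = now
        .checked_sub(last_compaction_timestamp)
        .as_ref()
        .map_or(0, Duration::as_secs);

    println!("seconds since last compaction: {}", seconds_since_last_compaction);
}
```
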
18 changes: 18 additions & 0 deletions beacon_node/store/src/metadata.rs
@@ -11,6 +11,7 @@ pub const SCHEMA_VERSION_KEY: Hash256 = Hash256::repeat_byte(0);
pub const CONFIG_KEY: Hash256 = Hash256::repeat_byte(1);
pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2);
pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SchemaVersion(pub u64);
@@ -58,3 +59,20 @@ impl StoreItem for PruningCheckpoint {
})
}
}

/// The last time the database was compacted.
pub struct CompactionTimestamp(pub u64);

impl StoreItem for CompactionTimestamp {
fn db_column() -> DBColumn {
DBColumn::BeaconMeta
}

fn as_store_bytes(&self) -> Vec<u8> {
self.0.as_ssz_bytes()
}

fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(CompactionTimestamp(u64::from_ssz_bytes(bytes)?))
}
}
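
For reference, SSZ encodes a `u64` as 8 little-endian bytes, so the `CompactionTimestamp` round trip can be sketched with the standard library alone (an illustration of the on-disk layout under that assumption; the real impl above goes through the ssz crate's `Encode`/`Decode`):

```rust
use std::convert::TryInto;

/// Stand-in for the `CompactionTimestamp` store item above, showing the
/// 8-byte little-endian layout that SSZ uses for a `u64`.
struct CompactionTimestamp(u64);

impl CompactionTimestamp {
    fn as_store_bytes(&self) -> Vec<u8> {
        self.0.to_le_bytes().to_vec()
    }

    fn from_store_bytes(bytes: &[u8]) -> Result<Self, String> {
        let arr: [u8; 8] = bytes
            .try_into()
            .map_err(|_| "expected exactly 8 bytes".to_string())?;
        Ok(CompactionTimestamp(u64::from_le_bytes(arr)))
    }
}

fn main() {
    let original = CompactionTimestamp(1_600_000_000);
    let bytes = original.as_store_bytes();
    assert_eq!(bytes.len(), 8);
    let decoded = CompactionTimestamp::from_store_bytes(&bytes).unwrap();
    assert_eq!(decoded.0, original.0);
}
```
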