Commit 8407e81
Close #59 Support metadata rebuilding

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
LeeSmet committed Jul 13, 2021
1 parent 4c1dc4a commit 8407e81

Showing 2 changed files with 228 additions and 11 deletions.
2 changes: 1 addition & 1 deletion zstor/src/erasure.rs
@@ -14,7 +14,7 @@ pub type ErasureResult<T> = Result<T, EncodingError>;

/// A data encoder is responsible for encoding original data into multiple shards, and decoding
/// multiple shards back to the original data, if sufficient shards are available.
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub struct Encoder {
data_shards: usize,
parity_shards: usize,
237 changes: 227 additions & 10 deletions zstor/src/zdb_meta.rs
@@ -16,14 +16,21 @@ use futures::{
};
use log::{debug, error, trace, warn};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::{fmt, io};
use std::{
collections::HashMap,
fmt, io,
path::{Path, PathBuf},
};

/// Number of data shards to use for the encoder of the 0-db MetaStore.
const ZDB_METASTORE_DATA_SHARDS: usize = 2;
/// Number of parity shards to use for the encoder of the 0-db MetaStore.
const ZDB_METASTORE_PARITY_SHARDS: usize = 2;
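// e.g. with 2 data and 2 parity shards, every metadata value is spread over 4 backends,
// and any 2 surviving shards suffice to decode it again.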

// TODO: find a good limit here
/// Maximum number of keys rebuilt concurrently during a rebuild operation.
const CONCURRENT_KEY_REBUILDS: usize = 10;

/// Configuration to create a 0-db based metadata store
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ZdbMetaStoreConfig {
@@ -75,6 +82,7 @@ pub struct ZdbMetaStore<E: Encryptor> {
encryptor: E,
prefix: String,
virtual_root: Option<PathBuf>,
writeable: bool,
}

impl<E> ZdbMetaStore<E>
@@ -90,12 +98,14 @@ where
prefix: String,
virtual_root: Option<PathBuf>,
) -> Self {
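// A store is only writeable when each shard can be placed on its own backend;
// e.g. a 2+2 encoder needs at least 4 backends before writes are accepted.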
let writeable = backends.len() >= encoder.data_shards() + encoder.parity_shards();
Self {
backends,
encoder,
encryptor,
prefix,
virtual_root,
writeable,
}
}

@@ -231,14 +241,26 @@
}
let mut most_keys_idx = 0;
let mut highest_key_count = 0;
let mut healthy_backend = false;
for (idx, ns_info_res) in join_all(ns_requests).await.into_iter().enumerate() {
let ns_info = ns_info_res?;
if ns_info.entries() > highest_key_count {
most_keys_idx = idx;
highest_key_count = ns_info.entries();
// Only utilize reachable backends.
if let Ok(ns_info) = ns_info_res {
healthy_backend = true;
if ns_info.entries() > highest_key_count {
most_keys_idx = idx;
highest_key_count = ns_info.entries();
}
}
}
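// e.g. if backend 0 reports 120 entries and backend 2 reports 150, the keys are
// listed from backend 2, as it likely has the most complete view.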

// If there is no reachable backend, we can't list the keys.
if !healthy_backend {
return Err(ZdbMetaStoreError {
kind: ErrorKind::InsufficientHealthBackends,
internal: InternalError::Other("no healthy backend found to list keys".into()),
});
}

// Now iterate over the keys of the longest backend
Ok(self.backends[most_keys_idx]
.keys()
@@ -251,8 +273,200 @@
}))
}

// hash a path using blake2b with 16 bytes of output, and hex encode the result
// the path is canonicalized before encoding so the full path is used
/// Rebuild an old metadata cluster on a new one. This method does not return until all known
/// keys are rebuilt.
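///
/// # Example
///
/// A minimal sketch, assuming `old_store` and `new_store` are `ZdbMetaStore` instances
/// configured with the same prefix:
///
/// ```ignore
/// new_store.rebuild_cluster(&old_store).await?;
/// ```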
pub async fn rebuild_cluster(&self, old_cluster: &Self) -> ZdbMetaStoreResult<()> {
if !self.writeable {
return Err(ZdbMetaStoreError {
kind: ErrorKind::InsufficientHealthBackends,
internal: InternalError::Other(
"Can't rebuild data to a cluster which is not writeable".into(),
),
});
}

// Rebuild the data. For now we skip keys which error, but otherwise attempt to complete
// the rebuild.
old_cluster
.keys(&self.prefix)
.await?
.for_each_concurrent(CONCURRENT_KEY_REBUILDS, |key| async move {
if old_cluster.encoder.data_shards() == self.encoder.data_shards()
&& old_cluster.encoder.parity_shards() == self.encoder.parity_shards()
{
if let Err(e) = self.sparse_rebuild(old_cluster, &key).await {
error!("Failed to sparse rebuild key {} on new cluster: {}", key, e);
}
} else {
let value = match old_cluster.read_value(&key).await {
Ok(value) => value,
Err(e) => {
error!("Failed to read key {} from old cluster: {}", key, e);
return;
}
};
if value.is_none() {
debug!("Skipping key {} with no value in rebuild", key);
return;
}
// unwrap here is safe as we just returned on the None variant, leaving only
// the Some variant as a possibility.
if let Err(e) = self.write_value(&key, &value.unwrap()).await {
error!("Failed to write key {} to new cluster: {}", key, e);
}
}
})
.await;
Ok(())
}

/// Helper method which rebuilds a single key from an old cluster on a new cluster. It maps
/// each available shard to the exact backend instance in the old cluster, so the shard can
/// be reused if that backend is also present in the new cluster. Existing shards are not
/// written again, to save bandwidth.
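///
/// Each stored shard is laid out as `[shard index (1 byte) | shard data | checksum (16 bytes)]`,
/// the same layout the write loop below produces.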
///
/// # Panics
///
/// Panics if the encoder configuration for the old and new cluster is different.
async fn sparse_rebuild(&self, old_cluster: &Self, key: &str) -> ZdbMetaStoreResult<()> {
trace!("Sparse rebuild of key {}", key);
assert_eq!(old_cluster.encoder, self.encoder);

let shard_count = old_cluster.encoder.data_shards() + old_cluster.encoder.parity_shards();
let mut shard_idx: HashMap<&ZdbConnectionInfo, usize> = HashMap::with_capacity(shard_count);
let mut shards = vec![None; shard_count as usize];
for (ci, read_result) in join_all(
old_cluster
.backends
.iter()
.map(|backend| async move { (backend.connection_info(), backend.get(key).await) }),
)
.await
{
// Ignore errored shards
if read_result.is_err() {
// Don't log, as that might spam the logfile.
continue;
}
// Unwrap here is now safe as we only get here in case of the Ok variant.
if let Some(mut data) = read_result.unwrap() {
if data.is_empty() {
continue;
}
let idx = data[0];
let mut shard: Vec<u8> = data.drain(..).skip(1).collect();
// Sanity check
if idx as usize >= shard_count {
warn!(
"found shard at index {}, but only {} shards are expected for key {}",
idx, shard_count, key
);
continue;
}
// Checksum verification
let saved_checksum = shard.split_off(shard.len() - 16);
let shard = Shard::from(shard);
if saved_checksum != shard.checksum() {
warn!("shard {} checksum verification failed", idx);
continue;
}

shards[idx as usize] = Some(shard.into_inner());
shard_idx.insert(ci, idx as usize);
}
}

let shard_count = shards.iter().filter(|x| x.is_some()).count();

// Nothing to do.
// We skipped keys without data, meaning we won't rebuild a possibly dataless key.
// TODO: verify if this is an issue.
if shard_count == 0 {
return Ok(());
}

if shard_count < self.encoder.data_shards() {
error!("key {} is corrupt and cannot be repaired", key);
return Err(ZdbMetaStoreError {
kind: ErrorKind::Encoding,
internal: InternalError::Corruption(CorruptedKey {
available_shards: shard_count,
required_shards: self.encoder.data_shards(),
}),
});
}

let content = self.encoder.decode(shards)?;
let new_shards = self.encoder.encode(content);

let mut new_shards: Vec<Option<Shard>> = new_shards.into_iter().map(Some).collect();
let mut writes = Vec::with_capacity(new_shards.len());

// First iteration: map existing shards. These shards can be removed, as we won't write
// them to the backend again; they are already present. This saves some outgoing bandwidth.
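// e.g. if a backend in the new cluster already holds shard 2 for this key, shard 2 is
// removed from the write set and that backend is excluded from the write targets.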
let writeable_backends = self
.backends
.iter()
.filter(|backend| {
if let Some(idx) = shard_idx.get(backend.connection_info()) {
// Remove the shard as it is already present.
new_shards[*idx] = None;
// Filter out the backend as we don't want to write to it again.
false
} else {
true
}
})
.collect::<Vec<_>>();

for backend in writeable_backends {
// Grab the first shard available.
for (idx, shard) in new_shards.iter_mut().enumerate() {
if shard.is_some() {
// Unwrap is safe as we only enter this block in case of the Some variant.
let mut shard = shard.take().unwrap();
// Prepare shard
// Grab the checksum before inserting the shard idx
let checksum = shard.checksum();
shard.insert(0, idx as u8);
trace!(
"Storing data chunk of size {} in backend {} with key {}",
shard.len(),
backend.connection_info().address(),
key
);
shard.extend(&checksum);
writes.push((backend, shard));
// Each backend receives exactly one shard, so stop scanning once assigned.
break;
}
}
}

let shards_left = new_shards.into_iter().filter(Option::is_some).count();
if shards_left != 0 {
return Err(ZdbMetaStoreError {
kind: ErrorKind::Encoding,
internal: InternalError::Other(
format!("Not all shards would be written in reconstruction, still have {} unassigned shards", shards_left),
),
});
}

// A single write failure is considered an error
try_join_all(
writes
.into_iter()
.map(|(backend, shard)| async move { backend.set(key, &shard).await }),
)
.await?;

Ok(())
}

/// Hash a path using blake2b with 16 bytes of output, and hex encode the result.
/// The path is canonicalized before encoding so the full path is used.
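///
/// # Example
///
/// A sketch of the expected behaviour, with a hypothetical path:
///
/// ```ignore
/// // "/mnt/data/file.txt" is canonicalized, hashed with blake2b (16-byte digest),
/// // and hex encoded, so equal paths always map to the same key.
/// let key = store.build_key(Path::new("/mnt/data/file.txt"))?;
/// ```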
fn build_key(&self, path: &Path) -> ZdbMetaStoreResult<String> {
let canonical_path = canonicalize(path)?;

@@ -511,10 +725,12 @@ enum ErrorKind {
Encryption,
/// An error while using dispersed encoding or decoding.
Encoding,
/// An error regarding a key
/// An error regarding a key.
Key,
/// An otherwise unspecified Io error
/// An otherwise unspecified Io error.
Io,
/// Insufficient healthy backends to perform the request.
InsufficientHealthBackends,
}

impl fmt::Display for ErrorKind {
@@ -529,6 +745,7 @@ impl fmt::Display for ErrorKind {
ErrorKind::Encoding => "ENCODING",
ErrorKind::Key => "KEY",
ErrorKind::Io => "IO",
ErrorKind::InsufficientHealthBackends => "Insufficient healthy backends",
}
)
}
