Skip to content

Commit

Permalink
Adding force-migrate-shard flag to run worker command (#2862)
Browse files Browse the repository at this point in the history
  • Loading branch information
silva-fj committed Jul 5, 2024
1 parent 217637f commit 3f206f7
Show file tree
Hide file tree
Showing 26 changed files with 345 additions and 14 deletions.
7 changes: 7 additions & 0 deletions bitacross-worker/core-primitives/enclave-api/ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,13 @@ extern "C" {
shard_size: u32,
) -> sgx_status_t;

pub fn force_migrate_shard(
eid: sgx_enclave_id_t,
retval: *mut sgx_status_t,
new_shard: *const u8,
shard_size: u32,
) -> sgx_status_t;

pub fn ignore_parentchain_block_import_validation_until(
eid: sgx_enclave_id_t,
retval: *mut sgx_status_t,
Expand Down
20 changes: 20 additions & 0 deletions bitacross-worker/core-primitives/enclave-api/src/enclave_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub trait EnclaveBase: Send + Sync + 'static {
// litentry
fn migrate_shard(&self, old_shard: Vec<u8>, new_shard: Vec<u8>) -> EnclaveResult<()>;

fn force_migrate_shard(&self, new_shard: Vec<u8>) -> EnclaveResult<()>;

/// Publish generated wallets on parachain
fn publish_wallets(&self) -> EnclaveResult<()>;

Expand Down Expand Up @@ -410,6 +412,24 @@ mod impl_ffi {
Ok(())
}

fn force_migrate_shard(&self, new_shard: Vec<u8>) -> EnclaveResult<()> {
let mut retval = sgx_status_t::SGX_SUCCESS;

let result = unsafe {
ffi::force_migrate_shard(
self.eid,
&mut retval,
new_shard.as_ptr(),
new_shard.len() as u32,
)
};

ensure!(result == sgx_status_t::SGX_SUCCESS, Error::Sgx(result));
ensure!(retval == sgx_status_t::SGX_SUCCESS, Error::Sgx(retval));

Ok(())
}

fn publish_wallets(&self) -> EnclaveResult<()> {
let mut retval = sgx_status_t::SGX_SUCCESS;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,6 @@ pub trait HandleState {
old_shard: ShardIdentifier,
new_shard: ShardIdentifier,
) -> Result<Self::HashType>;

fn force_migrate_shard(&self, new_shard: ShardIdentifier) -> Result<Self::HashType>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,21 @@ where
let (state, _) = self.load_cloned(&old_shard)?;
self.reset(state, &new_shard)
}

fn force_migrate_shard(&self, new_shard: ShardIdentifier) -> Result<Self::HashType> {
if self.shard_exists(&new_shard)? {
let (_, state_hash) = self.load_cloned(&new_shard)?;
return Ok(state_hash)
}
let old_shard = match self.list_shards()? {
shards if shards.len() == 1 => shards[0],
_ =>
return Err(Error::Other(
"Cannot force migrate shard if there is more than one shard".into(),
)),
};
self.migrate_shard(old_shard, new_shard)
}
}

impl<Repository, StateObserver, StateInitializer> QueryShardState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ impl HandleState for HandleStateMock {
self.reset(state, &new_shard)
}

fn force_migrate_shard(&self, new_shard: ShardIdentifier) -> Result<Self::HashType> {
let old_shard = *self.state_map.read().unwrap().keys().next().unwrap();
self.migrate_shard(old_shard, new_shard)
}

fn execute_on_current<E, R>(&self, shard: &ShardIdentifier, executing_function: E) -> Result<R>
where
E: FnOnce(&Self::StateT, Self::HashType) -> R,
Expand Down Expand Up @@ -235,6 +240,26 @@ pub mod tests {
assert_eq!(*inserted_value, value.encode());
}

pub fn force_migrate_shard_works() {
let state_handler = HandleStateMock::default();
let old_shard = ShardIdentifier::default();
let bytes = hex::decode("91de6f606be264f089b155256385470f5395969386894ffba38775442f508ee2")
.unwrap();
let new_shard = ShardIdentifier::from_slice(&bytes);
state_handler.initialize_shard(old_shard).unwrap();

let (lock, mut state) = state_handler.load_for_mutation(&old_shard).unwrap();
let (key, value) = ("my_key", "my_value");
state.insert(key.encode(), value.encode());
state_handler.write_after_mutation(state, lock, &old_shard).unwrap();

state_handler.force_migrate_shard(new_shard).unwrap();
let (new_state, _) = state_handler.load_cloned(&new_shard).unwrap();
let inserted_value =
new_state.get(key.encode().as_slice()).expect("value for key should exist");
assert_eq!(*inserted_value, value.encode());
}

fn decode<T: Decode>(encoded: Vec<u8>) -> T {
T::decode(&mut encoded.as_slice()).unwrap()
}
Expand Down
11 changes: 8 additions & 3 deletions bitacross-worker/enclave-runtime/Enclave.edl
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,16 @@ enclave {
public size_t test_main_entrance();

public sgx_status_t migrate_shard(
[in, size=shard_size] uint8_t* old_shard,
[in, size=shard_size] uint8_t* new_shard,
[in, size=shard_size] uint8_t* old_shard,
[in, size=shard_size] uint8_t* new_shard,
uint32_t shard_size
);


public sgx_status_t force_migrate_shard(
[in, size=shard_size] uint8_t* new_shard,
uint32_t shard_size
);

public sgx_status_t ignore_parentchain_block_import_validation_until(
[in] uint32_t* until
);
Expand Down
6 changes: 6 additions & 0 deletions bitacross-worker/enclave-runtime/src/initialization/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,12 @@ pub(crate) fn migrate_shard(
Ok(())
}

pub(crate) fn force_migrate_shard(new_shard: ShardIdentifier) -> EnclaveResult<()> {
let state_handler = GLOBAL_STATE_HANDLER_COMPONENT.get()?;
let _ = state_handler.force_migrate_shard(new_shard)?;
Ok(())
}

/// Initialize the TOP pool author component.
pub fn create_top_pool_author(
rpc_responder: Arc<EnclaveRpcResponder>,
Expand Down
16 changes: 16 additions & 0 deletions bitacross-worker/enclave-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,22 @@ pub unsafe extern "C" fn migrate_shard(
sgx_status_t::SGX_SUCCESS
}

#[no_mangle]
pub unsafe extern "C" fn force_migrate_shard(
new_shard: *const u8,
shard_size: u32,
) -> sgx_status_t {
let shard_identifier =
ShardIdentifier::from_slice(slice::from_raw_parts(new_shard, shard_size as usize));

if let Err(e) = initialization::force_migrate_shard(shard_identifier) {
error!("Failed to (force) migrate shard ({:?}): {:?}", shard_identifier, e);
return sgx_status_t::SGX_ERROR_UNEXPECTED
}

sgx_status_t::SGX_SUCCESS
}

#[no_mangle]
pub unsafe extern "C" fn sync_parentchain(
blocks_to_sync: *const u8,
Expand Down
4 changes: 4 additions & 0 deletions bitacross-worker/service/src/cli.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ subcommands:
long: request-state
short: r
help: Run the worker and request key and state provisioning from another worker.
- force-migrate-shard:
long: force-migrate-shard
help: Force migrate the shard before starting the worker
required: false
- request-state:
about: (Deprecated - TODO) join a shard by requesting key provisioning from another worker
args:
Expand Down
8 changes: 6 additions & 2 deletions bitacross-worker/service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use clap::ArgMatches;
use itc_rest_client::rest_client::Url;
use itp_types::parentchain::ParentchainId;
use itp_types::{parentchain::ParentchainId, ShardIdentifier};
use parse_duration::parse;
use serde::{Deserialize, Serialize};
use std::{
Expand Down Expand Up @@ -288,6 +288,8 @@ pub struct RunConfig {
marblerun_base_url: Option<String>,
/// parentchain which should be used for shielding/unshielding the stf's native token
pub shielding_target: Option<ParentchainId>,
/// Whether to migrate the shard before initializing the enclave
pub force_migrate_shard: bool,
}

impl RunConfig {
Expand Down Expand Up @@ -332,7 +334,9 @@ impl From<&ArgMatches<'_>> for RunConfig {
i
),
});
Self { skip_ra, dev, shard, marblerun_base_url, shielding_target }
let force_migrate_shard = m.is_present("force-migrate-shard");

Self { skip_ra, dev, shard, marblerun_base_url, shielding_target, force_migrate_shard }
}
}

Expand Down
4 changes: 4 additions & 0 deletions bitacross-worker/service/src/main_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ pub(crate) fn main() {

if clean_reset {
setup::initialize_shard_and_keys(enclave.as_ref(), &shard).unwrap();
} else if run_config.force_migrate_shard {
setup::force_migrate_shard(enclave.as_ref(), &shard);
let new_shard_name = shard.encode().to_base58();
setup::remove_old_shards(config.data_dir(), &new_shard_name);
}

let node_api =
Expand Down
56 changes: 54 additions & 2 deletions bitacross-worker/service/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use std::{fs, path::Path};

#[cfg(feature = "link-binary")]
pub(crate) use needs_enclave::{
generate_shielding_key_file, generate_signing_key_file, init_shard, initialize_shard_and_keys,
migrate_shard,
force_migrate_shard, generate_shielding_key_file, generate_signing_key_file, init_shard,
initialize_shard_and_keys, migrate_shard,
};

#[cfg(feature = "link-binary")]
Expand Down Expand Up @@ -97,6 +97,17 @@ mod needs_enclave {
}
}

pub(crate) fn force_migrate_shard(enclave: &Enclave, &new_shard: &ShardIdentifier) {
match enclave.force_migrate_shard(new_shard.encode()) {
Err(e) => {
panic!("Failed to force migrate shard {:?}. {:?}", new_shard, e);
},
Ok(_) => {
println!("Successfully force migrate shard {:?}", new_shard);
},
}
}

pub(crate) fn generate_signing_key_file(enclave: &Enclave) {
info!("*** Get the signing key from the TEE\n");
let pubkey = enclave.get_ecc_signing_pubkey().unwrap();
Expand Down Expand Up @@ -126,6 +137,17 @@ mod needs_enclave {
}
}

/// backs up shard directory and restores it after cleaning shards directory
pub(crate) fn remove_old_shards(root_dir: &Path, new_shard_name: &str) {
let shard_backup = root_dir.join("shard_backup");
let shard_dir = root_dir.join(SHARDS_PATH).join(new_shard_name);

fs::rename(shard_dir.clone(), shard_backup.clone()).expect("Failed to backup shard");
remove_dir_if_it_exists(root_dir, SHARDS_PATH).expect("Failed to remove shards directory");
fs::create_dir_all(root_dir.join(SHARDS_PATH)).expect("Failed to create shards directory");
fs::rename(shard_backup, shard_dir).expect("Failed to restore shard");
}

/// Purge all worker files from `dir`.
pub(crate) fn purge_files_from_dir(dir: &Path) -> ServiceResult<()> {
println!("[+] Performing a clean reset of the worker");
Expand Down Expand Up @@ -217,6 +239,36 @@ mod tests {
assert!(purge_files(&root_directory).is_ok());
}

#[test]
fn test_remove_old_shards() {
let test_directory_handle = TestDirectoryHandle::new(PathBuf::from("test_backup_shard"));
let root_directory = test_directory_handle.path();
let shard_1_name = "test_shard_1";
let shard_2_name = "test_shard_2";

let shard_1_dir = root_directory.join(SHARDS_PATH).join(shard_1_name);
fs::create_dir_all(&shard_1_dir).unwrap();
fs::File::create(shard_1_dir.join("test_state.bin")).unwrap();
fs::File::create(shard_1_dir.join("test_state_2.bin")).unwrap();

let shard_2_dir = root_directory.join(SHARDS_PATH).join(shard_2_name);
fs::create_dir_all(&shard_2_dir).unwrap();
fs::File::create(shard_2_dir.join("test_state.bin")).unwrap();

assert!(root_directory.join(SHARDS_PATH).join(shard_2_name).exists());

remove_old_shards(root_directory, shard_1_name);

assert!(root_directory.join(SHARDS_PATH).join(shard_1_name).exists());
assert_eq!(
fs::read_dir(root_directory.join(SHARDS_PATH).join(shard_1_name))
.expect("Failed to read shard directory")
.count(),
2
);
assert!(!root_directory.join(SHARDS_PATH).join(shard_2_name).exists());
}

/// Directory handle to automatically initialize a directory
/// and upon dropping the reference, removing it again.
struct TestDirectoryHandle {
Expand Down
4 changes: 4 additions & 0 deletions bitacross-worker/service/src/tests/mocks/enclave_api_mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ impl EnclaveBase for EnclaveMock {
fn init_wallets(&self, _base_dir: &str) -> EnclaveResult<()> {
unimplemented!()
}

fn force_migrate_shard(&self, new_shard: Vec<u8>) -> EnclaveResult<()> {
unimplemented!()
}
}

impl Sidechain for EnclaveMock {
Expand Down
8 changes: 8 additions & 0 deletions tee-worker/core-primitives/enclave-api/ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,14 @@ extern "C" {
shard_size: u32,
) -> sgx_status_t;

// litentry
pub fn force_migrate_shard(
eid: sgx_enclave_id_t,
retval: *mut sgx_status_t,
new_shard: *const u8,
shard_size: u32,
) -> sgx_status_t;

pub fn ignore_parentchain_block_import_validation_until(
eid: sgx_enclave_id_t,
retval: *mut sgx_status_t,
Expand Down
20 changes: 20 additions & 0 deletions tee-worker/core-primitives/enclave-api/src/enclave_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub trait EnclaveBase: Send + Sync + 'static {

// litentry
fn migrate_shard(&self, old_shard: Vec<u8>, new_shard: Vec<u8>) -> EnclaveResult<()>;

fn force_migrate_shard(&self, new_shard: Vec<u8>) -> EnclaveResult<()>;
}

/// EnclaveApi implementation for Enclave struct
Expand Down Expand Up @@ -387,6 +389,24 @@ mod impl_ffi {

Ok(())
}

fn force_migrate_shard(&self, new_shard: Vec<u8>) -> EnclaveResult<()> {
let mut retval = sgx_status_t::SGX_SUCCESS;

let result = unsafe {
ffi::force_migrate_shard(
self.eid,
&mut retval,
new_shard.as_ptr(),
new_shard.len() as u32,
)
};

ensure!(result == sgx_status_t::SGX_SUCCESS, Error::Sgx(result));
ensure!(retval == sgx_status_t::SGX_SUCCESS, Error::Sgx(retval));

Ok(())
}
}

fn init_parentchain_components_ffi(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,7 @@ pub trait HandleState {
old_shard: ShardIdentifier,
new_shard: ShardIdentifier,
) -> Result<Self::HashType>;

/// Force migrate state to new shard
fn force_migrate_shard(&self, new_shard: ShardIdentifier) -> Result<Self::HashType>;
}
15 changes: 15 additions & 0 deletions tee-worker/core-primitives/stf-state-handler/src/state_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,21 @@ where

Ok(new_shard_state_hash)
}

fn force_migrate_shard(&self, new_shard: ShardIdentifier) -> Result<Self::HashType> {
if self.shard_exists(&new_shard)? {
let (_, state_hash) = self.load_cloned(&new_shard)?;
return Ok(state_hash)
}
let old_shard = match self.list_shards()? {
shards if shards.len() == 1 => shards[0],
_ =>
return Err(Error::Other(
"Cannot force migrate shard. There is more than 1 shard in the list".into(),
)),
};
self.migrate_shard(old_shard, new_shard)
}
}

impl<Repository, StateObserver, StateInitializer> QueryShardState
Expand Down
Loading

0 comments on commit 3f206f7

Please sign in to comment.