Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
andresilva91 committed Jul 8, 2024
1 parent b8f8e05 commit 48c1f52
Show file tree
Hide file tree
Showing 16 changed files with 212 additions and 134 deletions.
7 changes: 0 additions & 7 deletions linera-base/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,13 +824,6 @@ pub struct HashedBlob {
pub blob: Blob,
}

/// The state of a blob of binary data.
#[derive(Eq, PartialEq, Debug, Hash, Clone, Serialize, Deserialize)]
pub struct BlobState {
/// Hash of the last `Certificate` that published or used this blob.
pub last_used_by: CryptoHash,
}

impl HashedBlob {
/// Loads a hashed blob from a file.
pub async fn load_from_file(path: impl AsRef<std::path::Path>) -> std::io::Result<Self> {
Expand Down
6 changes: 0 additions & 6 deletions linera-base/src/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,6 @@ impl BlobId {
pub fn new(blob: &Blob) -> Self {
BlobId(CryptoHash::new(blob))
}

/// Returns the blob ID of `TestString(s)`, for testing purposes.
#[cfg(with_testing)]
pub fn test_blob_id(s: impl Into<String>) -> Self {
BlobId(CryptoHash::test_hash(s))
}
}

impl Display for BlobId {
Expand Down
15 changes: 14 additions & 1 deletion linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{borrow::Cow, collections::HashSet};
use async_graphql::SimpleObject;
use linera_base::{
crypto::{BcsHashable, BcsSignable, CryptoError, CryptoHash, KeyPair, PublicKey, Signature},
data_types::{Amount, BlockHeight, HashedBlob, OracleRecord, Round, Timestamp},
data_types::{Amount, BlockHeight, HashedBlob, OracleRecord, OracleResponse, Round, Timestamp},
doc_scalar, ensure,
identifiers::{
Account, BlobId, ChainId, ChannelName, Destination, GenericApplicationId, MessageId, Owner,
Expand Down Expand Up @@ -744,6 +744,19 @@ impl ExecutedBlock {
index,
}
}

/// Returns the set of blob IDs referenced by `OracleResponse::Blob` entries in
/// this block's oracle records. These are the blobs that must be available for
/// the block to be processed.
pub fn required_blob_ids(&self) -> HashSet<BlobId> {
    self.outcome
        .oracle_records
        .iter()
        .flat_map(|record| record.responses.iter())
        .filter_map(|response| match response {
            OracleResponse::Blob(blob_id) => Some(*blob_id),
            _ => None,
        })
        .collect()
}
}

impl BlockExecutionOutcome {
Expand Down
89 changes: 44 additions & 45 deletions linera-core/src/chain_worker/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ use futures::{
};
use linera_base::{
crypto::CryptoHash,
data_types::{
ArithmeticError, BlockHeight, HashedBlob, OracleRecord, OracleResponse, Timestamp,
},
data_types::{ArithmeticError, BlockHeight, HashedBlob, Timestamp},
ensure,
identifiers::{BlobId, ChainId},
};
Expand All @@ -31,8 +29,8 @@ use linera_chain::{
};
use linera_execution::{
committee::{Committee, Epoch},
BytecodeLocation, ExecutionRequest, Query, QueryContext, Response, ServiceRuntimeRequest,
UserApplicationDescription, UserApplicationId,
BlobState, BytecodeLocation, ExecutionRequest, Query, QueryContext, Response,
ServiceRuntimeRequest, UserApplicationDescription, UserApplicationId,
};
use linera_storage::Storage;
use linera_views::{
Expand Down Expand Up @@ -309,13 +307,14 @@ where
async fn check_no_missing_blobs(
&self,
block: &Block,
blobs_in_block: HashSet<BlobId>,
hashed_certificate_values: &[HashedCertificateValue],
hashed_blobs: &[HashedBlob],
) -> Result<(), WorkerError> {
let missing_bytecodes = self
.get_missing_bytecodes(block, hashed_certificate_values)
.await?;
let missing_blobs = self.get_missing_blobs(block, hashed_blobs).await?;
let missing_blobs = self.get_missing_blobs(blobs_in_block, hashed_blobs).await?;

if missing_bytecodes.is_empty() && missing_blobs.is_empty() {
return Ok(());
Expand All @@ -330,10 +329,9 @@ where
/// Returns the blobs required by the block that we don't have, or an error if unrelated blobs were provided.
async fn get_missing_blobs(
&self,
block: &Block,
mut required_blob_ids: HashSet<BlobId>,
hashed_blobs: &[HashedBlob],
) -> Result<Vec<BlobId>, WorkerError> {
let mut required_blob_ids = block.published_blob_ids();
// Find all certificates containing blobs used when executing this block.
for hashed_blob in hashed_blobs {
let blob_id = hashed_blob.id();
Expand All @@ -344,13 +342,26 @@ where
}

let pending_blobs = &self.chain.manager.get().pending_blobs;
Ok(self
let tasks = self
.recent_hashed_blobs
.subtract_cached_items_from::<_, Vec<_>>(required_blob_ids, |id| id)
.await
.into_iter()
.filter(|blob_id| !pending_blobs.contains_key(blob_id))
.collect())
.map(|blob_id| {
self.storage
.contains_blob(blob_id)
.map(move |result| (blob_id, result))
});

let mut missing_blobs = vec![];
for (blob_id, result) in future::join_all(tasks).await {
if !result? {
missing_blobs.push(blob_id);
}
}

Ok(missing_blobs)
}

/// Returns the blobs requested by their `blob_ids` that are either in pending in the
Expand Down Expand Up @@ -406,10 +417,8 @@ where
.collect::<Vec<_>>();
let mut missing_locations = vec![];
for (location, result) in future::join_all(tasks).await {
match result {
Ok(true) => {}
Ok(false) => missing_locations.push(location),
Err(err) => Err(err)?,
if !result? {
missing_locations.push(location);
}
}

Expand Down Expand Up @@ -685,7 +694,12 @@ where
// Verify that all required bytecode hashed certificate values and blobs are available, and no
// unrelated ones provided.
self.0
.check_no_missing_blobs(block, hashed_certificate_values, hashed_blobs)
.check_no_missing_blobs(
block,
block.published_blob_ids(),
hashed_certificate_values,
hashed_blobs,
)
.await?;
// Write the values so that the bytecode is available during execution.
// TODO(#2199): We should not persist anything in storage before the block is confirmed.
Expand Down Expand Up @@ -1070,7 +1084,12 @@ where
// Verify that all required bytecode hashed certificate values and blobs are available, and no
// unrelated ones provided.
self.state
.check_no_missing_blobs(block, hashed_certificate_values, hashed_blobs)
.check_no_missing_blobs(
block,
required_blob_ids.clone(),
hashed_certificate_values,
hashed_blobs,
)
.await?;
// Persist certificate and hashed certificate values.
self.state
Expand All @@ -1083,31 +1102,7 @@ where
.await;
}

let blob_ids_in_oracle_records = oracle_records
.iter()
.flat_map(|record| {
record.responses.iter().filter_map(|response| {
if let OracleResponse::Blob(blob_id) = response {
Some(blob_id)
} else {
None
}
})
})
.collect::<HashSet<_>>();

let block_blob_ids = block.published_blob_ids();
let missing_blobs_from_oracles = block_blob_ids
.iter()
.filter(|block_blob_id| !blob_ids_in_oracle_records.contains(block_blob_id))
.cloned()
.collect::<Vec<_>>();
ensure!(
missing_blobs_from_oracles.is_empty(),
WorkerError::MissingBlockBlobsInOracles(missing_blobs_from_oracles)
);

let blobs_in_block = self.state.get_blobs(block_blob_ids).await?;
let blobs_in_block = self.state.get_blobs(required_blob_ids.clone()).await?;
let certificate_hash = certificate.hash();

let (result_hashed_certificate_value, result_blobs, result_certificate) = tokio::join!(
Expand All @@ -1122,10 +1117,14 @@ where
result_certificate?;

// Update the blob state with last used certificate hash.
try_join_all(blob_ids_in_oracle_records.into_iter().map(|blob_id| {
self.state
.storage
.write_blob_state(*blob_id, certificate_hash)
try_join_all(required_blob_ids.into_iter().map(|blob_id| {
self.state.storage.maybe_write_blob_state(
blob_id,
BlobState {
last_used_by: certificate_hash,
epoch: certificate.value().epoch(),
},
)
}))
.await?;

Expand Down
10 changes: 10 additions & 0 deletions linera-core/src/unit_tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1445,6 +1445,16 @@ where
client_a.synchronize_from_validators().await.unwrap();
assert_eq!(expected_blob_id, blob_id);
assert_eq!(certificate.round, Round::MultiLeader(0));
assert!(certificate
.value()
.executed_block()
.unwrap()
.outcome
.oracle_records
.iter()
.any(|record| record
.responses
.contains(&OracleResponse::Blob(expected_blob_id))));
let previous_block_hash = client_a.block_hash().unwrap();

// Validators goes back up
Expand Down
30 changes: 27 additions & 3 deletions linera-core/src/unit_tests/wasm_client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,25 +114,49 @@ where

let contract_blob = HashedBlob::load_from_file(contract_path.clone()).await?;
let expected_contract_blob_id = contract_blob.id();
let (blob_id, _) = publisher
let (blob_id, certificate) = publisher
.publish_blob(contract_blob.clone())
.await
.unwrap()
.unwrap();
assert_eq!(expected_contract_blob_id, blob_id);
assert!(certificate
.value()
.executed_block()
.unwrap()
.outcome
.oracle_records
.iter()
.any(|record| record.responses.contains(&OracleResponse::Blob(blob_id))));

let service_blob = HashedBlob::load_from_file(service_path.clone()).await?;
let expected_service_blob_id = service_blob.id();
let (blob_id, _) = publisher.publish_blob(service_blob).await.unwrap().unwrap();
let (blob_id, certificate) = publisher.publish_blob(service_blob).await.unwrap().unwrap();
assert_eq!(expected_service_blob_id, blob_id);
assert!(certificate
.value()
.executed_block()
.unwrap()
.outcome
.oracle_records
.iter()
.any(|record| record.responses.contains(&OracleResponse::Blob(blob_id))));

// If I try to upload the contract blob again, I should get the same blob ID
let (blob_id, _) = publisher
let (blob_id, certificate) = publisher
.publish_blob(contract_blob)
.await
.unwrap()
.unwrap();
assert_eq!(expected_contract_blob_id, blob_id);
assert!(certificate
.value()
.executed_block()
.unwrap()
.outcome
.oracle_records
.iter()
.any(|record| record.responses.contains(&OracleResponse::Blob(blob_id))));

let (bytecode_id, cert) = publisher
.publish_bytecode(
Expand Down
3 changes: 0 additions & 3 deletions linera-core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,6 @@ pub enum WorkerError {
ApplicationBytecodesOrBlobsNotFound(Vec<BytecodeLocation>, Vec<BlobId>),
#[error("The block proposal is invalid: {0}")]
InvalidBlockProposal(String),

#[error("Block blobs missing from oracles: {0:?}")]
MissingBlockBlobsInOracles(Vec<BlobId>),
}

impl From<linera_chain::ChainError> for WorkerError {
Expand Down
4 changes: 2 additions & 2 deletions linera-execution/src/execution_state_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,8 @@ impl Debug for ExecutionRequest {
.field("content_type", content_type)
.finish_non_exhaustive(),

Request::ReadBlob { blob_id, .. } => formatter
.debug_struct("Request::ReadBlob")
ExecutionRequest::ReadBlob { blob_id, .. } => formatter
.debug_struct("ExecutionRequest::ReadBlob")
.field("blob_id", blob_id)
.finish_non_exhaustive(),
}
Expand Down
10 changes: 10 additions & 0 deletions linera-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::{fmt, str::FromStr, sync::Arc};

use async_graphql::SimpleObject;
use async_trait::async_trait;
use committee::Epoch;
use custom_debug_derive::Debug;
use dashmap::DashMap;
use derive_more::Display;
Expand Down Expand Up @@ -1017,6 +1018,15 @@ impl std::fmt::Debug for Bytecode {
}
}

/// The state of a blob of binary data.
///
/// Written to storage when a certificate that publishes or uses the blob is
/// processed, recording which certificate touched it most recently.
/// NOTE(review): field order is significant for serde-based binary formats —
/// do not reorder fields without checking stored data compatibility.
#[derive(Eq, PartialEq, Debug, Hash, Clone, Serialize, Deserialize)]
pub struct BlobState {
/// Hash of the last `Certificate` that published or used this blob.
pub last_used_by: CryptoHash,
/// Epoch of the `last_used_by` certificate.
pub epoch: Epoch,
}

/// The runtime to use for running the application.
#[derive(Clone, Copy, Display)]
#[cfg_attr(with_wasm_runtime, derive(Debug, Default))]
Expand Down
2 changes: 1 addition & 1 deletion linera-execution/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@ impl<UserInstance> BaseRuntime for SyncRuntimeInternal<UserInstance> {
}
let blob = self
.execution_state_sender
.send_request(|callback| Request::ReadBlob {
.send_request(|callback| ExecutionRequest::ReadBlob {
blob_id: *blob_id,
callback,
})?
Expand Down
Loading

0 comments on commit 48c1f52

Please sign in to comment.