Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ pub enum CrucibleError {

#[error("context slot deserialization failed: {0}")]
BadContextSlot(String),

#[error("missing block context for non-empty block")]
MissingBlockContext,
}

impl From<std::io::Error> for CrucibleError {
Expand Down Expand Up @@ -398,7 +401,8 @@ impl From<CrucibleError> for dropshot::HttpError {
| CrucibleError::UuidMismatch
| CrucibleError::MissingContextSlot(..)
| CrucibleError::BadMetadata(..)
| CrucibleError::BadContextSlot(..) => {
| CrucibleError::BadContextSlot(..)
| CrucibleError::MissingBlockContext => {
dropshot::HttpError::for_internal_error(e.to_string())
}
}
Expand Down
47 changes: 21 additions & 26 deletions upstairs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1170,10 +1170,15 @@ impl DownstairsClient {
/// Handles a single IO operation
///
/// Returns `true` if the job is now ackable, `false` otherwise
///
/// If this is a read response, then the values in `responses` must
/// _already_ be decrypted (with corresponding hashes stored in
/// `read_response_hashes`).
pub(crate) fn process_io_completion(
&mut self,
job: &mut DownstairsIO,
mut responses: Result<Vec<ReadResponse>, CrucibleError>,
responses: Result<Vec<ReadResponse>, CrucibleError>,
read_response_hashes: Vec<Option<u64>>,
deactivate: bool,
extent_info: Option<ExtentInfo>,
) -> bool {
Expand All @@ -1189,26 +1194,9 @@ impl DownstairsClient {
let mut jobs_completed_ok = job.state_count().completed_ok();
let mut ackable = false;

// Validate integrity hashes and optionally authenticated decryption.
//
// With AE, responses can come back that are invalid given an encryption
// context. Test this here. If it fails, then something has gone
// irrecoverably wrong and we should panic.
let mut read_response_hashes = Vec::new();
let new_state = match &mut responses {
Ok(responses) => {
responses.iter_mut().for_each(|x| {
let mh =
if let Some(context) = &self.cfg.encryption_context {
validate_encrypted_read_response(
x, context, &self.log,
)
} else {
validate_unencrypted_read_response(x, &self.log)
}
.expect("decryption failed");
read_response_hashes.push(mh);
});
let new_state = match &responses {
Ok(..) => {
// Messages have already been decrypted out-of-band
jobs_completed_ok += 1;
IOState::Done
}
Expand Down Expand Up @@ -2620,8 +2608,12 @@ pub(crate) fn validate_encrypted_read_response(
// expect to see this case unless this is a blank block.
//
// XXX if it's not a blank block, we may be under attack?
assert!(response.data[..].iter().all(|&x| x == 0));
return Ok(None);
if response.data[..].iter().all(|&x| x == 0) {
return Ok(None);
} else {
error!(log, "got empty block context with non-blank block");
return Err(CrucibleError::MissingBlockContext);
}
}

let mut valid_hash = None;
Expand Down Expand Up @@ -2786,9 +2778,12 @@ pub(crate) fn validate_unencrypted_read_response(
// this case unless this is a blank block.
//
// XXX if it's not a blank block, we may be under attack?
assert!(response.data[..].iter().all(|&x| x == 0));

Ok(None)
if response.data[..].iter().all(|&x| x == 0) {
Ok(None)
} else {
error!(log, "got empty block context with non-blank block");
Err(CrucibleError::MissingBlockContext)
}
}
}

Expand Down
82 changes: 80 additions & 2 deletions upstairs/src/deferred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
use std::sync::Arc;

use crate::{
upstairs::UpstairsConfig, BlockContext, BlockReq, BlockRes, ImpactedBlocks,
upstairs::UpstairsConfig, BlockContext, BlockReq, BlockRes, ClientId,
ImpactedBlocks, Message,
};
use bytes::Bytes;
use crucible_common::{integrity_hash, CrucibleError, RegionDefinition};
use futures::{
future::{Either, Ready},
future::{ready, Either, Ready},
stream::FuturesOrdered,
StreamExt,
};
use slog::{error, Logger};
use tokio::sync::oneshot;

/// Future stored in a [`DeferredQueue`]
Expand Down Expand Up @@ -70,6 +72,18 @@ impl<T> DeferredQueue<T> {
t.map(|t| t.expect("oneshot failed"))
}

/// Stores a new future in the queue, marking it as non-empty
pub fn push_immediate(&mut self, t: T) {
self.push_back(Either::Left(ready(Ok(t))));
}

/// Stores a new pending oneshot in the queue, returning the sender
pub fn push_oneshot(&mut self) -> oneshot::Sender<T> {
let (rx, tx) = oneshot::channel();
self.push_back(Either::Right(tx));
rx
}

/// Check whether the queue is known to be empty
///
/// It is possible for this to return `false` if the queue is actually
Expand Down Expand Up @@ -184,3 +198,67 @@ impl DeferredWrite {
})
}
}

////////////////////////////////////////////////////////////////////////////////

#[derive(Debug)]
pub(crate) struct DeferredMessage {
pub message: Message,

/// If this was a `ReadResponse`, then the hashes are stored here
pub hashes: Vec<Option<u64>>,

pub client_id: ClientId,
}

/// Standalone data structure which can perform decryption
pub(crate) struct DeferredRead {
/// Message, which must be a `ReadResponse`
pub message: Message,

pub client_id: ClientId,
pub cfg: Arc<UpstairsConfig>,
pub log: Logger,
}

impl DeferredRead {
/// Consume the `DeferredRead` and perform decryption
///
/// If decryption fails, then the resulting `Message` has an error in the
/// `responses` field, and `hashes` is empty.
pub fn run(mut self) -> DeferredMessage {
use crate::client::{
validate_encrypted_read_response,
validate_unencrypted_read_response,
};
let Message::ReadResponse { responses, .. } = &mut self.message else {
panic!("invalid DeferredRead");
};
let mut hashes = vec![];

if let Ok(rs) = responses {
for r in rs.iter_mut() {
let v = if let Some(ctx) = &self.cfg.encryption_context {
validate_encrypted_read_response(r, ctx, &self.log)
} else {
validate_unencrypted_read_response(r, &self.log)
};
match v {
Ok(hash) => hashes.push(hash),
Err(e) => {
error!(self.log, "decryption failure: {e:?}");
*responses = Err(e);
hashes.clear();
break;
}
}
}
}

DeferredMessage {
client_id: self.client_id,
message: self.message,
hashes,
}
}
}
19 changes: 19 additions & 0 deletions upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2901,6 +2901,7 @@ impl Downstairs {
&mut self,
client_id: ClientId,
m: Message,
read_response_hashes: Vec<Option<u64>>,
up_state: &UpstairsState,
) -> Result<(), CrucibleError> {
let (upstairs_id, session_id, ds_id, read_data, extent_info) = match &m
Expand Down Expand Up @@ -3133,6 +3134,7 @@ impl Downstairs {
ds_id,
client_id,
read_data,
read_response_hashes,
up_state,
extent_info,
);
Expand Down Expand Up @@ -3181,10 +3183,18 @@ impl Downstairs {
extent_info: Option<ExtentInfo>,
) -> bool {
let was_ackable = self.ackable_work.contains(&ds_id);

// Make up dummy values for hashes, since they're not actually checked
// here (besides confirming that we have the correct number).
let hashes = match &responses {
Ok(r) => vec![Some(0); r.len()],
Err(..) => vec![],
};
self.process_io_completion_inner(
ds_id,
client_id,
responses,
hashes,
up_state,
extent_info,
);
Expand All @@ -3198,6 +3208,7 @@ impl Downstairs {
ds_id: JobId,
client_id: ClientId,
responses: Result<Vec<ReadResponse>, CrucibleError>,
read_response_hashes: Vec<Option<u64>>,
up_state: &UpstairsState,
extent_info: Option<ExtentInfo>,
) {
Expand Down Expand Up @@ -3226,9 +3237,17 @@ impl Downstairs {
return;
};

// Sanity-checking for a programmer error during offloaded decryption.
// If we didn't get one hash per read block, then `responses` must
// have been converted into `Err(..)`.
if let Ok(reads) = &responses {
assert_eq!(reads.len(), read_response_hashes.len());
}

if self.clients[client_id].process_io_completion(
job,
responses,
read_response_hashes,
deactivate,
extent_info,
) {
Expand Down
Loading