Skip to content

Commit

Permalink
fix: check SAF message stored_at is not in the future
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Oct 11, 2021
1 parent 92dee77 commit b516fe0
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 54 deletions.
3 changes: 2 additions & 1 deletion comms/dht/src/store_forward/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ pub enum StoreAndForwardError {
MessageOriginRequired,
#[error("The message was malformed")]
MalformedMessage,

#[error("StorageError: {0}")]
StorageError(#[from] StorageError),
#[error("The store and forward service requester channel closed")]
Expand All @@ -81,4 +80,6 @@ pub enum StoreAndForwardError {
InvalidDhtMessageType,
#[error("Failed to send request for store and forward messages: {0}")]
RequestMessagesFailed(DhtOutboundError),
#[error("Invalid SAF request: `stored_at` cannot be in the future")]
StoredAtWasInFuture,
}
189 changes: 136 additions & 53 deletions comms/dht/src/store_forward/saf_handler/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ where S: Service<DecryptedDhtMessage, Response = (), Error = PipelineError>
let response = msg
.decode_part::<StoredMessagesResponse>(0)?
.ok_or(StoreAndForwardError::InvalidEnvelopeBody)?;
let source_peer = Arc::new(message.source_peer);
let source_peer = message.source_peer.clone();

debug!(
target: LOG_TARGET,
Expand All @@ -290,13 +290,9 @@ where S: Service<DecryptedDhtMessage, Response = (), Error = PipelineError>
message_tag
);

let mut results = Vec::with_capacity(response.messages.len());
for msg in response.messages {
let result = self
.process_incoming_stored_message(Arc::clone(&source_peer), msg)
.await;
results.push(result);
}
let results = self
.process_incoming_stored_messages(source_peer.clone(), response.messages)
.await?;

let successful_msgs_iter = results
.into_iter()
Expand Down Expand Up @@ -376,26 +372,65 @@ where S: Service<DecryptedDhtMessage, Response = (), Error = PipelineError>
Ok(())
}

async fn process_incoming_stored_message(
async fn process_incoming_stored_messages(
&mut self,
source_peer: Arc<Peer>,
messages: Vec<ProtoStoredMessage>,
) -> Result<Vec<Result<DecryptedDhtMessage, StoreAndForwardError>>, StoreAndForwardError> {
let mut last_saf_received = self
.dht_requester
.get_metadata::<DateTime<Utc>>(DhtMetadataKey::LastSafMessageReceived)
.await?;

let mut results = Vec::with_capacity(messages.len());
for msg in messages {
let result = self
.validate_and_decrypt_incoming_stored_message(Arc::clone(&source_peer), msg)
.await;

if let Ok((_, stored_at)) = result.as_ref() {
if last_saf_received.as_ref().map(|dt| stored_at > dt).unwrap_or(true) {
last_saf_received = Some(*stored_at);
}
}

results.push(result.map(|(msg, _)| msg));
}

if let Some(last_saf_received) = last_saf_received {
self.dht_requester
.set_metadata(DhtMetadataKey::LastSafMessageReceived, last_saf_received)
.await?;
}

Ok(results)
}

async fn validate_and_decrypt_incoming_stored_message(
&mut self,
source_peer: Arc<Peer>,
message: ProtoStoredMessage,
) -> Result<DecryptedDhtMessage, StoreAndForwardError> {
) -> Result<(DecryptedDhtMessage, DateTime<Utc>), StoreAndForwardError> {
let node_identity = &self.node_identity;
let peer_manager = &self.peer_manager;
let config = &self.config;

if message.dht_header.is_none() {
return Err(StoreAndForwardError::DhtHeaderNotProvided);
}

let stored_at = match message.stored_at {
None => chrono::MIN_DATETIME,
Some(t) => DateTime::from_utc(
NaiveDateTime::from_timestamp(t.seconds, t.nanos.try_into().unwrap_or(0)),
Utc,
),
};
let stored_at = message
.stored_at
.map(|t| {
DateTime::from_utc(
NaiveDateTime::from_timestamp(t.seconds, t.nanos.try_into().unwrap_or(u32::MAX)),
Utc,
)
})
.unwrap_or(chrono::MIN_DATETIME);

if stored_at > Utc::now() {
return Err(StoreAndForwardError::StoredAtWasInFuture);
}

let dht_header: DhtMessageHeader = message
.dht_header
Expand Down Expand Up @@ -441,31 +476,9 @@ where S: Service<DecryptedDhtMessage, Response = (), Error = PipelineError>
DhtInboundMessage::new(MessageTag::new(), dht_header, Arc::clone(&source_peer), message.body);
inbound_msg.is_saf_message = true;

let last_saf_received = self
.dht_requester
.get_metadata::<DateTime<Utc>>(DhtMetadataKey::LastSafMessageReceived)
.await
.ok()
.flatten()
.unwrap_or(chrono::MIN_DATETIME);

if stored_at > last_saf_received {
if let Err(err) = self
.dht_requester
.set_metadata(DhtMetadataKey::LastSafMessageReceived, stored_at)
.await
{
warn!(
target: LOG_TARGET,
"Failed to set last SAF message received timestamp: {:?}", err
);
}
}

Ok(DecryptedDhtMessage::succeeded(
decrypted_body,
authenticated_pk,
inbound_msg,
Ok((
DecryptedDhtMessage::succeeded(decrypted_body, authenticated_pk, inbound_msg),
stored_at,
))
}

Expand Down Expand Up @@ -587,8 +600,7 @@ mod test {
service_spy,
},
};
use chrono::{Duration as OldDuration, Utc};
use prost::Message;
use chrono::Utc;
use std::time::Duration;
use tari_comms::{message::MessageExt, runtime, wrap_in_envelope_body};
use tari_crypto::tari_utilities::hex;
Expand Down Expand Up @@ -688,9 +700,8 @@ mod test {
}
assert_eq!(oms_mock_state.call_count(), 1);

let call = oms_mock_state.pop_call().unwrap();
let body = call.1.to_vec();
let body = EnvelopeBody::decode(body.as_slice()).unwrap();
let (_, body) = oms_mock_state.pop_call().unwrap();
let body = EnvelopeBody::decode(body.as_ref()).unwrap();
let msg = body.decode_part::<StoredMessagesResponse>(0).unwrap().unwrap();
assert_eq!(msg.messages().len(), 0);
assert!(!spy.is_called());
Expand All @@ -702,7 +713,7 @@ mod test {
assert!(fetch_call.contains(format!("{:?}", since).as_str()));

let msg1_time = Utc::now()
.checked_sub_signed(OldDuration::from_std(Duration::from_secs(120)).unwrap())
.checked_sub_signed(chrono::Duration::from_std(Duration::from_secs(120)).unwrap())
.unwrap();
let msg1 = "one".to_string();
mock_state
Expand All @@ -715,7 +726,7 @@ mod test {
.await;

let msg2_time = Utc::now()
.checked_sub_signed(OldDuration::from_std(Duration::from_secs(30)).unwrap())
.checked_sub_signed(chrono::Duration::from_std(Duration::from_secs(30)).unwrap())
.unwrap();
let msg2 = "two".to_string();
mock_state
Expand Down Expand Up @@ -803,11 +814,11 @@ mod test {
.unwrap();

let msg1_time = Utc::now()
.checked_sub_signed(OldDuration::from_std(Duration::from_secs(60)).unwrap())
.checked_sub_signed(chrono::Duration::from_std(Duration::from_secs(60)).unwrap())
.unwrap();
let msg1 = ProtoStoredMessage::new(0, inbound_msg_a.dht_header.clone(), inbound_msg_a.body, msg1_time);
let msg2_time = Utc::now()
.checked_sub_signed(OldDuration::from_std(Duration::from_secs(30)).unwrap())
.checked_sub_signed(chrono::Duration::from_std(Duration::from_secs(30)).unwrap())
.unwrap();
let msg2 = ProtoStoredMessage::new(0, inbound_msg_b.dht_header, inbound_msg_b.body, msg2_time);

Expand All @@ -822,7 +833,7 @@ mod test {
)
.dht_header;
let msg_clear_time = Utc::now()
.checked_sub_signed(OldDuration::from_std(Duration::from_secs(120)).unwrap())
.checked_sub_signed(chrono::Duration::from_std(Duration::from_secs(120)).unwrap())
.unwrap();
let msg_clear = ProtoStoredMessage::new(0, clear_header, clear_msg, msg_clear_time);
let mut message = DecryptedDhtMessage::succeeded(
Expand Down Expand Up @@ -891,4 +902,76 @@ mod test {

assert_eq!(last_saf_received, msg2_time);
}

#[runtime::test]
async fn stored_at_in_future() {
let spy = service_spy();
let (requester, _) = create_store_and_forward_mock();

let peer_manager = build_peer_manager();
let (oms_tx, _) = mpsc::channel(1);

let node_identity = make_node_identity();

let msg_a = wrap_in_envelope_body!(&b"A".to_vec()).to_encoded_bytes();
let inbound_msg_a = make_dht_inbound_message(&node_identity, msg_a, DhtMessageFlags::ENCRYPTED, true, false);
peer_manager
.add_peer(Clone::clone(&*inbound_msg_a.source_peer))
.await
.unwrap();

let msg1 = ProtoStoredMessage::new(
0,
inbound_msg_a.dht_header.clone(),
inbound_msg_a.body,
Utc::now() + chrono::Duration::days(1),
);
let mut message = DecryptedDhtMessage::succeeded(
wrap_in_envelope_body!(StoredMessagesResponse {
messages: vec![msg1.clone()],
request_id: 123,
response_type: 0
}),
None,
make_dht_inbound_message(
&node_identity,
b"Stored message".to_vec(),
DhtMessageFlags::ENCRYPTED,
true,
false,
),
);
message.dht_header.message_type = DhtMessageType::SafStoredMessages;

let (mut dht_requester, mock) = create_dht_actor_mock(1);
task::spawn(mock.run());

let (saf_response_signal_sender, _) = mpsc::channel(1);

let task = MessageHandlerTask::new(
Default::default(),
spy.to_service::<PipelineError>(),
requester,
dht_requester.clone(),
peer_manager,
OutboundMessageRequester::new(oms_tx),
node_identity,
message,
saf_response_signal_sender,
);

task.run().await.unwrap();
let requests = spy.take_requests();
// Message was discarded
assert_eq!(spy.call_count(), 0);
assert_eq!(requests.len(), 0);

let last_saf_received = dht_requester
.get_metadata::<DateTime<Utc>>(DhtMetadataKey::LastSafMessageReceived)
.await
.unwrap();

// LastSafMessageReceived was not set at all
assert!(last_saf_received.is_none());
}
}

0 comments on commit b516fe0

Please sign in to comment.