Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(de-)serializes shred headers through a Cursor #24876

Merged
merged 2 commits into from
May 2, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 76 additions & 101 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ pub use crate::{
};
use {
crate::blockstore::MAX_DATA_SHREDS_PER_SLOT,
bincode::config::Options,
bitflags::bitflags,
num_enum::{IntoPrimitive, TryFromPrimitive},
serde::{Deserialize, Serialize},
Expand All @@ -68,7 +67,7 @@ use {
pubkey::Pubkey,
signature::{Keypair, Signature, Signer},
},
std::{fmt::Debug, mem::size_of, ops::RangeInclusive},
std::{fmt::Debug, io::Cursor, mem::size_of, ops::RangeInclusive},
thiserror::Error,
};

Expand Down Expand Up @@ -229,34 +228,6 @@ impl ErasureSetId {
}

impl Shred {
fn deserialize_obj<'de, T>(index: &mut usize, size: usize, buf: &'de [u8]) -> bincode::Result<T>
where
T: Deserialize<'de>,
{
let end = std::cmp::min(*index + size, buf.len());
let ret = bincode::options()
.with_limit(PACKET_DATA_SIZE as u64)
.with_fixint_encoding()
.allow_trailing_bytes()
.deserialize(&buf[*index..end])?;
*index += size;
Ok(ret)
}

fn serialize_obj_into<'de, T>(
index: &mut usize,
size: usize,
buf: &'de mut [u8],
obj: &T,
) -> bincode::Result<()>
where
T: Serialize,
{
bincode::serialize_into(&mut buf[*index..*index + size], obj)?;
*index += size;
Ok(())
}

pub fn copy_to_packet(&self, packet: &mut Packet) {
let len = self.payload.len();
packet.data[..len].copy_from_slice(&self.payload[..]);
Expand Down Expand Up @@ -306,24 +277,13 @@ impl Shred {
data_header.flags |= ShredFlags::LAST_SHRED_IN_SLOT
}

let mut start = 0;
Self::serialize_obj_into(
&mut start,
SIZE_OF_COMMON_SHRED_HEADER,
&mut payload,
&common_header,
)
.expect("Failed to write common header into shred buffer");

Self::serialize_obj_into(
&mut start,
SIZE_OF_DATA_SHRED_HEADER,
&mut payload,
&data_header,
)
.expect("Failed to write data header into shred buffer");
let mut cursor = Cursor::new(&mut payload[..]);
bincode::serialize_into(&mut cursor, &common_header).unwrap();
bincode::serialize_into(&mut cursor, &data_header).unwrap();
// TODO: Need to check if data is too large!
payload[start..start + data.len()].copy_from_slice(data);
let offset = cursor.position() as usize;
debug_assert_eq!(offset, SHRED_DATA_OFFSET);
steviez marked this conversation as resolved.
Show resolved Hide resolved
payload[offset..offset + data.len()].copy_from_slice(data);
Self {
common_header,
data_header,
Expand All @@ -333,23 +293,19 @@ impl Shred {
}

pub fn new_from_serialized_shred(mut payload: Vec<u8>) -> Result<Self, Error> {
let mut start = 0;
let common_header: ShredCommonHeader =
Self::deserialize_obj(&mut start, SIZE_OF_COMMON_SHRED_HEADER, &payload)?;

// Shreds should be padded out to SHRED_PAYLOAD_SIZE
// so that erasure generation/recovery works correctly
// But only the data_header.size is stored in blockstore.
payload.resize(SHRED_PAYLOAD_SIZE, 0);
let mut cursor = Cursor::new(&payload[..]);
let common_header: ShredCommonHeader = bincode::deserialize_from(&mut cursor)?;
let (data_header, coding_header) = match common_header.shred_type {
ShredType::Code => {
let coding_header: CodingShredHeader =
Self::deserialize_obj(&mut start, SIZE_OF_CODING_SHRED_HEADER, &payload)?;
let coding_header = bincode::deserialize_from(&mut cursor)?;
// see: https://github.com/solana-labs/solana/pull/10109
payload.truncate(SHRED_PAYLOAD_SIZE);
(DataShredHeader::default(), coding_header)
}
ShredType::Data => {
let data_header: DataShredHeader =
Self::deserialize_obj(&mut start, SIZE_OF_DATA_SHRED_HEADER, &payload)?;
let data_header = bincode::deserialize_from(&mut cursor)?;
// see: https://github.com/solana-labs/solana/pull/16602
payload.resize(SHRED_PAYLOAD_SIZE, 0u8);
(data_header, CodingShredHeader::default())
}
};
Expand Down Expand Up @@ -386,24 +342,14 @@ impl Shred {
position,
};
let mut payload = vec![0; SHRED_PAYLOAD_SIZE];
let mut start = 0;
Self::serialize_obj_into(
&mut start,
SIZE_OF_COMMON_SHRED_HEADER,
&mut payload,
&common_header,
)
.expect("Failed to write header into shred buffer");
Self::serialize_obj_into(
&mut start,
SIZE_OF_CODING_SHRED_HEADER,
&mut payload,
&coding_header,
)
.expect("Failed to write coding header into shred buffer");
let mut cursor = Cursor::new(&mut payload[..]);
bincode::serialize_into(&mut cursor, &common_header).unwrap();
bincode::serialize_into(&mut cursor, &coding_header).unwrap();
// Tests may have an empty parity_shard.
if !parity_shard.is_empty() {
payload[SIZE_OF_CODING_SHRED_HEADERS..].copy_from_slice(parity_shard);
let offset = cursor.position() as usize;
debug_assert_eq!(offset, SIZE_OF_CODING_SHRED_HEADERS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above - think this could be in a unit test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

payload[offset..].copy_from_slice(parity_shard);
}
Shred {
common_header,
Expand Down Expand Up @@ -491,9 +437,14 @@ impl Shred {
},
};
match shred_type {
ShredType::Code => Ok(shred),
ShredType::Code => {
if shred.len() != SHRED_PAYLOAD_SIZE {
return Err(Error::InvalidPayloadSize(shred.len()));
}
Ok(shred)
}
ShredType::Data => {
if shred.len() > SHRED_PAYLOAD_SIZE {
if !(SHRED_DATA_OFFSET..SHRED_PAYLOAD_SIZE).contains(&shred.len()) {
return Err(Error::InvalidPayloadSize(shred.len()));
}
shred.resize(SHRED_PAYLOAD_SIZE, 0u8);
Expand Down Expand Up @@ -627,24 +578,12 @@ impl Shred {

pub fn set_index(&mut self, index: u32) {
self.common_header.index = index;
Self::serialize_obj_into(
&mut 0,
SIZE_OF_COMMON_SHRED_HEADER,
&mut self.payload,
&self.common_header,
)
.unwrap();
bincode::serialize_into(&mut self.payload[..], &self.common_header).unwrap();
}

pub fn set_slot(&mut self, slot: Slot) {
self.common_header.slot = slot;
Self::serialize_obj_into(
&mut 0,
SIZE_OF_COMMON_SHRED_HEADER,
&mut self.payload,
&self.common_header,
)
.unwrap();
bincode::serialize_into(&mut self.payload[..], &self.common_header).unwrap();
}

pub fn signature(&self) -> Signature {
Expand Down Expand Up @@ -695,6 +634,8 @@ impl Shred {
if self.is_data() {
self.data_header.flags |= ShredFlags::LAST_SHRED_IN_SLOT
}
let buffer = &mut self.payload[SIZE_OF_COMMON_SHRED_HEADER..];
bincode::serialize_into(buffer, &self.data_header).unwrap();
}

#[cfg(test)]
Expand All @@ -704,17 +645,9 @@ impl Shred {
.flags
.remove(ShredFlags::DATA_COMPLETE_SHRED);
}

// Data header starts after the shred common header
let mut start = SIZE_OF_COMMON_SHRED_HEADER;
let size_of_data_shred_header = SIZE_OF_DATA_SHRED_HEADER;
Self::serialize_obj_into(
&mut start,
size_of_data_shred_header,
&mut self.payload,
&self.data_header,
)
.expect("Failed to write data header into shred buffer");
let buffer = &mut self.payload[SIZE_OF_COMMON_SHRED_HEADER..];
bincode::serialize_into(buffer, &self.data_header).unwrap();
}

pub fn data_complete(&self) -> bool {
Expand Down Expand Up @@ -1276,6 +1209,48 @@ mod tests {
);
}

#[test]
fn test_serde_compat_shred_data_empty() {
const SEED: &str = "E3M5hm8yAEB7iPhQxFypAkLqxNeZCTuGBDMa8Jdrghoo";
const PAYLOAD: &str = "nRNFVBEsV9FEM5KfmsCXJsgELRSkCV55drTavdy5aZPnsp\
B8WvsgY99ZuNHDnwkrqe6Lx7ARVmercwugR5HwDcLA9ivKMypk9PNucDPLs67TXWy6k9R\
ozKmy";
let mut rng = {
let seed = <[u8; 32]>::try_from(bs58_decode(SEED)).unwrap();
ChaChaRng::from_seed(seed)
};
let keypair = Keypair::generate(&mut rng);
let mut shred = Shred::new_from_data(
142076266, // slot
21443, // index
51279, // parent_offset
&[], // data
true, // is_last_data
false, // is_last_in_slot
49, // reference_tick
59445, // version
21414, // fec_set_index
);
shred.sign(&keypair);
assert!(shred.verify(&keypair.pubkey()));
assert_matches!(shred.sanitize(), Ok(()));
let payload = bs58_decode(PAYLOAD);
let mut packet = Packet::default();
packet.data[..payload.len()].copy_from_slice(&payload);
packet.meta.size = payload.len();
assert_eq!(shred.bytes_to_store(), payload);
assert_eq!(shred, Shred::new_from_serialized_shred(payload).unwrap());
assert_eq!(
shred.reference_tick(),
Shred::reference_tick_from_data(&packet.data)
);
assert_eq!(Shred::get_slot_from_packet(&packet), Some(shred.slot()));
assert_eq!(
get_shred_slot_index_type(&packet, &mut ShredFetchStats::default()),
Some((shred.slot(), shred.index(), shred.shred_type()))
);
}

#[test]
fn test_serde_compat_shred_code() {
const SEED: &str = "4jfjh3UZVyaEgvyG9oQmNyFY9yHDmbeH9eUhnBKkrcrN";
Expand Down