(de-)serializes shred headers through a Cursor
Current serialize/de-serialize code for shreds manually tracks offsets
into the payload; this can instead be done with std::io::Cursor.
https://github.com/solana-labs/solana/blob/e812430e2/ledger/src/shred.rs#L232-L258
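
For illustration, a minimal standalone sketch of the pattern (hypothetical CommonHeader and DataHeader types standing in for the real shred headers; bincode 1.x default options): std::io::Cursor implements Write over &mut [u8] and Read over &[u8], so bincode::serialize_into and bincode::deserialize_from advance it past each value, and cursor.position() recovers the offset that the removed helpers tracked by hand.

use serde::{Deserialize, Serialize};
use std::io::Cursor;

// Hypothetical stand-ins for the shred headers; not the real layouts.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct CommonHeader {
    slot: u64,
    index: u32,
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct DataHeader {
    size: u16,
    flags: u8,
}

fn main() -> bincode::Result<()> {
    let common = CommonHeader { slot: 42, index: 7 };
    let data = DataHeader { size: 100, flags: 0b0100_0000 };

    // Write both headers back-to-back; the cursor advances past each
    // one, so no manual offset bookkeeping is needed.
    let mut payload = vec![0u8; 64];
    let mut cursor = Cursor::new(&mut payload[..]);
    bincode::serialize_into(&mut cursor, &common)?;
    bincode::serialize_into(&mut cursor, &data)?;
    let offset = cursor.position() as usize; // end of the header bytes

    // Read them back the same way, letting the cursor track position.
    let mut cursor = Cursor::new(&payload[..]);
    let common2: CommonHeader = bincode::deserialize_from(&mut cursor)?;
    let data2: DataHeader = bincode::deserialize_from(&mut cursor)?;
    assert_eq!(cursor.position() as usize, offset);
    assert_eq!((common2, data2), (common, data));
    Ok(())
}

One trade-off worth noting: serialize_into against a fixed-size buffer fails with an error if the value does not fit, whereas the removed serialize_obj_into sliced the buffer to an explicit size up front; the debug_assert_eq! calls in the diff below keep the old size constants around as sanity checks on the cursor position.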
behzadnouri committed May 1, 2022
1 parent a4c7ada commit fff99c7
Showing 1 changed file with 34 additions and 101 deletions.
ledger/src/shred.rs: 135 changes (34 additions & 101 deletions)
@@ -55,7 +55,6 @@ pub use crate::{
 };
 use {
     crate::blockstore::MAX_DATA_SHREDS_PER_SLOT,
-    bincode::config::Options,
     bitflags::bitflags,
     num_enum::{IntoPrimitive, TryFromPrimitive},
     serde::{Deserialize, Serialize},
@@ -68,7 +67,7 @@ use {
         pubkey::Pubkey,
         signature::{Keypair, Signature, Signer},
     },
-    std::{fmt::Debug, mem::size_of, ops::RangeInclusive},
+    std::{fmt::Debug, io::Cursor, mem::size_of, ops::RangeInclusive},
     thiserror::Error,
 };

@@ -229,34 +228,6 @@ impl ErasureSetId {
 }

 impl Shred {
-    fn deserialize_obj<'de, T>(index: &mut usize, size: usize, buf: &'de [u8]) -> bincode::Result<T>
-    where
-        T: Deserialize<'de>,
-    {
-        let end = std::cmp::min(*index + size, buf.len());
-        let ret = bincode::options()
-            .with_limit(PACKET_DATA_SIZE as u64)
-            .with_fixint_encoding()
-            .allow_trailing_bytes()
-            .deserialize(&buf[*index..end])?;
-        *index += size;
-        Ok(ret)
-    }
-
-    fn serialize_obj_into<'de, T>(
-        index: &mut usize,
-        size: usize,
-        buf: &'de mut [u8],
-        obj: &T,
-    ) -> bincode::Result<()>
-    where
-        T: Serialize,
-    {
-        bincode::serialize_into(&mut buf[*index..*index + size], obj)?;
-        *index += size;
-        Ok(())
-    }
-
     pub fn copy_to_packet(&self, packet: &mut Packet) {
         let len = self.payload.len();
         packet.data[..len].copy_from_slice(&self.payload[..]);
@@ -306,24 +277,13 @@ impl Shred {
             data_header.flags |= ShredFlags::LAST_SHRED_IN_SLOT
         }

-        let mut start = 0;
-        Self::serialize_obj_into(
-            &mut start,
-            SIZE_OF_COMMON_SHRED_HEADER,
-            &mut payload,
-            &common_header,
-        )
-        .expect("Failed to write common header into shred buffer");
-
-        Self::serialize_obj_into(
-            &mut start,
-            SIZE_OF_DATA_SHRED_HEADER,
-            &mut payload,
-            &data_header,
-        )
-        .expect("Failed to write data header into shred buffer");
+        let mut cursor = Cursor::new(&mut payload[..]);
+        bincode::serialize_into(&mut cursor, &common_header).unwrap();
+        bincode::serialize_into(&mut cursor, &data_header).unwrap();
         // TODO: Need to check if data is too large!
-        payload[start..start + data.len()].copy_from_slice(data);
+        let offset = cursor.position() as usize;
+        debug_assert_eq!(offset, SHRED_DATA_OFFSET);
+        payload[offset..offset + data.len()].copy_from_slice(data);
         Self {
             common_header,
             data_header,
@@ -333,23 +293,19 @@ impl Shred {
     }

     pub fn new_from_serialized_shred(mut payload: Vec<u8>) -> Result<Self, Error> {
-        let mut start = 0;
-        let common_header: ShredCommonHeader =
-            Self::deserialize_obj(&mut start, SIZE_OF_COMMON_SHRED_HEADER, &payload)?;
-
-        // Shreds should be padded out to SHRED_PAYLOAD_SIZE
-        // so that erasure generation/recovery works correctly
-        // But only the data_header.size is stored in blockstore.
-        payload.resize(SHRED_PAYLOAD_SIZE, 0);
+        let mut cursor = Cursor::new(&payload[..]);
+        let common_header: ShredCommonHeader = bincode::deserialize_from(&mut cursor)?;
         let (data_header, coding_header) = match common_header.shred_type {
             ShredType::Code => {
-                let coding_header: CodingShredHeader =
-                    Self::deserialize_obj(&mut start, SIZE_OF_CODING_SHRED_HEADER, &payload)?;
+                let coding_header = bincode::deserialize_from(&mut cursor)?;
+                // see: https://github.com/solana-labs/solana/pull/10109
+                payload.truncate(SHRED_PAYLOAD_SIZE);
                 (DataShredHeader::default(), coding_header)
             }
             ShredType::Data => {
-                let data_header: DataShredHeader =
-                    Self::deserialize_obj(&mut start, SIZE_OF_DATA_SHRED_HEADER, &payload)?;
+                let data_header = bincode::deserialize_from(&mut cursor)?;
+                // see: https://github.com/solana-labs/solana/pull/16602
+                payload.resize(SHRED_PAYLOAD_SIZE, 0u8);
                 (data_header, CodingShredHeader::default())
             }
         };
@@ -386,24 +342,14 @@ impl Shred {
             position,
         };
         let mut payload = vec![0; SHRED_PAYLOAD_SIZE];
-        let mut start = 0;
-        Self::serialize_obj_into(
-            &mut start,
-            SIZE_OF_COMMON_SHRED_HEADER,
-            &mut payload,
-            &common_header,
-        )
-        .expect("Failed to write header into shred buffer");
-        Self::serialize_obj_into(
-            &mut start,
-            SIZE_OF_CODING_SHRED_HEADER,
-            &mut payload,
-            &coding_header,
-        )
-        .expect("Failed to write coding header into shred buffer");
+        let mut cursor = Cursor::new(&mut payload[..]);
+        bincode::serialize_into(&mut cursor, &common_header).unwrap();
+        bincode::serialize_into(&mut cursor, &coding_header).unwrap();
         // Tests may have an empty parity_shard.
         if !parity_shard.is_empty() {
-            payload[SIZE_OF_CODING_SHRED_HEADERS..].copy_from_slice(parity_shard);
+            let offset = cursor.position() as usize;
+            debug_assert_eq!(offset, SIZE_OF_CODING_SHRED_HEADERS);
+            payload[offset..].copy_from_slice(parity_shard);
         }
         Shred {
             common_header,
@@ -491,9 +437,14 @@ impl Shred {
             },
         };
         match shred_type {
-            ShredType::Code => Ok(shred),
+            ShredType::Code => {
+                if shred.len() != SHRED_PAYLOAD_SIZE {
+                    return Err(Error::InvalidPayloadSize(shred.len()));
+                }
+                Ok(shred)
+            }
             ShredType::Data => {
-                if shred.len() > SHRED_PAYLOAD_SIZE {
+                if !(SHRED_DATA_OFFSET..SHRED_PAYLOAD_SIZE).contains(&shred.len()) {
                     return Err(Error::InvalidPayloadSize(shred.len()));
                 }
                 shred.resize(SHRED_PAYLOAD_SIZE, 0u8);
@@ -627,24 +578,12 @@ impl Shred {

     pub fn set_index(&mut self, index: u32) {
         self.common_header.index = index;
-        Self::serialize_obj_into(
-            &mut 0,
-            SIZE_OF_COMMON_SHRED_HEADER,
-            &mut self.payload,
-            &self.common_header,
-        )
-        .unwrap();
+        bincode::serialize_into(&mut self.payload[..], &self.common_header).unwrap();
     }

     pub fn set_slot(&mut self, slot: Slot) {
         self.common_header.slot = slot;
-        Self::serialize_obj_into(
-            &mut 0,
-            SIZE_OF_COMMON_SHRED_HEADER,
-            &mut self.payload,
-            &self.common_header,
-        )
-        .unwrap();
+        bincode::serialize_into(&mut self.payload[..], &self.common_header).unwrap();
     }

     pub fn signature(&self) -> Signature {
@@ -695,6 +634,8 @@ impl Shred {
         if self.is_data() {
             self.data_header.flags |= ShredFlags::LAST_SHRED_IN_SLOT
         }
+        let buffer = &mut self.payload[SIZE_OF_COMMON_SHRED_HEADER..];
+        bincode::serialize_into(buffer, &self.data_header).unwrap();
     }

     #[cfg(test)]
@@ -704,17 +645,9 @@ impl Shred {
                 .flags
                 .remove(ShredFlags::DATA_COMPLETE_SHRED);
         }
-
-        // Data header starts after the shred common header
-        let mut start = SIZE_OF_COMMON_SHRED_HEADER;
-        let size_of_data_shred_header = SIZE_OF_DATA_SHRED_HEADER;
-        Self::serialize_obj_into(
-            &mut start,
-            size_of_data_shred_header,
-            &mut self.payload,
-            &self.data_header,
-        )
-        .expect("Failed to write data header into shred buffer");
+        let buffer = &mut self.payload[SIZE_OF_COMMON_SHRED_HEADER..];
+        bincode::serialize_into(buffer, &self.data_header).unwrap();
     }

     pub fn data_complete(&self) -> bool {