updating the segment attribute to store record_size
Signed-off-by: Ankur_Anand <Ankur.Anand@dell.com>
AnkurAnand11 committed Nov 23, 2023
1 parent bf9f2a2 commit e48c56d
Showing 3 changed files with 105 additions and 21 deletions.
12 changes: 6 additions & 6 deletions src/index/mod.rs
@@ -112,21 +112,21 @@ impl IndexRecord {
}
}

fn write_fields(&self, record_size: u64) -> Result<Vec<u8>, BincodeError> {
fn write_fields(&self, record_size: usize) -> Result<Vec<u8>, BincodeError> {
let mut res = vec![];
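// Wire layout: event type code, payload length (record_size minus the 8 header bytes),
// the serialized record, then zero padding out to exactly record_size bytes.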
res.extend_from_slice(&EventCommand::TYPE_CODE.to_be_bytes());
res.extend_from_slice(&((record_size - 8) as i32).to_be_bytes());
let encoded = CONFIG.serialize(&self)?;
let length = encoded.len();
res.extend(encoded);
if res.len() > record_size as usize {
if res.len() > record_size {
return Err(BincodeError::from(ErrorKind::Custom(format!(
"Record size {} exceeds the max size allowed {}",
res.len(),
record_size,
))));
}
let padding = vec![0u8; record_size as usize - length - 8];
let padding = vec![0u8; record_size - length - 8];
res.extend(padding);
Ok(res)
}
@@ -156,8 +156,8 @@ impl IndexRecord {
pub trait Fields {
fn get_field_values(&self) -> Vec<(&'static str, u64)>;

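/// Total size in bytes of each serialized index record; defaults to RECORD_SIZE.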
fn get_record_size(&self) -> u64 {
(4 * 1024) as u64
fn get_record_size() -> usize {
RECORD_SIZE as usize
}
}

@@ -195,7 +195,7 @@ pub(crate) mod test {
let data = vec![1, 2, 3, 4];
let fields = vec![("hello", 0), ("index", 1), ("stream", 2)];
let record = IndexRecord::new(fields, data.clone());
let encoded = record.write_fields(RECORD_SIZE).expect("serialize record");
let encoded = record.write_fields(RECORD_SIZE as usize).expect("serialize record");
assert_eq!(encoded.len(), RECORD_SIZE as usize);
let decoded = IndexRecord::read_from(&encoded).expect("deserialize record");
assert_eq!(decoded.data, data);
68 changes: 56 additions & 12 deletions src/index/reader.rs
@@ -19,6 +19,11 @@ use async_stream::try_stream;
use futures::stream::Stream;
use snafu::{ensure, Snafu};
use std::io::SeekFrom;
use pravega_wire_protocol::commands::GetSegmentAttributeCommand;
use pravega_wire_protocol::wire_commands::{Replies, Requests};
use crate::segment::raw_client::RawClient;
use tracing::info;
use crate::util::get_request_id;

#[derive(Debug, Snafu)]
#[snafu(visibility = "pub")]
@@ -86,6 +91,7 @@ pub struct IndexReader {
factory: ClientFactoryAsync,
meta: SegmentMetadataClient,
segment_reader: AsyncSegmentReaderImpl,
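// Size in bytes of each index record, fetched from the segment attribute when the reader is created.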
record_size: usize,
}

impl IndexReader {
@@ -110,11 +116,48 @@ impl IndexReader {
let meta = factory
.create_segment_metadata_client(scoped_segment.clone())
.await;

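// Fetch the record size that the index writer stored as a segment attribute on this
// segment, falling back to the default RECORD_SIZE if the attribute was never set.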
let controller_client = factory.controller_client();
let endpoint = controller_client
.get_endpoint_for_segment(&scoped_segment)
.await
.expect("get endpoint for segment");
let raw_client = factory.create_raw_client_for_endpoint(endpoint);
let segment_name = scoped_segment.to_string();
let delegation_token = controller_client
.get_or_refresh_delegation_token_for(stream.clone())
.await
.expect("controller error when refreshing token");
let request = Requests::GetSegmentAttribute(GetSegmentAttributeCommand {
request_id: get_request_id(),
segment_name: segment_name.clone(),
attribute_id: 111,
delegation_token,
});
let reply = raw_client
.send_request(&request)
.await
.expect("update segment attribute");

let record_size = match reply {
    Replies::SegmentAttribute(cmd) => {
        if cmd.value == i64::MIN {
            info!("Segment attribute for record_size is not set. Falling back to default RECORD_SIZE = {:?}", RECORD_SIZE);
            RECORD_SIZE as usize
        } else {
            cmd.value as usize
        }
    }
    _ => {
        info!("get segment attribute for record_size failed due to {:?}", reply);
        info!("Falling back to default RECORD_SIZE = {:?}", RECORD_SIZE);
        RECORD_SIZE as usize
    }
};
IndexReader {
stream,
factory,
meta,
segment_reader,
record_size,
}
}

@@ -127,7 +170,7 @@ impl IndexReader {
/// stream, the first record data will be returned.
/// If the value of the searched field is larger than that of the latest record, a FieldNotFound error will be returned.
pub async fn search_offset(&self, field: (&'static str, u64)) -> Result<u64, IndexReaderError> {
const RECORD_SIZE_SIGNED: i64 = RECORD_SIZE as i64;
let record_size_signed: i64 = self.record_size as i64;

let target_key = IndexRecord::hash_key_to_u128(field.0);
let target_value = field.1;
@@ -139,13 +182,13 @@
msg: format!("error when fetching tail offset: {:?}", e),
})? as i64;
let mut start = 0;
let num_of_record = (tail - head) / RECORD_SIZE_SIGNED;
let num_of_record = (tail - head) / record_size_signed;
let mut end = num_of_record - 1;

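// Binary search over the fixed-size records between head and tail; the record at
// index mid starts at offset head + mid * record_size_signed.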
while start <= end {
let mid = start + (end - start) / 2;
let record = self
.read_record_from_random_offset((head + mid * RECORD_SIZE_SIGNED) as u64)
.read_record_from_random_offset((head + mid * record_size_signed) as u64)
.await?;

if let Some(e) = record.fields.iter().find(|&e| e.0 == target_key) {
@@ -169,7 +212,7 @@
msg: format!("key/value: {}/{}", field.0, field.1),
})
} else {
Ok((head + start * RECORD_SIZE_SIGNED) as u64)
Ok((head + start * record_size_signed) as u64)
}
}

@@ -187,32 +230,33 @@
end_offset: u64,
) -> Result<impl Stream<Item = Result<Vec<u8>, IndexReaderError>> + 'stream, IndexReaderError> {
ensure!(
start_offset % (RECORD_SIZE) == 0,
start_offset % (self.record_size as u64) == 0,
InvalidOffset {
msg: format!(
"Start offset {} is invalid as it cannot be divided by the record size {}",
start_offset, RECORD_SIZE
start_offset, self.record_size
)
}
);
if end_offset != u64::MAX {
ensure!(
end_offset % (RECORD_SIZE) == 0,
end_offset % (self.record_size as u64) == 0,
InvalidOffset {
msg: format!(
"End offset {} is invalid as it cannot be divided by the record size {}",
end_offset, RECORD_SIZE
end_offset, self.record_size
)
}
);
}
Ok(try_stream! {
let stream = self.stream.clone();
let record_size = self.record_size;
let mut byte_reader = self.factory.create_byte_reader(stream).await;
let mut num_of_records_to_read = if end_offset == u64::MAX {
u64::MAX
} else {
(end_offset - start_offset) / (RECORD_SIZE as u64)
(end_offset - start_offset) / (record_size as u64)
};
byte_reader.seek(SeekFrom::Start(start_offset))
.await
@@ -221,7 +265,7 @@
})?;
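// Each record occupies exactly record_size bytes, so keep reading until a full
// record has been buffered.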
loop {
let mut buf = vec![];
let mut size_to_read = RECORD_SIZE as usize;
let mut size_to_read = record_size as usize;
while size_to_read != 0 {
let mut tmp_buf = vec![0; size_to_read];
let size = byte_reader
@@ -257,7 +301,7 @@ impl IndexReader {
/// Data in the last record.
pub async fn last_record_data(&self) -> Result<Vec<u8>, IndexReaderError> {
let last_offset = self.tail_offset().await?;
let last_record_offset = last_offset - RECORD_SIZE;
let last_record_offset = last_offset - self.record_size as u64;
let last_record = self.read_record_from_random_offset(last_record_offset).await?;
Ok(last_record.data)
}
@@ -291,7 +335,7 @@ impl IndexReader {
) -> Result<IndexRecord, IndexReaderError> {
let segment_read_cmd = self
.segment_reader
.read(offset as i64, RECORD_SIZE as i32)
.read(offset as i64, self.record_size as i32)
.await
.map_err(|e| IndexReaderError::Internal {
msg: format!("segment reader error: {:?}", e),
46 changes: 43 additions & 3 deletions src/index/writer.rs
@@ -10,14 +10,18 @@

use crate::byte::ByteWriter;
use crate::client_factory::ClientFactoryAsync;
use crate::index::{Fields, IndexRecord, RECORD_SIZE};
use crate::index::{Fields, IndexRecord};

use pravega_client_shared::ScopedStream;

use bincode2::Error as BincodeError;
use snafu::{ensure, Backtrace, ResultExt, Snafu};
use std::fmt::Debug;
use std::marker::PhantomData;
use pravega_wire_protocol::commands::UpdateSegmentAttributeCommand;
use pravega_wire_protocol::wire_commands::Requests;
use crate::segment::raw_client::RawClient;
use crate::util::get_request_id;

const MAX_FIELDS_SIZE: usize = 100;

@@ -95,14 +99,50 @@ impl<T: Fields + PartialOrd + PartialEq + Debug> IndexWriter<T> {
let mut byte_writer = factory.create_byte_writer(stream.clone()).await;
byte_writer.seek_to_tail().await;

let segments = factory
.controller_client()
.get_current_segments(&stream)
.await
.expect("get current segments");
assert_eq!(1, segments.key_segment_map.len(), "Index stream is configured with more than one segment");
let segment_with_range = segments.key_segment_map.iter().next().unwrap().1.clone();
let segment_name = segment_with_range.scoped_segment;

let controller_client = factory.controller_client();

let endpoint = controller_client
.get_endpoint_for_segment(&segment_name)
.await
.expect("get endpoint for segment");

let raw_client = factory.create_raw_client_for_endpoint(endpoint);
let delegation_token = controller_client
.get_or_refresh_delegation_token_for(stream.clone())
.await
.expect("controller error when refreshing token");
let segment_name = segment_name.to_string();
let uid = 111;
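// Store the record size under attribute id 111 (uid). With expected_value set to the
// i64::MIN "unset" sentinel, this is a conditional update: it only succeeds the first
// time, so a previously stored record size should never be overwritten.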
let request = Requests::UpdateSegmentAttribute(UpdateSegmentAttributeCommand {
request_id: get_request_id(),
segment_name: segment_name.clone(),
attribute_id: uid,
new_value: T::get_record_size() as i64,
expected_value: i64::MIN,
delegation_token,
});
raw_client
.send_request(&request)
.await
.expect("update segment attribute");

let index_reader = factory.create_index_reader(stream.clone()).await;
let tail_offset = index_reader.tail_offset().await.expect("get tail offset");
let head_offset = index_reader
.head_offset()
.await
.expect("get readable head offset");
let hashed_fields = if head_offset != tail_offset {
let prev_record_offset = tail_offset - RECORD_SIZE;
let prev_record_offset = tail_offset - T::get_record_size() as u64;
let record = index_reader
.read_record_from_random_offset(prev_record_offset)
.await
@@ -164,7 +204,7 @@ impl<T: Fields + PartialOrd + PartialEq + Debug> IndexWriter<T> {
}

async fn append_internal(&mut self, data: Vec<u8>) -> Result<(), IndexWriterError> {
let record_size = self.fields.as_ref().unwrap().get_record_size();
let record_size = T::get_record_size();
let fields_list = self.fields.as_ref().unwrap().get_field_values();
let record = IndexRecord::new(fields_list, data);
let encoded = record.write_fields(record_size).context(InvalidData {})?;
