Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose reader positions #465

Merged
merged 12 commits into from
Feb 23, 2024
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ name = "pravega-client"
# - README.md
# - Update CHANGELOG.md.
# - Create git tag.
version = "0.3.6"
version = "0.3.7"
edition = "2018"
categories = ["network-programming"]
keywords = ["streaming", "client", "pravega"]
readme = "README.md"
documentation = "https://docs.rs/pravega-client/0.3.6/"
documentation = "https://docs.rs/pravega-client/0.3.7/"
homepage = "https://www.pravega.io/"
repository = "https://github.com/pravega/pravega-client-rust"
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion auth/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pravega-client-auth"
version = "0.3.6"
version = "0.3.7"
edition = "2018"
categories = ["network-programming"]
keywords = ["streaming", "client", "pravega"]
Expand Down
2 changes: 1 addition & 1 deletion book/src/index.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Pravega Rust Client and Pravega language bindings user guide.

Welcome to the Pravega Rust client and Pravega language bindings user guide! This book is a companion to
[Pravega Rust Client](https://docs.rs/pravega-client/0.3.6/pravega_client/) and the supported language bindings over the Pravega Rust
[Pravega Rust Client](https://docs.rs/pravega-client/0.3.7/pravega_client/) and the supported language bindings over the Pravega Rust
Client.

This book contains examples and documentation to explain all of Pravega Rust client's use cases in detail.
Expand Down
2 changes: 1 addition & 1 deletion channel/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pravega-client-channel"
version = "0.3.6"
version = "0.3.7"
edition = "2018"
categories = ["network-programming"]
keywords = ["streaming", "client", "pravega"]
Expand Down
2 changes: 1 addition & 1 deletion config/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pravega-client-config"
version = "0.3.6"
version = "0.3.7"
edition = "2018"
categories = ["network-programming"]
keywords = ["streaming", "client", "pravega"]
Expand Down
2 changes: 1 addition & 1 deletion connection_pool/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pravega-connection-pool"
version = "0.3.6"
version = "0.3.7"
edition = "2018"
categories = ["network-programming"]
keywords = ["streaming", "client", "pravega"]
Expand Down
2 changes: 1 addition & 1 deletion controller-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pravega-controller-client"
version = "0.3.6"
version = "0.3.7"
edition = "2018"
build = "build.rs"
categories = ["network-programming"]
Expand Down
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pravega-client-examples"
version = "0.3.6"
version = "0.3.7"
edition = "2018"
categories = ["network-programming"]
keywords = ["streaming", "client", "pravega"]
Expand Down
2 changes: 1 addition & 1 deletion golang/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pravega_client_c"
version = "0.3.6"
version = "0.3.7"
edition = "2018"
categories = ["network-programming"]
keywords = ["streaming", "client", "pravega", "golang"]
Expand Down
10 changes: 6 additions & 4 deletions golang/src/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,11 @@ pub unsafe extern "C" fn stream_manager_new(
client_config: ClientConfigMapping,
err: Option<&mut Buffer>,
) -> *mut StreamManager {
match catch_unwind(|| {
let result = catch_unwind(|| {
let config = client_config.to_client_config();
StreamManager::new(config)
}) {
});
match result {
Ok(manager) => {
clear_error();
Box::into_raw(Box::new(manager))
Expand Down Expand Up @@ -157,10 +158,11 @@ pub unsafe extern "C" fn stream_manager_create_stream(
err: Option<&mut Buffer>,
) -> bool {
let stream_manager = &*manager;
match catch_unwind(AssertUnwindSafe(move || {
let res = catch_unwind(AssertUnwindSafe(move || {
let stream_cfg = stream_config.to_stream_configuration();
stream_manager.create_stream(stream_cfg)
})) {
}));
match res {
Ok(result) => match result {
Ok(val) => val,
Err(e) => {
Expand Down
4 changes: 2 additions & 2 deletions macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@

[package]
name = "pravega-client-macros"
version = "0.3.6"
version = "0.3.7"
edition = "2018"
categories = ["network-programming"]
keywords = ["streaming", "client", "pravega"]
readme = "README.md"
documentation = "https://docs.rs/pravega-client/0.3.5/"
documentation = "https://docs.rs/pravega-client/0.3.7/"
homepage = "https://www.pravega.io/"
repository = "https://github.com/pravega/pravega-client-rust"
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion nodejs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license = "Apache-2.0"
name = "pravega_nodejs"
readme = "README.md"
repository = "https://github.com/pravega/pravega-client-rust"
version = "0.3.6"
version = "0.3.7"

[lib]
crate-type = ["cdylib"]
Expand Down
2 changes: 1 addition & 1 deletion nodejs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pravega/pravega",
"version": "0.3.6",
"version": "0.3.7",
"description": "Pravega client",
"main": "dist/cjs/stream_manager.js",
"module": "dist/esm/index.js",
Expand Down
2 changes: 1 addition & 1 deletion pravegactl/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pravegactl"
version = "0.3.6"
version = "0.3.7"
edition = "2018"
categories = ["network-programming"]
keywords = ["streaming", "client", "pravega"]
Expand Down
2 changes: 1 addition & 1 deletion retry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ license = "Apache-2.0"
name = "pravega-client-retry"
readme = "README.md"
repository = "https://github.com/pravega/pravega-client-rust"
version = "0.3.6"
version = "0.3.7"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
2 changes: 1 addition & 1 deletion shared/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pravega-client-shared"
version = "0.3.6"
version = "0.3.7"
edition = "2018"
categories = ["network-programming"]
keywords = ["streaming", "client", "pravega"]
Expand Down
61 changes: 57 additions & 4 deletions src/event/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type SegmentReadResult = Result<SegmentDataBuffer, ReaderErrorWithOffset>;

const REBALANCE_INTERVAL: Duration = Duration::from_secs(10);

const UPDATE_OFFSET_INTERVAL: Duration = Duration::from_secs(3);

const READ_BUFFER_SIZE: i32 = 8 * 1024 * 1024; // max size for a single Event

cfg_if::cfg_if! {
Expand Down Expand Up @@ -259,6 +261,7 @@ impl EventReader {
slice_stop_reading,
last_segment_release: Instant::now(),
last_segment_acquire: Instant::now(),
last_offset_update: Instant::now(),
reader_offline: false,
},
rg_state,
Expand Down Expand Up @@ -306,6 +309,7 @@ impl EventReader {
self.meta.last_segment_release = Instant::now();
} else {
//send an indication to the waiting rx that slice has been returned.
debug!(" slice return to rx success {:?} ", slice.meta);
if let Some(tx) = slice.slice_return_tx.take() {
if let Err(_e) = tx.send(Some(slice.meta.clone())) {
warn!(
Expand All @@ -317,6 +321,29 @@ impl EventReader {
panic!("This is unexpected, No sender for SegmentSlice present.");
}
}

//Update latest reader positions if UPDATE_OFFSET_INTERVAL is elapsed
if self.meta.last_offset_update.elapsed() > UPDATE_OFFSET_INTERVAL {
let mut offset_map: HashMap<ScopedSegment, Offset> = HashMap::new();
for metadata in self.meta.slices.values() {
offset_map.insert(
ScopedSegment::from(metadata.scoped_segment.as_str()),
Offset::new(metadata.read_offset),
);
}
debug!(
" update reader position {:?} for reader {:?} ",
offset_map, self.id
);
self.rg_state
.lock()
.await
.update_reader_positions(&self.id, offset_map)
.await
.context(StateError {})?;

self.meta.last_offset_update = Instant::now();
}
Ok(())
}

Expand Down Expand Up @@ -525,6 +552,28 @@ impl EventReader {
},
});
}
//Update latest reader positions if UPDATE_OFFSET_INTERVAL is elapsed
if self.meta.last_offset_update.elapsed() > UPDATE_OFFSET_INTERVAL {
let mut offset_map: HashMap<ScopedSegment, Offset> = HashMap::new();
for metadata in self.meta.slices.values() {
offset_map.insert(
ScopedSegment::from(metadata.scoped_segment.as_str()),
Offset::new(metadata.read_offset),
);
}
debug!(
" update reader position {:?} for reader {:?} ",
offset_map, self.id
);
self.rg_state
.lock()
.await
.update_reader_positions(&self.id, offset_map)
.await
.context(StateError {})?;

self.meta.last_offset_update = Instant::now();
}
info!("acquiring segment for reader {:?}", self.id);
// Check if newer segments should be acquired.
if self.meta.last_segment_acquire.elapsed() > REBALANCE_INTERVAL {
Expand Down Expand Up @@ -603,7 +652,7 @@ impl EventReader {

info!(
"segment slice for {:?} is ready for consumption by reader {}",
slice_meta.scoped_segment, self.id,
slice_meta, self.id,
);

Ok(Some(SegmentSlice {
Expand Down Expand Up @@ -803,6 +852,7 @@ struct ReaderState {
slice_stop_reading: HashMap<ScopedSegment, oneshot::Sender<()>>,
last_segment_release: Instant,
last_segment_acquire: Instant,
last_offset_update: Instant,
reader_offline: bool,
}
impl Default for ReaderState {
Expand All @@ -816,6 +866,7 @@ impl Default for ReaderState {
slice_stop_reading: HashMap::new(),
last_segment_release: Instant::now(),
last_segment_acquire: Instant::now(),
last_offset_update: Instant::now(),
reader_offline: false,
}
}
Expand Down Expand Up @@ -870,8 +921,8 @@ impl ReaderState {
match self.slices.remove(&segment) {
Some(meta) => {
debug!(
"Segment slice {:?} has not been dished out for consumption",
&segment
"Segment slice {:?} has not been dished out for consumption {:?} meta",
&segment, meta
);
Some(meta)
}
Expand Down Expand Up @@ -1432,7 +1483,6 @@ mod tests {
.return_once(move |_| Ok(1 as isize));
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
rg_mock.expect_check_online().return_const(true);

// mock rg_state.assign_segment_to_reader
let res: Result<Option<ScopedSegment>, ReaderGroupStateError> =
Ok(Some(ScopedSegment::from("scope/test/1.#epoch.0")));
Expand Down Expand Up @@ -1536,6 +1586,9 @@ mod tests {
.return_once(move |_| Ok(0 as isize));
rg_mock.expect_check_online().return_const(true);
rg_mock.expect_remove_reader().return_once(move |_, _| Ok(()));
rg_mock
.expect_update_reader_positions()
.return_once(move |_, _| Ok(()));
// create a new Event Reader with the segment slice data.
let mut reader = EventReader::init_event_reader(
Arc::new(Mutex::new(rg_mock)),
Expand Down
36 changes: 32 additions & 4 deletions src/event/reader_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::client_factory::ClientFactoryAsync;
use crate::event::reader::EventReader;
use crate::event::reader_group_state::{Offset, ReaderGroupStateError};

use pravega_client_shared::{Reader, Scope, ScopedSegment, ScopedStream};
use pravega_client_shared::{Reader, Scope, ScopedSegment, ScopedStream, StreamCut};

use crate::sync::table::TableError;
use crate::sync::Table;
Expand All @@ -24,6 +24,7 @@ use snafu::Snafu;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::error;

cfg_if::cfg_if! {
if #[cfg(test)] {
Expand Down Expand Up @@ -214,6 +215,33 @@ impl ReaderGroup {
pub fn get_managed_streams(&self) -> Vec<ScopedStream> {
self.config.get_streams()
}

/// Return the latest StreamCut in ReaderGroup.

pub async fn get_streamcut(&self) -> StreamCut {
let positions = self.state.lock().await.get_streamcut().await;
let streamcut_map = positions.iter().fold(HashMap::new(), |mut acc, (seg, offset)| {
let scoped_stream = seg.get_scoped_stream();
let segment_id = seg.segment.number;

// Update the segment_offset_map for the corresponding scoped_stream
let entry: &mut HashMap<i64, i64> = acc.entry(scoped_stream).or_default();
entry.insert(segment_id, offset.read);
acc
});
let streamcuts: Vec<StreamCut> = streamcut_map
.into_iter()
.map(|(scoped_stream, segment_offset_map)| StreamCut::new(scoped_stream, segment_offset_map))
.collect();

if let Some(first_streamcut) = streamcuts.first() {
first_streamcut.clone()
} else {
error!("Expected a StreamCut but none found");
let streamcut: Option<StreamCut> = streamcuts.first().cloned();
streamcut.expect("Expected a StreamCut but none found")
}
}
}

/// Specifies the ReaderGroupConfig.
Expand Down Expand Up @@ -443,17 +471,17 @@ pub struct StreamCutV1 {
}

impl StreamCutV1 {
pub(crate) fn new(stream: ScopedStream, positions: HashMap<ScopedSegment, i64>) -> Self {
pub fn new(stream: ScopedStream, positions: HashMap<ScopedSegment, i64>) -> Self {
StreamCutV1 { stream, positions }
}

/// gets a clone of the internal scoped stream
pub(crate) fn get_stream(&self) -> ScopedStream {
pub fn get_stream(&self) -> ScopedStream {
self.stream.clone()
}

/// gets a clone of the internal positions
pub(crate) fn get_positions(&self) -> HashMap<ScopedSegment, i64> {
pub fn get_positions(&self) -> HashMap<ScopedSegment, i64> {
self.positions.clone()
}
}
Expand Down
Loading
Loading