Skip to content

Commit

Permalink
Implement object store
Browse files Browse the repository at this point in the history
  • Loading branch information
caspervonb committed Dec 6, 2021
1 parent e69ae3a commit ffab098
Show file tree
Hide file tree
Showing 6 changed files with 695 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ libc = "0.2.98"
winapi = { version = "0.3.9", features = ["winsock2"] }

[dev-dependencies]
rand = "0.8.4"
criterion = { version ="0.3.5", features = ["html_reports"] }
env_logger = "0.9.0"
historian = "4.0.4"
Expand Down
24 changes: 24 additions & 0 deletions src/jetstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ const ORDERED_IDLE_HEARTBEAT: Duration = Duration::from_nanos(5_000_000_000);

// TODO re-organize this into a jetstream directory
pub use crate::jetstream_kv::*;
pub use crate::jetstream_object::*;
pub use crate::jetstream_push_subscription::PushSubscription;
pub use crate::jetstream_types::*;

Expand Down Expand Up @@ -1459,6 +1460,29 @@ impl JetStream {
self.js_request(&subject, b"")
}

/// Purge stream messages matching a subject.
pub fn purge_stream_subject<S: AsRef<str>>(
&self,
stream: S,
filter_subject: &str,
) -> io::Result<PurgeResponse> {
let stream: &str = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}

let subject = format!("{}STREAM.PURGE.{}", self.api_prefix(), stream);
let request = serde_json::to_vec(&PurgeRequest {
filter: Some(filter_subject.to_string()),
..Default::default()
})?;

self.js_request(&subject, &request)
}

/// Get a message from a stream.
pub fn get_message<S: AsRef<str>>(&self, stream: S, seq: u64) -> io::Result<StreamMessage> {
let stream: &str = stream.as_ref();
Expand Down
Loading

0 comments on commit ffab098

Please sign in to comment.