Skip to content

Commit

Permalink
Fix object store chunk subject names
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
Co-authored-by: Casper Beyer <caspervonb@pm.me>
  • Loading branch information
Jarema and caspervonb committed Nov 8, 2022
1 parent d8b14da commit b6825ba
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions async-nats/src/jetstream/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ pub(crate) fn is_valid_object_name(object_name: &str) -> bool {
OBJECT_NAME_RE.is_match(object_name)
}

pub(crate) fn sanitize_object_name(object_name: &str) -> String {
object_name.replace(['.', ' '], "_")
pub(crate) fn enocde_object_name(object_name: &str) -> String {
base64::encode_config(object_name, base64::URL_SAFE)
}

/// Configuration values for object store buckets.
Expand Down Expand Up @@ -154,7 +154,7 @@ impl ObjectStore {
let mut headers = HeaderMap::default();
headers.insert(NATS_ROLLUP, HeaderValue::from_str(ROLLUP_SUBJECT)?);

let subject = format!("$O.{}.M.{}", &self.name, &object_name);
let subject = format!("$O.{}.M.{}", &self.name, enocde_object_name(object_name));

self.stream
.context
Expand Down Expand Up @@ -185,7 +185,7 @@ impl ObjectStore {
/// ```
pub async fn info<T: AsRef<str>>(&self, object_name: T) -> Result<ObjectInfo, Error> {
let object_name = object_name.as_ref();
let object_name = sanitize_object_name(object_name);
let object_name = enocde_object_name(object_name);
if !is_valid_object_name(&object_name) {
return Err(Box::new(io::Error::new(
io::ErrorKind::InvalidInput,
Expand Down Expand Up @@ -234,15 +234,15 @@ impl ObjectStore {
{
let object_meta: ObjectMeta = meta.into();

let object_name = sanitize_object_name(&object_meta.name);
if !is_valid_object_name(&object_name) {
let encoded_object_name = enocde_object_name(&object_meta.name);
if !is_valid_object_name(&encoded_object_name) {
return Err(Box::new(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid object name",
)));
}
// Fetch any existing object info, if there is any for later use.
let maybe_existing_object_info = match self.info(&object_name).await {
let maybe_existing_object_info = match self.info(&encoded_object_name).await {
Ok(object_info) => Some(object_info),
Err(_) => None,
};
Expand Down Expand Up @@ -276,10 +276,9 @@ impl ObjectStore {
.await?;
}
let digest = context.finish();
// Create a random subject prefixed with the object stream name.
let subject = format!("$O.{}.M.{}", &self.name, &object_name);
let subject = format!("$O.{}.M.{}", &self.name, &encoded_object_name);
let object_info = ObjectInfo {
name: object_name,
name: object_meta.name,
description: object_meta.description,
link: object_meta.link,
bucket: self.name.clone(),
Expand All @@ -298,6 +297,7 @@ impl ObjectStore {
headers.insert(NATS_ROLLUP, ROLLUP_SUBJECT.parse::<HeaderValue>()?);
let data = serde_json::to_vec(&object_info)?;

// publish meta.
self.stream
.context
.publish_with_headers(subject, headers, data.into())
Expand Down

0 comments on commit b6825ba

Please sign in to comment.