Skip to content

Commit cee79d6

Browse files
authored
Merge 166e384 into 73ca073
2 parents 73ca073 + 166e384 commit cee79d6

File tree

4 files changed

+196
-5
lines changed

4 files changed

+196
-5
lines changed

examples/expiring-tags.rs

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
//! This example shows how to create tags that expire after a certain time.
2+
//!
3+
//! We use a prefix so we can distinguish between expiring and normal tags, and
4+
//! then encode the expiry date in the tag name after the prefix, in a format
5+
//! that sorts in the same order as the expiry date.
6+
//!
7+
//! The example creates a number of blobs and protects them directly or indirectly
8+
//! with expiring tags. Watch as the expired tags are deleted and the blobs
9+
//! are removed from the store.
10+
use std::{
11+
ops::Deref,
12+
time::{Duration, SystemTime},
13+
};
14+
15+
use chrono::Utc;
16+
use futures_lite::StreamExt;
17+
use iroh_blobs::{
18+
api::{blobs::AddBytesOptions, Store, Tag},
19+
hashseq::HashSeq,
20+
store::fs::options::{BatchOptions, GcConfig, InlineOptions, Options, PathOptions},
21+
BlobFormat, Hash,
22+
};
23+
use tokio::signal::ctrl_c;
24+
25+
/// Using an iroh rpc client, create a tag that is marked to expire at `expiry` for all the given hashes.
26+
///
27+
/// The tag name will be `prefix`- followed by the expiry date in iso8601 format (e.g. `expiry-2025-01-01T12:00:00Z`).
28+
async fn create_expiring_tag(
29+
store: &Store,
30+
hashes: &[Hash],
31+
prefix: &str,
32+
expiry: SystemTime,
33+
) -> anyhow::Result<()> {
34+
let expiry = chrono::DateTime::<chrono::Utc>::from(expiry);
35+
let expiry = expiry.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
36+
let tagname = format!("{prefix}-{expiry}");
37+
if hashes.is_empty() {
38+
return Ok(());
39+
} else if hashes.len() == 1 {
40+
let hash = hashes[0];
41+
store.tags().set(&tagname, hash).await?;
42+
} else {
43+
let hs = hashes.iter().copied().collect::<HashSeq>();
44+
store
45+
.add_bytes_with_opts(AddBytesOptions {
46+
data: hs.into(),
47+
format: BlobFormat::HashSeq,
48+
})
49+
.with_named_tag(&tagname)
50+
.await?;
51+
};
52+
println!("Created tag {tagname}");
53+
Ok(())
54+
}
55+
56+
async fn delete_expired_tags(blobs: &Store, prefix: &str, bulk: bool) -> anyhow::Result<()> {
57+
let prefix = format!("{prefix}-");
58+
let now = chrono::Utc::now();
59+
let end = format!(
60+
"{}-{}",
61+
prefix,
62+
now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
63+
);
64+
if bulk {
65+
// delete all tags with the prefix and an expiry date before now
66+
//
67+
// this should be very efficient, since it is just a single database operation
68+
blobs
69+
.tags()
70+
.delete_range(Tag::from(prefix.clone())..Tag::from(end))
71+
.await?;
72+
} else {
73+
// find tags to delete one by one and then delete them
74+
//
75+
// this allows us to print the tags before deleting them
76+
let mut tags = blobs.tags().list().await?;
77+
let mut to_delete = Vec::new();
78+
while let Some(tag) = tags.next().await {
79+
let tag = tag?.name;
80+
if let Some(rest) = tag.0.strip_prefix(prefix.as_bytes()) {
81+
let Ok(expiry) = std::str::from_utf8(rest) else {
82+
tracing::warn!("Tag {} does have non utf8 expiry", tag);
83+
continue;
84+
};
85+
let Ok(expiry) = chrono::DateTime::parse_from_rfc3339(expiry) else {
86+
tracing::warn!("Tag {} does have invalid expiry date", tag);
87+
continue;
88+
};
89+
let expiry = expiry.with_timezone(&Utc);
90+
if expiry < now {
91+
to_delete.push(tag);
92+
}
93+
}
94+
}
95+
for tag in to_delete {
96+
println!("Deleting expired tag {tag}\n");
97+
blobs.tags().delete(tag).await?;
98+
}
99+
}
100+
Ok(())
101+
}
102+
103+
async fn print_store_info(store: &Store) -> anyhow::Result<()> {
104+
let now = chrono::Utc::now();
105+
let mut tags = store.tags().list().await?;
106+
println!(
107+
"Current time: {}",
108+
now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
109+
);
110+
println!("Tags:");
111+
while let Some(tag) = tags.next().await {
112+
let tag = tag?;
113+
println!(" {tag:?}");
114+
}
115+
let mut blobs = store.list().stream().await?;
116+
println!("Blobs:");
117+
while let Some(item) = blobs.next().await {
118+
println!(" {}", item?);
119+
}
120+
println!();
121+
Ok(())
122+
}
123+
124+
async fn info_task(store: Store) -> anyhow::Result<()> {
125+
tokio::time::sleep(Duration::from_secs(1)).await;
126+
loop {
127+
print_store_info(&store).await?;
128+
tokio::time::sleep(Duration::from_secs(5)).await;
129+
}
130+
}
131+
132+
async fn delete_expired_tags_task(store: Store, prefix: &str) -> anyhow::Result<()> {
133+
loop {
134+
delete_expired_tags(&store, prefix, false).await?;
135+
tokio::time::sleep(Duration::from_secs(5)).await;
136+
}
137+
}
138+
139+
#[tokio::main]
140+
async fn main() -> anyhow::Result<()> {
141+
tracing_subscriber::fmt::init();
142+
let path = std::env::current_dir()?.join("blobs");
143+
let options = Options {
144+
path: PathOptions::new(&path),
145+
gc: Some(GcConfig {
146+
interval: Duration::from_secs(10),
147+
}),
148+
inline: InlineOptions::default(),
149+
batch: BatchOptions::default(),
150+
};
151+
let store =
152+
iroh_blobs::store::fs::FsStore::load_with_opts(path.join("blobs.db"), options).await?;
153+
154+
// setup: add some data and tag it
155+
{
156+
// add several blobs and tag them with an expiry date 10 seconds in the future
157+
let batch = store.batch().await?;
158+
let a = batch.add_bytes("blob 1".as_bytes()).await?;
159+
let b = batch.add_bytes("blob 2".as_bytes()).await?;
160+
161+
let expires_at = SystemTime::now()
162+
.checked_add(Duration::from_secs(10))
163+
.unwrap();
164+
create_expiring_tag(&store, &[*a.hash(), *b.hash()], "expiring", expires_at).await?;
165+
166+
// add a single blob and tag it with an expiry date 60 seconds in the future
167+
let c = batch.add_bytes("blob 3".as_bytes()).await?;
168+
let expires_at = SystemTime::now()
169+
.checked_add(Duration::from_secs(60))
170+
.unwrap();
171+
create_expiring_tag(&store, &[*c.hash()], "expiring", expires_at).await?;
172+
// batch goes out of scope, so data is only protected by the tags we created
173+
}
174+
175+
// delete expired tags every 5 seconds
176+
let delete_task = tokio::spawn(delete_expired_tags_task(store.deref().clone(), "expiring"));
177+
// print all tags and blobs every 5 seconds
178+
let info_task = tokio::spawn(info_task(store.deref().clone()));
179+
180+
ctrl_c().await?;
181+
delete_task.abort();
182+
info_task.abort();
183+
store.shutdown().await?;
184+
Ok(())
185+
}

src/api/tags.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,17 +58,22 @@ impl Tags {
5858
Ok(stream.next().await.transpose()?)
5959
}
6060

61-
pub async fn set_with_opts(&self, options: SetOptions) -> super::RequestResult<()> {
61+
pub async fn set_with_opts(&self, options: SetOptions) -> super::RequestResult<TagInfo> {
6262
trace!("{:?}", options);
63+
let info = TagInfo {
64+
name: options.name.clone(),
65+
hash: options.value.hash,
66+
format: options.value.format,
67+
};
6368
self.client.rpc(options).await??;
64-
Ok(())
69+
Ok(info)
6570
}
6671

6772
pub async fn set(
6873
&self,
6974
name: impl AsRef<[u8]>,
7075
value: impl Into<HashAndFormat>,
71-
) -> super::RequestResult<()> {
76+
) -> super::RequestResult<TagInfo> {
7277
self.set_with_opts(SetOptions {
7378
name: Tag::from(name.as_ref()),
7479
value: value.into(),

src/store/fs/meta.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ impl Actor {
409409
options: BatchOptions,
410410
) -> anyhow::Result<Self> {
411411
debug!("creating or opening meta database at {}", db_path.display());
412-
let db = match redb::Database::create(db_path) {
412+
let db = match redb::Database::create(&db_path) {
413413
Ok(db) => db,
414414
Err(DatabaseError::UpgradeRequired(1)) => {
415415
return Err(anyhow::anyhow!("migration from v1 no longer supported"));

src/store/fs/options.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use std::{
44
time::Duration,
55
};
66

7-
use super::{gc::GcConfig, meta::raw_outboard_size, temp_name};
7+
pub use super::gc::GcConfig;
8+
use super::{meta::raw_outboard_size, temp_name};
89
use crate::Hash;
910

1011
/// Options for directories used by the file store.

0 commit comments

Comments
 (0)