Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cron api rust): Implement Cron Events Rust #145

Merged
merged 62 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
0f77607
deps: add dashmap crate
saibatizoku Feb 22, 2024
c80a1fd
fix: use dashmap instead of hashmap
saibatizoku Feb 22, 2024
9517caf
chore: update project dictionary
saibatizoku Feb 22, 2024
a10fcdc
fix: update cron implementation to use dashmap
saibatizoku Feb 22, 2024
ae40a21
fix: simpler
saibatizoku Feb 22, 2024
aa69efd
refactor: store OnCronEvent in dashmap
saibatizoku Feb 26, 2024
6524e8d
chore(dict): update project dictionary
saibatizoku Feb 26, 2024
86c318a
feat(deps): add once_cell and saffron crates
saibatizoku Feb 27, 2024
6070fa5
refactor(cron): refactor state into its own module
saibatizoku Feb 27, 2024
2a10ed8
feat: CronTagged impls Hash, PartialEq, Eq, PartialOrd, Ord
saibatizoku Feb 28, 2024
dfab97c
chore: cleanup types
saibatizoku Feb 28, 2024
f0b30db
feat(wip): spawn a cron queue task and store its handle
saibatizoku Feb 28, 2024
0a71272
fix: remove todos, return values
saibatizoku Feb 28, 2024
4340ebe
feat(wip): add cron queue channels and message type.
saibatizoku Feb 29, 2024
ee2a28d
feat(cron): OnCronEvent calculates next timestamp for schedule
saibatizoku Feb 29, 2024
8b24067
build(deps): add tokio and chrono crates
saibatizoku Mar 6, 2024
8dcbbea
chore: update project dictionary
saibatizoku Mar 6, 2024
775ce3e
feat: use saffron::Cron to parse schedules and generate next timestamp
saibatizoku Mar 6, 2024
4b7b4ab
fix: set MIN_DOW to 1 to keep compatibility with saffron
saibatizoku Mar 6, 2024
8394a09
feat: add types for cron queue
saibatizoku Mar 6, 2024
d696603
feat: implement cron queue storage functionality
saibatizoku Mar 6, 2024
e3b3af3
fix: update make_delay to use chron and generate CronJobDelay
saibatizoku Mar 6, 2024
9d6643d
refactor: state is managed in threaded async queue
saibatizoku Mar 6, 2024
fed02a4
fix: update Host implementation to use global static state
saibatizoku Mar 6, 2024
5da5afd
fix: implement Ord for CronComponent to be infallible
saibatizoku Mar 8, 2024
f9a2357
fix: cron state can start without queue task
saibatizoku Mar 8, 2024
dd24151
fix: implement Ord for CronTagged to be infallible
saibatizoku Mar 8, 2024
0a8fbe3
fix: typo
saibatizoku Mar 8, 2024
821bfcb
fix: remote commands to catch errors for logging
saibatizoku Mar 8, 2024
a88bb4f
feat(wip): add todo comments for logging
saibatizoku Mar 8, 2024
b87b7e2
build(deps): update chrono crate and types
saibatizoku Mar 8, 2024
312c608
feat: expose cron queue state via functions
saibatizoku Mar 8, 2024
d15bd5c
fix: cleanup
saibatizoku Mar 8, 2024
99b11c2
Merge remote-tracking branch 'origin/main' into feat/cron-api-rust
saibatizoku Mar 11, 2024
8b40d55
build(deps): remove time crate
saibatizoku Mar 11, 2024
0bb96f5
Merge branch 'main' into feat/cron-api-rust
saibatizoku Mar 12, 2024
ce976c1
fix: remove pub(crate) from cron internal state
saibatizoku Mar 12, 2024
a9556d6
Merge remote-tracking branch 'origin/main' into feat/cron-api-rust
saibatizoku Mar 13, 2024
85867c3
Merge remote-tracking branch 'origin/main' into feat/cron-api-rust
saibatizoku Mar 20, 2024
b02a1df
feat: on cron event handles response from module
saibatizoku Mar 25, 2024
4e95a22
feat: add unit tests for cron queue methods
saibatizoku Mar 25, 2024
89444c3
fix(wip): disable cron state tests
saibatizoku Mar 25, 2024
c2bf357
feat: add unit tests for cron queue and cron state methods
saibatizoku Mar 26, 2024
e57b954
fix: delayed crontab does not retrigger
saibatizoku Mar 26, 2024
f09b93c
feat: add triggering mechanism for on-cron events
saibatizoku Mar 26, 2024
ad74186
fix: cron state unit test
saibatizoku Mar 26, 2024
399e644
feat: cron state functions are tested from multiple threads
saibatizoku Mar 26, 2024
fd675bc
feat: trigger queue when removing crontab
saibatizoku Mar 27, 2024
acffc7c
fix: pop_app_queues_and_send returns when all items are sent
saibatizoku Mar 27, 2024
96056dc
feat: add unit tests for cron queue triggering to the hermes queue
saibatizoku Mar 27, 2024
107f776
feat: clear waiting_for task if it has passed when triggering
saibatizoku Mar 27, 2024
b615c0e
chore: refactor code for legibility, expose only needed functions
saibatizoku Mar 27, 2024
696e12a
feat: add unit tests for triggering the cron queue
saibatizoku Mar 28, 2024
b5e034f
fix: triggering the queue removes stale waiting_event
saibatizoku Mar 28, 2024
adc339f
Merge remote-tracking branch 'origin/main' into feat/cron-api-rust
saibatizoku Mar 30, 2024
245bba6
fix: use newtypes instead of type aliases
saibatizoku Apr 2, 2024
698e539
fix: use HashSet instead of BTreeSet for OnCronEvents
saibatizoku Apr 2, 2024
f79ee8b
fix: update cron queue tests
saibatizoku Apr 2, 2024
57f92da
fix: type conversion hides inner value
saibatizoku Apr 2, 2024
dbd7965
fix: typos in doc comments
saibatizoku Apr 2, 2024
5ffaf1d
fix: remove redundant mapping
saibatizoku Apr 2, 2024
0717a7f
Merge branch 'main' into feat/cron-api-rust
minikin Apr 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .config/dictionaries/project.dic
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ codegen
codepoints
coti
crontabs
crontagged
cryptoxide
dashmap
Datelike
dashmap
dbsync
Expand All @@ -38,6 +40,7 @@ dotenv
dotenvy
dotglob
drep
Datelike
dreps
encryptor
Errno
Expand Down
8 changes: 4 additions & 4 deletions hermes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,10 @@ anyhow = "1.0.71"
blake2b_simd = "1.0.2"
hex-literal = "0.4.1"
thiserror = "1.0.56"
tokio = "1.34.0"
hex = "0.4.3"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
criterion = "0.5.1"
time = "0.3.34"
chrono = "0.4.34"
chrono-tz = "0.8.6"
libtest-mimic = "0.7.0"
crossbeam-queue = "0.3.11"
bip39 = "2.0.0"
Expand All @@ -81,3 +77,7 @@ clap = "4.5.3"
build-info = "0.0.36"
derive_more = { version= "0.99.17", default-features= false }
build-info-build = "0.0.36"
chrono = "0.4.35"
chrono-tz = "0.8.6"
saffron = "0.1.0"
tokio = "1.36.0"
11 changes: 6 additions & 5 deletions hermes/bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ blake2b_simd = { workspace = true }
hex-literal = { workspace = true }
thiserror = { workspace = true }
criterion = { workspace = true, optional = true }
time = { workspace = true }
chrono = { workspace = true }
chrono-tz = { workspace = true }
iana-time-zone = { workspace = true }
libtest-mimic = { workspace = true }
crossbeam-queue = { workspace = true }
bip39 = { workspace = true, features = ["chinese-simplified", "chinese-traditional", "czech", "french", "italian", "japanese", "korean", "spanish" ] }
Expand All @@ -46,6 +42,11 @@ tracing = { workspace = true, features = ["log"] }
tracing-subscriber = { workspace = true, features = ["fmt", "json", "time"] }
build-info = { workspace = true }
derive_more = { workspace = true, features = ["display"], default-features = false }
chrono = { workspace = true, features = ["now"] }
chrono-tz = { workspace = true }
iana-time-zone = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt","sync", "time"] }
saffron = { workspace = true }

[[test]]
name = "wasm-component-integration-tests"
Expand All @@ -54,4 +55,4 @@ harness = false

[build-dependencies]

build-info-build = { workspace = true }
build-info-build = { workspace = true }
245 changes: 240 additions & 5 deletions hermes/bin/src/runtime_extensions/hermes/cron/event.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,55 @@
//! Cron runtime extension event handler implementation.

use std::ops::Sub;

use chrono::Utc;
use saffron::Cron;

use super::{state::cron_queue_rm, Error};
use crate::{
event::HermesEventPayload, runtime_extensions::bindings::hermes::cron::api::CronTagged,
};

/// Duration in nanoseconds used for the Cron Service.
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Copy, Clone)]
pub(crate) struct CronDuration(u64);

impl Sub for CronDuration {
type Output = Self;

fn sub(self, rhs: Self) -> Self::Output {
Self(self.0 - rhs.0)
}
}

impl TryFrom<i64> for CronDuration {
type Error = Error;

fn try_from(value: i64) -> Result<Self, Self::Error> {
let duration: u64 = value.try_into().map_err(|_| Error::InvalidTimestamp)?;
Ok(CronDuration(duration))
}
}

impl From<CronDuration> for u64 {
fn from(value: CronDuration) -> Self {
value.0
}
}

impl From<u64> for CronDuration {
fn from(value: u64) -> Self {
Self(value)
}
}

/// On cron event
struct OnCronEvent {
#[derive(Clone, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub(crate) struct OnCronEvent {
/// The tagged cron event that was triggered.
tag: CronTagged,
pub(crate) tag: CronTagged,
/// This cron event will not retrigger.
last: bool,
pub(crate) last: bool,
}

impl HermesEventPayload for OnCronEvent {
Expand All @@ -18,12 +58,207 @@ impl HermesEventPayload for OnCronEvent {
}

fn execute(&self, module: &mut crate::wasm::module::ModuleInstance) -> anyhow::Result<()> {
// TODO (@stevenj): https://github.com/input-output-hk/hermes/issues/93
let _res: bool = module.instance.hermes_cron_event().call_on_cron(
let res: bool = module.instance.hermes_cron_event().call_on_cron(
&mut module.store,
&self.tag,
self.last,
)?;
// if the response is `false`, check if the event would
// re-trigger, if so, remove it.
if !res && !self.last {
let app_name = module.store.data().app_name();
cron_queue_rm(app_name, self.tag.clone());
}
Ok(())
saibatizoku marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl OnCronEvent {
/// Get the next scheduled cron event after the optional start timestamp, or after the
/// current timestamp.
///
/// # Parameters
///
/// * `start: Option<CronDuration>` - The optional start timestamp. If `None`, the
/// current time is used.
///
/// # Returns
///
/// * `Some(CronDuration)` - The next timestamp for the `OnCronEvent`.
/// * `None` if the timestamp could not be calculated.
pub(crate) fn tick_after(&self, start: Option<CronDuration>) -> Option<CronDuration> {
let cron = self.cron()?;
if cron.any() {
let datetime = Self::start_datetime(start)?;
let cdt = cron.iter_after(datetime).next()?;
let timestamp = cdt.timestamp_nanos_opt()?;
timestamp.try_into().ok()
} else {
None
}
}

/// Get the next scheduled cron event from the optional start timestamp, or from the
/// current timestamp.
///
/// # Parameters
///
/// * `start: Option<CronDuration>` - The optional start timestamp. If `None`, the
/// current time is used.
///
/// # Returns
///
/// * `Some(CronDuration)` - The next timestamp for the `OnCronEvent`.
/// * `None` if the timestamp could not be calculated.
pub(crate) fn tick_from(&self, start: Option<CronDuration>) -> Option<CronDuration> {
let cron = self.cron()?;
if cron.any() {
let datetime = Self::start_datetime(start)?;
let cdt = cron.iter_from(datetime).next()?;
let timestamp = cdt.timestamp_nanos_opt()?;
timestamp.try_into().ok()
} else {
None
}
}

/// Get the `Cron` from the inner `CronSchedule`.
fn cron(&self) -> Option<Cron> {
let when = &self.tag.when;
when.parse::<Cron>().ok()
}

/// Get the UTC datetime from an optional start timestamp.
///
/// Use the `start` timestamp if provided, otherwise use the current time.
///
/// Returns `None` if the datetime could not be calculated.
fn start_datetime(start: Option<CronDuration>) -> Option<chrono::DateTime<Utc>> {
let datetime = match start {
None => Utc::now(),
Some(ts) => chrono::DateTime::from_timestamp_nanos(u64::from(ts).try_into().ok()?),
};
Some(datetime)
}
}

impl PartialEq for CronTagged {
fn eq(&self, other: &Self) -> bool {
self.tag == other.tag && self.when == other.when
}
}

impl Eq for CronTagged {}
Mr-Leshiy marked this conversation as resolved.
Show resolved Hide resolved

impl PartialOrd for CronTagged {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for CronTagged {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.when.cmp(&other.when).then(self.tag.cmp(&other.tag))
}
}

#[cfg(test)]
mod tests {
use chrono::prelude::*;

use super::*;

#[test]
#[allow(clippy::unwrap_used)]
#[allow(clippy::assertions_on_constants)]
fn test_cron_queue() {
let start = NaiveDate::from_ymd_opt(1970, 1, 1)
.unwrap()
.and_hms_nano_opt(0, 0, 0, 0)
.unwrap();
let datetime = DateTime::from_naive_utc_and_offset(start, Utc);

let cron: Cron = "* * * * *".parse().unwrap();
for time in cron.clone().iter_from(datetime).enumerate().take(5) {
// generates
// 1970-01-01 00:00:00 UTC
// 1970-01-01 00:00:01 UTC
// 1970-01-01 00:00:02 UTC
// 1970-01-01 00:00:03 UTC
// 1970-01-01 00:00:04 UTC
assert_eq!(
time.1,
Utc.with_ymd_and_hms(1970, 1, 1, 0, (time.0).try_into().unwrap(), 0)
.unwrap()
);
}

let cron: Cron = "0 0 * * *".parse().unwrap();
for time in cron.clone().iter_from(datetime).enumerate().take(5) {
// generates
// 1970-01-01 00:00:00 UTC
// 1970-01-02 00:00:00 UTC
// 1970-01-03 00:00:00 UTC
// 1970-01-04 00:00:00 UTC
// 1970-01-05 00:00:00 UTC
assert_eq!(
time.1,
Utc.with_ymd_and_hms(1970, 1, 1 + u32::try_from(time.0).unwrap(), 0, 0, 0)
.unwrap()
);
}

// Every first day of the month
let cron: Cron = "0 0 1 * *".parse().unwrap();
for time in cron.clone().iter_from(datetime).enumerate().take(5) {
// generates
// 1970-01-01 00:00:00 UTC
// 1970-02-01 00:00:00 UTC
// 1970-03-01 00:00:00 UTC
// 1970-04-01 00:00:00 UTC
// 1970-05-01 00:00:00 UTC
assert_eq!(
time.1,
Utc.with_ymd_and_hms(1970, 1 + u32::try_from(time.0).unwrap(), 1, 0, 0, 0)
.unwrap()
);
}

// Every first day of January
let cron: Cron = "0 0 1 1 *".parse().unwrap();
for time in cron.clone().iter_from(datetime).enumerate().take(5) {
// generates
// 1970-01-01 00:00:00 UTC
// 1971-01-01 00:00:00 UTC
// 1972-01-01 00:00:00 UTC
// 1973-01-01 00:00:00 UTC
// 1974-01-01 00:00:00 UTC
assert_eq!(
time.1,
Utc.with_ymd_and_hms(1970 + i32::try_from(time.0).unwrap(), 1, 1, 0, 0, 0)
.unwrap()
);
}

// every monday and the first day of January
let cron: Cron = "0 0 1 1 7".parse().unwrap();
let times: Vec<DateTime<Utc>> = cron.clone().iter_from(datetime).take(5).collect();
assert_eq!(times, vec![
Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap(),
Utc.with_ymd_and_hms(1970, 1, 3, 0, 0, 0).unwrap(),
Utc.with_ymd_and_hms(1970, 1, 10, 0, 0, 0).unwrap(),
Utc.with_ymd_and_hms(1970, 1, 17, 0, 0, 0).unwrap(),
Utc.with_ymd_and_hms(1970, 1, 24, 0, 0, 0).unwrap(),
]);

let cron: Cron = "0 0 1 1,3,5,7,9,11 *".parse().unwrap();
let times: Vec<DateTime<Utc>> = cron.clone().iter_from(datetime).take(5).collect();
assert_eq!(times, vec![
Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap(),
Utc.with_ymd_and_hms(1970, 3, 1, 0, 0, 0).unwrap(),
Utc.with_ymd_and_hms(1970, 5, 1, 0, 0, 0).unwrap(),
Utc.with_ymd_and_hms(1970, 7, 1, 0, 0, 0).unwrap(),
Utc.with_ymd_and_hms(1970, 9, 1, 0, 0, 0).unwrap(),
]);
}
}