Skip to content

Commit

Permalink
feat(cron api rust): Implement Cron Events Rust (#145)
Browse files Browse the repository at this point in the history
* deps: add dashmap crate

* fix: use dashmap instead of hashmap

* chore: update project dictionary

* fix: update cron implementation to use dashmap

* fix: simpler

* refactor: store OnCronEvent in dashmap

Refactor  hermes::cron::State to implement CRUD methods to manage
crontabs

* chore(dict): update project dictionary

* feat(deps): add once_cell and saffron crates

* refactor(cron): refactor state into its own module

* feat: CronTagged impls Hash, PartialEq, Eq, PartialOrd, Ord

* chore: cleanup types

* feat(wip): spawn a cron queue task and store its handle

* fix: remove todos, return values

* feat(wip): add cron queue channels and message type.

* feat(cron): OnCronEvent calculates next timestamp for schedule

* build(deps): add tokio and chrono crates

* chore: update project dictionary

* feat: use saffron::Cron to parse schedules and generate next timestamp

* fix: set MIN_DOW to 1 to keep compatibility with saffron

* feat: add types for cron queue

* feat: implement cron queue storage functionality

* fix: update make_delay to use chron and generate CronJobDelay

* refactor: state is managed in threaded async queue

* fix: update Host implementation to use global static state

* fix: implement Ord for CronComponent to be infallible

* fix: cron state can start without queue task

* fix: implement Ord for CronTagged to be infallible

* fix: typo

* fix: remote commands to catch errors for logging

* feat(wip): add todo comments for logging

* build(deps): update chrono crate and types

* feat: expose cron queue state via functions

docs: improve docstrings

* fix: cleanup

* build(deps): remove time crate

* fix: remove pub(crate) from cron internal state

* feat: on cron event handles response from module

* feat: add unit tests for cron queue methods

* fix(wip): disable cron state tests

* feat: add unit tests for cron queue and cron state methods

* fix: delayed crontab does not retrigger

* feat: add triggering mechanism for on-cron events

* fix: cron state unit test

* feat: cron state functions are tested from multiple threads

* feat: trigger queue when removing crontab

* tidy up comments

* fix: pop_app_queues_and_send returns when all items are sent

* feat: add unit tests for cron queue triggering to the hermes queue

* feat: clear waiting_for task if it has passed when triggering

* chore: refactor code for legibility, expose only needed functions

* feat: add unit tests for triggering the cron queue

* fix: triggering the queue removes stale waiting_event

* fix: use newtypes instead of type aliases

Use HashSet instead of BTreeSet for app_name list

* fix: use HashSet instead of BTreeSet for OnCronEvents

tidy up docs

* fix: update cron queue tests

* fix: type conversion hides inner value

* fix: typos in doc comments

* fix: remove redundant mapping

---------

Co-authored-by: Oleksandr Prokhorenko <djminikin@gmail.com>
  • Loading branch information
saibatizoku and minikin committed Apr 2, 2024
1 parent 7a17c6b commit 00e6726
Show file tree
Hide file tree
Showing 8 changed files with 1,412 additions and 123 deletions.
3 changes: 3 additions & 0 deletions .config/dictionaries/project.dic
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ codegen
codepoints
coti
crontabs
crontagged
cryptoxide
dashmap
Datelike
dashmap
dbsync
Expand All @@ -38,6 +40,7 @@ dotenv
dotenvy
dotglob
drep
Datelike
dreps
encryptor
Errno
Expand Down
8 changes: 4 additions & 4 deletions hermes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,10 @@ anyhow = "1.0.71"
blake2b_simd = "1.0.2"
hex-literal = "0.4.1"
thiserror = "1.0.56"
tokio = "1.34.0"
hex = "0.4.3"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
criterion = "0.5.1"
time = "0.3.34"
chrono = "0.4.34"
chrono-tz = "0.8.6"
libtest-mimic = "0.7.0"
crossbeam-queue = "0.3.11"
bip39 = "2.0.0"
Expand All @@ -81,3 +77,7 @@ clap = "4.5.3"
build-info = "0.0.36"
derive_more = { version= "0.99.17", default-features= false }
build-info-build = "0.0.36"
chrono = "0.4.35"
chrono-tz = "0.8.6"
saffron = "0.1.0"
tokio = "1.36.0"
11 changes: 6 additions & 5 deletions hermes/bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ blake2b_simd = { workspace = true }
hex-literal = { workspace = true }
thiserror = { workspace = true }
criterion = { workspace = true, optional = true }
time = { workspace = true }
chrono = { workspace = true }
chrono-tz = { workspace = true }
iana-time-zone = { workspace = true }
libtest-mimic = { workspace = true }
crossbeam-queue = { workspace = true }
bip39 = { workspace = true, features = ["chinese-simplified", "chinese-traditional", "czech", "french", "italian", "japanese", "korean", "spanish" ] }
Expand All @@ -46,6 +42,11 @@ tracing = { workspace = true, features = ["log"] }
tracing-subscriber = { workspace = true, features = ["fmt", "json", "time"] }
build-info = { workspace = true }
derive_more = { workspace = true, features = ["display"], default-features = false }
chrono = { workspace = true, features = ["now"] }
chrono-tz = { workspace = true }
iana-time-zone = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt","sync", "time"] }
saffron = { workspace = true }

[[test]]
name = "wasm-component-integration-tests"
Expand All @@ -54,4 +55,4 @@ harness = false

[build-dependencies]

build-info-build = { workspace = true }
build-info-build = { workspace = true }
245 changes: 240 additions & 5 deletions hermes/bin/src/runtime_extensions/hermes/cron/event.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,55 @@
//! Cron runtime extension event handler implementation.

use std::ops::Sub;

use chrono::Utc;
use saffron::Cron;

use super::{state::cron_queue_rm, Error};
use crate::{
event::HermesEventPayload, runtime_extensions::bindings::hermes::cron::api::CronTagged,
};

/// Duration in nanoseconds used for the Cron Service.
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Copy, Clone)]
pub(crate) struct CronDuration(u64);

impl Sub for CronDuration {
type Output = Self;

fn sub(self, rhs: Self) -> Self::Output {
Self(self.0 - rhs.0)
}
}

impl TryFrom<i64> for CronDuration {
type Error = Error;

fn try_from(value: i64) -> Result<Self, Self::Error> {
let duration: u64 = value.try_into().map_err(|_| Error::InvalidTimestamp)?;
Ok(CronDuration(duration))
}
}

impl From<CronDuration> for u64 {
fn from(value: CronDuration) -> Self {
value.0
}
}

impl From<u64> for CronDuration {
fn from(value: u64) -> Self {
Self(value)
}
}

/// On cron event
struct OnCronEvent {
#[derive(Clone, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub(crate) struct OnCronEvent {
/// The tagged cron event that was triggered.
tag: CronTagged,
pub(crate) tag: CronTagged,
/// This cron event will not retrigger.
last: bool,
pub(crate) last: bool,
}

impl HermesEventPayload for OnCronEvent {
Expand All @@ -18,12 +58,207 @@ impl HermesEventPayload for OnCronEvent {
}

fn execute(&self, module: &mut crate::wasm::module::ModuleInstance) -> anyhow::Result<()> {
// TODO (@stevenj): https://github.com/input-output-hk/hermes/issues/93
let _res: bool = module.instance.hermes_cron_event().call_on_cron(
let res: bool = module.instance.hermes_cron_event().call_on_cron(
&mut module.store,
&self.tag,
self.last,
)?;
// if the response is `false`, check if the event would
// re-trigger, if so, remove it.
if !res && !self.last {
let app_name = module.store.data().app_name();
cron_queue_rm(app_name, self.tag.clone());
}
Ok(())
}
}

impl OnCronEvent {
/// Get the next scheduled cron event after the optional start timestamp, or after the
/// current timestamp.
///
/// # Parameters
///
/// * `start: Option<CronDuration>` - The optional start timestamp. If `None`, the
/// current time is used.
///
/// # Returns
///
/// * `Some(CronDuration)` - The next timestamp for the `OnCronEvent`.
/// * `None` if the timestamp could not be calculated.
pub(crate) fn tick_after(&self, start: Option<CronDuration>) -> Option<CronDuration> {
let cron = self.cron()?;
if cron.any() {
let datetime = Self::start_datetime(start)?;
let cdt = cron.iter_after(datetime).next()?;
let timestamp = cdt.timestamp_nanos_opt()?;
timestamp.try_into().ok()
} else {
None
}
}

/// Get the next scheduled cron event from the optional start timestamp, or from the
/// current timestamp.
///
/// # Parameters
///
/// * `start: Option<CronDuration>` - The optional start timestamp. If `None`, the
/// current time is used.
///
/// # Returns
///
/// * `Some(CronDuration)` - The next timestamp for the `OnCronEvent`.
/// * `None` if the timestamp could not be calculated.
pub(crate) fn tick_from(&self, start: Option<CronDuration>) -> Option<CronDuration> {
let cron = self.cron()?;
if cron.any() {
let datetime = Self::start_datetime(start)?;
let cdt = cron.iter_from(datetime).next()?;
let timestamp = cdt.timestamp_nanos_opt()?;
timestamp.try_into().ok()
} else {
None
}
}

/// Get the `Cron` from the inner `CronSchedule`.
fn cron(&self) -> Option<Cron> {
let when = &self.tag.when;
when.parse::<Cron>().ok()
}

/// Get the UTC datetime from an optional start timestamp.
///
/// Use the `start` timestamp if provided, otherwise use the current time.
///
/// Returns `None` if the datetime could not be calculated.
fn start_datetime(start: Option<CronDuration>) -> Option<chrono::DateTime<Utc>> {
let datetime = match start {
None => Utc::now(),
Some(ts) => chrono::DateTime::from_timestamp_nanos(u64::from(ts).try_into().ok()?),
};
Some(datetime)
}
}

impl PartialEq for CronTagged {
fn eq(&self, other: &Self) -> bool {
self.tag == other.tag && self.when == other.when
}
}

impl Eq for CronTagged {}

impl PartialOrd for CronTagged {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for CronTagged {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.when.cmp(&other.when).then(self.tag.cmp(&other.tag))
}
}

#[cfg(test)]
mod tests {
use chrono::prelude::*;

use super::*;

#[test]
#[allow(clippy::unwrap_used)]
#[allow(clippy::assertions_on_constants)]
fn test_cron_queue() {
let start = NaiveDate::from_ymd_opt(1970, 1, 1)
.unwrap()
.and_hms_nano_opt(0, 0, 0, 0)
.unwrap();
let datetime = DateTime::from_naive_utc_and_offset(start, Utc);

let cron: Cron = "* * * * *".parse().unwrap();
for time in cron.clone().iter_from(datetime).enumerate().take(5) {
// generates
// 1970-01-01 00:00:00 UTC
// 1970-01-01 00:00:01 UTC
// 1970-01-01 00:00:02 UTC
// 1970-01-01 00:00:03 UTC
// 1970-01-01 00:00:04 UTC
assert_eq!(
time.1,
Utc.with_ymd_and_hms(1970, 1, 1, 0, (time.0).try_into().unwrap(), 0)
.unwrap()
);
}

let cron: Cron = "0 0 * * *".parse().unwrap();
for time in cron.clone().iter_from(datetime).enumerate().take(5) {
// generates
// 1970-01-01 00:00:00 UTC
// 1970-01-02 00:00:00 UTC
// 1970-01-03 00:00:00 UTC
// 1970-01-04 00:00:00 UTC
// 1970-01-05 00:00:00 UTC
assert_eq!(
time.1,
Utc.with_ymd_and_hms(1970, 1, 1 + u32::try_from(time.0).unwrap(), 0, 0, 0)
.unwrap()
);
}

// Every first day of the month
let cron: Cron = "0 0 1 * *".parse().unwrap();
for time in cron.clone().iter_from(datetime).enumerate().take(5) {
// generates
// 1970-01-01 00:00:00 UTC
// 1970-02-01 00:00:00 UTC
// 1970-03-01 00:00:00 UTC
// 1970-04-01 00:00:00 UTC
// 1970-05-01 00:00:00 UTC
assert_eq!(
time.1,
Utc.with_ymd_and_hms(1970, 1 + u32::try_from(time.0).unwrap(), 1, 0, 0, 0)
.unwrap()
);
}

// Every first day of January
let cron: Cron = "0 0 1 1 *".parse().unwrap();
for time in cron.clone().iter_from(datetime).enumerate().take(5) {
// generates
// 1970-01-01 00:00:00 UTC
// 1971-01-01 00:00:00 UTC
// 1972-01-01 00:00:00 UTC
// 1973-01-01 00:00:00 UTC
// 1974-01-01 00:00:00 UTC
assert_eq!(
time.1,
Utc.with_ymd_and_hms(1970 + i32::try_from(time.0).unwrap(), 1, 1, 0, 0, 0)
.unwrap()
);
}

// every monday and the first day of January
let cron: Cron = "0 0 1 1 7".parse().unwrap();
let times: Vec<DateTime<Utc>> = cron.clone().iter_from(datetime).take(5).collect();
assert_eq!(times, vec![
Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap(),
Utc.with_ymd_and_hms(1970, 1, 3, 0, 0, 0).unwrap(),
Utc.with_ymd_and_hms(1970, 1, 10, 0, 0, 0).unwrap(),
Utc.with_ymd_and_hms(1970, 1, 17, 0, 0, 0).unwrap(),
Utc.with_ymd_and_hms(1970, 1, 24, 0, 0, 0).unwrap(),
]);

let cron: Cron = "0 0 1 1,3,5,7,9,11 *".parse().unwrap();
let times: Vec<DateTime<Utc>> = cron.clone().iter_from(datetime).take(5).collect();
assert_eq!(times, vec![
Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap(),
Utc.with_ymd_and_hms(1970, 3, 1, 0, 0, 0).unwrap(),
Utc.with_ymd_and_hms(1970, 5, 1, 0, 0, 0).unwrap(),
Utc.with_ymd_and_hms(1970, 7, 1, 0, 0, 0).unwrap(),
Utc.with_ymd_and_hms(1970, 9, 1, 0, 0, 0).unwrap(),
]);
}
}

0 comments on commit 00e6726

Please sign in to comment.