Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 3 additions & 27 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,7 @@ build = "build.rs"
[features]

# Enables all non-conflicting features
full = [
"publish",
"access",
"serde",
"reqwest",
"aescbc",
"parse_token",
"blocking",
"std"
]
full = ["publish", "access", "serde", "reqwest", "aescbc", "parse_token", "blocking", "std"]

# Enables all default features
default = ["publish", "serde", "reqwest", "aescbc", "std", "blocking"]
Expand Down Expand Up @@ -54,23 +45,7 @@ reqwest = ["dep:reqwest", "dep:bytes"]
blocking = ["reqwest?/blocking"]

## Enables std library
std = [
"derive_builder/std",
"log/std",
"uuid/std",
"base64/std",
"spin/std",
"snafu/std",
"hmac/std",
"sha2/std",
"time/std",
"bytes?/std",
"getrandom/std",
"rand/default",
"serde?/std",
"serde_json?/std",
"ciborium?/std"
]
std = ["derive_builder/std", "log/std", "uuid/std", "base64/std", "spin/std", "snafu/std", "hmac/std", "sha2/std", "time/std", "bytes?/std", "getrandom/std", "rand/default", "serde?/std", "serde_json?/std", "ciborium?/std"]

## Enables very specific implementations for different platforms.
##
Expand All @@ -96,6 +71,7 @@ async-trait = "0.1"
log = "0.4"
hashbrown = "0.13"
spin = "0.9"
phantom-type = { vestion = "0.4.2", default-features = false }
percent-encoding = { version = "2.1", default-features = false }
base64 = { version = "0.21", features = ["alloc"], default-features = false }
derive_builder = {version = "0.12", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion src/core/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use snafu::Snafu;
/// ```
///
/// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
#[derive(Snafu, Debug, Clone)]
#[derive(Snafu, Debug, Clone, PartialEq)]
pub enum PubNubError {
/// this error is returned when the transport layer fails
#[snafu(display("Transport error: {details}"))]
Expand Down
19 changes: 19 additions & 0 deletions src/core/event_engine/effect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use crate::{
core::event_engine::EffectInvocation,
lib::alloc::{string::String, vec::Vec},
};

pub(crate) trait Effect {
type Invocation: EffectInvocation;

/// Unique effect identifier.
fn id(&self) -> String;

/// Run work associated with effect.
fn run<F>(&self, f: F)
where
F: FnMut(Option<Vec<<Self::Invocation as EffectInvocation>::Event>>);

/// Cancel any ongoing effect's work.
fn cancel(&self);
}
260 changes: 260 additions & 0 deletions src/core/event_engine/effect_dispatcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
use crate::{
core::event_engine::{Effect, EffectHandler, EffectInvocation},
lib::alloc::{rc::Rc, vec, vec::Vec},
};
use phantom_type::PhantomType;
use spin::rwlock::RwLock;

/// State machine effects dispatcher.
#[allow(dead_code)]
pub(crate) struct EffectDispatcher<EH, EF, EI>
where
EI: EffectInvocation<Effect = EF>,
EH: EffectHandler<EI, EF>,
EF: Effect<Invocation = EI>,
{
/// Effect invocation handler.
///
/// Handler responsible for providing actual implementation of
handler: EH,

/// Dispatched effects managed by dispatcher.
///
/// There are effects whose lifetime should be managed by the dispatcher.
/// State machines may have some effects that are exclusive and can only run
/// one type of them at once. The dispatcher handles such effects
/// and cancels them when required.
managed: RwLock<Vec<Rc<EF>>>,

_invocation: PhantomType<EI>,
}

impl<EH, EF, EI> EffectDispatcher<EH, EF, EI>
where
EI: EffectInvocation<Effect = EF>,
EH: EffectHandler<EI, EF>,
EF: Effect<Invocation = EI>,
{
/// Create new effects dispatcher.
pub fn new(handler: EH) -> Self {
EffectDispatcher {
handler,
managed: RwLock::new(vec![]),
_invocation: Default::default(),
}
}

/// Dispatch effect associated with `invocation`.
pub fn dispatch<F>(&self, invocation: &EI, mut f: F)
where
F: FnMut(Option<Vec<EI::Event>>),
{
if let Some(effect) = self.handler.create(invocation) {
let effect = Rc::new(effect);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need that RC here?
Why do we passing the remove_managed_effect into the effect and call it inside of it?

Additionally, if we need reference counter it probably won't work with our client as it needs to be thread safe.
Use Arc instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need that RC here?

I have a requirement to store managed effect in a vector and make it possible to remove the same effect from list after run completion.
I tried to filter in completion block, but borrow checker won't let me go this way. Furthermore, I used Rc just to make clone once to add it vector and don't need Rc functionality after that.

Why do we passing the remove_managed_effect into the effect and call it inside of it?

I can't follow this one.
The idea was that run function of effect has completion closure, with which we will be able to remove effect from the list of managed effects.

Additionally, if we need reference counter it probably won't work with our client as it needs to be thread safe.
Use Arc instead.

Rc functionality used once (I mean clone) within dispatch function, and that is all. All other access to it through vector which stores it should be safe enough because of RwLock.

Copy link
Contributor

@Xavrax Xavrax May 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was that run function of effect has completion closure, with which we will be able to remove effect from the list of managed effects.

This one is still confusing to me.
How effects are allowed to managed the call?
Currently in SubscribeEffect it doesn't make any decision about it. Do you think that some effects will affect that?
We still can casually call that line after calling run function.


if invocation.managed() {
let mut managed = self.managed.write();
managed.push(effect.clone());
}

// Placeholder for effect invocation.
effect.run(|events| {
// Try remove effect from list of managed.
self.remove_managed_effect(&effect);

// Notify about effect run completion.
// Placeholder for effect events processing (pass to effects handler).
f(events);
Comment on lines +65 to +67
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So these lines assume's that f() have to be called after the Effect.
As it's synchronous - I still don't see sense of that call. Maybe Am I missing something ;o?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have implemented effects yet, but I assume that some of them would return events, which should be processed by the event engine.
It is exactly as it says – it is a placeholder for functionality, which I expect to appear with implemented effects.
The function pointer can be passed to the created effect, so it will be called from it.

Copy link
Contributor

@Xavrax Xavrax May 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still not sure about that.
Anyway imo you can merge that.
Rest of code looks good.

});
} else if invocation.cancelling() {
self.cancel_effect(invocation);

// Placeholder for effect events processing (pass to effects handler).
f(None);
}
}

/// Handle effect cancellation.
///
/// Effects with managed lifecycle can be cancelled by corresponding effect
/// invocations.
fn cancel_effect(&self, invocation: &EI) {
let mut managed = self.managed.write();
if let Some(position) = managed.iter().position(|e| invocation.cancelling_effect(e)) {
managed.remove(position).cancel();
}
}

/// Remove managed effect.
fn remove_managed_effect(&self, effect: &EF) {
let mut managed = self.managed.write();
if let Some(position) = managed.iter().position(|ef| ef.id() == effect.id()) {
managed.remove(position);
}
}
}

#[cfg(test)]
mod should {
use super::*;
use crate::core::event_engine::Event;

enum TestEvent {}

impl Event for TestEvent {
fn id(&self) -> &str {
"no_id"
}
}

enum TestEffect {
One,
Two,
Three,
}

impl Effect for TestEffect {
type Invocation = TestInvocation;

fn id(&self) -> String {
match self {
Self::One => "EFFECT_ONE".into(),
Self::Two => "EFFECT_TWO".into(),
Self::Three => "EFFECT_THREE".into(),
}
}

fn run<F>(&self, mut f: F)
where
F: FnMut(Option<Vec<TestEvent>>),
{
match self {
Self::Three => {}
_ => f(None),
}
}

fn cancel(&self) {
// Do nothing.
}
}

enum TestInvocation {
One,
Two,
Three,
CancelThree,
}

impl EffectInvocation for TestInvocation {
type Effect = TestEffect;
type Event = TestEvent;

fn id(&self) -> &str {
match self {
Self::One => "EFFECT_ONE_INVOCATION",
Self::Two => "EFFECT_TWO_INVOCATION",
Self::Three => "EFFECT_THREE_INVOCATION",
Self::CancelThree => "EFFECT_THREE_CANCEL_INVOCATION",
}
}

fn managed(&self) -> bool {
matches!(self, Self::Two | Self::Three)
}

fn cancelling(&self) -> bool {
matches!(self, Self::CancelThree)
}

fn cancelling_effect(&self, effect: &Self::Effect) -> bool {
match self {
TestInvocation::CancelThree => matches!(effect, TestEffect::Three),
_ => false,
}
}
}

struct TestEffectHandler {}

impl EffectHandler<TestInvocation, TestEffect> for TestEffectHandler {
fn create(&self, invocation: &TestInvocation) -> Option<TestEffect> {
match invocation {
TestInvocation::One => Some(TestEffect::One),
TestInvocation::Two => Some(TestEffect::Two),
TestInvocation::Three => Some(TestEffect::Three),
_ => None,
}
}
}

#[test]
fn run_not_managed_effect() {
let mut called = false;
let dispatcher = EffectDispatcher::new(TestEffectHandler {});
dispatcher.dispatch(&TestInvocation::One, |_| {
called = true;
});

assert!(called, "Expected to call effect for TestInvocation::One");
assert_eq!(
dispatcher.managed.read().len(),
0,
"Non managed effects shouldn't be stored"
);
}

#[test]
fn run_managed_effect() {
let mut called = false;
let dispatcher = EffectDispatcher::new(TestEffectHandler {});
dispatcher.dispatch(&TestInvocation::Two, |_| {
called = true;
});

assert!(called, "Expected to call effect for TestInvocation::Two");
assert_eq!(
dispatcher.managed.read().len(),
0,
"Managed effect should be removed on completion"
);
}

#[test]
fn cancel_managed_effect() {
let mut called_managed = false;
let mut cancelled_managed = false;
let dispatcher = EffectDispatcher::new(TestEffectHandler {});
dispatcher.dispatch(&TestInvocation::Three, |_| {
called_managed = true;
});

assert!(
!called_managed,
"Expected that effect for TestInvocation::Three won't be called"
);
assert_eq!(
dispatcher.managed.read().len(),
1,
"Managed effect shouldn't complete run because doesn't have completion call"
);

dispatcher.dispatch(&TestInvocation::CancelThree, |_| {
cancelled_managed = true;
});

assert!(
cancelled_managed,
"Expected to call effect for TestInvocation::CancelThree"
);
assert!(
!called_managed,
"Expected that effect for TestInvocation::Three won't be called"
);
assert_eq!(
dispatcher.managed.read().len(),
0,
"Managed effect should be cancelled"
);
}
}
10 changes: 10 additions & 0 deletions src/core/event_engine/effect_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use crate::core::event_engine::{Effect, EffectInvocation};

pub(crate) trait EffectHandler<I, EF>
where
I: EffectInvocation,
EF: Effect,
{
/// Create effect using information of effect `invocation`.
fn create(&self, invocation: &I) -> Option<EF>;
}
22 changes: 22 additions & 0 deletions src/core/event_engine/effect_invocation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use crate::core::event_engine::{Effect, Event};

/// Effect invocation trait.
///
/// Invocation is an intention to run an effect. Effect dispatcher uses intents
/// to schedule actual effect invocation.
pub(crate) trait EffectInvocation {
type Effect: Effect;
type Event: Event;

/// Unique effect invocation identifier.
fn id(&self) -> &str;

/// Whether invoked effect lifetime should be managed by dispatcher or not.
fn managed(&self) -> bool;

/// Whether effect invocation cancels managed effect or not.
fn cancelling(&self) -> bool;

/// Whether effect invocation cancels specific managed effect or not.
fn cancelling_effect(&self, effect: &Self::Effect) -> bool;
}
10 changes: 10 additions & 0 deletions src/core/event_engine/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/// Event engine external event.
///
/// State machine uses events to calculate transition path and list of effects
/// invocations.
///
/// Types which are expected to be used as events should implement this trait.
pub(crate) trait Event {
/// Event identifier.
fn id(&self) -> &str;
}
Loading