Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed absolute paths #37

Merged
merged 3 commits into from Jan 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.toml
@@ -1,14 +1,14 @@
[package]
name = "riker"
version = "0.2.2"
version = "0.2.3"
authors = ["Lee Smith <leenozara@gmail.com>"]
edition = "2018"
description = "Easily build fast, highly concurrent and resilient applications. An Actor Framework for Rust."
homepage = "https://riker.rs"
repository = "https://github.com/riker-rs/riker"
license = "MIT"
readme = "README.md"
keywords = ["actors", "actor-model", "async", "cqrs"]
keywords = ["actors", "actor-model", "async", "cqrs", "event_sourcing"]

[badges]
travis-ci = { repository = "riker-rs/riker" }
Expand All @@ -28,13 +28,13 @@ members = [
bytes = "0.4"
chrono = "0.4"
config = "0.9"
futures-preview = "0.3.0-alpha.10"
futures-preview = "0.3.0-alpha.12"
log = { version = "0.4", features = ["std"] }
rand = "0.4"
regex = "0.2"
uuid = { version = "0.6", features = ["v4"] }
pin-utils = "0.1.0-alpha.3"

[dev-dependencies]
riker-default = { path = "riker-default", version = "0.2.0" }
riker-default = { path = "riker-default", version = "0.2.2" }
riker-testkit = "0.1.0"
14 changes: 13 additions & 1 deletion src/actor/actor_cell.rs
Expand Up @@ -614,7 +614,19 @@ impl<Msg> ActorSelectionFactory for Context<Msg>
type Msg = Msg;

fn select(&self, path: &str) -> Result<ActorSelection<Msg>, InvalidPath> {
ActorSelection::new(&self.myself.clone(), path)
let (anchor, path_str) = if path.starts_with("/") {
let anchor = self.system.user_root().clone();
let anchor_path = format!("{}/", anchor.uri.path.deref().clone());
let path = path.to_string().replace(&anchor_path, "");

(anchor, path)
} else {
(self.myself.clone(), path.to_string())
};

ActorSelection::new(anchor,
self.system.dead_letters(),
path_str)
}
}

Expand Down
89 changes: 51 additions & 38 deletions src/actor/selection.rs
Expand Up @@ -3,7 +3,6 @@ use std::ops::Deref;

use crate::protocol::{Message, SystemMsg, ActorMsg};
use crate::actor::{Tell, SysTell};
use crate::actor::CellInternal;
use crate::actor::actor_ref::ActorRef;
use crate::actor::channel::dead_letter;
use crate::validate::{InvalidPath, validate_path};
Expand Down Expand Up @@ -33,24 +32,18 @@ use crate::validate::{InvalidPath, validate_path};
#[derive(Debug)]
pub struct ActorSelection<Msg: Message> {
anchor: ActorRef<Msg>,
path: Vec<Selection>,
path_str: String,
dl: ActorRef<Msg>,
path_vec: Vec<Selection>,
path: String,
}

impl<Msg: Message> ActorSelection<Msg> {
pub fn new(anchor: &ActorRef<Msg>, path: &str) -> Result<ActorSelection<Msg>, InvalidPath> {
validate_path(path)?;

let (anchor, path_str) = if path.starts_with("/") {
let anchor = anchor.user_root();
let anchor_path: String = format!("{}/", anchor.uri.path.deref().clone());
let path = path.to_string().replace(&anchor_path, "");
(anchor, path)
} else {
(anchor.clone(), path.to_string())
};

let path: Vec<Selection> = path_str.split_terminator('/').map({|seg|
pub fn new(anchor: ActorRef<Msg>,
dl: &ActorRef<Msg>,
path: String) -> Result<ActorSelection<Msg>, InvalidPath> {
validate_path(&path)?;

let path_vec: Vec<Selection> = path.split_terminator('/').map({|seg|
match seg {
".." => Selection::SelectParent,
"*" => Selection::SelectAllChildren,
Expand All @@ -60,7 +53,12 @@ impl<Msg: Message> ActorSelection<Msg> {
}
}).collect();

Ok(ActorSelection { anchor: anchor, path: path, path_str: path_str })
Ok(ActorSelection {
anchor,
dl: dl.clone(),
path_vec,
path
})
}
}

Expand All @@ -86,21 +84,22 @@ impl<Msg: Message> Tell for ActorSelection<Msg> {
where T: Into<ActorMsg<Self::Msg>>
{
fn walk<'a, I, Msg>(anchor: &ActorRef<Msg>,
mut path: Peekable<I>,
dl: &ActorRef<Msg>,
mut path_vec: Peekable<I>,
msg: ActorMsg<Msg>,
sender: &Option<ActorRef<Msg>>,
path_str: &String)
path: &String)
where I: Iterator<Item=&'a Selection>, Msg: Message
{
let seg = path.next();
let seg = path_vec.next();

match seg {
Some(&Selection::SelectParent) => {
if path.peek().is_none() {
if path_vec.peek().is_none() {
let parent = anchor.parent();
parent.tell(msg, sender.clone());
} else {
walk(&anchor.parent(), path, msg, sender, path_str);
walk(&anchor.parent(), dl, path_vec, msg, sender, path);
}
},
Some(&Selection::SelectAllChildren) => {
Expand All @@ -110,23 +109,37 @@ impl<Msg: Message> Tell for ActorSelection<Msg> {
},
Some(&Selection::SelectChildName(ref name)) => {
let child = anchor.children().filter({|c| c.name() == name}).last();
if path.peek().is_none() && child.is_some() {
child.unwrap().tell(msg, sender.clone());
} else if path.peek().is_some() && child.is_some() {
walk(&child.as_ref().unwrap(), path, msg, sender, path_str);
if path_vec.peek().is_none() && child.is_some() {
child.unwrap()
.tell(msg, sender.clone());
} else if path_vec.peek().is_some() && child.is_some() {
walk(&child.as_ref().unwrap(),
dl,
path_vec,
msg,
sender,
path);
} else {
let sp = sender.clone().map(|s| s.uri.path.deref().clone());
dead_letter(&anchor.cell.dead_letters(),
let sp = sender.clone()
.map(|s| {
s.uri.path.deref().clone()
});
dead_letter(dl,
sp,
path_str.clone(),
path.clone(),
msg);
}
},
None => {}
}
}

walk(&self.anchor, self.path.iter().peekable(), msg.into(), &sender, &self.path_str);
walk(&self.anchor,
&self.dl,
self.path_vec.iter().peekable(),
msg.into(),
&sender,
&self.path);
}
}

Expand All @@ -135,20 +148,20 @@ impl<Msg: Message> SysTell for ActorSelection<Msg> {

fn sys_tell(&self, msg: SystemMsg<Msg>, sender: Option<ActorRef<Msg>>) {
fn walk<'a, I, Msg>(anchor: &ActorRef<Msg>,
mut path: Peekable<I>,
mut path_vec: Peekable<I>,
msg: SystemMsg<Msg>,
sender: &Option<ActorRef<Msg>>)
where I: Iterator<Item=&'a Selection>, Msg: Message
{
let seg = path.next();
let seg = path_vec.next();

match seg {
Some(&Selection::SelectParent) => {
if path.peek().is_none() {
if path_vec.peek().is_none() {
let parent = anchor.parent();
parent.sys_tell(msg, sender.clone());
} else {
walk(&anchor.parent(), path, msg, sender);
walk(&anchor.parent(), path_vec, msg, sender);
}
},
Some(&Selection::SelectAllChildren) => {
Expand All @@ -158,10 +171,10 @@ impl<Msg: Message> SysTell for ActorSelection<Msg> {
},
Some(&Selection::SelectChildName(ref name)) => {
let child = anchor.children().filter({|c| c.name() == name}).last();
if path.peek().is_none() && child.is_some() {
if path_vec.peek().is_none() && child.is_some() {
child.unwrap().sys_tell(msg, sender.clone());
} else if path.peek().is_some() && child.is_some() {
walk(&child.as_ref().unwrap(), path, msg, sender);
} else if path_vec.peek().is_some() && child.is_some() {
walk(&child.as_ref().unwrap(), path_vec, msg, sender);
} else {
// dead_letter(&anchor.cell.system().dead_letters(), sender.clone().map(|s| s.path().clone().location.path), path_str.clone(), msg);
}
Expand All @@ -170,6 +183,6 @@ impl<Msg: Message> SysTell for ActorSelection<Msg> {
}
}

walk(&self.anchor, self.path.iter().peekable(), msg, &sender);
walk(&self.anchor, self.path_vec.iter().peekable(), msg, &sender);
}
}
16 changes: 15 additions & 1 deletion src/system/system.rs
Expand Up @@ -2,6 +2,7 @@ use std::fmt;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, Duration};
use std::ops::Deref;

use chrono::prelude::*;
use config::Config;
Expand Down Expand Up @@ -382,7 +383,20 @@ impl<Msg> ActorSelectionFactory for ActorSystem<Msg>

fn select(&self, path: &str)
-> Result<ActorSelection<Msg>, InvalidPath> {
ActorSelection::new(self.user_root(), path)
let anchor = self.user_root();
let (anchor, path_str) = if path.starts_with("/") {
let anchor = self.user_root();
let anchor_path = format!("{}/",anchor.uri.path.deref().clone());
let path = path.to_string().replace(&anchor_path, "");

(anchor, path)
} else {
(anchor, path.to_string())
};

ActorSelection::new(anchor.clone(),
self.dead_letters(),
path_str)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/validate.rs
Expand Up @@ -48,7 +48,7 @@ pub struct InvalidPath {

impl Error for InvalidPath {
fn description(&self) -> &str {
"Invalid path. Must contain only a-Z, 0-9, /, _, .., or -"
"Invalid path. Must contain only a-Z, 0-9, /, _, .., - or *"
}
}

Expand Down
97 changes: 90 additions & 7 deletions tests/selection.rs
Expand Up @@ -126,12 +126,12 @@ fn select_all_children_of_child() {
p_assert_eq!(listen, ());

// select absolute test actors through actor selection: /root/user/select-actor/*
// let selection = system.select("/user/select-actor/*").unwrap();
// selection.tell(TestMsg(probe), None);
let selection = system.select("/user/select-actor/*").unwrap();
selection.tell(TestMsg(probe), None);

// // actors 'child_a' and 'child_b' should both fire a probe event
// p_assert_eq!(listen, ());
// p_assert_eq!(listen, ());
// actors 'child_a' and 'child_b' should both fire a probe event
p_assert_eq!(listen, ());
p_assert_eq!(listen, ());
}

#[derive(Clone)]
Expand Down Expand Up @@ -170,6 +170,10 @@ impl Actor for SelectTestActorCtx {
let sel = ctx.select("/user/select-actor/child_a").unwrap();
sel.tell(msg.clone(), None);

// // absolute all: /user/select-actor/*
let sel = ctx.select("/user/select-actor/*").unwrap();
sel.tell(msg.clone(), None);

// all: *
let sel = ctx.select("*").unwrap();
sel.tell(msg, None);
Expand All @@ -187,11 +191,90 @@ fn select_ctx() {
let (probe, listen) = probe();
actor.tell(TestMsg(probe), None);

// five test results expected:

// seven test results expected:
p_assert_eq!(listen, ());
p_assert_eq!(listen, ());
p_assert_eq!(listen, ());
p_assert_eq!(listen, ());
p_assert_eq!(listen, ());
p_assert_eq!(listen, ());
p_assert_eq!(listen, ());
}


// *** Dead letters test ***
struct DeadLettersActor {
probe: Option<TestProbe>,
}

impl DeadLettersActor {
fn new() -> BoxActor<TestMsg> {
let actor = DeadLettersActor {
probe: None
};

Box::new(actor)
}
}

impl Actor for DeadLettersActor {
type Msg = TestMsg;

fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
// subscribe to dead_letters
let msg = ChannelMsg::Subscribe(All.into(), ctx.myself());
ctx.system.dead_letters().tell(msg, None);
}

fn receive(&mut self, _: &Context<Self::Msg>, msg: Self::Msg, _: Option<ActorRef<Self::Msg>>) {
msg.0.event(()); // notify listen then probe has been received.
self.probe = Some(msg.0);
}

fn other_receive(&mut self, _: &Context<Self::Msg>, msg: ActorMsg<Self::Msg>, _: Option<ActorRef<Self::Msg>>) {
if let ActorMsg::DeadLetter(dl) = msg {
println!("DeadLetter: {} => {} ({:?})", dl.sender, dl.recipient, dl.msg);
self.probe.event(());
}
}
}

#[test]
fn select_no_actors() {
let model: DefaultModel<TestMsg> = DefaultModel::new();
let sys = ActorSystem::new(&model).unwrap();

let props = Props::new(Box::new(DeadLettersActor::new));
let act = sys.actor_of(props, "dl-subscriber").unwrap();

let (probe, listen) = probe();
act.tell(TestMsg(probe.clone()), None);

// wait for the probe to arrive at the dl sub before doing select
listen.recv();

let sel = sys.select("nothing-here").unwrap();

sel.tell(TestMsg(probe), None);

p_assert_eq!(listen, ());
}

#[test]
fn select_invalid_path() {
let model: DefaultModel<TestMsg> = DefaultModel::new();
let sys = ActorSystem::new(&model).unwrap();

// todo randomize the invalid character

let sel = sys.select("foo/`");
assert!(sel.is_err());
let sel = sys.select("foo/@");
assert!(sel.is_err());
let sel = sys.select("!");
assert!(sel.is_err());
let sel = sys.select("foo/$");
assert!(sel.is_err());
let sel = sys.select("&");
assert!(sel.is_err());
}