Commit: Extensible render_roles call

tailhook committed Jul 4, 2018
1 parent 3776f5e commit f383d3f
Showing 8 changed files with 72 additions and 27 deletions.
16 changes: 14 additions & 2 deletions example-configs/wasm-with-query/src/bin/query.rs
@@ -2,6 +2,7 @@ extern crate serde;
#[macro_use] extern crate serde_derive;
#[macro_use] extern crate serde_json;

use std::collections::{BTreeMap, HashSet};
use std::mem;
use std::slice;
use std::panic::set_hook;
@@ -19,6 +20,7 @@ extern {
enum ErrorKind {
Serialize,
Deserialize,
#[allow(dead_code)]
Internal,
}

@@ -30,6 +32,12 @@ struct QueryError {
backtrace: Option<String>,
}

#[derive(Debug, Serialize)]
pub struct RolesResult {
to_render: BTreeMap<String, Value>,
all_roles: HashSet<String>,
}

fn main() {
set_hook(Box::new(|panic_info| {
let payload = panic_info.payload();
@@ -71,8 +79,12 @@ pub extern "C" fn render_roles(ptr: *const u8, len: usize) -> *mut c_void {
return out_ptr as *mut c_void;
}

fn _render_roles(_input: Value) -> Result<Value, String> {
return Ok(json!({"imaginary_role": {}}))
fn _render_roles(_input: Value) -> Result<RolesResult, String> {
return Ok(RolesResult {
to_render: vec![("imaginary_role".to_string(), json!({}))]
.into_iter().collect(),
all_roles: vec!["imaginary_role".to_string()].into_iter().collect(),
})
}

fn _wrapper<'x, F, S, D>(data: &'x [u8], f: F) -> Vec<u8>
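The example scheduler now returns a structured RolesResult instead of a bare JSON object. Below is a minimal standalone sketch (not part of this commit) of how that struct serializes, assuming serde, serde_derive and serde_json as dependencies, with 2018-edition imports for brevity:

use std::collections::{BTreeMap, HashSet};

use serde_derive::Serialize;
use serde_json::{json, Value};

#[derive(Debug, Serialize)]
pub struct RolesResult {
    to_render: BTreeMap<String, Value>,
    all_roles: HashSet<String>,
}

fn main() {
    let result = RolesResult {
        to_render: vec![("imaginary_role".to_string(), json!({}))]
            .into_iter().collect(),
        all_roles: vec!["imaginary_role".to_string()].into_iter().collect(),
    };
    // Prints roughly:
    // {"to_render":{"imaginary_role":{}},"all_roles":["imaginary_role"]}
    println!("{}", serde_json::to_string(&result).unwrap());
}
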
1 change: 0 additions & 1 deletion src/daemon/apply.rs
@@ -89,7 +89,6 @@ fn apply_schedule(hash: &String, is_new: bool,
}

let string_schedule = format!("{}", apply_task.schedule.data);
state.reset_unused_roles(apply_task.roles.keys());
for (role_name, vars) in apply_task.roles {
let mut rlog = match dlog.role(&role_name, true) {
Ok(l) => l,
4 changes: 1 addition & 3 deletions src/daemon/frontend/api.rs
@@ -205,7 +205,6 @@ pub fn serve<S: 'static>(state: &SharedState, config: &Arc<Config>,
}
let peers = state.peers();
let election = state.election();
let schedule = state.schedule();
let stable_schedule = state.stable_schedule();
let owned_schedule;
let leader_peer;
@@ -266,8 +265,7 @@ pub fn serve<S: 'static>(state: &SharedState, config: &Arc<Config>,
peers: peers.peers.len(),
peers_timestamp: Some(peers.timestamp),
leader: leader,
// TODO(tailhook)
roles: 0,
roles: state.num_roles(),
last_stable_timestamp: election.last_stable_timestamp,
num_errors: errors.len() + failed_roles.len(),
errors: &*errors,
3 changes: 2 additions & 1 deletion src/daemon/main.rs
@@ -337,9 +337,10 @@ fn main() {
}).expect("scheduler thread starts");

let m1 = meter.clone();
let s1 = state.clone();
thread::Builder::new().name(String::from("scheduler")).spawn(move || {
m1.track_current_thread_by_name();
query::run(resp_init)
query::run(resp_init, s1)
}).expect("scheduler thread starts");

Ok(())
11 changes: 8 additions & 3 deletions src/daemon/query/compat.rs
@@ -9,6 +9,8 @@ use query::Settings;
use serde_json::{Value as Json, Map};
use scheduler::{Schedule};

use super::RolesResult;


pub struct Responder {
schedule: Arc<Schedule>,
@@ -22,8 +24,8 @@ impl Responder {
hostname: settings.hostname.clone(),
}
}
pub fn render_roles(&self, id: &str)
-> Result<BTreeMap<String, Json>, Error>
pub fn render_roles(&self, id: &str, _prev: Option<&Schedule>)
-> Result<RolesResult, Error>
{
let empty = Map::new();
let roles = self.schedule.data.as_object()
@@ -80,7 +82,10 @@ impl Responder {
Json::String(format_rfc3339(SystemTime::now()).to_string()));
result.insert(role_name.clone(), Json::Object(cur_vars));
}
Ok(result)
Ok(RolesResult {
all_roles: result.keys().cloned().collect(),
to_render: result,
})
}
pub fn schedule(&self) -> Arc<Schedule> {
self.schedule.clone()
43 changes: 31 additions & 12 deletions src/daemon/query/mod.rs
@@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use std::path::PathBuf;

@@ -13,6 +13,7 @@ use serde_json::Value as Json;
use id::Id;
use apply::ApplyData;
use scheduler::Schedule;
use shared::SharedState;
use watchdog;


@@ -31,6 +32,16 @@ pub enum Request {
#[derive(Debug, Clone)]
pub struct Responder(Arc<Internal>);


#[derive(Debug, Deserialize)]
pub struct RolesResult {
to_render: BTreeMap<String, Json>,
all_roles: HashSet<String>,
// TODO(tailhook)
//#[serde(with="::serde_millis")]
//next_render: Option<SystemTime>,
}

#[derive(Debug)]
struct Internal {
schedule_tx: slot::Sender<Arc<Schedule>>,
@@ -85,7 +96,7 @@ impl Responder {
}
}

pub fn run(init: ResponderInit) {
pub fn run(init: ResponderInit, shared: SharedState) {
let _guard = watchdog::ExitOnReturn(83);
let stream = init.schedule_rx.map(Request::NewSchedule)
.select(init.rx);
@@ -97,7 +108,8 @@ pub fn run(init: ResponderInit) {
debug!("Incoming request {:?}", request);
match request {
Request::NewSchedule(schedule) => {
let is_equal = responder.schedule()
let prev = responder.schedule();
let is_equal = prev.as_ref()
.map(|x| x.hash == schedule.hash).unwrap_or(false);
if is_equal {
debug!("Same schedule");
@@ -123,12 +135,14 @@ };
};
let id: String = thread_rng().sample_iter(&Alphanumeric)
.take(24).collect();
match responder.render_roles(&id) {
let prev_ref = prev.as_ref().map(|x| &**x);
match responder.render_roles(&id, prev_ref) {
Ok(data) => {
shared.update_role_list(&data.all_roles);
init.apply_tx.swap(ApplyData {
id,
schedule: schedule.clone(),
roles: data,
roles: data.to_render,
}).ok();
}
Err(e) => {
@@ -145,10 +159,15 @@ };
};
let id: String = thread_rng().sample_iter(&Alphanumeric)
.take(24).collect();
match responder.render_roles(&id) {
Ok(roles) => {
match responder.render_roles(&id, None) {
Ok(data) => {
shared.update_role_list(&data.all_roles);
init.apply_tx
.swap(ApplyData { id, schedule, roles })
.swap(ApplyData {
id,
schedule,
roles: data.to_render,
})
.ok();
}
Err(e) => {
@@ -161,14 +180,14 @@ }
}

impl Impl {
fn render_roles(&mut self, id: &str)
-> Result<BTreeMap<String, Json>, Error>
fn render_roles(&mut self, id: &str, prev: Option<&Schedule>)
-> Result<RolesResult, Error>
{
use self::Impl::*;
match self {
Empty => Err(err_msg("no schedule yet")),
Compat(resp) => resp.render_roles(id),
Wasm(resp) => resp.render_roles(id),
Compat(resp) => resp.render_roles(id, prev),
Wasm(resp) => resp.render_roles(id, prev),
}
}
fn schedule(&self) -> Option<Arc<Schedule>> {
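On the daemon side the scheduler's reply is parsed into this RolesResult. Since serde ignores unknown object keys by default, a scheduler can add fields (such as the commented-out next_render) without breaking the parse, which is presumably what makes the call extensible. A standalone sketch (not part of this commit), under the same crate assumptions as above:

use std::collections::{BTreeMap, HashSet};

use serde_derive::Deserialize;
use serde_json::Value as Json;

#[derive(Debug, Deserialize)]
pub struct RolesResult {
    to_render: BTreeMap<String, Json>,
    all_roles: HashSet<String>,
}

fn main() {
    // An extra key in the reply is simply ignored by the derived impl.
    let reply = r#"{
        "to_render": {"imaginary_role": {}},
        "all_roles": ["imaginary_role"],
        "next_render": null
    }"#;
    let parsed: RolesResult = serde_json::from_str(reply).unwrap();
    println!("{:?}", parsed);
}
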
8 changes: 5 additions & 3 deletions src/daemon/query/wasm.rs
@@ -1,4 +1,3 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use std::path::Path;

@@ -8,6 +7,7 @@ use serde_json::{Value as Json};

use scheduler::{Schedule};
use wasm::Program;
use super::RolesResult;


pub struct Responder {
@@ -24,6 +24,7 @@ pub struct QueryInit<'a> {
#[derive(Debug, Serialize)]
pub struct RolesQuery<'a> {
deployment_id: &'a str,
previous_schedule: Option<&'a Schedule>,
}

#[derive(Debug, PartialEq, Eq, Hash)]
@@ -70,12 +71,13 @@ impl Responder {
})
}

pub fn render_roles(&mut self, id: &str)
-> Result<BTreeMap<String, Json>, Error>
pub fn render_roles(&mut self, id: &str, prev: Option<&Schedule>)
-> Result<RolesResult, Error>
{
let result: Result<_, CatchAllError>;
result = self.wasm.json_call("render_roles", &RolesQuery {
deployment_id: id,
previous_schedule: prev,
})?;
return Ok(result?);
}
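The query sent into the wasm program now carries the previous schedule next to the deployment id. Below is a standalone sketch of the JSON payload that json_call would serialize (not part of this commit; Schedule is a simplified stand-in for scheduler::Schedule, and the id is made up):

use serde_derive::Serialize;
use serde_json::json;

// Simplified stand-in for scheduler::Schedule, for illustration only.
#[derive(Debug, Serialize)]
struct Schedule {
    hash: String,
    data: serde_json::Value,
}

#[derive(Debug, Serialize)]
struct RolesQuery<'a> {
    deployment_id: &'a str,
    previous_schedule: Option<&'a Schedule>,
}

fn main() {
    let prev = Schedule { hash: "abc123".into(), data: json!({}) };
    let query = RolesQuery {
        deployment_id: "h2mD7DleGvKzT3Xl5EvOZJmR",
        previous_schedule: Some(&prev),
    };
    // Roughly:
    // {"deployment_id":"h2mD7DleGvKzT3Xl5EvOZJmR",
    //  "previous_schedule":{"hash":"abc123","data":{}}}
    println!("{}", serde_json::to_string(&query).unwrap());
}
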
13 changes: 11 additions & 2 deletions src/daemon/shared.rs
@@ -1,5 +1,6 @@
use std::ops::Deref;
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{SystemTime};
use std::collections::{HashMap, BTreeMap, HashSet};
use std::collections::btree_map::Entry::{Occupied, Vacant};
@@ -61,6 +62,7 @@ pub struct SharedData {
pub sandbox: Sandbox,
pub mainloop: Remote,
pub fetch_state: ArcCell<fetch::PublicState>,
num_roles: AtomicUsize,
peers: ArcCell<Peers>,
responder: Responder,
}
@@ -131,6 +133,7 @@ impl SharedState {
options,
sandbox,
peers: ArcCell::new(Arc::new(Peers::new())),
num_roles: 0.into(),
mainloop: mainloop.clone(),
fetch_state: ArcCell::new(
Arc::new(fetch::PublicState::Unstable)),
@@ -391,17 +394,23 @@ impl SharedState {
role_errors.remove(role_name);
FAILING_ROLES.set(role_errors.len() as i64);
}
pub fn reset_unused_roles<T, I>(&self, roles: I)
where T: AsRef<str>, I: Iterator<Item=T>
pub fn num_roles(&self) -> usize {
self.num_roles.load(Ordering::SeqCst)
}
pub fn update_role_list<T, I>(&self, roles: I)
where T: AsRef<str>, I: IntoIterator<Item=T>
{
let mut lock = self.lock();
let role_errors = Arc::make_mut(&mut lock.failed_roles);
let old_errors = mem::replace(role_errors, Default::default());
let mut n = 0;
for name in roles {
n += 1;
if old_errors.contains(name.as_ref()) {
role_errors.insert(name.as_ref().into());
}
}
self.num_roles.store(n, Ordering::SeqCst);
FAILING_ROLES.set(role_errors.len() as i64);
}
}
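
reset_unused_roles is generalized into update_role_list: it still drops recorded errors for roles that no longer exist, and it now also counts the roles it saw, exposed through num_roles() and the roles field of the status API. A standalone sketch of just that logic (not part of this commit):

use std::collections::HashSet;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};

fn update_role_list<T, I>(failed_roles: &mut HashSet<String>,
                          num_roles: &AtomicUsize, roles: I)
    where T: AsRef<str>, I: IntoIterator<Item=T>
{
    // Keep only the errors whose roles are still present, counting as we go.
    let old_errors = mem::replace(failed_roles, Default::default());
    let mut n = 0;
    for name in roles {
        n += 1;
        if old_errors.contains(name.as_ref()) {
            failed_roles.insert(name.as_ref().into());
        }
    }
    num_roles.store(n, Ordering::SeqCst);
}

fn main() {
    let mut failed: HashSet<String> =
        vec!["gone".to_string(), "web".to_string()].into_iter().collect();
    let num_roles = AtomicUsize::new(0);
    update_role_list(&mut failed, &num_roles, vec!["web", "db"]);
    assert_eq!(num_roles.load(Ordering::SeqCst), 2);
    assert!(failed.contains("web") && !failed.contains("gone"));
}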