Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
Implement Action Runner on Manager (#844)
Browse files Browse the repository at this point in the history
* Implement Action Runner on Manager

This patch implements the IML ActionRunner as a manager side plugin. The
plugin will bookkeep active sessions and any in-flight RPCs to IML
agents.

Other IML services will integrate with this service to make and receive
calls via Unix Domain Socket.

Signed-off-by: Joe Grund <jgrund@whamcloud.io>
  • Loading branch information
jgrund committed Apr 12, 2019
1 parent 305646c commit 62ddfdb
Show file tree
Hide file tree
Showing 39 changed files with 1,201 additions and 382 deletions.
3 changes: 2 additions & 1 deletion .copr/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ srpm:
rm -rf ${BUILDROOT}/_topdir
if ! rpm -q lustre-client 2> /dev/null; then \
cp lustre-client.repo /etc/yum.repos.d/; \
yum -yq install lustre-client;\
yum -y -q install lustre-client;\
fi
cargo build --release
cp target/release/iml-{action-runner,agent,agent-comms,agent-daemon,stratagem,warp-drive} \
iml-agent-comms.service \
iml-stratagem.service \
iml-action-runner.service \
iml-action-runner.socket \
iml-agent/systemd-units/* \
iml-warp-drive/systemd-units/* \
${TMPDIR}/release/rust-iml
Expand Down
128 changes: 60 additions & 68 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[workspace]
members = ['iml-wire-types', 'iml-agent-comms', 'iml-rabbit', 'iml-manager-env', 'iml-agent', 'iml-services', 'liblustreapi-sys', 'liblustreapi', 'stratagem-runner', 'tokio-runtime-shutdown', 'iml-manager-messaging', 'iml-warp-drive']
members = ['iml-wire-types', 'iml-agent-comms', 'iml-rabbit', 'iml-manager-env', 'iml-agent', 'iml-services', 'liblustreapi-sys', 'liblustreapi', 'stratagem-runner', 'tokio-runtime-shutdown', 'iml-warp-drive']
5 changes: 4 additions & 1 deletion chroma_core/lib/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,17 @@ def invoke_agent(self, host, command, args={}):
self._log_subprocesses(e.subprocesses)
raise

def invoke_rust_agent(self, host, command, args={}):
pass

def invoke_agent_expect_result(self, host, command, args={}):
from chroma_core.services.job_scheduler.agent_rpc import AgentException

result = self.invoke_agent(host, command, args)

# This case is to deal with upgrades, once every installation is using the new protocol then we should not allow this.
# Once everything is 3.0 or later we will also have version information in the wrapper header.
if (result == None) or ((type(result) == dict) and ("error" not in result) and ("result" not in result)):
if (result is None) or ((type(result) == dict) and ("error" not in result) and ("result" not in result)):
job_log.info("Invalid result %s fixed up on called to %s with args %s" % (result, command, args))

# Prior to 3.0 update_packages returned {'update_packages': data} so fix this up. This code is here so that all
Expand Down
Empty file.
2 changes: 1 addition & 1 deletion ci/azure-clippy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
rustup component add clippy
displayName: Install clippy
- script: |
cargo clippy
cargo clippy --all-targets --all-features -- -W clippy::pedantic
displayName: Check linting
${{ if parameters.crate }}:
workingDirectory: $(Build.SourcesDirectory)/${{ parameters.crate }}
5 changes: 4 additions & 1 deletion iml-action-runner.service
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ PartOf=iml-manager.target
After=rabbitmq-server.service
After=iml-settings-populator.service
Requires=iml-settings-populator.service
Requires=iml-action-runner.socket
BindsTo=iml-action-runner.socket
After=iml-action-runner.socket


[Service]
Expand All @@ -16,4 +19,4 @@ StandardOutput=journal
StandardError=journal

[Install]
WantedBy=iml-manager.target
WantedBy=iml-manager.target
11 changes: 11 additions & 0 deletions iml-action-runner.socket
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[Unit]
Description=IML Action Runner Socket
PartOf=iml-manager.target

[Socket]
ListenStream=/var/run/iml-action-runner.sock
RemoveOnStop=true

[Install]
WantedBy=sockets.target
WantedBy=iml-manager.target
1 change: 0 additions & 1 deletion iml-agent-comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ futures = "0.1.26"
iml-wire-types = { path = "../iml-wire-types", version = "0.1.0" }
iml-rabbit = { path = "../iml-rabbit", version = "0.1.0" }
iml-manager-env = { path = "../iml-manager-env", version = "0.1.0" }
iml-manager-messaging = { path = "../iml-manager-messaging", version = "0.1.0" }
lapin-futures = "0.18.0"
log = "0.4.6"
serde = { version = "1", features = ["derive"] }
Expand Down
6 changes: 1 addition & 5 deletions iml-agent-comms/src/flush_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@ pub fn flush(
if drained.is_empty() {
let when = Instant::now() + Duration::from_millis(500);

Box::new(
Delay::new(when)
.map_err(failure::Error::from)
.map(move |_| Loop::Continue(xs)),
)
Box::new(Delay::new(when).from_err().map(move |_| Loop::Continue(xs)))
} else {
log::debug!("flush returning {:?} items", drained.len());
Box::new(future::ok(Loop::Break(drained)))
Expand Down
2 changes: 1 addition & 1 deletion iml-agent-comms/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub fn remove_stale(hosts: &mut Hosts, fqdn: &Fqdn, client_start_time: &str) {
/// Gets or inserts a new host cooresponding to the given fqdn
pub fn get_or_insert(hosts: &mut Hosts, fqdn: Fqdn, client_start_time: String) -> &Host {
hosts.entry(fqdn.clone()).or_insert_with(|| {
log::info!("Adding host {:?}", fqdn);
log::info!("Adding host {}", fqdn);

Host::new(fqdn, client_start_time)
})
Expand Down
34 changes: 21 additions & 13 deletions iml-agent-comms/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ use futures::{
use iml_agent_comms::{
flush_queue,
host::{self, SharedHosts},
messaging::{consume_agent_tx_queue, AgentData},
messaging::{consume_agent_tx_queue, AgentData, AGENT_TX_RUST},
session::{self, Session, Sessions},
};
use iml_manager_messaging::{send_agent_message, send_plugin_message};
use iml_rabbit::{self, TcpClient, TcpClientFuture};
use iml_rabbit::{self, send_message, TcpClient, TcpClientFuture};
use iml_wire_types::{
Envelope, Fqdn, ManagerMessage, ManagerMessages, Message, PluginMessage, PluginName,
};
Expand All @@ -28,16 +27,19 @@ fn data_handler(sessions: Sessions, client: TcpClient, data: AgentData) -> impl
if has_key {
log::debug!("Forwarding valid message {}", data);

Either::A(send_plugin_message(
Either::A(send_message(
client.clone(),
format!("agent_{}_rx", data.plugin),
data.into(),
"",
format!("rust_agent_{}_rx", data.plugin),
PluginMessage::from(data),
))
} else {
log::warn!("Terminating session because unknown {}", data);

Either::B(send_agent_message(
Either::B(send_message(
client.clone(),
"",
AGENT_TX_RUST,
ManagerMessage::SessionTerminate {
fqdn: data.fqdn.clone(),
plugin: data.plugin.clone(),
Expand Down Expand Up @@ -65,9 +67,10 @@ fn session_create_req_handler(
let fut = if let Some(last) = last {
log::warn!("Destroying session {} to create new one", last);

Either::A(send_plugin_message(
Either::A(send_message(
client.clone(),
format!("agent_{}_rx", plugin),
"",
format!("rust_agent_{}_rx", plugin),
PluginMessage::SessionTerminate {
fqdn: last.fqdn,
plugin: last.plugin,
Expand All @@ -79,9 +82,10 @@ fn session_create_req_handler(
};

fut.and_then(move |client| {
send_plugin_message(
send_message(
client.clone(),
format!("agent_{}_rx", plugin.clone()),
"",
format!("rust_agent_{}_rx", plugin.clone()),
PluginMessage::SessionCreate {
fqdn: fqdn.clone(),
plugin: plugin.clone(),
Expand All @@ -91,8 +95,10 @@ fn session_create_req_handler(
.map(|client| (client, fqdn, plugin, session.id))
})
.and_then(move |(client, fqdn, plugin, session_id)| {
send_agent_message(
send_message(
client.clone(),
"",
AGENT_TX_RUST,
ManagerMessage::SessionCreateResponse {
fqdn,
plugin,
Expand Down Expand Up @@ -125,7 +131,9 @@ fn main() {

tokio::run(lazy(move || {
tokio::spawn(lazy(move || {
consume_agent_tx_queue()
iml_rabbit::connect_to_rabbit()
.and_then(iml_rabbit::create_channel)
.and_then(|ch| consume_agent_tx_queue(ch, AGENT_TX_RUST))
.and_then(move |stream| {
log::info!("Started consuming agent_tx queue");

Expand Down
41 changes: 23 additions & 18 deletions iml-agent-comms/src/messaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@

use futures::prelude::*;
use iml_rabbit::{
basic_consume, create_channel, declare_transient_queue, TcpClient, TcpStreamConsumerFuture,
basic_consume, declare_transient_queue, TcpChannel, TcpClient, TcpStreamConsumerFuture,
};
use iml_wire_types::{Fqdn, Id, ManagerMessage, Message, PluginMessage, PluginName, Seq};
use lapin_futures::channel::BasicConsumeOptions;

pub static AGENT_TX_RUST: &'static str = "agent_tx_rust";

#[derive(Debug, serde::Serialize)]
pub struct AgentData {
pub fqdn: Fqdn,
pub plugin: PluginName,
Expand Down Expand Up @@ -77,8 +80,10 @@ pub fn terminate_agent_session(
session_id: Id,
client: TcpClient,
) -> impl Future<Item = TcpClient, Error = failure::Error> {
iml_manager_messaging::send_agent_message(
iml_rabbit::send_message(
client,
"",
AGENT_TX_RUST,
ManagerMessage::SessionTerminate {
fqdn: fqdn.clone(),
plugin: plugin.clone(),
Expand All @@ -87,20 +92,20 @@ pub fn terminate_agent_session(
)
}

pub fn consume_agent_tx_queue() -> impl TcpStreamConsumerFuture {
iml_rabbit::connect_to_rabbit()
.and_then(create_channel)
.and_then(|ch| declare_transient_queue("agent_tx_rust".to_string(), ch))
.and_then(|(ch, q)| {
basic_consume(
ch,
q,
"agent_tx_rust",
Some(BasicConsumeOptions {
no_ack: true,
exclusive: true,
..Default::default()
}),
)
})
pub fn consume_agent_tx_queue(
channel: TcpChannel,
queue_name: impl Into<String>,
) -> impl TcpStreamConsumerFuture {
declare_transient_queue(channel, queue_name).and_then(|(ch, q)| {
basic_consume(
ch,
q,
"",
Some(BasicConsumeOptions {
no_ack: true,
exclusive: true,
..Default::default()
}),
)
})
}
33 changes: 14 additions & 19 deletions iml-agent/src/action_plugins/action_plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,16 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

use crate::{
action_plugins::manage_stratagem,
agent_error::{ImlAgentError, Result},
};
use crate::{action_plugins::manage_stratagem, agent_error::ImlAgentError};
use futures::{future::IntoFuture, Future};
use iml_wire_types::ActionName;
use iml_wire_types::{ActionName, ToJsonValue};
use std::collections::HashMap;

pub type AgentResult = std::result::Result<serde_json::Value, String>;

/// Convert a `Result` into an `AgentResult`
pub fn convert<T>(r: Result<T>) -> AgentResult
where
T: serde::Serialize + 'static + Send,
{
r.and_then(|x| serde_json::to_value(x).map_err(|e| e.into()))
.map_err(|e| format!("{:?}", e))
}

type BoxedFuture = Box<Future<Item = AgentResult, Error = ()> + 'static + Send>;
type BoxedFuture = Box<
Future<Item = std::result::Result<serde_json::value::Value, String>, Error = ()>
+ 'static
+ Send,
>;

type Callback = Box<Fn(serde_json::value::Value) -> BoxedFuture + Send + Sync>;

Expand All @@ -38,9 +28,14 @@ where
Box::new(
serde_json::from_value(v)
.into_future()
.map_err(|e| e.into())
.from_err()
.and_then(f)
.then(|x| Ok(convert(x)))
.then(|x| {
Ok(match x {
Ok(x) => x.to_json_value(),
Err(e) => e.to_json_value(),
})
})
.map_err(|_: ImlAgentError| ()),
) as BoxedFuture
}
Expand Down
2 changes: 1 addition & 1 deletion iml-agent/src/action_plugins/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
pub mod action_plugin;
pub mod manage_stratagem;

pub use action_plugin::{convert, create_registry, Actions, AgentResult};
pub use action_plugin::{create_registry, Actions};
8 changes: 7 additions & 1 deletion iml-agent/src/agent_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

use iml_wire_types::PluginName;
use iml_wire_types::{PluginName, ToJsonValue};
use std::fmt;

pub type Result<T> = std::result::Result<T, ImlAgentError>;
Expand Down Expand Up @@ -201,3 +201,9 @@ impl<T> From<futures::sync::mpsc::SendError<T>> for ImlAgentError {
ImlAgentError::SendError
}
}

impl ToJsonValue for ImlAgentError {
fn to_json_value(&self) -> std::result::Result<serde_json::Value, String> {
Ok(serde_json::Value::String(format!("{:?}", self)))
}
}
Loading

0 comments on commit 62ddfdb

Please sign in to comment.