Skip to content

Commit

Permalink
Initial watchdog commit
Browse files Browse the repository at this point in the history
  • Loading branch information
NicolasDP committed Jan 20, 2020
1 parent 546497f commit 24bc5b2
Show file tree
Hide file tree
Showing 17 changed files with 1,511 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -1,6 +1,7 @@
[workspace]
members = [
"jormungandr-lib",
"jormungandr-watchdog",
"jormungandr",
"jcli",
"jormungandr-integration-tests",
Expand Down
22 changes: 22 additions & 0 deletions jormungandr-watchdog/Cargo.toml
@@ -0,0 +1,22 @@
[package]
name = "jormungandr-watchdog"
version = "0.1.0"
authors = [
"Nicolas Di Prima <nicolas.diprima@iohk.io>",
]
edition = "2018"
license = "Apache-2.0 OR MIT"
readme = "README.md"
repository = "https://github.com/input-output-hk/jormungandr"
description = """
Process and service management and API
"""

[dependencies]
tokio = { version = "^0.2.9", features = ["sync"] }
futures-util = "0.3"
thiserror = "1.0"
async-trait = "^0.1.22"

[dev-dependencies]
tokio = { version = "^0.2.9", features = ["full"] }
13 changes: 13 additions & 0 deletions jormungandr-watchdog/LICENSE-APACHE
@@ -0,0 +1,13 @@
Copyright (c) 2020 Input Output HK

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
25 changes: 25 additions & 0 deletions jormungandr-watchdog/LICENSE-MIT
@@ -0,0 +1,25 @@
Copyright (c) 2020 Input Output HK

Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:

The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
28 changes: 28 additions & 0 deletions jormungandr-watchdog/README.md
@@ -0,0 +1,28 @@
# Jormungandr Watchdog

Core functionality for the update of Jörmungandr to run as a micro-service
architecture.

## How to use

Add the following in your `Cargo.toml` file:

```toml
[dependencies]
jormungandr-watchdog = "0.1"
```

## License

Licensed under either of

- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or <https://www.apache.org/licenses/LICENSE-2.0>)
- MIT license ([LICENSE-MIT](LICENSE-MIT) or <https://opensource.org/licenses/MIT>)

at your option.

### Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
dual licensed as above, without any additional terms or conditions.
7 changes: 7 additions & 0 deletions jormungandr-watchdog/src/lib.rs
@@ -0,0 +1,7 @@
pub mod service;
mod watchdog;

pub use service::{Service, ServiceIdentifier, ServiceState};
pub use watchdog::{
ControlHandler, CoreServices, WatchdogBuilder, WatchdogError, WatchdogMonitor, WatchdogQuery,
};
78 changes: 78 additions & 0 deletions jormungandr-watchdog/src/service/control.rs
@@ -0,0 +1,78 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::{
stream::Stream,
sync::watch::{self, Receiver, Sender},
};

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Control {
Shutdown,
Kill,
}

pub struct Controller {
sender: Sender<Control>,
receiver: Receiver<Control>,
}

pub struct ControlReader {
receiver: Receiver<Control>,
}

impl Controller {
#[allow(clippy::new_without_default)]
pub async fn new() -> Self {
let (sender, mut receiver) = watch::channel(Control::Kill);

let _ = receiver.recv().await;

Controller { sender, receiver }
}

pub fn reader(&self) -> ControlReader {
ControlReader {
receiver: self.receiver.clone(),
}
}

pub fn send(&mut self, control: Control) {
if self.sender.broadcast(control).is_err() {
// the `Controller` own a Receiver so broadcast
// cannot fail
unsafe { std::hint::unreachable_unchecked() }
}
}

pub async fn reset(&mut self) -> ControlReader {
let mut reader = self.reader();
self.send(Control::Kill);
reader.updated().await.unwrap();
reader
}
}

impl ControlReader {
pub async fn updated(&mut self) -> Option<Control> {
self.receiver.recv().await
}
}

impl Future for ControlReader {
type Output = Option<Control>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Pin::new(&mut self.get_mut().receiver).poll_next(cx)
}
}

impl Stream for ControlReader {
type Item = Control;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.poll(cx)
}
}
48 changes: 48 additions & 0 deletions jormungandr-watchdog/src/service/intercom.rs
@@ -0,0 +1,48 @@
use std::ops::{Deref, DerefMut};
use tokio::sync::mpsc;

pub struct NoIntercom;

pub trait Intercom: 'static {}

pub struct IntercomSender<T>(mpsc::Sender<T>);

pub struct IntercomReceiver<T>(mpsc::Receiver<T>);

impl Intercom for NoIntercom {}

pub fn channel<T: Intercom>() -> (IntercomSender<T>, IntercomReceiver<T>) {
let (sender, receiver) = mpsc::channel(10);

(IntercomSender(sender), IntercomReceiver(receiver))
}

impl<T> Clone for IntercomSender<T> {
fn clone(&self) -> Self {
IntercomSender(self.0.clone())
}
}

impl<T> Deref for IntercomSender<T> {
type Target = mpsc::Sender<T>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for IntercomSender<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl<T> Deref for IntercomReceiver<T> {
type Target = mpsc::Receiver<T>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for IntercomReceiver<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

0 comments on commit 24bc5b2

Please sign in to comment.