Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## Next
- Add HTTP server running on port 3000. You can toggle stuff on it!
- Crash when the message queue is full, in the hopes that the supervisor restarts us. rumqttc seems to have issues reconnecting sometimes.
- Broadcast discovery info every time we connect to the mqtt server.

## 0.1.5
- Fix STRING types in the JSON mqtt set endpoint as well.
Expand Down
8 changes: 4 additions & 4 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::error::Error;

use crate::utils::Numberish;
use regex::Regex;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde::{Serialize, Serializer};
use simple_error::{bail, simple_error};
use slog::debug;
use slog_scope;
Expand Down Expand Up @@ -179,9 +179,9 @@ pub trait DeviceController: Send + Sync {
pub struct AprontestController {
runner: Box<
dyn for<'a> Fn(
&'a [&str],
)
-> Pin<Box<dyn Future<Output = Result<String, Box<dyn Error>>> + 'a + Send>>
&'a [&str],
)
-> Pin<Box<dyn Future<Output = Result<String, Box<dyn Error>>> + 'a + Send>>
+ Send
+ Sync,
>,
Expand Down
30 changes: 29 additions & 1 deletion src/http.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::config::Config;
use crate::controller::{AttributeId, DeviceController, DeviceId};
use crate::syncer::DeviceSyncer;
use crate::utils::{Numberish, ResultExtensions};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server};
Expand All @@ -19,6 +20,7 @@ pub struct HttpServer {
config: Config,
controller: Arc<dyn DeviceController>,
shutdown_signal: Sender<()>,
syncer: Option<Arc<DeviceSyncer>>,
}

#[derive(RustEmbed)]
Expand All @@ -31,12 +33,17 @@ lazy_static! {
}

impl HttpServer {
pub fn new(config: &Config, controller: Arc<dyn DeviceController>) -> Arc<HttpServer> {
pub fn new(
config: &Config,
controller: Arc<dyn DeviceController>,
syncer: Option<Arc<DeviceSyncer>>,
) -> Arc<HttpServer> {
let (tx, rx) = tokio::sync::oneshot::channel::<()>();

let this = Arc::new(HttpServer {
config: config.clone(),
controller,
syncer,
shutdown_signal: tx,
});

Expand Down Expand Up @@ -120,6 +127,10 @@ impl HttpServer {
error!(slog_scope::logger(), "device_list_failed"; "error" => ?e);
Ok(Self::json_error_response(&e))
}),
(&Method::GET, "/api/events") => self.last_messages().await.or_else(|e| {
error!(slog_scope::logger(), "last_messages_failed"; "error" => ?e);
Ok(Self::json_error_response(&e))
}),
(&Method::POST, path) if SET_DEVICE_ATTRIBUTE_REGEX.is_match(path) => {
return self.set_attribute(request).await.or_else(|e| {
error!(slog_scope::logger(), "set_attribute_failed"; "error" => ?e);
Expand All @@ -145,6 +156,23 @@ impl HttpServer {
}
}

async fn last_messages(self: Arc<Self>) -> Result<Response<Body>, Box<dyn Error>> {
let result: Vec<_> = {
let lock = self
.syncer
.as_ref()
.ok_or_else(|| simple_error!("No MQTT syncer!"))?
.last_n_messages
.lock()
.await;
(*lock).iter().cloned().collect()
};
Ok(Self::json_response(
200,
serde_json::json!({ "events": result }),
))
}

async fn run_command_output(
self: Arc<Self>,
mut command: Command,
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ pub async fn main() -> Result<(), Box<dyn Error>> {
let controller = controller::FakeController::new();
let controller = Arc::new(controller);

let _syncer = if config.has_mqtt() {
let syncer = if config.has_mqtt() {
Some(syncer::DeviceSyncer::new(&config, controller.clone()))
} else {
None
};
let _http = if http_port.is_some() {
Some(HttpServer::new(&config, controller.clone()))
Some(HttpServer::new(&config, controller.clone(), syncer))
} else {
None
};
Expand Down
109 changes: 91 additions & 18 deletions src/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,61 @@ use crate::converter::device_to_discovery_payload;
use crate::utils::ResultExtensions;
use async_channel::{bounded, Receiver, Sender};
use rumqttc::{Event, EventLoop, Incoming, Publish, Request, Subscribe};
use serde::{Serialize, Serializer};
use serde_json::value::Value::Object;
use simple_error::{bail, simple_error};
use slog::{debug, error, info, trace, warn};
use slog::{crit, debug, error, info, trace, warn};
use slog_scope;
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::error::Error;
use std::ops::Deref;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::Duration;

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct MaybeJsonString {
pub byte_contents: Vec<u8>,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub enum LoggedMessage {
OutgoingMessage(String, MaybeJsonString),
IncomingMessage(String, MaybeJsonString),
Connected,
Disconnected,
}

impl MaybeJsonString {
pub fn new<P: Clone + Into<Vec<u8>>>(bytes: &P) -> MaybeJsonString {
MaybeJsonString {
byte_contents: bytes.clone().into(),
}
}
}

impl Serialize for MaybeJsonString {
fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
where
S: Serializer,
{
let str = match std::str::from_utf8(&self.byte_contents) {
Ok(v) => v,
Err(_) => return serializer.serialize_bytes(&self.byte_contents),
};
match serde_json::from_str(str) {
Ok(Object(m)) => m.serialize(serializer),
_ => serializer.serialize_str(str),
}
}
}

pub struct DeviceSyncer {
config: Config,
controller: Arc<dyn DeviceController>,
sender: Sender<Request>,
repoll: Sender<DeviceId>,
pub last_n_messages: Mutex<VecDeque<LoggedMessage>>,
}

impl<'a> DeviceSyncer {
Expand All @@ -32,6 +73,7 @@ impl<'a> DeviceSyncer {
controller,
sender: ev.handle(),
repoll: repoll_sender,
last_n_messages: Mutex::new(VecDeque::with_capacity(10)),
};
let this = Arc::new(syncer);
trace!(slog_scope::logger(), "start_thread");
Expand All @@ -48,14 +90,16 @@ impl<'a> DeviceSyncer {
.await
}
});
this
}

if this.config.discovery_topic_prefix.is_some() {
async fn start_broadcast_discovery_broadcast(self: Arc<Self>) {
if self.config.discovery_topic_prefix.is_some() {
tokio::task::spawn({
let this = this.clone();
let this = self.clone();
async move { this.broadcast_discovery().await }
});
}
this
}

async fn do_subscribe(&self) -> Result<(), Box<dyn Error>> {
Expand Down Expand Up @@ -212,6 +256,14 @@ impl<'a> DeviceSyncer {
Ok(())
}

async fn log_message(self: Arc<Self>, message: LoggedMessage) {
let mut msgs = self.last_n_messages.lock().await;
if msgs.len() == 10 {
msgs.pop_front();
};
msgs.push_back(message)
}

async fn loop_once(self: Arc<Self>, ev: &mut EventLoop) -> Result<(), Box<dyn Error>> {
let message = match ev.poll().await? {
Event::Incoming(i) => i,
Expand All @@ -223,10 +275,18 @@ impl<'a> DeviceSyncer {
return match message {
Incoming::Connect(_) => Ok(()),
Incoming::ConnAck(_) => {
self.do_subscribe().await?;
self.clone().log_message(LoggedMessage::Connected).await;
self.clone().do_subscribe().await?;
self.start_broadcast_discovery_broadcast().await;
Ok(())
}
Incoming::Publish(message) => {
self.clone()
.log_message(LoggedMessage::IncomingMessage(
message.topic.clone(),
MaybeJsonString::new(&message.payload.deref()),
))
.await;
let this = self.clone();
tokio::task::spawn(async move {
this.process_one(message)
Expand All @@ -249,7 +309,10 @@ impl<'a> DeviceSyncer {
Incoming::UnsubAck(_) => bail!("Unexpected unsuback!"),
Incoming::PingReq => Ok(()),
Incoming::PingResp => Ok(()),
Incoming::Disconnect => Ok(()),
Incoming::Disconnect => {
self.clone().log_message(LoggedMessage::Disconnected).await;
Ok(())
}
};
}

Expand All @@ -271,7 +334,7 @@ impl<'a> DeviceSyncer {
}
}

async fn poll_device_(&self, device_id: DeviceId) -> Result<(), Box<dyn Error>> {
async fn poll_device_(self: Arc<Self>, device_id: DeviceId) -> Result<(), Box<dyn Error>> {
let device_info = { self.controller.describe(device_id).await? };
let attributes = device_info
.attributes
Expand All @@ -287,17 +350,24 @@ impl<'a> DeviceSyncer {
let payload = serde_json::Value::Object(attributes).to_string();
trace!(slog_scope::logger(), "poll_device_status"; "device_id" => device_id, "payload" => &payload);

let mut publish = Publish::new(
self.config
.to_topic_string(&TopicType::StatusTopic(device_id))
.unwrap(),
rumqttc::QoS::AtLeastOnce,
payload,
);
let topic = self
.config
.to_topic_string(&TopicType::StatusTopic(device_id))
.unwrap();
let logged_message =
LoggedMessage::OutgoingMessage(topic.clone(), MaybeJsonString::new(&payload));
let mut publish = Publish::new(topic, rumqttc::QoS::AtLeastOnce, payload);
publish.retain = true;
self.sender.try_send(Request::Publish(publish))?;

Ok(())
match self.sender.try_send(Request::Publish(publish)) {
Ok(_) => {
self.log_message(logged_message).await;
Ok(())
}
Err(e) => {
crit!(slog_scope::logger(), "sending_failed_crashing_to_maybe_reconnect"; "error" => ?e);
panic!(e)
}
}
}

async fn poll_device(self: Arc<Self>, device_id: DeviceId) -> () {
Expand Down Expand Up @@ -362,13 +432,16 @@ impl<'a> DeviceSyncer {
let config = v.discovery_info.to_string();
info!(slog_scope::logger(), "discovered_device"; "id" => id, "name" => &device.name);
debug!(slog_scope::logger(), "broadcast_discovery_result"; "id" => id, "topic" => &topic, "config" => &config);
let log_message =
LoggedMessage::OutgoingMessage(topic.clone(), MaybeJsonString::new(&config));
self.sender
.send(Request::Publish(Publish::new(
topic,
rumqttc::QoS::AtLeastOnce,
config,
)))
.await?;
self.log_message(log_message).await;
Ok(())
}
None => {
Expand Down
16 changes: 16 additions & 0 deletions src/web/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const Nav = (props) => {
<ul className="navbar-nav">
<NavLink id="home" name="Home" {...props} />
<NavLink id="add" name="Add Device" {...props} />
<NavLink id="mqtt" name="MQTT Log" {...props} />
<NavLink id="aprontest" name="aprontest output" {...props} />
</ul>
</div>
Expand Down Expand Up @@ -216,6 +217,20 @@ const DeviceDetails = ({device, changeName, setAttribute}) => {
</div>
}

const MqttLog = () => {
const [events, setEvents] = React.useState(["Loading"]);

React.useEffect(() => {
api('/api/events').then(l => setEvents(l.events.reverse()));
}, []);

return <div>
{events.map((e) => {
return <reactJsonView.default name="event" sortKeys={true} src={e} />
})}
</div>;
}

const HomePage = ({device, setDevice}) => {
const [deviceRefresh, setDeviceRefresh] = React.useState(0);
const [devicesList, setDevicesList] = React.useState(null);
Expand Down Expand Up @@ -338,6 +353,7 @@ const Root = () => {
<div className="p-4">
{active === 'home' ? <HomePage device={device} setDevice={setDevice} /> : null}
{active === 'add' ? <AddDevice /> : null}
{active === 'mqtt' ? <MqttLog /> : null}
{active === 'aprontest' ? <RawApronTest /> : null}
</div>
<ErrorToast message={error} onDismiss={() => setError(null)} />
Expand Down