Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 198 additions & 50 deletions v2/crates/wifi-densepose-sensing-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5476,6 +5476,100 @@ async fn broadcast_tick_task(state: SharedState, tick_ms: u64) {
}
}

/// Map one sensing-broadcast JSON document into the `VitalsSnapshot`(s) to
/// publish over MQTT (issues #872/#898).
///
/// Multi-node sources carry a `nodes` array where **each node has its own
/// `classification`** (`motion_level`, `presence`, `confidence`) and RSSI — so
/// each node must surface its *own* presence/motion, not the room-level
/// aggregate. Previously the bridge applied the aggregate `classification` to
/// every per-node Home-Assistant device, so a node in an empty corner inherited
/// another node's "present" (and `motion_level: "absent"` was mis-mapped to full
/// motion). Vitals (breathing / heart rate) and the person count are room-level
/// and shared across the per-node devices. Falls back to a single aggregate
/// snapshot when there is no per-node data (e.g. wifi / simulate sources).
#[cfg(feature = "mqtt")]
fn vitals_snapshots_from_sensing_json(
v: &serde_json::Value,
base_id: &str,
) -> Vec<wifi_densepose_sensing_server::mqtt::state::VitalsSnapshot> {
use wifi_densepose_sensing_server::mqtt::state::VitalsSnapshot;

// motion_level string -> motion scalar. "absent"/"none"/"still"/"idle"/""
// are non-moving; anything else (walking, …) is motion. `fallback` is used
// when the field is absent so a partial per-node payload defers to the
// room aggregate rather than silently reading 0.
fn motion_of(level: Option<&str>, fallback: f64) -> f64 {
match level {
Some("none") | Some("still") | Some("idle") | Some("absent") | Some("") => 0.0,
Some(_) => 1.0,
None => fallback,
}
}

let ts = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64;
let vit = &v["vital_signs"];
let breathing = vit["breathing_rate_bpm"].as_f64();
let hr = vit["heart_rate_bpm"].as_f64();
let n_persons = v["persons"]
.as_array()
.map(|a| a.len() as u32)
.or_else(|| v["estimated_persons"].as_u64().map(|x| x as u32))
.unwrap_or(0);

// Room-level aggregate: the no-nodes fallback, and the per-node default for
// any field a node omits.
let acls = &v["classification"];
let agg_presence = acls["presence"].as_bool().unwrap_or(false);
let agg_motion = motion_of(acls["motion_level"].as_str(), 0.0);
let agg_conf = acls["confidence"].as_f64().unwrap_or(0.0);

let mk = |node_id: String, presence: bool, motion: f64, conf: f64, rssi: Option<f64>| {
VitalsSnapshot {
node_id,
timestamp_ms: ts,
presence,
motion,
presence_score: if presence { conf.max(0.0) } else { 0.0 },
breathing_rate_bpm: breathing,
heartrate_bpm: hr,
n_persons,
rssi_dbm: rssi,
vital_confidence: conf,
..Default::default()
}
};

match v["nodes"].as_array() {
Some(arr) if !arr.is_empty() => arr
.iter()
.map(|node| {
let n = node["node_id"].as_u64().unwrap_or(0);
// Each node carries its OWN classification — use it, deferring to
// the room aggregate only for fields the node omits.
let ncls = &node["classification"];
let presence = ncls["presence"].as_bool().unwrap_or(agg_presence);
let motion = motion_of(ncls["motion_level"].as_str(), agg_motion);
let conf = ncls["confidence"].as_f64().unwrap_or(agg_conf);
mk(
format!("{base_id}-node{n}"),
presence,
motion,
conf,
node["rssi_dbm"].as_f64(),
)
})
.collect(),
_ => vec![mk(
base_id.to_string(),
agg_presence,
agg_motion,
agg_conf,
v["nodes"][0]["rssi_dbm"].as_f64(),
)],
}
}

// ── Main ─────────────────────────────────────────────────────────────────────

/// If `--ui-path` points nowhere (wrong cwd), try common repo layouts relative to cwd.
Expand Down Expand Up @@ -6200,56 +6294,13 @@ async fn main() {
let Ok(v) = serde_json::from_str::<serde_json::Value>(&json) else {
continue;
};
let cls = &v["classification"];
let vit = &v["vital_signs"];
let presence = cls["presence"].as_bool().unwrap_or(false);
let n_persons = v["persons"]
.as_array()
.map(|a| a.len() as u32)
.or_else(|| v["estimated_persons"].as_u64().map(|x| x as u32))
.unwrap_or(0);
let motion = match cls["motion_level"].as_str() {
Some("none") | Some("still") | Some("idle") | Some("") => 0.0,
Some(_) => 1.0,
None => 0.0,
};
let ts = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64;
let conf = cls["confidence"].as_f64().unwrap_or(0.0);
let presence_score = if presence { conf.max(0.0) } else { 0.0 };
let breathing = vit["breathing_rate_bpm"].as_f64();
let hr = vit["heart_rate_bpm"].as_f64();
// #898: emit one snapshot per physical node so each
// surfaces as its own Home-Assistant device (with
// its own RSSI + availability). Falls back to a
// single aggregate snapshot when there is no
// per-node data (e.g. wifi / simulate sources).
let mk = |nid: String, rssi: Option<f64>| mqtt::state::VitalsSnapshot {
node_id: nid,
timestamp_ms: ts,
presence,
motion,
presence_score,
breathing_rate_bpm: breathing,
heartrate_bpm: hr,
n_persons,
rssi_dbm: rssi,
vital_confidence: conf,
..Default::default()
};
match v["nodes"].as_array() {
Some(arr) if !arr.is_empty() => {
for node in arr {
let n = node["node_id"].as_u64().unwrap_or(0);
let nid = format!("{node_id}-node{n}");
let _ = vtx.send(mk(nid, node["rssi_dbm"].as_f64()));
}
}
_ => {
let _ = vtx.send(mk(
node_id.clone(),
v["nodes"][0]["rssi_dbm"].as_f64(),
));
}
// #898/#872: emit one snapshot per physical node so
// each surfaces as its own Home-Assistant device with
// its *own* presence/motion/RSSI (see
// vitals_snapshots_from_sensing_json). Falls back to a
// single aggregate snapshot for per-node-less sources.
for snap in vitals_snapshots_from_sensing_json(&v, &node_id) {
let _ = vtx.send(snap);
}
}
});
Expand Down Expand Up @@ -7068,3 +7119,100 @@ mod rolling_p95_tests {
assert_eq!(p.len(), 1);
}
}

#[cfg(all(test, feature = "mqtt"))]
mod mqtt_bridge_tests {
use super::vitals_snapshots_from_sensing_json;
use serde_json::json;

/// Regression for the per-node presence bug (#872/#898): each node must
/// surface its OWN classification, not the room-level aggregate. Node 1 is
/// present+moving; node 2 is absent — node 2 must NOT inherit node 1's
/// "present".
#[test]
fn per_node_presence_uses_each_nodes_own_classification() {
let v = json!({
"timestamp": 1.0,
"classification": { "presence": true, "motion_level": "walking", "confidence": 0.9 },
"vital_signs": { "breathing_rate_bpm": 14.0, "heart_rate_bpm": 60.0 },
"persons": [{}, {}],
"nodes": [
{ "node_id": 1, "rssi_dbm": -40.0,
"classification": { "presence": true, "motion_level": "walking", "confidence": 0.8 } },
{ "node_id": 2, "rssi_dbm": -70.0,
"classification": { "presence": false, "motion_level": "absent", "confidence": 0.1 } }
]
});
let snaps = vitals_snapshots_from_sensing_json(&v, "ruview");
assert_eq!(snaps.len(), 2, "one snapshot per node");

let n1 = snaps.iter().find(|s| s.node_id == "ruview-node1").unwrap();
let n2 = snaps.iter().find(|s| s.node_id == "ruview-node2").unwrap();

assert!(n1.presence && n1.motion > 0.0, "node1 present + moving");
assert!(
!n2.presence && n2.motion == 0.0,
"node2 must be absent — not inherit the room aggregate"
);
// Per-node RSSI preserved.
assert_eq!(n1.rssi_dbm, Some(-40.0));
assert_eq!(n2.rssi_dbm, Some(-70.0));
// Vitals + person count are room-level, shared across node devices.
assert_eq!(n1.n_persons, 2);
assert_eq!(n2.n_persons, 2);
assert_eq!(n1.breathing_rate_bpm, Some(14.0));
assert_eq!(n2.heartrate_bpm, Some(60.0));
// presence_score is gated on presence.
assert!(n1.presence_score > 0.0);
assert_eq!(n2.presence_score, 0.0);
}

/// A node that omits a classification field defers to the room aggregate
/// rather than silently reading false/0.
#[test]
fn per_node_missing_fields_fall_back_to_aggregate() {
let v = json!({
"timestamp": 1.0,
"classification": { "presence": true, "motion_level": "still", "confidence": 0.7 },
"vital_signs": {},
"nodes": [ { "node_id": 3, "rssi_dbm": -55.0 } ] // no per-node classification
});
let snaps = vitals_snapshots_from_sensing_json(&v, "n");
assert_eq!(snaps.len(), 1);
assert_eq!(snaps[0].node_id, "n-node3");
assert!(snaps[0].presence, "defers to aggregate presence");
assert_eq!(snaps[0].motion, 0.0, "aggregate 'still' => no motion");
}

/// No `nodes` array (wifi / simulate sources): single aggregate snapshot
/// keyed by the base id.
#[test]
fn falls_back_to_single_aggregate_when_no_nodes() {
let v = json!({
"timestamp": 2.0,
"classification": { "presence": true, "motion_level": "idle", "confidence": 0.6 },
"vital_signs": { "breathing_rate_bpm": 12.0 },
"persons": [{}]
});
let snaps = vitals_snapshots_from_sensing_json(&v, "ruview");
assert_eq!(snaps.len(), 1);
assert_eq!(snaps[0].node_id, "ruview");
assert!(snaps[0].presence);
assert_eq!(snaps[0].motion, 0.0, "idle => no motion");
assert_eq!(snaps[0].n_persons, 1);
}

/// `motion_level: "absent"` must map to zero motion (the old aggregate
/// match fell through to `Some(_) => 1.0`, treating absent as full motion).
#[test]
fn absent_motion_level_is_zero_motion() {
let v = json!({
"timestamp": 0.0,
"classification": { "presence": false, "motion_level": "absent", "confidence": 0.0 },
"vital_signs": {}
});
let snaps = vitals_snapshots_from_sensing_json(&v, "x");
assert_eq!(snaps[0].motion, 0.0);
assert!(!snaps[0].presence);
}
}
Loading