diff --git a/Cargo.lock b/Cargo.lock index 3044c85..8a169d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -107,13 +107,13 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c" [[package]] name = "async-trait" -version = "0.1.75" +version = "0.1.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdf6721fb0140e4f897002dd086c06f6c27775df19cfe1fccb21181a48fd2c98" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.106", ] [[package]] @@ -346,7 +346,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.106", ] [[package]] @@ -482,7 +482,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.10.0", - "syn 2.0.76", + "syn 2.0.106", ] [[package]] @@ -493,7 +493,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core", "quote", - "syn 2.0.76", + "syn 2.0.106", ] [[package]] @@ -528,7 +528,7 @@ checksum = "eecf8589574ce9b895052fa12d69af7a233f99e6107f5cb8dd1044f2a17bfdcb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.106", ] [[package]] @@ -619,7 +619,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.106", ] [[package]] @@ -829,7 +829,7 @@ dependencies = [ "serde", "serde-wasm-bindgen 0.5.0", "serde_urlencoded", - "thiserror", + "thiserror 1.0.52", "wasm-bindgen", "web-sys", ] @@ -846,7 +846,7 @@ dependencies = [ "serde", "serde-wasm-bindgen 0.6.3", "serde_urlencoded", - "thiserror", + "thiserror 1.0.52", "wasm-bindgen", "web-sys", ] @@ -866,7 +866,7 @@ dependencies = [ "pin-project", "serde", "serde_json", - "thiserror", + "thiserror 1.0.52", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", @@ -887,7 +887,7 @@ dependencies = [ "pin-project", "serde", "serde_json", - "thiserror", + "thiserror 1.0.52", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", @@ -908,7 +908,7 @@ dependencies = [ "pin-project", "serde", "serde_json", - "thiserror", + "thiserror 1.0.52", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", @@ -944,7 +944,7 @@ dependencies = [ "js-sys", "serde", "serde_json", - "thiserror", + "thiserror 1.0.52", "wasm-bindgen", "web-sys", ] @@ -959,7 +959,7 @@ dependencies = [ "js-sys", "serde", "serde_json", - "thiserror", + "thiserror 1.0.52", "wasm-bindgen", "web-sys", ] @@ -1040,7 +1040,7 @@ dependencies = [ "js-sys", "pinned", "serde", - "thiserror", + "thiserror 1.0.52", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", @@ -1059,7 +1059,7 @@ dependencies = [ "js-sys", "pinned", "serde", - "thiserror", + "thiserror 1.0.52", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", @@ -1074,7 +1074,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.106", ] [[package]] @@ -1147,6 +1147,7 @@ dependencies = [ "serde_json", "serde_yaml", "tokio", + "tokio-modbus", "tokio-serial", "tokio-stream", "tower-http", @@ -1460,7 +1461,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9311685eb9a34808bbb0608ad2fcab9ae216266beca5848613e95553ac914e3b" dependencies = [ "quote", - "syn 2.0.76", + "syn 2.0.106", ] [[package]] @@ -1826,7 +1827,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.106", ] [[package]] @@ -1849,7 +1850,7 @@ checksum = "a829027bd95e54cfe13e3e258a1ae7b645960553fb82b75ff852c29688ee595b" dependencies = [ "futures", "rustversion", - "thiserror", + "thiserror 1.0.52", ] [[package]] @@ -1922,7 +1923,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ "proc-macro2", - "syn 2.0.76", + "syn 2.0.106", ] [[package]] @@ -1961,9 +1962,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.86" +version = "1.0.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" +checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" dependencies = [ "unicode-ident", ] @@ -2145,7 +2146,7 @@ dependencies = [ "rustls-native-certs", "rustls-pemfile", "rustls-webpki", - "thiserror", + "thiserror 1.0.52", "tokio", "tokio-rustls", ] @@ -2299,7 +2300,7 @@ checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.106", ] [[package]] @@ -2386,9 +2387,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.2" +version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" [[package]] name = "socket2" @@ -2446,7 +2447,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.76", + "syn 2.0.106", ] [[package]] @@ -2467,9 +2468,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.76" +version = "2.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "578e081a14e0cefc3279b0472138c513f37b41a08d5a3cca9b6e4e8ceb6cd525" +checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6" dependencies = [ "proc-macro2", "quote", @@ -2518,7 +2519,16 @@ version = "1.0.52" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83a48fd946b02c0a526b2e9481c8e2a17755e47039164a86c4070446e3a4614d" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.52", +] + +[[package]] +name = "thiserror" +version = "2.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3467d614147380f2e4e374161426ff399c91084acd2363eaf549172b3d5e60c0" +dependencies = [ + "thiserror-impl 2.0.16", ] [[package]] @@ -2529,7 +2539,18 @@ checksum = "e7fbe9b594d6568a6a1443250a7e67d80b74e1e96f6d1715e1e21cc1888291d3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.106", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c5e1be1c48b9172ee610da68fd9cd2770e7a4056cb3fc98710ee6906f0c7960" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", ] [[package]] @@ -2573,7 +2594,25 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.106", +] + +[[package]] +name = "tokio-modbus" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdd5def942f13ebaebe636834026213ad9501e1cbd77687545cc08af4febd8a8" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "futures-core", + "futures-util", + "log", + "smallvec", + "thiserror 2.0.16", + "tokio", + "tokio-util", ] [[package]] @@ -2732,7 +2771,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.106", ] [[package]] @@ -2756,7 +2795,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8f0f68e58d297ba8b22b8b5a96a87b863ba6bb46aaf51e19a4b02c5a6dd5b7f" dependencies = [ - "thiserror", + "thiserror 1.0.52", ] [[package]] @@ -2894,7 +2933,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.106", "wasm-bindgen-shared", ] @@ -2928,7 +2967,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.106", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3168,7 +3207,7 @@ dependencies = [ "rustversion", "serde", "slab", - "thiserror", + "thiserror 1.0.52", "tokio", "tracing", "wasm-bindgen", @@ -3205,7 +3244,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.106", ] [[package]] @@ -3253,7 +3292,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.76", + "syn 2.0.106", ] [[package]] diff --git a/server/Cargo.toml b/server/Cargo.toml index 28c34df..56a9ce7 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -30,6 +30,7 @@ axum = "0.7.3" tower-http = { version="0.5.0", features=["fs", "cors"] } include_dir = "0.7.3" mime_guess = "2.0.4" +tokio-modbus = "0.16.1" [profile.release] lto = true diff --git a/server/src/actors/ahoy_dtu.rs b/server/src/actors/ahoy_dtu.rs index 4ba7c5a..4bb4f2b 100644 --- a/server/src/actors/ahoy_dtu.rs +++ b/server/src/actors/ahoy_dtu.rs @@ -1,16 +1,12 @@ -/* - -{"inverter":0,"cmd":11,"tx_request":81,"payload":[1500,0]} -POST http://ahoy-dtu/api/ctrl - -*/ - -use std::time::Duration; +use std::{ + cmp::{max, min}, + time::Duration, +}; use reqwest::Url; use serde::Serialize; -use super::PowerSwitch; +use crate::actors::Regulator; #[derive(Serialize)] struct Payload { @@ -39,27 +35,18 @@ pub(crate) struct AhoyDtu { } #[async_trait::async_trait] -impl PowerSwitch for AhoyDtu { - async fn on(&mut self) {} +impl Regulator for AhoyDtu { + async fn change_power(&mut self, power: isize) { + let target = (power + self.current_watts as isize) as isize; + let target = max(0, target); + let target = min(target, self.upper_limit_watts as isize); + self.set_absolute(target as usize).await; - async fn off(&mut self) { - self.set_power(0).await; + self.current_watts = target as usize; } - async fn set_power(&mut self, power: isize) { - let target = power + self.current_watts as isize; - let target = if target > self.upper_limit_watts as isize { - self.upper_limit_watts - } else { - if target > 0 { - target as usize - } else { - 0 - } - }; - self.set_absolute(target).await; - - self.current_watts = target; + fn power(&self) -> usize { + self.current_watts } } diff --git a/server/src/actors/ez1m.rs b/server/src/actors/ez1m.rs new file mode 100644 index 0000000..38032e5 --- /dev/null +++ b/server/src/actors/ez1m.rs @@ -0,0 +1,101 @@ +use std::{ + cmp::{max, min}, + time::Duration, +}; + +use reqwest::Url; +use tokio::task; + +use crate::actors::Regulator; + +pub(crate) struct EZ1M { + pub(crate) url: String, + + pub(crate) current_watts: usize, + pub(crate) upper_limit_watts: usize, +} + +#[async_trait::async_trait] +impl Regulator for EZ1M { + async fn change_power(&mut self, power: isize) { + let target = (power + self.current_watts as isize) as isize; + let target = max(0, target); + let target = min(target, self.upper_limit_watts as isize); + + self.current_watts = target as usize; + if target < 30 { + self.turn_off().await; + } else { + self.turn_on().await; + self.set_absolute(target as usize).await; + } + } + + fn power(&self) -> usize { + self.current_watts + } +} + +impl EZ1M { + async fn set_absolute(&mut self, power: usize) { + let url = self.url.clone(); + let power = power * 2; + + let mut url = Url::parse(&url).unwrap(); + url.set_path("/setMaxPower"); + + let client_builder = reqwest::ClientBuilder::new(); + let client_builder = client_builder.connect_timeout(Duration::from_secs(5)); + let client = client_builder.build().unwrap(); + + task::spawn(client.get(url).query(&[("p", power)]).send()); + } + + async fn turn_off(&mut self) { + let url = self.url.clone(); + + let mut url = Url::parse(&url).unwrap(); + url.set_path("/setOnOff"); + + let client_builder = reqwest::ClientBuilder::new(); + let client_builder = client_builder.connect_timeout(Duration::from_secs(5)); + let client = client_builder.build().unwrap(); + + task::spawn(client.get(url).query(&[("status", 1)]).send()); + } + + async fn turn_on(&mut self) { + let url = self.url.clone(); + + let mut url = Url::parse(&url).unwrap(); + url.set_path("/setOnOff"); + + let client_builder = reqwest::ClientBuilder::new(); + let client_builder = client_builder.connect_timeout(Duration::from_secs(5)); + let client = client_builder.build().unwrap(); + + task::spawn(client.get(url).query(&[("status", 0)]).send()); + } +} + +mod test { + use std::time::Duration; + + use reqwest::Url; + + // #[tokio::test] + pub async fn lae() { + let power = 33; + let url = "http://192.168.178.54:8050/".to_string(); + + let mut url = Url::parse(&url).unwrap(); + url.set_path("/setMaxPower"); + + let client_builder = reqwest::ClientBuilder::new(); + let client_builder = client_builder.connect_timeout(Duration::from_secs(5)); + let client = client_builder.build().unwrap(); + + let n = client.get(url).query(&[("p", 31)]).send().await; + dbg!(n); + } +} diff --git a/server/src/actors/hs100.rs b/server/src/actors/hs100.rs index d3882c9..2784872 100644 --- a/server/src/actors/hs100.rs +++ b/server/src/actors/hs100.rs @@ -33,6 +33,4 @@ impl PowerSwitch for HS100Switch { .await .unwrap(); } - - async fn set_power(&mut self, _: isize) {} } diff --git a/server/src/actors/marstek.rs b/server/src/actors/marstek.rs new file mode 100644 index 0000000..69534e5 --- /dev/null +++ b/server/src/actors/marstek.rs @@ -0,0 +1,102 @@ +use std::cmp::{max, min}; + +use tokio::task; +use tokio_modbus::{client::tcp, Slave}; + +use crate::actors::Regulator; +use tokio_modbus::prelude::*; + +mod marstek_registers { + pub(super) const STATE: u16 = 0xa41a; + pub(super) const FORCIBLE_CHARGE_WATTS: u16 = 0xa424; + pub(super) const FORCIBLE_DISCHARGE_WATTS: u16 = 0xa425; + + pub(super) const STATE_CHARGE: u16 = 1; + pub(super) const STATE_DISCHARGE: u16 = 2; +} + +pub(crate) struct MarstekCharge { + pub(crate) url: String, + pub(crate) upper_limit_watts: usize, + pub(crate) current_watts: usize, +} + +#[async_trait::async_trait] +impl Regulator for MarstekCharge { + async fn change_power(&mut self, power: isize) { + println!( + "Change power of Marstek Charge by {} current power {}", + power, self.current_watts + ); + let target = power; + + let target = (target + self.current_watts as isize) as isize; + let target = max(0, target) as usize; + + let target = min(target, self.upper_limit_watts); + let socket_addr = self.url.parse().unwrap(); + let slave = Slave(1); + + println!("Setting Marstek Charge to power of {}", target); + + task::spawn(async move { + let mut ctx = tcp::connect_slave(socket_addr, slave).await.unwrap(); + let _ = ctx + .write_single_register(marstek_registers::STATE, marstek_registers::STATE_CHARGE) + .await; + let _ = ctx + .write_single_register(marstek_registers::FORCIBLE_CHARGE_WATTS, target as u16) + .await; + }); + + self.current_watts = target; + } + + fn power(&self) -> usize { + self.current_watts as usize + } +} + +pub(crate) struct MarstekDischarge { + pub(crate) url: String, + pub(crate) upper_limit_watts: usize, + pub(crate) current_watts: usize, +} + +#[async_trait::async_trait] +impl Regulator for MarstekDischarge { + async fn change_power(&mut self, power: isize) { + println!( + "Change power of Marstek Discharge by {} current power {}", + power, self.current_watts + ); + // Consumption (positive values) should be produced by the actor + let target = power; + + let target = target + self.current_watts as isize; + let target = max(0, target) as usize; + + let target = min(target, self.upper_limit_watts); + + let socket_addr = self.url.parse().unwrap(); + let slave = Slave(1); + + println!("Setting Marstek Discharge to power of {}", target); + + self.current_watts = target; + + task::spawn(async move { + let mut ctx = tcp::connect_slave(socket_addr, slave).await.unwrap(); + let _ = ctx + .write_single_register(marstek_registers::STATE, marstek_registers::STATE_DISCHARGE) + .await; + let _ = ctx + .write_single_register(marstek_registers::FORCIBLE_DISCHARGE_WATTS, target as u16) + .await; + }); + } + + fn power(&self) -> usize { + self.current_watts as usize + } +} diff --git a/server/src/actors/mod.rs b/server/src/actors/mod.rs index 238c116..5dead45 100644 --- a/server/src/actors/mod.rs +++ b/server/src/actors/mod.rs @@ -1,321 +1,424 @@ +use std::cmp::{max, min}; + use chrono::{DateTime, Duration, Utc}; -use opendtu::OpenDtu; use tokio::sync::mpsc::Receiver; -use crate::{ActorConfiguration, ActorMode, ActorType, Configuration}; +use crate::{ + actors::{ + ahoy_dtu::AhoyDtu, + ez1m::EZ1M, + marstek::{MarstekCharge, MarstekDischarge}, + opendtu::OpenDtu, + tasmota::TasmotaSwitch, + }, + ActorConfiguration, Configuration, RegulatingActorType, SwitchingActorType, +}; -use self::{ahoy_dtu::AhoyDtu, hs100::HS100Switch, tasmota::TasmotaSwitch}; +use self::hs100::HS100Switch; mod ahoy_dtu; +mod ez1m; mod hs100; +mod marstek; mod opendtu; mod tasmota; struct ActorState { - disable_threshold: isize, - enable_threshold: isize, - duration_seconds: usize, - last_set: Option>, + wait_until: Option>, + time_until_effective_seconds: usize, + special_state: ActorStateType, +} + +impl ActorState { + fn is_busy(&self) -> bool { + self.wait_until > Some(Utc::now()) + } + + fn set_busy(&mut self) { + self.wait_until = max( + self.wait_until, + Some(Utc::now() + Duration::seconds(self.time_until_effective_seconds as i64)), + ); + } +} + +enum ActorStateType { + Switching(SwitchingActorState), + Regulating(RegulatingActorState), +} +struct SwitchingActorState { switch: Box, on: bool, - actor_mode: ActorMode, - last_updated: Option>, + power: usize, } -#[async_trait::async_trait] -trait PowerSwitch { - async fn on(&mut self); - async fn off(&mut self); - async fn set_power(&mut self, power: isize); +struct RegulatingActorState { + regulator: Box, + max_power: usize, } impl ActorState { - fn from_configuration(config: &ActorConfiguration) -> Self { - let switch: Box = match &config.actor { - ActorType::HS100(hs100) => Box::new(HS100Switch { - address: hs100.address.clone(), - }), - ActorType::Tasmota(tasmota) => Box::new(TasmotaSwitch { - url: tasmota.url.clone(), - }), - ActorType::Ahoy(ahoy) => Box::new(AhoyDtu { - url: ahoy.url.clone(), - upper_limit_watts: ahoy.upper_limit_watts, - inverter_no: ahoy.inverter_no, - current_watts: 0, - }), - ActorType::OpenDtu(opendtu) => Box::new(OpenDtu { - url: opendtu.url.clone(), - password: opendtu.password.clone(), - serial: opendtu.serial.clone(), - max_power: opendtu.max_power, - current_watts: 0, - upper_limit_watts: opendtu.upper_limit_watts, - }), - }; - Self { - disable_threshold: config.disable_threshold, - enable_threshold: config.enable_threshold, - duration_seconds: config.duration_seconds, - last_set: None, - last_updated: None, - switch, - on: false, - actor_mode: config.actor_mode.clone(), + fn is_active(&self) -> bool { + match &self.special_state { + ActorStateType::Switching(switching_actor_state) => switching_actor_state.on, + ActorStateType::Regulating(regulating_actor_state) => { + regulating_actor_state.regulator.power() > 0 + } } } -} - -pub(crate) async fn control_actors(rx: &mut Receiver, config: &Configuration) { - let mut devs = config - .actors - .iter() - .map(|actor| ActorState::from_configuration(&actor)) - .collect::>(); - - if devs.len() == 0 { - return; - } - for dev in devs.iter_mut() { - dev.switch.off().await; - dev.switch.set_power(0).await; + async fn turn_off(&mut self) { + let dev = &mut self.special_state; + match dev { + ActorStateType::Switching(switching_actor_state) => { + switching_actor_state.switch.off().await + } + ActorStateType::Regulating(regulating_actor_state) => { + regulating_actor_state.regulator.change_power(0).await + } + } } - while let Some(received) = rx.recv().await { - let dev = get_actor(&mut devs); - let ActorState { - disable_threshold, - enable_threshold, - duration_seconds, - last_set, - switch, - actor_mode, - on, - last_updated, - } = dev; - - let should_be_on = compute_actor_state( - *on, - received, - *enable_threshold, - *disable_threshold, - actor_mode.clone(), - ); - let now = chrono::Utc::now(); - - if should_be_on != *on { - if let Some(last_set_inner) = last_set { - let diff = now - *last_set_inner; - if diff > Duration::seconds(*duration_seconds as i64) { - *on = should_be_on; - *last_set = Some(now.clone()); - if *on { - let _ = switch.on().await; - } else { - let _ = switch.off().await; - } + async fn increase_effect_by(&mut self, power: isize) -> usize { + if self.is_busy() { + return 0; + } + match &mut self.special_state { + ActorStateType::Switching(state) => { + if state.power as isize > power { + return 0; } - } else { - *on = should_be_on; - *last_set = Some(chrono::Utc::now()); - if *on { - let _ = switch.on().await; + if state.on { + return 0; + } + + state.switch.on().await; + state.on = true; + + let power = state.power; + self.set_busy(); + power + } + ActorStateType::Regulating(state) => { + let remaining_potential = + state.max_power as isize - state.regulator.power() as isize; + if remaining_potential <= 0 { + 0 } else { - let _ = switch.off().await; + state.regulator.change_power(power).await; + self.set_busy(); + min(remaining_potential, power) as usize } } } + } - if let Some(last_updated_inner) = last_updated { - let diff = now - *last_updated_inner; - if diff > Duration::seconds(*duration_seconds as i64) { - *last_updated = Some(now); - let _ = switch.set_power(received as isize).await; - } else { + async fn reduce_effect_by(&mut self, power: usize) -> usize { + if self.is_active() { + match &mut self.special_state { + ActorStateType::Switching(state) => { + state.switch.off().await; + state.on = false; + let power = state.power; + self.set_busy(); + power + } + ActorStateType::Regulating(state) => { + let remaining_potential = + state.max_power as isize - state.regulator.power() as isize; + state.regulator.change_power(-(power as isize)).await; + self.set_busy(); + min(remaining_potential, power as isize) as usize + } } } else { - *last_updated = Some(now); - let _ = switch.set_power(received as isize).await; + 0 } } } -fn get_actor(devs: &mut Vec) -> &mut ActorState { - devs.sort_by(|x, y| (&x.actor_mode).cmp(&y.actor_mode)); - let index = devs.iter().position(|d| d.actor_mode == ActorMode::Charge); - if let Some(index) = index { - let (dischargers, chargers) = devs.split_at_mut(index); - - if dischargers.len() > 0 && dischargers.iter().any(|d| d.on) { - return get_random_element(dischargers).unwrap(); - } - if chargers.len() > 0 && chargers.iter().any(|d| d.on) { - return get_random_element(chargers).unwrap(); - } - - if dischargers.len() > 0 && rand::random::() { - return get_random_element(dischargers).unwrap(); - } - return get_random_element(chargers).unwrap(); - } else { - return get_random_element(devs).unwrap(); - } +#[async_trait::async_trait] +trait PowerSwitch { + async fn on(&mut self); + async fn off(&mut self); } -fn get_random_element(arr: &mut [T]) -> Option<&mut T> { - if arr.len() == 0 { - return None; - } - let random_number = rand::random::() % arr.len(); - let dev = arr.get_mut(random_number); - dev +#[async_trait::async_trait] +trait Regulator { + async fn change_power(&mut self, power: isize); + fn power(&self) -> usize; } -fn compute_actor_state( - on: bool, - received: i32, - enable_threshold: isize, - disable_threshold: isize, - actor_mode: ActorMode, -) -> bool { - match actor_mode { - ActorMode::Charge => { - if !on { - received < enable_threshold as i32 - } else { - !(received > disable_threshold as i32) +impl ActorState { + fn from_configuration(config: &ActorConfiguration) -> Self { + match config { + ActorConfiguration::Switching(switching_actor_configuration) => { + match &switching_actor_configuration.actor { + SwitchingActorType::HS100(hs100_configuration) => { + let switch = Box::new(HS100Switch { + address: hs100_configuration.address.clone(), + }); + ActorState { + time_until_effective_seconds: switching_actor_configuration + .time_until_effective_seconds, + wait_until: None, + + special_state: ActorStateType::Switching(SwitchingActorState { + switch: switch, + on: false, + power: switching_actor_configuration.power, + }), + } + } + + SwitchingActorType::Tasmota(tasmota_configuration) => { + let switch: Box = Box::new(TasmotaSwitch { + url: tasmota_configuration.url.clone(), + }); + ActorState { + time_until_effective_seconds: switching_actor_configuration + .time_until_effective_seconds, + wait_until: None, + + special_state: ActorStateType::Switching(SwitchingActorState { + switch: switch, + on: false, + power: switching_actor_configuration.power, + }), + } + } + } } - } - ActorMode::Discharge => { - if !on { - received > enable_threshold as i32 - } else { - !(received < disable_threshold as i32) + ActorConfiguration::Regulating(regulating_actor_configuration) => { + match ®ulating_actor_configuration.actor { + RegulatingActorType::Ahoy(ahoy_configuration) => { + let regulator = Box::new(AhoyDtu { + url: ahoy_configuration.url.clone(), + upper_limit_watts: ahoy_configuration.upper_limit_watts, + inverter_no: ahoy_configuration.inverter_no, + current_watts: 0, + }); + ActorState { + time_until_effective_seconds: regulating_actor_configuration + .time_until_effective_seconds, + wait_until: None, + + special_state: ActorStateType::Regulating(RegulatingActorState { + regulator: regulator, + max_power: regulating_actor_configuration.max_power, + }), + } + } + RegulatingActorType::OpenDtu(open_dtu_configuration) => { + let regulator = Box::new(OpenDtu { + url: open_dtu_configuration.url.clone(), + upper_limit_watts: open_dtu_configuration.upper_limit_watts, + current_watts: 0, + serial: open_dtu_configuration.serial.clone(), + max_power: open_dtu_configuration.max_power, + password: open_dtu_configuration.password.clone(), + }); + ActorState { + time_until_effective_seconds: regulating_actor_configuration + .time_until_effective_seconds, + wait_until: None, + + special_state: ActorStateType::Regulating(RegulatingActorState { + regulator: regulator, + max_power: regulating_actor_configuration.max_power, + }), + } + } + RegulatingActorType::EZ1M(ez1m_configuration) => { + let regulator = Box::new(EZ1M { + url: ez1m_configuration.url.clone(), + upper_limit_watts: ez1m_configuration.upper_limit_watts, + current_watts: 0, + }); + ActorState { + time_until_effective_seconds: regulating_actor_configuration + .time_until_effective_seconds, + wait_until: None, + + special_state: ActorStateType::Regulating(RegulatingActorState { + regulator: regulator, + max_power: regulating_actor_configuration.max_power, + }), + } + } + RegulatingActorType::MarstekCharge(marstek_configuration) => { + let regulator = Box::new(MarstekCharge { + url: marstek_configuration.url.clone(), + upper_limit_watts: marstek_configuration.upper_limit_watts, + current_watts: 0, + }); + ActorState { + time_until_effective_seconds: regulating_actor_configuration + .time_until_effective_seconds, + wait_until: None, + + special_state: ActorStateType::Regulating(RegulatingActorState { + regulator: regulator, + max_power: regulating_actor_configuration.max_power, + }), + } + } + RegulatingActorType::MarstekDischarge(marstek_configuration) => { + let regulator = Box::new(MarstekDischarge { + url: marstek_configuration.url.clone(), + upper_limit_watts: marstek_configuration.upper_limit_watts, + current_watts: 0, + }); + ActorState { + time_until_effective_seconds: regulating_actor_configuration + .time_until_effective_seconds, + wait_until: None, + + special_state: ActorStateType::Regulating(RegulatingActorState { + regulator: regulator, + max_power: regulating_actor_configuration.max_power, + }), + } + } + } } } } } -#[cfg(test)] -mod test { - use crate::ActorMode; - - use super::*; - - #[test] - pub fn charges_if_below_threshold() { - let should_be_on = compute_actor_state(false, -100, -50, 100, ActorMode::Charge); - assert_eq!(should_be_on, true); - } - - #[test] - pub fn does_nothing_if_charging_and_if_below_threshold() { - let should_be_on = compute_actor_state(true, -100, -50, 100, ActorMode::Charge); - assert_eq!(should_be_on, true); - } - - #[test] - pub fn stays_on_if_charging_and_if_between_thresholds() { - let should_be_on = compute_actor_state(true, -30, -50, 100, ActorMode::Charge); - assert_eq!(should_be_on, true); - } - - #[test] - pub fn stops_charging_if_above_threshold() { - let should_be_on = compute_actor_state(true, 200, -50, 100, ActorMode::Charge); - assert_eq!(should_be_on, false); - } +enum SystemState { + AllOff, + Producing, + Consuming, +} - #[test] - pub fn stays_not_charging_if_above_threshold() { - let should_be_on = compute_actor_state(false, 200, -50, 100, ActorMode::Charge); - assert_eq!(should_be_on, false); +fn compute_system_state(producers: &[ActorState], consumers: &[ActorState]) -> SystemState { + if producers.iter().any(|p| p.is_active()) { + SystemState::Producing + } else if consumers.iter().any(|p| p.is_active()) { + SystemState::Consuming + } else { + SystemState::AllOff } +} - #[test] - pub fn discharges_if_above_threshold() { - let should_be_on = compute_actor_state(false, 200, 100, 0, ActorMode::Discharge); - assert_eq!(should_be_on, true); - } +pub(crate) async fn control_actors( + rx: &mut Receiver<(i32, DateTime)>, + config: &Configuration, +) { + let mut producers = config + .actors + .producers + .iter() + .map(|actor| ActorState::from_configuration(&actor)) + .collect::>(); - #[test] - pub fn keeps_on_discharging_if_above_threshold() { - let should_be_on = compute_actor_state(true, 200, 100, 0, ActorMode::Discharge); - assert_eq!(should_be_on, true); - } + let mut consumers = config + .actors + .consumers + .iter() + .map(|actor| ActorState::from_configuration(&actor)) + .collect::>(); - #[test] - pub fn stays_on_if_discharging_and_if_between_thresholds() { - let should_be_on = compute_actor_state(true, 50, 100, 0, ActorMode::Discharge); - assert_eq!(should_be_on, true); - } + let margin = (config.lower_limit + config.upper_limit) / 2; - #[test] - pub fn stops_discharging_if_below_threshold() { - let should_be_on = compute_actor_state(false, -100, 100, 0, ActorMode::Discharge); - assert_eq!(should_be_on, false); + if producers.len() == 0 && consumers.len() == 0 { + return; } - #[test] - pub fn stays_not_discharging_if_below_threshold() { - let should_be_on = compute_actor_state(false, -100, 100, 0, ActorMode::Discharge); - assert_eq!(should_be_on, false); + for dev in producers.iter_mut() { + dev.turn_off().await; } - struct DummyActor; - - #[async_trait::async_trait] - impl PowerSwitch for DummyActor { - async fn on(&mut self) {} - - async fn off(&mut self) {} - async fn set_power(&mut self, _: isize) {} + for dev in consumers.iter_mut() { + dev.turn_off().await; } - #[test] - pub fn returns_a_charger_if_a_charger_is_turned_on() { - let mut devs = vec![ - actor(ActorMode::Charge, true), - actor(ActorMode::Discharge, false), - ]; - let result = get_actor(&mut devs); + while let Some((received, timestamp)) = rx.recv().await { + let system_state = compute_system_state(&producers, &consumers); + let received = received as isize; - assert_eq!(result.on, true); - } + // Skip events that are too old + if timestamp > Utc::now() + Duration::seconds(5) { + continue; + } + match system_state { + SystemState::AllOff => { + if received > config.upper_limit { + let mut difference_left = (received - margin) as isize; + for producer in producers.iter_mut() { + if difference_left <= 0 { + break; + } + let actual_effect = + producer.increase_effect_by(difference_left).await as isize; + difference_left -= actual_effect; + } + } + if received < config.lower_limit { + let mut difference_left = -(received - margin) as isize; + for consumer in consumers.iter_mut() { + if difference_left <= 0 { + break; + } + let actual_effect = + consumer.increase_effect_by(difference_left).await as isize; + difference_left -= actual_effect; + } + } + } + SystemState::Producing => { + if received < config.lower_limit { + let mut difference_left = -(received - margin) as isize; + for producer in producers.iter_mut().rev() { + if difference_left <= 0 { + break; + } + let actual_effect = + producer.reduce_effect_by(difference_left as usize).await as isize; + difference_left -= actual_effect; + } + } - #[test] - pub fn returns_a_discharger_if_a_discharger_is_turned_on() { - let mut devs = vec![ - actor(ActorMode::Charge, false), - actor(ActorMode::Discharge, true), - ]; - let result = get_actor(&mut devs); + if received > config.upper_limit { + let mut difference_left = (received - margin) as isize; + for producer in producers.iter_mut() { + if difference_left < 0 { + break; + } + let actual_effect = + producer.increase_effect_by(difference_left).await as isize; + difference_left -= actual_effect; + } + } + } + SystemState::Consuming => { + if received > config.upper_limit { + let mut difference_left = received - margin; + for consumer in consumers.iter_mut().rev() { + if difference_left <= 0 { + break; + } + + let actual_effect = + consumer.reduce_effect_by(difference_left as usize).await as isize; + difference_left -= actual_effect; + } + } - assert_eq!(result.actor_mode, ActorMode::Discharge); - assert_eq!(result.on, true); - } - #[test] - pub fn returns_any_turned_off_element_if_no_actor_is_turned_on() { - let mut devs = vec![ - actor(ActorMode::Charge, false), - actor(ActorMode::Discharge, false), - ]; - let result = get_actor(&mut devs); - - assert_eq!(result.on, false); - } + if received < config.lower_limit { + let mut difference_left = -(received - margin) as isize; + for consumer in consumers.iter_mut() { + if difference_left <= 0 { + break; + } - fn actor(actor_mode: ActorMode, on: bool) -> ActorState { - ActorState { - disable_threshold: 100, - enable_threshold: -100, - duration_seconds: 360, - last_set: None, - switch: Box::new(DummyActor), - on, - actor_mode, - last_updated: None, + let actual_effect = + consumer.increase_effect_by(difference_left).await as isize; + difference_left -= actual_effect; + } + } + } } } } diff --git a/server/src/actors/opendtu.rs b/server/src/actors/opendtu.rs index 9fa9118..5d36f8a 100644 --- a/server/src/actors/opendtu.rs +++ b/server/src/actors/opendtu.rs @@ -1,9 +1,13 @@ -use std::{collections::HashMap, time::Duration}; +use std::{ + cmp::{max, min}, + collections::HashMap, + time::Duration, +}; use reqwest::Url; use serde::Serialize; -use super::PowerSwitch; +use crate::actors::Regulator; pub(crate) struct OpenDtu { pub(crate) serial: String, @@ -16,27 +20,18 @@ pub(crate) struct OpenDtu { } #[async_trait::async_trait] -impl PowerSwitch for OpenDtu { - async fn on(&mut self) {} - - async fn off(&mut self) { - self.set_power(0).await; +impl Regulator for OpenDtu { + async fn change_power(&mut self, power: isize) { + let target = (power + self.current_watts as isize) as isize; + let target = max(0, target); + let target = min(target, self.upper_limit_watts as isize); + + self.current_watts = target as usize; + self.set_absolute(target as usize).await; } - async fn set_power(&mut self, power: isize) { - let target = power + self.current_watts as isize; - let target = if target > self.upper_limit_watts as isize { - self.upper_limit_watts - } else { - if target > 0 { - target as usize - } else { - 0 - } - }; - self.set_absolute(target).await; - - self.current_watts = target; + fn power(&self) -> usize { + self.current_watts } } diff --git a/server/src/actors/tasmota.rs b/server/src/actors/tasmota.rs index 0706835..e0cb290 100644 --- a/server/src/actors/tasmota.rs +++ b/server/src/actors/tasmota.rs @@ -1,4 +1,5 @@ use reqwest::Url; +use tokio::task; use super::PowerSwitch; @@ -11,14 +12,12 @@ impl PowerSwitch for TasmotaSwitch { async fn on(&mut self) { let mut url = Url::parse(&self.url).unwrap().join("cm").unwrap(); url.query_pairs_mut().append_pair("cmnd", "Power on"); - let _ = reqwest::get(url).await; + task::spawn(reqwest::get(url)); } async fn off(&mut self) { let mut url = Url::parse(&self.url).unwrap().join("cm").unwrap(); url.query_pairs_mut().append_pair("cmnd", "Power off"); - let _ = reqwest::get(url).await; + task::spawn(reqwest::get(url)); } - - async fn set_power(&mut self, _: isize) {} } diff --git a/server/src/business/mod.rs b/server/src/business/mod.rs index 1477bcc..c393d5c 100644 --- a/server/src/business/mod.rs +++ b/server/src/business/mod.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, sync::Arc}; -use chrono::Utc; +use chrono::{DateTime, Utc}; use hackdose_server_shared::DataPoint; use hackdose_sml_parser::application::{ domain::{AnyValue, SmlMessages}, @@ -13,7 +13,7 @@ use tokio_stream::StreamExt; use crate::{data::EnergyData, smart_meter::body::find_watts}; pub(crate) async fn handle_power_events( - tx: &mut Sender, + tx: &mut Sender<(i32, DateTime)>, mutex: Arc>>, mut power_events: impl Stream + Unpin + Send + 'static, mut energy_data: EnergyData, @@ -23,14 +23,12 @@ pub(crate) async fn handle_power_events( match watts { Some(watts) => { - let data = DataPoint { - date: Utc::now(), - value: watts, - }; + let date = Utc::now(); + let data = DataPoint { date, value: watts }; energy_data.put(data).await; energy_data.log_data(data).await; - tx.send(watts).await.unwrap(); + tx.send((watts, date)).await.unwrap(); } None => {} } diff --git a/server/src/data/mod.rs b/server/src/data/mod.rs index c7caeff..dc66275 100644 --- a/server/src/data/mod.rs +++ b/server/src/data/mod.rs @@ -67,7 +67,7 @@ impl EnergyData { let data = File::open(config.log_location.clone()).await; match data { Ok(data) => { - let mut rdr = BufReader::new(data).lines(); + let mut rdr = BufReader::with_capacity(1024 * 1024, data).lines(); while let Ok(Some(line)) = rdr.next_line().await { let l = line .split(";") diff --git a/server/src/main.rs b/server/src/main.rs index 125cc52..aed5ca5 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,4 +1,5 @@ use business::handle_power_events; +use chrono::{DateTime, Utc}; use clap::Parser; use data::EnergyData; use hackdose_sml_parser::application::domain::AnyValue; @@ -24,26 +25,38 @@ mod rest; mod smart_meter; #[derive(Serialize, Deserialize, Clone)] -struct ActorConfiguration { - actor: ActorType, - disable_threshold: isize, - enable_threshold: isize, - duration_seconds: usize, - actor_mode: ActorMode, +struct SwitchingActorConfiguration { + actor: SwitchingActorType, + time_until_effective_seconds: usize, + power: usize, } -#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] -enum ActorMode { - Discharge, - Charge, +#[derive(Serialize, Deserialize, Clone)] +struct RegulatingActorConfiguration { + actor: RegulatingActorType, + time_until_effective_seconds: usize, + max_power: usize, +} + +#[derive(Serialize, Deserialize, Clone)] +enum ActorConfiguration { + Switching(SwitchingActorConfiguration), + Regulating(RegulatingActorConfiguration), } #[derive(Serialize, Deserialize, Clone)] -enum ActorType { +enum SwitchingActorType { HS100(HS100Configuration), Tasmota(TasmotaConfiguration), +} + +#[derive(Serialize, Deserialize, Clone)] +enum RegulatingActorType { Ahoy(AhoyConfiguration), OpenDtu(OpenDtuConfiguration), + MarstekCharge(MarstekConfiguration), + MarstekDischarge(MarstekConfiguration), + EZ1M(EZ1MConfiguration), } #[derive(Serialize, Deserialize, Clone)] struct HS100Configuration { @@ -71,13 +84,34 @@ struct OpenDtuConfiguration { upper_limit_watts: usize, } +#[derive(Serialize, Deserialize, Clone)] +struct EZ1MConfiguration { + url: String, + upper_limit_watts: usize, +} + +#[derive(Serialize, Deserialize, Clone)] + +pub(crate) struct MarstekConfiguration { + pub(crate) url: String, + pub(crate) upper_limit_watts: usize, +} + #[derive(Serialize, Deserialize, Clone)] pub(crate) struct Configuration { - actors: Vec, + actors: Actors, log_location: PathBuf, gpio_location: Option, ttys_location: String, gpio_power_pin: u32, + lower_limit: isize, + upper_limit: isize, +} + +#[derive(Serialize, Deserialize, Clone)] +pub(crate) struct Actors { + consumers: Vec, + producers: Vec, } #[derive(Parser, Debug)] @@ -110,7 +144,7 @@ async fn main() { let stream = uart_ir_sensor_data_stream(&config); let power_events = sml_message_stream(stream); - let (mut tx, mut rx) = tokio::sync::mpsc::channel::(100); + let (mut tx, mut rx) = tokio::sync::mpsc::channel::<(i32, DateTime)>(100); let mutex = Arc::new(tokio::sync::Mutex::new(HashMap::::new())); let power_event_mutex = mutex.clone();