From 7978a015dfe523d7e6ce01d4c2fd409f793f3d1c Mon Sep 17 00:00:00 2001 From: Sergei Jeihala Date: Sun, 17 Mar 2024 18:42:55 +0200 Subject: [PATCH] Processing delay added Added setting to have delayed processing for repeating values. --- modules/mqtt/mqtt.class.php | 53 +++++++++++++++++++----------------- scripts/cycle_mqtt.php | 54 ++++++++++++++++++++++++------------- 2 files changed, 64 insertions(+), 43 deletions(-) diff --git a/modules/mqtt/mqtt.class.php b/modules/mqtt/mqtt.class.php index cd3389d..103b85f 100644 --- a/modules/mqtt/mqtt.class.php +++ b/modules/mqtt/mqtt.class.php @@ -409,13 +409,14 @@ function processMessage($path, $value) $rec['UPDATED'] = date('Y-m-d H:i:s'); SQLInsert('mqtt', $rec); } else { - /* Update values in db */ - $rec['VALUE'] = $value . ''; - $rec['UPDATED'] = date('Y-m-d H:i:s'); - SQLUpdate('mqtt', $rec); if (!$rec['ONLY_NEW_VALUE'] || $rec['VALUE'] <> $old_value) { + /* Update values in db */ + $rec['VALUE'] = $value . ''; + $rec['UPDATED'] = date('Y-m-d H:i:s'); + SQLUpdate('mqtt', $rec); + /* Update property in linked object if it exist */ if ($rec['LINKED_OBJECT'] && $rec['LINKED_PROPERTY']) { if ($rec['REPLACE_LIST'] != '') { @@ -431,11 +432,14 @@ function processMessage($path, $value) } setGlobal($rec['LINKED_OBJECT'] . '.' . $rec['LINKED_PROPERTY'], $value, array('mqtt' => '0')); } + + if ($rec['LINKED_OBJECT'] && $rec['LINKED_METHOD'] && + !(strtolower($rec['LINKED_PROPERTY']) == 'status' && strtolower($rec['LINKED_METHOD']) == 'switch')) { + callMethod($rec['LINKED_OBJECT'] . '.' . $rec['LINKED_METHOD'], array('VALUE' => $rec['VALUE'], 'NEW_VALUE' => $rec['VALUE'], 'OLD_VALUE' => $old_value)); + } + } - if ($rec['LINKED_OBJECT'] && $rec['LINKED_METHOD'] && - !(strtolower($rec['LINKED_PROPERTY']) == 'status' && strtolower($rec['LINKED_METHOD']) == 'switch')) { - callMethod($rec['LINKED_OBJECT'] . '.' . $rec['LINKED_METHOD'], array('VALUE' => $rec['VALUE'], 'NEW_VALUE' => $rec['VALUE'], 'OLD_VALUE' => $old_value)); - } + } } @@ -476,6 +480,7 @@ function admin(&$out) $out['MQTT_HOST'] = $this->config['MQTT_HOST']; $out['MQTT_PORT'] = $this->config['MQTT_PORT']; $out['MQTT_QUERY'] = $this->config['MQTT_QUERY']; + $out['MQTT_DELAY'] = $this->config['MQTT_DELAY']; $out['MQTT_WRITE_METHOD'] = isset($this->config['MQTT_WRITE_METHOD']) ? (int)$this->config['MQTT_WRITE_METHOD'] : 0; $out['MQTT_STRIPMODE'] = isset($this->config['MQTT_STRIPMODE']) ? $this->config['MQTT_STRIPMODE'] : 0; @@ -494,26 +499,24 @@ function admin(&$out) $out['MQTT_AUTH'] = $this->config['MQTT_AUTH']; if ($this->view_mode == 'update_settings') { - global $mqtt_client; - global $mqtt_host; - global $mqtt_username; - global $mqtt_password; - global $mqtt_auth; - global $mqtt_port; - global $mqtt_query; - global $mqtt_stripmode; - - - $this->config['MQTT_CLIENT'] = trim($mqtt_client); - $this->config['MQTT_HOST'] = trim($mqtt_host); - $this->config['MQTT_USERNAME'] = trim($mqtt_username); - $this->config['MQTT_PASSWORD'] = trim($mqtt_password); - $this->config['MQTT_AUTH'] = (int)$mqtt_auth; - $this->config['MQTT_PORT'] = (int)$mqtt_port; - $this->config['MQTT_QUERY'] = trim($mqtt_query); + + $this->config['MQTT_CLIENT'] = gr('mqtt_client'); + $this->config['MQTT_HOST'] = gr('mqtt_host'); + $this->config['MQTT_USERNAME'] = gr('mqtt_username'); + $this->config['MQTT_PASSWORD'] = gr('mqtt_password'); + $this->config['MQTT_AUTH'] = gr('mqtt_auth','int'); + $this->config['MQTT_PORT'] = gr('mqtt_port','int'); + $this->config['MQTT_QUERY'] = gr('mqtt_query'); $this->config['MQTT_WRITE_METHOD'] = gr('mqtt_write_method', 'int'); $this->config['MQTT_STRIPMODE'] = gr('mqtt_stripmode', 'int'); + $mqtt_delay = gr('mqtt_delay'); + if ($mqtt_delay === '') { + unset($this->config['MQTT_DELAY']); + } else { + $this->config['MQTT_DELAY'] = (int)$mqtt_delay; + } + $this->saveConfig(); setGlobal('cycle_mqttControl', 'restart'); diff --git a/scripts/cycle_mqtt.php b/scripts/cycle_mqtt.php index bde9686..7643b2f 100644 --- a/scripts/cycle_mqtt.php +++ b/scripts/cycle_mqtt.php @@ -59,15 +59,21 @@ $stripmode = 0; } +if (isset($mqtt->config['MQTT_DELAY'])) { + $mqtt_delay = $mqtt->config['MQTT_DELAY']; +} else { + $mqtt_delay = 5; +} + $mqtt_client = new Bluerhinos\phpMQTT($host, $port, $client_name); if ($mqtt->config['MQTT_AUTH']) { - $connect = $mqtt_client->connect(true, NULL, $username, $password); + $connect = $mqtt_client->connect(true, NULL, $username, $password); if (!$connect) { exit(1); } } else { - $connect = $mqtt_client->connect(); + $connect = $mqtt_client->connect(); if (!$connect) { exit(1); } @@ -91,21 +97,21 @@ while ($mqtt_client->proc()) { - if ($mqtt->config['MQTT_WRITE_METHOD']==2) { + if ($mqtt->config['MQTT_WRITE_METHOD'] == 2) { $queue = checkOperationsQueue('mqtt_queue'); foreach ($queue as $mqtt_data) { - $topic=$mqtt_data['DATANAME']; - $data_value=json_decode($mqtt_data['DATAVALUE'],true); - $value=$data_value['v']; - $qos=0; + $topic = $mqtt_data['DATANAME']; + $data_value = json_decode($mqtt_data['DATAVALUE'], true); + $value = $data_value['v']; + $qos = 0; if (isset($data_value['q'])) { - $qos=$data_value['q']; + $qos = $data_value['q']; } - $retain=0; + $retain = 0; if (isset($data_value['r'])) { - $retain=$data_value['r']; + $retain = $data_value['r']; } - if ($topic!='') { + if ($topic != '') { echo "Publishing to $topic : $value\n"; $mqtt_client->publish($topic, $value, $qos, $retain); } @@ -119,9 +125,9 @@ setGlobal((str_replace('.php', '', basename(__FILE__))) . 'Run', time(), 1); - if (file_exists('./reboot') || IsSet($_GET['onetime'])) { + if (file_exists('./reboot') || isset($_GET['onetime'])) { - $mqtt_client->close(); + $mqtt_client->close(); $db->Disconnect(); exit; } @@ -136,23 +142,35 @@ * @param mixed $msg Message * @return void */ -function procmsg($topic, $msg) { - //$url = BASE_URL . '/ajax/mqtt.html?op=process&topic='.urlencode($topic)."&msg=".urlencode($msg); - //getURLBackground($url); +function procmsg($topic, $msg) +{ + if (!isset($topic) || !isset($msg)) return false; global $stripmode; + global $mqtt_delay; + global $mqtt_repeating_cache; + + if ($mqtt_delay > 0 && $mqtt_repeating_cache[$topic]['msg'] == $msg && (time() - $mqtt_repeating_cache[$topic]['received']) <= $mqtt_delay) { + // processing cached + return false; + } if ($stripmode) { $rec = SQLSelectOne("SELECT ID FROM `mqtt` where `PATH` like '$topic%' and LINKED_OBJECT>''"); - if(!$rec['ID']) { + if (!$rec['ID']) { echo date("Y-m-d H:i:s") . " Ignore received from {$topic} : $msg\n"; return false; } } echo date("Y-m-d H:i:s") . " Received from {$topic} : $msg\n"; + + if ($mqtt_delay > 0) { + $mqtt_repeating_cache[$topic] = array('msg' => $msg, 'received' => time()); + } + if (function_exists('callAPI')) { - callAPI('/api/module/mqtt','GET',array('topic'=>$topic,'msg'=>$msg)); + callAPI('/api/module/mqtt', 'GET', array('topic' => $topic, 'msg' => $msg)); } else { global $mqtt; $mqtt->processMessage($topic, $msg);