Skip to content

Commit

Permalink
Processing delay added
Browse files Browse the repository at this point in the history
Added setting to have delayed processing for repeating values.
  • Loading branch information
sergejey committed Mar 17, 2024
1 parent 922e3ad commit 7978a01
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 43 deletions.
53 changes: 28 additions & 25 deletions modules/mqtt/mqtt.class.php
Expand Up @@ -409,13 +409,14 @@ function processMessage($path, $value)
$rec['UPDATED'] = date('Y-m-d H:i:s');
SQLInsert('mqtt', $rec);
} else {
/* Update values in db */
$rec['VALUE'] = $value . '';
$rec['UPDATED'] = date('Y-m-d H:i:s');
SQLUpdate('mqtt', $rec);

if (!$rec['ONLY_NEW_VALUE'] || $rec['VALUE'] <> $old_value) {

/* Update values in db */
$rec['VALUE'] = $value . '';
$rec['UPDATED'] = date('Y-m-d H:i:s');
SQLUpdate('mqtt', $rec);

/* Update property in linked object if it exist */
if ($rec['LINKED_OBJECT'] && $rec['LINKED_PROPERTY']) {
if ($rec['REPLACE_LIST'] != '') {
Expand All @@ -431,11 +432,14 @@ function processMessage($path, $value)
}
setGlobal($rec['LINKED_OBJECT'] . '.' . $rec['LINKED_PROPERTY'], $value, array('mqtt' => '0'));
}

if ($rec['LINKED_OBJECT'] && $rec['LINKED_METHOD'] &&
!(strtolower($rec['LINKED_PROPERTY']) == 'status' && strtolower($rec['LINKED_METHOD']) == 'switch')) {
callMethod($rec['LINKED_OBJECT'] . '.' . $rec['LINKED_METHOD'], array('VALUE' => $rec['VALUE'], 'NEW_VALUE' => $rec['VALUE'], 'OLD_VALUE' => $old_value));
}

}
if ($rec['LINKED_OBJECT'] && $rec['LINKED_METHOD'] &&
!(strtolower($rec['LINKED_PROPERTY']) == 'status' && strtolower($rec['LINKED_METHOD']) == 'switch')) {
callMethod($rec['LINKED_OBJECT'] . '.' . $rec['LINKED_METHOD'], array('VALUE' => $rec['VALUE'], 'NEW_VALUE' => $rec['VALUE'], 'OLD_VALUE' => $old_value));
}


}
}
Expand Down Expand Up @@ -476,6 +480,7 @@ function admin(&$out)
$out['MQTT_HOST'] = $this->config['MQTT_HOST'];
$out['MQTT_PORT'] = $this->config['MQTT_PORT'];
$out['MQTT_QUERY'] = $this->config['MQTT_QUERY'];
$out['MQTT_DELAY'] = $this->config['MQTT_DELAY'];
$out['MQTT_WRITE_METHOD'] = isset($this->config['MQTT_WRITE_METHOD']) ? (int)$this->config['MQTT_WRITE_METHOD'] : 0;
$out['MQTT_STRIPMODE'] = isset($this->config['MQTT_STRIPMODE']) ? $this->config['MQTT_STRIPMODE'] : 0;

Expand All @@ -494,26 +499,24 @@ function admin(&$out)
$out['MQTT_AUTH'] = $this->config['MQTT_AUTH'];

if ($this->view_mode == 'update_settings') {
global $mqtt_client;
global $mqtt_host;
global $mqtt_username;
global $mqtt_password;
global $mqtt_auth;
global $mqtt_port;
global $mqtt_query;
global $mqtt_stripmode;


$this->config['MQTT_CLIENT'] = trim($mqtt_client);
$this->config['MQTT_HOST'] = trim($mqtt_host);
$this->config['MQTT_USERNAME'] = trim($mqtt_username);
$this->config['MQTT_PASSWORD'] = trim($mqtt_password);
$this->config['MQTT_AUTH'] = (int)$mqtt_auth;
$this->config['MQTT_PORT'] = (int)$mqtt_port;
$this->config['MQTT_QUERY'] = trim($mqtt_query);

$this->config['MQTT_CLIENT'] = gr('mqtt_client');
$this->config['MQTT_HOST'] = gr('mqtt_host');
$this->config['MQTT_USERNAME'] = gr('mqtt_username');
$this->config['MQTT_PASSWORD'] = gr('mqtt_password');
$this->config['MQTT_AUTH'] = gr('mqtt_auth','int');
$this->config['MQTT_PORT'] = gr('mqtt_port','int');
$this->config['MQTT_QUERY'] = gr('mqtt_query');
$this->config['MQTT_WRITE_METHOD'] = gr('mqtt_write_method', 'int');
$this->config['MQTT_STRIPMODE'] = gr('mqtt_stripmode', 'int');

$mqtt_delay = gr('mqtt_delay');
if ($mqtt_delay === '') {
unset($this->config['MQTT_DELAY']);
} else {
$this->config['MQTT_DELAY'] = (int)$mqtt_delay;
}

$this->saveConfig();

setGlobal('cycle_mqttControl', 'restart');
Expand Down
54 changes: 36 additions & 18 deletions scripts/cycle_mqtt.php
Expand Up @@ -59,15 +59,21 @@
$stripmode = 0;
}

if (isset($mqtt->config['MQTT_DELAY'])) {
$mqtt_delay = $mqtt->config['MQTT_DELAY'];
} else {
$mqtt_delay = 5;
}

$mqtt_client = new Bluerhinos\phpMQTT($host, $port, $client_name);

if ($mqtt->config['MQTT_AUTH']) {
$connect = $mqtt_client->connect(true, NULL, $username, $password);
$connect = $mqtt_client->connect(true, NULL, $username, $password);
if (!$connect) {
exit(1);
}
} else {
$connect = $mqtt_client->connect();
$connect = $mqtt_client->connect();
if (!$connect) {
exit(1);
}
Expand All @@ -91,21 +97,21 @@
while ($mqtt_client->proc()) {


if ($mqtt->config['MQTT_WRITE_METHOD']==2) {
if ($mqtt->config['MQTT_WRITE_METHOD'] == 2) {
$queue = checkOperationsQueue('mqtt_queue');
foreach ($queue as $mqtt_data) {
$topic=$mqtt_data['DATANAME'];
$data_value=json_decode($mqtt_data['DATAVALUE'],true);
$value=$data_value['v'];
$qos=0;
$topic = $mqtt_data['DATANAME'];
$data_value = json_decode($mqtt_data['DATAVALUE'], true);
$value = $data_value['v'];
$qos = 0;
if (isset($data_value['q'])) {
$qos=$data_value['q'];
$qos = $data_value['q'];
}
$retain=0;
$retain = 0;
if (isset($data_value['r'])) {
$retain=$data_value['r'];
$retain = $data_value['r'];
}
if ($topic!='') {
if ($topic != '') {
echo "Publishing to $topic : $value\n";
$mqtt_client->publish($topic, $value, $qos, $retain);
}
Expand All @@ -119,9 +125,9 @@

setGlobal((str_replace('.php', '', basename(__FILE__))) . 'Run', time(), 1);

if (file_exists('./reboot') || IsSet($_GET['onetime'])) {
if (file_exists('./reboot') || isset($_GET['onetime'])) {

$mqtt_client->close();
$mqtt_client->close();
$db->Disconnect();
exit;
}
Expand All @@ -136,23 +142,35 @@
* @param mixed $msg Message
* @return void
*/
function procmsg($topic, $msg) {
//$url = BASE_URL . '/ajax/mqtt.html?op=process&topic='.urlencode($topic)."&msg=".urlencode($msg);
//getURLBackground($url);
function procmsg($topic, $msg)
{

if (!isset($topic) || !isset($msg)) return false;
global $stripmode;
global $mqtt_delay;
global $mqtt_repeating_cache;

if ($mqtt_delay > 0 && $mqtt_repeating_cache[$topic]['msg'] == $msg && (time() - $mqtt_repeating_cache[$topic]['received']) <= $mqtt_delay) {
// processing cached
return false;
}

if ($stripmode) {
$rec = SQLSelectOne("SELECT ID FROM `mqtt` where `PATH` like '$topic%' and LINKED_OBJECT>''");
if(!$rec['ID']) {
if (!$rec['ID']) {
echo date("Y-m-d H:i:s") . " Ignore received from {$topic} : $msg\n";
return false;
}
}

echo date("Y-m-d H:i:s") . " Received from {$topic} : $msg\n";

if ($mqtt_delay > 0) {
$mqtt_repeating_cache[$topic] = array('msg' => $msg, 'received' => time());
}

if (function_exists('callAPI')) {
callAPI('/api/module/mqtt','GET',array('topic'=>$topic,'msg'=>$msg));
callAPI('/api/module/mqtt', 'GET', array('topic' => $topic, 'msg' => $msg));
} else {
global $mqtt;
$mqtt->processMessage($topic, $msg);
Expand Down

0 comments on commit 7978a01

Please sign in to comment.