Detect if response is not received from MQTT broker for specified duration. #41

Closed
AfiqAmmar opened this issue Jul 4, 2023 · 8 comments


AfiqAmmar commented Jul 4, 2023

First, thank you for building this package; it really helps me in my Laravel development.

I use this library from the command console to subscribe to my MQTT broker, which publishes a new payload every 0.4 seconds. My question is: is there a way to detect that no message has been received from the MQTT broker for a specified duration?

I tried tracking the time of the last received message and checking it against a timeout inside the registerLoopEventHandler callback, but it does not work. Here is my current code:

try{
    $mqtt = MQTT::connection();
    $mqtt->subscribe($topic, function (string $topic, string $message) use (&$lastPayloadTime, $mqtt){
        $lastPayloadTime = Carbon::now();
        echo $lastPayloadTime . "\n";
        echo sprintf('Received QoS level 1 message on topic [%s]: %s', $topic, $message) . "\n";
        $mqtt->interrupt();
    }, 0);

    $mqtt->registerLoopEventHandler(function (MqttClient $client, float $elapsedTime) use (&$lastPayloadTime, $mqtt, $timeout) {
        if ($lastPayloadTime->diffInSeconds(Carbon::now()) > $timeout) {
            echo "Payload not received within $timeoutDuration seconds.\n";
            $client->interrupt();
        }
    });

    $mqtt->loop();
    
    $mqtt->disconnect();

}catch (MqttClientException $e) {
    echo $e;
}

I really appreciate your help on this. Thank you.

@Namoshek
Collaborator

Namoshek commented Jul 4, 2023

Since $lastPayloadTime is initially not set, the if in the loop event handler is potentially an issue. Try to invert the call:

if (Carbon::now()->diffInSeconds($lastPayloadTime) > $timeout) { ... }

Also make sure $timeout is defined (the code above does not define it).


Maybe it is enough to use $elapsedTime passed to the loop event handler though? It contains the fractional seconds since the loop was started.

@AfiqAmmar
Author

AfiqAmmar commented Jul 5, 2023

Thank you for the reply.

Here is my updated code:

$topic = 'topic';
$timeout = 5;

try{
    $mqtt = MQTT::connection();
    $mqtt->subscribe($topic, function (string $topic, string $message) use (&$lastPayloadTime, $mqtt){
        $lastPayloadTime = Carbon::now();
        echo $lastPayloadTime . "\n";
        echo sprintf('Received QoS level 1 message on topic [%s]: %s', $topic, $message) . "\n";
        // $mqtt->interrupt();
    }, 0);

    $mqtt->registerLoopEventHandler(function (MqttClient $client, float $elapsedTime) use (&$lastPayloadTime, $mqtt, $timeout) {
        if ($elapsedTime > $timeout) {
            echo "Payload not received within $elapsedTime seconds.\n";
            $client->interrupt();
        }
    });

    $mqtt->loop();
    
    $mqtt->disconnect();

}catch (MqttClientException $e) {
    echo $e;
}

I have tried using $elapsedTime to detect that no message has been received for 5 seconds, but it does not work. I have also tried using $lastPayloadTime and setting it initially, but that does not work either. It seems the loop does not exit when no payload is received; it only exits when the MQTT broker is closed. Is there a way to detect that no message has been received for a specified duration?

By the way, I commented out $mqtt->interrupt() because it would exit the process after receiving one message.

@Namoshek
Collaborator

Namoshek commented Jul 5, 2023

This code works perfectly fine for me and interrupts after 5 seconds as expected. What is your operating system, PHP version and version of php-mqtt/client as well as php-mqtt/laravel-client (according to composer show)?

However, you can clean up the code a bit. You don't need all of the references:

$topic = 'topic';
$timeout = 5;

try{
    $mqtt = MQTT::connection();

    $mqtt->subscribe($topic, function (string $topic, string $message) use ($mqtt) {
        echo sprintf('Received QoS level 0 message on topic [%s]: %s', $topic, $message) . PHP_EOL;
        $mqtt->interrupt();
    }, 0);

    $mqtt->registerLoopEventHandler(function (MqttClient $client, float $elapsedTime) use ($timeout) {
        if ($elapsedTime < $timeout) {
            return;
        }

        echo sprintf('Payload not received within %s seconds.', $elapsedTime) . PHP_EOL;
        $client->interrupt();
    });

    $mqtt->loop(true);
    
    $mqtt->disconnect();
} catch (MqttClientException $e) {
    echo $e->getMessage();
}

@AfiqAmmar
Author

The code also works fine for me. However, it exits the loop after receiving one message. I want to receive messages continuously; if no message is received for more than 5 seconds, it should exit the loop and raise an alert, and once messages arrive again, it should re-enter the loop. That is why I commented out $mqtt->interrupt() in my previous code.

My MQTT Broker will publish message every 0.5 seconds and I will receive the message every 0.5 seconds as well.

My OS is Windows 10, PHP version is 8.2.4, php-mqtt/laravel-client is 1.2. I do not have php-mqtt/client, is it required?

@Namoshek
Collaborator

Namoshek commented Jul 6, 2023

Oh ok, then I misunderstood your request. In this case $elapsedTime will not suffice, and you do in fact need an additional variable to hold the time of the last received message:

$topic = 'topic';
$timeout = 5;
$lastMessageReceivedAt = null;

try{
    $mqtt = MQTT::connection();

    $mqtt->subscribe($topic, function (string $topic, string $message) use ($mqtt, &$lastMessageReceivedAt) {
        $lastMessageReceivedAt = microtime(true);
        echo sprintf('Received QoS level 0 message on topic [%s]: %s', $topic, $message) . PHP_EOL;
    }, 0);

    $mqtt->registerLoopEventHandler(function (MqttClient $client, float $elapsedTime) use ($timeout, &$lastMessageReceivedAt) {
        if ($lastMessageReceivedAt === null || $lastMessageReceivedAt + $timeout > microtime(true)) {
            return;
        }

        // Report the configured timeout; $elapsedTime is the total loop runtime, not the silence duration.
        echo sprintf('Payload not received within %s seconds.', $timeout) . PHP_EOL;
        $client->interrupt();
    });

    $mqtt->loop(true);
    
    $mqtt->disconnect();
} catch (MqttClientException $e) {
    echo $e->getMessage();
}
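
The timeout condition in the loop event handler can also be pulled out into a small pure function, which makes it easy to check without a broker. This is only a sketch: isSilenceTimeoutExceeded is a hypothetical helper name, not part of php-mqtt/client, and the current time is passed in as a parameter (an assumption made for testability) instead of calling microtime(true) inside.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of php-mqtt/client): returns true once no
 * message has been seen for at least $timeout seconds.
 */
function isSilenceTimeoutExceeded(?float $lastMessageReceivedAt, float $timeout, float $now): bool
{
    // Before the first message there is nothing to measure against,
    // which mirrors the null check in the loop event handler above.
    if ($lastMessageReceivedAt === null) {
        return false;
    }

    // Same condition as "$lastMessageReceivedAt + $timeout > microtime(true)", negated.
    return $now - $lastMessageReceivedAt >= $timeout;
}
```

Inside the loop event handler, the call would then presumably look like isSilenceTimeoutExceeded($lastMessageReceivedAt, $timeout, microtime(true)), followed by $client->interrupt() when it returns true.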

@AfiqAmmar
Author

The code works perfectly fine, thank you so much.

When no payload is received for 5 seconds, the process exits and I have to run the command again to receive payloads. Is there a way to keep the command running, so that it raises an alert when no payload has been received for 5 seconds and another alert when payloads start arriving again?

Thank you.

@Namoshek
Collaborator

Namoshek commented Jul 7, 2023

That's rather simple to implement as well. You'll need some kind of state; I called it $alertFiring. Since I assume the alerting should also work before the first message is received, $lastMessageReceivedAt now needs to be initialized up front, unlike in the previous script.

I've also moved the alerting logic to a message received event handler to keep it separate from the subscription. This makes it a bit easier to understand, but it means that the alerting is applied to all subscriptions in total. If this is not desired, you can move the logic to your subscription callback as well.

$topic = 'topic';
$timeout = 5;
$lastMessageReceivedAt = microtime(true);
$alertFiring = false;

try{
    $mqtt = MQTT::connection();

    $mqtt->subscribe($topic, function (string $topic, string $message) use ($mqtt) {
        echo sprintf('Received QoS level 0 message on topic [%s]: %s', $topic, $message) . PHP_EOL;
    }, 0);

    $mqtt->registerLoopEventHandler(function () use ($timeout, &$lastMessageReceivedAt, &$alertFiring) {
        if ($alertFiring || $lastMessageReceivedAt + $timeout > microtime(true)) {
            return;
        }

        echo sprintf('Payload not received within %s seconds.', $timeout) . PHP_EOL;
        $alertFiring = true;
    });

    $mqtt->registerMessageReceivedEventHandler(function () use (&$lastMessageReceivedAt, &$alertFiring) {
        $lastMessageReceivedAt = microtime(true);

        if ($alertFiring) {
            echo 'Started to receive some data again.' . PHP_EOL;
            $alertFiring = false;
        }
    });

    $mqtt->loop(true);
    
    $mqtt->disconnect();
} catch (MqttClientException $e) {
    echo $e->getMessage();
}
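
If the two handlers become hard to follow, the alert state could be wrapped in a small class so the firing and recovery transitions can be exercised with simulated timestamps. This is a minimal sketch under stated assumptions: SilenceWatchdog, tick() and messageReceived() are hypothetical names, not part of php-mqtt/client, and the class needs PHP 8.0 or later for constructor property promotion.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of php-mqtt/client): tracks whether a
 * "no data" alert is currently firing, using the same logic as the
 * $alertFiring / $lastMessageReceivedAt variables above.
 */
final class SilenceWatchdog
{
    private bool $alertFiring = false;

    public function __construct(
        private float $timeout,
        private float $lastMessageReceivedAt,
    ) {
    }

    /** Call on every loop iteration; returns true only at the moment the alert starts firing. */
    public function tick(float $now): bool
    {
        if ($this->alertFiring || $this->lastMessageReceivedAt + $this->timeout > $now) {
            return false;
        }

        $this->alertFiring = true;

        return true;
    }

    /** Call for every received message; returns true only when this message ends an active alert. */
    public function messageReceived(float $now): bool
    {
        $this->lastMessageReceivedAt = $now;
        $wasFiring = $this->alertFiring;
        $this->alertFiring = false;

        return $wasFiring;
    }
}
```

The loop event handler would then call $watchdog->tick(microtime(true)) and print the "not received" message when it returns true, while the message received event handler would call $watchdog->messageReceived(microtime(true)) and print the recovery message when that returns true.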

@AfiqAmmar
Author

It works perfectly, thank you so much :)
