Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Not able to retrieve the persisted messages #578

Open
KohliDev opened this issue Dec 12, 2016 · 8 comments
Open

Not able to retrieve the persisted messages #578

KohliDev opened this issue Dec 12, 2016 · 8 comments

Comments

@KohliDev
Copy link

KohliDev commented Dec 12, 2016

Mosca setup with MongoDB persistence as per this PR:

#215

  1. Client A Subscribe to topic with QoS 1 using paho-mqtt
  2. Client A Disconnects
  3. Client B Publishes message to the same topic with QoS 1, while Client A is offline
  4. Client A reconnects and subscribes to the same topic (Sometimes able to receive the retained message and sometimes not, the duration is within one hour)

Even facing consistency issue with MQTTfx but works fine in case of mosquitto client.

mosquitto_sub -d -c -i sample -q 1 -t test/offline

Need to debug the cause of this inconsistency.

Using the below sample paho-mqtt Python code for subscribing:

import paho.mqtt.client as mqtt

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, rc):
    print("Connected with result code "+str(rc))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("test/offline")

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
        print(msg.topic+" "+str(msg.payload))

client = mqtt.Client(client_id="sample", clean_session=False)
client.on_connect = on_connect
client.on_message = on_message

client.connect(server-ip, 1883, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()
@mcollina
Copy link
Collaborator

This? #215 (comment).

MongoDB persistence has a 1h TTL: https://github.com/mcollina/mosca/blob/master/lib/persistence/mongo.js#L35-L46.

Note the persistence flag.

@KohliDev
Copy link
Author

KohliDev commented Dec 12, 2016

#215 (comment),
using the same configuration.

@mcollina
Copy link
Collaborator

try changing the TTL.

@KohliDev
Copy link
Author

KohliDev commented Dec 12, 2016

Still facing the inconsistency.
I believe a particular topic is not tied to a clientID for publishing the retained messages only to it.

@mcollina
Copy link
Collaborator

Can you please provide detailed instructions to reproduce, including source code examples, client (possibly MQTT.js) and so on?

@KohliDev
Copy link
Author

KohliDev commented Dec 12, 2016

Mosca-Server.js

var mosca = require('mosca');
var mongoUrl = 'mongodb://localhost:27017/mqtt';
var ascoltatore = {
  //using ascoltatore
  type: 'mongo',
  url: mongoUrl,
  pubsubCollection: 'ascoltatori',
  mongo: {}
};

var settings = {
  port: 1883,
  persistence: {
    factory: mosca.persistence.Mongo,
    url: mongoUrl,
  },
  backend: ascoltatore
};
var server = new mosca.Server(settings);
server.on('ready', setup);

server.on('clientConnected', function(client) {
    console.log('client connected', client.id);
});

// fired when a message is received
server.on('published', function(packet, client) {
    console.log('Published',packet.payload);
    if(client!==undefined&&client!==null){
        console.log("\tclient",client.id);  
    }
});
// fired when the mqtt server is ready
function setup() {
  console.log('Mosca server is up and running')
}

Normal Publisher:
mosquitto_pub -d -i client1 -q 1 -t test/offline1 -m "h100"

Subscriber using Paho-MQTT:

import paho.mqtt.client as mqtt

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, rc):
    print("Connected with result code "+str(rc))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("test/offline")

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
        print(msg.topic+" "+str(msg.payload))

client = mqtt.Client(client_id="sample", clean_session=False)
client.on_connect = on_connect
client.on_message = on_message

client.connect(localhost, 1883, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

I start the subscriber, publish a message which is received at the subscriber then purposely bring the subscriber down. Publish an offline message with QoS=1 and start the subscriber again sometimes able to retrieve the message sometimes not. Tried changing the clientID and then re-subscribe facing the same issue.

@mcollina
Copy link
Collaborator

A couple of note in your code:

a. your publisher issue a PUBLISH on topic test/offline1, while your subscriber subscribes to test/offline.
b. the QoS of the subscriber is 0, https://pypi.python.org/pypi/paho-mqtt/1.1#subscribe-unsubscribe, so there is no offline message support.

@KohliDev
Copy link
Author

KohliDev commented Jan 27, 2017

Thanks, for pointing out the issues. The main cause was that I was stopping the client session completely (Disconnect). If the network disconnect happens and the client session is retained I`m able to get the messages but not in the case of the client session closure. This is the default working mechanism of MQTT Protocol with QoS=1, figured it out a bit late :-)

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants