This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Getting duplicate messages using Mongodb as mosca backend. #131

Open
cfan8 opened this issue Sep 8, 2015 · 9 comments

@cfan8
Contributor

cfan8 commented Sep 8, 2015

Hi @mcollina, I'm using ascoltatori as part of mosca. Recently I found that if I send three messages very quickly to a channel that has a subscribed client, the client eventually receives four messages. I looked into the sources and the root cause appears to be in ascoltatori.

How to reproduce

Connect a client to a mosca server, subscribe to a channel, and send three consecutive messages very quickly. On the client side you will see the three messages arrive, followed by an extra one after some time (the gap equals the wait param you set for the mongo backend ascoltatore).

This has been verified on MongoDB 3.0.6 and 2.4.10.
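
A minimal reproduction sketch, assuming a mosca broker listening on localhost:1883 and the mqtt.js client; the port and topic name are illustrative and not taken from this issue:

// repro sketch, assuming a mosca broker on localhost:1883 (illustrative values)
var mqtt = require('mqtt');

var client = mqtt.connect('mqtt://localhost:1883');
var received = 0;

client.on('connect', function () {
  client.subscribe('test/duplicates', function () {
    // publish three messages back to back, as fast as possible
    client.publish('test/duplicates', 'one');
    client.publish('test/duplicates', 'two');
    client.publish('test/duplicates', 'three');
  });
});

client.on('message', function (topic, payload) {
  received++;
  console.log('message %d on %s: %s', received, topic, payload.toString());
  // with the bug, a fourth message shows up roughly `wait` ms after the third
});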

I tried to reproduce the problem in a unit test, but I cannot reproduce it there reliably; sometimes the test passes and sometimes it does not, and I'm not quite sure why. Here is my test code

it("should not duplicate messages", function(done) {
    this.timeout(5000);

    var that = this;
    var called = 0;
    that.instance.sub("hello", function(topic, value) {
      called++;
      expect(called).to.be.lessThan(7);
      expect(value).to.eql(new Buffer("42"));
    }, function() {
      setTimeout(function(){
        that.instance.pub("hello", new Buffer("42"));
        that.instance.pub("hello", new Buffer("42"));
        setTimeout(function(){
          that.instance.pub("hello", new Buffer("42"));
          that.instance.pub("hello", new Buffer("42"));
          setTimeout(function(){
            that.instance.pub("hello", new Buffer("42"));
            that.instance.pub("hello", new Buffer("42"));
          },100);
        },100);
      },100);
      setTimeout(done, 3000);
    });
  });

This replaces the original code in https://github.com/mcollina/ascoltatori/blob/master/test/mongo_ascoltatore_spec.js#L110, from line 110 to line 124.

Root cause

I've looked into the sources. Each message sent to a channel creates two entries in the capped collection that the mongo ascoltatore uses, so the _id of the entries is supposed to keep increasing. But when messages are sent through mosca, which calls the MongoDB backend, the order of the fifth and sixth documents is not preserved. I added a line to print the value of latest here: https://github.com/mcollina/ascoltatori/blob/master/lib/mongo_ascoltatore.js#L295. This is what I got:
[Screenshot from 2015-09-08 19:42:02: console output of the logged latest values]
You can see a gap between the ids because I'm also using Mongo as the backend for mosca persistence, so each QoS 1 message consumes an id. The result is the same without QoS 1, though.

I think the reason might be either:

  1. This each() does not guarantee a sequential read, or, even if it does, the actual code is executed in a callback, so the order may not be preserved.
  2. Something is wrong inside MongoDB.

Temporary fix

I suggest changing https://github.com/mcollina/ascoltatori/blob/master/lib/mongo_ascoltatore.js#L295 as follows to work around this problem for now.
From:

latest = doc._id;

To:

latest = doc._id > latest ? doc._id : latest;
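
For context, here is a simplified sketch of the pattern involved (this is not the actual mongo_ascoltatore.js code; the function and variable names are illustrative). The subscriber walks the tailable cursor, remembers the highest _id it has seen, and after the wait interval re-opens the cursor with { _id: { $gt: latest } }. If a document with a smaller _id arrives late and overwrites latest, the next query re-delivers messages the subscriber already received; the guard above prevents that.

// Simplified sketch of the tracking pattern, not the actual ascoltatori code;
// `collection`, `wait` and `deliver` are illustrative names.
function poll(collection, latest, wait, deliver) {
  var cursor = collection.find({ _id: { $gt: latest } }, { tailable: true });
  cursor.each(function (err, doc) {
    if (err || !doc) {
      // cursor exhausted or dead: re-open it after `wait` ms, starting from `latest`
      setTimeout(function () {
        poll(collection, latest, wait, deliver);
      }, wait);
      return;
    }
    deliver(doc);
    // buggy version: latest = doc._id;
    // a late document with a smaller _id lowers `latest`, so the next query
    // re-delivers documents the subscriber has already seen
    latest = doc._id > latest ? doc._id : latest;
  });
}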
@mcollina
Collaborator

mcollina commented Sep 8, 2015

I'm +1 with the fix. That each() is supposed to be sequential, so something else might be wrong. Would you mind sending a PR with a test and a fix?

@cfan8
Contributor Author

cfan8 commented Sep 9, 2015

Hi @mcollina, I've created a PR. Since we are using mosca in a production environment, could you please publish a new release to npm once it is merged?

I did some experiments to locate the cause of this problem. The function executed by each() is fine. I thought the cause might be inside the MongoDB driver, or even MongoDB itself, until I saw this in MongoDB's documentation http://docs.mongodb.org/master/tutorial/create-tailable-cursor/:

If your query is on an indexed field, do not use tailable cursors, but instead, use a regular cursor. Keep track of the last value of the indexed field returned by the query. To retrieve the newly added documents, query the collection again using the last value of the indexed field in the query criteria, as in the following example:

db.<collection>.find( { indexedField: { $gt: <lastvalue> } } )

Setting tailable to false for this cursor at https://github.com/mcollina/ascoltatori/blob/master/lib/mongo_ascoltatore.js#L260 did in fact solve the problem. However, I don't dare use that in production, as I'm not deeply familiar with ascoltatori and I'm not sure how many side effects this change would bring.
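
For illustration, a minimal sketch of the regular-cursor pattern the MongoDB docs describe (this is not how ascoltatori is implemented; the names and collection handling here are assumptions):

// Sketch of the docs' pattern: a regular (non-tailable) cursor, re-queried from
// the last value of the indexed field. Names and structure are illustrative.
var lastId = null;

function fetchNew(collection, deliver, callback) {
  var query = lastId === null ? {} : { _id: { $gt: lastId } };
  collection.find(query).sort({ _id: 1 }).toArray(function (err, docs) {
    if (err) { return callback(err); }
    docs.forEach(function (doc) {
      deliver(doc);
      lastId = doc._id;   // sorted ascending, so the last doc has the highest _id
    });
    callback();
  });
}

// polled every `wait` ms instead of keeping a tailable cursor open, e.g.:
// setInterval(function () { fetchNew(coll, onMessage, function () {}); }, wait);

The trade-off is that this replaces the tailable cursor's push-style delivery with polling, which is where the performance concern raised in the next comment comes from.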

mcollina added a commit that referenced this issue Sep 14, 2015
Fix a problem related to Issues #131 Getting duplicate messages using Mongodb as mosca backend.
@mcollina
Collaborator

Setting tailable: false will cause a massive performance drop.

A completely different solution might be to replace { _id: { $gt: latest } } with a little bit of code inside Ascoltatori; however, this would slow down startup, as the tailable cursor would have to process the whole capped collection before becoming useful.

Also relevant: #90.

I've merged your fix and I will release soon! However, we should look for a more stable fix. I think we should just remove the checks on _id, and place a unix timestamp in there. Do you see any other solutions?
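
A rough sketch of that idea, purely as a discussion aid (the ts field name and these helpers are assumptions, not an agreed design): each published document would carry a Unix timestamp, and the subscriber would track the newest timestamp seen instead of comparing _ids.

// Discussion sketch only; the `ts` field and these helpers are assumptions.
function publish(collection, topic, payload, callback) {
  collection.insert({
    topic: topic,
    payload: payload,
    ts: Date.now()   // unix timestamp in ms, instead of relying on _id order
  }, callback);
}

var latestTs = 0;

function onDoc(doc, deliver) {
  deliver(doc);
  // track the newest timestamp seen; the cursor is re-opened with { ts: { $gt: latestTs } }
  latestTs = Math.max(latestTs, doc.ts);
}

Whether ts would need an index, and what that costs at insertion time, is exactly the trade-off discussed in the following comments.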

@cfan8
Contributor Author

cfan8 commented Sep 14, 2015

Thank you very much for the fix.

I'm wondering whether it is important to guarantee read order. If that is an important feature of ascoltatori, I agree with your fix, since building an index on a field would break the read order guarantee. However, if ascoltatori does not promise read order, then indexing the timestamp and tracking max(latest, document.timestamp) may be a better option. The ObjectID may cause confusion in a multi-server environment.

@mcollina
Collaborator

Read order is not really important. The problem with indexing is speed: objects are inserted into and removed from the capped collection all the time, and slowing down insertion is something I would like to avoid.

The order is already guaranteed by the use of a capped collection plus a tailable cursor.

@cfan8
Contributor Author

cfan8 commented Sep 14, 2015

In that case, using a timestamp without an index is the only practical way. The initialization time is acceptable, given that the capped collection is not actually that large.

@mcollina
Collaborator

👍. Would you mind sending a PR through? :) Thanks so much.

@cfan8
Contributor Author

cfan8 commented Sep 15, 2015

I can give it a try.

@mcollina
Collaborator

Is this still a problem? Would you mind sending a PR?
