
Reimplementation of offline subscriptions #31

Merged (9 commits, Jul 10, 2017)

Conversation

@behrad (Contributor) commented Jul 3, 2017

  • dropped the counter:offline:subscriptions key; the offline subscription count is now the SCARD of the subs SET (see the sketch after this list)
  • dropped the counter:offline:clients key; the offline client count is now the SCARD of the clients SET
  • dropped sub:client; no subscription index of clientIds is stored any more. All subscription-list API methods are implemented via the internal Qlobber.
  • renamed client:sub: to client:
  • new clients SET added, containing all client ids
  • new subs SET added, containing all topics
  • all Redis operations are handled by _getPipeline
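
A minimal sketch of the counting change described in the list above, assuming ioredis and the literal key names clients and subs (the real key prefixes in the PR may differ):

```js
var Redis = require('ioredis')
var redis = new Redis()

// Offline counts are no longer kept in dedicated counter keys;
// they are just the cardinality of the corresponding SETs.
function countOffline (cb) {
  redis.scard('clients', function (err, offlineClients) {
    if (err) { return cb(err) }
    redis.scard('subs', function (err, offlineSubs) {
      if (err) { return cb(err) }
      cb(null, { offlineClients: offlineClients, offlineSubs: offlineSubs })
    })
  })
}
```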

Would you please review these @mcollina @GavinDmello ?

- switch all redis operations to _getPipeline
@behrad (Contributor, Author) commented Jul 3, 2017

Tests are failing because of this issue, which this implementation surfaced. They will pass if you merge that in, @mcollina.

persistence.js Outdated
// this.clientId = clientId
// this.topic = topic
// this.qos = qos
// }
Collaborator

Can you please remove this if it is not needed anymore?

Contributor Author

Yes, sure

@mcollina (Collaborator) commented Jul 3, 2017

Do you have a benchmark result for this?

@behrad (Contributor, Author) commented Jul 3, 2017

> Do you have a benchmark result for this?

Doing this right now... Will post results.

@behrad (Contributor, Author) commented Jul 3, 2017

| Implementation | Offline clients | Subscriptions count | Startup time | Heap | RSS | Redis memory |
| --- | --- | --- | --- | --- | --- | --- |
| Current | 1384138 | 2879070 | 406500.858 ms | 2423.3 MB | 3552.9 MB | 2.5 G |
| This PR | 1384066 | 2878969 | 245929.751 ms | 2265.2 MB | 2798.9 MB | 864 M |

@behrad (Contributor, Author) commented Jul 3, 2017

In my tests a single Aedes process is able to handle 250 connections per second, each with 3 subscriptions (around 700 subs/sec), but anything near that rate pushes CPU above 90%. For that part I should check aedes-cached-persistence and aedes.

@mcollina (Collaborator) commented Jul 3, 2017

These are impressive numbers. Definitely +1. @GavinDmello what do you think?

@GavinDmello (Collaborator) commented Jul 4, 2017

@mcollina @behrad This looks good. I have some suggestions, though.

persistence.js Outdated
multi.exec(cb)
var pipeline = this._getPipeline()
pipeline.rpush(listKey, key)
pipeline.set(key, msgpack.encode(new Packet(packet)), cb)
Collaborator

Can we simply use db.set here? These keys can be sharded in the case of nutcracker; they won't be sharded this way.

Contributor Author

I think we may revert all _getPipeline usages to direct _db calls to support Redis Cluster/nutcracker.

Collaborator

Yep.

Contributor Author

Done!
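
A minimal sketch of the revert discussed above, based on the snippet quoted in this thread; the direct-call version is an assumption about the final shape, not a verbatim excerpt:

```js
// Before: commands queued on a shared pipeline; with nutcracker/Redis Cluster
// the batched commands cannot be routed per key.
var pipeline = this._getPipeline()
pipeline.rpush(listKey, key)
pipeline.set(key, msgpack.encode(new Packet(packet)), cb)

// After: plain client calls, so each command can be routed to its own shard.
this._db.rpush(listKey, key)
this._db.set(key, msgpack.encode(new Packet(packet)), cb)
```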

persistence.js Outdated

multi.exec(cb)
var pipeline = this._getPipeline()
pipeline.rpush(listKey, key)
Collaborator

I think this can be a Redis set too for the sake of dedupe

Contributor Author

Can you explain a scenario where we may need deduping on packet.brokerId + ':' + packet.brokerCounter inside a client's outgoing messages, @GavinDmello?

@GavinDmello (Collaborator) commented Jul 4, 2017

This is just to be safe. Using a list was a bad idea here, I guess.

Contributor Author

With SMEMBERS we can't control the offline message flow to the client (a SET is unordered). Let's see what @mcollina thinks about this.

Collaborator

I think the list is the correct behavior, as we need to retrieve the elements in insertion order.

Collaborator

Alright 👍
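
For reference, a small standalone sketch of the ordering difference discussed in this thread, assuming ioredis and hypothetical key names:

```js
var Redis = require('ioredis')
var redis = new Redis()

// A LIST preserves insertion order, which the offline (outgoing) queue relies on.
redis.rpush('outgoing:client1', 'broker1:1', 'broker1:2', 'broker1:3')
redis.lrange('outgoing:client1', 0, -1, function (err, keys) {
  console.log(keys) // [ 'broker1:1', 'broker1:2', 'broker1:3' ]
})

// A SET dedupes, but SMEMBERS returns members in no guaranteed order.
redis.sadd('outgoing-set:client1', 'broker1:1', 'broker1:2', 'broker1:3')
redis.smembers('outgoing-set:client1', function (err, keys) {
  console.log(keys) // order is not guaranteed
})
```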

@behrad (Contributor, Author) left a comment

Good news: _db is performing better than _getPipeline in _setup, where we are doing heavy HGETALLs.


@behrad (Contributor, Author) commented Jul 6, 2017

Implemented createRetainedStreamCombi here and upgraded to aedes-cached-persistence 4.0.0, @mcollina.

@behrad (Contributor, Author) commented Jul 7, 2017

@mcollina ping

@mcollina (Collaborator) left a comment

Can you remove the pipeline support completely? It is mostly gone now, so we can remove it entirely.

persistence.js Outdated
count++
that._waitFor(client, sub.topic, finish)
// TODO don't wait the client an extra tick
Collaborator

why TODO?

Contributor Author

Unlike mosca, aedes publishes subscription topics, and even the local process waits for the subscribed handler (inside cached-persistence). My points:

  1. To call the callback sooner, we may go the mosca way and check broker.id inside the message handler. Or we may even call the callback without this check and let the subscription take effect in the background after the client has been SUBACKed!?

  2. For subscription publication between nodes, cached-persistence is using the broker's pub/sub. Could this be handled at a lower level, so as not to pollute MQTT pub/sub with inter-node messaging!?

Collaborator

  1. This is needed to provide a stable unit-testing base. Mosca had so many spurious unit tests, and some of them rely on setTimeout to work. One of the underlying goals of aedes is to make maintenance and evolution simpler, and this goes in that direction.

  2. No.

I'm ok if you would like to try and remove that (or maybe disable it with an option?), but let's drop the TODO for now.

Contributor Author

removed TODO 👍

persistence.js Outdated

patterns.map(function (pattern) {
qlobber.add(pattern, true)
})
Collaborator

You should use a for loop here; it's way faster if this is going to be in a hot path.

Contributor Author

done
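
A sketch of the suggested change, reusing the identifiers from the quoted snippet: the Array#map call (which allocates a result array that is thrown away) becomes a plain for loop.

```js
for (var i = 0; i < patterns.length; i++) {
  qlobber.add(patterns[i], true)
}
```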

function RedisPersistence (opts) {
  if (!(this instanceof RedisPersistence)) {
    return new RedisPersistence(opts)
  }

  opts = opts || {}
  // maximum number of offline messages delivered when a client reconnects
  this.maxSessionDelivery = opts.maxSessionDelivery || 1000
Collaborator

Can you please document this option?

Contributor Author

done!
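
A hypothetical usage sketch for the new option; the option name and default come from the constructor above, while the module name (aedes-persistence-redis) and the broker wiring are assumptions:

```js
var aedes = require('aedes')
var persistence = require('aedes-persistence-redis')

var broker = aedes({
  persistence: persistence({
    // cap the number of offline messages delivered to a reconnecting session
    maxSessionDelivery: 100
  })
})
```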

persistence.js Outdated
}

this._db.srem(subsKey, subs) // TODO matcher.match should be checked

this._db.hdel(clientSubKey, subs)
Collaborator

we should specify callbacks for this, and deal with potential errors.

Contributor Author

Can you check this now, @mcollina?
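
A minimal sketch of the error handling requested above, using the identifiers from the quoted snippet; the surrounding function and its cb parameter are assumptions:

```js
var that = this
this._db.srem(subsKey, subs, function (err) {
  if (err) { return cb(err) }
  // only drop the per-client hash entries once the subs SET update succeeded
  that._db.hdel(clientSubKey, subs, cb)
})
```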

persistence.js Outdated
if (err) {
splitStream.emit('error', err)
} else {
splitStream.write(results)
splitStream.write(clientIds)
Collaborator

You can completely avoid the use of splitStream here; just do the processing and call write on the throughv instance.

Contributor Author

done

@mcollina (Collaborator) left a comment

LGTM, but I would like @GavinDmello's review as well.

@behrad (Contributor, Author) commented Jul 8, 2017

@GavinDmello ping

var pipeline = this._getPipeline()
pipeline.lrem(willKey, 0, key)
pipeline.getBuffer(key, function getClientWill (err, packet) {
var that = this
Collaborator

That is not required here.

@behrad (Contributor, Author) commented Jul 8, 2017

Why not? `that` is used on line 479.

Contributor Author

Is this OK, @GavinDmello?

Collaborator

I meant we can just use `this`, as it's in the same scope :)

@behrad (Contributor, Author) commented Jul 9, 2017

I am using `that` on line 479 inside a callback. Am I missing something?

@GavinDmello (Collaborator) commented Jul 9, 2017

You mean the getBuffer callback? This is weird; it shows me it's in the same scope, i.e. line 479 is outside the getClientWill function.

Contributor Author

Yes, the del should have been inside the getBuffer callback after the multi was removed. Fixed it just now, @GavinDmello.
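
A sketch of the fix described here, based on the snippet quoted above; the msgpack decoding and the cb signature are assumptions:

```js
var that = this
this._db.lrem(willKey, 0, key)
this._db.getBuffer(key, function getClientWill (err, buf) {
  if (err) { return cb(err) }
  var will = buf ? msgpack.decode(buf) : null
  // the DEL now runs inside the getBuffer callback, after the value is read
  that._db.del(key, function (err) {
    cb(err, will)
  })
})
```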

@GavinDmello (Collaborator)

Minor change I mentioned above. I'm 👍 with this PR.

@behrad (Contributor, Author) commented Jul 9, 2017

I'm not sure why the Node v5 build failed!

@GavinDmello (Collaborator)

Let me try running it again
