#21 serialize and batch up allocate vnode call
Yunong Xiao committed Jan 10, 2014
1 parent b82f71e commit 95b03ff
Showing 1 changed file with 60 additions and 20 deletions.
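
For context, the pattern this commit introduces can be sketched in isolation as below. This is a minimal, hypothetical sketch, assuming the 2014-era levelup API (levelup(path) to open a database, and the chained batch returned by db.batch(), whose put() queues an operation and write(cb) flushes it); the database path, key format, and the TOTAL and BATCH_SIZE constants are illustrative and not taken from fash.

    var EventEmitter = require('events').EventEmitter;
    var levelup = require('levelup');

    var db = levelup('/tmp/example-ring'); // hypothetical path
    var TOTAL = 10000;       // stands in for self.vnodeCount_
    var BATCH_SIZE = 1000;   // flush interval, as in the commit

    var batch = db.batch();  // levelup chained batch
    var emitter = new EventEmitter();

    emitter.on('enqueue', function (i) {
        if (i >= TOTAL) {
            // flush whatever remains in the final partial batch
            return batch.write(done);
        }
        batch.put('/VNODE/' + i, 'pnode' + (i % 3)); // illustrative key/value
        if ((i + 1) % BATCH_SIZE === 0) {
            // flush, and only continue once the write completes, so at
            // most one batch worth of operations is ever queued in memory
            batch.write(function (err) {
                if (err) {
                    return done(err);
                }
                batch = db.batch();
                emitter.emit('enqueue', i + 1);
            });
        } else {
            emitter.emit('enqueue', i + 1);
        }
    });

    function done(err) {
        if (err) {
            console.error('batch write failed:', err);
        } else {
            console.log('all %d puts flushed', TOTAL);
        }
    }

    emitter.emit('enqueue', 0);

Driving the iteration through the emitter instead of a for loop is the point of the change: at most one batch worth of puts is queued at a time, where the old loop enqueued every put before LevelDB processed any of them.
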
lib/backend/leveldb.js (60 additions, 20 deletions)
@@ -7,6 +7,7 @@ var bignum = require('bignum');
var bunyan = require('bunyan');
var common = require('../common');
var dtrace = require('../dtrace');
var EventEmitter = require('events').EventEmitter;
var fash = require('../index');
var crypto = require('crypto');
var levelup = require('levelup');
@@ -187,9 +188,57 @@ function ConsistentHash(options, cb) {
* vnode % total_pnode = assigned pnode.
*/
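// Worked example of the rule above (illustrative numbers, not from the
// source): with 3 pnodes, vnode 7 is assigned pnodes_[7 % 3], i.e.
// pnode index 1.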
function allocateVnodes(_, _cb) {
log.trace('allocateVnodes');
_cb = once(_cb);
_.pnodeToVnodeMap = {};
- for (var vnode = 0; vnode < self.vnodeCount_; vnode++) {

/*
 * #21 Batch up the vnode puts here. Because we are running in
 * a tight for loop, _every_ put gets enqueued onto the node
 * work queue before any of them actually get processed. This
 * means that we are allocating a huge amount of memory and
 * not deallocating it until every put has been processed. On
 * sufficiently large counts of vnodes, say 10,000,000, we
 * effectively fragment the heap such that either malloc(3),
 * mmap(2) or v8 will fail to grow the heap because of
 * fragmentation and cause the process to fail with an OOME.
 * Hence we want to batch up the puts in 1000-vnode increments
 * and let them finish before enqueueing more puts.
 */
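/*
 * Illustrative arithmetic (not from the original comment): each vnode
 * below issues two puts, so 10,000,000 vnodes would queue ~20,000,000
 * operations at once; flushing every 1000 vnodes caps the pending
 * queue at roughly 2000 operations.
 */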
var batch = _.db.batch();
// use this emitter to control the serial puts of vnodes.
var emitter = new EventEmitter();
emitter.on('enqueue', function (vnode) {
if (vnode >= self.vnodeCount_) {
// done: flush any remaining batched operations.
batch.write(function (err) {
if (err) {
return _cb(new verror.VError('unable to ' +
'allocate some vnodes'));
}
return _cb();
});
} else if (vnode % 1000 === 1) {
batch.write(function (err) {
if (err) {
return _cb(new verror.VError('unable to ' +
'allocate some vnodes'));
}
batch = _.db.batch();

// only allocate the next vnode once the batch write has finished
allocateVnodeImpl(vnode, function () {
emitter.emit('enqueue', ++vnode);
});
});
} else {
allocateVnodeImpl(vnode, function () {
emitter.emit('enqueue', ++vnode);
});
}
});

var allocateVnodeImpl = function (vnode, _cb1) {
var pnode = self.pnodes_[vnode % self.pnodes_.length];
var hashspace = common.findHashspace({
vnode: vnode,
@@ -204,30 +253,17 @@
}, 'ConsistentHash.new: assigning hashspace to vnode to ' +
'pnode');

- /**
/*
* assign the pnode->vnode and vnode->pnode maps
* set the data here to null since this is a new ring
*/
- _.db.put(sprintf(LKEY_VNODE_V, vnode),
-     pnode,
-     function(err)
- {
-     if (err) {
-         err = new verror.VError(err);
-     }
-     return _cb(err);
- });
- /**
batch.put(sprintf(LKEY_VNODE_V, vnode), pnode);
/*
* we put the vnode in the path, to avoid having to put all
* vnodes under 1 key
*/
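// (Illustrative; the actual LKEY_PNODE_P_V template is not shown in
// this hunk.) A per-vnode key such as '/PNODE/<pnode>/<vnode>' can be
// written and deleted independently, instead of rewriting one large
// '/PNODE/<pnode>' value that lists every vnode.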
var pnodePath = sprintf(LKEY_PNODE_P_V, pnode, vnode);
- _.db.put(pnodePath, LVAL_NULL, function(err) {
-     if (err) {
-         err = new verror.VError(err);
-     }
-     return _cb(err);
- });
batch.put(pnodePath, LVAL_NULL);
// cache the pnodeToVnode mapping for step 4
if (!_.pnodeToVnodeMap[pnode]) {
_.pnodeToVnodeMap[pnode] = [];
@@ -238,11 +274,15 @@
vnode: vnode,
pnode: pnode
}, 'ConsistentHash.new: added vnode to pnode');
- }
- return _cb();

return _cb1();
};

emitter.emit('enqueue', 0);
},
// step 4
function writePnodeKeys(_, _cb) {
log.trace('writePnodeKeys');
_cb = once(_cb);
var pnodeMap = {};
for (var i = 0; i < self.pnodes_.length; i++) {
