Skip to content

Commit

Permalink
implements exchangeable filesystem library
Browse files Browse the repository at this point in the history
  • Loading branch information
Vincent Landgraf committed Nov 17, 2016
1 parent 34d65c7 commit 6f2bfa6
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 18 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ maildir conventions.
var Queue = require('file-queue').Queue,
queue = new Queue('.', callback);

### Dealing with many files and common filesystem errors

If you deal with lots of files EMFILE errors (too many open files errors)
can occur. Issacs wrote the `graceful-fs` package to deal with these errors.
To use it simply pass the filesystem library that you prefer:

var queue = new Queue({
path: 'tmp',
fs: require('graceful-fs')
}, done);

## Pushing and popping messages from the queue

Popping a message can be done at any time. If the queue doesn't contain an item at the
Expand Down
31 changes: 16 additions & 15 deletions lib/maildir.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

var util = require('util'),
events = require('events'),
fs = require('fs'),
os = require('os'),
crypto = require('crypto'),
async = require('async'),
Expand All @@ -16,14 +15,15 @@ function Maildir(root) {
this.dirPaths = [path.resolve(path.join(root, 'tmp')),
path.resolve(path.join(root, 'new')),
path.resolve(path.join(root, 'cur'))];
this.fs = require('fs');
this.pushed = 0;
}

util.inherits(Maildir, events.EventEmitter);

// Finds the length of the queue (list of files in new)
Maildir.prototype.length = function(callback) {
fs.readdir(this.dirPaths[NEW], function(err, files) {
this.fs.readdir(this.dirPaths[NEW], function(err, files) {
if (err) { callback(err); }
else { callback(null, files.length); }
});
Expand All @@ -49,13 +49,13 @@ Maildir.prototype.generateUniqueName = function(callback) {
Maildir.prototype.create = function(persistent, cb) {
var that = this;
async.each(this.dirPaths, function(path, callback) {
fs.exists(path, function(exists) {
that.fs.exists(path, function(exists) {
if (exists) { callback(); }
else { fs.mkdir(path, callback); }
else { that.fs.mkdir(path, callback); }
});
}, function() {
if (persistent) {
that.watcher = fs.watch(that.dirPaths[NEW], {}, function(err, messages) {
that.watcher = that.fs.watch(that.dirPaths[NEW], {}, function(err, messages) {
that.emit('new', [messages]);
});
}
Expand All @@ -78,23 +78,23 @@ Maildir.prototype.newFile = function(data, callback) {
else {
var tmpPath = path.join(that.dirPaths[TMP], uniqueName),
newPath = path.join(that.dirPaths[NEW], uniqueName);
fs.writeFile(tmpPath, data, function(err) {
that.fs.writeFile(tmpPath, data, function(err) {
if (err) { callback(err); }
else { fs.rename(tmpPath, newPath, callback); }
else { that.fs.rename(tmpPath, newPath, callback); }
});
}
});
};

// Lists all messages in the new folder
Maildir.prototype.listNew = function(callback) {
fs.readdir(this.dirPaths[NEW], callback);
this.fs.readdir(this.dirPaths[NEW], callback);
};

// Clears all messages from all folders
Maildir.prototype.clear = function(callback) {
var that = this;
async.map(this.dirPaths, fs.readdir, function(err, results) {
async.map(this.dirPaths, that.fs.readdir, function(err, results) {
if (err) { callback(err); }
else {
var unlinks = [], i, fn, len = that.dirPaths.length,
Expand All @@ -107,29 +107,30 @@ Maildir.prototype.clear = function(callback) {
fn = pushDir(that.dirPaths[i]);
results[i].forEach(fn);
}
async.each(unlinks, fs.unlink, callback);
async.each(unlinks, that.fs.unlink, callback);
}
});
};

// Processes one message from the queue (if possible)
Maildir.prototype.process = function(message, callback) {
var newPath = path.join(this.dirPaths[NEW], message),
curPath = path.join(this.dirPaths[CUR], message);
curPath = path.join(this.dirPaths[CUR], message),
that = this;

fs.rename(newPath, curPath, function(err) {
this.fs.rename(newPath, curPath, function(err) {
// if message could not be moved, another process probably already works
// on it, so we try to pop again, but we try further on the list
if (err) { callback(err); }
else {
fs.readFile(curPath, function(err, data) {
that.fs.readFile(curPath, function(err, data) {
if (err) { callback(err); }
else {
callback(null, data,
// commit function
function(cb) { fs.unlink(curPath, cb); },
function(cb) { that.fs.unlink(curPath, cb); },
// rollback function
function(cb) { fs.rename(curPath, newPath, cb); }
function(cb) { that.fs.rename(curPath, newPath, cb); }
);
}
});
Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "file-queue",
"version": "0.2.1",
"version": "0.3.0",
"description": "A file system based queue (implemented using maildir)",
"main": "queue.js",
"scripts": {
Expand All @@ -13,7 +13,8 @@
"devDependencies": {
"mocha": "1.13.0",
"jshint": "*",
"istanbul": "0.2.6"
"istanbul": "0.2.6",
"graceful-fs": "*"
},
"keywords": [
"queue",
Expand Down
7 changes: 7 additions & 0 deletions queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ function Queue(options, cb) {
this.maildir = new Maildir(path);
this.laterPop = [];

// determine if different fs access library is used
if (typeof options.fs !== 'undefined') {
this.maildir.fs = options.fs;
} else {
this.maildir.fs = require('fs');
}

// be notified, when new messages are available
this.maildir.on('new', function(messages) {
var callback = that.laterPop.shift();
Expand Down
9 changes: 8 additions & 1 deletion test/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@ describe('Queue', function() {
var queue;

beforeEach(function(done) {
queue = new Queue('tmp', done);
if (Math.random() > 0.5) {
queue = new Queue('tmp', done);
} else {
queue = new Queue({
path: 'tmp',
fs: require('graceful-fs')
}, done);
}
});

afterEach(function(done) {
Expand Down

0 comments on commit 6f2bfa6

Please sign in to comment.