Skip to content

Commit

Permalink
Merge pull request #4 from mcollina/navigator
Browse files Browse the repository at this point in the history
Fluent API aka Navigator
  • Loading branch information
mcollina committed Apr 30, 2013
2 parents 07318d5 + 9064fde commit df5f70b
Show file tree
Hide file tree
Showing 11 changed files with 766 additions and 52 deletions.
135 changes: 134 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,36 @@ db.put([{
});
```

It also allows to generate a stream of triples, instead of a context:
```
db.join([{
subject: db.v("a"),
predicate: "friend",
object: db.v("x")
}, {
subject: db.v("x"),
predicate: "friend",
object: db.v("y")
}, {
subject: db.v("y"),
predicate: "friend",
object: db.v("b")
}], {
materialized: {
subject: db.v("a"),
predicate: "friend-of-a-friend",
object: db.v("b")
}
}, function(err, results) {
// this will print all the 'friend of a friend triples..'
// like so: {
// subject: "lucio",
// predicate: "friend-of-a-friend",
// object: "daniele"
// }
});
```

### Deleting

Deleting is easy too:
Expand All @@ -162,6 +192,110 @@ db.del(triple, function(err) {
});
```

## Navigator API

The Navigator API is a fluent API for LevelGraph, loosely inspired by
[Gremlin](http://markorodriguez.com/2011/06/15/graph-pattern-matching-with-gremlin-1-1/)
It allows to specify joins in a much more compact way and navigate
between vertexes in our graph.

Here is an example, using the same dataset as before:
```
db.nav("matteo").archIn("friend").archOut("friend").
contexts(function(err, results) {
// prints:
// [ { x0: 'daniele', x1: 'marco' },
// { x0: 'daniele', x1: 'matteo' },
// { x0: 'lucio', x1: 'marco' },
// { x0: 'lucio', x1: 'matteo' } ]
console.log(results);
});
```

The above example match the same triples of:
```
db.join([{
subject: db.v("x0"),
predicate: 'friend',
object: 'matteo'
}, {
subject: db.v("x0"),
predicate: 'friend',
object: db.v("x1")
}], function(err, results) {
// prints:
// [ { x0: 'daniele', x1: 'marco' },
// { x0: 'daniele', x1: 'matteo' },
// { x0: 'lucio', x1: 'marco' },
// { x0: 'lucio', x1: 'matteo' } ]
console.log(results);
});
```

It allows to see just the last reached vertex:
```
db.nav("matteo").archIn("friend").archOut("friend").
values(function(err, results) {
// prints [ 'marco', 'matteo' ]
console.log(results);
});
```

Variable names can also be specified, like so:
```
db.nav("marco").archIn("friend").as("a").archOut("friend").archOut("friend").as("a").
contexts(function(err, friends) {
console.log(friends); // will print [{ a: "daniele" }]
});
```

Variables can also be bound to a specific value, like so:
```
db.nav("matteo").archIn("friend").bind("lucio").archOut("friend").bind("marco").
values(function(err, friends) {
console.log(friends); // this will print ['marco']
});
```

A materialized join can also be produced, like so:
```
db.nav("matteo").archOut("friend").bind("lucio").archOut("friend").bind("marco").
triples({:
materialized: {
subject: db.v("a"),
predicate: "friend-of-a-friend",
object: db.v("b")
}
}, function(err, results) {
// this will return all the 'friend of a friend triples..'
// like so: {
// subject: "lucio",
// predicate: "friend-of-a-friend",
// object: "daniele"
// }
console.log(results);
});
```

It is also possible to change the current vertex:
```
db.nav("marco").archIn("friend").as("a").go("matteo").archOut("friend").as("b").
contexts(function(err, contexts) {
// contexs is: [{
// a: "daniele",
// b: "daniele"
// }, {
// a: "lucio",
// b: "daniele"
// }]
});
```

## TODO

There are plenty of things that this library is missing.
Expand All @@ -172,7 +306,6 @@ Here are some ideas:

* [ ] Return the matching triples in the JOIN results.
* [ ] Support for Query Planning in JOIN.
* [ ] Design and implement a nicer query interface using promises.
* [ ] Add more database operators.

## Contributing to LevelGraph
Expand Down
56 changes: 56 additions & 0 deletions examples/foafNavigator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@

var levelup = require("levelup");
var levelgraph = require("../");
var tmp = require("tmp");

tmp.dir(function(err, dir) {
if (err) {
console.log(err);
process.exit(1);
}

db = levelgraph(levelup(dir));
db.put([{
subject: "matteo",
predicate: "friend",
object: "daniele"
}, {
subject: "daniele",
predicate: "friend",
object: "matteo"
}, {
subject: "daniele",
predicate: "friend",
object: "marco"
}, {
subject: "lucio",
predicate: "friend",
object: "matteo"
}, {
subject: "lucio",
predicate: "friend",
object: "marco"
}, {
subject: "marco",
predicate: "friend",
object: "davide"
}], function () {

db.nav("matteo").archIn("friend").archOut("friend").
values(function(err, results) {
console.log(results);
});

db.join([{
subject: db.v("x0"),
predicate: 'friend',
object: 'matteo'
}, {
subject: db.v("x0"),
predicate: 'friend',
object: db.v("x1")
}], function(err, results) {
console.log(results);
});
});
});
53 changes: 29 additions & 24 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@

var KeyFilterStream = require("./lib/keyfilterstream");
var JoinStream = require("./lib/joinstream");
var MaterializerStream = require("./lib/materializerstream");
var Variable = require("./lib/variable");
var concat = require('concat-stream');
var navigator = require('./lib/navigator');
var extend = require("xtend");
var wrapCallback = require("./lib/utilities").wrapCallback;
var PassThrough = require("stream").PassThrough;

var defs = {
spo: ["subject", "predicate", "object"],
Expand All @@ -13,6 +17,10 @@ var defs = {
osp: ["object", "subject", "predicate"]
};

var joinDefaults = {
context: {}
};

module.exports = function levelgraph(leveldb) {

var db = {
Expand All @@ -26,21 +34,37 @@ module.exports = function levelgraph(leveldb) {
del: doAction('del', leveldb),
close: leveldb.close.bind(leveldb),
v: Variable,
joinStream: function(query) {
var that = this;
joinStream: function(query, options) {
var that = this, stream = null;
options = extend(joinDefaults, options);

if (!query || query.length === 0) {
stream = new PassThrough({ objectMode: true });
stream.end();
return stream;
}

var streams = query.map(function(triple) {
var stream = new JoinStream({ triple: triple, db: that });
return stream;
});

streams[0].end({});
streams[0].end(options.context);

if (options.materialized) {
streams.push(MaterializerStream({
pattern: options.materialized
}));
}

return streams.reduce(function(prev, current) {
return prev.pipe(current);
});
},
join: wrapCallback('joinStream')
join: wrapCallback('joinStream'),
nav: function(start) {
return navigator({ start: start, db: this });
}
};

return db;
Expand Down Expand Up @@ -102,22 +126,3 @@ function createQuery(pattern) {

return query;
}

function wrapCallback(method) {
return function(query, cb) {
var stream = this[method](query);
stream.pipe(reconcat(cb));
stream.on("error", cb);
};
}

function reconcat(cb) {
return concat(function(err, list) {
if(err) {
cb(err);
return;
}

cb(null, list || []);
});
}
34 changes: 34 additions & 0 deletions lib/callbackstream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@

var Writable = require("stream").Writable;

function CallbackStream(options) {
if (!(this instanceof CallbackStream)) {
return new CallbackStream(options);
}

options.objectMode = true;

Writable.call(this, options);

this.start = options.start;
this.results = [];
this.on("finish", function() {
options.callback(null, this.results);
});

this.once("pipe", function(source) {
source.on("error", options.callback);
});
}

CallbackStream.prototype = Object.create(
Writable.prototype,
{ constructor: { value: CallbackStream } }
);

CallbackStream.prototype._write = function(data, encoding, done) {
this.results.push(data);
done();
};

module.exports = CallbackStream;
40 changes: 40 additions & 0 deletions lib/materializerstream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@

var Transform = require("stream").Transform;
var Variable = require("./variable");

function MaterializerStream(options) {
if (!(this instanceof MaterializerStream)) {
return new MaterializerStream(options);
}

options.objectMode = true;

Transform.call(this, options);

this.pattern = options.pattern;
}

MaterializerStream.prototype = Object.create(
Transform.prototype,
{ constructor: { value: MaterializerStream } }
);

MaterializerStream.prototype._transform = function(data, encoding, done) {

var pattern = this.pattern;

this.push(Object.keys(pattern).
reduce(function(result, key) {
if (pattern[key] instanceof Variable) {
result[key] = data[pattern[key].name];
} else {
result[key] = pattern[key];
}

return result;
}, {}));

done();
};

module.exports = MaterializerStream;
Loading

0 comments on commit df5f70b

Please sign in to comment.