Skip to content

Commit

Permalink
in-memory example
Browse files Browse the repository at this point in the history
  • Loading branch information
substack committed Dec 8, 2019
1 parent a665c8e commit 7e2775b
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 18 deletions.
2 changes: 1 addition & 1 deletion example/kv.js
Expand Up @@ -87,7 +87,7 @@ function connect (f) {
download: false,
live: true
})
var q = new Query(storage, { api: { get } })
var q = new Query({ api: { get } })
function get (data) {
kv.open(data)

Expand Down
61 changes: 61 additions & 0 deletions example/mem.js
@@ -0,0 +1,61 @@
var { Readable, Transform } = require('readable-stream')
var Protocol = require('hypercore-protocol')
var Query = require('../')
var ram = require('random-access-memory')

var hypercore = require('hypercore')
var feed0 = hypercore(ram)

setInterval(function () {
var n = Math.floor(Math.random()*100)
feed0.append(String(n))
}, 50)

feed0.ready(function () {
var feed1 = hypercore(ram, feed0.key)
var r0 = feed0.replicate(false, { download: false, live: true })
var r1 = feed1.replicate(true, { sparse: true, live: true })
r0.pipe(r1).pipe(r0)
var q0 = new Query({ api: api(feed0) })
var q1 = new Query({ api: api(feed1) })
r0.registerExtension('query-example', q0.extension())
r1.registerExtension('query-example', q1.extension())
var s = q1.query('subscribe', JSON.stringify({ start: 50, end: 70 }))
s.pipe(new Transform({
objectMode: true,
transform: function (row, enc, next) {
feed1.update(row.seq, function () {
feed1.get(row.seq, function (err, buf) {
if (err) return next(err)
console.log('n=', Number(buf.toString()))
next()
})
})
}
}))
})

function api (feed) {
var subs = []
feed.on('append', function () {
var seq = feed.length
feed.get(seq, function (err, buf) {
var n = Number(buf.toString())
subs.forEach(({ start, end, stream }) => {
if (n >= start && n < end) {
stream.push({ key: feed.key, seq })
}
})
})
})
return { subscribe }
function subscribe (args) {
var { start, end } = JSON.parse(args.toString())
var stream = new Readable({
objectMode: true,
read: function () {}
})
subs.push({ start, end, stream })
return stream
}
}
31 changes: 14 additions & 17 deletions index.js
Expand Up @@ -14,10 +14,9 @@ var codes = messages.Control.ControlCode

module.exports = Query

function Query (mstore, opts) {
if (!(this instanceof Query)) return new Query(mstore, opts)
function Query (opts) {
if (!(this instanceof Query)) return new Query(opts)
if (!opts) opts = {}
this._mstore = mstore
this._api = opts.api || {}
this._queryDefs = {}
this._feedDefs = {}
Expand Down Expand Up @@ -134,24 +133,25 @@ Query.prototype._handleControl = function (m) {
}
}

Query.prototype.register = function (p, extName) {
Query.prototype.extension = function () {
var self = this
self._ext = p.registerExtension(extName, {
encoding: 'binary',
onmessage: function (msg, peer) {
self._handle(msg)
},
onerror: function (err) {
self.emit('error', err)
return function (ext) {
self._ext = ext
return {
encoding: 'binary',
onmessage: function (msg, peer) {
self._handle(msg)
},
onerror: function (err) {
self.emit('error', err)
}
}
})
return self._ext
}
}

function reader (stream) {
var queue = [], ready = true
stream.on('readable', onreadable)
stream.on('error', onerror)
return function (n, cb) {
queue.push([Math.max(n || 1, 1),cb])
if (ready) read()
Expand All @@ -171,7 +171,4 @@ function reader (stream) {
}
ready = false
}
function onerror (err) {
stream.emit('error', err)
}
}

0 comments on commit 7e2775b

Please sign in to comment.