diff --git a/.github/workflows/node.js.yml b/.github/workflows/node.js.yml new file mode 100644 index 0000000..7200245 --- /dev/null +++ b/.github/workflows/node.js.yml @@ -0,0 +1,28 @@ +# This workflow will do a clean install of node dependencies, build the source code and run tests across different versions of node +# For more information see: https://help.github.com/actions/language-and-framework-guides/using-nodejs-with-github-actions + +name: CI + +on: + push: + branches: [master] + pull_request: + branches: [master] + +jobs: + test: + runs-on: ubuntu-latest + + strategy: + matrix: + node-version: [10.x, 12.x, 14.x] + + steps: + - uses: actions/checkout@v2 + - name: Use Node.js ${{ matrix.node-version }} + uses: actions/setup-node@v1 + with: + node-version: ${{ matrix.node-version }} + - run: npm install + - name: npm test + run: npm test \ No newline at end of file diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 5bca135..0000000 --- a/.travis.yml +++ /dev/null @@ -1,12 +0,0 @@ -# Use Node.js configuration. -language: node_js - -node_js: - - 10 - - 12 - - node # latest - -os: - - linux - - osx - - windows diff --git a/contacts.js b/contacts.js index e057e25..51f210c 100644 --- a/contacts.js +++ b/contacts.js @@ -5,8 +5,6 @@ const isFeed = require('ssb-ref').isFeed module.exports = function (sbot, createLayer, config) { const layer = createLayer('contacts') let initial = false - const hops = {} - hops[sbot.id] = 0 const INDEX_VERSION = 10 const index = sbot._flumeUse('contacts2', Reduce(INDEX_VERSION, function (g, data) { diff --git a/db2-contacts.js b/db2-contacts.js new file mode 100644 index 0000000..d123c8a --- /dev/null +++ b/db2-contacts.js @@ -0,0 +1,195 @@ +const bipf = require('bipf') +const pull = require('pull-stream') +const pl = require('pull-level') +const jsonCodec = require('flumecodec/json') +const Plugin = require('ssb-db2/indexes/plugin') + +const isFeed = require('ssb-ref').isFeed + +module.exports = function (sbot, createLayer, config) { + const layer = createLayer('contacts') + + // used for dictionary compression where a feed is mapped to its index + let feeds = [] + + // a map of feed -> { feed: followStatus } + const feedValues = {} + // assuming we have feed A (index 0) and B, and A follows B we will in feedValues store: + // { 0: { 1: 1 } } meaning the map of values for feed A (0) is: index 1 (B) has value 1 (follow) + // + // feeds will be: [A,B] in this example + + const bValue = Buffer.from('value') + const bAuthor = Buffer.from('author') + const bContent = Buffer.from('content') + const bType = Buffer.from('type') + const bContact = Buffer.from('contact') + + const name = 'contacts' + const { level, offset, stateLoaded, onData, writeBatch } = Plugin( + config.path, + name, + 2, + handleData, + writeData, + beforeIndexUpdate + ) + + let batch = [] + // it turns out that if you place the same key in a batch multiple + // times. Level will happily write that key as many times as you give + // it, instead of just writing the last value for the key, so we have + // to help the poor bugger + let batchKeys = {} // key to index + + function writeData(cb) { + level.batch(batch, { valueEncoding: 'json' }, cb) + batch = [] + batchKeys = {} + } + + function handleData(record, processed) { + if (record.offset < offset.value) return batch.length + const recBuffer = record.value + if (!recBuffer) return batch.length // deleted + + let p = 0 // note you pass in p! + p = bipf.seekKey(recBuffer, p, bValue) + if (!~p) return batch.length + + const pAuthor = bipf.seekKey(recBuffer, p, bAuthor) + const author = bipf.decode(recBuffer, pAuthor) + + const pContent = bipf.seekKey(recBuffer, p, bContent) + if (!~pContent) return batch.length + + const pType = bipf.seekKey(recBuffer, pContent, bType) + if (!~pType) return batch.length + + if (bipf.compareString(recBuffer, pType, bContact) === 0) { + const content = bipf.decode(recBuffer, pContent) + const to = content.contact + + if (isFeed(author) && isFeed(to)) { + const value = content.blocking || content.flagged ? -1 : + content.following === true ? 1 + : -2 + + let updateFeeds = false + + let fromIndex = feeds.indexOf(author) + if (fromIndex === -1) { + feeds.push(author) + fromIndex = feeds.length -1 + updateFeeds = true + } + + let toIndex = feeds.indexOf(to) + if (toIndex === -1) { + feeds.push(to) + toIndex = feeds.length -1 + updateFeeds = true + } + + let fromValues = feedValues[fromIndex] || {} + fromValues[toIndex] = value + feedValues[fromIndex] = fromValues + + const batchValue = { + type: 'put', + key: fromIndex, + value: fromValues + } + + let existingKeyIndex = batchKeys[fromIndex] + if (existingKeyIndex) { + batch[existingKeyIndex] = batchValue + } + else { + batch.push(batchValue) + batchKeys[fromIndex] = batch.length - 1 + } + + if (updateFeeds) { + const feedsValue = { + type: 'put', + key: 'feeds', + value: feeds + } + + let existingFeedsIndex = batchKeys['feeds'] + if (existingFeedsIndex) { + batch[existingFeedsIndex] = feedsValue + } else { + batch.push(feedsValue) + batchKeys['feeds'] = batch.length - 1 + } + } + + layer(author, to, value) + } + } + + return batch.length + } + + layer({}) + + function beforeIndexUpdate(cb) { + get((err, g) => { + layer(g) + cb() + }) + } + + function get(cb) { + pull( + pl.read(level, { + valueEncoding: jsonCodec, + keys: true + }), + pull.collect((err, data) => { + if (err) return cb(err) + + for (let i = 0; i < data.length; ++i) { + if (data[i].key === 'feeds') { + feeds = data[i].value + break + } + } + + let result = {} + for (let i = 0; i < data.length; ++i) + { + const relation = data[i] + + if (relation.key !== '\x00' && relation.key !== 'feeds') { + const feed = feeds[parseInt(relation.key, 10)] + const feedFollowStatus = result[feed] || {} + let valueKeys = Object.keys(relation.value) + for (var v = 0; v < valueKeys.length; ++v) { + const to = feeds[valueKeys[v]] + feedFollowStatus[to] = parseInt(relation.value[valueKeys[v]], 10) + } + result[feed] = feedFollowStatus + } + } + + cb(null, result) + }) + ) + } + + sbot.db.registerIndex(() => { + return { + offset, + stateLoaded, + onData, + writeBatch, + name, + + remove: level.clear, + close: level.close.bind(level) + } + }) +} diff --git a/glue/auth.js b/glue/auth.js new file mode 100644 index 0000000..b54d67c --- /dev/null +++ b/glue/auth.js @@ -0,0 +1,12 @@ +module.exports = function (sbot, isBlocking) { + // opinion: do not authorize peers blocked by this node. + sbot.auth.hook(function (fn, args) { + const self = this + isBlocking({ source: sbot.id, dest: args[0] }, (err, blocked) => { + if (err) console.error(err) + + if (blocked) args[1](new Error('client is blocked')) + else fn.apply(self, args) + }) + }) +} diff --git a/glue/replicate.js b/glue/replicate.js new file mode 100644 index 0000000..31379a1 --- /dev/null +++ b/glue/replicate.js @@ -0,0 +1,45 @@ +const isFeed = require('ssb-ref').isFeed + +module.exports = function (sbot, layered) { + // check for ssb-replicate or similar, but with a delay so other plugins have time to be loaded + setImmediate(function () { + if (!sbot.replicate) { + throw new Error('ssb-friends expects a replicate plugin to be available') + } + + // opinion: replicate with everyone within max hops (max passed to layered above ^) + pull( + layered.hopStream({ live: true, old: true }), + pull.drain(function (data) { + if (data.sync) return + for (const k in data) { + sbot.replicate.request(k, data[k] >= 0) + } + }) + ) + + // opinion: pass the blocks to replicate.block + const block = (sbot.replicate && sbot.replicate.block) || (sbot.ebt && sbot.ebt.block) + if (block) { + function handleBlockUnlock (from, to, value) { + if (value === false) block(from, to, true) + else block(from, to, false) + } + pull( + legacy.stream({ live: true }), + pull.drain(function (contacts) { + if (!contacts) return + + if (isFeed(contacts.from) && isFeed(contacts.to)) { // live data + handleBlockUnlock(contacts.from, contacts.to, contacts.value) + } else { // initial data + for (const from in contacts) { + const relations = contacts[from] + for (const to in relations) { handleBlockUnlock(from, to, relations[to]) } + } + } + }) + ) + } + }) +} diff --git a/index.js b/index.js index 3a20582..c574447 100644 --- a/index.js +++ b/index.js @@ -3,8 +3,14 @@ const LayeredGraph = require('layered-graph') const pull = require('pull-stream') const isFeed = require('ssb-ref').isFeed const contacts = require('./contacts') +const db2Contacts = require('./db2-contacts') const _legacy = require('./legacy') const help = require('./help') + +// glue +const authGlue = require('./glue/auth') +const replicateEBTGlue = require('./glue/replicate') + // friends plugin // methods to analyze the social graph // maintains a 'follow' and 'flag' graph @@ -44,62 +50,19 @@ exports.init = function (sbot, config) { }) } - // opinion: do not authorize peers blocked by this node. - sbot.auth.hook(function (fn, args) { - const self = this - isBlocking({ source: sbot.id, dest: args[0] }, (err, blocked) => { - if (err) console.error(err) - - if (blocked) args[1](new Error('client is blocked')) - else fn.apply(self, args) - }) - }) - - contacts(sbot, layered.createLayer, config) + if (sbot.db) + db2Contacts(sbot, layered.createLayer, config) + else + contacts(sbot, layered.createLayer, config) const legacy = _legacy(layered) - // check for ssb-replicate or similar, but with a delay so other plugins have time to be loaded - setImmediate(function () { - if (!sbot.replicate) { - throw new Error('ssb-friends expects a replicate plugin to be available') - } - - // opinion: replicate with everyone within max hops (max passed to layered above ^) - pull( - layered.hopStream({ live: true, old: true }), - pull.drain(function (data) { - if (data.sync) return - for (const k in data) { - sbot.replicate.request(k, data[k] >= 0) - } - }) - ) - - // opinion: pass the blocks to replicate.block - const block = (sbot.replicate && sbot.replicate.block) || (sbot.ebt && sbot.ebt.block) - if (block) { - function handleBlockUnlock (from, to, value) { - if (value === false) block(from, to, true) - else block(from, to, false) - } - pull( - legacy.stream({ live: true }), - pull.drain(function (contacts) { - if (!contacts) return - - if (isFeed(contacts.from) && isFeed(contacts.to)) { // live data - handleBlockUnlock(contacts.from, contacts.to, contacts.value) - } else { // initial data - for (const from in contacts) { - const relations = contacts[from] - for (const to in relations) { handleBlockUnlock(from, to, relations[to]) } - } - } - }) - ) - } - }) + // glue modules together + if (config.friends && config.friends.hookAuth !== false) + authGlue(sbot, isBlocking) + + if (config.friends && config.friends.hookReplicate !== false) + replicateEBTGlue(sbot, layered) return { hopStream: layered.hopStream, @@ -130,7 +93,6 @@ exports.init = function (sbot, config) { sbot.publish(content, cb) }, isFollowing: isFollowing, - // block, isBlocking: isBlocking, // expose createLayer, so that other plugins may express relationships @@ -138,12 +100,16 @@ exports.init = function (sbot, config) { // legacy, debugging hops (opts, cb) { + if (typeof opts === 'function') { + cb = opts + opts = {} + } + layered.onReady(function () { - if (typeof opts === 'function') { - cb = opts - opts = {} - } - cb(null, layered.getHops(opts)) + if (sbot.db) + sbot.db.onDrain('contacts', () => cb(null, layered.getHops(opts))) + else + cb(null, layered.getHops(opts)) }) }, help: () => help, diff --git a/package.json b/package.json index 9fd63df..55f7c28 100644 --- a/package.json +++ b/package.json @@ -12,17 +12,22 @@ "url": "git://github.com/ssbc/ssb-friends.git" }, "dependencies": { + "bipf": "^1.4.0", + "flumecodec": "0.0.1", "flumeview-reduce": "^1.3.17", "layered-graph": "^1.1.1", "pull-flatmap": "0.0.1", + "pull-level": "^2.0.4", "pull-notify": "^0.1.1", "pull-stream": "^3.6.0", + "ssb-db2": "^1.9.1", "ssb-ref": "^2.7.1" }, "devDependencies": { "run-series": "^1.1.8", "scuttle-testbot": "^1.3.0", "secret-stack": "^6.1.2", + "ssb-caps": "^1.1.0", "ssb-generate": "^1.0.1", "ssb-replicate": "^1.1.0", "ssb-tribes": "^0.4.1", diff --git a/test/db2.js b/test/db2.js new file mode 100644 index 0000000..37a34d2 --- /dev/null +++ b/test/db2.js @@ -0,0 +1,115 @@ +const ssbKeys = require('ssb-keys') +const cont = require('cont') +const tape = require('tape') +const u = require('./util') +const pull = require('pull-stream') +const validate = require('ssb-validate') +const rimraf = require('rimraf') +const mkdirp = require('mkdirp') + +const os = require('os') +const path = require('path') + +const SecretStack = require('secret-stack') +const caps = require('ssb-caps') + +function liveFriends (ssbServer) { + const live = {} + pull( + ssbServer.friends.createFriendStream({ live: true, meta: true }), + pull.drain(function (friend) { + if (friend.sync) return + live[friend.id] = friend.hops + }) + ) + return live +} + +const dir = path.join(os.tmpdir(), "friends-db2") + +rimraf.sync(dir) +mkdirp.sync(dir) + +function Server(opts = {}) { + const stack = SecretStack({ caps }) + .use(require('ssb-db2')) + .use(require('..')) + + return stack(opts) +} + +let state = validate.initial() + +function addMsg(db, keys, content) { + state = validate.appendNew( + state, + null, + keys, + content, + Date.now() + ) + + return (cb) => { + value = state.queue.shift().value + db.add(value, cb) + } +} + +tape('db2 friends test', function (t) { + const alice = ssbKeys.generate() + const bob = ssbKeys.generate() + const carol = ssbKeys.generate() + + let sbot = Server({ + keys: alice, + db2: true, + path: dir + }) + let live = liveFriends(sbot) + + cont.para([ + addMsg(sbot.db, alice, { + type: 'contact', + contact: bob.id, + following: true + }), + addMsg(sbot.db, alice, u.follow(carol.id)), + addMsg(sbot.db, alice, u.follow(alice.id)), + addMsg(sbot.db, bob, u.follow(alice.id)), + addMsg(sbot.db, bob, { + type: 'contact', + contact: carol.id, + following: false, + flagged: true + }), + addMsg(sbot.db, carol, u.follow(alice.id)) + ])(function (err, results) { + sbot.friends.hops(function (err, hops) { + if (err) throw err + t.deepEqual(live, hops) + + sbot.close(() => { + sbot = Server({ + keys: alice, + db2: true, + path: dir + }) + live = liveFriends(sbot) + + addMsg(sbot.db, bob, { + type: 'contact', + contact: carol.id, + following: true + })((err) => { + if (err) throw err + + sbot.db.onDrain('contacts', () => { + t.deepEqual(live, hops) + + sbot.close(t.end) + }) + }) + }) + }) + }) +}) diff --git a/test/util.js b/test/util.js index 5f2a221..cd72035 100644 --- a/test/util.js +++ b/test/util.js @@ -6,9 +6,8 @@ exports.Server = function Testbot (opts = {}) { .use(require('ssb-replicate')) .use(require('..')) - if (opts.tribes === true) { + if (opts.tribes === true) stack = stack.use(require('ssb-tribes')) - } return stack(opts) }