From 144ef94fddad0c1adf3dfb47f4d5948489b6c5dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Kr=C3=BCger?= Date: Fri, 15 Jun 2018 10:10:51 +0200 Subject: [PATCH] feat: finish implementation --- src/index.js | 5 +--- src/server/rpc.js | 72 +++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 68 insertions(+), 9 deletions(-) diff --git a/src/index.js b/src/index.js index 8ed0832..a8c8c76 100644 --- a/src/index.js +++ b/src/index.js @@ -97,10 +97,7 @@ class Exchange extends ExchangeBase { return cb(err) } - this._rpc('request', peerId, ns, data, (err, res) => { - console.log('req', err, res) - }) - // TODO: do .request(peerId, ns, data, cb) rpc call + this._rpc('request', peerId, ns, data, cb) }) } } diff --git a/src/server/rpc.js b/src/server/rpc.js index 563e8f5..22b52e1 100644 --- a/src/server/rpc.js +++ b/src/server/rpc.js @@ -5,8 +5,14 @@ const log = debug('libp2p:exchange:rendezvous:server:rpc') const Pushable = require('pull-pushable') const pull = require('pull-stream') -const {Type, ErrorType, ETABLE} = require('../proto.js') +const {Type, ErrorType} = require('../proto.js') const Id = require('peer-id') +const once = require('once') +const wrap = (cb) => { + cb = once(cb) + setTimeout(() => cb(new Error(ErrorType.E_TIMEOUT)), 10 * 1000) + return cb +} module.exports = (pi, server) => { let online = true @@ -64,12 +70,54 @@ module.exports = (pi, server) => { } case Type.REQUEST: { // we get a request (data.data, data.signature) to forward to data.remote (b58string) and then get a response to forward back - // TODO: add + + let cb = (err, res) => { + let out = { + type: Type.RESPONSE, + id: data.id + } + + if (err) { + out.error = err + } else { + Object.assign(out, res) + } + + source.push(out) + } + + let remote = this.rpc[String(data.remote)] + + if (!remote) { + return cb(ErrorType.E_NOT_FOUND) + } + + remote.forwardRequest(this.ids[pi.id.toB58String()], data.data, data.signature, (err, res) => { + if (err) { + return cb(err instanceof Error ? ErrorType.E_OTHER : err) + } + + return cb(null, { + data: data.data, + signature: data.signature + }) + }) break } case Type.RESPONSE: { - // TODO: add + let cb = cbs[data.id] + if (cb) { + delete cbs[data.id] + + log('got req->res response %s', data.id) + + if (data.error) { + return cb(data.error) + } + + return cb(null, data) + } break } @@ -87,8 +135,22 @@ module.exports = (pi, server) => { sink, methods: { online: () => online, - requestForward: () => { - // TODO: add + requestForward: (remote, data, signature, cb) => { + if (!online) { + return cb(new Error('Not online!')) + } + + let rid = id++ * 2 + 1 + + cbs[rid] = wrap(cb) + + source.push({ + type: Type.REQUEST, + id: rid, + data, + signature, + remote + }) } } }