Skip to content

Commit

Permalink
feat: finish implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mkg20001 committed Jun 15, 2018
1 parent 776de25 commit 144ef94
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 9 deletions.
5 changes: 1 addition & 4 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,7 @@ class Exchange extends ExchangeBase {
return cb(err)
}

this._rpc('request', peerId, ns, data, (err, res) => {
console.log('req', err, res)
})
// TODO: do .request(peerId, ns, data, cb) rpc call
this._rpc('request', peerId, ns, data, cb)
})
}
}
Expand Down
72 changes: 67 additions & 5 deletions src/server/rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,14 @@ const log = debug('libp2p:exchange:rendezvous:server:rpc')

const Pushable = require('pull-pushable')
const pull = require('pull-stream')
const {Type, ErrorType, ETABLE} = require('../proto.js')
const {Type, ErrorType} = require('../proto.js')
const Id = require('peer-id')
const once = require('once')
const wrap = (cb) => {
cb = once(cb)
setTimeout(() => cb(new Error(ErrorType.E_TIMEOUT)), 10 * 1000)
return cb
}

module.exports = (pi, server) => {
let online = true
Expand Down Expand Up @@ -64,12 +70,54 @@ module.exports = (pi, server) => {
}
case Type.REQUEST: {
// we get a request (data.data, data.signature) to forward to data.remote (b58string) and then get a response to forward back
// TODO: add

let cb = (err, res) => {
let out = {
type: Type.RESPONSE,
id: data.id
}

if (err) {
out.error = err
} else {
Object.assign(out, res)
}

source.push(out)
}

let remote = this.rpc[String(data.remote)]

if (!remote) {
return cb(ErrorType.E_NOT_FOUND)
}

remote.forwardRequest(this.ids[pi.id.toB58String()], data.data, data.signature, (err, res) => {
if (err) {
return cb(err instanceof Error ? ErrorType.E_OTHER : err)
}

return cb(null, {
data: data.data,
signature: data.signature
})
})

break
}
case Type.RESPONSE: {
// TODO: add
let cb = cbs[data.id]
if (cb) {
delete cbs[data.id]

log('got req->res response %s', data.id)

if (data.error) {
return cb(data.error)
}

return cb(null, data)
}

break
}
Expand All @@ -87,8 +135,22 @@ module.exports = (pi, server) => {
sink,
methods: {
online: () => online,
requestForward: () => {
// TODO: add
requestForward: (remote, data, signature, cb) => {
if (!online) {
return cb(new Error('Not online!'))
}

let rid = id++ * 2 + 1

cbs[rid] = wrap(cb)

source.push({
type: Type.REQUEST,
id: rid,
data,
signature,
remote
})
}
}
}
Expand Down

0 comments on commit 144ef94

Please sign in to comment.