Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
fix(pubsub): new wire format in http rpc
Browse files Browse the repository at this point in the history
Implements changes from ipfs/kubo#8183
  • Loading branch information
lidel committed Oct 20, 2021
1 parent afeb20d commit 461224d
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 28 deletions.
64 changes: 64 additions & 0 deletions packages/ipfs-http-client/src/lib/http-rpc-wire-format.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { base64url } from 'multiformats/bases/base64'
import PeerId from 'peer-id'

/* HTTP RPC:
* - sends PeerIds as Base58btc (legacy) or CIDv1
* - wraps binary data in multibase. base64url is used to avoid issues
* when a binary data is passed as search param in URL.
* Historical context: https://github.com/ipfs/go-ipfs/issues/7939
* Multibase wrapping introduced in: https://github.com/ipfs/go-ipfs/pull/8183
*/

/**
* @param {Array<string>} strings
* @returns {Array<string>} strings
*/
const mbToTextArray = strings => {
if (Array.isArray(strings)) {
return strings.map(mbToText)
}
return strings
}

/**
* @param {string} mb
* @returns {string}
*/
const mbToText = mb => uint8ArrayToString(mbToBytes(mb))

/**
* @param {string} mb
* @returns {Uint8Array}
*/
const mbToBytes = mb => base64url.decode(mb)

/**
* @param {string} text
* @returns {string}
*/
const toUrlSafeBase = text => base64url.encode(uint8ArrayFromString(text))

/**
* Ensure uniform Peer ID representation in text
*
* @param {Array<string>} peerids
* @returns {Array<string>} peerids
*/
const normalizePeerIds = peerids => {
if (Array.isArray(peerids)) {
return peerids.map(normalizePeerId)
}
return peerids
}

/**
* Ensure uniform Peer ID representation in text
*
* @param {string} peerid
* @returns {string}
*/
const normalizePeerId = peerid => PeerId.parse(peerid).toB58String() // TODO: toString() when go-ipfs switch to CIDv1

export { mbToTextArray, mbToText, mbToBytes, toUrlSafeBase, normalizePeerIds, normalizePeerId }
4 changes: 2 additions & 2 deletions packages/ipfs-http-client/src/pubsub/ls.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { mbToTextArray } from '../lib/http-rpc-wire-format.js'

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
Expand All @@ -17,8 +18,7 @@ export const createLs = configure(api => {
headers: options.headers
})).json()

// TODO: unwrap topic names from multibase
return Strings || []
return mbToTextArray(Strings) || []
}
return ls
})
4 changes: 2 additions & 2 deletions packages/ipfs-http-client/src/pubsub/publish.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { multipartRequest } from 'ipfs-core-utils/multipart-request'
import { abortSignal } from '../lib/abort-signal.js'
import { toUrlSafeBase } from '../lib/http-rpc-wire-format.js'
import { AbortController } from 'native-abort-controller'

/**
Expand All @@ -14,9 +15,8 @@ export const createPublish = configure(api => {
* @type {PubsubAPI["publish"]}
*/
async function publish (topic, data, options = {}) {
// TODO: wrap topic in multibase
const searchParams = toUrlSearchParams({
arg: topic,
arg: toUrlSafeBase(topic),
...options
})

Expand Down
16 changes: 6 additions & 10 deletions packages/ipfs-http-client/src/pubsub/subscribe.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import debug from 'debug'
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { toUrlSafeBase, mbToTextArray, mbToBytes } from '../lib/http-rpc-wire-format.js'
const log = debug('ipfs-http-client:pubsub:subscribe')

/**
Expand Down Expand Up @@ -39,12 +38,11 @@ export const createSubscribe = (options, subsTracker) => {
// is received. If this doesn't happen within 1 second assume success
const ffWorkaround = setTimeout(() => done(), 1000)

// TODO: wrap topic in multibase
// Do this async to not block Firefox
api.post('pubsub/sub', {
signal: options.signal,
searchParams: toUrlSearchParams({
arg: topic,
arg: toUrlSafeBase(topic),
...options
}),
headers: options.headers
Expand Down Expand Up @@ -95,13 +93,11 @@ async function readMessages (response, { onMessage, onEnd, onError }) {
continue
}

// TODO: multibase data, seqno and topics
// TODO: parse string and get peerid bytes using libp2p lib
onMessage({
from: uint8ArrayToString(uint8ArrayFromString(msg.from, 'base64pad'), 'base58btc'),
data: uint8ArrayFromString(msg.data, 'base64pad'),
seqno: uint8ArrayFromString(msg.seqno, 'base64pad'),
topicIDs: msg.topicIDs
from: msg.from,
data: mbToBytes(msg.data),
seqno: mbToBytes(msg.seqno),
topicIDs: mbToTextArray(msg.topicIDs)
})
} catch (/** @type {any} */ err) {
err.message = `Failed to parse pubsub message: ${err.message}`
Expand Down
2 changes: 1 addition & 1 deletion packages/ipfs-http-client/test/utils/factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const commonOptions = {

const commonOverrides = {
go: {
ipfsBin: isNode ? path() : undefined
ipfsBin: isNode ? (process.env.IPFS_GO_EXEC || path()) : undefined
}
}

Expand Down
40 changes: 27 additions & 13 deletions packages/ipfs-http-server/src/api/resources/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,22 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { streamResponse } from '../../utils/stream-response.js'
import pushable from 'it-pushable'
import { base64url } from 'multiformats/bases/base64'

const preDecodeTopicFromHttpRpc = {
assign: 'topic',
/**
* @param {import('../../types').Request} request
* @param {import('@hapi/hapi').ResponseToolkit} _h
*/
method: async (request, _h) => {
try {
return uint8ArrayToString(base64url.decode(request.query.topic))
} catch (/** @type {any} */ err) {
throw Boom.boomify(err, { message: `Failed to decode topic from HTTP RPC form ${request.query.topic}` })
}
}
}

export const subscribeResource = {
options: {
Expand All @@ -24,7 +40,8 @@ export const subscribeResource = {
override: true,
ignoreUndefined: true
})
}
},
pre: [preDecodeTopicFromHttpRpc]
},
/**
* @param {import('../../types').Request} request
Expand Down Expand Up @@ -56,13 +73,11 @@ export const subscribeResource = {
* @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn}
*/
const handler = (msg) => {
// TODO: data, seqno and topicIDs in multibase
// TODO: from should use canonical toString from peerid libp2p lib
output.push({
from: uint8ArrayToString(uint8ArrayFromString(msg.from, 'base58btc'), 'base64pad'),
data: uint8ArrayToString(msg.data, 'base64pad'),
seqno: uint8ArrayToString(msg.seqno, 'base64pad'),
topicIDs: msg.topicIDs
from: msg.from, // TODO: switch to PeerId.parse(msg.from).toString() when go-ipfs defaults to CIDv1
data: base64url.encode(msg.data),
seqno: base64url.encode(msg.seqno),
topicIDs: msg.topicIDs.map(t => base64url.encode(uint8ArrayFromString(t)))
})
}

Expand Down Expand Up @@ -92,7 +107,7 @@ export const publishResource = {
parse: false,
output: 'stream'
},
pre: [{
pre: [preDecodeTopicFromHttpRpc, {
assign: 'data',
/**
* @param {import('../../types').Request} request
Expand Down Expand Up @@ -158,7 +173,7 @@ export const publishResource = {
} = request

try {
// TODO: unwrap topic from multibase?
// TODO: move to reusable pre handler?
await ipfs.pubsub.publish(topic, data, {
signal,
timeout
Expand Down Expand Up @@ -212,8 +227,7 @@ export const lsResource = {
throw Boom.boomify(err, { message: 'Failed to list subscriptions' })
}

// TODO: multibase topic names in Strings array
return h.response({ Strings: subscriptions })
return h.response({ Strings: subscriptions.map(s => base64url.encode(uint8ArrayFromString(s))) })
}
}

Expand All @@ -232,7 +246,8 @@ export const peersResource = {
override: true,
ignoreUndefined: true
})
}
},
pre: [preDecodeTopicFromHttpRpc]
},
/**
* @param {import('../../types').Request} request
Expand All @@ -256,7 +271,6 @@ export const peersResource = {

let peers
try {
// TODO: unwrap topic from multibase
peers = await ipfs.pubsub.peers(topic, {
signal,
timeout
Expand Down

0 comments on commit 461224d

Please sign in to comment.