From 55db949474aee6872aab814b81feb3205263c653 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Mon, 29 Nov 2021 22:28:15 +0100 Subject: [PATCH] feat: pubsub http rpc with multibase (#255) * feat: pubsub with multibase This updates HTTP RPC wire format to one from https://github.com/ipfs/go-ipfs/pull/8183 * use go install instead of make install * ci: switch to go-ipfs master * update README Co-authored-by: Adin Schmahmann --- .github/actions/go-test-setup/action.yml | 2 +- README.md | 6 ++--- go.mod | 3 ++- go.sum | 10 ++++--- pubsub.go | 33 +++++++++++++++++++----- shell.go | 13 ++++++++-- shell_test.go | 14 +++++----- 7 files changed, 57 insertions(+), 24 deletions(-) diff --git a/.github/actions/go-test-setup/action.yml b/.github/actions/go-test-setup/action.yml index cc5b02c95..2b9d64e30 100644 --- a/.github/actions/go-test-setup/action.yml +++ b/.github/actions/go-test-setup/action.yml @@ -12,4 +12,4 @@ runs: run: (ipfs init) - name: Run go-ipfs shell: bash - run: (ipfs daemon --enable-pubsub-experiment &) \ No newline at end of file + run: (ipfs daemon --enable-pubsub-experiment &) diff --git a/README.md b/README.md index e0d5c3f1e..8c3b6e0c8 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # go-ipfs-api -[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) -[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/) -[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) +[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai) +[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](https://ipfs.io/) +[![](https://img.shields.io/badge/matrix-%23ipfs-blue.svg?style=flat-square)](https://app.element.io/#/room/#ipfs:matrix.org) [![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme) [![GoDoc](https://godoc.org/github.com/ipfs/go-ipfs-api?status.svg)](https://godoc.org/github.com/ipfs/go-ipfs-api) [![Build Status](https://travis-ci.org/ipfs/go-ipfs-api.svg)](https://travis-ci.org/ipfs/go-ipfs-api) diff --git a/go.mod b/go.mod index 48efd9859..9c5e5a3b6 100644 --- a/go.mod +++ b/go.mod @@ -4,10 +4,11 @@ module github.com/ipfs/go-ipfs-api require ( github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 - github.com/ipfs/go-ipfs-files v0.0.8 + github.com/ipfs/go-ipfs-files v0.0.9 github.com/ipfs/go-ipfs-util v0.0.2 github.com/libp2p/go-libp2p-core v0.6.1 github.com/mitchellh/go-homedir v1.1.0 github.com/multiformats/go-multiaddr v0.3.0 + github.com/multiformats/go-multibase v0.0.3 github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c ) diff --git a/go.sum b/go.sum index 6ea9b8c25..fe3c66de5 100644 --- a/go.sum +++ b/go.sum @@ -33,8 +33,8 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ipfs/go-cid v0.0.7 h1:ysQJVJA3fNDF1qigJbsSQOdjhVLsOEoPdh0+R97k3jY= github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I= -github.com/ipfs/go-ipfs-files v0.0.8 h1:8o0oFJkJ8UkO/ABl8T6ac6tKF3+NIpj67aAB6ZpusRg= -github.com/ipfs/go-ipfs-files v0.0.8/go.mod h1:wiN/jSG8FKyk7N0WyctKSvq3ljIa2NNTiZB55kpTdOs= +github.com/ipfs/go-ipfs-files v0.0.9 h1:OFyOfmuVDu9c5YtjSDORmwXzE6fmZikzZpzsnNkgFEg= +github.com/ipfs/go-ipfs-files v0.0.9/go.mod h1:aFv2uQ/qxWpL/6lidWvnSQmaVqCrf0TBGoUr+C1Fo84= github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8= github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdrtMecczcsQ= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= @@ -89,8 +89,9 @@ github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c h1:GGsyl0dZ2jJgVT+VvWBf/cNijrHRhkrTjkmp5wg7li0= github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c/go.mod h1:xxcJeBb7SIUl/Wzkz1eVKJE/CB34YNrqX2TQI6jY9zs= go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto= @@ -139,6 +140,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pubsub.go b/pubsub.go index c27d35852..98345a853 100644 --- a/pubsub.go +++ b/pubsub.go @@ -5,6 +5,7 @@ import ( "io" "github.com/libp2p/go-libp2p-core/peer" + mbase "github.com/multiformats/go-multibase" ) // Message is a pubsub message. @@ -31,9 +32,9 @@ func newPubSubSubscription(resp io.ReadCloser) *PubSubSubscription { // Next waits for the next record and returns that. func (s *PubSubSubscription) Next() (*Message, error) { var r struct { - From []byte `json:"from,omitempty"` - Data []byte `json:"data,omitempty"` - Seqno []byte `json:"seqno,omitempty"` + From string `json:"from,omitempty"` + Data string `json:"data,omitempty"` + Seqno string `json:"seqno,omitempty"` TopicIDs []string `json:"topicIDs,omitempty"` } @@ -42,15 +43,33 @@ func (s *PubSubSubscription) Next() (*Message, error) { return nil, err } - from, err := peer.IDFromBytes(r.From) + // fields are wrapped in multibase when sent over HTTP RPC + // and need to be decoded (https://github.com/ipfs/go-ipfs/pull/8183) + from, err := peer.Decode(r.From) if err != nil { return nil, err } + _, data, err := mbase.Decode(r.Data) + if err != nil { + return nil, err + } + _, seqno, err := mbase.Decode(r.Seqno) + if err != nil { + return nil, err + } + topics := make([]string, len(r.TopicIDs)) + for _, mbtopic := range r.TopicIDs { + _, topic, err := mbase.Decode(mbtopic) + if err != nil { + return nil, err + } + topics = append(topics, string(topic)) + } return &Message{ From: from, - Data: r.Data, - Seqno: r.Seqno, - TopicIDs: r.TopicIDs, + Data: data, + Seqno: seqno, + TopicIDs: topics, }, nil } diff --git a/shell.go b/shell.go index 4d696d86e..78bbcc5dd 100644 --- a/shell.go +++ b/shell.go @@ -20,6 +20,7 @@ import ( homedir "github.com/mitchellh/go-homedir" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" + mbase "github.com/multiformats/go-multibase" tar "github.com/whyrusleeping/tar-utils" p2pmetrics "github.com/libp2p/go-libp2p-core/metrics" @@ -516,7 +517,8 @@ func (s *Shell) ObjectPut(obj *IpfsObject) (string, error) { func (s *Shell) PubSubSubscribe(topic string) (*PubSubSubscription, error) { // connect - resp, err := s.Request("pubsub/sub", topic).Send(context.Background()) + encoder, _ := mbase.EncoderByName("base64url") + resp, err := s.Request("pubsub/sub", encoder.Encode([]byte(topic))).Send(context.Background()) if err != nil { return nil, err } @@ -528,7 +530,14 @@ func (s *Shell) PubSubSubscribe(topic string) (*PubSubSubscription, error) { } func (s *Shell) PubSubPublish(topic, data string) (err error) { - resp, err := s.Request("pubsub/pub", topic, data).Send(context.Background()) + + fr := files.NewReaderFile(bytes.NewReader([]byte(data))) + slf := files.NewSliceDirectory([]files.DirEntry{files.FileEntry("", fr)}) + fileReader := files.NewMultiFileReader(slf, true) + + encoder, _ := mbase.EncoderByName("base64url") + resp, err := s.Request("pubsub/pub", encoder.Encode([]byte(topic))). + Body(fileReader).Send(context.Background()) if err != nil { return err } diff --git a/shell_test.go b/shell_test.go index ba0d869de..f2e2ece9f 100644 --- a/shell_test.go +++ b/shell_test.go @@ -369,7 +369,9 @@ func TestPubSub(t *testing.T) { s := NewShell(shellUrl) var ( - topic = "test" + topic = "test\n topic\r\t with unsafe bytes" + payload1 = "Hello\r\nWorld\t!" + payload2 = "Hallo\r\nWelt\t!!11oneonÄ™" sub *PubSubSubscription err error @@ -384,7 +386,7 @@ func TestPubSub(t *testing.T) { time.Sleep(10 * time.Millisecond) t.Log("publishing...") - is.Nil(s.PubSubPublish(topic, "Hello World!")) + is.Nil(s.PubSubPublish(topic, payload1)) t.Log("pub: done") t.Log("next()...") @@ -393,23 +395,23 @@ func TestPubSub(t *testing.T) { is.Nil(err) is.NotNil(r) - is.Equal(r.Data, "Hello World!") + is.Equal(r.Data, payload1) sub2, err := s.PubSubSubscribe(topic) is.Nil(err) is.NotNil(sub2) - is.Nil(s.PubSubPublish(topic, "Hallo Welt!")) + is.Nil(s.PubSubPublish(topic, payload2)) r, err = sub2.Next() is.Nil(err) is.NotNil(r) - is.Equal(r.Data, "Hallo Welt!") + is.Equal(r.Data, payload2) r, err = sub.Next() is.NotNil(r) is.Nil(err) - is.Equal(r.Data, "Hallo Welt!") + is.Equal(r.Data, payload2) is.Nil(sub.Cancel()) }