Skip to content
This repository has been archived by the owner on Feb 7, 2024. It is now read-only.

feat: pubsub http rpc with multibase #255

Merged
merged 5 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/go-test-setup/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ runs:
run: (ipfs init)
- name: Run go-ipfs
shell: bash
run: (ipfs daemon --enable-pubsub-experiment &)
run: (ipfs daemon --enable-pubsub-experiment &)
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# go-ipfs-api

[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](https://ipfs.io/)
[![](https://img.shields.io/badge/matrix-%23ipfs-blue.svg?style=flat-square)](https://app.element.io/#/room/#ipfs:matrix.org)
[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme)
[![GoDoc](https://godoc.org/github.com/ipfs/go-ipfs-api?status.svg)](https://godoc.org/github.com/ipfs/go-ipfs-api)
[![Build Status](https://travis-ci.org/ipfs/go-ipfs-api.svg)](https://travis-ci.org/ipfs/go-ipfs-api)
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ module github.com/ipfs/go-ipfs-api

require (
github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipfs-files v0.0.9
github.com/ipfs/go-ipfs-util v0.0.2
github.com/libp2p/go-libp2p-core v0.6.1
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-multiaddr v0.3.0
github.com/multiformats/go-multibase v0.0.3
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c
)
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ipfs/go-cid v0.0.7 h1:ysQJVJA3fNDF1qigJbsSQOdjhVLsOEoPdh0+R97k3jY=
github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I=
github.com/ipfs/go-ipfs-files v0.0.8 h1:8o0oFJkJ8UkO/ABl8T6ac6tKF3+NIpj67aAB6ZpusRg=
github.com/ipfs/go-ipfs-files v0.0.8/go.mod h1:wiN/jSG8FKyk7N0WyctKSvq3ljIa2NNTiZB55kpTdOs=
github.com/ipfs/go-ipfs-files v0.0.9 h1:OFyOfmuVDu9c5YtjSDORmwXzE6fmZikzZpzsnNkgFEg=
github.com/ipfs/go-ipfs-files v0.0.9/go.mod h1:aFv2uQ/qxWpL/6lidWvnSQmaVqCrf0TBGoUr+C1Fo84=
github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8=
github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdrtMecczcsQ=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
Expand Down Expand Up @@ -89,8 +89,9 @@ github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c h1:GGsyl0dZ2jJgVT+VvWBf/cNijrHRhkrTjkmp5wg7li0=
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c/go.mod h1:xxcJeBb7SIUl/Wzkz1eVKJE/CB34YNrqX2TQI6jY9zs=
go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto=
Expand Down Expand Up @@ -139,6 +140,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
33 changes: 26 additions & 7 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"

"github.com/libp2p/go-libp2p-core/peer"
mbase "github.com/multiformats/go-multibase"
)

// Message is a pubsub message.
Expand All @@ -31,9 +32,9 @@ func newPubSubSubscription(resp io.ReadCloser) *PubSubSubscription {
// Next waits for the next record and returns that.
func (s *PubSubSubscription) Next() (*Message, error) {
var r struct {
From []byte `json:"from,omitempty"`
Data []byte `json:"data,omitempty"`
Seqno []byte `json:"seqno,omitempty"`
From string `json:"from,omitempty"`
Data string `json:"data,omitempty"`
Seqno string `json:"seqno,omitempty"`
TopicIDs []string `json:"topicIDs,omitempty"`
}

Expand All @@ -42,15 +43,33 @@ func (s *PubSubSubscription) Next() (*Message, error) {
return nil, err
}

from, err := peer.IDFromBytes(r.From)
// fields are wrapped in multibase when sent over HTTP RPC
// and need to be decoded (https://github.com/ipfs/go-ipfs/pull/8183)
from, err := peer.Decode(r.From)
if err != nil {
return nil, err
}
_, data, err := mbase.Decode(r.Data)
if err != nil {
return nil, err
}
_, seqno, err := mbase.Decode(r.Seqno)
if err != nil {
return nil, err
}
topics := make([]string, len(r.TopicIDs))
for _, mbtopic := range r.TopicIDs {
_, topic, err := mbase.Decode(mbtopic)
if err != nil {
return nil, err
}
topics = append(topics, string(topic))
}
return &Message{
From: from,
Data: r.Data,
Seqno: r.Seqno,
TopicIDs: r.TopicIDs,
Data: data,
Seqno: seqno,
TopicIDs: topics,
}, nil
}

Expand Down
13 changes: 11 additions & 2 deletions shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
homedir "github.com/mitchellh/go-homedir"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
mbase "github.com/multiformats/go-multibase"
tar "github.com/whyrusleeping/tar-utils"

p2pmetrics "github.com/libp2p/go-libp2p-core/metrics"
Expand Down Expand Up @@ -516,7 +517,8 @@ func (s *Shell) ObjectPut(obj *IpfsObject) (string, error) {

func (s *Shell) PubSubSubscribe(topic string) (*PubSubSubscription, error) {
// connect
resp, err := s.Request("pubsub/sub", topic).Send(context.Background())
encoder, _ := mbase.EncoderByName("base64url")
resp, err := s.Request("pubsub/sub", encoder.Encode([]byte(topic))).Send(context.Background())
if err != nil {
return nil, err
}
Expand All @@ -528,7 +530,14 @@ func (s *Shell) PubSubSubscribe(topic string) (*PubSubSubscription, error) {
}

func (s *Shell) PubSubPublish(topic, data string) (err error) {
resp, err := s.Request("pubsub/pub", topic, data).Send(context.Background())

fr := files.NewReaderFile(bytes.NewReader([]byte(data)))
slf := files.NewSliceDirectory([]files.DirEntry{files.FileEntry("", fr)})
fileReader := files.NewMultiFileReader(slf, true)

encoder, _ := mbase.EncoderByName("base64url")
resp, err := s.Request("pubsub/pub", encoder.Encode([]byte(topic))).
Body(fileReader).Send(context.Background())
if err != nil {
return err
}
Expand Down
14 changes: 8 additions & 6 deletions shell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,9 @@ func TestPubSub(t *testing.T) {
s := NewShell(shellUrl)

var (
topic = "test"
topic = "test\n topic\r\t with unsafe bytes"
payload1 = "Hello\r\nWorld\t!"
payload2 = "Hallo\r\nWelt\t!!11oneonę"

sub *PubSubSubscription
err error
Expand All @@ -376,7 +378,7 @@ func TestPubSub(t *testing.T) {
time.Sleep(10 * time.Millisecond)

t.Log("publishing...")
is.Nil(s.PubSubPublish(topic, "Hello World!"))
is.Nil(s.PubSubPublish(topic, payload1))
t.Log("pub: done")

t.Log("next()...")
Expand All @@ -385,23 +387,23 @@ func TestPubSub(t *testing.T) {

is.Nil(err)
is.NotNil(r)
is.Equal(r.Data, "Hello World!")
is.Equal(r.Data, payload1)

sub2, err := s.PubSubSubscribe(topic)
is.Nil(err)
is.NotNil(sub2)

is.Nil(s.PubSubPublish(topic, "Hallo Welt!"))
is.Nil(s.PubSubPublish(topic, payload2))

r, err = sub2.Next()
is.Nil(err)
is.NotNil(r)
is.Equal(r.Data, "Hallo Welt!")
is.Equal(r.Data, payload2)

r, err = sub.Next()
is.NotNil(r)
is.Nil(err)
is.Equal(r.Data, "Hallo Welt!")
is.Equal(r.Data, payload2)

is.Nil(sub.Cancel())
}
Expand Down