diff --git a/.travis.yml b/.travis.yml index 6528f584e..40119f00f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,7 +16,7 @@ services: before_install: - docker pull ipfs/go-ipfs:master - mkdir /tmp/ipfs && chmod 0777 /tmp/ipfs -- docker run -d -v /tmp/ipfs:/data/ipfs -p 8080:8080 -p 4001:4001 -p 5001:5001 ipfs/go-ipfs:master +- docker run -d -v /tmp/ipfs:/data/ipfs -p 8080:8080 -p 4001:4001 -p 5001:5001 ipfs/go-ipfs:master --enable-pubsub-experiment install: - go get -t -v ./... diff --git a/pubsub.go b/pubsub.go new file mode 100644 index 000000000..37d40d47c --- /dev/null +++ b/pubsub.go @@ -0,0 +1,83 @@ +package shell + +import ( + "encoding/binary" + "encoding/json" + + "github.com/libp2p/go-floodsub" + "github.com/libp2p/go-libp2p-peer" +) + +// PubSubRecord is a record received via PubSub. +type PubSubRecord interface { + // From returns the peer ID of the node that published this record + From() peer.ID + + // Data returns the data field + Data() []byte + + // SeqNo is the sequence number of this record + SeqNo() int64 + + //TopicIDs is the list of topics this record belongs to + TopicIDs() []string +} + +type floodsubRecord struct { + msg *floodsub.Message +} + +func (r floodsubRecord) From() peer.ID { + return r.msg.GetFrom() +} + +func (r floodsubRecord) Data() []byte { + return r.msg.GetData() +} + +func (r floodsubRecord) SeqNo() int64 { + return int64(binary.BigEndian.Uint64(r.msg.GetSeqno())) +} + +func (r floodsubRecord) TopicIDs() []string { + return r.msg.GetTopicIDs() +} + +/// + +// PubSubSubscription allow you to receive pubsub records that where published on the network. +type PubSubSubscription struct { + resp *Response +} + +func newPubSubSubscription(resp *Response) *PubSubSubscription { + sub := &PubSubSubscription{ + resp: resp, + } + + sub.Next() // skip empty element used for flushing + return sub +} + +// Next waits for the next record and returns that. +func (s *PubSubSubscription) Next() (PubSubRecord, error) { + if s.resp.Error != nil { + return nil, s.resp.Error + } + + d := json.NewDecoder(s.resp.Output) + + r := &floodsub.Message{} + err := d.Decode(r) + + return floodsubRecord{msg: r}, err +} + +// Cancel cancels the given subscription. +func (s *PubSubSubscription) Cancel() error { + if s.resp.Output == nil { + return nil + } + + return s.resp.Output.Close() +} diff --git a/shell.go b/shell.go index 291dd393f..b91eac2ae 100644 --- a/shell.go +++ b/shell.go @@ -689,6 +689,27 @@ func (s *Shell) ObjectPut(obj *IpfsObject) (string, error) { return out.Hash, nil } +func (s *Shell) PubSubSubscribe(topic string) (*PubSubSubscription, error) { + // connect + req := s.newRequest("pubsub/sub", topic) + + resp, err := req.Send(s.httpcli) + if err != nil { + return nil, err + } + + return newPubSubSubscription(resp), nil +} + +func (s *Shell) PubSubPublish(topic, data string) error { + _, err := s.newRequest("pubsub/pub", topic, data).Send(s.httpcli) + if err != nil { + return err + } + + return nil +} + func (s *Shell) DiagNet(format string) ([]byte, error) { var result = new(bytes.Buffer) diff --git a/shell_test.go b/shell_test.go index 1e246dd5a..74ec854ec 100644 --- a/shell_test.go +++ b/shell_test.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "testing" + "time" "github.com/cheekybits/is" ) @@ -116,3 +117,53 @@ func TestResolvePath(t *testing.T) { is.Nil(err) is.Equal(childHash, "QmZTR5bcpQD7cFgTorqxZDYaew1Wqgfbd2ud9QqGPAkK2V") } + +func TestPubSub(t *testing.T) { + is := is.New(t) + s := NewShell(shellUrl) + + var ( + topic = "test" + + sub *PubSubSubscription + err error + ) + + t.Log("subscribing...") + sub, err = s.PubSubSubscribe(topic) + is.Nil(err) + is.NotNil(sub) + t.Log("sub: done") + + time.Sleep(10 * time.Millisecond) + + t.Log("publishing...") + is.Nil(s.PubSubPublish(topic, "Hello World!")) + t.Log("pub: done") + + t.Log("next()...") + r, err := sub.Next() + t.Log("next: done. ") + + is.Nil(err) + is.NotNil(r) + is.Equal(r.Data(), "Hello World!") + + sub2, err := s.PubSubSubscribe(topic) + is.Nil(err) + is.NotNil(sub2) + + is.Nil(s.PubSubPublish(topic, "Hallo Welt!")) + + r, err = sub2.Next() + is.Nil(err) + is.NotNil(r) + is.Equal(r.Data(), "Hallo Welt!") + + r, err = sub.Next() + is.NotNil(r) + is.Nil(err) + is.Equal(r.Data(), "Hallo Welt!") + + is.Nil(sub.Cancel()) +}