Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 8 additions & 17 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package shell

import (
"encoding/json"
"io"

"github.com/libp2p/go-libp2p-peer"
)
Expand All @@ -16,33 +17,27 @@ type Message struct {

// PubSubSubscription allow you to receive pubsub records that where published on the network.
type PubSubSubscription struct {
resp *Response
resp io.Closer
dec *json.Decoder
}

func newPubSubSubscription(resp *Response) *PubSubSubscription {
sub := &PubSubSubscription{
func newPubSubSubscription(resp io.ReadCloser) *PubSubSubscription {
return &PubSubSubscription{
resp: resp,
dec: json.NewDecoder(resp),
}

return sub
}

// Next waits for the next record and returns that.
func (s *PubSubSubscription) Next() (*Message, error) {
if s.resp.Error != nil {
return nil, s.resp.Error
}

d := json.NewDecoder(s.resp.Output)

var r struct {
From []byte `json:"from,omitempty"`
Data []byte `json:"data,omitempty"`
Seqno []byte `json:"seqno,omitempty"`
TopicIDs []string `json:"topicIDs,omitempty"`
}

err := d.Decode(&r)
err := s.dec.Decode(&r)
if err != nil {
return nil, err
}
Expand All @@ -61,9 +56,5 @@ func (s *PubSubSubscription) Next() (*Message, error) {

// Cancel cancels the given subscription.
func (s *PubSubSubscription) Cancel() error {
if s.resp.Output == nil {
return nil
}

return s.resp.Output.Close()
return s.resp.Close()
}
6 changes: 5 additions & 1 deletion shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,11 @@ func (s *Shell) PubSubSubscribe(topic string) (*PubSubSubscription, error) {
if err != nil {
return nil, err
}
return newPubSubSubscription(resp), nil
if resp.Error != nil {
resp.Close()
return nil, resp.Error
}
return newPubSubSubscription(resp.Output), nil
}

func (s *Shell) PubSubPublish(topic, data string) (err error) {
Expand Down