Skip to content

Commit

Permalink
Merge pull request #13 from textileio/merlin/topic-exists
Browse files Browse the repository at this point in the history
prevent joining an already existed response topic
  • Loading branch information
merlinran committed Aug 12, 2021
2 parents 3155738 + 9176585 commit 08dfe40
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func NewTopic(ctx context.Context, ps *pubsub.PubSub, host peer.ID, topic string
}
t.resTopic, err = newTopic(ctx, ps, host, responseTopic(topic, host), true)
if err != nil {
_ = t.Close()
return nil, fmt.Errorf("creating response topic: %v", err)
}
t.resTopic.eventHandler = t.resEventHandler
Expand All @@ -106,13 +107,16 @@ func newTopic(ctx context.Context, ps *pubsub.PubSub, host peer.ID, topic string

handler, err := top.EventHandler()
if err != nil {
_ = top.Close()
return nil, fmt.Errorf("getting topic handler: %v", err)
}

var sub *pubsub.Subscription
if subscribe {
sub, err = top.Subscribe()
if err != nil {
handler.Cancel()
_ = top.Close()
return nil, fmt.Errorf("subscribing to topic: %v", err)
}
}
Expand Down Expand Up @@ -168,7 +172,9 @@ func (t *Topic) SetMessageHandler(handler MessageHandler) {
t.messageHandler = handler
}

// Publish data. Note that the data may arrive peers duplicated. See PublishOptions for option details.
// Publish data. Note that the data may arrive peers duplicated. And as a
// result, if WithMultiResponse is supplied, the response may be duplicated as
// well. See PublishOptions for option details.
func (t *Topic) Publish(
ctx context.Context,
data []byte,
Expand Down Expand Up @@ -296,10 +302,8 @@ func (t *Topic) listen() {
data, err := handler(msg.ReceivedFrom, t.t.String(), msg.Data)
if !strings.Contains(t.t.String(), "/_response") {
// This is a normal message; respond with data and error
go func() {
msgID := cid.NewCidV1(cid.Raw, util.Hash(msg.Data))
t.publishResponse(msg.ReceivedFrom, msgID, data, err)
}()
msgID := cid.NewCidV1(cid.Raw, util.Hash(msg.Data))
t.publishResponse(msg.ReceivedFrom, msgID, data, err)
} else if err != nil {
log.Errorf("response message handler: %v", err)
}
Expand Down

0 comments on commit 08dfe40

Please sign in to comment.