Skip to content

Commit

Permalink
Merge pull request #39 from textileio/jsign/prl
Browse files Browse the repository at this point in the history
Handle subscription messages concurrently
  • Loading branch information
jsign committed Feb 24, 2022
2 parents c781ac1 + f906227 commit 2ba2443
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,17 +307,25 @@ func (t *Topic) listen() {
t.lk.Lock()
handler := t.messageHandler
t.lk.Unlock()

if handler != nil {
data, err := handler(msg.ReceivedFrom, t.t.String(), msg.Data)
if !strings.Contains(t.t.String(), "/_response") {
// This is a normal message; respond with data and error
msgID := cid.NewCidV1(cid.Raw, util.Hash(msg.Data))
t.publishResponse(msg.ReceivedFrom, msgID, data, err)
} else if err != nil {
log.Errorf("response message handler: %v", err)
}
if handler == nil {
log.Warnf("didn't process topic message since we don't have a handler")
continue
}
go processSubscriptionMessage(handler, msg.ReceivedFrom, t, msg.Data)
}
}

func processSubscriptionMessage(handler MessageHandler, from peer.ID, t *Topic, msgData []byte) {
res, err := handler(from, t.t.String(), msgData)
if err != nil {
log.Errorf("subcription message handler: %v", err)
// Intentionally not returning since we send the error
// to the other side in the response.
}
if !strings.Contains(t.t.String(), "/_response") {
// This is a normal message; respond with data and error
msgID := cid.NewCidV1(cid.Raw, util.Hash(msgData))
t.publishResponse(from, msgID, res, err)
}
}

Expand Down

0 comments on commit 2ba2443

Please sign in to comment.