-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
prevent joining an already existed response topic #13
Conversation
Signed-off-by: Merlin Ran <merlinran@gmail.com>
4028967
to
6288236
Compare
rpc.go
Outdated
if err != nil && | ||
// this can happen when more than one messages are received | ||
// from the same peer. | ||
!strings.Contains(err.Error(), "topic already exists") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Intentionally or not, the libp2p library doesn't provide a better way to check this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A decision also needs to be made here: Should we continue or simply return if topic exists? For our usage, an existing topic means duplicated message, which should simply be ignored (the only topic with more than one messages is the AuctionTopic, which doesn't watch for response), but for general user case, it should be totally fine to send many messages on a single topic and expect many responses.
I vote for correctness of this package and let the caller handle duplications
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oop continuing won't work... as top
is nil
for whatever err
returned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this must happen when the first response topic hasn't been closed yet. I think it's reasonable to assume only one response can be sent at a time... but many can be sent over the lifetime of the topic.
conclusion: just ignore the error and return from publishResponse
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, this relates to race condition - because publishResponse
runs in a separate goroutine, what if there are actually more than one messages arrive, hence start several goroutines? All but the first one would be responded.
Even if we respond all messages, they can be out of order, hence a bigger problem: the other end gets response for a wrong message. The simplest would be to stop spawning new goroutine for response.
There's some previous discussion around this behaviour of |
If we want to send multiple responses to the same host for a single topic, we need to remember the topics we already created for response - creating the response topic when a peer joins and closing when it leaves, for example. Or, we can have unique response topic for each individual message, which is too heavy I feel. |
Signed-off-by: Merlin Ran <merlinran@gmail.com>
t.publishResponse(msg.ReceivedFrom, msgID, data, err) | ||
}() | ||
msgID := cid.NewCidV1(cid.Raw, util.Hash(msg.Data)) | ||
t.publishResponse(msg.ReceivedFrom, msgID, data, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ended up making publishResponse
synchronous. It is actually not a blocking call as I anticipated. What it does: 1) call newTopic
, which returns immediately, and 2) call topic.Publish
, which blocks when calling t.p.disc.Bootstrap
https://github.com/libp2p/go-libp2p-pubsub/blob/master/topic.go#L245, but only if WithReadiness
is set, which we don't. So, synchronous call always close the response topic, and responses are never out of order.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, that sounds much better
rpc.go
Outdated
@@ -91,6 +91,7 @@ func NewTopic(ctx context.Context, ps *pubsub.PubSub, host peer.ID, topic string | |||
} | |||
t.resTopic, err = newTopic(ctx, ps, host, responseTopic(topic, host), true) | |||
if err != nil { | |||
t.cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
drive-by. cleaning up goroutines in case of error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be t.Close()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right... fixed. also added some lines to properly close resources in newTopic
9176585
@sanderpick how do you feel about the changes now? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! 💪
Signed-off-by: Merlin Ran <merlinran@gmail.com>
ec807b7
to
9176585
Compare
Merging. Thanks for the discussion here! |
With re-publishing, this can be seen in bidbot frequently. This is a fix.