-
Notifications
You must be signed in to change notification settings - Fork 186
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Message Validators #55
Conversation
- make validators time out after 100ms - add context param to validator functions - add type Validator func(context.Context, *Message) bool - drop message if more than 10 messages are already being validated
Removed the global validation throttle and cleaned up the subscription validation logic. |
reword pushMsg for less indentation nesting.
floodsub.go
Outdated
return | ||
} | ||
|
||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hrm... spawning a goroutine for each message being sent is a bit wonky. Maybe:
select {
case p.sendMsg <- .....:
default:
go func() {
p.sendMsg <- ....
}()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, we only have to spawn a new goroutine if it's a publish, in which case the push happens within the event loop.
I'll update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we are always in the event loop actually. I added a buffer to sendMsg
so that we almost always avoid the goroutine.
floodsub.go
Outdated
@@ -54,6 +58,9 @@ type PubSub struct { | |||
// topics tracks which topics each of our peers are subscribed to | |||
topics map[string]map[peer.ID]struct{} | |||
|
|||
// sendMsg handles messages that have been validated | |||
sendMsg chan sendReq |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am thinking that this should be a chan *sendReq
for consistency with the other channels.
// NewFloodSub returns a new FloodSub management object | ||
func NewFloodSub(ctx context.Context, h host.Host) *PubSub { | ||
func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could avoid the interface change if we don't use any options -- do we want this for future proofing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep it, it's a nice interface; also we may want to pass an option for global validation throttle.
and defaultMaxConcurrency is defaultValidateConcurrency.
subscription.go
Outdated
vctx, cancel := context.WithTimeout(ctx, sub.validateTimeout) | ||
defer cancel() | ||
|
||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
annoying that we have to use an extra goroutine for this, but probably okay.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, that's how it go es :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we not just trust that validate will respect its context?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
subscription.go
Outdated
vctx, cancel := context.WithTimeout(ctx, sub.validateTimeout) | ||
defer cancel() | ||
|
||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we not just trust that validate will respect its context?
floodsub.go
Outdated
select { | ||
case p.sendMsg <- sreq: | ||
default: | ||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a fan of this pattern (too easy to open a bunch of go routines). Could we just call maybePublishMessage
directly and rename the sendMsg
channel name to validatedMsgs
or something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a buffer to sendMsg so that we should almost never overflow.
But you are right, we can just call maybePublishMessage
!
I'll keep the sendMsg
channel named as it is, because it indicates a message that is ready to send.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I skipped the dance, and called maybePublishMessage
directly .
floodsub.go
Outdated
<-p.validateThrottle | ||
}() | ||
default: | ||
log.Warningf("message validation throttled; dropping message from %s", src) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll probably want per-peer or per-topic throttles in the future but, for now, this is probably fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we already have those! That was part of the improvements in this PR (although most of the stuff got rewritten).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see... I'm worried that it's still possible to starve a topic by maxing out the global validator first. Ideally, we'd take the per-topic ticket first, then the global one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be hard to do, because a throttled subscription would release the semaphore quickly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I improved the comments around the throttle to more clearly describe the intention and existence of per-topic throttles.
floodsub.go
Outdated
continue | ||
} | ||
|
||
rch := make(chan bool, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd just use a single channel and count results from it. Allocating channels isn't that expensive but it's certainly not free.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair enough; it was really convenient to write this way though :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
alright, implemented!
so f1be0f1, which removed the external goroutine enforcing the timeout bombs two of keks' tests -- they explicitly test the external goroutine timing out. |
Removed the two offending tests, as they test faulty behaviour. More to the point, per discussion with @Stebalien on irc: if we timeout externally, the faulty validator is still running, while we unthrottle and leak a goroutine. It is preferable to have the faulty validator to cause the topic to throttle, at which point it can be identified as a bug and be debuged, rather than having a silent goroutine leak. |
@Stebalien ping -- let's move this forward? |
I don't see how this PR is a step in the right direction. Validators should either be a property of the topic (registered at initialization, preferably) or validators should be a property of the subscription in which case a message's validity for subscription A shouldn't affect the message's validity for subscription B. Given that making validators a property of the subscription makes it difficult to determine if we should forward the message, validators should be a property of the topic. |
Ok, I will rewrite to implement per topic validators then. |
I have removed the per-subscription validators and implemented per-topic validations. |
summoning @Stebalien @whyrusleeping |
// pushMsg pushes a message performing validation as necessary | ||
func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) { | ||
if len(vals) > 0 { | ||
// validation is asynchronous and globally throttled with the throttleValidate semaphore. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this comment still true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it is still true.
I can remove the global throttle if we deem so, I just like to have a limit on how much goncurrency we can spawn as a result of incoming network traffic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
although, subscription reference below! that's not true, I will fix the comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My current understanding is that we still have global validators, but no global throttling right? I still see a pubsub level validate throttle though, so theres still some global throttling?
Oh wait, I was thinking backwards, no global validators, but still global throttling. |
Right, we have a global throttle that applies to all validators, that's set to a reasonably high limit (8192 active goroutines). This is to protect us from blowing our resource usage from big bursts of traffic that trigger validations. Each individual topic validator has its own throttle, that limits the resource usage per topic. This throttle by default is set much lower, to a mere 100 active goroutines. As I said we can remove the global throttle if we deem so, and we can also tweak the defaults accordingly. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd be fine accepting this if @whyrusleeping is as I consider it a step in the right direction this time but I'm still not happy with it (see #58). Why are you so opposed to having validators decide which topics they care about? The way this currently works, subscribing to the same topic twice will be painful.
Hrm, I am not terminally opposed to them -- I just find the interface a little clunky.
Why? The validator registration is decoupled from the subscription and it's something we do at initialization. |
Hey, I'm a brand-newbie in looking at this project, but for me, in the use cases I have in mind, I'd like to be able to validate by topic (although I can see why "by subscription" would also be useful) - but perhaps more importantly, I'd like not to propagate messages to other peers, when they do not validate. I might even want to drop peers who send invalid messages. I guess peer attribution might be a bit outside the scope of what's currently possible - but that's my $0.02. |
@jvsteiner with this pr, we do exactly that: we don't broadcast messages that don't validate on a per-topic basis. We have rejected subscription-based validators because of incoherent semantics. The global validators will simply add an additional layer below per topic validators: they will allow us to define validator classes for pubsub namespaces and will be implemented in follow up work. |
@whyrusleeping I believe we're waiting on you for the final signoff. |
Implements asynchronous message validators.
Continuation of #45 which was merged and rebased in a feature branch so that I can continue work on it.
Closes #46.
Closes #56.
cc @whyrusleeping @keks