Start of event-delivery service #40
* Changes to "triggers":
  * Renamed the "triggers" package to "sources" to avoid confusion.
  * EventTriggers (FKA Sources) now take EventTriggers instead of just params.
  * source.EventTrigger is different from v1alpha1.EventTrigger because the latter has embedded Raw types that would be hard to use.
* Updated GitHub sample:
  * Renamed the EventSource to "github.com" rather than just "github" to better align with GCF. I think this revealed something about the control plane model that I want to discuss.
  * Updated pullrequest.yaml to match new format.
  * As part of this I made a minor change to update-deps to work on mac (gnu-sed still required).
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: inlined
The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing
/assign @vaikas-google Sorry this didn't break into smaller PRs well =/
Changing base to branch v0.1 so it is appropriate to /assign @eobrain while Ville is out of office.
/assign @eobrain
/unassign @vaikas-google
Still not done with the review, but here is what I have so far. Mostly nits and comments.
cmd/delivery/main.go
Outdated
@@ -0,0 +1,139 @@
/*
Copyright 2017 The Kubernetes Authors.
s/2017/2018/
cmd/delivery/main.go
Outdated
@@ -0,0 +1,139 @@
/*
Copyright 2017 The Kubernetes Authors.
Is "The Kubernetes Authors" the correct copyright owner? Or should it be Google, as in the other license files?
Seems like I copied a bad copy from cmd/controller.go
Fixed. I seem to have copied bad prose from cmd/controller/main.go
// but if it does, propagate it back.
glog.Info("Staring event sender")
if err := sender.Run(senderThreads, stopCh); err != nil {
glog.Fatalf("Error running controller: %s", err.Error())
Nit. Suggest:
glog.Fatalf("Error running controller: %v", err)
Never "%v" an error. "%s" has an overload for the error interface to call err.Error()
whereas "%v" will inspect the structure (and fails to follow pointers)
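For reference, the fmt package documentation states that when an operand implements the error interface, its Error method is invoked for %v as well as for %s, so the two verbs produce the same text for an error value. The sketch below checks this on a plain error and on a pointer-receiver error type. The `wrapped` type and the `render` helper are hypothetical names used only for illustration; they are not part of this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// wrapped is a hypothetical pointer-receiver error type, for illustration only.
type wrapped struct{ msg string }

func (w *wrapped) Error() string { return "wrapped: " + w.msg }

// render formats err with the given verb so the outputs can be compared.
func render(verb string, err error) string {
	return fmt.Sprintf("failed: "+verb, err)
}

func main() {
	plain := errors.New("queue closed")
	var ptr error = &wrapped{msg: "queue closed"}
	for _, err := range []error{plain, ptr} {
		// Both verbs call err.Error(), so each pair of lines is identical.
		fmt.Println(render("%s", err))
		fmt.Println(render("%v", err))
	}
}
```

The two verbs only diverge for values that do not implement error or fmt.Stringer, where %v prints the default representation of the value.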
cmd/delivery/BUILD.bazel
Outdated
embed = [":go_default_library"],
importpath = "github.com/elafros/eventing/cmd/delivery",
pure = "on",
visibility = ["//visibility:public"],
Unnecessary, given the default_visibility.
cmd/delivery/main.go
Outdated
defer cancel()
srv.Shutdown(ctx)

glog.Flush()
Or alternatively this could be defer glog.Flush() at the beginning of the function.
done
// InMemoryQueue implements the queue interface with a memory buffer.
// Note: this isn't just a simple typedef for a queue because we will soon start
// experimenting with other features, such as fetching an event separate from acking,
// transactional ack + enqueue, cursors, etc.
Do you mean that these other features will only apply to InMemoryQueue and not to the Queue interface? Is that why InMemoryQueue is exported (rather than being inMemoryQueue, with NewInMemoryQueue returning the interface)?
They'd apply to the Queue interface; I want to use silly shims to help me decide what I need/want before I go shopping for software
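The alternative raised in this thread (an unexported concrete type behind a constructor that returns the interface) could look roughly like the sketch below. It is not the PR's code: `QueuedEvent` is reduced to two placeholder fields, `Pull` is simplified to a non-blocking call without `stopCh`, and `errQueueFull` and the `capacity` parameter are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// QueuedEvent is a placeholder for the PR's event envelope.
type QueuedEvent struct {
	Action string
	Data   interface{}
}

// Queue is a simplified, non-blocking variant of the interface under review.
type Queue interface {
	Push(event QueuedEvent) error
	Pull() (event QueuedEvent, ok bool)
}

// inMemoryQueue is unexported, so callers only ever see the Queue interface.
type inMemoryQueue struct {
	events chan QueuedEvent
}

// errQueueFull is a hypothetical error for a full buffer.
var errQueueFull = errors.New("queue full")

// NewInMemoryQueue returns the interface rather than the concrete type.
func NewInMemoryQueue(capacity int) Queue {
	return &inMemoryQueue{events: make(chan QueuedEvent, capacity)}
}

// Push enqueues the event, or fails immediately if the buffer is full.
func (q *inMemoryQueue) Push(event QueuedEvent) error {
	select {
	case q.events <- event:
		return nil
	default:
		return errQueueFull
	}
}

// Pull returns the next event, or ok=false if the queue is empty.
func (q *inMemoryQueue) Pull() (QueuedEvent, bool) {
	select {
	case e := <-q.events:
		return e, true
	default:
		return QueuedEvent{}, false
	}
}

func main() {
	q := NewInMemoryQueue(1)
	fmt.Println(q.Push(QueuedEvent{Action: "log", Data: "hello"}))
	fmt.Println(q.Push(QueuedEvent{Action: "log", Data: "again"}))
	e, ok := q.Pull()
	fmt.Println(e.Data, ok)
}
```

With this shape, features added later (separate ack, cursors) have to go through the interface, which matches the author's reply that they are meant to apply to Queue rather than to one implementation.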
// Queue implements basic features to allow asynchronous buffering of events.
type Queue interface {
Push(event QueuedEvent) error
Pull(stopCh <-chan struct{}) (event QueuedEvent, ok bool)
Does stopCh really belong here? It seems like instead it might be better to be passed into the NewInMemoryQueue function (or equivalent for other implementation), and stored in the struct.
Honestly I don't fully understand how to use stopCh. Until working with K8S I've been told that a public interface with a channel is a strict anti-pattern in Golang. Most of the code I see in bind/controller.go uses the stopCh similarly to how a context.Context is used (which would dictate that it is not used in a constructor). Otoh, wait.Until with an infinite loop callback means there can never be a graceful shutdown of the Bind controller. I'll need to ask Ville for help understanding the right usage when he's back in the office.
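Since the reply compares stopCh to context.Context, one possible reading is a Pull that takes a per-call context instead of a channel. The following is only a sketch under that assumption, not what the PR does; `ctxQueue`, `newCtxQueue`, and `pullWithTimeout` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// QueuedEvent is a placeholder for the PR's event envelope.
type QueuedEvent struct{ Data string }

// ctxQueue is a hypothetical in-memory queue whose Pull is cancelled per call.
type ctxQueue struct{ events chan QueuedEvent }

func newCtxQueue(capacity int) *ctxQueue {
	return &ctxQueue{events: make(chan QueuedEvent, capacity)}
}

// Push blocks if the buffer is full; a real queue would likely return an error.
func (q *ctxQueue) Push(e QueuedEvent) { q.events <- e }

// Pull blocks until an event is available or ctx is done.
// The caller, not the constructor, decides how long to wait.
func (q *ctxQueue) Pull(ctx context.Context) (QueuedEvent, bool) {
	select {
	case e := <-q.events:
		return e, true
	case <-ctx.Done():
		return QueuedEvent{}, false
	}
}

// pullWithTimeout is a small helper that gives up after d.
func pullWithTimeout(q *ctxQueue, d time.Duration) (QueuedEvent, bool) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return q.Pull(ctx)
}

func main() {
	q := newCtxQueue(1)
	q.Push(QueuedEvent{Data: "hello"})
	fmt.Println(pullWithTimeout(q, 10*time.Millisecond))
	fmt.Println(pullWithTimeout(q, 10*time.Millisecond))
}
```

Passing the cancellation signal per call keeps the queue reusable by several consumers with different lifetimes, which is the usual argument against storing it in the struct.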
pkg/delivery/receiver.go
Outdated
// TODO(vaikas): Remove this once Bind's Action has been migrated
// to be generic.
const alwaysUseProcessor = "eventing.elafros.dev/EventLogger"
The name of this constant makes it sound like it is a Boolean. Maybe rename it to something like onlySupportedProcessorForNow.
Done
pkg/delivery/receiver.go
Outdated
bindsLister listers.BindLister
}

// NewReceiver creates a new Reciever object to enqueue events.
nit: spelling
done
}

// SendEvent enqueues an event data and Context for delivery to a particular action.
func (r *Receiver) SendEvent(action queue.ActionType, data interface{}, context *event.Context) error {
Is there any reason this cannot be the exact same signature as Action.SendEvent?
It's still very much TBD in my mind which half of the queue (if not both) will look up and determine the action. This is totally agnostic of whether the webhooks are the same.
Here's the rest of my review.
w.WriteHeader(http.StatusNotFound)
return
}
Maybe for clarity, it seems like it might be worth doing
namespace, flowName := matches[1], matches[2]
and using those below.
done
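The suggestion above, naming the submatches once and using the names afterwards, might look like the sketch below. The regular expression and the `parseFlowPath` helper are hypothetical, since the PR's actual pattern is not shown in this thread, and the sketch matches strings where the PR appears to match bytes.

```go
package main

import (
	"fmt"
	"regexp"
)

// flowPath is a hypothetical URL pattern, used only for illustration.
var flowPath = regexp.MustCompile(`^/v1/namespaces/([^/]+)/flows/([^/]+):sendEvent$`)

// parseFlowPath extracts the namespace and flow name from a request path.
func parseFlowPath(path string) (namespace, flowName string, ok bool) {
	matches := flowPath.FindStringSubmatch(path)
	if matches == nil {
		return "", "", false
	}
	// Name the capture groups once instead of repeating matches[1] and matches[2].
	namespace, flowName = matches[1], matches[2]
	return namespace, flowName, true
}

func main() {
	ns, flow, ok := parseFlowPath("/v1/namespaces/default/flows/my-flow:sendEvent")
	fmt.Println(ns, flow, ok)
}
```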
pkg/delivery/receiver.go
Outdated
)

// If an event source knows the exact flow it is targeting it can bypass the work involved with
// processing event triggers.
I could not understand how this comment relates to the regexp below. Where is this bypassing of work?
The expensive version is not yet implemented. I can change the comment for now.
pkg/delivery/receiver.go
Outdated
bind, err := r.bindsLister.Binds(string(matches[1])).Get(string(matches[2]))
if err != nil {
if errors.IsNotFound(err) {
fmt.Printf("Could not find Bind %s in namespace %s\n", matches[2], matches[1])
Is this left over from debugging? Suggest removing.
done
pkg/delivery/receiver.go
Outdated
return
}

w.WriteHeader(http.StatusOK)
How about putting something in the response body to make debugging easier? For example, something like
w.Write([]byte("event sent"))
And if you do this you don't need the w.WriteHeader(http.StatusOK).
I'll remove the StatusOK. I don't want to start sending willy nilly response bodies, especially prose bodies.
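Dropping the explicit call is safe because net/http defaults to 200: the first Write sends an implicit WriteHeader(http.StatusOK), and a handler that writes nothing at all also results in a 200 response. A small sketch using httptest illustrates this; the three handler names and `statusOf` are made up for the example.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// explicitOK sets the status code by hand.
func explicitOK(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
}

// bodyOnly writes a body; the first Write implies a 200 status.
func bodyOnly(w http.ResponseWriter, r *http.Request) {
	w.Write([]byte("ok"))
}

// empty returns without touching the ResponseWriter.
func empty(w http.ResponseWriter, r *http.Request) {}

// statusOf runs a handler against a recorder and reports the status code.
func statusOf(h http.HandlerFunc) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/", nil)
	h(rec, req)
	return rec.Result().StatusCode
}

func main() {
	fmt.Println(statusOf(explicitOK), statusOf(bodyOnly), statusOf(empty))
}
```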
if err := r.SendEvent(actionFromBind(bind), data, context); err != nil {
glog.Error("Failed to enqueue event", err)
w.WriteHeader(http.StatusInternalServerError)
return
I understand why you don't want to expose the err caused by internal error, but maybe print something in the response body to make debugging easier. For example
w.Write([]byte(fmt.Sprintf("internal error attempting to send event to %s", bind.Spec.Action.RouteName)))
Might be easier in the next PR when we remove the shim actionFromBind
go func() {
sender.Run(20, stopCh)
done = true
wait.Done()
At this point, all that has happened is that 20 goroutines have been spawned inside sender.Run, but they have not finished yet. Is that what you intended, or did you intend to wait until all the RunOnces are finished?
Fixed so that sender.Run blocks like the equivalent code in controller/bind
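A blocking Run of the kind described in this reply could be sketched as follows. This is an assumption about the shape of the fix, not the PR's code: `run` stands in for sender.Run, its workers simply wait on stopCh where a real worker would loop over RunOnce, and completion is signalled by closing a channel instead of setting a shared `done` flag, which avoids an unsynchronized read in the test.

```go
package main

import (
	"fmt"
	"sync"
)

// run is a stand-in for sender.Run: it starts the workers and blocks
// until every worker has exited, which only happens after stopCh closes.
func run(threads int, stopCh <-chan struct{}) {
	var wg sync.WaitGroup
	for i := 0; i < threads; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-stopCh // a real worker would loop over RunOnce here
		}()
	}
	wg.Wait()
}

// startSender runs the sender in the background and returns a channel
// that is closed when run returns.
func startSender(threads int, stopCh <-chan struct{}) <-chan struct{} {
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		run(threads, stopCh)
	}()
	return finished
}

// isClosed reports whether ch has been closed, without blocking.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	stopCh := make(chan struct{})
	finished := startSender(20, stopCh)
	fmt.Println("finished before stop:", isClosed(finished)) // false
	close(stopCh)
	<-finished
	fmt.Println("finished after stop:", isClosed(finished)) // true
}
```

Because `run` cannot return until stopCh is closed, the first check is deterministic, and receiving from `finished` gives the caller a happens-before edge that a plain boolean does not.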
done = true
wait.Done()
}()
if done {
Isn't there a race here? Is it not possible (if unlikely) that the goroutine above could execute done = true before we get here?
Only if sender.Run can exit before closing the stopCh (per the above mentioned fix)
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)
Maybe add unit tests in this package.
I don't feel confident that I know enough of Elafros to write a good unit test except one that literally mocks & exercises my code. The Logging action isn't really testable unless glog has hooks.
Can look at the right way to add tests after Copenhagen
pkg/delivery/queue/BUILD.bazel
Outdated
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
],
)
Maybe add unit tests in this package.
cmd/delivery/main.go
Outdated
// Set up HTTP endpoints
glog.Infof("Set up metrics scrape path %s", metricsScrapePath)
glog.Infof("Set up debug queue path %s", debugQueuePath)
glog.Infof("Hosting sendEvent API at prefix %s", sendEventPrefix)
It seems these logging statements would be better split up and put before their respective mux.Handle statements. Otherwise keeping these up-to-date will be error-prone.
Agreed. These were for my own debugging and they stuck around. Will reformat.
A few extra godoc suggestions.
cmd/delivery/main.go
Outdated
limitations under the License.
*/

package main
Suggest adding package comment explaining what this command is.
Not sure this is really necessary, but done.
limitations under the License.
*/

package delivery
Suggest adding package comment for godoc, perhaps in a separate file doc.go containing just
/* package delivery blah blah blah
...
*/
package delivery
done
limitations under the License.
*/

package action
Suggest adding package comment.
(see doc.go)
Done with a pass at incorporating feedback.
I think that the queue package may merit tests, especially in the near future. I'm on the fence about whether it's worth building now rather than doing more demo work since the interface is so trivial.
WRT the Action package, the logger is untestable and the Elafros implementation would only literally make sure the code I wrote is exercised, not that any of the input is meaningfully acted upon. This can probably get exercised better in an integration test once we have a better foundation to work with.
go informerFactory.Start(stopCh)

// Start sender:
go func() {
You're right. It should have been blocking like the bind controller. Have fixed.
t.Fatal("Failed to pull queued event")
}
if !reflect.DeepEqual(context, event.Context) {
t.Fatalf("Event context was not marshalled correctly; expected=%+v got=%+v", context, event.Context)
Cascading failures in a test almost never help debugging. The only good reason for t.Errorf in the past was table-driven tests, and now there's a better interface that properly encapsulates t.Fatalf.
}
}

// RunOnce processes a single event from the queue.
The OK bool idiom is pretty idiomatic. LMK if the comment is sufficient.
pkg/delivery/sender.go
Outdated
}

// Run runs Sender until stopCh is closed.
func (s *Sender) Run(threadiness int, stopCh <-chan struct{}) error {
Trying to mimic the surrounding codebase. I don't feel comfortable changing until/unless Ville clarifies.
Action ActionType `json:"action"`
Data interface{} `json:"data"`
Context *event.Context `json:"context"`
}
OK. Suggest TODO saying that.
t.Fatal("Failed to pull queued event")
}
if !reflect.DeepEqual(context, event.Context) {
t.Fatalf("Event context was not marshalled correctly; expected=%+v got=%+v", context, event.Context)
Not sure I agree, but OK -- fine as is.
}
}

// RunOnce processes a single event from the queue.
Well, I naively expected the Boolean would mean "SendEvent succeeded". But actually the Boolean is also true in two cases that look like error conditions. It might be worth explaining how this is used to implement retry.
Still LGTM
/lgtm
First pass at creating the skeleton for a delivery service. Since this needs to run at data-plane scale, I've spliced it off from the bind-controller pod. Eventually this will become more complex (e.g. when the backing store is Kafka or some other durable queue). I want to experiment with the in-memory queue for a while so we can derive our requirements before we choose what backing store to endorse.
Future PRs will make the action dynamic once Bind has been refactored. May also add a firehose "sendEvent" method, which would be a good fishbowl implementation of a public event provider SDK.