-
Notifications
You must be signed in to change notification settings - Fork 101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Manage transport lifecycle from dispatcher #529
Conversation
@kriskowal, thanks for your PR! By analyzing the history of the files in this pull request, we identified @abhinav, @breerly and @AlexAPB to be potential reviewers. |
@@ -98,11 +98,32 @@ func NewDispatcher(cfg Config) Dispatcher { | |||
panic("a service name is required") | |||
} | |||
|
|||
// Collect all unique transports from inbounds and outbounds. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's factor this block (101-120) into a seperate function that get's called from here.
Perhaps func collectUniqueTransports(inbounds, outbounds) []transport.Transport
.
transports := make(map[transport.Transport]struct{}) | ||
for _, inbound := range cfg.Inbounds { | ||
for _, transport := range inbound.Transports() { | ||
transports[transport] = struct{}{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good 'ol struct{}{}
.
outbounds Outbounds | ||
inbounds Inbounds | ||
outbounds Outbounds | ||
transports map[transport.Transport]struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like this should just be []transport.Transport
at this point - the map is useful for dedupping, but let's just call keys
on it before passing it around.
mu sync.Mutex | ||
startedInbounds []transport.Inbound | ||
startedOutbounds []transport.Outbound | ||
mu sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest a more specific name for this mu
, perhaps startedtransportsMutex
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This covers all changes to started* in scope.
// may use multiple transport protocols, particularly for shadowing traffic | ||
// across multiple transport protocols during a transport protocol | ||
// migration. | ||
Transports() []Transport |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't composite outbounds compose the transport, such that it would still be represented as a single transport?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is for the case like this:
FlaggedOutbound(
flag,
grpc.Outbound(...),
tchannel.Outbound(...),
)
The remotely configured outbound would need to return the grpc and tchannel transports
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@breerly composite transports would defeat the deduplicator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aaah, thanks for the clarification.
@@ -67,6 +67,11 @@ func (o *Outbound) WithTracer(tracer opentracing.Tracer) *Outbound { | |||
return o | |||
} | |||
|
|||
// Transports returns the underlying TChannel Transport for this outbound. | |||
func (o *Outbound) Transports() []transport.Transport { | |||
return []transport.Transport{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why instantiate here? Shouldn't the transport pass an instance of itself into the outbound? Wouldnt this just return that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is an empty array, until I refactor TChannel to use a Transport.
@@ -79,6 +79,11 @@ func (i *Inbound) SetRegistry(registry transport.Registry) { | |||
i.registry = registry | |||
} | |||
|
|||
// Transports returns the inbound's HTTP transport. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we put a TODO here to route the http.Transport through here?
// may use multiple transport protocols, particularly for shadowing traffic | ||
// across multiple transport protocols during a transport protocol | ||
// migration. | ||
Transports() []Transport |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is for the case like this:
FlaggedOutbound(
flag,
grpc.Outbound(...),
tchannel.Outbound(...),
)
The remotely configured outbound would need to return the grpc and tchannel transports
@@ -20,7 +20,9 @@ | |||
|
|||
package transport | |||
|
|||
import "context" | |||
import ( | |||
"context" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you make lint
locally?
@@ -84,6 +84,11 @@ func (i *Inbound) Channel() Channel { | |||
return i.ch | |||
} | |||
|
|||
// Transports returns the underlying Transport for this Inbound. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
todo
@@ -67,6 +67,11 @@ func (o *Outbound) WithTracer(tracer opentracing.Tracer) *Outbound { | |||
return o | |||
} | |||
|
|||
// Transports returns the underlying TChannel Transport for this outbound. | |||
func (o *Outbound) Transports() []transport.Transport { | |||
return []transport.Transport{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
requesting changes for collectUniqueTransports function and adding todos to functions that return no-op transports.
@@ -20,7 +20,9 @@ | |||
|
|||
package transport | |||
|
|||
import "context" | |||
import ( | |||
"context" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you make lint
locally?
I’ve factored out the transport collector and added notes for further factoring of inbounds and outbounds to expose their underlying transports. I’ve also added some finesse to dispatcher.Start(), so that it blocks on inbound/outbound Start before starting transport. This avoids a race between registering handlers in inbound.Start() and listening in transport.Start(). The race does not exist until subsequent work refactors TChannel inbound.Start() partially into transport.Start(). |
a17af61
to
f3bacc5
Compare
f3bacc5
to
8c5f949
Compare
This change creates the transport.Transport interface, which carries a lifecycle, and adds a Transports() method to all inbounds and outbounds, so that they may render up all zero or more transports that they require. The Dispatcher then collects and deduplicates these transports and manages their lifecycle.
Transports() returns a slice, which is convenient for migration now since existing inbounds and outbounds have none, but also since composite outbounds might be backed by multiple transports.