Introduce xDS module and SotW stream #5342
Looks pretty good so far. 👍
Left style comments and some questions. 😉
import com.linecorp.armeria.common.util.SafeCloseable;

class XdsStreamSubscriber implements SafeCloseable {
So the purpose of this class is to keep the reference?
Are there any other features to be added to this class?
Later, I imagine this class will be responsible for:
- Scheduling timeouts
- Caching values
- Notifying the watcher storage of updates
Lines 106 to 108 in 90a9349
Whenever a subscribed resource is updated in the remote control plane server, a notification is
sent to the client over the `Stream`. The `Stream` then parses the resources and passes the
data to the `Subscriber`. Eventually, the subscriber will notify the subscribed `Watcher`s of this resource.
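To make the quoted flow concrete, here is a minimal sketch of how an update could travel from a stream to a subscriber and on to its watchers. All class and method names below (`SketchStream`, `SketchSubscriber`, `onResponse`, `addWatcher`) are hypothetical stand-ins rather than the classes in this PR, and "parsing" is reduced to trimming a string. Replaying the cached value to late watchers is also an assumption, based only on the "caching values" item above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a per-resource subscriber.
final class SketchSubscriber {
    private final List<Consumer<String>> watchers = new ArrayList<>();
    private String cached; // last parsed value, replayed to watchers added later

    void addWatcher(Consumer<String> watcher) {
        watchers.add(watcher);
        if (cached != null) {
            watcher.accept(cached);
        }
    }

    // Called by the stream once a resource has been parsed.
    void onData(String parsed) {
        cached = parsed;
        watchers.forEach(w -> w.accept(parsed));
    }
}

// Hypothetical, simplified stand-in for the stream layer.
final class SketchStream {
    private final Map<String, SketchSubscriber> subscribers = new HashMap<>();

    SketchSubscriber subscribe(String resourceName) {
        return subscribers.computeIfAbsent(resourceName, k -> new SketchSubscriber());
    }

    // Simulates a notification pushed by the control plane.
    void onResponse(String resourceName, String rawPayload) {
        final SketchSubscriber subscriber = subscribers.get(resourceName);
        if (subscriber == null) {
            return; // nobody subscribed to this resource; ignore it
        }
        subscriber.onData(rawPayload.trim()); // "parsing" is just trim() in this sketch
    }
}
```

In this sketch the stream owns the lookup from resource name to subscriber, while the subscriber only fans data out to watchers, which mirrors the split of responsibilities described in the quoted text.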
interface XdsResponseHandler {

    <T extends Message> void handleResponse(
            ResourceParser<T> resourceParser, DiscoveryResponse value, SotwXdsStream sender);
Is `resourceParser` necessary? `DiscoveryResponse` has the type in it, so we can use the type to get the corresponding `ResourceParser`.
Do you mean there is no need to pass the `resourceParser` and we can re-run `XdsResourceParserUtil.fromTypeUrl`?
We could, but what would be the advantage of doing so?
In my mind, the `Stream` layer will do all the heavy lifting to ensure communication with the control plane is maintained and up-to-date.
For this reason I would rather keep `noncesMap` and `versionsMap` in the `XdsStream` layer.
noncesMap.put(resourceParser.type(), value.getNonce());
In order to do so, I think it makes sense that the `ResourceType` (or `ResourceParser`) is determined inside `SotwXdsStream`. I didn't see a need to re-compute this, so I just passed it via parameter. I'd rather keep this the way it is, but let me know if you feel strongly about this 😄
Understood. That makes sense. 👍
    <T extends Message> void handleResponse(
            ResourceParser<T> resourceParser, DiscoveryResponse value, SotwXdsStream sender);

    void handleReset(XdsStream sender);
Could you explain the definition of reset? 🤔
Whenever the stream thinks its `SubscriberStorage` may not be synchronized with the control plane, a `reset` is done. This can be the case if a connection is closed unexpectedly, or if a stream is doing a cold start.
When a reset is performed, all of the currently watched types and resources are sent to the control plane server. Note that this is a package-private interface, and is exposed for testing purposes.
ref:
armeria/xds/src/main/java/com/linecorp/armeria/xds/DefaultResponseHandler.java
Lines 84 to 86 in 90a9349
public void handleReset(XdsStream sender) {
    storage.allSubscribers().keySet().forEach(sender::resourcesUpdated);
}
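As a rough illustration of the reset described above, the following sketch re-sends every watched type, with its resource names, when a reset happens. The types here (`ResetSketchStorage`, `ResetSketchStream`, `ResetSketchHandler`) and the string-based request format are assumptions made for the example; only the `handleReset` body follows the shape of the referenced snippet.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical storage: resource type -> names of the watched resources.
final class ResetSketchStorage {
    private final Map<String, Set<String>> byType = new LinkedHashMap<>();

    void watch(String type, String resourceName) {
        byType.computeIfAbsent(type, k -> new LinkedHashSet<>()).add(resourceName);
    }

    Map<String, Set<String>> allSubscribers() {
        return byType;
    }
}

// Hypothetical stream that records the requests it would send to the control plane.
final class ResetSketchStream {
    final List<String> sentRequests = new ArrayList<>();
    private final ResetSketchStorage storage;

    ResetSketchStream(ResetSketchStorage storage) {
        this.storage = storage;
    }

    // Sends (here: records) a request listing all watched resources of the given type.
    void resourcesUpdated(String type) {
        final Set<String> names = storage.allSubscribers().getOrDefault(type, Set.of());
        sentRequests.add(type + ':' + String.join(",", names));
    }
}

// Hypothetical handler mirroring the shape of the referenced handleReset.
final class ResetSketchHandler {
    private final ResetSketchStorage storage;

    ResetSketchHandler(ResetSketchStorage storage) {
        this.storage = storage;
    }

    void handleReset(ResetSketchStream sender) {
        // One request per watched type, so the control plane sees the full subscription set again.
        storage.allSubscribers().keySet().forEach(sender::resourcesUpdated);
    }
}
```

After a cold start or an unexpected disconnect, calling `handleReset` once in this sketch is enough to re-announce all subscriptions, since the storage remains the single source of truth for what is watched.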
Codecov Report
Attention:
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #5342      +/-   ##
============================================
+ Coverage     73.93%   73.96%   +0.03%
- Complexity    20098    20159      +61
============================================
  Files          1730     1741      +11
  Lines         74142    74353     +211
  Branches       9461     9481      +20
============================================
+ Hits          54815    54998     +183
- Misses        14852    14869      +17
- Partials       4475     4486      +11
☔ View full report in Codecov by Sentry.
Looks good so far. 👍
Let's get to the next one. 😉
Thread.sleep(100);
assertThat(responseHandler.getResponses()).isEmpty();
Global comment: Could we use Awaitility?
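For context on the suggestion: Awaitility replaces fixed sleeps with polling until a condition holds or a timeout expires, typically written as `await().untilAsserted(...)`. The stdlib-only sketch below illustrates that polling idea without the dependency; `PollingSketch.awaitUntil` is a hypothetical helper, not part of Awaitility or Armeria, and it only covers assertions that wait for something to become true.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper showing the polling pattern that Awaitility provides.
final class PollingSketch {
    private PollingSketch() {}

    /**
     * Polls the condition until it returns true or the timeout elapses.
     * Returns true if the condition was met in time, false otherwise.
     */
    static boolean awaitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition becoming true
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }
}
```

Compared with `Thread.sleep(100)`, a polling wait returns as soon as the condition is met and tolerates slow CI machines through a generous timeout, which tends to make such tests both faster and less flaky.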
Let's start xDS. Let me review in detail on #5336. 🙇‍♂️🙇‍♂️
import com.google.common.base.MoreObjects;
import com.google.protobuf.Message;

abstract class ResourceParser<T extends Message> {
I'm not sure the generic type parameter provides type safety in the xDS implementation.
As the input is dynamically determined by `typeUrl` at runtime, the compile-time constraint does not seem to work.
https://github.com/line/armeria/pull/5342/files#diff-463524dcc65fabda54bb06752b0e248879c7bc1e5b367d0e8885bfbd23aa785bR209
What do you think of removing `<T extends Message>` and checking the type with `XdsType` only?
Motivation:
We would like to introduce an xDS integration module.
In order to fetch resources from the xDS module, I would like to introduce a `Stream` object which represents a single persistent connection to a remote control plane server. The `Stream` will query a `SubscriberStorage` to check which resources are subscribed and update the request accordingly.
Once a response is received, the type will be used to fetch a `ResourceParser` which will later parse the response. Afterwards, the parameters will be sent over to a `ResponseHandler` which will handle the response and send either an `ackResponse` or `nackResponse` to keep the feedback loop open. Also, a `resourcesUpdated` can be sent to indicate the subscribed resources have been changed.
Modifications:
- `SotwXdsStream`, which represents a persistent connection to a control plane server
- `ResourceParser`, which will be responsible for parsing a response in a format readily sendable in a nack response
- `XdsStreamSubscriber` and `SubscriberStorage`, which represent the currently subscribed resources
Result: