New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#405: Refactored NB services to work with async requests #1421
Conversation
4413980
to
6537470
Compare
663f6b6
to
dfa2a89
Compare
edbb813
to
6537470
Compare
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
@Override | ||
public List<FlowPayload> deleteFlows() { | ||
public CompletableFuture<List<FlowPayload>> deleteFlows() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rename this method into deleteAllFlows
... because names deleteFlow
and deleteFlows
too similar.
PS We should keep together deleteFlow
and delete(All)Flows
methods.
for (int i = 0; i < flows.size(); i++) { | ||
String requestId = correlationId + "-" + i; | ||
FlowPayload flow = flows.get(i); | ||
deleteResult.add(sendDeleteFlow(flow.getId(), requestId).join()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will not parallel execute.
...ices/src/northbound/src/main/java/org/openkilda/northbound/service/impl/FlowServiceImpl.java
Show resolved
Hide resolved
@@ -385,83 +346,70 @@ private BatchResults flowPushUnpush(List<FlowInfoData> externalFlows, FlowOperat | |||
InfoMessage flowRequest = | |||
new InfoMessage(data, System.currentTimeMillis(), flowCorrelation, Destination.WFM); | |||
flowRequests.add(flowRequest); | |||
messageProducer.send(topic, flowRequest); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you use CompletableFuture.allOf
method, you can keep "current" worflow of this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've rewritten it using the way you suggested
} else { | ||
msgs.add("FAILURE (TE): Flow " + status.getId() | ||
|
||
FlowStatusResponse teResponse = (FlowStatusResponse) flowPair.getRight().join(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you are ignoring request into TE (.getRight())... Usage Pair<> object here looks weird.
} | ||
final Map<SwitchId, List<SimpleSwitchRule>> switchRules = new HashMap<>(); | ||
|
||
return CompletableFuture.supplyAsync(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe that CompletableFuture.allOf
is more suitable here.
6537470
to
f0d9b45
Compare
f0d9b45
to
415f00f
Compare
@@ -55,9 +48,6 @@ public void onMessage(Message message) { | |||
try (MDCCloseable closable = MDC.putCloseable(CORRELATION_ID, message.getCorrelationId())) { | |||
logger.debug("Message received: {} - {}", Thread.currentThread().getId(), message); | |||
messagingChannel.onResponse(message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why for we need separate "listenr"? Why we can't attach kafka-consumer directly to messagingChannel.onResponse (or some other method of KafkaMessagingChannel)?
…ync-service # Conflicts: # services/src/northbound/src/main/java/org/openkilda/northbound/config/MessageConsumerConfig.java
No description provided.