Skip to content

Commit

Permalink
#405: Fixed error handling and timeouts +refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
nikitacherevko committed Sep 19, 2018
1 parent bc9a324 commit 6499c5a
Show file tree
Hide file tree
Showing 17 changed files with 325 additions and 106 deletions.
2 changes: 2 additions & 0 deletions confd/templates/northbound/northbound.properties.tmpl
Expand Up @@ -14,6 +14,8 @@ server.contextPath=/api/v1

topology.engine.rest.endpoint={{ getv "/kilda_topology_engine_rest_endpoint" }}:{{ getv "/kilda_topology_engine_rest_port" }}
northbound.messages.expiration.minutes=15
northbound.kafka.listener.threads=10
northbound.kafka.session.timeout=30000

neo4j.hosts = {{ getv "/kilda_neo4j_host" }}:{{ getv "/kilda_neo4j_bolt_port" }}
neo4j.user = {{ getv "/kilda_neo4j_user" }}
Expand Down
Expand Up @@ -16,8 +16,11 @@
package org.openkilda.northbound.config;

import org.openkilda.northbound.utils.ExecutionTimeInterceptor;
import org.openkilda.northbound.utils.RequestCorrelationFilter;
import org.openkilda.northbound.utils.ExtraAuthInterceptor;
import org.openkilda.northbound.utils.RequestCorrelationFilter;
import org.openkilda.northbound.utils.async.CompletableFutureReturnValueHandler;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -26,12 +29,17 @@
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.filter.OncePerRequestFilter;
import org.springframework.web.method.support.HandlerMethodReturnValueHandler;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.PostConstruct;

/**
* The Web Application configuration.
Expand All @@ -40,6 +48,25 @@
@EnableWebMvc
@PropertySource({"classpath:northbound.properties"})
public class WebConfig extends WebMvcConfigurerAdapter {

@Autowired
private RequestMappingHandlerAdapter requestMappingHandlerAdapter;

/**
* Adds instance of {@link CompletableFutureReturnValueHandler} to the list of value handlers and put it on the
* first place (thus we override default handler for completable future
* {@link org.springframework.web.servlet.mvc.method.annotation.CompletionStageReturnValueHandler}).
*/
@PostConstruct
public void init() {
CompletableFutureReturnValueHandler futureHandler = new CompletableFutureReturnValueHandler();
List<HandlerMethodReturnValueHandler> defaultHandlers =
new ArrayList<>(requestMappingHandlerAdapter.getReturnValueHandlers());
defaultHandlers.add(0, futureHandler);

requestMappingHandlerAdapter.setReturnValueHandlers(defaultHandlers);
}

/**
* {@inheritDoc}
*/
Expand All @@ -49,6 +76,19 @@ public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(extraAuthInterceptor());
}

/**
* Swagger UI resources.
*
* @param registry resource registry
*/
@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
registry.addResourceHandler("swagger-ui.html")
.addResourceLocations("classpath:/META-INF/resources/");
registry.addResourceHandler("/webjars/**")
.addResourceLocations("classpath:/META-INF/resources/webjars/");
}

/**
* Request processing time counting interceptor.
*
Expand Down Expand Up @@ -77,21 +117,9 @@ public RestTemplate restTemplate(RestTemplateBuilder builder) {
return restTemplate;
}

/**
* Swagger UI resources.
*
* @param registry resource registry
*/
@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
registry.addResourceHandler("swagger-ui.html")
.addResourceLocations("classpath:/META-INF/resources/");
registry.addResourceHandler("/webjars/**")
.addResourceLocations("classpath:/META-INF/resources/webjars/");
}

@Bean
public OncePerRequestFilter requestCorrelationIdFilter() {
return new RequestCorrelationFilter();
}

}
Expand Up @@ -130,7 +130,7 @@ public SwitchFlowEntries getSwitchRules(
*/
@ApiOperation(value = "Delete switch rules. Requires special authorization",
response = Long.class, responseContainer = "List")
@ApiResponse(code = 200, response = Long.class, responseContainer = "List" , message = "Operation is successful")
@ApiResponse(code = 200, response = Long.class, responseContainer = "List", message = "Operation is successful")
@DeleteMapping(value = "/switches/{switch-id}/rules",
produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
@ExtraAuthRequired
Expand Down Expand Up @@ -184,7 +184,7 @@ public ResponseEntity<List<Long>> deleteSwitchRules(
*/
@ApiOperation(value = "Install switch rules. Requires special authorization",
response = Long.class, responseContainer = "List")
@ApiResponse(code = 200, response = Long.class, responseContainer = "List" , message = "Operation is successful")
@ApiResponse(code = 200, response = Long.class, responseContainer = "List", message = "Operation is successful")
@PutMapping(value = "/switches/{switch-id}/rules",
produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
@ExtraAuthRequired
Expand Down
Expand Up @@ -15,6 +15,10 @@

package org.openkilda.northbound.messaging;

/**
* Allows to read messages from messaging broker.
* @deprecated replaced by {@link org.openkilda.northbound.messaging.kafka.KafkaMessagingChannel}
* */
@Deprecated
public interface MessageConsumer<T> {
/**
Expand All @@ -36,12 +40,7 @@ public interface MessageConsumer<T> {
T poll(final String correlationId);

/**
* Clears message queue.
*/
void clear();

/**
* Processes the response of the request.
* Processes the response of the request. It is used for backward compatibility with old versions of consumers.
*/
void onResponse(T message);
}
Expand Up @@ -17,6 +17,9 @@

import org.openkilda.messaging.Message;

import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;

public interface MessageProducer {
/**
* Kafka message send timeout.
Expand All @@ -30,4 +33,17 @@ public interface MessageProducer {
* @param message message to serialize and send
*/
void send(final String topic, final Message message);

/**
* Sends message to specified topic and applies callback for processing the result of sending. By default it just
* calls {@link MessageProducer#send(String, Message)}, if there is a need to add a custom callback you can
* implement this method.
*
* @param topic topic name.
* @param message message to be sent.
* @param callback to be executed once message is sent.
*/
default void send(String topic, Message message, ListenableFutureCallback<SendResult<String, Message>> callback) {
send(topic, message);
}
}
Expand Up @@ -22,17 +22,18 @@
import java.util.concurrent.CompletableFuture;

/**
* The main component for sending messages to internal kilda components.
* All sent operations will be performed asynchronous and wrapped into {@link CompletableFuture}.
* The main purpose of this class is to have one entrypoint for sending messages
* and receiving them back in one place and doing it in non-blocking way.
* The main component for sending messages to internal kilda components. All sent operations will be performed
* asynchronous and wrapped into {@link CompletableFuture}. The main purpose of this class is to have one entrypoint
* for sending messages and receiving them back in one place and doing it in non-blocking way.
*/
public interface MessagingChannel {

/**
* Sends the message to the specified topic and provides a response wrapped in {@link CompletableFuture}.
* Note: this type of request expects to receive back the response,
* if you don't need any responses please use {@link MessagingChannel#send(String, Message)}
* <p/>
* Note: this type of request expects to receive back the response, if you don't need any responses please use
* {@link MessagingChannel#send(String, Message)}
*
* @param topic topic where the message should be sent.
* @param message data to be sent.
* @return response for the request.
Expand All @@ -41,6 +42,7 @@ public interface MessagingChannel {

/**
* Sends the message to the specified topic and collects all chunked responses for this request into the list.
*
* @param topic topic where the message should be sent.
* @param message data to be sent.
* @return response for the request.
Expand All @@ -49,6 +51,7 @@ public interface MessagingChannel {

/**
* Sends the message to the specified topic without waiting for a response.
*
* @param topic topic where the message should be sent to.
* @param message the data to be sent.
*/
Expand Down
Expand Up @@ -72,13 +72,6 @@ public Map<String, String> poll(final String correlationId) {
HealthCheckInfoData::getId, HealthCheckInfoData::getState));
}

/**
* {@inheritDoc}
*/
@Override
public void clear() {
}

@Override
public void onResponse(Map<String, String> message) {
InfoMessage info = (InfoMessage) message;
Expand Down
Expand Up @@ -94,10 +94,6 @@ public Message poll(final String correlationId) {
OPERATION_TIMED_OUT, TIMEOUT_ERROR_MESSAGE, northboundTopic);
}

@Override
public void clear() {
}

@Override
public void onResponse(Message message) {
messages.put(message.getCorrelationId(), message);
Expand Down
Expand Up @@ -56,7 +56,7 @@ public void onMessage(Message message) {
logger.debug("Message received: {} - {}", Thread.currentThread().getId(), message);
messagingChannel.onResponse(message);

// todo: should be removed once KafkaConsumer will be deleted. it is for the support of outdated services.
// FIXME: should be removed once KafkaConsumer will be deleted. it is for the support of outdated services.
messageConsumer.onResponse(message);
}
}
Expand Down
Expand Up @@ -61,4 +61,10 @@ public void onFailure(Throwable exception) {
}
});
}

@Override
public void send(String topic, Message message, ListenableFutureCallback<SendResult<String, Message>> callback) {
kafkaTemplate.send(topic, message)
.addCallback(callback);
}
}

0 comments on commit 6499c5a

Please sign in to comment.