Skip to content

Commit

Permalink
#405: Refactored FlowService/FlowController in order to use Completab…
Browse files Browse the repository at this point in the history
…leFuture
  • Loading branch information
nikitacherevko committed Sep 24, 2018
1 parent 26382ad commit 137e5b3
Show file tree
Hide file tree
Showing 6 changed files with 308 additions and 411 deletions.
Expand Up @@ -18,7 +18,9 @@
import static org.openkilda.messaging.Utils.EXTRA_AUTH;

import org.openkilda.messaging.command.flow.SynchronizeCacheAction;
import org.openkilda.messaging.error.ErrorType;
import org.openkilda.messaging.error.MessageError;
import org.openkilda.messaging.error.MessageException;
import org.openkilda.messaging.info.flow.FlowInfoData;
import org.openkilda.messaging.payload.flow.FlowCacheSyncResults;
import org.openkilda.messaging.payload.flow.FlowIdStatusPayload;
Expand All @@ -31,6 +33,7 @@
import org.openkilda.northbound.dto.flows.PingOutput;
import org.openkilda.northbound.service.FlowService;
import org.openkilda.northbound.utils.ExtraAuthRequired;
import org.openkilda.northbound.utils.RequestCorrelationId;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
Expand All @@ -41,30 +44,30 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.PropertySource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PatchMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

import java.nio.file.InvalidPathException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;


/**
* REST Controller for flow requests.
*/
@RestController
@RequestMapping(value = "/flows", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
@PropertySource("classpath:northbound.properties")
@Api
@ApiResponses(value = {
Expand Down Expand Up @@ -94,14 +97,10 @@ public class FlowController {
* @return flow
*/
@ApiOperation(value = "Creates new flow", response = FlowPayload.class)
@RequestMapping(
value = "/flows",
method = RequestMethod.PUT,
produces = MediaType.APPLICATION_JSON_UTF8_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<FlowPayload> createFlow(@RequestBody FlowPayload flow) {
FlowPayload response = flowService.createFlow(flow);
return new ResponseEntity<>(response, new HttpHeaders(), HttpStatus.OK);
@PutMapping
@ResponseStatus(HttpStatus.OK)
public CompletableFuture<FlowPayload> createFlow(@RequestBody FlowPayload flow) {
return flowService.createFlow(flow);
}

/**
Expand All @@ -111,13 +110,10 @@ public ResponseEntity<FlowPayload> createFlow(@RequestBody FlowPayload flow) {
* @return flow
*/
@ApiOperation(value = "Gets flow", response = FlowPayload.class)
@RequestMapping(
value = "/flows/{flow-id:.+}",
method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public ResponseEntity<FlowPayload> getFlow(@PathVariable(name = "flow-id") String flowId) {
FlowPayload response = flowService.getFlow(flowId);
return new ResponseEntity<>(response, new HttpHeaders(), HttpStatus.OK);
@GetMapping(value = "/{flow-id:.+}")
@ResponseStatus(HttpStatus.OK)
public CompletableFuture<FlowPayload> getFlow(@PathVariable(name = "flow-id") String flowId) {
return flowService.getFlow(flowId);
}

/**
Expand All @@ -127,13 +123,10 @@ public ResponseEntity<FlowPayload> getFlow(@PathVariable(name = "flow-id") Strin
* @return flow
*/
@ApiOperation(value = "Deletes flow", response = FlowPayload.class)
@RequestMapping(
value = "/flows/{flow-id:.+}",
method = RequestMethod.DELETE,
produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public ResponseEntity<FlowPayload> deleteFlow(@PathVariable(name = "flow-id") String flowId) {
FlowPayload response = flowService.deleteFlow(flowId);
return new ResponseEntity<>(response, new HttpHeaders(), HttpStatus.OK);
@DeleteMapping(value = "/{flow-id:.+}")
@ResponseStatus(HttpStatus.OK)
public CompletableFuture<FlowPayload> deleteFlow(@PathVariable(name = "flow-id") String flowId) {
return flowService.deleteFlow(flowId);
}

/**
Expand All @@ -144,16 +137,11 @@ public ResponseEntity<FlowPayload> deleteFlow(@PathVariable(name = "flow-id") St
* @return flow
*/
@ApiOperation(value = "Updates flow", response = FlowPayload.class)
@RequestMapping(
value = "/flows/{flow-id:.+}",
method = RequestMethod.PUT,
produces = MediaType.APPLICATION_JSON_UTF8_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<FlowPayload> updateFlow(
@PathVariable(name = "flow-id") String flowId,
@RequestBody FlowPayload flow) {
FlowPayload response = flowService.updateFlow(flow);
return new ResponseEntity<>(response, new HttpHeaders(), HttpStatus.OK);
@PutMapping(value = "/{flow-id:.+}")
@ResponseStatus(HttpStatus.OK)
public CompletableFuture<FlowPayload> updateFlow(@PathVariable(name = "flow-id") String flowId,
@RequestBody FlowPayload flow) {
return flowService.updateFlow(flow);
}

/**
Expand All @@ -162,41 +150,36 @@ public ResponseEntity<FlowPayload> updateFlow(
* @return list of flow
*/
@ApiOperation(value = "Dumps all flows", response = FlowPayload.class, responseContainer = "List")
@RequestMapping(
value = "/flows",
method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public ResponseEntity<List<FlowPayload>> getFlows() {
List<FlowPayload> response = flowService.getFlows();
return new ResponseEntity<>(response, new HttpHeaders(), HttpStatus.OK);
@GetMapping
@ResponseStatus(HttpStatus.OK)
public CompletableFuture<List<FlowPayload>> getFlows() {
return flowService.getFlows();
}


/**
* Delete all flows.
*
* @return list of flows that have been deleted
*/
@ApiOperation(value = "Delete all flows. Requires special authorization", response = FlowPayload.class,
responseContainer = "List")
@RequestMapping(
value = "/flows",
method = RequestMethod.DELETE,
produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
@DeleteMapping
@ResponseStatus(HttpStatus.OK)
@ExtraAuthRequired
@SuppressWarnings("unchecked") // the error is unchecked
public ResponseEntity<List<FlowPayload>> deleteFlows(
public CompletableFuture<List<FlowPayload>> deleteFlows(
@RequestHeader(value = EXTRA_AUTH, defaultValue = "0") long extraAuth) {
long currentAuth = System.currentTimeMillis();
if (Math.abs(currentAuth - extraAuth) > 120 * 1000) {
/*
* The request needs to be within 120 seconds of the system clock.
*/
return new ResponseEntity("Invalid Auth: " + currentAuth, new HttpHeaders(), HttpStatus.UNAUTHORIZED);
throw new MessageException(RequestCorrelationId.getId(), System.currentTimeMillis(), ErrorType.AUTH_FAILED,
"Invalid Auth: " + currentAuth, "Valid e"
+ "xtra auth required");
}

List<FlowPayload> response = flowService.deleteFlows();
return new ResponseEntity<>(response, new HttpHeaders(), HttpStatus.OK);
return flowService.deleteFlows();
}


Expand All @@ -208,13 +191,10 @@ public ResponseEntity<List<FlowPayload>> deleteFlows(
* @return list of flow
*/
@ApiOperation(value = "Gets flow status", response = FlowIdStatusPayload.class)
@RequestMapping(
value = "/flows/status/{flow-id:.+}",
method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public ResponseEntity<FlowIdStatusPayload> statusFlow(@PathVariable(name = "flow-id") String flowId) {
FlowIdStatusPayload response = flowService.statusFlow(flowId);
return new ResponseEntity<>(response, new HttpHeaders(), HttpStatus.OK);
@GetMapping(value = "/status/{flow-id:.+}")
@ResponseStatus(HttpStatus.OK)
public CompletableFuture<FlowIdStatusPayload> statusFlow(@PathVariable(name = "flow-id") String flowId) {
return flowService.statusFlow(flowId);
}

/**
Expand All @@ -224,12 +204,10 @@ public ResponseEntity<FlowIdStatusPayload> statusFlow(@PathVariable(name = "flow
* @return list of flow
*/
@ApiOperation(value = "Gets flow path", response = FlowPathPayload.class)
@RequestMapping(
value = "/flows/{flow-id}/path", method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public ResponseEntity<FlowPathPayload> pathFlow(@PathVariable(name = "flow-id") String flowId) {
FlowPathPayload response = flowService.pathFlow(flowId);
return new ResponseEntity<>(response, new HttpHeaders(), HttpStatus.OK);
@GetMapping(value = "/{flow-id}/path")
@ResponseStatus(HttpStatus.OK)
public CompletableFuture<FlowPathPayload> pathFlow(@PathVariable(name = "flow-id") String flowId) {
return flowService.pathFlow(flowId);
}


Expand All @@ -243,11 +221,9 @@ public ResponseEntity<FlowPathPayload> pathFlow(@PathVariable(name = "flow-id")
*/
@ApiOperation(value = "Push flows without expectation of modifying switches. It can push to switch and validate.",
response = BatchResults.class)
@RequestMapping(path = "/push/flows",
method = RequestMethod.PUT,
produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
@PutMapping(path = "/push")
@ResponseStatus(HttpStatus.OK)
public BatchResults pushFlows(
public CompletableFuture<BatchResults> pushFlows(
@RequestBody List<FlowInfoData> externalFlows,
@ApiParam(value = "default: false. If true, this will propagate rules to the switches.",
required = false)
Expand All @@ -270,11 +246,9 @@ public BatchResults pushFlows(
*/
@ApiOperation(value = "Unpush flows without expectation of modifying switches. It can push to switch and validate.",
response = BatchResults.class)
@RequestMapping(path = "/unpush/flows",
method = RequestMethod.PUT,
produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
@PutMapping(path = "/unpush")
@ResponseStatus(HttpStatus.OK)
public BatchResults unpushFlows(
public CompletableFuture<BatchResults> unpushFlows(
@RequestBody List<FlowInfoData> externalFlows,
@ApiParam(value = "default: false. If true, this will propagate rules to the switches.",
required = false)
Expand All @@ -295,10 +269,9 @@ public BatchResults unpushFlows(
* @return flow payload with updated path.
*/
@ApiOperation(value = "Reroute flow", response = FlowReroutePayload.class)
@RequestMapping(path = "/flows/{flow_id}/reroute",
method = RequestMethod.PATCH)
@PatchMapping(path = "/{flow_id}/reroute")
@ResponseStatus(HttpStatus.OK)
public FlowReroutePayload rerouteFlow(@PathVariable("flow_id") String flowId) {
public CompletableFuture<FlowReroutePayload> rerouteFlow(@PathVariable("flow_id") String flowId) {
return flowService.rerouteFlow(flowId);
}

Expand All @@ -311,10 +284,9 @@ public FlowReroutePayload rerouteFlow(@PathVariable("flow_id") String flowId) {
@ApiOperation(value = "Sync flow", response = FlowReroutePayload.class)
@ApiResponses(value = {
@ApiResponse(code = 200, response = FlowReroutePayload.class, message = "Operation is successful")})
@RequestMapping(path = "/flows/{flow_id}/sync",
method = RequestMethod.PATCH)
@PatchMapping(path = "/{flow_id}/sync")
@ResponseStatus(HttpStatus.OK)
public FlowReroutePayload syncFlow(@PathVariable("flow_id") String flowId) {
public CompletableFuture<FlowReroutePayload> syncFlow(@PathVariable("flow_id") String flowId) {
return flowService.syncFlow(flowId);
}

Expand All @@ -326,38 +298,21 @@ public FlowReroutePayload syncFlow(@PathVariable("flow_id") String flowId) {
*/
@ApiOperation(value = "Validate flow, comparing the DB to each switch", response = FlowValidationDto.class,
responseContainer = "List")
@RequestMapping(path = "/flows/{flow_id}/validate",
method = RequestMethod.GET)
@GetMapping(path = "/{flow_id}/validate")
@ResponseStatus(HttpStatus.OK)
public ResponseEntity<List<FlowValidationDto>> validateFlow(@PathVariable("flow_id") String flowId) {

public CompletableFuture<List<FlowValidationDto>> validateFlow(@PathVariable("flow_id") String flowId) {
logger.debug("Received Flow Validation request with flow {}", flowId);
ResponseEntity<List<FlowValidationDto>> response;

try {
List<FlowValidationDto> result = flowService.validateFlow(flowId);
if (result == null) {
logger.info("VALIDATE FLOW: Flow Not Found: {}", flowId);
response = ResponseEntity.notFound().build();
} else {
response = ResponseEntity.ok(result);
}
} catch (InvalidPathException e) {
logger.error("VALIDATE FLOW: Flow has no path: {}", flowId);
logger.error(e.getMessage());
response = ResponseEntity.notFound().build();
}
return response;
return flowService.validateFlow(flowId);
}

/**
* Verify flow integrity by sending "ping" package over flow path.
*/
@ApiOperation(
value = "Verify flow - using special network packet that is being routed in the same way as client traffic")
@RequestMapping(path = "/flows/{flow_id}/ping", method = RequestMethod.PUT)
@PutMapping(path = "/{flow_id}/ping")
@ResponseStatus(HttpStatus.OK)
public PingOutput pingFlow(
public CompletableFuture<PingOutput> pingFlow(
@RequestBody PingInput payload,
@PathVariable("flow_id") String flowId) {
return flowService.pingFlow(flowId, payload);
Expand All @@ -369,10 +324,9 @@ public PingOutput pingFlow(
* @return a detailed response of the sync operation (added, deleted, modified, unchanged flows)
*/
@ApiOperation(value = "Sync Flow Cache(s)", response = FlowCacheSyncResults.class)
@RequestMapping(path = "/flows/cachesync",
method = RequestMethod.GET)
@GetMapping(path = "/cachesync")
@ResponseStatus(HttpStatus.OK)
public FlowCacheSyncResults syncFlowCache() {
public CompletableFuture<FlowCacheSyncResults> syncFlowCache() {
return flowService.syncFlowCache(SynchronizeCacheAction.NONE);
}

Expand All @@ -382,9 +336,9 @@ public FlowCacheSyncResults syncFlowCache() {
* @return a response of the invalidate operation
*/
@ApiOperation(value = "Invalidate (purge) Flow Cache(s)", response = FlowCacheSyncResults.class)
@DeleteMapping(path = "/flows/cache")
@DeleteMapping(path = "/cache")
@ResponseStatus(HttpStatus.OK)
public FlowCacheSyncResults invalidateFlowCache() {
public CompletableFuture<FlowCacheSyncResults> invalidateFlowCache() {
return flowService.syncFlowCache(SynchronizeCacheAction.INVALIDATE_CACHE);
}

Expand All @@ -394,9 +348,9 @@ public FlowCacheSyncResults invalidateFlowCache() {
* @return a detailed response of the refresh operation (added, deleted, modified, unchanged flows)
*/
@ApiOperation(value = "Refresh Flow Cache(s)", response = FlowCacheSyncResults.class)
@PatchMapping(path = "/flows/cache")
@PatchMapping(path = "/cache")
@ResponseStatus(HttpStatus.OK)
public FlowCacheSyncResults refreshFlowCache() {
public CompletableFuture<FlowCacheSyncResults> refreshFlowCache() {
return flowService.syncFlowCache(SynchronizeCacheAction.SYNCHRONIZE_CACHE);
}

Expand Down

0 comments on commit 137e5b3

Please sign in to comment.