Skip to content

Commit

Permalink
Force WorkerBolt to be more strict with stored data
Browse files Browse the repository at this point in the history
Lock all operations with `pendingTasks` map into the `WorkerBolt` i.e.
do not allow access there to the inheritors. If inheritors have access
to this map there is no way to implement LCM for objects stored there.

The original problem was lack of cleanup on timeout action.

Also make correct routing/dispatching of `onTimeout` event, till now
this dispach have happended inside `dispatch` method of
`org.openkilda.wfm.share.hubandspoke.CoordinatedBolt`(happens before
`handleInput`) as result all checks and cleanup steps implemented on
`handleInput` level are skipped.
  • Loading branch information
surabujin committed Jun 27, 2019
1 parent 3be462c commit 4dbaafa
Show file tree
Hide file tree
Showing 10 changed files with 291 additions and 95 deletions.
Expand Up @@ -16,6 +16,7 @@
package org.openkilda.wfm.share.hubandspoke;

import org.openkilda.wfm.AbstractBolt;
import org.openkilda.wfm.error.PipelineException;
import org.openkilda.wfm.share.hubandspoke.CoordinatorBolt.CoordinatorCommand;
import org.openkilda.wfm.topology.utils.MessageTranslator;

Expand All @@ -27,7 +28,7 @@
/**
* This class provides callbacks and timeout handlers for asynchronous operations.
*/
abstract class CoordinatedBolt extends AbstractBolt implements TimeoutCallback {
abstract class CoordinatedBolt extends AbstractBolt {
static final String COMMAND_FIELD = "command";
static final String TIMEOUT_FIELD = "timeout_ms";

Expand All @@ -42,7 +43,7 @@ abstract class CoordinatedBolt extends AbstractBolt implements TimeoutCallback {
@Override
protected void dispatch(Tuple input) throws Exception {
if (CoordinatorBolt.ID.equals(input.getSourceComponent())) {
String key = input.getStringByField(MessageTranslator.KEY_FIELD);
String key = input.getStringByField(MessageTranslator.FIELD_ID_KEY);
onTimeout(key, input);
} else {
super.dispatch(input);
Expand All @@ -58,16 +59,23 @@ protected void ack(Tuple input) {

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream(CoordinatorBolt.INCOME_STREAM, new Fields(MessageTranslator.KEY_FIELD,
declarer.declareStream(CoordinatorBolt.INCOME_STREAM, new Fields(MessageTranslator.FIELD_ID_KEY,
COMMAND_FIELD, TIMEOUT_FIELD, AbstractBolt.FIELD_ID_CONTEXT));
}

/**
* Handler for timeout for pending request and define the way how such case will be processed.
* @param key request id.
* @param tuple anchor tuple.
*/
protected abstract void onTimeout(String key, Tuple tuple) throws PipelineException;

/**
* Should be called once operation is finished and callback/timer should be cancelled.
* @param key request's identifier.
*/
protected void cancelCallback(String key, Tuple tuple) {
emitWithContext(CoordinatorBolt.INCOME_STREAM, tuple,
protected void cancelCallback(String key) {
emitWithContext(CoordinatorBolt.INCOME_STREAM, getCurrentTuple(),
new Values(key, CoordinatorCommand.CANCEL_CALLBACK, 0));
}

Expand All @@ -76,17 +84,17 @@ protected void cancelCallback(String key, Tuple tuple) {
* used.
* @param key operation identifier.
*/
protected void registerCallback(String key, Tuple tuple) {
registerCallback(key, defaultTimeout, tuple);
protected void registerCallback(String key) {
registerCallback(key, defaultTimeout);
}

/**
* Add callback with specified timeout value.
* @param key operation identifier.
* @param timeout how long coordinator waits for a response. If no response received - timeout error occurs.
*/
protected void registerCallback(String key, int timeout, Tuple tuple) {
emitWithContext(CoordinatorBolt.INCOME_STREAM, tuple,
protected void registerCallback(String key, int timeout) {
emitWithContext(CoordinatorBolt.INCOME_STREAM, getCurrentTuple(),
new Values(key, CoordinatorCommand.REQUEST_CALLBACK, timeout));
}
}
Expand Up @@ -59,7 +59,7 @@ public HubBolt(Config config) {
@Override
protected void handleInput(Tuple input) throws Exception {
if (hubConfig.getRequestSenderComponent().equals(input.getSourceComponent())) {
registerCallback(input.getStringByField(MessageTranslator.KEY_FIELD), input);
registerCallback(input.getStringByField(MessageTranslator.FIELD_ID_KEY));
onRequest(input);
} else if (hubConfig.getWorkerComponent().equals(input.getSourceComponent())) {
onWorkerResponse(input);
Expand Down

This file was deleted.

Expand Up @@ -18,6 +18,7 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

import org.openkilda.wfm.error.PipelineException;
import org.openkilda.wfm.topology.utils.MessageTranslator;

import lombok.Builder;
Expand Down Expand Up @@ -49,7 +50,7 @@ public abstract class WorkerBolt extends CoordinatedBolt {

private final Config workerConfig;

protected Map<String, Tuple> pendingTasks = new HashMap<>();
private transient Map<String, Tuple> pendingTasks;

public WorkerBolt(Config config) {
super(config.isAutoAck(), config.getDefaultTimeout());
Expand All @@ -61,48 +62,97 @@ public WorkerBolt(Config config) {
}

@Override
protected void handleInput(Tuple input) throws Exception {
String key = input.getStringByField(MessageTranslator.KEY_FIELD);
protected void dispatch(Tuple input) throws Exception {
String sourceComponent = input.getSourceComponent();
if (workerConfig.getHubComponent().equals(sourceComponent)) {
dispatchHub(input);
} else if (workerConfig.getWorkerSpoutComponent().equals(sourceComponent)) {
dispatchResponse(input);
} else {
super.dispatch(input);
}
}

private void dispatchHub(Tuple input) throws Exception {
String key = pullKey(input);
pendingTasks.put(key, input);
registerCallback(key);

onHubRequest(input);
}

if (sourceComponent.equals(workerConfig.getHubComponent())) {
pendingTasks.put(key, input);
registerCallback(key, input);

onHubRequest(input);
} else if (pendingTasks.containsKey(key)) {
if (workerConfig.getWorkerSpoutComponent().equals(sourceComponent)) {
// TODO(surabujin): it whould be great to get initial request together with response i.e.
// onAsyncResponse(input, pendingTasks.get(key)onAsyncResponse(input);
onAsyncResponse(input);
} else if (sourceComponent.equals(CoordinatorBolt.ID)) {
log.warn("Timeout occurred while waiting for a response for {}", key);
onTimeout(key, input);
}
private void dispatchResponse(Tuple input) throws Exception {
String key = pullKey();
Tuple request = pendingTasks.get(key);
if (request != null) {
onAsyncResponse(request, input);
} else {
unhandledInput(input);
log.error("Receive response for {}, but there is no pending request by this key", key);
}
}

@Override
protected void handleInput(Tuple input) throws Exception {
// all possible branching was done into `dispatch` nothing should hit here
unhandledInput(input);
}

/**
* Send response to the hub bolt. Note: the operation's key is required.
* @param input received tuple.
* @param input request or response tuple (use key to lookup original request).
* @param values response to be sent to the hub.
*/
protected void emitResponseToHub(Tuple input, Values values) {
String key = input.getStringByField(MessageTranslator.KEY_FIELD);
cancelCallback(key, input);
String key;
try {
key = pullKey(input);
} catch (PipelineException e) {
throw new IllegalStateException(String.format("Can't extract request key from %s: %s", input, e), e);
}
cancelCallback(key);

Tuple processingRequest = pendingTasks.remove(key);
if (processingRequest == null) {
throw new IllegalStateException(format("Attempt to send response for non pending task with id %s", key));
}
getOutput().emitDirect(processingRequest.getSourceTask(), workerConfig.getStreamToHub(), values);
getOutput().emitDirect(processingRequest.getSourceTask(), workerConfig.getStreamToHub(), getCurrentTuple(),
values);
}

protected abstract void onHubRequest(Tuple input) throws Exception;

protected abstract void onAsyncResponse(Tuple input) throws Exception;
protected abstract void onAsyncResponse(Tuple request, Tuple response) throws Exception;

@Override
protected final void onTimeout(String key, Tuple tuple) throws PipelineException {
Tuple request = pendingTasks.get(key);
if (request != null) {
log.warn("Timeout occurred while waiting for a response for {}", key);
onRequestTimeout(request);

// Do not remove request from pendingTask until now, because emitResponseToHub(most likely called by
// onRequestTimeout) can query pendingTasks
pendingTasks.remove(key);
} else {
log.debug("Receive timeout notification for {}, but there is no pending request by this key", key);
}
}

protected abstract void onRequestTimeout(Tuple request) throws PipelineException;

protected String pullKey() throws PipelineException {
return this.pullKey(getCurrentTuple());
}

protected String pullKey(Tuple tuple) throws PipelineException {
return pullValue(tuple, MessageTranslator.FIELD_ID_KEY, String.class);
}

@Override
protected void init() {
super.init();
pendingTasks = new HashMap<>();
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
Expand Down
Expand Up @@ -119,7 +119,7 @@ public void sendHistoryUpdate(FlowHistoryHolder historyHolder) {

@Override
public void cancelTimeoutCallback(String key) {
cancelCallback(key, getCurrentTuple());
cancelCallback(key);
}

@Override
Expand Down
Expand Up @@ -127,6 +127,6 @@ public void sendHistoryUpdate(FlowHistoryHolder historyHolder) {

@Override
public void cancelTimeoutCallback(String key) {
cancelCallback(key, getCurrentTuple());
cancelCallback(key);
}
}
Expand Up @@ -16,8 +16,8 @@
package org.openkilda.wfm.topology.flowhs.bolts;

import static org.openkilda.wfm.topology.flowhs.FlowHsTopology.Stream.SPEAKER_WORKER_REQUEST_SENDER;
import static org.openkilda.wfm.topology.utils.KafkaRecordTranslator.FIELD_ID_KEY;
import static org.openkilda.wfm.topology.utils.KafkaRecordTranslator.FIELD_ID_PAYLOAD;
import static org.openkilda.wfm.topology.utils.MessageTranslator.KEY_FIELD;

import org.openkilda.floodlight.flow.request.FlowRequest;
import org.openkilda.floodlight.flow.response.FlowResponse;
Expand All @@ -36,8 +36,6 @@ public class SpeakerWorkerBolt extends WorkerBolt implements SpeakerCommandCarri
public static final String ID = "speaker.worker.bolt";
private transient SpeakerWorkerService service;

private Tuple currentTuple;

public SpeakerWorkerBolt(Config config) {
super(config);
}
Expand All @@ -49,29 +47,23 @@ protected void init() {

@Override
protected void onHubRequest(Tuple input) throws PipelineException {
this.currentTuple = input;

String key = input.getStringByField(KEY_FIELD);
String key = pullKey();
FlowRequest command = (FlowRequest) input.getValueByField(FIELD_ID_PAYLOAD);

service.sendCommand(key, command);
}

@Override
protected void onAsyncResponse(Tuple input) throws PipelineException {
this.currentTuple = input;

String key = input.getStringByField(KEY_FIELD);
FlowResponse message = (FlowResponse) input.getValueByField(FIELD_ID_PAYLOAD);
protected void onAsyncResponse(Tuple request, Tuple response) throws PipelineException {
String key = request.getStringByField(FIELD_ID_KEY);
FlowResponse message = (FlowResponse) request.getValueByField(FIELD_ID_PAYLOAD);

service.handleResponse(key, message);
}

@Override
public void onTimeout(String key, Tuple tuple) throws PipelineException {
this.currentTuple = tuple;

service.handleTimeout(key);
protected void onRequestTimeout(Tuple tuple) throws PipelineException {
service.handleTimeout(pullKey(tuple));
}

@Override
Expand All @@ -83,12 +75,12 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {

@Override
public void sendCommand(String key, FlowRequest command) {
emitWithContext(SPEAKER_WORKER_REQUEST_SENDER.name(), currentTuple, new Values(key, command));
emitWithContext(SPEAKER_WORKER_REQUEST_SENDER.name(), getCurrentTuple(), new Values(key, command));
}

@Override
public void sendResponse(String key, FlowResponse response) {
Values values = new Values(key, response, getCommandContext());
emitResponseToHub(currentTuple, values);
emitResponseToHub(getCurrentTuple(), values);
}
}
Expand Up @@ -59,14 +59,12 @@ protected void onHubRequest(Tuple input) throws PipelineException {
}

@Override
protected void onAsyncResponse(Tuple input) throws PipelineException {
handleCommand(input, SpeakerRouter.FIELD_ID_INPUT);
protected void onAsyncResponse(Tuple request, Tuple response) throws PipelineException {
handleCommand(request, SpeakerRouter.FIELD_ID_INPUT);
}

@Override
public void onTimeout(String key, Tuple tuple) {
Tuple request = pendingTasks.get(key);

protected void onRequestTimeout(Tuple request) {
try {
handleTimeout(request, BfdPortHandler.FIELD_ID_COMMAND);
} catch (PipelineException e) {
Expand Down Expand Up @@ -109,8 +107,8 @@ private void handleCommand(Tuple input, String field) throws PipelineException {
command.apply(this);
}

private void handleTimeout(Tuple input, String field) throws PipelineException {
SpeakerWorkerCommand command = pullValue(input, field, SpeakerWorkerCommand.class);
private void handleTimeout(Tuple request, String field) throws PipelineException {
SpeakerWorkerCommand command = pullValue(request, field, SpeakerWorkerCommand.class);
command.timeout(this);
}

Expand Down
Expand Up @@ -29,8 +29,6 @@
import org.openkilda.messaging.info.rule.SwitchFlowEntries;
import org.openkilda.persistence.PersistenceManager;
import org.openkilda.wfm.error.PipelineException;
import org.openkilda.wfm.share.hubandspoke.CoordinatorBolt;
import org.openkilda.wfm.share.hubandspoke.CoordinatorBolt.CoordinatorCommand;
import org.openkilda.wfm.share.hubandspoke.HubBolt;
import org.openkilda.wfm.topology.switchmanager.StreamType;
import org.openkilda.wfm.topology.switchmanager.SwitchManagerCarrier;
Expand Down Expand Up @@ -150,10 +148,6 @@ public void endProcessing(String key) {
removeKeyFromRouterBolt(key);
}

private void cancelCallback(String key) {
getOutput().emit(CoordinatorBolt.INCOME_STREAM, new Values(key, CoordinatorCommand.CANCEL_CALLBACK, 0, null));
}

private void removeKeyFromRouterBolt(String key) {
CommandMessage commandMessage =
new CommandMessage(new RemoveKeyRouterBolt(key), System.currentTimeMillis(), key);
Expand Down

0 comments on commit 4dbaafa

Please sign in to comment.