Skip to content

Commit

Permalink
Force WorkerBolt to be more strict with stored data
Browse files Browse the repository at this point in the history
Lock all operations with `pendingTasks` map into the `WorkerBolt` i.e.
do not allow access there to the inheritors. If inheritors have access
to this map there is no way to implement LCM for objects stored there.

The original problem was lack of cleanup on timeout action.

Also make correct routing/dispatching of `onTimeout` event, till now
this dispach have happended inside `dispatch` method of
`org.openkilda.wfm.share.hubandspoke.CoordinatedBolt`(happens before
`handleInput`) as result all checks and cleanup steps implemented on
`handleInput` level are skipped.
  • Loading branch information
surabujin committed Sep 11, 2019
1 parent 252dfbf commit 7941204
Show file tree
Hide file tree
Showing 13 changed files with 302 additions and 103 deletions.
Expand Up @@ -16,6 +16,7 @@
package org.openkilda.wfm.share.hubandspoke;

import org.openkilda.wfm.AbstractBolt;
import org.openkilda.wfm.error.PipelineException;
import org.openkilda.wfm.share.hubandspoke.CoordinatorBolt.CoordinatorCommand;
import org.openkilda.wfm.topology.utils.MessageKafkaTranslator;

Expand All @@ -27,7 +28,7 @@
/**
* This class provides callbacks and timeout handlers for asynchronous operations.
*/
abstract class CoordinatedBolt extends AbstractBolt implements TimeoutCallback {
abstract class CoordinatedBolt extends AbstractBolt {
static final String COMMAND_FIELD = "command";
static final String TIMEOUT_FIELD = "timeout_ms";

Expand Down Expand Up @@ -63,12 +64,19 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
FIELD_ID_CONTEXT));
}

/**
* Handler for timeout for pending request and define the way how such case will be processed.
* @param key request id.
* @param tuple anchor tuple.
*/
protected abstract void onTimeout(String key, Tuple tuple) throws PipelineException;

/**
* Should be called once operation is finished and callback/timer should be cancelled.
* @param key request's identifier.
*/
protected void cancelCallback(String key, Tuple tuple) {
emitWithContext(CoordinatorBolt.INCOME_STREAM, tuple,
protected void cancelCallback(String key) {
emitWithContext(CoordinatorBolt.INCOME_STREAM, getCurrentTuple(),
new Values(key, CoordinatorCommand.CANCEL_CALLBACK, 0));
}

Expand All @@ -77,17 +85,17 @@ protected void cancelCallback(String key, Tuple tuple) {
* used.
* @param key operation identifier.
*/
protected void registerCallback(String key, Tuple tuple) {
registerCallback(key, defaultTimeout, tuple);
protected void registerCallback(String key) {
registerCallback(key, defaultTimeout);
}

/**
* Add callback with specified timeout value.
* @param key operation identifier.
* @param timeout how long coordinator waits for a response. If no response received - timeout error occurs.
*/
protected void registerCallback(String key, int timeout, Tuple tuple) {
emitWithContext(CoordinatorBolt.INCOME_STREAM, tuple,
protected void registerCallback(String key, int timeout) {
emitWithContext(CoordinatorBolt.INCOME_STREAM, getCurrentTuple(),
new Values(key, CoordinatorCommand.REQUEST_CALLBACK, timeout));
}
}
Expand Up @@ -60,7 +60,7 @@ public HubBolt(Config config) {
@Override
protected void handleInput(Tuple input) throws Exception {
if (hubConfig.getRequestSenderComponent().equals(input.getSourceComponent())) {
registerCallback(input.getStringByField(MessageKafkaTranslator.KEY_FIELD), input);
registerCallback(pullValue(input, MessageKafkaTranslator.FIELD_ID_KEY, String.class));
onRequest(input);
} else if (hubConfig.getWorkerComponent().equals(input.getSourceComponent())) {
onWorkerResponse(input);
Expand Down

This file was deleted.

Expand Up @@ -18,6 +18,7 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

import org.openkilda.wfm.error.PipelineException;
import org.openkilda.wfm.topology.utils.MessageKafkaTranslator;

import lombok.Builder;
Expand Down Expand Up @@ -49,7 +50,7 @@ public abstract class WorkerBolt extends CoordinatedBolt {

private final Config workerConfig;

protected Map<String, Tuple> pendingTasks = new HashMap<>();
private transient Map<String, Tuple> pendingTasks;

public WorkerBolt(Config config) {
super(config.isAutoAck(), config.getDefaultTimeout());
Expand All @@ -61,48 +62,97 @@ public WorkerBolt(Config config) {
}

@Override
protected void handleInput(Tuple input) throws Exception {
String key = input.getStringByField(MessageKafkaTranslator.KEY_FIELD);
protected void dispatch(Tuple input) throws Exception {
String sourceComponent = input.getSourceComponent();
if (workerConfig.getHubComponent().equals(sourceComponent)) {
dispatchHub(input);
} else if (workerConfig.getWorkerSpoutComponent().equals(sourceComponent)) {
dispatchResponse(input);
} else {
super.dispatch(input);
}
}

private void dispatchHub(Tuple input) throws Exception {
String key = pullKey(input);
pendingTasks.put(key, input);
registerCallback(key);

onHubRequest(input);
}

if (sourceComponent.equals(workerConfig.getHubComponent())) {
pendingTasks.put(key, input);
registerCallback(key, input);

onHubRequest(input);
} else if (pendingTasks.containsKey(key)) {
if (workerConfig.getWorkerSpoutComponent().equals(sourceComponent)) {
// TODO(surabujin): it whould be great to get initial request together with response i.e.
// onAsyncResponse(input, pendingTasks.get(key)onAsyncResponse(input);
onAsyncResponse(input);
} else if (sourceComponent.equals(CoordinatorBolt.ID)) {
log.warn("Timeout occurred while waiting for a response for {}", key);
onTimeout(key, input);
}
private void dispatchResponse(Tuple input) throws Exception {
String key = pullKey();
Tuple request = pendingTasks.get(key);
if (request != null) {
onAsyncResponse(request, input);
} else {
unhandledInput(input);
log.error("Receive response for {}, but there is no pending request by this key", key);
}
}

@Override
protected void handleInput(Tuple input) throws Exception {
// all possible branching was done into `dispatch` nothing should hit here
unhandledInput(input);
}

/**
* Send response to the hub bolt. Note: the operation's key is required.
* @param input received tuple.
* @param input request or response tuple (use key to lookup original request).
* @param values response to be sent to the hub.
*/
protected void emitResponseToHub(Tuple input, Values values) {
String key = input.getStringByField(MessageKafkaTranslator.KEY_FIELD);
cancelCallback(key, input);
String key;
try {
key = pullKey(input);
} catch (PipelineException e) {
throw new IllegalStateException(String.format("Can't extract request key from %s: %s", input, e), e);
}
cancelCallback(key);

Tuple processingRequest = pendingTasks.remove(key);
if (processingRequest == null) {
throw new IllegalStateException(format("Attempt to send response for non pending task with id %s", key));
}
getOutput().emitDirect(processingRequest.getSourceTask(), workerConfig.getStreamToHub(), values);
getOutput().emitDirect(processingRequest.getSourceTask(), workerConfig.getStreamToHub(), getCurrentTuple(),
values);
}

protected abstract void onHubRequest(Tuple input) throws Exception;

protected abstract void onAsyncResponse(Tuple input) throws Exception;
protected abstract void onAsyncResponse(Tuple request, Tuple response) throws Exception;

@Override
protected final void onTimeout(String key, Tuple tuple) throws PipelineException {
Tuple request = pendingTasks.get(key);
if (request != null) {
log.warn("Timeout occurred while waiting for a response for {}", key);
onRequestTimeout(request);

// Do not remove request from pendingTask until now, because emitResponseToHub(most likely called by
// onRequestTimeout) can query pendingTasks
pendingTasks.remove(key);
} else {
log.debug("Receive timeout notification for {}, but there is no pending request by this key", key);
}
}

protected abstract void onRequestTimeout(Tuple request) throws PipelineException;

protected String pullKey() throws PipelineException {
return this.pullKey(getCurrentTuple());
}

protected String pullKey(Tuple tuple) throws PipelineException {
return pullValue(tuple, MessageKafkaTranslator.FIELD_ID_KEY, String.class);
}

@Override
protected void init() {
super.init();
pendingTasks = new HashMap<>();
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
Expand Down
Expand Up @@ -118,7 +118,7 @@ public void sendHistoryUpdate(FlowHistoryHolder historyHolder) {

@Override
public void cancelTimeoutCallback(String key) {
cancelCallback(key, getCurrentTuple());
cancelCallback(key);
}

@Override
Expand Down
Expand Up @@ -127,6 +127,6 @@ public void sendHistoryUpdate(FlowHistoryHolder historyHolder) {

@Override
public void cancelTimeoutCallback(String key) {
cancelCallback(key, getCurrentTuple());
cancelCallback(key);
}
}
Expand Up @@ -36,42 +36,31 @@ public class SpeakerWorkerBolt extends WorkerBolt implements SpeakerCommandCarri
public static final String ID = "speaker.worker.bolt";
private transient SpeakerWorkerService service;

private Tuple currentTuple;

public SpeakerWorkerBolt(Config config) {
super(config);
}

@Override
protected void init() {
super.init();
service = new SpeakerWorkerService(this);
}

@Override
protected void onHubRequest(Tuple input) throws PipelineException {
this.currentTuple = input;

String key = input.getStringByField(FIELD_ID_KEY);
SpeakerFlowRequest command = (SpeakerFlowRequest) input.getValueByField(FIELD_ID_PAYLOAD);

service.sendCommand(key, command);
SpeakerFlowRequest command = pullValue(input, FIELD_ID_PAYLOAD, SpeakerFlowRequest.class);
service.sendCommand(pullKey(), command);
}

@Override
protected void onAsyncResponse(Tuple input) throws PipelineException {
this.currentTuple = input;

String key = input.getStringByField(FIELD_ID_KEY);
FlowResponse message = (FlowResponse) input.getValueByField(FIELD_ID_PAYLOAD);

service.handleResponse(key, message);
protected void onAsyncResponse(Tuple request, Tuple response) throws PipelineException {
FlowResponse message = pullValue(response, FIELD_ID_PAYLOAD, FlowResponse.class);
service.handleResponse(pullKey(response), message);
}

@Override
public void onTimeout(String key, Tuple tuple) throws PipelineException {
this.currentTuple = tuple;

service.handleTimeout(key);
protected void onRequestTimeout(Tuple tuple) throws PipelineException {
service.handleTimeout(pullKey(tuple));
}

@Override
Expand All @@ -91,12 +80,12 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {

@Override
public void sendCommand(String key, SpeakerFlowRequest command) {
emitWithContext(SPEAKER_WORKER_REQUEST_SENDER.name(), currentTuple, new Values(key, command));
emitWithContext(SPEAKER_WORKER_REQUEST_SENDER.name(), getCurrentTuple(), new Values(key, command));
}

@Override
public void sendResponse(String key, FlowResponse response) {
Values values = new Values(key, response, getCommandContext());
emitResponseToHub(currentTuple, values);
emitResponseToHub(getCurrentTuple(), values);
}
}
Expand Up @@ -59,14 +59,12 @@ protected void onHubRequest(Tuple input) throws PipelineException {
}

@Override
protected void onAsyncResponse(Tuple input) throws PipelineException {
handleCommand(input, SpeakerRouter.FIELD_ID_INPUT);
protected void onAsyncResponse(Tuple request, Tuple response) throws PipelineException {
handleCommand(request, SpeakerRouter.FIELD_ID_INPUT);
}

@Override
public void onTimeout(String key, Tuple tuple) {
Tuple request = pendingTasks.get(key);

protected void onRequestTimeout(Tuple request) {
try {
handleTimeout(request, BfdPortHandler.FIELD_ID_COMMAND);
} catch (PipelineException e) {
Expand Down Expand Up @@ -109,8 +107,8 @@ private void handleCommand(Tuple input, String field) throws PipelineException {
command.apply(this);
}

private void handleTimeout(Tuple input, String field) throws PipelineException {
SpeakerWorkerCommand command = pullValue(input, field, SpeakerWorkerCommand.class);
private void handleTimeout(Tuple request, String field) throws PipelineException {
SpeakerWorkerCommand command = pullValue(request, field, SpeakerWorkerCommand.class);
command.timeout(this);
}

Expand Down
Expand Up @@ -189,7 +189,8 @@ private Values makeBfdPortTuple(BfdPortCommand command) {
}

private Values makeSwitchManagerWorkerTuple(String key, SwitchId switchId) {
return new Values(key, new SwitchManagerSynchronizeSwitchCommand(key, switchId), getCommandContext());
return new Values(
key, new SwitchManagerSynchronizeSwitchCommand(key, switchId), getCommandContext().fork("sync"));
}

// SwitchCommand processing
Expand Down
Expand Up @@ -61,14 +61,12 @@ protected void onHubRequest(Tuple input) throws PipelineException {
}

@Override
protected void onAsyncResponse(Tuple input) throws PipelineException {
handleCommand(input, SwitchManagerRouter.FIELD_ID_COMMAND);
protected void onAsyncResponse(Tuple request, Tuple response) throws Exception {
handleCommand(response, SwitchManagerRouter.FIELD_ID_COMMAND);
}

@Override
public void onTimeout(String key, Tuple tuple) {
Tuple request = pendingTasks.get(key);

protected void onRequestTimeout(Tuple request) throws PipelineException {
try {
handleTimeout(request, SwitchHandler.FIELD_ID_COMMAND);
} catch (PipelineException e) {
Expand Down

0 comments on commit 7941204

Please sign in to comment.