Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force WorkerBolt to be more strict with stored data #2546

Merged
merged 1 commit into from Sep 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -16,6 +16,7 @@
package org.openkilda.wfm.share.hubandspoke;

import org.openkilda.wfm.AbstractBolt;
import org.openkilda.wfm.error.PipelineException;
import org.openkilda.wfm.share.hubandspoke.CoordinatorBolt.CoordinatorCommand;
import org.openkilda.wfm.topology.utils.MessageKafkaTranslator;

Expand All @@ -27,7 +28,7 @@
/**
* This class provides callbacks and timeout handlers for asynchronous operations.
*/
abstract class CoordinatedBolt extends AbstractBolt implements TimeoutCallback {
abstract class CoordinatedBolt extends AbstractBolt {
static final String COMMAND_FIELD = "command";
static final String TIMEOUT_FIELD = "timeout_ms";

Expand Down Expand Up @@ -63,12 +64,19 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
FIELD_ID_CONTEXT));
}

/**
* Handler for timeout for pending request and define the way how such case will be processed.
* @param key request id.
* @param tuple anchor tuple.
*/
protected abstract void onTimeout(String key, Tuple tuple) throws PipelineException;

/**
* Should be called once operation is finished and callback/timer should be cancelled.
* @param key request's identifier.
*/
protected void cancelCallback(String key, Tuple tuple) {
emitWithContext(CoordinatorBolt.INCOME_STREAM, tuple,
protected void cancelCallback(String key) {
emitWithContext(CoordinatorBolt.INCOME_STREAM, getCurrentTuple(),
new Values(key, CoordinatorCommand.CANCEL_CALLBACK, 0));
}

Expand All @@ -77,17 +85,17 @@ protected void cancelCallback(String key, Tuple tuple) {
* used.
* @param key operation identifier.
*/
protected void registerCallback(String key, Tuple tuple) {
registerCallback(key, defaultTimeout, tuple);
protected void registerCallback(String key) {
registerCallback(key, defaultTimeout);
}

/**
* Add callback with specified timeout value.
* @param key operation identifier.
* @param timeout how long coordinator waits for a response. If no response received - timeout error occurs.
*/
protected void registerCallback(String key, int timeout, Tuple tuple) {
emitWithContext(CoordinatorBolt.INCOME_STREAM, tuple,
protected void registerCallback(String key, int timeout) {
emitWithContext(CoordinatorBolt.INCOME_STREAM, getCurrentTuple(),
new Values(key, CoordinatorCommand.REQUEST_CALLBACK, timeout));
}
}
Expand Up @@ -60,7 +60,7 @@ public HubBolt(Config config) {
@Override
protected void handleInput(Tuple input) throws Exception {
if (hubConfig.getRequestSenderComponent().equals(input.getSourceComponent())) {
registerCallback(input.getStringByField(MessageKafkaTranslator.KEY_FIELD), input);
registerCallback(pullValue(input, MessageKafkaTranslator.FIELD_ID_KEY, String.class));
onRequest(input);
} else if (hubConfig.getWorkerComponent().equals(input.getSourceComponent())) {
onWorkerResponse(input);
Expand Down

This file was deleted.

Expand Up @@ -18,6 +18,7 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

import org.openkilda.wfm.error.PipelineException;
import org.openkilda.wfm.topology.utils.MessageKafkaTranslator;

import lombok.Builder;
Expand Down Expand Up @@ -49,7 +50,7 @@ public abstract class WorkerBolt extends CoordinatedBolt {

private final Config workerConfig;

protected Map<String, Tuple> pendingTasks = new HashMap<>();
private transient Map<String, Tuple> pendingTasks;

public WorkerBolt(Config config) {
super(config.isAutoAck(), config.getDefaultTimeout());
Expand All @@ -61,48 +62,97 @@ public WorkerBolt(Config config) {
}

@Override
protected void handleInput(Tuple input) throws Exception {
String key = input.getStringByField(MessageKafkaTranslator.KEY_FIELD);
protected void dispatch(Tuple input) throws Exception {
String sourceComponent = input.getSourceComponent();
if (workerConfig.getHubComponent().equals(sourceComponent)) {
dispatchHub(input);
} else if (workerConfig.getWorkerSpoutComponent().equals(sourceComponent)) {
dispatchResponse(input);
} else {
super.dispatch(input);
}
}

private void dispatchHub(Tuple input) throws Exception {
String key = pullKey(input);
pendingTasks.put(key, input);
registerCallback(key);

onHubRequest(input);
}

if (sourceComponent.equals(workerConfig.getHubComponent())) {
pendingTasks.put(key, input);
registerCallback(key, input);

onHubRequest(input);
} else if (pendingTasks.containsKey(key)) {
if (workerConfig.getWorkerSpoutComponent().equals(sourceComponent)) {
// TODO(surabujin): it whould be great to get initial request together with response i.e.
// onAsyncResponse(input, pendingTasks.get(key)onAsyncResponse(input);
onAsyncResponse(input);
} else if (sourceComponent.equals(CoordinatorBolt.ID)) {
log.warn("Timeout occurred while waiting for a response for {}", key);
onTimeout(key, input);
}
private void dispatchResponse(Tuple input) throws Exception {
String key = pullKey();
Tuple request = pendingTasks.get(key);
if (request != null) {
onAsyncResponse(request, input);
} else {
unhandledInput(input);
log.error("Receive response for {}, but there is no pending request by this key", key);
}
}

@Override
protected void handleInput(Tuple input) throws Exception {
// all possible branching was done into `dispatch` nothing should hit here
unhandledInput(input);
}

/**
* Send response to the hub bolt. Note: the operation's key is required.
* @param input received tuple.
* @param input request or response tuple (use key to lookup original request).
* @param values response to be sent to the hub.
*/
protected void emitResponseToHub(Tuple input, Values values) {
String key = input.getStringByField(MessageKafkaTranslator.KEY_FIELD);
cancelCallback(key, input);
String key;
try {
key = pullKey(input);
} catch (PipelineException e) {
throw new IllegalStateException(String.format("Can't extract request key from %s: %s", input, e), e);
}
cancelCallback(key);

Tuple processingRequest = pendingTasks.remove(key);
if (processingRequest == null) {
throw new IllegalStateException(format("Attempt to send response for non pending task with id %s", key));
}
getOutput().emitDirect(processingRequest.getSourceTask(), workerConfig.getStreamToHub(), values);
getOutput().emitDirect(processingRequest.getSourceTask(), workerConfig.getStreamToHub(), getCurrentTuple(),
values);
}

protected abstract void onHubRequest(Tuple input) throws Exception;

protected abstract void onAsyncResponse(Tuple input) throws Exception;
protected abstract void onAsyncResponse(Tuple request, Tuple response) throws Exception;

@Override
protected final void onTimeout(String key, Tuple tuple) throws PipelineException {
Tuple request = pendingTasks.get(key);
if (request != null) {
log.warn("Timeout occurred while waiting for a response for {}", key);
onRequestTimeout(request);

// Do not remove request from pendingTask until now, because emitResponseToHub(most likely called by
// onRequestTimeout) can query pendingTasks
pendingTasks.remove(key);
} else {
log.debug("Receive timeout notification for {}, but there is no pending request by this key", key);
}
}

protected abstract void onRequestTimeout(Tuple request) throws PipelineException;

protected String pullKey() throws PipelineException {
return this.pullKey(getCurrentTuple());
}

protected String pullKey(Tuple tuple) throws PipelineException {
return pullValue(tuple, MessageKafkaTranslator.FIELD_ID_KEY, String.class);
}

@Override
protected void init() {
super.init();
pendingTasks = new HashMap<>();
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
Expand Down
Expand Up @@ -118,7 +118,7 @@ public void sendHistoryUpdate(FlowHistoryHolder historyHolder) {

@Override
public void cancelTimeoutCallback(String key) {
cancelCallback(key, getCurrentTuple());
cancelCallback(key);
}

@Override
Expand Down
Expand Up @@ -127,6 +127,6 @@ public void sendHistoryUpdate(FlowHistoryHolder historyHolder) {

@Override
public void cancelTimeoutCallback(String key) {
cancelCallback(key, getCurrentTuple());
cancelCallback(key);
}
}
Expand Up @@ -36,42 +36,31 @@ public class SpeakerWorkerBolt extends WorkerBolt implements SpeakerCommandCarri
public static final String ID = "speaker.worker.bolt";
private transient SpeakerWorkerService service;

private Tuple currentTuple;

public SpeakerWorkerBolt(Config config) {
super(config);
}

@Override
protected void init() {
super.init();
service = new SpeakerWorkerService(this);
}

@Override
protected void onHubRequest(Tuple input) throws PipelineException {
this.currentTuple = input;

String key = input.getStringByField(FIELD_ID_KEY);
SpeakerFlowRequest command = (SpeakerFlowRequest) input.getValueByField(FIELD_ID_PAYLOAD);

service.sendCommand(key, command);
SpeakerFlowRequest command = pullValue(input, FIELD_ID_PAYLOAD, SpeakerFlowRequest.class);
service.sendCommand(pullKey(), command);
}

@Override
protected void onAsyncResponse(Tuple input) throws PipelineException {
this.currentTuple = input;

String key = input.getStringByField(FIELD_ID_KEY);
FlowResponse message = (FlowResponse) input.getValueByField(FIELD_ID_PAYLOAD);

service.handleResponse(key, message);
protected void onAsyncResponse(Tuple request, Tuple response) throws PipelineException {
FlowResponse message = pullValue(response, FIELD_ID_PAYLOAD, FlowResponse.class);
service.handleResponse(pullKey(response), message);
}

@Override
public void onTimeout(String key, Tuple tuple) throws PipelineException {
this.currentTuple = tuple;

service.handleTimeout(key);
protected void onRequestTimeout(Tuple tuple) throws PipelineException {
service.handleTimeout(pullKey(tuple));
}

@Override
Expand All @@ -91,12 +80,12 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {

@Override
public void sendCommand(String key, SpeakerFlowRequest command) {
emitWithContext(SPEAKER_WORKER_REQUEST_SENDER.name(), currentTuple, new Values(key, command));
emitWithContext(SPEAKER_WORKER_REQUEST_SENDER.name(), getCurrentTuple(), new Values(key, command));
}

@Override
public void sendResponse(String key, FlowResponse response) {
Values values = new Values(key, response, getCommandContext());
emitResponseToHub(currentTuple, values);
emitResponseToHub(getCurrentTuple(), values);
}
}
Expand Up @@ -59,14 +59,12 @@ protected void onHubRequest(Tuple input) throws PipelineException {
}

@Override
protected void onAsyncResponse(Tuple input) throws PipelineException {
handleCommand(input, SpeakerRouter.FIELD_ID_INPUT);
protected void onAsyncResponse(Tuple request, Tuple response) throws PipelineException {
handleCommand(request, SpeakerRouter.FIELD_ID_INPUT);
}

@Override
public void onTimeout(String key, Tuple tuple) {
Tuple request = pendingTasks.get(key);

protected void onRequestTimeout(Tuple request) {
try {
handleTimeout(request, BfdPortHandler.FIELD_ID_COMMAND);
} catch (PipelineException e) {
Expand Down Expand Up @@ -109,8 +107,8 @@ private void handleCommand(Tuple input, String field) throws PipelineException {
command.apply(this);
}

private void handleTimeout(Tuple input, String field) throws PipelineException {
SpeakerWorkerCommand command = pullValue(input, field, SpeakerWorkerCommand.class);
private void handleTimeout(Tuple request, String field) throws PipelineException {
SpeakerWorkerCommand command = pullValue(request, field, SpeakerWorkerCommand.class);
command.timeout(this);
}

Expand Down
Expand Up @@ -189,7 +189,8 @@ private Values makeBfdPortTuple(BfdPortCommand command) {
}

private Values makeSwitchManagerWorkerTuple(String key, SwitchId switchId) {
return new Values(key, new SwitchManagerSynchronizeSwitchCommand(key, switchId), getCommandContext());
return new Values(
key, new SwitchManagerSynchronizeSwitchCommand(key, switchId), getCommandContext().fork("sync"));
}

// SwitchCommand processing
Expand Down
Expand Up @@ -61,14 +61,12 @@ protected void onHubRequest(Tuple input) throws PipelineException {
}

@Override
protected void onAsyncResponse(Tuple input) throws PipelineException {
handleCommand(input, SwitchManagerRouter.FIELD_ID_COMMAND);
protected void onAsyncResponse(Tuple request, Tuple response) throws Exception {
handleCommand(response, SwitchManagerRouter.FIELD_ID_COMMAND);
}

@Override
public void onTimeout(String key, Tuple tuple) {
Tuple request = pendingTasks.get(key);

protected void onRequestTimeout(Tuple request) throws PipelineException {
try {
handleTimeout(request, SwitchHandler.FIELD_ID_COMMAND);
} catch (PipelineException e) {
Expand Down