Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MIST-1139] Integrate replaying upon recovery #1140

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (C) 2018 Seoul National University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.snu.mist.core.parameters;

import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;

/**
 * Tang named parameter holding the address of the replay server.
 * It can be set with the short name "replay_address".
 * The default value "noReplay" is the value that BaseQueryStarter compares against
 * to decide that the sources should be started without replaying any events.
 */
@NamedParameter(doc = "The address of the replay server.", default_value = "noReplay", short_name = "replay_address")
public class ReplayServerAddress implements Name<String> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (C) 2018 Seoul National University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.snu.mist.core.parameters;

import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;

/**
 * Tang named parameter holding the port of the replay server.
 * It can be set with the short name "replay_port" and defaults to 26523.
 */
@NamedParameter(doc = "The port of the replay server.", default_value = "26523", short_name = "replay_port")
public class ReplayServerPort implements Name<Integer> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package edu.snu.mist.core.replay;

import edu.snu.mist.core.task.ExecutionDag;
import edu.snu.mist.core.task.ExecutionVertex;
import edu.snu.mist.core.task.PhysicalSource;
import org.apache.reef.io.Tuple;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.json.simple.JSONArray;
Expand Down Expand Up @@ -289,4 +292,67 @@ public static boolean removeOnCheckpoint(final String replayServerAddress, final
private static String getReplayServerUrl(final String replayServerAddress, final int replayServerPort) {
  // Assemble "http://<address>:<port>" piece by piece.
  final StringBuilder url = new StringBuilder("http://");
  url.append(replayServerAddress);
  url.append(":");
  url.append(replayServerPort);
  return url.toString();
}

/**
 * Starts the sources of all the given execution dags.
 * Every root vertex of every dag is treated as a source and started.
 * @param executionDags the execution dags whose root vertices should be started
 */
public static void startSources(final Collection<ExecutionDag> executionDags) {
  LOG.log(Level.INFO, "Starting sources...");
  for (final ExecutionDag dag : executionDags) {
    for (final ExecutionVertex rootVertex : dag.getDag().getRootVertices()) {
      // Root vertices are cast to sources; any other vertex type fails the cast.
      ((PhysicalSource) rootVertex).start();
    }
  }
}

/**
 * Send the MqttMessages of the sources in chronological order.
 * The per-source lists are merged by repeatedly emitting the pending message with the smallest
 * timestamp through the event generator of the source it belongs to.
 * Sources that are mapped to an empty (or null) list are simply skipped.
 * The given map and its lists are only read; they are not modified by this method.
 * @param srcAndMqttMessageListMap each value(List) is always required to be ordered in chronological order.
 *                                 The lists are accessed by index, so random-access lists are preferable.
 */
public static void sendMsgs(final Map<PhysicalSource, List<Tuple<Long, MqttMessage>>> srcAndMqttMessageListMap) {
  // The index of the next message to send, for every source that still has pending messages.
  final Map<PhysicalSource, Integer> nextIndexPerSrc = new HashMap<>();
  for (final Map.Entry<PhysicalSource, List<Tuple<Long, MqttMessage>>> entry : srcAndMqttMessageListMap.entrySet()) {
    final List<Tuple<Long, MqttMessage>> messageList = entry.getValue();
    if (messageList != null && !messageList.isEmpty()) {
      nextIndexPerSrc.put(entry.getKey(), 0);
    }
  }

  // Loop on the pending sources (not on the input map), so that sources without any message
  // cannot keep the loop alive after everything has been sent.
  while (!nextIndexPerSrc.isEmpty()) {
    final PhysicalSource minTimestampSrc = selectMinTimestampSrc(srcAndMqttMessageListMap, nextIndexPerSrc);
    final List<Tuple<Long, MqttMessage>> messageList = srcAndMqttMessageListMap.get(minTimestampSrc);
    final int index = nextIndexPerSrc.get(minTimestampSrc);

    // Emit the MqttMessage corresponding to the minimum timestamp.
    minTimestampSrc.getEventGenerator().emitData(messageList.get(index).getValue());

    if (index + 1 < messageList.size()) {
      // There are still more messages to send for the source: advance its cursor.
      nextIndexPerSrc.put(minTimestampSrc, index + 1);
    } else {
      // There are no more messages to send for the source: exclude it completely.
      nextIndexPerSrc.remove(minTimestampSrc);
    }
  }
}

/**
 * Select the source whose next pending message has the minimum timestamp.
 * @param srcAndMqttMessageListMap the chronologically ordered messages of each source
 * @param nextIndexPerSrc the index of the next pending message of each source, must not be empty
 * @return the source that owns the pending message with the minimum timestamp
 */
private static PhysicalSource selectMinTimestampSrc(
    final Map<PhysicalSource, List<Tuple<Long, MqttMessage>>> srcAndMqttMessageListMap,
    final Map<PhysicalSource, Integer> nextIndexPerSrc) {
  PhysicalSource minTimestampSrc = null;
  long minimumTimestamp = Long.MAX_VALUE;
  for (final Map.Entry<PhysicalSource, Integer> cursor : nextIndexPerSrc.entrySet()) {
    final long timestamp =
        srcAndMqttMessageListMap.get(cursor.getKey()).get(cursor.getValue()).getKey();
    // The null check makes the first candidate always win, even if its timestamp is Long.MAX_VALUE.
    if (minTimestampSrc == null || timestamp < minimumTimestamp) {
      minimumTimestamp = timestamp;
      minTimestampSrc = cursor.getKey();
    }
  }
  return minTimestampSrc;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
*/
public interface DataGenerator<T> extends AutoCloseable {

/**
 * Starts receiving data.
 * PhysicalSourceImpl calls this from its constructor, whereas {@link #start()} is called separately.
 * Among the generators touched together with this method, only MQTTDataGenerator does work here
 * (it subscribes to its topic); the Kafka and Netty generators leave this method empty.
 */
void setup();

/**
* Starts generating data.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public KafkaDataGenerator(
this.pollTimeout = kafkaSharedResource.getPollTimeout();
}

@Override
public void setup() {
  // Intentionally empty: this generator has nothing to prepare before start().
  // Only MQTTDataGenerator does work in setup() (it subscribes to its topic there).
}

@Override
public void start() {
if (started.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

Expand All @@ -26,6 +28,11 @@
public final class MQTTDataGenerator implements DataGenerator<MqttMessage> {
private static final Logger LOG = Logger.getLogger(MQTTDataGenerator.class.getName());

/**
* A flag for setup to subscribe.
*/
private final AtomicBoolean setup;

/**
* A flag for start.
*/
Expand All @@ -51,12 +58,19 @@ public final class MQTTDataGenerator implements DataGenerator<MqttMessage> {
*/
private EventGenerator<MqttMessage> eventGenerator;

/**
* A queue for events that must be emitted.
*/
private Queue<MqttMessage> receivedMessages;

/**
 * Creates a data generator for the given topic.
 * All the state flags (setup, started, closed) are initially false and the message buffer is empty.
 * @param subClient the client that is used for subscribing to the topic
 * @param topic the topic to subscribe to
 */
public MQTTDataGenerator(final MQTTSubscribeClient subClient,
                         final String topic) {
  // State flags: nothing has happened yet.
  this.closed = new AtomicBoolean(false);
  this.started = new AtomicBoolean(false);
  this.setup = new AtomicBoolean(false);
  // Buffer for the messages that arrive before start().
  this.receivedMessages = new LinkedBlockingDeque<>();
  // Subscription target.
  this.topic = topic;
  this.subClient = subClient;
}

/**
Expand All @@ -65,18 +79,28 @@ public MQTTDataGenerator(final MQTTSubscribeClient subClient,
* @param message the message to emit
*/
void emitData(final MqttMessage message) {
if (!closed.get() && eventGenerator != null) {
if (setup.get() && !started.get() && !closed.get()) {
receivedMessages.add(message);
} else if (!closed.get() && eventGenerator != null) {
eventGenerator.emitData(message);
}
}

@Override
public void start() {
if (started.compareAndSet(false, true)) {
public void setup() {
if (setup.compareAndSet(false, true)) {
subClient.subscribe(topic);
}
}

@Override
public void start() {
while (!receivedMessages.isEmpty()) {
eventGenerator.emitData(receivedMessages.poll());
}
started.compareAndSet(false, true);
}

@Override
public void close() {
closed.compareAndSet(false, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public NettyTextDataGenerator(
this.serverSocketAddress = new InetSocketAddress(serverAddr, port);
}

@Override
public void setup() {
  // Intentionally empty: this generator has nothing to prepare before start().
  // Only MQTTDataGenerator does work in setup() (it subscribes to its topic there).
}

@Override
public void start() {
if (started.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (C) 2018 Seoul National University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.snu.mist.core.task;

import edu.snu.mist.core.replay.EventReplayResult;
import edu.snu.mist.core.replay.EventReplayUtils;
import org.apache.reef.io.Tuple;
import org.apache.reef.tang.exceptions.InjectionException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.io.IOException;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Base class for query starters.
 * It optionally replays the events kept by the replay server through the sources of the given
 * execution dags, and then starts the sources.
 */
public abstract class BaseQueryStarter {

  private static final Logger LOG = Logger.getLogger(BaseQueryStarter.class.getName());

  /**
   * The replay server address that means that there is no replay server.
   * This is the default value of the ReplayServerAddress parameter.
   */
  private static final String NO_REPLAY_ADDRESS = "noReplay";

  /**
   * The map of sources and queryIds. This is only used for replaying events.
   * NOTE(review): nothing in this class puts entries into this private map, so the lookups in
   * replayEventsOfDag() cannot find any query id as of now - confirm how it should be populated.
   */
  private final Map<PhysicalSource, String> srcAndQueryIdMap;

  /**
   * The address of the replay server.
   */
  private final String replayServerAddress;

  /**
   * The port number of the replay server.
   */
  private final int replayServerPort;

  protected BaseQueryStarter(final String replayServerAddress,
                             final int replayServerPort) {
    this.srcAndQueryIdMap = new HashMap<>();
    this.replayServerAddress = replayServerAddress;
    this.replayServerPort = replayServerPort;
  }

  /**
   * Replays the events of the replay server (if there is one) and then starts all the sources.
   * Replayed events are sent dag by dag; the sources are started after all the dags are handled.
   * @param queryIdAndBrokerTopicMap the tuples of broker URI and topic of each query id
   * @param minTimestamp the timestamp that is passed to the replay server for selecting events
   * @param executionDagCollection the execution dags whose sources should be replayed and started
   * @throws InjectionException declared for the replay utilities called by this method
   * @throws IOException declared for the replay utilities called by this method
   * @throws ClassNotFoundException declared for the replay utilities called by this method
   */
  protected void replayAndStart(final Map<String, Set<Tuple<String, String>>> queryIdAndBrokerTopicMap,
                                final long minTimestamp,
                                final Collection<ExecutionDag> executionDagCollection)
      throws InjectionException, IOException, ClassNotFoundException {
    // Constant-first comparison, so that a missing address is handled like "noReplay".
    if (replayServerAddress == null || NO_REPLAY_ADDRESS.equals(replayServerAddress)) {
      LOG.log(Level.WARNING, "Replay server is not up.");
    } else {
      // Shared by all the dags: each (broker URI, topic) tuple is replayed at most once.
      final Set<Tuple<String, String>> replayedBrokerTopicSet = new HashSet<>();
      for (final ExecutionDag dag : executionDagCollection) {
        EventReplayUtils.sendMsgs(
            replayEventsOfDag(dag, queryIdAndBrokerTopicMap, minTimestamp, replayedBrokerTopicSet));
      }
    }
    // Start the sources.
    EventReplayUtils.startSources(executionDagCollection);
  }

  /**
   * Fetches the events to replay for the sources (root vertices) of a single dag.
   * NOTE(review): the broker/topic tuples are looked up per query and not per source, so when a
   * source is visited with several tuples only the last successful, non-empty result is kept
   * for it - confirm that this is intended.
   * @param dag the dag whose sources are handled
   * @param queryIdAndBrokerTopicMap the tuples of broker URI and topic of each query id
   * @param minTimestamp the timestamp that is passed to the replay server for selecting events
   * @param replayedBrokerTopicSet the tuples that were already replayed; updated by this method
   * @return the non-empty lists of replayed messages, keyed by the source they belong to
   */
  private Map<PhysicalSource, List<Tuple<Long, MqttMessage>>> replayEventsOfDag(
      final ExecutionDag dag,
      final Map<String, Set<Tuple<String, String>>> queryIdAndBrokerTopicMap,
      final long minTimestamp,
      final Set<Tuple<String, String>> replayedBrokerTopicSet)
      throws InjectionException, IOException, ClassNotFoundException {
    final Map<PhysicalSource, List<Tuple<Long, MqttMessage>>> srcAndMqttMessageListMap = new HashMap<>();
    for (final ExecutionVertex source : dag.getDag().getRootVertices()) {
      final PhysicalSource physicalSource = (PhysicalSource) source;
      final String queryId = srcAndQueryIdMap.remove(physicalSource);
      // Not every Map implementation accepts a null key, so do not look up an unknown query.
      final Set<Tuple<String, String>> brokerURIAndTopicSet =
          queryId == null ? null : queryIdAndBrokerTopicMap.get(queryId);
      if (brokerURIAndTopicSet == null) {
        LOG.log(Level.WARNING, "No broker and topic information was found for query " + queryId
            + ". Its sources will be started without replaying of events.");
        continue;
      }
      for (final Tuple<String, String> brokerURIAndTopic : brokerURIAndTopicSet) {
        // add() returns false when the tuple was already replayed.
        if (!replayedBrokerTopicSet.add(brokerURIAndTopic)) {
          continue;
        }
        final EventReplayResult eventReplayResult =
            EventReplayUtils.replay(replayServerAddress, replayServerPort,
                brokerURIAndTopic.getValue(), brokerURIAndTopic.getKey(), minTimestamp);
        if (eventReplayResult.isSuccess()) {
          final List<Tuple<Long, MqttMessage>> mqttMessageList = eventReplayResult.getMqttMessages();
          // There is nothing to send for an empty result, so keep it out of the map.
          if (mqttMessageList != null && !mqttMessageList.isEmpty()) {
            srcAndMqttMessageListMap.put(physicalSource, mqttMessageList);
          }
        } else {
          LOG.log(Level.WARNING, "Replay server is not up and/or replaying events has failed.");
          LOG.log(Level.WARNING,
              "Sources for query " + queryId + " will be started without or partial replaying of events.");
        }
      }
    }
    return srcAndMqttMessageListMap;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public PhysicalSourceImpl(final String sourceId,
super(sourceId, configuration);
this.dataGenerator = dataGenerator;
this.eventGenerator = eventGenerator;
dataGenerator.setup();
}

@Override
Expand Down
12 changes: 12 additions & 0 deletions mist-core/src/main/java/edu/snu/mist/core/task/QueryManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import edu.snu.mist.formats.avro.AvroDag;
import edu.snu.mist.formats.avro.QueryCheckpoint;
import edu.snu.mist.formats.avro.QueryControlResult;
import org.apache.reef.io.Tuple;
import org.apache.reef.tang.annotations.DefaultImplementation;
import org.apache.reef.tang.exceptions.InjectionException;

import java.io.IOException;
import java.util.List;
import java.util.Set;

/**
* This interface manages queries that are submitted from clients.
Expand All @@ -50,6 +52,16 @@ public interface QueryManager extends AutoCloseable {
QueryControlResult createQueryWithCheckpoint(AvroDag avroDag,
QueryCheckpoint checkpointedState);

/**
 * Recover a checkpointed query.
 * This method does not replay missed events and does not start the sources of the query.
 * @param avroDag the avro dag of the query to recover
 * @param checkpointedState the checkpoint of the query to recover
 * @return the set of tuples (mqtt topic and the brokerURI) of the submitted query.
 *         NOTE(review): BaseQueryStarter names such tuples "brokerURIAndTopic" - confirm which
 *         element is the key and which one is the value.
 */
Set<Tuple<String, String>> setupQueryWithCheckpoint(AvroDag avroDag,
QueryCheckpoint checkpointedState);

/**
* Create a query (this is for checkpointing).
* @param queryId query id
Expand Down
Loading