Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Propagate streaming session errors while moving or decommissioning

  • Loading branch information...
commit a1499e642b05ce5c7b0956d7380f13390b807740 1 parent 998dbce
@tivv authored
View
79 src/java/org/apache/cassandra/service/StorageService.java
@@ -69,7 +69,7 @@
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
-/**
+/*
* This abstraction contains the token/identifier of this node
* on the identifier space. This token gets gossiped around.
* This class will also maintain histograms of the load information
@@ -2174,7 +2174,8 @@ private void unbootstrap(final Runnable onFinish)
setMode(Mode.LEAVING, "streaming data to other nodes", true);
- CountDownLatch latch = streamRanges(rangesToStream);
+ ErrorMemoryCallback errorCallback = new ErrorMemoryCallback();
+ CountDownLatch latch = streamRanges(rangesToStream, errorCallback);
// wait for the transfer runnables to signal the latch.
logger_.debug("waiting for stream aks.");
@@ -2186,6 +2187,10 @@ private void unbootstrap(final Runnable onFinish)
{
throw new RuntimeException(e);
}
+ if (errorCallback.isError()) {
+ setMode(Mode.NORMAL, "Decommission was unsuccessfull. Returning to normal state.", true);
+ throw new RuntimeException("There was an error while streaming to other nodes.");
+ }
logger_.debug("stream acks all received.");
leaveRing();
onFinish.run();
@@ -2311,7 +2316,8 @@ private void move(Token newToken) throws IOException
if (logger_.isDebugEnabled())
logger_.debug("[Move->STREAMING] Work Map: " + rangesToStreamByTable);
- CountDownLatch streamLatch = streamRanges(rangesToStreamByTable);
+ ErrorMemoryCallback errorCallback = new ErrorMemoryCallback();
+ CountDownLatch streamLatch = streamRanges(rangesToStreamByTable, errorCallback);
if (logger_.isDebugEnabled())
logger_.debug("[Move->FETCHING] Work Map: " + rangesToFetch);
@@ -2327,6 +2333,10 @@ private void move(Token newToken) throws IOException
{
throw new RuntimeException("Interrupted latch while waiting for stream/fetch ranges to finish: " + e.getMessage());
}
+ if (errorCallback.isError()) {
+ setToken(getLocalToken());
+ throw new IOException("There was an error while streaming to other nodes");
+ }
}
setToken(newToken); // setting new token as we have everything settled
@@ -2664,10 +2674,12 @@ public void flushLargestMemtables()
/**
* Seed data to the endpoints that will be responsible for it at the future
*
+ *
* @param rangesToStreamByTable tables and data ranges with endpoints included for each
+ * @param errorCallback
* @return latch to count down
*/
- private CountDownLatch streamRanges(final Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByTable)
+ private CountDownLatch streamRanges(final Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByTable, Runnable errorCallback)
{
final CountDownLatch latch = new CountDownLatch(rangesToStreamByTable.keySet().size());
for (Map.Entry<String, Multimap<Range<Token>, InetAddress>> entry : rangesToStreamByTable.entrySet())
@@ -2689,26 +2701,15 @@ private CountDownLatch streamRanges(final Map<String, Multimap<Range<Token>, Ine
final Range<Token> range = endPointEntry.getKey();
final InetAddress newEndpoint = endPointEntry.getValue();
- final Runnable callback = new Runnable()
- {
- public void run()
- {
- synchronized (pending)
- {
- pending.remove(endPointEntry);
-
- if (pending.isEmpty())
- latch.countDown();
- }
- }
- };
+ final Runnable callback = new RangeDoneCallback(pending, entry, latch);
+ final Runnable rangeErrorCallback = new RangeDoneCallback(pending, entry, latch, errorCallback);
StageManager.getStage(Stage.STREAM).execute(new Runnable()
{
public void run()
{
// TODO each call to transferRanges re-flushes, this is potentially a lot of waste
- StreamOut.transferRanges(newEndpoint, Table.open(table), Arrays.asList(range), callback, OperationType.UNBOOTSTRAP);
+ StreamOut.transferRanges(newEndpoint, Table.open(table), Arrays.asList(range), callback, rangeErrorCallback, OperationType.UNBOOTSTRAP);
}
});
}
@@ -2880,4 +2881,46 @@ public void loadNewSSTables(String ksName, String cfName)
{
ColumnFamilyStore.loadNewSSTables(ksName, cfName);
}
+
+ private static class RangeDoneCallback implements Runnable {
+ private final Set<Map.Entry<Range<Token>, InetAddress>> pending;
+ private final Map.Entry<Range<Token>, InetAddress> entry;
+ private final CountDownLatch latch;
+ private final Runnable nextRunnable;
+
+ public RangeDoneCallback(Set<Map.Entry<Range<Token>, InetAddress>> pending, Map.Entry<Range<Token>, InetAddress> entry, CountDownLatch latch) {
+ this(pending, entry, latch, null);
+ }
+ public RangeDoneCallback(Set<Map.Entry<Range<Token>, InetAddress>> pending, Map.Entry<Range<Token>, InetAddress> entry, CountDownLatch latch, Runnable nextRunnable) {
+ this.pending = pending;
+ this.entry = entry;
+ this.latch = latch;
+ this.nextRunnable = nextRunnable;
+ }
+
+ public void run()
+ {
+ synchronized (pending)
+ {
+ pending.remove(entry);
+
+ if (pending.isEmpty())
+ latch.countDown();
+ }
+ if (nextRunnable != null)
+ nextRunnable.run();
+ }
+ }
+
+ private static class ErrorMemoryCallback implements Runnable {
+ private boolean error = false;
+
+ public void run() {
+ error = true;
+ }
+
+ public boolean isError() {
+ return error;
+ }
+ }
}
View
4 src/java/org/apache/cassandra/streaming/StreamOut.java
@@ -81,9 +81,9 @@
/**
* Stream the given ranges to the target endpoint from each CF in the given keyspace.
*/
- public static void transferRanges(InetAddress target, Table table, Collection<Range<Token>> ranges, Runnable callback, OperationType type)
+ public static void transferRanges(InetAddress target, Table table, Collection<Range<Token>> ranges, Runnable callback, Runnable errorCallback, OperationType type)
{
- StreamOutSession session = StreamOutSession.create(table.name, target, callback);
+ StreamOutSession session = StreamOutSession.create(table.name, target, callback, errorCallback);
transferRanges(session, table.getColumnFamilyStores(), ranges, type);
}
View
18 src/java/org/apache/cassandra/streaming/StreamOutSession.java
@@ -49,6 +49,11 @@ public static StreamOutSession create(String table, InetAddress host, Runnable c
return create(table, host, System.nanoTime(), callback);
}
+ public static StreamOutSession create(String table, InetAddress host, Runnable callback, Runnable errorCallback)
+ {
+ return create(table, host, System.nanoTime(), callback, errorCallback);
+ }
+
public static StreamOutSession create(String table, InetAddress host, long sessionId)
{
return create(table, host, sessionId, null);
@@ -56,8 +61,13 @@ public static StreamOutSession create(String table, InetAddress host, long sessi
public static StreamOutSession create(String table, InetAddress host, long sessionId, Runnable callback)
{
+ return create(table, host, sessionId, callback, null);
+ }
+
+ public static StreamOutSession create(String table, InetAddress host, long sessionId, Runnable callback, Runnable errorCallback)
+ {
Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, sessionId);
- StreamOutSession session = new StreamOutSession(table, context, callback);
+ StreamOutSession session = new StreamOutSession(table, context, callback, errorCallback);
streams.put(context, session);
return session;
}
@@ -72,14 +82,16 @@ public static StreamOutSession get(InetAddress host, long sessionId)
public final String table;
private final Pair<InetAddress, Long> context;
private final Runnable callback;
+ private final Runnable errorCallback;
private volatile String currentFile;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
- private StreamOutSession(String table, Pair<InetAddress, Long> context, Runnable callback)
+ private StreamOutSession(String table, Pair<InetAddress, Long> context, Runnable callback, Runnable errorCallback)
{
this.table = table;
this.context = context;
this.callback = callback;
+ this.errorCallback = errorCallback;
Gossiper.instance.register(this);
FailureDetector.instance.registerFailureDetectionEventListener(this);
}
@@ -154,6 +166,8 @@ private void close(boolean success)
// that to a future ticket (likely CASSANDRA-3112)
if (callback != null && success)
callback.run();
+ if (errorCallback != null && !success)
+ errorCallback.run();
}
/** convenience method for use when testing */
Please sign in to comment.
Something went wrong with that request. Please try again.