Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

BOF can possibly continue with failures.

Patch by brandonwilliams, reviewed by Yuki Morishita for CASSANDRA-4045
  • Loading branch information...
commit 3f8372c1f5225afe83dced250660c4314e8d86b0 1 parent 67b340b
@driftx driftx authored
Showing with 13 additions and 0 deletions.
  1. +13 −0 src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
View
13 src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -47,6 +47,8 @@
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
final class BulkRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>>
@@ -55,11 +57,14 @@
private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
private final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+ private final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
private final Configuration conf;
+ private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class);
private SSTableSimpleUnsortedWriter writer;
private SSTableLoader loader;
private File outputdir;
private Progressable progress;
+ private int maxFailures;
private enum CFType
{
@@ -95,6 +100,7 @@
Config.setOutboundBindAny(true);
this.conf = conf;
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.valueOf(conf.get(STREAM_THROTTLE_MBITS, "0")));
+ maxFailures = Integer.valueOf(conf.get(MAX_FAILED_HOSTS, "O"));
String keyspace = ConfigHelper.getOutputKeyspace(conf);
outputdir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf)); //dir must be named by ks/cf for the loader
outputdir.mkdirs();
@@ -218,6 +224,13 @@ private void close() throws IOException
throw new IOException(e);
}
}
+ if (future.hadFailures())
+ {
+ if (future.getFailedHosts().size() > maxFailures)
+ throw new IOException("Too many hosts failed: " + future.getFailedHosts());
+ else
+ logger.warn("Some hosts failed: " + future.getFailedHosts());
+ }
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.