Skip to content
This repository has been archived by the owner on Mar 31, 2022. It is now read-only.

Zvo/stream progress #13

Merged
merged 2 commits on Feb 26, 2015
6 changes: 6 additions & 0 deletions CHANGELOG.txt
@@ -1,4 +1,10 @@

== 2.4
* Progress reporting for the streaming phase

== 2.3
* Fixed disappearing config

== 2.2
* Added support for CQL collections
* Cleanly shut down the java-driver connection if an error occurs
2 changes: 1 addition & 1 deletion pom.xml
@@ -6,7 +6,7 @@

<groupId>com.spotify.hdfs2cass</groupId>
<artifactId>spotify-hdfs2cass</artifactId>
<version>2.4-SNAPSHOT</version>
<version>2.4</version>
<name>${project.groupId}:${project.artifactId}</name>

<description>
@@ -54,6 +54,7 @@ public CrunchCqlBulkRecordWriter getRecordWriter(FileSystem filesystem, JobConf
throw new CrunchRuntimeException("Use getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)");
}

@Override
public CrunchCqlBulkRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
return new CrunchCqlBulkRecordWriter(context);
}
CrunchCqlBulkRecordWriter.java
@@ -22,6 +22,9 @@
package com.spotify.hdfs2cass.cassandra.cql;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import com.spotify.hdfs2cass.cassandra.thrift.ProgressHeartbeat;
import com.spotify.hdfs2cass.cassandra.thrift.ProgressIndicator;
import com.spotify.hdfs2cass.crunch.CrunchConfigHelper;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
@@ -30,13 +33,20 @@
import org.apache.cassandra.hadoop.HadoopCompat;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
* This is an almost-copy of {@link org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter}
@@ -46,7 +56,11 @@
*/
public class CrunchCqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<ByteBuffer>> {

private static final Logger LOG = LoggerFactory.getLogger(CrunchCqlBulkRecordWriter.class);

private String keyspace;
private final ProgressHeartbeat heartbeat;

private String columnFamily;
private String schema;
private String insertStatement;
@@ -55,6 +69,7 @@ public class CrunchCqlBulkRecordWriter extends AbstractBulkRecordWriter<Object,
public CrunchCqlBulkRecordWriter(TaskAttemptContext context) throws IOException {
super(context);
setConfigs();
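// Heartbeat is used in close() to keep reporting progress while streaming; it gives up after 120 minutes.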
heartbeat = new ProgressHeartbeat(context, 120);
}

private void setConfigs() throws IOException
@@ -81,8 +96,8 @@ private void prepareWriter() throws IOException {
if (loader == null) {
CrunchExternalClient externalClient = new CrunchExternalClient(conf);
externalClient.addKnownCfs(keyspace, schema);
this.loader =
new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler());
this.loader = new SSTableLoader(outputDir, externalClient,
new BulkRecordWriter.NullOutputHandler());
}
} catch (Exception e) {
throw new IOException(e);
@@ -118,4 +133,37 @@ private File getColumnFamilyDirectory() throws IOException {
return dir;
}

@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
close();
}

@Deprecated
public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException {
close();
}

private void close() throws IOException {
LOG.info("SSTables built. Now starting streaming");
heartbeat.startHeartbeat();
try {
if (writer != null) {
writer.close();
Future<StreamState> future =
loader.stream(Collections.<InetAddress>emptySet(), new ProgressIndicator());
try {
Uninterruptibles.getUninterruptibly(future);
} catch (ExecutionException e) {
throw new RuntimeException("Streaming to the following hosts failed: " +
loader.getFailedHosts(), e);
}
} else {
LOG.info("SSTableWriter wasn't instantiated, no streaming happened.");
}
} finally {
heartbeat.stopHeartbeat();
}
LOG.info("Streaming finished successfully");
}

}
CrunchBulkRecordWriter.java
@@ -63,14 +63,15 @@ public class CrunchBulkRecordWriter
extends RecordWriter<ByteBuffer, List<Mutation>> implements
org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>> {

private final static Logger logger = LoggerFactory.getLogger(CrunchBulkRecordWriter.class);
private final static Logger LOG = LoggerFactory.getLogger(CrunchBulkRecordWriter.class);

private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
private final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
private final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";

private final Configuration conf;
private final ProgressHeartbeat heartbeat;
private SSTableSimpleUnsortedWriter writer;
private SSTableLoader loader;
private File outputdir;
@@ -94,6 +95,7 @@ public CrunchBulkRecordWriter(TaskAttemptContext context) {
this.context = context;
int megabitsPerSec = Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0"));
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(megabitsPerSec);
heartbeat = new ProgressHeartbeat(context, 120);
}

private String getOutputLocation() {
@@ -151,7 +153,8 @@ private void prepareWriter() {
ConfigHelper.getOutputKeyspaceUserName(conf),
ConfigHelper.getOutputKeyspacePassword(conf));

this.loader = new SSTableLoader(outputdir, externalClient, new OutputHandler.SystemOutput(true, true));
this.loader = new SSTableLoader(outputdir, externalClient,
new OutputHandler.SystemOutput(true, true));
}
}

@@ -216,22 +219,25 @@ public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
}

private void close() throws IOException {
logger.info("Closing bulk record writer");
ProgressHeartbeat heartbeat = new ProgressHeartbeat(context, 120);
LOG.info("SSTables built. Now starting streaming");
heartbeat.startHeartbeat();
try {
if (writer != null) {
writer.close();
Future<StreamState> future = loader.stream(Collections.<InetAddress>emptySet(), new ProgressIndicator());
Future<StreamState> future =
loader.stream(Collections.<InetAddress>emptySet(), new ProgressIndicator());
try {
Uninterruptibles.getUninterruptibly(future);
} catch (ExecutionException e) {
throw new RuntimeException("Streaming to the following hosts failed: " + loader.getFailedHosts(), e);
throw new RuntimeException("Streaming to the following hosts failed: " +
loader.getFailedHosts(), e);
}
} else {
LOG.info("SSTableWriter wasn't instantiated, no streaming happened.");
}
} finally {
heartbeat.stopHeartbeat();
}
logger.info("Succesfully closed bulk record writer");
LOG.info("Successfully closed bulk record writer");
}
}
ProgressHeartbeat.java
@@ -16,12 +16,17 @@
package com.spotify.hdfs2cass.cassandra.thrift;

import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Runs a heartbeat thread in the background that calls progress() every SLEEP_MINS minutes in order
* to keep DoFns from timing out. The heartbeat will stop calling progress() after stopAfterMins minutes.
*/
public class ProgressHeartbeat extends Thread {

private static final Logger LOG = LoggerFactory.getLogger(ProgressHeartbeat.class);

private static final int SLEEP_MINS = 1;

private final Progressable progressable;
@@ -48,6 +53,7 @@ public void stopHeartbeat() {
public void run() {
int minsRunning = 0;
while (!isCancelled && minsRunning < stopAfterMins) {
LOG.debug("Heartbeat invoked");
progressable.progress();
try {
Thread.sleep(1000L * 60L * SLEEP_MINS);
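For context, a minimal sketch of the ProgressHeartbeat usage pattern the two record writers follow in their close() methods. The wrapper class, method name, and longRunningWork parameter are illustrative and not part of this PR; only ProgressHeartbeat, startHeartbeat(), stopHeartbeat(), and the 120-minute cap come from the diffs above.

import com.spotify.hdfs2cass.cassandra.thrift.ProgressHeartbeat;
import org.apache.hadoop.util.Progressable;

final class HeartbeatUsageSketch {
  // Wraps a long-running operation that does not report progress by itself.
  static void runWithHeartbeat(Progressable context, Runnable longRunningWork) {
    // Report progress for at most 120 minutes, as the record writers do.
    ProgressHeartbeat heartbeat = new ProgressHeartbeat(context, 120);
    heartbeat.startHeartbeat();
    try {
      longRunningWork.run();
    } finally {
      heartbeat.stopHeartbeat();
    }
  }
}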
ProgressIndicator.java
Expand Up @@ -22,6 +22,8 @@
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamEventHandler;
import org.apache.cassandra.streaming.StreamState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.util.Map;
@@ -33,6 +35,9 @@
* Return true when everything is at 100%
*/
public class ProgressIndicator implements StreamEventHandler {

private static final Logger LOG = LoggerFactory.getLogger(ProgressIndicator.class);

private final Map<InetAddress, SessionInfo> sessionsByHost = new ConcurrentHashMap<>();
private final Map<InetAddress, Set<ProgressInfo>> progressByHost = new ConcurrentHashMap<>();

@@ -51,10 +56,25 @@ public void onFailure(Throwable t) {
}

public void handleStreamEvent(StreamEvent event) {

LOG.debug("Handling stream event");

if (event.eventType == StreamEvent.Type.STREAM_PREPARED) {

SessionInfo session = ((StreamEvent.SessionPreparedEvent) event).session;
sessionsByHost.put(session.peer, session);
LOG.info(String.format("Session to %s created", session.connecting.getHostAddress()));

} else if (event.eventType == StreamEvent.Type.STREAM_COMPLETE) {

StreamEvent.SessionCompleteEvent completionEvent = ((StreamEvent.SessionCompleteEvent) event);
if (completionEvent.success) {
LOG.info(String.format("Stream to %s successful.", completionEvent.peer.getHostAddress()));
} else {
LOG.info(String.format("Stream to %s failed.", completionEvent.peer.getHostAddress()));
}
} else if (event.eventType == StreamEvent.Type.FILE_PROGRESS) {

ProgressInfo progressInfo = ((StreamEvent.ProgressEvent) event).progress;

// update progress
@@ -63,12 +83,14 @@ public void handleStreamEvent(StreamEvent event) {
progresses = Sets.newSetFromMap(Maps.<ProgressInfo, Boolean>newConcurrentMap());
progressByHost.put(progressInfo.peer, progresses);
}
if (progresses.contains(progressInfo))
if (progresses.contains(progressInfo)) {
progresses.remove(progressInfo);
}
progresses.add(progressInfo);

// craft status update string
StringBuilder sb = new StringBuilder();
sb.append("\rprogress: ");
sb.append("progress: ");

long totalProgress = 0;
long totalSize = 0;
@@ -79,8 +101,9 @@ public void handleStreamEvent(StreamEvent event) {
long current = 0;
int completed = 0;
for (ProgressInfo progress : entry.getValue()) {
if (progress.currentBytes == progress.totalBytes)
if (progress.currentBytes == progress.totalBytes) {
completed++;
}
current += progress.currentBytes;
}
totalProgress += current;
Expand All @@ -99,7 +122,7 @@ public void handleStreamEvent(StreamEvent event) {
sb.append(mbPerSec(deltaProgress, deltaTime)).append("MB/s");
sb.append(" (avg: ").append(mbPerSec(totalProgress, TimeUnit.NANOSECONDS.toMillis(time - start))).append("MB/s)]");

System.out.print(sb.toString());
LOG.info(sb.toString());
}
}

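For reference, a condensed sketch of the streaming call both record writers now share, isolating how ProgressIndicator is attached to SSTableLoader.stream(). The wrapper class and method are illustrative; the individual calls (loader.stream, Uninterruptibles.getUninterruptibly, getFailedHosts) appear verbatim in the diffs above.

import com.google.common.util.concurrent.Uninterruptibles;
import com.spotify.hdfs2cass.cassandra.thrift.ProgressIndicator;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.streaming.StreamState;

import java.net.InetAddress;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class StreamWithProgressSketch {
  // Streams the built SSTables and surfaces per-host progress through ProgressIndicator.
  static void stream(SSTableLoader loader) {
    Future<StreamState> future =
        loader.stream(Collections.<InetAddress>emptySet(), new ProgressIndicator());
    try {
      // Block until streaming finishes, ignoring interrupts.
      Uninterruptibles.getUninterruptibly(future);
    } catch (ExecutionException e) {
      throw new RuntimeException(
          "Streaming to the following hosts failed: " + loader.getFailedHosts(), e);
    }
  }
}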