Commit

Merge pull request #864 from uwescience/parallel-ingest-operator-for-raco

Pipeline Parallel Ingest
jortiz16 committed Feb 14, 2017
2 parents 439c005 + f38f5f3 commit d282c44
Showing 6 changed files with 186 additions and 57 deletions.
14 changes: 14 additions & 0 deletions src/edu/washington/escience/myria/api/MyriaApiException.java
@@ -41,4 +41,18 @@ public MyriaApiException(final Status status, final String explanation) {
super(new RuntimeException(explanation), MyriaExceptionMapper.getResponse(status, explanation));
LOGGER.trace("In Status:String (log will be after getResponse)");
}

/**
* Construct a MyriaApiException from the given status, explanation, and cause.
* The HTTP response uses the given status code; its body is built from the
* provided explanation string and the cause.
*
* @param status the HTTP status code used for the HTTP response.
* @param explanation the message used to explain the exception in the HTTP response.
* @param cause the cause of the exception reported in the HTTP response.
*/
public MyriaApiException(final Status status, final String explanation, final Throwable cause) {
super(new RuntimeException(explanation), MyriaExceptionMapper.getResponse(status, cause));
LOGGER.trace("In Status:String (log will be after getResponse)");
}
}
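The new three-argument constructor lets a call site surface both a human-readable explanation and the underlying cause, as the AmazonS3Source changes below do. A minimal, hypothetical usage sketch (loadCredentialsProvider is a placeholder, not part of this commit):

// Hypothetical call site: any failure that should surface as an HTTP 500
// can now carry both a message and its underlying cause.
try {
  loadCredentialsProvider(); // placeholder for the failing operation
} catch (ReflectiveOperationException e) {
  throw new MyriaApiException(
      Status.INTERNAL_SERVER_ERROR, "could not load AWS credentials provider", e);
}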
36 changes: 36 additions & 0 deletions src/edu/washington/escience/myria/api/encoding/CSVFileScanFragmentEncoding.java
@@ -0,0 +1,36 @@
package edu.washington.escience.myria.api.encoding;

import java.util.Set;

import edu.washington.escience.myria.CsvTupleReader;
import edu.washington.escience.myria.api.encoding.QueryConstruct.ConstructArgs;
import edu.washington.escience.myria.io.AmazonS3Source;
import edu.washington.escience.myria.operator.CSVFileScanFragment;

public class CSVFileScanFragmentEncoding extends LeafOperatorEncoding<CSVFileScanFragment> {

@Required public CsvTupleReader reader;
@Required public AmazonS3Source source;

public Character delimiter;
public Character quote;
public Character escape;
public Integer skip;

public Set<Integer> workers;

@Override
public CSVFileScanFragment construct(ConstructArgs args) {
/* Attempt to use all the workers if not specified */
if (workers == null) {
workers = args.getServer().getAliveWorkers();
}

/* Select the subset of workers based on the file size */
int[] workersArray =
args.getServer().parallelIngestComputeNumWorkers(source.getFileSize(), workers);

return new CSVFileScanFragment(
source, reader.getSchema(), workersArray, delimiter, quote, escape, skip);
}
}
1 change: 1 addition & 0 deletions src/edu/washington/escience/myria/api/encoding/OperatorEncoding.java
@@ -29,6 +29,7 @@
@Type(name = "Consumer", value = ConsumerEncoding.class),
@Type(name = "Counter", value = CounterEncoding.class),
@Type(name = "CrossWithSingleton", value = CrossWithSingletonEncoding.class),
@Type(name = "CSVFileScanFragment", value = CSVFileScanFragmentEncoding.class),
@Type(name = "DbInsert", value = DbInsertEncoding.class),
@Type(name = "DbQueryScan", value = QueryScanEncoding.class),
@Type(name = "DbCreateIndex", value = CreateIndexEncoding.class),
50 changes: 45 additions & 5 deletions src/edu/washington/escience/myria/io/AmazonS3Source.java
@@ -10,17 +10,21 @@
import java.util.Objects;

import javax.annotation.concurrent.NotThreadSafe;
import javax.ws.rs.core.Response.Status;

import org.apache.commons.httpclient.URIException;
import org.apache.hadoop.conf.Configuration;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.MoreObjects;

import edu.washington.escience.myria.api.MyriaApiException;

/**
*
*/
@@ -65,11 +69,47 @@ public AmazonS3Source(
this.endRange = MoreObjects.firstNonNull(endRange, getFileSize());
}

public AmazonS3Client getS3Client() {
public AmazonS3Client getS3Client() throws MyriaApiException {
if (s3Client == null) {
clientConfig = new ClientConfiguration();
clientConfig.setMaxErrorRetry(3);
s3Client = new AmazonS3Client(new DefaultAWSCredentialsProviderChain());
/**
* Supported providers in fs.s3a.aws.credentials.provider are InstanceProfileCredentialsProvider,
* EnvironmentVariableCredentialsProvider and AnonymousAWSCredentialsProvider.
*/
AWSCredentialsProvider credentials;
Configuration conf = new Configuration();
String propertyName = "fs.s3a.aws.credentials.provider";
String className = conf.getTrimmed(propertyName);
if (className == null) {
throw new MyriaApiException(Status.INTERNAL_SERVER_ERROR, propertyName + " is not set");
}
try {
Class<?> credentialClass = Class.forName(className);
try {
credentials =
(AWSCredentialsProvider)
credentialClass
.getDeclaredConstructor(URI.class, Configuration.class)
.newInstance(s3Uri, conf);
} catch (NoSuchMethodException | SecurityException e) {
credentials =
(AWSCredentialsProvider) credentialClass.getDeclaredConstructor().newInstance();
}
clientConfig = new ClientConfiguration();
clientConfig.setMaxErrorRetry(3);
s3Client = new AmazonS3Client(credentials, clientConfig);
} catch (ClassNotFoundException e) {
throw new MyriaApiException(Status.INTERNAL_SERVER_ERROR, className + " not found", e);
} catch (NoSuchMethodException | SecurityException e) {
throw new MyriaApiException(
Status.INTERNAL_SERVER_ERROR,
className
+ " constructor exception. Should provide an accessible constructor accepting URI"
+ " and Configuration, or an accessible default constructor.",
e);
} catch (ReflectiveOperationException | IllegalArgumentException e) {
throw new MyriaApiException(
Status.INTERNAL_SERVER_ERROR, className + " instantiation exception.", e);
}
}
return s3Client;
}
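The reflective lookup above prefers a (URI, Configuration) constructor and falls back to the no-arg constructor: providers bundled with the AWS SDK typically expose only a default constructor, while Hadoop's S3A provider classes may accept a URI and a Configuration. A standalone sketch of that pattern, not part of the commit:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;

/** Prefer a (URI, Configuration) constructor; otherwise use the no-arg one. */
static Object instantiateProvider(Class<?> cls, URI uri, Configuration conf)
    throws ReflectiveOperationException {
  try {
    return cls.getDeclaredConstructor(URI.class, Configuration.class).newInstance(uri, conf);
  } catch (NoSuchMethodException | SecurityException e) {
    return cls.getDeclaredConstructor().newInstance();
  }
}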
59 changes: 56 additions & 3 deletions src/edu/washington/escience/myria/operator/CSVFileScanFragment.java
@@ -63,10 +63,10 @@ public class CSVFileScanFragment extends LeafOperator {
private static final String truncatedQuoteErrorMessage =
"EOF reached before encapsulated token finished";

private final boolean isLastWorker;
private boolean isLastWorker;
private final long maxByteRange;
private final long partitionStartByteRange;
private final long partitionEndByteRange;
private long partitionStartByteRange;
private long partitionEndByteRange;

private long adjustedStartByteRange;
private int byteOffsetFromTruncatedRowAtStart = 0;
@@ -75,6 +75,8 @@
private boolean onLastRow;
private boolean finishedReadingLastRow;
private boolean flagAsIncomplete;
private boolean flagAsRangeSelected;
private int[] workerIds;

/** Required for Java serialization. */
private static final long serialVersionUID = 1L;
@@ -180,6 +182,32 @@ public CSVFileScanFragment(
onLastRow = false;
finishedReadingLastRow = false;
flagAsIncomplete = false;
flagAsRangeSelected = true;
}

public CSVFileScanFragment(
final AmazonS3Source source,
final Schema schema,
final int[] workerIds,
@Nullable final Character delimiter,
@Nullable final Character quote,
@Nullable final Character escape,
@Nullable final Integer numberOfSkippedLines) {

this.source = Preconditions.checkNotNull(source, "source");
this.schema = Preconditions.checkNotNull(schema, "schema");
this.workerIds = workerIds;

this.delimiter = MoreObjects.firstNonNull(delimiter, CSVFormat.DEFAULT.getDelimiter());
this.quote = MoreObjects.firstNonNull(quote, CSVFormat.DEFAULT.getQuoteCharacter());
this.escape = escape;
this.numberOfSkippedLines = MoreObjects.firstNonNull(numberOfSkippedLines, 0);

maxByteRange = source.getFileSize();
onLastRow = false;
finishedReadingLastRow = false;
flagAsIncomplete = false;
flagAsRangeSelected = false;
}

@Override
Expand Down Expand Up @@ -379,6 +407,31 @@ protected Schema generateSchema() {
@Override
protected void init(final ImmutableMap<String, Object> execEnvVars) throws DbException {
buffer = new TupleBatchBuffer(getSchema());

if (!flagAsRangeSelected) {
int workerID = getNodeID();
long fileSize = source.getFileSize();
long currentPartitionSize = fileSize / workerIds.length;
int workerIndex = 0;
for (int i = 0; i < workerIds.length; i++) {
if (workerID == workerIds[i]) {
workerIndex = i;
}
}
boolean isLastWorker = workerIndex == workerIds.length - 1;
long startByteRange = currentPartitionSize * workerIndex;
long endByteRange;

if (isLastWorker) {
endByteRange = fileSize - 1;
} else {
endByteRange = (currentPartitionSize * (workerIndex + 1)) - 1;
}
this.partitionStartByteRange = startByteRange;
this.partitionEndByteRange = endByteRange;
this.isLastWorker = isLastWorker;
}

try {

adjustedStartByteRange = partitionStartByteRange;
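In the init() changes above, each worker derives its own byte range from its position in workerIds: the file is split into fileSize / n chunks, the last worker absorbs the remainder, and rows that straddle a partition boundary are reconciled afterwards via adjustedStartByteRange. A small sketch of the range arithmetic, not part of the commit, with a worked case:

// Byte-range computation as in init(): even split, last worker takes the remainder.
static long[] byteRange(long fileSize, int workerIndex, int numWorkers) {
  long partitionSize = fileSize / numWorkers;
  long start = partitionSize * workerIndex;
  boolean isLast = workerIndex == numWorkers - 1;
  long end = isLast ? fileSize - 1 : partitionSize * (workerIndex + 1) - 1;
  return new long[] {start, end};
}

// For a 1,000-byte file on 3 workers:
// byteRange(1000, 0, 3) -> [0, 332]
// byteRange(1000, 1, 3) -> [333, 665]
// byteRange(1000, 2, 3) -> [666, 999]  // last worker picks up the extra byte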
83 changes: 34 additions & 49 deletions src/edu/washington/escience/myria/parallel/Server.java
@@ -26,9 +26,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.nio.ByteBuffer;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -60,6 +57,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -73,12 +71,12 @@
import edu.washington.escience.myria.CsvTupleWriter;
import edu.washington.escience.myria.DbException;
import edu.washington.escience.myria.MyriaConstants;
import edu.washington.escience.myria.MyriaConstants.FunctionLanguage;
import edu.washington.escience.myria.PostgresBinaryTupleWriter;
import edu.washington.escience.myria.RelationKey;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.TupleWriter;
import edu.washington.escience.myria.Type;
import edu.washington.escience.myria.accessmethod.ConnectionInfo;
import edu.washington.escience.myria.accessmethod.AccessMethod.IndexRef;
import edu.washington.escience.myria.api.MyriaJsonMapperProvider;
import edu.washington.escience.myria.api.encoding.DatasetStatus;
@@ -99,7 +97,6 @@
import edu.washington.escience.myria.operator.DbCreateIndex;
import edu.washington.escience.myria.operator.DbCreateView;
import edu.washington.escience.myria.operator.DbDelete;
import edu.washington.escience.myria.operator.DbExecute;
import edu.washington.escience.myria.operator.DbInsert;
import edu.washington.escience.myria.operator.DbQueryScan;
import edu.washington.escience.myria.operator.DuplicateTBGenerator;
@@ -131,7 +128,6 @@
import edu.washington.escience.myria.storage.TupleBatch;
import edu.washington.escience.myria.storage.TupleBatchBuffer;
import edu.washington.escience.myria.storage.TupleBuffer;
import edu.washington.escience.myria.storage.TupleUtils;
import edu.washington.escience.myria.tools.MyriaGlobalConfigurationModule.DefaultInstancePath;
import edu.washington.escience.myria.tools.MyriaGlobalConfigurationModule.FlowControlWriteBufferHighMarkBytes;
import edu.washington.escience.myria.tools.MyriaGlobalConfigurationModule.FlowControlWriteBufferLowMarkBytes;
@@ -149,8 +145,6 @@
import edu.washington.escience.myria.util.IPCUtils;
import edu.washington.escience.myria.util.concurrent.ErrorLoggingTimerTask;
import edu.washington.escience.myria.util.concurrent.RenamingThreadFactory;

import edu.washington.escience.myria.MyriaConstants.FunctionLanguage;
/**
* The master entrance.
*/
@@ -884,54 +878,18 @@ public DatasetStatus parallelIngestDataset(
final Set<Integer> workersToIngest,
final DistributeFunction distributeFunction)
throws URIException, DbException, InterruptedException {
/* Figure out the workers we will use */
Set<Integer> actualWorkers = workersToIngest;
long fileSize = s3Source.getFileSize();

/* Determine the number of workers to ingest and the partition size */
long partitionSize = 0;
int[] workersArray;
Set<Integer> potentialWorkers = MoreObjects.firstNonNull(workersToIngest, getAliveWorkers());

/* Select a subset of workers */
int[] workersArray = parallelIngestComputeNumWorkers(fileSize, potentialWorkers);

if (workersToIngest == null) {
int[] allWorkers = Ints.toArray(getAliveWorkers());
int totalNumberOfWorkersToIngest = 0;
for (int i = allWorkers.length; i >= 1; i--) {
totalNumberOfWorkersToIngest = i;
long currentPartitionSize = fileSize / i;
if (currentPartitionSize > MyriaConstants.PARALLEL_INGEST_WORKER_MINIMUM_PARTITION_SIZE
|| totalNumberOfWorkersToIngest == 1) {
partitionSize = currentPartitionSize;
break;
}
}
workersArray = Arrays.copyOfRange(allWorkers, 0, totalNumberOfWorkersToIngest);
} else {
Preconditions.checkArgument(actualWorkers.size() > 0, "Must use > 0 workers");
workersArray = Ints.toArray(actualWorkers);
partitionSize = fileSize / workersArray.length;
}
Map<Integer, SubQueryPlan> workerPlans = new HashMap<>();
for (int workerID = 1; workerID <= workersArray.length; workerID++) {
boolean isLastWorker = workerID == workersArray.length;
long startRange = partitionSize * (workerID - 1);
long endRange;
if (isLastWorker) {
endRange = fileSize - 1;
} else {
endRange = (partitionSize * workerID) - 1;
}

CSVFileScanFragment scanFragment =
new CSVFileScanFragment(
s3Source,
schema,
startRange,
endRange,
isLastWorker,
delimiter,
quote,
escape,
numberOfSkippedLines);
s3Source, schema, workersArray, delimiter, quote, escape, numberOfSkippedLines);
workerPlans.put(
workersArray[workerID - 1],
new SubQueryPlan(new DbInsert(scanFragment, relationKey, true)));
@@ -960,6 +918,33 @@ public DatasetStatus parallelIngestDataset(
return getDatasetStatus(relationKey);
}

/**
* Helper method for parallel ingest: selects how many of the given workers to use so that
* each worker's partition stays above the minimum partition size.
*
* @param fileSize the size of the file to ingest, in bytes
* @param allWorkers all workers considered for ingest
* @return the sorted subset of worker IDs selected for the ingest
*/
public int[] parallelIngestComputeNumWorkers(long fileSize, Set<Integer> allWorkers) {
/* Determine the number of workers to ingest based on partition size */
int totalNumberOfWorkersToIngest = 0;
for (int i = allWorkers.size(); i >= 1; i--) {
totalNumberOfWorkersToIngest = i;
long currentPartitionSize = fileSize / i;
if (currentPartitionSize > MyriaConstants.PARALLEL_INGEST_WORKER_MINIMUM_PARTITION_SIZE) {
break;
}
}
int[] workersArray = new int[allWorkers.size()];
int wCounter = 0;
for (Integer w : allWorkers) {
workersArray[wCounter] = w;
wCounter++;
}
Arrays.sort(workersArray);
workersArray = Arrays.copyOfRange(workersArray, 0, totalNumberOfWorkersToIngest);
return workersArray;
}
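The loop counts down from the full worker set and stops at the largest worker count whose partitions still exceed MyriaConstants.PARALLEL_INGEST_WORKER_MINIMUM_PARTITION_SIZE, so small files are ingested by fewer workers. A standalone sketch of the rule; the 16 MB threshold below is an illustrative assumption, not the constant's actual value:

// Largest worker count such that fileSize / count > minPartitionBytes (else 1).
static int chooseWorkerCount(long fileSizeBytes, int availableWorkers, long minPartitionBytes) {
  int chosen = 1;
  for (int i = availableWorkers; i >= 1; i--) {
    chosen = i;
    if (fileSizeBytes / i > minPartitionBytes) {
      break;
    }
  }
  return chosen;
}

// With an assumed 16 MB minimum: chooseWorkerCount(40L << 20, 8, 16L << 20) == 2,
// since 40 MB / 2 = 20 MB > 16 MB but 40 MB / 3 ≈ 13.3 MB is not.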

/**
* @param relationKey the relationalKey of the dataset to import
* @param schema the schema of the dataset to import
