Skip to content

Commit

Permalink
addressing PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jortiz16 committed Jul 15, 2016
1 parent 065f166 commit 8dd5e3c
Show file tree
Hide file tree
Showing 6 changed files with 630 additions and 219 deletions.
35 changes: 35 additions & 0 deletions src/edu/washington/escience/myria/api/DatasetResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import javax.ws.rs.core.UriInfo;

import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.URIException;
import org.glassfish.jersey.media.multipart.ContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
import org.slf4j.LoggerFactory;
Expand All @@ -45,6 +46,7 @@
import edu.washington.escience.myria.accessmethod.AccessMethod.IndexRef;
import edu.washington.escience.myria.api.encoding.DatasetEncoding;
import edu.washington.escience.myria.api.encoding.DatasetStatus;
import edu.washington.escience.myria.api.encoding.ParallelDatasetEncoding;
import edu.washington.escience.myria.api.encoding.TipsyDatasetEncoding;
import edu.washington.escience.myria.coordinator.CatalogException;
import edu.washington.escience.myria.io.InputStreamSource;
Expand Down Expand Up @@ -554,6 +556,39 @@ private Response doIngest(
return builder.entity(status).build();
}

/**
 * Ingest a dataset stored in S3 in parallel and tell the client where the relation lives.
 *
 * @param dataset the description of the dataset to be ingested; it is validated before use.
 * @return a "created" response carrying the dataset status, with the relation's canonical URI
 *     as its location.
 * @throws DbException if there is an error in the database.
 * @throws InterruptedException propagated from the calls made here (presumably when the ingest
 *     is interrupted -- confirm against the server implementation).
 * @throws URIException propagated from the calls made here (presumably for a malformed S3 URI
 *     -- confirm against the server implementation).
 */
@POST
@Path("/ParallelIngest")
@Consumes(MediaType.APPLICATION_JSON)
public Response parallelIngest(final ParallelDatasetEncoding dataset)
    throws DbException, URIException, InterruptedException {
  /* Reject malformed requests before any work is handed to the server. */
  dataset.validate();

  final DatasetStatus ingestStatus =
      server.parallelIngestDataset(
          dataset.relationKey,
          dataset.schema,
          dataset.delimiter,
          dataset.quote,
          dataset.escape,
          dataset.numberOfSkippedLines,
          dataset.s3Source,
          dataset.workers);

  /* The response points the client at the canonical path of the relation. */
  final URI relationUri = getCanonicalResourcePath(uriInfo, dataset.relationKey);
  ingestStatus.setUri(relationUri);
  return Response.created(relationUri).entity(ingestStatus).build();
}

/**
* @param dataset the dataset to be imported.
* @param uriInfo information about the current URL.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/**
*
*/
package edu.washington.escience.myria.api.encoding;

import java.util.Set;

import edu.washington.escience.myria.RelationKey;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.io.AmazonS3Source;

/**
* Request encoding describing a dataset to be ingested in parallel from Amazon S3.
*
* NOTE(review): fields annotated with {@code @Required} are presumably enforced by the
* {@code validate()} method inherited from {@link MyriaApiEncoding} -- confirm. The remaining
* fields carry no such annotation and so appear to be optional.
*/
public class ParallelDatasetEncoding extends MyriaApiEncoding {
/** Key of the relation that the ingested data is stored under. */
@Required public RelationKey relationKey;
/** Schema of the dataset being ingested. */
@Required public Schema schema;
/** The S3 object to read the data from. */
@Required public AmazonS3Source s3Source;
/** Optional field delimiter (presumably for delimited-text parsing -- confirm). */
public Character delimiter;
/** Optional escape character (presumably for delimited-text parsing -- confirm). */
public Character escape;
/** Optional count of lines to skip (presumably leading header lines -- confirm). */
public Integer numberOfSkippedLines;
/** Optional quote character (presumably for delimited-text parsing -- confirm). */
public Character quote;
/** Optional set of worker IDs (presumably the workers that take part in the ingest -- confirm). */
public Set<Integer> workers;
}
27 changes: 15 additions & 12 deletions src/edu/washington/escience/myria/io/AmazonS3Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ public class AmazonS3Source implements DataSource, Serializable {
private final String bucket;
private final String key;

@JsonCreator
public AmazonS3Source(@JsonProperty(value = "uri", required = true) final String uri)
throws URIException {
this(uri, null, null);
}
private Long fileSize;

public AmazonS3Source(final String uri, final Long startRange, final Long endRange)
@JsonCreator
public AmazonS3Source(
@JsonProperty(value = "s3Uri", required = true) final String uri,
@JsonProperty(value = "startRange") final Long startRange,
@JsonProperty(value = "endRange") final Long endRange)
throws URIException {
s3Uri = URI.create(Objects.requireNonNull(uri, "Parameter uri to UriSource may not be null"));
if (!s3Uri.getScheme().equals("s3")) {
Expand All @@ -61,8 +61,8 @@ public AmazonS3Source(final String uri, final Long startRange, final Long endRan
bucket = removedScheme.substring(0, removedScheme.indexOf('/'));
key = removedScheme.substring(removedScheme.indexOf('/') + 1);

this.startRange = MoreObjects.firstNonNull(new Long(0), startRange);
this.endRange = MoreObjects.firstNonNull(getFileSize(), endRange);
this.startRange = MoreObjects.firstNonNull(startRange, new Long(0));
this.endRange = MoreObjects.firstNonNull(endRange, getFileSize());
}

public AmazonS3Client getS3Client() {
Expand All @@ -75,12 +75,15 @@ public AmazonS3Client getS3Client() {
}

/**
 * Returns the content length of the S3 object identified by {@code bucket} and {@code key}.
 *
 * <p>The value is fetched from the object's metadata on the first call and stored in
 * {@code fileSize}; later calls return the stored value without contacting S3 again.
 *
 * @return the object's content length as reported by its S3 metadata.
 */
public Long getFileSize() {
  if (fileSize == null) {
    /* First request: ask S3 for the object metadata and remember the length. */
    fileSize = getS3Client().getObjectMetadata(bucket, key).getContentLength();
  }
  return fileSize;
}

public InputStream getInputStream(final long start, final long end) throws IOException {
setStartRange(start);
setEndRange(end);
public InputStream getInputStream(final long startByte, final long endByte) throws IOException {
setStartRange(startByte);
setEndRange(endByte);
return getInputStream();
}

Expand Down

0 comments on commit 8dd5e3c

Please sign in to comment.