Skip to content

Commit

Permalink
s3 uri add, encoding bug fix, tests in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
jortiz16 committed Feb 11, 2016
1 parent 5bfce53 commit f4a54b8
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 10 deletions.
21 changes: 21 additions & 0 deletions src/edu/washington/escience/myria/io/ByteSink.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
*
*/
package edu.washington.escience.myria.io;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/**
 * A {@link DataSink} that collects everything written to it in an in-memory byte buffer.
 *
 * The buffer is created on first use and then retained, so every call to {@link #getOutputStream()} returns the
 * same stream and the bytes written so far can be read back with {@link #toByteArray()}.
 */
public class ByteSink implements DataSink {
  /** Required for Java serialization. */
  private static final long serialVersionUID = 1L;

  /**
   * The in-memory buffer backing this sink. Marked transient because {@link ByteArrayOutputStream} is not
   * serializable; it is therefore null after deserialization and re-created lazily.
   */
  private transient ByteArrayOutputStream buffer;

  /**
   * Returns the stream that writes into this sink's in-memory buffer.
   *
   * @return the single buffer-backed stream owned by this sink
   * @throws IOException never thrown by this implementation; declared by {@link DataSink}
   */
  @Override
  public OutputStream getOutputStream() throws IOException {
    if (buffer == null) {
      buffer = new ByteArrayOutputStream();
    }
    return buffer;
  }

  /**
   * Returns a copy of the bytes written to this sink so far.
   *
   * @return the buffered bytes, or an empty array if nothing has been written yet
   */
  public byte[] toByteArray() {
    return buffer == null ? new byte[0] : buffer.toByteArray();
  }
}
6 changes: 3 additions & 3 deletions src/edu/washington/escience/myria/io/DataSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "dataType")
@JsonSubTypes({
@Type(name = "Bytes", value = ByteArraySource.class), @Type(name = "File", value = FileSource.class),
@Type(name = "URI", value = UriSource.class), @Type(name = "Empty", value = EmptySource.class) })
@Type(name = "URI", value = UriSink.class), @Type(name = "Pipe", value = PipeSink.class),
@Type(name = "Bytes", value = ByteSink.class) })
public interface DataSink extends Serializable {
/**
* Returns an {@link OutputStream} providing write access to the specified data destination.
Expand All @@ -25,4 +25,4 @@ public interface DataSink extends Serializable {
*/
OutputStream getOutputStream() throws IOException;

}
}
17 changes: 11 additions & 6 deletions src/edu/washington/escience/myria/io/UriSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Objects;

import org.apache.hadoop.conf.Configuration;
Expand All @@ -18,13 +19,17 @@ public class UriSink implements DataSink {
private static final long serialVersionUID = 1L;

@JsonProperty
private final URI uri;
private URI uri;

public UriSink(@JsonProperty(value = "uri", required = true) String uri) throws CatalogException {
if (uri.contains("s3")) {
uri = uri.replace("s3", "s3a");
}
public UriSink(@JsonProperty(value = "uri", required = true) final String uri) throws CatalogException,
URISyntaxException {
this.uri = URI.create(Objects.requireNonNull(uri, "Parameter uri cannot be null"));
/* Force using the Hadoop S3A FileSystem */
if (this.uri.getScheme().equals("s3")) {
this.uri =
new URI("s3a", this.uri.getUserInfo(), this.uri.getHost(), this.uri.getPort(), this.uri.getPath(), this.uri
.getQuery(), this.uri.getFragment());
}
if (!this.uri.getScheme().equals("hdfs") && !this.uri.getScheme().equals("s3a")) {
throw new CatalogException("URI must be an HDFS or S3 URI");
}
Expand All @@ -38,4 +43,4 @@ public OutputStream getOutputStream() throws IOException {
Path rootPath = new Path(uri);
return fs.create(rootPath);
}
}
}
5 changes: 4 additions & 1 deletion src/edu/washington/escience/myria/parallel/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -1072,8 +1073,10 @@ public DatasetStatus deleteDataset(final RelationKey relationKey) throws DbExcep
* @return the queryID
* @throws DbException if there is an error
* @throws InterruptedException interrupted
* @throws URISyntaxException
*/
public long persistDataset(final RelationKey relationKey) throws DbException, InterruptedException {
public long persistDataset(final RelationKey relationKey) throws DbException, InterruptedException,
URISyntaxException {
long queryID;

/* Mark the relation as is_persistent */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
*
*/
package edu.washington.escience.myria.systemtest;

import static org.junit.Assert.assertEquals;

import java.nio.file.Paths;

import org.junit.Test;

import edu.washington.escience.myria.PostgresBinaryTupleWriter;
import edu.washington.escience.myria.RelationKey;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.Type;
import edu.washington.escience.myria.io.DataSink;
import edu.washington.escience.myria.io.DataSource;
import edu.washington.escience.myria.io.FileSource;
import edu.washington.escience.myria.io.UriSink;
import edu.washington.escience.myria.operator.DataOutput;
import edu.washington.escience.myria.operator.DbQueryScan;
import edu.washington.escience.myria.operator.network.partition.RoundRobinPartitionFunction;
import edu.washington.escience.myria.util.JsonAPIUtils;

/**
 * System test for writing query output to a {@link DataSink}.
 */
public class DataSinkTest extends SystemTestBase {

  /**
   * Work in progress: ingests a small two-column integer relation and builds a scan-to-sink output plan targeting
   * an S3 URI. The plan is not submitted yet and nothing is read back, so the final assertion currently compares
   * two empty placeholder strings.
   *
   * @throws Exception if ingesting the test data or constructing the plan fails
   */
  @Test
  public void s3UploadTest() throws Exception {
    /* Ingest test data */
    DataSource relationSource = new FileSource(Paths.get("testdata", "filescan", "simple_two_col_int.txt").toString());
    RelationKey relationKey = RelationKey.of("public", "adhoc", "testIngest");
    Schema relationSchema = Schema.ofFields("x", Type.INT_TYPE, "y", Type.INT_TYPE);
    JsonAPIUtils.ingestData("localhost", masterDaemonPort, ingest(relationKey, relationSchema, relationSource, ' ',
        new RoundRobinPartitionFunction(workerIDs.length)));

    /* Construct the query programmatically -- for the workers */
    DbQueryScan dbScan = new DbQueryScan(relationKey, relationSchema);
    DataSink sink = new UriSink("s3://myria-test/");
    DataOutput dataOutput = new DataOutput(dbScan, new PostgresBinaryTupleWriter(), sink);

    // TODO: finish the query plan and submit it
    // server.submitQueryPlan(serverPlan, workerPlans).get();

    /* Read the data back in from S3 and verify results */
    String data = ""; // TODO: S3 download via JsonUtils?
    String expectedData = "";
    /* JUnit expects (expected, actual); the reverse order yields misleading failure messages. */
    assertEquals(expectedData, data);
  }
}
48 changes: 48 additions & 0 deletions test/edu/washington/escience/myria/operator/DataOutputTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
*
*/
package edu.washington.escience.myria.operator;

import org.junit.Test;

import com.google.common.collect.ImmutableList;

import edu.washington.escience.myria.CsvTupleWriter;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.Type;
import edu.washington.escience.myria.io.ByteSink;
import edu.washington.escience.myria.io.DataSink;
import edu.washington.escience.myria.io.FileSource;
import edu.washington.escience.myria.storage.TupleBatch;
import edu.washington.escience.myria.util.TestEnvVars;

/**
 * Unit test scaffolding for the {@link DataOutput} operator.
 */
public class DataOutputTest {

  /**
   * Work in progress: scans a two-column integer CSV through a {@link DataOutput} that writes CSV into a
   * {@link ByteSink}, and drains the operator until it reports end-of-stream. Nothing is verified yet.
   *
   * @throws Exception if opening or draining the operator fails
   */
  @Test
  public void readCSVTest() throws Exception {
    /* Build the plan: CSV file scan -> CSV writer -> in-memory byte sink */
    final String csvPath = "testdata/twitter/TwitterK.csv";
    final Schema twoIntColumns = new Schema(ImmutableList.of(Type.INT_TYPE, Type.INT_TYPE));

    FileScan csvScan = new FileScan(new FileSource(csvPath), twoIntColumns);
    DataSink sink = new ByteSink();
    DataOutput output = new DataOutput(csvScan, new CsvTupleWriter(), sink);

    output.open(TestEnvVars.get());

    /* Drain the operator until end-of-stream */
    String result = null;
    TupleBatch batch = null;
    while (!output.eos()) {
      batch = output.nextReady();
      if (batch == null) {
        continue;
      }
      // TODO: add to result
    }

    /* TODO: verify results */
  }
}

0 comments on commit f4a54b8

Please sign in to comment.