Commit f94b528

initial steps, adding sinks

jortiz16 committed Oct 8, 2015
1 parent f214bf0 commit f94b528
Showing 11 changed files with 173 additions and 92 deletions.
3 changes: 3 additions & 0 deletions src/edu/washington/escience/myria/CsvTupleWriter.java
@@ -22,6 +22,9 @@
*/
public class CsvTupleWriter implements TupleWriter {

/** Required for Java serialization. */
static final long serialVersionUID = 1L;

/** The CSVWriter used to write the output. */
private final CSVPrinter csvPrinter;

49 changes: 49 additions & 0 deletions src/edu/washington/escience/myria/HdfsWriter.java
@@ -0,0 +1,49 @@
/**
*
*/
package edu.washington.escience.myria;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

import org.apache.hadoop.fs.FSDataOutputStream;

import edu.washington.escience.myria.storage.ReadableTable;

/**
 * A {@link TupleWriter} that writes tuples to an HDFS {@link FSDataOutputStream}.
 */
public class HdfsWriter implements TupleWriter {

/** Required for Java serialization. */
static final long serialVersionUID = 1L;

/** The HDFS stream to which tuples are written. */
private final FSDataOutputStream outStream;

/**
 * @param out the destination stream; must be an HDFS {@link FSDataOutputStream}.
 */
public HdfsWriter(final OutputStream out) {
  outStream = (FSDataOutputStream) out;
}

@Override
public void writeColumnHeaders(final List<String> columnNames) throws IOException {
  /* Column headers are not written to the HDFS output. */
}

@Override
public void writeTuples(final ReadableTable tuples) throws IOException {
  /* Simple placeholder serialization: one line per tuple, tab-separated column values. */
  for (int i = 0; i < tuples.numTuples(); ++i) {
    StringBuilder row = new StringBuilder();
    for (int j = 0; j < tuples.numColumns(); ++j) {
      row.append(j > 0 ? "\t" : "").append(tuples.getObject(j, i));
    }
    outStream.writeBytes(row.append('\n').toString());
  }
}

@Override
public void done() throws IOException {
outStream.close();
}

@Override
public void error() throws IOException {
  /* Nothing to clean up on error. */
}

}
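For context, a minimal sketch of driving this writer against a Hadoop file system. Everything here is an assumption for illustration -- the namenode address and output path are not values from this commit.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import edu.washington.escience.myria.HdfsWriter;

public class HdfsWriterExample {
  public static void main(final String[] args) throws Exception {
    // Assumed namenode address and output path; adjust for a real cluster.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:8020"), conf);
    FSDataOutputStream out = fs.create(new Path("/tmp/myria/out.tsv"));

    HdfsWriter writer = new HdfsWriter(out);
    // ... call writer.writeTuples(table) for each batch of tuples ...
    writer.done(); // closes the underlying stream
  }
}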
3 changes: 3 additions & 0 deletions src/edu/washington/escience/myria/JsonTupleWriter.java
@@ -34,6 +34,9 @@
*/
public class JsonTupleWriter implements TupleWriter {

/** Required for Java serialization. */
static final long serialVersionUID = 1L;

/** The names of the columns, escaped for JSON. */
private ImmutableList<String> escapedColumnNames;
/** The {@link PrintWriter} wraps the {@link OutputStream} to which we write the data. */
3 changes: 3 additions & 0 deletions src/edu/washington/escience/myria/PostgresBinaryTupleWriter.java
@@ -19,6 +19,9 @@
*/
public class PostgresBinaryTupleWriter implements TupleWriter {

/** Required for Java serialization. */
static final long serialVersionUID = 1L;

/** The {@link DataOutputStream} to which the binary output is written. */
private final DataOutputStream buffer;

6 changes: 5 additions & 1 deletion src/edu/washington/escience/myria/TupleWriter.java
@@ -1,6 +1,7 @@
package edu.washington.escience.myria;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;

import edu.washington.escience.myria.storage.ReadableTable;
@@ -20,7 +21,10 @@
*
*
*/
-public interface TupleWriter {
+public interface TupleWriter extends Serializable {

/** Required for Java serialization. */
static final long serialVersionUID = 1L;

/**
* Inform the {@link TupleWriter} of the column headers. In the standard case (CSV output, see {@link CsvTupleWriter}
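The point of extending Serializable is that a TupleWriter now travels to workers inside a serialized query plan (see the Server.java change below, where an HdfsWriter is placed into each worker's SubQueryPlan). A minimal round-trip sketch, using only the JDK, of what that shipping implies:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

final class SerializationCheck {
  /** Serialize and deserialize a TupleWriter, as happens when a plan containing one is shipped to a worker. */
  static TupleWriter roundTrip(final TupleWriter writer) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(writer);
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (TupleWriter) in.readObject();
    }
  }
}

Note that the round trip only succeeds if the concrete writer's fields are themselves serializable; a writer holding an open stream (such as HdfsWriter above) would need that field marked transient and reopened on the worker.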
9 changes: 4 additions & 5 deletions src/edu/washington/escience/myria/api/DatasetResource.java
@@ -377,18 +377,17 @@ public Response deleteDataset(@PathParam("userName") final String userName,
public Response persistDataset(@PathParam("userName") final String userName,
@PathParam("programName") final String programName, @PathParam("relationName") final String relationName)
throws DbException {

DatasetStatus status = server.getDatasetStatus(RelationKey.of(userName, programName, relationName));
if (status == null) {
/* Dataset not found, throw a 404 (Not Found) */
throw new MyriaApiException(Status.NOT_FOUND, "That dataset was not found");
}
RelationKey relationKey = status.getRelationKey();
// run the persist command

try {
status = server.persistDataset(relationKey);
} catch (Exception e) {
  throw new DbException(e);
}

// return a 204 (No Content) since persist produces no response body
return Response.noContent().build();
}

30 changes: 30 additions & 0 deletions src/edu/washington/escience/myria/io/DataSink.java
@@ -0,0 +1,30 @@
/**
*
*/
package edu.washington.escience.myria.io;

import java.io.IOException;
import java.io.OutputStream;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

/**
 * An interface for any sink of bits. This interface should be the principal parameter to any operator that writes
 * tuples to a destination (HDFS, S3, files, etc.).
 */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "dataType")
@JsonSubTypes({ @Type(name = "Uri", value = UriSink.class) })
public interface DataSink {
/**
* Returns an {@link OutputStream} providing write access to the bits in the specified data destination.
*
 * @return an {@link OutputStream} providing write access to the bits in the specified data destination.
 * @throws IOException if there is an error creating the output stream.
*/
OutputStream getOutputStream() throws IOException;

}
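To illustrate the contract, a hypothetical local-file implementation might look like the sketch below. FileSink is not part of this commit -- the only concrete sink added here is UriSink, shown next.

package edu.washington.escience.myria.io;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import com.fasterxml.jackson.annotation.JsonProperty;

/** A hypothetical DataSink that writes to a local file. */
public class FileSink implements DataSink {

  /** Path of the local output file. */
  @JsonProperty
  private final String filename;

  public FileSink(@JsonProperty(value = "filename", required = true) final String filename) {
    this.filename = filename;
  }

  @Override
  public OutputStream getOutputStream() throws IOException {
    return new FileOutputStream(filename);
  }
}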
54 changes: 54 additions & 0 deletions src/edu/washington/escience/myria/io/UriSink.java
@@ -0,0 +1,54 @@
/**
*
*/
package edu.washington.escience.myria.io;

import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Objects;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * A {@link DataSink} that writes to a URI, analogous to {@link UriSource}. Supports http(s) URLs and any
 * Hadoop-supported file system (e.g., HDFS).
 */
public class UriSink implements DataSink {

/** The Uniform Resource Identifier (URI) of the data sink. */
@JsonProperty
private final String uri;

/** The logger for debug, trace, etc. messages in this class. */
private static final org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(UriSink.class);

public UriSink(@JsonProperty(value = "uri", required = true) final String uri) {
  this.uri = Objects.requireNonNull(uri, "Parameter uri to UriSink may not be null");
}

@Override
public OutputStream getOutputStream() throws IOException {
  URI parsedUri = URI.create(uri);
  String scheme = parsedUri.getScheme();
  /* http(s) goes over an HTTP connection; any other scheme is handled by the Hadoop FileSystem API. */
  return ("http".equals(scheme) || "https".equals(scheme)) ? parsedUri.toURL().openConnection().getOutputStream()
      : getHadoopFileSystemOutputStream(parsedUri);
}

private static OutputStream getHadoopFileSystemOutputStream(final URI uri) throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(uri, conf);
  return fs.create(new Path(uri));
}

}
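A brief usage sketch (inside some method): the URIs are placeholders, not values from this commit, and the scheme alone decides which branch of getOutputStream() runs.

// Hypothetical destinations; any non-http(s) scheme falls through to the Hadoop FileSystem.
OutputStream web = new UriSink("https://example.com/ingest").getOutputStream();
OutputStream hdfs = new UriSink("hdfs://namenode:8020/myria/out").getOutputStream();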
2 changes: 2 additions & 0 deletions src/edu/washington/escience/myria/operator/DataOutput.java
@@ -20,6 +20,7 @@ public final class DataOutput extends RootOperator {
private static final long serialVersionUID = 1L;
/** The class that will serialize the tuple batches. */
private final TupleWriter tupleWriter;

/** Whether this object has finished. */
private boolean done = false;

@@ -62,6 +63,7 @@ protected void consumeTuples(final TupleBatch tuples) throws DbException {
protected void init(final ImmutableMap<String, Object> execEnvVars) throws DbException {
try {
tupleWriter.writeColumnHeaders(getChild().getSchema().getColumnNames());

} catch (IOException e) {
throw new DbException(e);
}
71 changes: 0 additions & 71 deletions src/edu/washington/escience/myria/operator/DbPersist.java

This file was deleted.

35 changes: 20 additions & 15 deletions src/edu/washington/escience/myria/parallel/Server.java
@@ -49,6 +49,7 @@

import edu.washington.escience.myria.CsvTupleWriter;
import edu.washington.escience.myria.DbException;
+import edu.washington.escience.myria.HdfsWriter;
import edu.washington.escience.myria.MyriaConstants;
import edu.washington.escience.myria.MyriaSystemConfigKeys;
import edu.washington.escience.myria.RelationKey;
@@ -66,11 +67,12 @@
import edu.washington.escience.myria.expression.MinusExpression;
import edu.washington.escience.myria.expression.VariableExpression;
import edu.washington.escience.myria.expression.WorkerIdExpression;
+import edu.washington.escience.myria.io.DataSink;
+import edu.washington.escience.myria.io.UriSink;
import edu.washington.escience.myria.operator.Apply;
import edu.washington.escience.myria.operator.DataOutput;
import edu.washington.escience.myria.operator.DbDelete;
import edu.washington.escience.myria.operator.DbInsert;
-import edu.washington.escience.myria.operator.DbPersist;
import edu.washington.escience.myria.operator.DbQueryScan;
import edu.washington.escience.myria.operator.DuplicateTBGenerator;
import edu.washington.escience.myria.operator.EOSSource;
@@ -1058,34 +1060,38 @@ public DatasetStatus deleteDataset(final RelationKey relationKey) throws DbException {
}

return getDatasetStatus(relationKey);

}

/**
* @param relationKey the relationKey of the dataset to persist
* @return the status
* @throws DbException if there is an error
* @throws InterruptedException interrupted
*/
public DatasetStatus persistDataset(final RelationKey relationKey) throws DbException, InterruptedException {

/* Mark the relation as is_persistent -- locks? */
try {
catalog.markRelationPersistent(relationKey);
} catch (CatalogException e) {
throw new DbException(e);
}
// Here the persist call is tied to HdfsWriter and UriSink; should this choice be made earlier, by the caller?

-    /* Delete from postgres at each worker by calling the Persist Operator */
+    /* create the query plan for persist */
try {
Map<Integer, SubQueryPlan> workerPlans = new HashMap<>();
for (Integer workerId : getWorkersForRelation(relationKey, null)) {
-      workerPlans.put(workerId, new SubQueryPlan(new DbPersist(EmptyRelation.of(catalog.getSchema(relationKey)),
-          relationKey, null)));
+      try {
+        DataSink workerSink = new UriSink("/some/worker/address");
+        TupleWriter workerWriter = new HdfsWriter(workerSink.getOutputStream());
+        workerPlans.put(workerId, new SubQueryPlan(new DataOutput(
+            new DbQueryScan(relationKey, getSchema(relationKey)), workerWriter)));
+      } catch (IOException e) {
+        // TODO: surface this as a DbException rather than swallowing it
+      }
}
    ListenableFuture<Query> qf =
-        queryManager.submitQuery("persist " + relationKey.toString(), "persist " + relationKey.toString(), "persist "
-            + relationKey.toString(getDBMS()), new SubQueryPlan(new SinkRoot(new EOSSource())), workerPlans);
+        queryManager.submitQuery("persist " + relationKey.toString(), "persist " + relationKey.toString(),
+            "persisting from " + relationKey.toString(getDBMS()), new SubQueryPlan(new SinkRoot(new EOSSource())),
+            workerPlans);
try {
qf.get();
} catch (ExecutionException e) {
@@ -1094,7 +1100,6 @@ public DatasetStatus persistDataset(final RelationKey relationKey) throws DbException, InterruptedException {
} catch (CatalogException e) {
throw new DbException(e);
}

return getDatasetStatus(relationKey);

}
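The "/some/worker/address" placeholder above leaves open where each worker should write. One possibility, sketched purely as an illustration (no such helper exists in this commit, and the namenode address is an assumption), is to derive a distinct HDFS URI per worker so concurrent workers do not clobber each other's output:

/** Hypothetical helper: a per-worker persist location derived from the relation key and worker ID. */
private static String workerPersistUri(final RelationKey relationKey, final int workerId) {
  return String.format("hdfs://namenode:8020/myria/persist/%s/worker_%d", relationKey.toString(), workerId);
}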
