
Commit

added operator to open stream, modified writers
jortiz16 committed Oct 18, 2015
1 parent cd677ae commit f0990e5
Showing 13 changed files with 226 additions and 127 deletions.
7 changes: 6 additions & 1 deletion src/edu/washington/escience/myria/CsvTupleWriter.java
@@ -26,7 +26,7 @@ public class CsvTupleWriter implements TupleWriter {
static final long serialVersionUID = 1L;

/** The CSVWriter used to write the output. */
-private final CSVPrinter csvPrinter;
+private CSVPrinter csvPrinter;

/**
* Constructs a {@link CsvTupleWriter} object that will produce an Excel-compatible comma-separated value (CSV) file
@@ -60,6 +60,11 @@ private CsvTupleWriter(final OutputStream out, final CSVFormat csvFormat) throws
csvPrinter = new CSVPrinter(new BufferedWriter(new OutputStreamWriter(out)), csvFormat);
}

@Override
public void open(final OutputStream stream) throws IOException {
/* Note: reopening always uses the default CSV format, regardless of the format passed to the constructor. */
csvPrinter = new CSVPrinter(new BufferedWriter(new OutputStreamWriter(stream)), CSVFormat.DEFAULT);
}

@Override
public void writeColumnHeaders(final List<String> columnNames) throws IOException {
csvPrinter.printRecord(columnNames);
74 changes: 74 additions & 0 deletions src/edu/washington/escience/myria/HdfsTupleWriter.java
@@ -0,0 +1,74 @@
/**
*
*/
package edu.washington.escience.myria;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import edu.washington.escience.myria.storage.ReadableTable;

/**
 * A {@link TupleWriter} that writes tuples to an HDFS output stream.
 */
public class HdfsTupleWriter implements TupleWriter {

/** Required for Java serialization. */
static final long serialVersionUID = 1L;

/** The logger for this class. */
private static final Logger LOGGER = LoggerFactory.getLogger(HdfsTupleWriter.class);

/** The HDFS output stream to write to. */
private FSDataOutputStream outStream;

/** Constructs a writer whose stream will be supplied later via {@link #open}. */
public HdfsTupleWriter() {
}

/**
 * @param out the HDFS output stream to write to.
 */
public HdfsTupleWriter(final OutputStream out) {
outStream = (FSDataOutputStream) out;
}

@Override
public void open(final OutputStream stream) {
/* HdfsDataSink produces an FSDataOutputStream; other stream types will fail this cast. */
outStream = (FSDataOutputStream) stream;
}

@Override
public void writeColumnHeaders(final List<String> columnNames) throws IOException {
/* Column headers are not written for HDFS output. */
}

@Override
public void writeTuples(final ReadableTable tuples) throws IOException {

/* Placeholder output; the per-type tuple serialization below is not yet enabled. */
outStream.writeChars("test");

/*
 * List<Type> columnTypes = tuples.getSchema().getColumnTypes();
 * for (int i = 0; i < tuples.numTuples(); ++i) {
 *   for (int j = 0; j < tuples.numColumns(); ++j) {
 *     switch (columnTypes.get(j)) {
 *       case BOOLEAN_TYPE: outStream.writeBoolean(tuples.getBoolean(j, i)); break;
 *       case DOUBLE_TYPE: outStream.writeDouble(tuples.getDouble(j, i)); break;
 *       case FLOAT_TYPE: outStream.writeFloat(tuples.getFloat(j, i)); break;
 *       case INT_TYPE: outStream.writeInt(tuples.getInt(j, i)); break;
 *       case LONG_TYPE: LOGGER.error(String.valueOf(tuples.getLong(j, i))); outStream.writeLong(tuples.getLong(j, i)); break;
 *       case DATETIME_TYPE: // outStream.writeUTF(tuples.getDateTime(j, i));
 *         break;
 *       case STRING_TYPE: outStream.writeChars(tuples.getString(j, i)); break;
 *     }
 *   }
 * }
 */

}

@Override
public void done() throws IOException {
outStream.close();
}

@Override
public void error() throws IOException {
}

}
49 changes: 0 additions & 49 deletions src/edu/washington/escience/myria/HdfsWriter.java

This file was deleted.

10 changes: 9 additions & 1 deletion src/edu/washington/escience/myria/JsonTupleWriter.java
@@ -40,7 +40,7 @@ public class JsonTupleWriter implements TupleWriter {
/** The names of the columns, escaped for JSON. */
private ImmutableList<String> escapedColumnNames;
/** The {@link PrintWriter} wraps the {@link OutputStream} to which we write the data. */
-private final PrintWriter output;
+private PrintWriter output;
/** Whether we have output a single tuple yet. */
private boolean haveWritten = false;

@@ -53,6 +53,14 @@ public JsonTupleWriter(final OutputStream output) {
this.output = new PrintWriter(new BufferedWriter(new OutputStreamWriter(output)));
}

/**
 * @param stream the {@link OutputStream} to which the JSON output will be written.
 * @throws IOException if there is an error creating the writer.
 */
@Override
public void open(final OutputStream stream) throws IOException {
output = new PrintWriter(new BufferedWriter(new OutputStreamWriter(stream)));
}

/**
* @param ch the data to print
* @throws IOException if the {@link PrintWriter} has errors.
16 changes: 15 additions & 1 deletion src/edu/washington/escience/myria/PostgresBinaryTupleWriter.java
@@ -23,7 +23,7 @@ public class PostgresBinaryTupleWriter implements TupleWriter {
static final long serialVersionUID = 1L;

/** The ByteBuffer to write the output. */
-private final DataOutputStream buffer;
+private DataOutputStream buffer;

/**
* Constructs a {@link PostgresBinaryTupleWriter} object.
@@ -41,6 +41,20 @@ public PostgresBinaryTupleWriter(final OutputStream out) throws IOException {
buffer.writeInt(0);
}

/*
 * Opens the stream and writes the PostgreSQL COPY BINARY file header.
 */
@Override
public void open(final OutputStream stream) throws IOException {
buffer = new DataOutputStream(new BufferedOutputStream(stream));
// 11 bytes required header
buffer.writeBytes("PGCOPY\n\377\r\n\0");
// 32 bit integer indicating no OID
buffer.writeInt(0);
// 32 bit header extension area length
buffer.writeInt(0);
}

/*
* No-op
*/
6 changes: 6 additions & 0 deletions src/edu/washington/escience/myria/TupleWriter.java
@@ -1,6 +1,7 @@
package edu.washington.escience.myria;

import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.List;

@@ -26,6 +27,11 @@ public interface TupleWriter extends Serializable {
/** Required for Java serialization. */
static final long serialVersionUID = 1L;

/**
 * Open this writer on the given output stream; called before any headers or tuples are written.
 *
 * @param stream the {@link OutputStream} to which the output will be written.
 * @throws IOException if there is an error opening the stream.
 */
void open(OutputStream stream) throws IOException;

/**
* Inform the {@link TupleWriter} of the column headers. In the standard case (CSV output, see {@link CsvTupleWriter}
* ), they are written out directly. In some cases, they may be cached and output with the data (see
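The new interface method is easiest to read as a lifecycle: open the writer on a stream, write the column headers, write tuple batches, then finish. A minimal sketch of that sequence, assuming the writer, sink, column names, and tuple batch are supplied by the caller (the class name TupleWriterLifecycleSketch is made up for illustration):

import java.io.IOException;
import java.util.List;

import edu.washington.escience.myria.TupleWriter;
import edu.washington.escience.myria.io.DataSink;
import edu.washington.escience.myria.storage.ReadableTable;

public final class TupleWriterLifecycleSketch {
  /** Drives any TupleWriter against any DataSink. */
  public static void write(final TupleWriter writer, final DataSink sink,
      final List<String> columnNames, final ReadableTable batch) throws IOException {
    writer.open(sink.getOutputStream());    // new in this commit: bind the writer to the sink's stream
    writer.writeColumnHeaders(columnNames); // headers first (a no-op for some writers)
    writer.writeTuples(batch);              // repeated once per batch in a real operator
    writer.done();                          // flush and close the underlying stream
  }
}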
2 changes: 1 addition & 1 deletion src/edu/washington/escience/myria/api/DatasetResource.java
@@ -373,7 +373,7 @@ public Response deleteDataset(@PathParam("userName") final String userName,
* @throws DbException if there is an error in the database.
*/
@POST
@Path("/persist/user-{userName}/program-{programName}/relation-{relationName}/")
@Path("/user-{userName}/program-{programName}/relation-{relationName}/persist/")
public Response persistDataset(@PathParam("userName") final String userName,
@PathParam("programName") final String programName, @PathParam("relationName") final String relationName)
throws DbException {
src/edu/washington/escience/myria/coordinator/MasterCatalog.java
@@ -1467,10 +1467,6 @@ public void markRelationPersistent(@Nonnull final RelationKey relation) throws C
throw new CatalogException("Catalog is closed.");
}

-if (isPersistentRelation(relation)) {
-LOGGER.warn("Relation has already been deleted");
-}

/* Do the work -- for now just mark it in the catalog as persistent */
try {
queue.execute(new SQLiteJob<Void>() {
31 changes: 31 additions & 0 deletions src/edu/washington/escience/myria/io/HdfsDataSink.java
@@ -0,0 +1,31 @@
package edu.washington.escience.myria.io;

import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Objects;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * A {@link DataSink} that creates a file in HDFS at the given URI and exposes it as an output stream.
 */
public class HdfsDataSink implements DataSink {

/** The URI of the file to create in HDFS. */
@JsonProperty
private final URI uri;

public HdfsDataSink(@JsonProperty(value = "uri", required = true) final String uri) {
this.uri = URI.create(Objects.requireNonNull(uri, "Parameter uri cannot be null"));
}

@Override
public OutputStream getOutputStream() throws IOException {
/* Resolve the HDFS file system from the URI and create (overwriting) the destination file. */
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(uri, conf);
Path rootPath = new Path(uri);

return fs.create(rootPath);
}
}
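A usage sketch for the new sink together with HdfsTupleWriter; the namenode address and path below are hypothetical, and the tuple batch is assumed to come from the caller:

import edu.washington.escience.myria.HdfsTupleWriter;
import edu.washington.escience.myria.io.HdfsDataSink;
import edu.washington.escience.myria.storage.ReadableTable;

public final class HdfsSinkSketch {
  /** Writes a single batch of tuples to a file in HDFS. */
  public static void writeBatch(final ReadableTable batch) throws Exception {
    // Hypothetical HDFS URI; FileSystem.get() picks the file system implementation from the scheme.
    HdfsDataSink sink = new HdfsDataSink("hdfs://namenode:8020/tmp/myria/relation.out");
    HdfsTupleWriter writer = new HdfsTupleWriter();
    writer.open(sink.getOutputStream()); // an FSDataOutputStream, which HdfsTupleWriter expects
    writer.writeTuples(batch);
    writer.done();                       // closes the HDFS stream
  }
}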
54 changes: 0 additions & 54 deletions src/edu/washington/escience/myria/io/UriSink.java

This file was deleted.

71 changes: 71 additions & 0 deletions src/edu/washington/escience/myria/operator/DataOutputRoot.java
@@ -0,0 +1,71 @@
package edu.washington.escience.myria.operator;

import java.io.IOException;

import com.google.common.collect.ImmutableMap;

import edu.washington.escience.myria.DbException;
import edu.washington.escience.myria.TupleWriter;
import edu.washington.escience.myria.io.DataSink;
import edu.washington.escience.myria.storage.TupleBatch;

/**
 * A {@link RootOperator} that streams its child's tuples to a {@link DataSink} through a {@link TupleWriter}.
 */
public final class DataOutputRoot extends RootOperator {

/** Required for Java serialization. */
private static final long serialVersionUID = 1L;

/** The sink that provides the destination output stream. */
private final DataSink dataSink;
/** The writer that serializes tuples to the sink. */
private final TupleWriter tupleWriter;

/** Whether the writer has been closed cleanly. */
private boolean done = false;

public DataOutputRoot(final Operator child, final TupleWriter tupleWriter, final DataSink dataSink) {
super(child);
this.dataSink = dataSink;
this.tupleWriter = tupleWriter;
}

@Override
protected void childEOI() throws DbException {
/* Do nothing. */
}

@Override
protected void childEOS() throws DbException {
try {
tupleWriter.done();
} catch (IOException e) {
throw new DbException(e);
}
done = true;
}

@Override
protected void consumeTuples(final TupleBatch tuples) throws DbException {
try {
tupleWriter.writeTuples(tuples);
} catch (IOException e) {
throw new DbException(e);
}
}

@Override
protected void init(final ImmutableMap<String, Object> execEnvVars) throws DbException {
try {
tupleWriter.open(dataSink.getOutputStream());
tupleWriter.writeColumnHeaders(getChild().getSchema().getColumnNames());
} catch (IOException | SecurityException | IllegalArgumentException e) {
throw new DbException(e);
}
}

@Override
protected void cleanup() throws IOException {
if (!done) {
tupleWriter.error();
}
}
}
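Putting the pieces together at the operator level, a sketch of how the new root operator might cap a plan; the child operator comes from the rest of the plan and the HDFS URI is hypothetical:

import edu.washington.escience.myria.HdfsTupleWriter;
import edu.washington.escience.myria.TupleWriter;
import edu.washington.escience.myria.io.DataSink;
import edu.washington.escience.myria.io.HdfsDataSink;
import edu.washington.escience.myria.operator.DataOutputRoot;
import edu.washington.escience.myria.operator.Operator;

public final class DataOutputRootSketch {
  /** Wraps a child operator so that its output is streamed to HDFS. */
  public static DataOutputRoot hdfsOutput(final Operator child) {
    DataSink sink = new HdfsDataSink("hdfs://namenode:8020/tmp/myria/query-result"); // hypothetical URI
    TupleWriter writer = new HdfsTupleWriter();
    // init() calls writer.open(sink.getOutputStream()) and writes the column headers;
    // childEOS() calls writer.done() once the child has produced all of its tuples.
    return new DataOutputRoot(child, writer, sink);
  }
}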
