Skip to content

Commit

Permalink
working version/output queryID
Browse files Browse the repository at this point in the history
  • Loading branch information
jortiz16 committed Oct 23, 2015
1 parent f4ec6f1 commit 9a180f9
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 43 deletions.
41 changes: 5 additions & 36 deletions src/edu/washington/escience/myria/HdfsTupleWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import java.util.List;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import edu.washington.escience.myria.storage.ReadableTable;

Expand All @@ -21,18 +19,11 @@ public class HdfsTupleWriter implements TupleWriter {
/** Required for Java serialization. */
static final long serialVersionUID = 1L;

/** The logger for this class. */
private static final Logger LOGGER = LoggerFactory.getLogger(HdfsTupleWriter.class);

private static FSDataOutputStream outStream;

public HdfsTupleWriter() {
}

public HdfsTupleWriter(final OutputStream out) {
outStream = (FSDataOutputStream) out;
}

@Override
public void open(final OutputStream stream) {
outStream = (FSDataOutputStream) stream;
Expand All @@ -44,37 +35,15 @@ public void writeColumnHeaders(final List<String> columnNames) throws IOExceptio

@Override
public void writeTuples(final ReadableTable tuples) throws IOException {

List<Type> columnTypes = tuples.getSchema().getColumnTypes();
/* Write each row to the output stream */
for (int i = 0; i < tuples.numTuples(); ++i) {
String row = "";
for (int j = 0; j < tuples.numColumns(); ++j) {
switch (columnTypes.get(j)) {
case BOOLEAN_TYPE:
outStream.writeBoolean(tuples.getBoolean(j, i));
break;
case DOUBLE_TYPE:
outStream.writeDouble(tuples.getDouble(j, i));
break;
case FLOAT_TYPE:
outStream.writeFloat(tuples.getFloat(j, i));
break;
case INT_TYPE:
outStream.writeInt(tuples.getInt(j, i));
break;
case LONG_TYPE:
outStream.writeLong(tuples.getLong(j, i));
break;
case DATETIME_TYPE: // outStream.writeUTF(tuples.getDateTime(j,i));
break;
case STRING_TYPE:
outStream.writeChars(tuples.getString(j, i));
break;
default:
break;
}
row += tuples.getObject(j, i).toString();
row += j == tuples.numColumns() - 1 ? '\n' : ',';
}
outStream.writeUTF(row);
}

}

@Override
Expand Down
5 changes: 3 additions & 2 deletions src/edu/washington/escience/myria/api/DatasetResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -380,17 +380,18 @@ public Response persistDataset(@PathParam("userName") final String userName,

DatasetStatus status = server.getDatasetStatus(RelationKey.of(userName, programName, relationName));
RelationKey relationKey = status.getRelationKey();
long queryID;

try {
status = server.persistDataset(relationKey);
queryID = server.persistDataset(relationKey);
} catch (Exception e) {
throw new DbException();
}

/* Build the response */
ResponseBuilder response = Response.ok();

return response.entity(status.getQueryId()).build();
return response.entity(queryID).build();
}

/**
Expand Down
3 changes: 2 additions & 1 deletion src/edu/washington/escience/myria/operator/DataOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.google.common.collect.ImmutableMap;

import edu.washington.escience.myria.DbException;
import edu.washington.escience.myria.HdfsTupleWriter;
import edu.washington.escience.myria.TupleWriter;
import edu.washington.escience.myria.io.DataSink;
import edu.washington.escience.myria.storage.TupleBatch;
Expand Down Expand Up @@ -70,7 +71,7 @@ protected void consumeTuples(final TupleBatch tuples) throws DbException {
@Override
protected void init(final ImmutableMap<String, Object> execEnvVars) throws DbException {
try {
if (tupleWriter == null) {
if (tupleWriter instanceof HdfsTupleWriter) {
tupleWriter.open(dataSink.getOutputStream());
}
tupleWriter.writeColumnHeaders(getChild().getSchema().getColumnNames());
Expand Down
9 changes: 5 additions & 4 deletions src/edu/washington/escience/myria/parallel/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -1064,11 +1064,12 @@ public DatasetStatus deleteDataset(final RelationKey relationKey) throws DbExcep

/**
* @param relationKey the relationKey of the dataset to persist
* @return the status
* @return the queryID
* @throws DbException if there is an error
* @throws InterruptedException interrupted
*/
public DatasetStatus persistDataset(final RelationKey relationKey) throws DbException, InterruptedException {
public long persistDataset(final RelationKey relationKey) throws DbException, InterruptedException {
long queryID;

/* Mark the relation as is_persistent */
try {
Expand All @@ -1094,14 +1095,14 @@ public DatasetStatus persistDataset(final RelationKey relationKey) throws DbExce
"persisting from " + relationKey.toString(getDBMS()), new SubQueryPlan(new SinkRoot(new EOSSource())),
workerPlans);
try {
qf.get();
queryID = qf.get().getQueryId();
} catch (ExecutionException e) {
throw new DbException("Error executing query", e.getCause());
}
} catch (CatalogException e) {
throw new DbException(e);
}
return getDatasetStatus(relationKey);
return queryID;
}

/**
Expand Down

0 comments on commit 9a180f9

Please sign in to comment.