Skip to content

Commit

Permalink
Merge pull request #751 from uwescience/catalog-scan
Browse files Browse the repository at this point in the history
Implement catalog scan
  • Loading branch information
domoritz committed Jun 10, 2015
2 parents 7fe054f + 278316b commit 5eca0e5
Show file tree
Hide file tree
Showing 10 changed files with 336 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand All @@ -25,9 +24,6 @@
import edu.washington.escience.myria.RelationKey;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.Type;
import edu.washington.escience.myria.column.Column;
import edu.washington.escience.myria.column.builder.ColumnBuilder;
import edu.washington.escience.myria.column.builder.ColumnFactory;
import edu.washington.escience.myria.storage.TupleBatch;

/**
Expand Down Expand Up @@ -232,7 +228,7 @@ public Iterator<TupleBatch> tupleBatchIteratorFromQuery(final String queryString
throw new DbException(e);
}

return new SQLiteTupleBatchIterator(statement, schema, sqliteConnection);
return new SQLiteTupleBatchIterator(statement, sqliteConnection, schema);
}

@Override
Expand Down Expand Up @@ -456,99 +452,3 @@ public void createIndexIfNotExists(final RelationKey relationKey, final Schema s
throw new UnsupportedOperationException("create index if not exists is not supported in sqlite yet, implement me");
}
}

/**
 * Wraps a SQLiteStatement result set in an {@code Iterator<TupleBatch>}.
 *
 * Once {@link #hasNext()} observes that the statement has no current row, it disposes both the statement and the
 * connection that were passed to the constructor.
 */
class SQLiteTupleBatchIterator implements Iterator<TupleBatch> {
  /** The logger for this class. Uses SQLiteAccessMethod settings. */
  private static final Logger LOGGER = LoggerFactory.getLogger(SQLiteAccessMethod.class);
  /** The results from a SQLite query that will be returned in TupleBatches by this Iterator. */
  private final SQLiteStatement statement;
  /** The connection to the SQLite database. */
  private final SQLiteConnection connection;
  /** The Schema of the TupleBatches returned by this Iterator. */
  private final Schema schema;

  /**
   * Wraps a SQLiteStatement result set in an {@code Iterator<TupleBatch>}.
   *
   * This constructor never steps the statement itself. NOTE(review): since {@link #hasNext()} relies on
   * {@code statement.hasRow()}, the caller presumably has to step the statement before handing it over -- confirm.
   *
   * @param statement the SQLiteStatement containing the results.
   * @param schema the Schema describing the format of the TupleBatch containing these results.
   * @param connection the connection to the SQLite database.
   */
  SQLiteTupleBatchIterator(final SQLiteStatement statement, final Schema schema, final SQLiteConnection connection) {
    this.statement = statement;
    this.connection = connection;
    this.schema = schema;
  }

  /**
   * Wraps a SQLiteStatement result set in an {@code Iterator<TupleBatch>}.
   *
   * @param statement the SQLiteStatement containing the results. If it has not yet stepped, this constructor will step
   *          it so that the first row (if any) is available.
   * @param connection the connection to the SQLite database.
   * @param schema the Schema describing the format of the TupleBatch containing these results.
   * @throws RuntimeException wrapping the SQLiteException if the initial step fails.
   */
  SQLiteTupleBatchIterator(final SQLiteStatement statement, final SQLiteConnection connection, final Schema schema) {
    this.connection = connection;
    this.statement = statement;
    this.schema = schema;
    try {
      if (!statement.hasStepped()) {
        statement.step();
      }
    } catch (final SQLiteException e) {
      /* No iterator is handed out on failure, so release the statement here instead of leaking it. */
      statement.dispose();
      throw new RuntimeException(e);
    }
  }

  /**
   * @return true if the statement is positioned on a row. When it is not, the statement and the connection are
   *         disposed as a side effect.
   */
  @Override
  public boolean hasNext() {
    final boolean hasRow = statement.hasRow();
    if (!hasRow) {
      statement.dispose();
      connection.dispose();
    }
    return hasRow;
  }

  /**
   * Builds the next TupleBatch from up to TupleBatch.BATCH_SIZE rows of the statement.
   *
   * @return the next batch of tuples.
   * @throws java.util.NoSuchElementException if there are no more rows.
   * @throws RuntimeException wrapping the SQLiteException if reading or stepping fails.
   */
  @Override
  public TupleBatch next() {
    /* Honor the Iterator contract instead of returning an empty batch once the results are exhausted. */
    if (!hasNext()) {
      throw new java.util.NoSuchElementException();
    }

    /* Allocate TupleBatch parameters */
    final int numFields = schema.numColumns();
    final List<ColumnBuilder<?>> columnBuilders = ColumnFactory.allocateColumns(schema);

    /*
     * Loop through resultSet, adding one row at a time. Stop when numTuples hits BATCH_SIZE or there are no more
     * results.
     */
    int numTuples;
    try {
      for (numTuples = 0; numTuples < TupleBatch.BATCH_SIZE && statement.hasRow(); ++numTuples) {
        for (int column = 0; column < numFields; ++column) {
          columnBuilders.get(column).appendFromSQLite(statement, column);
        }
        statement.step();
      }
    } catch (final SQLiteException e) {
      /* Pass the exception as the last argument so the logger records the stack trace. */
      LOGGER.error("Got SQLiteException in TupleBatchIterator.next()", e);
      throw new RuntimeException(e);
    }

    final List<Column<?>> columns = new ArrayList<Column<?>>(columnBuilders.size());
    for (final ColumnBuilder<?> cb : columnBuilders) {
      columns.add(cb.build());
    }

    return new TupleBatch(schema, columns, numTuples);
  }

  /**
   * Removal is not supported.
   *
   * @throws UnsupportedOperationException always.
   */
  @Override
  public void remove() {
    throw new UnsupportedOperationException("SQLiteTupleBatchIterator.remove()");
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
*
*/
package edu.washington.escience.myria.accessmethod;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.almworks.sqlite4java.SQLiteConnection;
import com.almworks.sqlite4java.SQLiteException;
import com.almworks.sqlite4java.SQLiteStatement;

import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.column.Column;
import edu.washington.escience.myria.column.builder.ColumnBuilder;
import edu.washington.escience.myria.column.builder.ColumnFactory;
import edu.washington.escience.myria.storage.TupleBatch;

/**
 * Wraps a SQLiteStatement result set in an {@code Iterator<TupleBatch>}.
 *
 * Once {@link #hasNext()} observes that the statement has no current row, it disposes both the statement and the
 * connection that were passed to the constructor. NOTE(review): callers that pass in a connection they intend to keep
 * using should confirm that this disposal is acceptable.
 */
public class SQLiteTupleBatchIterator implements Iterator<TupleBatch> {
  /** The logger for this class. Uses SQLiteAccessMethod settings. */
  private static final Logger LOGGER = LoggerFactory.getLogger(SQLiteAccessMethod.class);
  /** The results from a SQLite query that will be returned in TupleBatches by this Iterator. */
  private final SQLiteStatement statement;
  /** The connection to the SQLite database. */
  private final SQLiteConnection connection;
  /** The Schema of the TupleBatches returned by this Iterator. */
  private final Schema schema;

  /**
   * Wraps a SQLiteStatement result set in an {@code Iterator<TupleBatch>}.
   *
   * @param statement the SQLiteStatement containing the results. If it has not yet stepped, this constructor will step
   *          it so that the first row (if any) is available.
   * @param connection the connection to the SQLite database.
   * @param schema the Schema describing the format of the TupleBatch containing these results.
   * @throws RuntimeException wrapping the SQLiteException if the initial step fails.
   */
  public SQLiteTupleBatchIterator(final SQLiteStatement statement, final SQLiteConnection connection,
      final Schema schema) {
    this.connection = connection;
    this.statement = statement;
    this.schema = schema;
    try {
      if (!statement.hasStepped()) {
        statement.step();
      }
    } catch (final SQLiteException e) {
      /* No iterator is handed out on failure, so release the statement here instead of leaking it. */
      statement.dispose();
      throw new RuntimeException(e);
    }
  }

  /**
   * @return true if the statement is positioned on a row. When it is not, the statement and the connection are
   *         disposed as a side effect.
   */
  @Override
  public boolean hasNext() {
    final boolean hasRow = statement.hasRow();
    if (!hasRow) {
      statement.dispose();
      connection.dispose();
    }
    return hasRow;
  }

  /**
   * Builds the next TupleBatch from up to TupleBatch.BATCH_SIZE rows of the statement.
   *
   * @return the next batch of tuples.
   * @throws NoSuchElementException if there are no more rows.
   * @throws RuntimeException wrapping the SQLiteException if reading or stepping fails.
   */
  @Override
  public TupleBatch next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }

    /* Allocate TupleBatch parameters */
    final int numFields = schema.numColumns();
    final List<ColumnBuilder<?>> columnBuilders = ColumnFactory.allocateColumns(schema);

    /*
     * Loop through resultSet, adding one row at a time. Stop when numTuples hits BATCH_SIZE or there are no more
     * results.
     */
    int numTuples;
    try {
      for (numTuples = 0; numTuples < TupleBatch.BATCH_SIZE && statement.hasRow(); ++numTuples) {
        for (int column = 0; column < numFields; ++column) {
          columnBuilders.get(column).appendFromSQLite(statement, column);
        }
        statement.step();
      }
    } catch (final SQLiteException e) {
      /* Pass the exception as the last argument so the logger records the stack trace. */
      LOGGER.error("Got SQLiteException in TupleBatchIterator.next()", e);
      throw new RuntimeException(e);
    }

    final List<Column<?>> columns = new ArrayList<Column<?>>(columnBuilders.size());
    for (final ColumnBuilder<?> cb : columnBuilders) {
      columns.add(cb.build());
    }

    return new TupleBatch(schema, columns, numTuples);
  }

  /**
   * Removal is not supported.
   *
   * @throws UnsupportedOperationException always.
   */
  @Override
  public void remove() {
    throw new UnsupportedOperationException("SQLiteTupleBatchIterator.remove()");
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package edu.washington.escience.myria.api.encoding;

import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.api.encoding.QueryConstruct.ConstructArgs;
import edu.washington.escience.myria.operator.CatalogQueryScan;

/**
 * Encoding of a leaf operator that builds a {@link CatalogQueryScan} from a SQL string and a schema, using the catalog
 * of the server supplied in the construct arguments.
 */
public class CatalogScanEncoding extends LeafOperatorEncoding<CatalogQueryScan> {
  /** Required. Passed to the CatalogQueryScan as its schema argument. */
  @Required
  public Schema schema;
  /** Required. The SQL text passed to the CatalogQueryScan. */
  @Required
  public String sql;

  /**
   * @param args the construct arguments; the catalog is obtained from the server they carry.
   * @return a new CatalogQueryScan over {@link #sql} and {@link #schema}.
   */
  @Override
  public CatalogQueryScan construct(final ConstructArgs args) {
    return new CatalogQueryScan(
        sql,
        schema,
        args.getServer().getCatalog());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
@Type(name = "BinaryFileScan", value = BinaryFileScanEncoding.class),
@Type(name = "BroadcastConsumer", value = BroadcastConsumerEncoding.class),
@Type(name = "BroadcastProducer", value = BroadcastProducerEncoding.class),
@Type(name = "CatalogScan", value = CatalogScanEncoding.class),
@Type(name = "CollectConsumer", value = CollectConsumerEncoding.class),
@Type(name = "CollectProducer", value = CollectProducerEncoding.class),
@Type(name = "Consumer", value = ConsumerEncoding.class), @Type(name = "Counter", value = CounterEncoding.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -17,8 +18,6 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import jersey.repackaged.com.google.common.collect.ImmutableSet;

import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,13 +33,15 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;

import edu.washington.escience.myria.MyriaConstants.FTMode;
import edu.washington.escience.myria.MyriaConstants.ProfilingMode;
import edu.washington.escience.myria.RelationKey;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.Type;
import edu.washington.escience.myria.accessmethod.SQLiteTupleBatchIterator;
import edu.washington.escience.myria.api.MyriaJsonMapperProvider;
import edu.washington.escience.myria.api.encoding.DatasetStatus;
import edu.washington.escience.myria.api.encoding.QueryEncoding;
Expand All @@ -51,6 +52,7 @@
import edu.washington.escience.myria.parallel.RelationWriteMetadata;
import edu.washington.escience.myria.parallel.SocketInfo;
import edu.washington.escience.myria.parallel.SubQueryId;
import edu.washington.escience.myria.storage.TupleBatch;

/**
* This class is intended to store the configuration information for a Myria installation.
Expand Down Expand Up @@ -1983,4 +1985,30 @@ protected String job(final SQLiteConnection sqliteConnection) throws CatalogExce
throw new CatalogException(e);
}
}

/**
 * Run a query on the catalog.
 *
 * The query is executed as a job on the catalog queue and its whole result is collected into a list inside that job;
 * the returned iterator walks that list.
 *
 * @param queryString a SQL query on the catalog
 * @param outputSchema the schema of the query result
 * @return a tuple iterator over the result
 * @throws CatalogException if there is an error.
 */
public Iterator<TupleBatch> tupleBatchIteratorFromQuery(final String queryString, final Schema outputSchema)
    throws CatalogException {
  try {
    return queue.execute(new SQLiteJob<Iterator<TupleBatch>>() {
      @Override
      protected Iterator<TupleBatch> job(final SQLiteConnection sqliteConnection) throws CatalogException,
          SQLiteException {
        final SQLiteStatement statement = sqliteConnection.prepare(queryString);
        try {
          final List<TupleBatch> tuples = Lists.newLinkedList();
          /*
           * NOTE(review): SQLiteTupleBatchIterator receives this job's connection; confirm it does not dispose a
           * connection the queue still needs once the results are drained.
           */
          Iterators.addAll(tuples, new SQLiteTupleBatchIterator(statement, sqliteConnection, outputSchema));
          return tuples.iterator();
        } finally {
          /* Release the statement even if draining fails part-way; disposing an already-disposed one is a no-op. */
          statement.dispose();
        }
      }
    }).get();
  } catch (InterruptedException e) {
    /* Restore the interrupt flag so code further up the stack can still observe the interruption. */
    Thread.currentThread().interrupt();
    throw new CatalogException(e);
  } catch (ExecutionException e) {
    throw new CatalogException(e);
  }
}
}

0 comments on commit 5eca0e5

Please sign in to comment.