Skip to content

Commit

Permalink
Address comments from review and simplify code.
Browse files Browse the repository at this point in the history
  • Loading branch information
domoritz committed Jun 10, 2015
1 parent 57059bb commit 278316b
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 64 deletions.
Expand Up @@ -228,7 +228,7 @@ public Iterator<TupleBatch> tupleBatchIteratorFromQuery(final String queryString
throw new DbException(e);
}

return new SQLiteTupleBatchIterator(statement, schema, sqliteConnection);
return new SQLiteTupleBatchIterator(statement, sqliteConnection, schema);
}

@Override
Expand Down
Expand Up @@ -6,6 +6,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -34,19 +35,6 @@ public class SQLiteTupleBatchIterator implements Iterator<TupleBatch> {
/** The Schema of the TupleBatches returned by this Iterator. */
private final Schema schema;

/**
* Wraps a SQLiteStatement result set in an Iterator<TupleBatch>.
*
* @param statement the SQLiteStatement containing the results.
* @param schema the Schema describing the format of the TupleBatch containing these results.
* @param connection the connection to the SQLite database.
*/
SQLiteTupleBatchIterator(final SQLiteStatement statement, final Schema schema, final SQLiteConnection connection) {
this.statement = statement;
this.connection = connection;
this.schema = schema;
}

/**
* Wraps a SQLiteStatement result set in an Iterator<TupleBatch>.
*
Expand Down Expand Up @@ -81,6 +69,10 @@ public boolean hasNext() {

@Override
public TupleBatch next() {
if (!hasNext()) {
throw new NoSuchElementException();
}

/* Allocate TupleBatch parameters */
final int numFields = schema.numColumns();
final List<ColumnBuilder<?>> columnBuilders = ColumnFactory.allocateColumns(schema);
Expand Down
Expand Up @@ -33,6 +33,7 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;

import edu.washington.escience.myria.MyriaConstants.FTMode;
Expand Down Expand Up @@ -2002,10 +2003,7 @@ protected Iterator<TupleBatch> job(final SQLiteConnection sqliteConnection) thro
SQLiteException {
SQLiteStatement statement = sqliteConnection.prepare(queryString);
List<TupleBatch> tuples = Lists.newLinkedList();
Iterator<TupleBatch> iter = new SQLiteTupleBatchIterator(statement, sqliteConnection, outputSchema);
while (iter.hasNext()) {
tuples.add(iter.next());
}
Iterators.addAll(tuples, new SQLiteTupleBatchIterator(statement, sqliteConnection, outputSchema));
return tuples.iterator();
}
}).get();
Expand Down
21 changes: 5 additions & 16 deletions src/edu/washington/escience/myria/operator/CatalogQueryScan.java
Expand Up @@ -3,17 +3,15 @@
import java.util.Iterator;
import java.util.Objects;

import com.google.common.collect.ImmutableMap;

import edu.washington.escience.myria.DbException;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.coordinator.catalog.CatalogException;
import edu.washington.escience.myria.coordinator.catalog.MasterCatalog;
import edu.washington.escience.myria.storage.TupleBatch;

/**
* Push a select query down into a JDBC based database and scan over the query result.
* */
* Operator to get the result of a query on the catalog. The catalog is a SQLite database.
*/
public class CatalogQueryScan extends LeafOperator {

/**
Expand Down Expand Up @@ -50,14 +48,9 @@ public class CatalogQueryScan extends LeafOperator {
* @param catalog see the corresponding field.
* */
public CatalogQueryScan(final String sql, final Schema outputSchema, final MasterCatalog catalog) {
Objects.requireNonNull(sql);
Objects.requireNonNull(outputSchema);
Objects.requireNonNull(catalog);

this.sql = sql;
this.outputSchema = outputSchema;
this.catalog = catalog;
tuples = null;
this.sql = Objects.requireNonNull(sql);;
this.outputSchema = Objects.requireNonNull(outputSchema);
this.catalog = Objects.requireNonNull(catalog);
}

@Override
Expand Down Expand Up @@ -87,8 +80,4 @@ protected final TupleBatch fetchNextReady() throws DbException {
public final Schema generateSchema() {
return outputSchema;
}

@Override
protected final void init(final ImmutableMap<String, Object> execEnvVars) throws DbException {
}
}
35 changes: 8 additions & 27 deletions src/edu/washington/escience/myria/operator/DbQueryScan.java
Expand Up @@ -68,13 +68,8 @@ public class DbQueryScan extends LeafOperator implements DbReader {
* @param outputSchema see the corresponding field.
* */
public DbQueryScan(final String baseSQL, final Schema outputSchema) {
Objects.requireNonNull(baseSQL);
Objects.requireNonNull(outputSchema);

this.baseSQL = baseSQL;
this.outputSchema = outputSchema;
connectionInfo = null;
tuples = null;
this.baseSQL = Objects.requireNonNull(baseSQL);
this.outputSchema = Objects.requireNonNull(outputSchema);
sortedColumns = null;
ascending = null;
}
Expand All @@ -88,8 +83,7 @@ public DbQueryScan(final String baseSQL, final Schema outputSchema) {
* */
public DbQueryScan(final ConnectionInfo connectionInfo, final String baseSQL, final Schema outputSchema) {
this(baseSQL, outputSchema);
Objects.requireNonNull(connectionInfo);
this.connectionInfo = connectionInfo;
this.connectionInfo = Objects.requireNonNull(connectionInfo);
}

/**
Expand All @@ -99,14 +93,8 @@ public DbQueryScan(final ConnectionInfo connectionInfo, final String baseSQL, fi
* @param outputSchema the Schema of the returned tuples.
*/
public DbQueryScan(final RelationKey relationKey, final Schema outputSchema) {
Objects.requireNonNull(relationKey);
Objects.requireNonNull(outputSchema);

this.relationKey = relationKey;
this.outputSchema = outputSchema;
baseSQL = null;
connectionInfo = null;
tuples = null;
this.relationKey = Objects.requireNonNull(relationKey);
this.outputSchema = Objects.requireNonNull(outputSchema);
sortedColumns = null;
ascending = null;
}
Expand All @@ -121,8 +109,7 @@ public DbQueryScan(final RelationKey relationKey, final Schema outputSchema) {
*/
public DbQueryScan(final ConnectionInfo connectionInfo, final RelationKey relationKey, final Schema outputSchema) {
this(relationKey, outputSchema);
Objects.requireNonNull(connectionInfo);
this.connectionInfo = connectionInfo;
this.connectionInfo = Objects.requireNonNull(connectionInfo);
}

/**
Expand All @@ -135,16 +122,10 @@ public DbQueryScan(final ConnectionInfo connectionInfo, final RelationKey relati
*/
public DbQueryScan(final RelationKey relationKey, final Schema outputSchema, final int[] sortedColumns,
final boolean[] ascending) {
Objects.requireNonNull(relationKey);
Objects.requireNonNull(outputSchema);

this.relationKey = relationKey;
this.outputSchema = outputSchema;
this.relationKey = Objects.requireNonNull(relationKey);
this.outputSchema = Objects.requireNonNull(outputSchema);
this.sortedColumns = sortedColumns;
this.ascending = ascending;
baseSQL = null;
connectionInfo = null;
tuples = null;
}

/**
Expand Down
Expand Up @@ -12,8 +12,6 @@
import org.junit.Before;
import org.junit.Test;

import com.google.common.collect.ImmutableList;

import edu.washington.escience.myria.DbException;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.Type;
Expand Down Expand Up @@ -59,7 +57,7 @@ public void Cleanup() {

@Test
public final void testQueryQueries() throws DbException, CatalogException {
Schema schema = new Schema(ImmutableList.of(Type.LONG_TYPE, Type.STRING_TYPE), ImmutableList.of("id", "raw"));
Schema schema = Schema.ofFields(Type.LONG_TYPE, "id", Type.STRING_TYPE, "raw");
CatalogQueryScan scan = new CatalogQueryScan("select query_id, raw_query from queries", schema, catalog);
scan.open(null);

Expand Down

1 comment on commit 278316b

@senderista
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Please sign in to comment.