Skip to content

Commit

Permalink
unify three encodings for DbQueryScan
Browse files Browse the repository at this point in the history
  • Loading branch information
jingjingwang committed Jan 6, 2017
1 parent 98bec24 commit 7f5ad79
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 172 deletions.
18 changes: 17 additions & 1 deletion src/edu/washington/escience/myria/RelationKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

/**
* This class holds the key that identifies a relation. The notation is user.program.relation.
*
*/
public final class RelationKey implements Serializable {

Expand Down Expand Up @@ -182,4 +181,21 @@ public boolean equals(final Object other) {
public static RelationKey ofTemp(final long queryId, final String table) {
return RelationKey.of("q_" + queryId, "temp", table);
}

/**
* @return the query ID if the relation is a temp relation, null otherwise
*/
public Long tempRelationQueryId() {
if (isTemp()) {
return Long.parseLong(userName.substring(2));
}
return null;
}

/**
* @return if it is a temp relation.
*/
public boolean isTemp() {
return programName.equals("temp");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package edu.washington.escience.myria.api.encoding;

import java.util.List;

import edu.washington.escience.myria.RelationKey;
import edu.washington.escience.myria.api.encoding.QueryConstruct.ConstructArgs;
import edu.washington.escience.myria.operator.DbQueryScan;

public abstract class AbstractQueryScanEncoding extends LeafOperatorEncoding<DbQueryScan> {
/** If it needs to be debroadcasted. */
public boolean debroadcast;

/**
* @param args
* @return the list of relation keys being touched.
*/
public abstract List<RelationKey> sourceRelationKeys(ConstructArgs args);
}
75 changes: 19 additions & 56 deletions src/edu/washington/escience/myria/api/encoding/QueryConstruct.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
import edu.washington.escience.myria.operator.network.CollectProducer;
import edu.washington.escience.myria.operator.network.Consumer;
import edu.washington.escience.myria.operator.network.EOSController;
import edu.washington.escience.myria.operator.network.distribute.BroadcastDistributeFunction;
import edu.washington.escience.myria.operator.network.distribute.DistributeFunction;
import edu.washington.escience.myria.parallel.ExchangePairID;
import edu.washington.escience.myria.parallel.JsonSubQuery;
import edu.washington.escience.myria.parallel.RelationWriteMetadata;
Expand Down Expand Up @@ -181,66 +179,31 @@ private static void setAndVerifyScans(
final List<PlanFragmentEncoding> fragments, final ConstructArgs args)
throws CatalogException, DbException {
Server server = args.getServer();

for (PlanFragmentEncoding fragment : fragments) {
for (OperatorEncoding<?> operator : fragment.operators) {
Set<Integer> scanWorkers = ImmutableSet.of();
String scanRelation;
boolean debroadcast = false;

if (operator instanceof TableScanEncoding) {
TableScanEncoding scan = ((TableScanEncoding) operator);
debroadcast = scan.debroadcast;
scanRelation = scan.relationKey.toString();
scanWorkers = server.getWorkersForRelation(scan.relationKey, scan.storedRelationId);
} else if (operator instanceof TempTableScanEncoding) {
TempTableScanEncoding scan = ((TempTableScanEncoding) operator);
debroadcast = scan.debroadcast;
RelationKey relationKey = RelationKey.ofTemp(args.getQueryId(), scan.table);
scanRelation = "temporary relation " + scan.table;
scanWorkers =
server.getQueryManager().getWorkersForTempRelation(args.getQueryId(), relationKey);
} else if (operator instanceof QueryScanEncoding) {
QueryScanEncoding scan = ((QueryScanEncoding) operator);
debroadcast = scan.debroadcast;
scanRelation = "(source relations for query scan):";
int relationIdx = 0;
for (RelationKey relationKey : scan.sourceRelationKeys) {
scanRelation += " " + relationKey.toString();
Set<Integer> workersForRelation = server.getWorkersForRelation(relationKey, null);
// Guava's set operations don't accept null
if (workersForRelation == null) {
workersForRelation = ImmutableSet.of();
}
// REVIEW: This logic will work for broadcast relations stored on
// distinct but overlapping sets of workers, but where will it break?
if (relationIdx == 0) {
scanWorkers = workersForRelation;
} else {
scanWorkers = Sets.intersection(workersForRelation, scanWorkers);
}
++relationIdx;
}
LOGGER.info(
"DbQueryScan operator for relations {} assigned to workers {}",
scanRelation,
Joiner.on(',').join(scanWorkers));
} else {
if (!(operator instanceof AbstractQueryScanEncoding)) {
continue;
}
Preconditions.checkArgument(
scanWorkers != null && !scanWorkers.isEmpty(),
"Unable to find workers that store %s",
scanRelation);
if (debroadcast) {
Set<Integer> scanWorkers = server.getWorkers().keySet();
AbstractQueryScanEncoding scan = ((AbstractQueryScanEncoding) operator);
for (RelationKey relationKey : scan.sourceRelationKeys(args)) {
/* Assuming only workers storing all touched tables will be assigned the scan to. */
scanWorkers = Sets.intersection(server.getWorkersForRelation(relationKey), scanWorkers);
}
Set<Integer> aliveScanWorkers = Sets.intersection(scanWorkers, server.getAliveWorkers());
if (scan.debroadcast && aliveScanWorkers.size() > 0) {
// we need to pick a single worker that is alive
Set<Integer> aliveScanWorkers = Sets.intersection(scanWorkers, server.getAliveWorkers());
scanWorkers = ImmutableSet.of(aliveScanWorkers.iterator().next());
aliveScanWorkers = scanWorkers = ImmutableSet.of(aliveScanWorkers.iterator().next());
}
/*
* Note: the current assumption is that all the partitions need to be scanned. This will not be true if we have
* data replication, or allow to scan only a subset of the partitions. Revise if needed.
*/
String scanRelation = Joiner.on(',').join(scan.sourceRelationKeys(args));
Preconditions.checkArgument(
aliveScanWorkers.size() == scanWorkers.size(),
"Not all the workers needed for retrieving %s are alive",
scanRelation);
LOGGER.info(
"DbQueryScan operator for relations {} assigned to workers {}",
scanRelation,
Joiner.on(',').join(scanWorkers));
setOrVerifyFragmentWorkers(fragment, scanWorkers, "Setting workers for " + scanRelation);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
import edu.washington.escience.myria.api.encoding.QueryConstruct.ConstructArgs;
import edu.washington.escience.myria.operator.DbQueryScan;

public class QueryScanEncoding extends LeafOperatorEncoding<DbQueryScan> {
public class QueryScanEncoding extends AbstractQueryScanEncoding {
@Required public Schema schema;
@Required public String sql;
@Required public List<RelationKey> sourceRelationKeys;
public boolean debroadcast;

public List<RelationKey> sourceRelationKeys(ConstructArgs args) {
return sourceRelationKeys;
}

@Override
public DbQueryScan construct(ConstructArgs args) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package edu.washington.escience.myria.api.encoding;

import java.util.List;

import javax.ws.rs.core.Response.Status;

import com.google.common.base.Preconditions;
Expand All @@ -11,12 +13,20 @@
import edu.washington.escience.myria.coordinator.CatalogException;
import edu.washington.escience.myria.operator.DbQueryScan;
import edu.washington.escience.myria.parallel.Server;
import jersey.repackaged.com.google.common.collect.ImmutableList;

public class TableScanEncoding extends LeafOperatorEncoding<DbQueryScan> {
public class TableScanEncoding extends AbstractQueryScanEncoding {
/** The name of the relation to be scanned. */
@Required public RelationKey relationKey;
/**
* This field is not used by RACO yet but reserved for specifying physical representations of the same logical
* relation key.
*/
public Integer storedRelationId;
public boolean debroadcast;

public List<RelationKey> sourceRelationKeys(ConstructArgs args) {
return ImmutableList.of(relationKey);
}

@Override
public DbQueryScan construct(ConstructArgs args) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package edu.washington.escience.myria.api.encoding;

import java.util.List;

import com.google.common.base.Preconditions;

import edu.washington.escience.myria.RelationKey;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.api.encoding.QueryConstruct.ConstructArgs;
import edu.washington.escience.myria.operator.DbQueryScan;
import edu.washington.escience.myria.parallel.Server;
import jersey.repackaged.com.google.common.collect.ImmutableList;

public class TempTableScanEncoding extends LeafOperatorEncoding<DbQueryScan> {
public class TempTableScanEncoding extends AbstractQueryScanEncoding {
/** The name of the relation to be scanned. */
@Required public String table;
public boolean debroadcast;

public List<RelationKey> sourceRelationKeys(ConstructArgs args) {
return ImmutableList.of(RelationKey.ofTemp(args.getQueryId(), table));
}

@Override
public DbQueryScan construct(ConstructArgs args) {
Expand Down

0 comments on commit 7f5ad79

Please sign in to comment.