Skip to content

Commit

Permalink
Fix broadcast and round-robin calls
Browse files: browse the repository at this point in the history
  • Loading branch information
jortiz16 committed Feb 14, 2017
1 parent a70ed21 commit 0eced3c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 35 deletions.
4 changes: 2 additions & 2 deletions src/edu/washington/escience/myria/parallel/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,7 @@ public long createView(
workerId,
new SubQueryPlan(
new DbCreateView(
EmptyRelation.of(Schema.EMPTY_SCHEMA), viewName, viewDefinition, true, null)));
EmptyRelation.of(Schema.EMPTY_SCHEMA), viewName, viewDefinition, false, null)));
}
ListenableFuture<Query> qf =
queryManager.submitQuery(
Expand Down Expand Up @@ -1156,7 +1156,7 @@ public long createMaterializedView(
workerId,
new SubQueryPlan(
new DbCreateView(
EmptyRelation.of(Schema.EMPTY_SCHEMA), viewName, viewDefinition, false, null)));
EmptyRelation.of(Schema.EMPTY_SCHEMA), viewName, viewDefinition, true, null)));
}
ListenableFuture<Query> qf =
queryManager.submitQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;

import edu.washington.escience.myria.CsvTupleReader;
import edu.washington.escience.myria.DbException;
import edu.washington.escience.myria.MyriaConstants;
import edu.washington.escience.myria.RelationKey;
Expand All @@ -35,6 +36,7 @@
import edu.washington.escience.myria.operator.EOSSource;
import edu.washington.escience.myria.operator.EmptySink;
import edu.washington.escience.myria.operator.RootOperator;
import edu.washington.escience.myria.operator.TupleSource;
import edu.washington.escience.myria.operator.network.Consumer;
import edu.washington.escience.myria.operator.network.GenericShuffleProducer;
import edu.washington.escience.myria.operator.network.distribute.BroadcastDistributeFunction;
Expand Down Expand Up @@ -137,20 +139,19 @@ public HashMap<Integer, RelationKey> ingestFact(final PerfEnforceTableEncoding f
factTableDesc.relationKey.getProgramName(),
factTableDesc.relationKey.getRelationName() + currentSize + "_U");

final ExchangePairID shuffleId = ExchangePairID.newID();
DbQueryScan scan = new DbQueryScan(previousRelationKey, factTableDesc.schema);

int[] producingWorkers =
PerfEnforceUtils.getRangeInclusiveArray(Collections.min(diff), Collections.max(diff));
int[] receivingWorkers =
PerfEnforceUtils.getRangeInclusiveArray(1, Collections.max(currentWorkerRange));
final ExchangePairID shuffleId = ExchangePairID.newID();

RoundRobinDistributeFunction df = new RoundRobinDistributeFunction();
df.setDestinations(receivingWorkers.length, 1);
GenericShuffleProducer producer =
new GenericShuffleProducer(
scan,
new ExchangePairID[] {shuffleId},
receivingWorkers,
new RoundRobinDistributeFunction());
scan, new ExchangePairID[] {shuffleId}, receivingWorkers, df);
Consumer consumer = new Consumer(factTableDesc.schema, shuffleId, producingWorkers);
DbInsert insert = new DbInsert(consumer, currentRelationKeyToUnion, true);

Expand Down Expand Up @@ -200,37 +201,18 @@ public void ingestDimension(final PerfEnforceTableEncoding dimTableDesc)
PerfEnforceUtils.getWorkerRangeSet(Collections.max(PerfEnforceDriver.configurations));

try {
server.parallelIngestDataset(
dimTableDesc.relationKey,
dimTableDesc.schema,
dimTableDesc.delimiter,
null,
null,
null,
dimTableDesc.source,
totalWorkers,
null);

DbQueryScan dbscan = new DbQueryScan(dimTableDesc.relationKey, dimTableDesc.schema);

final ExchangePairID broadcastID = ExchangePairID.newID();

GenericShuffleProducer producer =
new GenericShuffleProducer(
dbscan,
new ExchangePairID[] {broadcastID},
PerfEnforceUtils.getRangeInclusiveArray(
Collections.min(totalWorkers), Collections.max(totalWorkers)),
new BroadcastDistributeFunction());
TupleSource source =
new TupleSource(
new CsvTupleReader(dimTableDesc.schema, dimTableDesc.delimiter), dimTableDesc.source);

Consumer consumer = new Consumer(dimTableDesc.schema, broadcastID, totalWorkers);
DbInsert insert = new DbInsert(consumer, dimTableDesc.relationKey, true);
Map<Integer, RootOperator[]> workerPlans = new HashMap<>(totalWorkers.size());
for (Integer workerID : totalWorkers) {
workerPlans.put(workerID, new RootOperator[] {producer, insert});
}
server.ingestDataset(
dimTableDesc.relationKey,
new ArrayList<Integer>(totalWorkers),
null,
source,
new BroadcastDistributeFunction());

server.submitQueryPlan(new EmptySink(new EOSSource()), workerPlans).get();
} catch (Exception e) {
throw new PerfEnforceException("Error ingesting dimension tables");
}
Expand Down

0 comments on commit 0eced3c

Please sign in to comment.