Skip to content

Commit

Permalink
derive numDestinations from partitionToDestinations
Browse files Browse the repository at this point in the history
  • Loading branch information
jingjingwang committed Jan 4, 2017
1 parent efb3a95 commit 54486a2
Show file tree
Hide file tree
Showing 13 changed files with 48 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ public class BroadcastProducerEncoding extends AbstractProducerEncoding<GenericS

@Override
public GenericShuffleProducer construct(ConstructArgs args) {
return new GenericShuffleProducer(
null,
getRealOperatorIds().toArray(new ExchangePairID[] {}),
Ints.toArray(getRealWorkerIds()),
new BroadcastDistributeFunction(getRealWorkerIds().size()));
GenericShuffleProducer ret =
new GenericShuffleProducer(
null,
getRealOperatorIds().toArray(new ExchangePairID[] {}),
Ints.toArray(getRealWorkerIds()),
new BroadcastDistributeFunction());
ret.getDistributeFunction().setDestinations(getRealWorkerIds().size(), 1);
return ret;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class GenericShuffleProducerEncoding
public GenericShuffleProducer construct(final ConstructArgs args) {
Set<Integer> workerIds = getRealWorkerIds();
List<ExchangePairID> operatorIds = getRealOperatorIds();
distributeFunction.setNumDestinations(workerIds.size(), operatorIds.size());
distributeFunction.setDestinations(workerIds.size(), operatorIds.size());
GenericShuffleProducer producer =
new GenericShuffleProducer(
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public CollectProducer(
child,
new ExchangePairID[] {operatorID},
new int[] {consumerWorkerID},
new BroadcastDistributeFunction(1));
new BroadcastDistributeFunction());
this.distributeFunction.setDestinations(1, 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class GenericShuffleProducer extends Producer {
private static final long serialVersionUID = 1L;

/** the distribute function. */
private final DistributeFunction distributeFunction;
protected final DistributeFunction distributeFunction;

/**
* Shuffle to multiple operator IDs on multiple workers. The most generic constructor.
Expand Down Expand Up @@ -69,4 +69,11 @@ protected final void childEOI() throws DbException {
writePartitionsIntoChannels(
distributeFunction.distribute(TupleBatch.eoiTupleBatch(getSchema())));
}

/**
* @return the distribute function
*/
public final DistributeFunction getDistributeFunction() {
return distributeFunction;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public LocalMultiwayProducer(final Operator child, final ExchangePairID[] operat
child,
operatorIDs,
new int[] {IPCConnectionPool.SELF_IPC_ID},
new BroadcastDistributeFunction(operatorIDs.length));
new BroadcastDistributeFunction());
this.distributeFunction.setDestinations(1, operatorIDs.length);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,8 @@ public BroadcastDistributeFunction() {
super(new SinglePartitionFunction());
}

/**
* @param numDestinations number of destination
*/
public BroadcastDistributeFunction(final int numDestinations) {
this();
partitionToDestination = MyriaArrayUtils.create2DHorizontalIndexList(numDestinations);
this.numDestinations = numDestinations;
}

@Override
public void setNumDestinations(final int numWorker, final int numOperatorId) {
public void setDestinations(final int numWorker, final int numOperatorId) {
partitionToDestination = MyriaArrayUtils.create2DHorizontalIndexList(numWorker * numOperatorId);
numDestinations = numWorker * numOperatorId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.annotation.Nonnull;

Expand Down Expand Up @@ -37,9 +39,6 @@ public abstract class DistributeFunction implements Serializable {
/** The mapping from partitions to destinations. */
protected List<List<Integer>> partitionToDestination;

/** number of destinations. */
protected int numDestinations;

/**
* @param partitionFunction partition function.
*/
Expand All @@ -54,11 +53,11 @@ public DistributeFunction(final PartitionFunction partitionFunction) {
public List<List<TupleBatch>> distribute(@Nonnull final TupleBatch data) {
List<List<TupleBatch>> result = new ArrayList<List<TupleBatch>>();
if (data.isEOI()) {
for (int i = 0; i < numDestinations; ++i) {
for (int i = 0; i < getNumDestinations(); ++i) {
result.add(Lists.newArrayList(data));
}
} else {
for (int i = 0; i < numDestinations; ++i) {
for (int i = 0; i < getNumDestinations(); ++i) {
result.add(new ArrayList<TupleBatch>());
}
TupleBatch[] tbs = partitionFunction.partition(data);
Expand All @@ -71,9 +70,20 @@ public List<List<TupleBatch>> distribute(@Nonnull final TupleBatch data) {
return result;
}

/**
* @return number of destinations
*/
public int getNumDestinations() {
Set<Integer> d = new HashSet<Integer>();
for (List<Integer> t : partitionToDestination) {
d.addAll(t);
}
return d.size();
}

/**
* @param numWorker the number of workers to distribute on
* @param numOperatorId the number of involved operator IDs
*/
public abstract void setNumDestinations(final int numWorker, final int numOperatorId);
public abstract void setDestinations(final int numWorker, final int numOperatorId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ public HashDistributeFunction(@JsonProperty("indexes") final int[] indexes) {
}

@Override
public void setNumDestinations(final int numWorker, final int numOperatorId) {
partitionFunction.setNumPartitions(numWorker);
public void setDestinations(final int numWorker, final int numOperatorId) {
partitionToDestination = MyriaArrayUtils.create2DVerticalIndexList(numWorker);
numDestinations = numWorker;
partitionFunction.setNumPartitions(numWorker);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,14 @@ public final class HashPartitionFunction extends PartitionFunction {
private static final long serialVersionUID = 1L;

/** The indices used for partitioning. */
// @JsonProperty
private final int[] indexes;

/** The index of the chosen hashcode in <code>HashUtils</code>. */
// @JsonProperty
private final int seedIndex;

/**
* @param indexes the indices used for partitioning.
*/
// @JsonCreator
// public HashPartitionFunction(@JsonProperty("indexes") final int[] indexes) {
public HashPartitionFunction(final int[] indexes) {
this(indexes, 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public List<Integer> getAllDestinations() {
}

@Override
public void setNumDestinations(final int numWorker, final int numOperatorId) {
numDestinations = getAllDestinations().size();
public void setDestinations(int numWorker, int numOperatorId) {
partitionFunction.setNumPartitions(partitionToDestination.size());
/* TODO: should this be the same as the product of the sizes of all dimensions? */
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ public IdentityDistributeFunction(@JsonProperty("index") final int index) {
}

@Override
public void setNumDestinations(final int numWorker, final int numOperatorId) {
partitionFunction.setNumPartitions(numWorker);
public void setDestinations(int numWorker, int numOperatorId) {
partitionToDestination = MyriaArrayUtils.create2DVerticalIndexList(numWorker);
numDestinations = numWorker;
partitionFunction.setNumPartitions(numWorker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ public RoundRobinDistributeFunction() {
}

@Override
public void setNumDestinations(final int numWorker, final int numOperatorId) {
partitionFunction.setNumPartitions(numWorker);
public void setDestinations(final int numWorker, final int numOperatorId) {
partitionToDestination = MyriaArrayUtils.create2DVerticalIndexList(numWorker);
numDestinations = numWorker;
partitionFunction.setNumPartitions(numWorker);
}
}
2 changes: 1 addition & 1 deletion src/edu/washington/escience/myria/parallel/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ public DatasetStatus ingestDataset(

/* The master plan: send the tuples out. */
ExchangePairID scatterId = ExchangePairID.newID();
df.setNumDestinations(workersArray.length, 1);
df.setDestinations(workersArray.length, 1);
GenericShuffleProducer scatter =
new GenericShuffleProducer(source, new ExchangePairID[] {scatterId}, workersArray, df);

Expand Down

0 comments on commit 54486a2

Please sign in to comment.