Skip to content

Commit

Permalink
Documentation for hybrid system
Browse files Browse the repository at this point in the history
  • Loading branch information
kraftp committed Jan 26, 2018
1 parent 4ddab1f commit df047e2
Showing 1 changed file with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ public List<APLExplanationResult> explain(
}
for (int curOrder = 1; curOrder <= 3; curOrder++) {
long startTime = System.currentTimeMillis();
// Ceiling on attribute values that will be perfect-hashed instead of mapped.
int perfectHashingThresholdExponent = 5;
// Ceiling on attribute values that will be perfect-hashed instead of mapped. Cannot be greater than
// 32/curOrder or indices will not fit in an int.
int perfectHashingThresholdExponent = 8;
// Equal to 2**perfectHashingThresholdExponent
int perfectHashingThreshold = Math.toIntExact(Math.round(Math.pow(2, perfectHashingThresholdExponent)));
// The maximum size of the perfect hash array implied by perfectHashingThreshold
Expand All @@ -104,13 +105,15 @@ public List<APLExplanationResult> explain(
// Run the critical path of the algorithm--candidate generation--in parallel.
final int curOrderFinal = curOrder;
final int numThreads = 1;//Runtime.getRuntime().availableProcessors();
// Group by and calculate aggregates for each of the candidates
// The perfect hash-table. This is not synchronized, so all values in it are approximate.
final double [][] threadSetAggregatesArray = new double[perfectHashArraySize][numAggregates];
// The per-thread hashmaps. These store less-common values.
final ArrayList<Long2ObjectOpenHashMap<double []>> threadSetAggregates = new ArrayList<>(numThreads);
for (int i = 0; i < numThreads; i++) {
threadSetAggregates.add(new Long2ObjectOpenHashMap<>());
}
final CountDownLatch doneSignal = new CountDownLatch(numThreads);
// Shard the dataset by row into threads.
for (int threadNum = 0; threadNum < numThreads; threadNum++) {
final int startIndex = (numRows * threadNum) / numThreads;
final int endIndex = (numRows * (threadNum + 1)) / numThreads;
Expand All @@ -119,13 +122,18 @@ public List<APLExplanationResult> explain(
Runnable APrioriLinearRunnable = () -> {
for (int i = startIndex; i < endIndex; i++) {
int[] curRowAttributes = attributes[i];
// First, generate the candidates
ArrayList<Long> candidates = getCandidates(
curOrderFinal,
curRowAttributes,
precalculatedCandidates,
logCardinality
);
// Next, aggregate candidates. This loop uses a hybrid system where common candidates are
// perfect-hashed for speed while less common ones are stored in a hash table.
for (long curCandidate: candidates) {
// If all components of a candidate are in the perfectHashingThreshold most common,
// store it in the perfect hash table.
if(IntSetAsLong.checkLessThan(curCandidate, perfectHashingThreshold, logCardinality)) {
if (maxPerfectHashArraySize < cardPerfectHashArraySize) {
curCandidate = IntSetAsLong.changePacking(curCandidate,
Expand All @@ -134,6 +142,7 @@ public List<APLExplanationResult> explain(
for (int a = 0; a < numAggregates; a++) {
threadSetAggregatesArray[(int) curCandidate][a] += aRows[i][a];
}
// Otherwise, store it in a hashmap.
} else {
double[] candidateVal = thisThreadSetAggregates.get(curCandidate);
if (candidateVal == null) {
Expand All @@ -156,7 +165,7 @@ public List<APLExplanationResult> explain(
doneSignal.await();
} catch (InterruptedException ex) {ex.printStackTrace();}

// Collect the threadSetAggregatesArray into a HashMap for simplicity
// Collect the aggregates stored in the perfect hash table.
Long2ObjectOpenHashMap<double []> setAggregates = new Long2ObjectOpenHashMap<>();
for (int curCandidateKey = 0; curCandidateKey < perfectHashArraySize; curCandidateKey++) {
if (threadSetAggregatesArray[curCandidateKey][0] == 0 && threadSetAggregatesArray[curCandidateKey][1] == 0) continue;
Expand All @@ -175,7 +184,8 @@ public List<APLExplanationResult> explain(
}
}
}


// Collect the aggregates stored in the per-thread hashmaps.
for (Long2ObjectOpenHashMap<double []> set : threadSetAggregates) {
for (long curCandidateKey : set.keySet()) {
double[] curCandidateValue = set.get(curCandidateKey);
Expand All @@ -190,6 +200,7 @@ public List<APLExplanationResult> explain(
}
}

// Prune all the collected aggregates
LongOpenHashSet curOrderNext = new LongOpenHashSet();
LongOpenHashSet curOrderSaved = new LongOpenHashSet();
int pruned = 0;
Expand Down

0 comments on commit df047e2

Please sign in to comment.