Making streaming unit test much faster
Unit test now completes in less than 100ms. Moved the more complicated benchmark to an integration test that can be run separately.
edgan8 committed Apr 25, 2017
1 parent 9cea38b commit 1e292d8
Showing 5 changed files with 162 additions and 195 deletions.
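
Among the diffs below, the testStreamingWindowed benchmark is deleted from StreamingSummarizationTest; per the commit message it now runs as a separate integration test. A minimal sketch of one way to keep a slow benchmark out of the default test run, assuming JUnit 4 (which the test's org.junit imports confirm) and a hypothetical SlowTests marker interface; the commit may instead place the test in a separate module or source directory:

import org.junit.Test;
import org.junit.experimental.categories.Category;

// Hypothetical marker interface; not part of this commit.
interface SlowTests {}

public class StreamingBenchmarkIT {
    // Excluded when the build's test runner is configured to skip the
    // SlowTests category, so the fast unit tests stay under 100ms.
    @Test
    @Category(SlowTests.class)
    public void benchmarkStreamingVsBatch() throws Exception {
        // benchmark body corresponding to the deleted testStreamingWindowed
    }
}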
Explanation.java
@@ -2,6 +2,8 @@
 
 import edu.stanford.futuredata.macrobase.analysis.summary.itemset.result.AttributeSet;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -19,7 +21,9 @@ public Explanation(List<AttributeSet> resultList,
                        long numInliers,
                        long numOutliers,
                        long creationTimeMs) {
-        itemsets = resultList;
+
+        itemsets = new ArrayList<>(resultList);
+        itemsets.sort((AttributeSet a, AttributeSet b) -> -a.compareTo(b));
         this.numInliers = numInliers;
         this.numOutliers = numOutliers;
         this.creationTimeMs = creationTimeMs;
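
The new constructor above copies resultList defensively and sorts it in descending order by negating compareTo. A self-contained sketch of the same ordering via Collections.reverseOrder(), which also sidesteps the corner case where negating a compareTo result of Integer.MIN_VALUE would not flip its sign (AttributeSet is assumed Comparable, as the lambda requires):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DescendingSortSketch {
    public static void main(String[] args) {
        List<Integer> ratios = new ArrayList<>(Arrays.asList(3, 1, 2));
        // Same ordering as ratios.sort((a, b) -> -a.compareTo(b)) for
        // well-behaved compareTo implementations, without relying on negation.
        ratios.sort(Collections.reverseOrder());
        System.out.println(ratios); // prints [3, 2, 1]
    }
}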
IncrementalSummarizer.java
@@ -49,20 +49,14 @@ public class IncrementalSummarizer implements IncrementalOperator<Explanation> {
 
     public IncrementalSummarizer(int numPanes) {
         setWindowSize(numPanes);
+        initializePanes();
     }
 
     public IncrementalSummarizer() {
         setWindowSize(1);
+        initializePanes();
     }
 
-
-    @Override
-    public void setWindowSize(int numPanes) {
-        this.numPanes = numPanes;
-        initializePanes();
-    }
-    @Override
-    public int getWindowSize() { return numPanes; }
-
     protected IncrementalSummarizer initializePanes() {
         inlierPaneCounts = new ArrayDeque<>(numPanes);
         outlierPaneCounts = new ArrayDeque<>(numPanes);
@@ -71,9 +65,15 @@ protected IncrementalSummarizer initializePanes() {
         return this;
     }
 
-    public IncrementalSummarizer setMinSupport(double minSupport) {
+    @Override
+    public void setWindowSize(int numPanes) {
+        this.numPanes = numPanes;
+    }
+    @Override
+    public int getWindowSize() { return numPanes; }
+
+    public void setMinSupport(double minSupport) {
         this.minOutlierSupport = minSupport;
-        return this;
     }
     public double getMinSupport() { return minOutlierSupport; }
 
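
With this refactor, pane state is allocated once per constructor call and setMinSupport returns void, so configuration reads as separate statements rather than a fluent chain. A usage sketch based on the updated test below; the class and method names come from this commit, while the argument values are illustrative:

import edu.stanford.futuredata.macrobase.analysis.summary.IncrementalSummarizer;

import java.util.Arrays;

public class SummarizerSetupSketch {
    public static void main(String[] args) {
        // Panes are initialized by the constructor; setWindowSize alone no
        // longer re-allocates them.
        IncrementalSummarizer summarizer = new IncrementalSummarizer(2);
        summarizer.setAttributes(Arrays.asList("a0", "a1"));
        summarizer.setOutlierColumn("outlier");
        summarizer.setMinSupport(0.5); // returns void after this change
    }
}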
WindowedOperator.java
@@ -137,5 +137,12 @@ public double getMaxWindowTime() {
     public int getBufferSize() {
         return batchBuffer.size();
     }
+    public int getBufferedRows() {
+        int numRows = 0;
+        for (DataFrame df : batchBuffer) {
+            numRows += df.getNumRows();
+        }
+        return numRows;
+    }
 }

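The new getBufferedRows helper sums row counts over the buffered DataFrames with a plain loop. A stream-based equivalent as a sketch; the DataFrame import path and the Deque type of batchBuffer are assumptions, since the diff does not show the field declaration:

import edu.stanford.futuredata.macrobase.datamodel.DataFrame;

import java.util.ArrayDeque;
import java.util.Deque;

class BufferedRowsSketch {
    private final Deque<DataFrame> batchBuffer = new ArrayDeque<>();

    // Same aggregation as the loop added above.
    public int getBufferedRows() {
        return batchBuffer.stream().mapToInt(DataFrame::getNumRows).sum();
    }
}
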
StreamingSummarizationTest.java
@@ -1,6 +1,5 @@
 package edu.stanford.futuredata.macrobase;
 
-import edu.stanford.futuredata.macrobase.analysis.summary.BatchSummarizer;
 import edu.stanford.futuredata.macrobase.analysis.summary.Explanation;
 import edu.stanford.futuredata.macrobase.analysis.summary.IncrementalSummarizer;
 import edu.stanford.futuredata.macrobase.analysis.summary.itemset.result.AttributeSet;
@@ -10,6 +9,7 @@
 
 import java.util.*;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class StreamingSummarizationTest {
@@ -18,13 +18,13 @@ public class StreamingSummarizationTest {
      * and a "bug" which shows up partway through and affects a specific combination of
      * attribute values. Useful for testing streaming summarization operators.
      *
-     * @param n number of rows
-     * @param k k-way simultaneous bad attributes required for bug
-     * @param C cardinality of each attribute column
-     * @param d dimension of number of attribute columns
-     * @param p probability of a random row being an outlier
+     * @param n              number of rows
+     * @param k              k-way simultaneous bad attributes required for bug
+     * @param C              cardinality of each attribute column
+     * @param d              dimension of number of attribute columns
+     * @param p              probability of a random row being an outlier
      * @param changeStartIdx index at which the "bug" starts showing up
-     * @param changeEndIdx index at which the "bug" ends showing up
+     * @param changeEndIdx   index at which the "bug" ends showing up
      * @return Complete dataset
      */
     public static DataFrame generateAnomalyDataset(
@@ -39,7 +39,7 @@ public static DataFrame generateAnomalyDataset(
         String[][] attrPrimitiveValues = new String[d][C];
         for (int i = 0; i < C; i++) {
             for (int j = 0; j < d; j++) {
-                attrPrimitiveValues[j][i] = String.format("a%d:%d",j,i);
+                attrPrimitiveValues[j][i] = String.format("a%d:%d", j, i);
             }
         }
 
@@ -73,7 +73,7 @@ public static DataFrame generateAnomalyDataset(
         DataFrame df = new DataFrame();
         df.addDoubleColumn("time", time);
         for (int j = 0; j < d; j++) {
-            df.addStringColumn("a"+j, attrs[j]);
+            df.addStringColumn("a" + j, attrs[j]);
         }
         df.addDoubleColumn("outlier", isOutlier);
         return df;
@@ -91,214 +91,65 @@ public static List<String> getAttributes(int d, boolean getValue) {
         return attributes;
     }
 
-    /**
-     * Test the baseline performance of a windowed, streaming, summarizer operator
-     */
     @Test
-    public void testStreamingWindowed() throws Exception {
-        // Increase these numbers for more rigorous, slower performance testing
-        int n = 6000;
+    public void testDetectSingleChange() throws Exception {
+        // Prepare data set
+        int n = 7000;
         int k = 2;
-        int C = 4;
+        int C = 5;
         int d = 5;
         double p = 0.01;
         int eventIdx = 2000;
         int eventEndIdx = 4000;
         int windowSize = 2000;
-        int slideSize = 300;
+        int slideSize = 1000;
 
-        DataFrame df = generateAnomalyDataset(n, k, C, d, p, eventIdx, eventEndIdx);
         List<String> attributes = getAttributes(d, false);
         List<String> buggyAttributeValues = getAttributes(k, true);
+        DataFrame df = generateAnomalyDataset(n, k, C, d, p, eventIdx, eventEndIdx);
 
         IncrementalSummarizer outlierSummarizer = new IncrementalSummarizer();
         outlierSummarizer.setAttributes(attributes);
         outlierSummarizer.setOutlierColumn("outlier");
-        outlierSummarizer.setMinSupport(.3);
+        outlierSummarizer.setMinSupport(.5);
         WindowedOperator<Explanation> windowedSummarizer = new WindowedOperator<>(outlierSummarizer);
         windowedSummarizer.setWindowLength(windowSize);
         windowedSummarizer.setTimeColumn("time");
         windowedSummarizer.setSlideLength(slideSize);
         windowedSummarizer.initialize();
 
-        BatchSummarizer bsumm = new BatchSummarizer();
-        bsumm.setAttributes(attributes);
-        bsumm.setOutlierColumn("outlier");
-        bsumm.setMinSupport(.3);
-
-        int miniBatchSize = slideSize;
-        double totalStreamingTime = 0.0;
-        double totalBatchTime = 0.0;
-
+        double miniBatchSize = slideSize;
         double startTime = 0.0;
         while (startTime < n) {
             double endTime = startTime + miniBatchSize;
             double ls = startTime;
             DataFrame curBatch = df.filter(
                     "time",
                     (double t) -> t >= ls && t < endTime
             );
-            long timerStart = System.currentTimeMillis();
-            windowedSummarizer.process(curBatch);
-            Explanation curExplanation = windowedSummarizer.getResults();
-            long timerElapsed = System.currentTimeMillis() - timerStart;
-            totalStreamingTime += timerElapsed;
-
-            DataFrame curWindow = df.filter(
-                    "time",
-                    (double t) -> t >= (endTime - windowSize) && t < endTime
-            );
-            timerStart = System.currentTimeMillis();
-            bsumm.process(curWindow);
-            Explanation batchExplanation = bsumm.getResults();
-            timerElapsed = System.currentTimeMillis() - timerStart;
-            totalBatchTime += timerElapsed;
-
-            startTime = endTime;
-        }
-
-        assertTrue(totalStreamingTime < totalBatchTime);
-    }
-
-    @Test
-    public void testDetectSingleChange() throws Exception {
-        // Prepare data set
-        int n = 10000;
-        int k = 2;
-        int C = 4;
-        int d = 5;
-        double p = 0.01;
-        int eventIdx = 2000;
-        int eventEndIdx = 8000;
-        int windowSize = 6000;
-        int slideSize = 500;
-
-        List<String> attributes = getAttributes(d, false);
-        List<String> buggyAttributeValues = getAttributes(k, true);
-        DataFrame df = generateAnomalyDataset(n, k, C, d, p, eventIdx, eventEndIdx);
-        BatchSummarizer batch = new BatchSummarizer().setAttributes(attributes).setOutlierColumn("outlier");
-        batch.process(df.filter("time", (double t) -> t >= eventIdx && t < eventEndIdx));
-        Explanation batchResult = batch.getResults();
-
-        /* Code to initialize incremental summarizer */
-        IncrementalSummarizer summarizer = new IncrementalSummarizer(windowSize / slideSize);
-        summarizer.setOutlierColumn("outlier").setAttributes(attributes);
-        /* End */
-
-        double startTime = 0.0;
-        while (startTime < n) {
-            double endTime = startTime + slideSize;
-            double ls = startTime;
-            DataFrame curBatch = df.filter("time", (double t) -> t >= ls && t < endTime);
-
-            /* Code to process incremental summarizer on a minibatch */
-            summarizer.process(curBatch);
-            Explanation explanation = summarizer.getResults();
+            /* Code to process windowed summarizer on a minibatch */
+            windowedSummarizer.process(curBatch);
+            Explanation explanation = windowedSummarizer.getResults();
             /* End */
 
-            assert (explanation.getNumInliers() + explanation.getNumOutliers() == Math.min(windowSize, endTime));
-
-            if (startTime < eventIdx) { // No results before event time
-                assert (explanation.getItemsets().size() == 0);
-            } else {
-                // Combination:
-                // make sure that the known anomalous attribute combination has the highest risk ratio
-                List<AttributeSet> comboAttributes = explanation.getItemsets();
-                Collections.sort(comboAttributes,
-                        (a, b)->(new Double(b.getRatioToInliers()).compareTo(new Double(a.getRatioToInliers()))));
-                assert(comboAttributes.get(0).getItems().values().containsAll(buggyAttributeValues));
-
-                // Check whether result from the incremental summarizer in the first window agrees with
-                // results from the batch summarizer for the event
-                if (endTime == eventEndIdx) {
-                    for (AttributeSet expected : batchResult.getItemsets()) {
-                        if (expected.getItems().values().containsAll(buggyAttributeValues)) {
-                            assert(expected.getNumRecords() == comboAttributes.get(0).getNumRecords());
-                            break;
-                        }
-                    }
-                }
+            if (endTime > windowSize) {
+                assertEquals(windowSize, explanation.getNumInliers() + explanation.getNumOutliers());
             }
-            startTime = endTime;
-
-        }
-    }
-
-    @Test
-    public void testDetectEndingEvent() {
-        // Prepare data set
-        int n = 12000;
-        int k = 2;
-        int C = 5;
-        int d = 5;
-        double p = 0.005;
-        int eventIdx = 2000;
-        int eventEndIdx = 4000;
-        int windowSize = 5000;
-        int slideSize = 1000;
-
-        List<String> attributes = getAttributes(d, false);
-        List<String> buggyAttributeValues = getAttributes(k, true);
-        DataFrame df = generateAnomalyDataset(n, k, C, d, p, eventIdx, eventEndIdx);
-
-        // Summarizer with combinations
-        IncrementalSummarizer summarizer = new IncrementalSummarizer(windowSize / slideSize);
-        summarizer.setOutlierColumn("outlier").setAttributes(attributes);
-
-        double startTime = 0.0;
-        while (startTime < n) {
-            double endTime = startTime + slideSize;
-            double ls = startTime;
-            DataFrame curBatch = df.filter("time", (double t) -> t >= ls && t < endTime);
-            summarizer.process(curBatch);
-            Explanation explanation = summarizer.getResults(10);
-
-            if (startTime >= eventIdx && endTime < eventEndIdx + windowSize) {
-                // Make sure that the known anomalous attribute combination has the highest risk ratio
-                // before all event panes retire
-                Collections.sort(explanation.getItemsets(),
-                        (a, b)->(new Double(b.getRatioToInliers()).compareTo(new Double(a.getRatioToInliers()))));
-                assert(explanation.getItemsets().get(0).getItems().values().containsAll(buggyAttributeValues));
+            if (windowedSummarizer.getMaxWindowTime() > eventIdx
+                    && windowedSummarizer.getMaxWindowTime() - windowSize < eventEndIdx) {
+                // make sure that the known anomalous attribute combination has the highest risk ratio
+                AttributeSet topRankedExplanation = explanation.getItemsets().get(0);
+                assertTrue(topRankedExplanation.getItems().values().containsAll(buggyAttributeValues));
             } else {
-                // Make sure that there is no explanation before the event, and after all event panes retire
-                assert(explanation.getItemsets().size() == 0);
+                // Otherwise make sure that the noisy explanations are all low-cardinality
+                if (explanation.getItemsets().size() > 0) {
+                    AttributeSet topRankedExplanation = explanation.getItemsets().get(0);
+                    assertTrue(
+                            topRankedExplanation.getNumRecords() < 20
+                    );
+                }
             }
             startTime = endTime;
         }
     }
-
-
-    @Test
-    public void testAttributeCombo() {
-        // Prepare data set
-        int n = 16000;
-        int k = 4;
-        int C = 4;
-        int d = 10;
-        double p = 0.005;
-        int eventIdx = 0;
-        int eventEndIdx = 16000;
-        int windowSize = 4000;
-        int slideSize = 4000;
-
-        List<String> attributes = getAttributes(d, false);
-        List<String> buggyAttributeValues = getAttributes(k, true);
-        DataFrame df = generateAnomalyDataset(n, k, C, d, p, eventIdx, eventEndIdx);
-
-        // Summarizer with combinations
-        IncrementalSummarizer summarizer = new IncrementalSummarizer(windowSize / slideSize);
-        summarizer.setOutlierColumn("outlier").setAttributes(attributes);
-
-        double startTime = 0.0;
-        while (startTime < n) {
-            double endTime = startTime + slideSize;
-            double ls = startTime;
-            DataFrame curBatch = df.filter("time", (double t) -> t >= ls && t < endTime);
-            summarizer.process(curBatch);
-            Explanation explanation = summarizer.getResults(10);
-            Collections.sort(explanation.getItemsets(),
-                    (a, b)->(new Double(b.getRatioToInliers()).compareTo(new Double(a.getRatioToInliers()))));
-            assert(explanation.getItemsets().get(0).getItems().values().containsAll(buggyAttributeValues));
-            startTime = endTime;
-        }
-    }
 }
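
For concreteness, a sketch of invoking the dataset generator with the same parameters the surviving test uses; the package of DataFrame is assumed from MacroBase's source layout:

package edu.stanford.futuredata.macrobase;

import edu.stanford.futuredata.macrobase.datamodel.DataFrame;

public class DatasetSketch {
    public static void main(String[] args) {
        // 7000 rows, a 2-way attribute "bug", 5 values per column, 5 attribute
        // columns, 1% background outlier rate, bug active for time in [2000, 4000).
        DataFrame df = StreamingSummarizationTest.generateAnomalyDataset(
                7000, 2, 5, 5, 0.01, 2000, 4000);
        System.out.println(df.getNumRows()); // 7000
    }
}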
