Bug 1646825 Compact encoding for use counter histograms (#1313)
jklukas committed Jul 9, 2020
1 parent 6a718a9 commit 8782555
Showing 3 changed files with 263 additions and 0 deletions.
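The substantive change is in the histogram coercion path below: boolean use counter histograms, which only ever populate the "1" bucket, are now written to BigQuery STRING columns as the bare count rather than the full JSON object. As a minimal sketch of that rule (plain Jackson; the class and method names here are hypothetical and not part of the commit, whose actual implementation is compactHistogramEncoding in the diff below):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/** Illustrative sketch only; the production logic is compactHistogramEncoding below. */
public class UseCounterCompactionDemo {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  /** Returns the compact form for a use counter histogram, or the full JSON as a fallback. */
  static String encode(String fieldName, JsonNode histogram) throws Exception {
    int type = histogram.path("histogram_type").asInt(-1);
    long sum = histogram.path("sum").asLong(-1);
    long trueBucket = histogram.path("values").path("1").asLong(0);
    // Use counters are boolean (type 2) histograms where only bucket "1" is ever populated,
    // so the sum alone carries all of the information.
    if (type == 2 && fieldName.startsWith("use_counter2") && sum >= 0 && sum == trueBucket) {
      return Long.toString(sum);
    }
    return MAPPER.writeValueAsString(histogram);
  }

  public static void main(String[] args) throws Exception {
    String json = "{\"bucket_count\":3,\"histogram_type\":2,\"sum\":5,"
        + "\"range\":[1,2],\"values\":{\"0\":0,\"1\":5,\"2\":0}}";
    // Prints "5" rather than the full JSON object.
    System.out.println(encode("use_counter2_css_property_all_page", MAPPER.readTree(json)));
  }
}

Running the main method prints 5, matching the expected value in testCoerceUseCounterHistogramToString further down.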
@@ -96,6 +96,14 @@ private PayloadWithBeam(List<String> strictSchemaDocTypes, String schemasLocatio
"not_coerced_to_int");
private static final Counter notCoercedToBool = Metrics.counter(PubsubMessageToTableRow.class,
"not_coerced_to_bool");
private static final Counter invalidHistogramType = Metrics
.counter(PubsubMessageToTableRow.class, "invalid_histogram_type");
private static final Counter invalidHistogramSum = Metrics
.counter(PubsubMessageToTableRow.class, "invalid_histogram_sum");
private static final Counter invalidHistogramUseCounter = Metrics
.counter(PubsubMessageToTableRow.class, "invalid_histogram_use_counter");
private static final Counter invalidHistogramRange = Metrics
.counter(PubsubMessageToTableRow.class, "invalid_histogram_range");

/** measure rate of CoercedToInt. */
@Override
@@ -115,6 +123,30 @@ protected void incrementNotCoercedToBool() {
notCoercedToBool.inc();
}

/** measure rate of InvalidHistogramType. */
@Override
protected void incrementInvalidHistogramType() {
invalidHistogramType.inc();
}

/** measure rate of InvalidHistogramSum. */
@Override
protected void incrementInvalidHistogramSum() {
invalidHistogramSum.inc();
}

/** measure rate of InvalidHistogramUseCounter. */
@Override
protected void incrementInvalidHistogramUseCounter() {
invalidHistogramUseCounter.inc();
}

/** measure rate of InvalidHistogramRange. */
@Override
protected void incrementInvalidHistogramRange() {
invalidHistogramRange.inc();
}

@Override
public ObjectNode apply(TableId tableId, Map<String, String> attributes, byte[] data) {
// attempt to decompress data that may have been compressed to minimize shuffle size

@@ -136,6 +136,18 @@ private WithOpenCensusMetrics(Cache<String, String> normalizedNameCache,
"The number of values that failed to be coerced to int", "1");
private static final MeasureLong NOT_COERCED_TO_BOOL = MeasureLong.create(
"not_coerced_to_bool", "The number of values that failed to be coerced to bool", "1");
private static final MeasureLong INVALID_HISTOGRAM_TYPE = MeasureLong.create(
"invalid_histogram_type", //
"The number of parsed histograms with an invalid histogram_type", "1");
private static final MeasureLong INVALID_HISTOGRAM_SUM = MeasureLong.create(
"invalid_histogram_sum", //
"The number of parsed histograms with an invalid sum field", "1");
private static final MeasureLong INVALID_HISTOGRAM_USE_COUNTER = MeasureLong.create(
"invalid_histogram_use_counter", //
"The number of parsed histograms failing use counter invariants", "1");
private static final MeasureLong INVALID_HISTOGRAM_RANGE = MeasureLong.create(
"invalid_histogram_range", //
"The number of parsed histograms with an invalid bucket_count or range field", "1");
private static final Aggregation.Count COUNT_AGGREGATION = Aggregation.Count.create();
private static final StatsRecorder STATS_RECORDER = Stats.getStatsRecorder();

@@ -170,6 +182,31 @@ protected void incrementNotCoercedToInt() {
protected void incrementNotCoercedToBool() {
STATS_RECORDER.newMeasureMap().put(NOT_COERCED_TO_BOOL, 1).record();
}

/** measure rate of InvalidHistogramType. */
@Override
protected void incrementInvalidHistogramType() {
STATS_RECORDER.newMeasureMap().put(INVALID_HISTOGRAM_TYPE, 1).record();
}

/** measure rate of InvalidHistogramSum. */
@Override
protected void incrementInvalidHistogramSum() {
STATS_RECORDER.newMeasureMap().put(INVALID_HISTOGRAM_SUM, 1).record();
}

/** measure rate of InvalidHistogramUseCounter. */
@Override
protected void incrementInvalidHistogramUseCounter() {
STATS_RECORDER.newMeasureMap().put(INVALID_HISTOGRAM_USE_COUNTER, 1).record();
}

/** measure rate of InvalidHistogramRange. */
@Override
protected void incrementInvalidHistogramRange() {
STATS_RECORDER.newMeasureMap().put(INVALID_HISTOGRAM_RANGE, 1).record();
}

}

public static Payload of(List<String> strictSchemaDocTypes, String schemasLocation,
@@ -223,6 +260,22 @@ protected void incrementNotCoercedToInt() {
protected void incrementNotCoercedToBool() {
}

/** measure rate of InvalidHistogramType. */
protected void incrementInvalidHistogramType() {
}

/** measure rate of InvalidHistogramSum. */
protected void incrementInvalidHistogramSum() {
}

/** measure rate of InvalidHistogramUseCounter. */
protected void incrementInvalidHistogramUseCounter() {
}

/** measure rate of InvalidHistogramRange. */
protected void incrementInvalidHistogramRange() {
}

/**
* Turn message data into an {@link ObjectNode}.
*
@@ -541,6 +594,8 @@ private Optional<JsonNode> coerceSingleValueToBqType(JsonNode o, Field field) {
if (field.getType() == LegacySQLTypeName.STRING) {
if (o.isTextual()) {
return Optional.of(o);
} else if (o.isObject() && o.has("histogram_type")) {
return Optional.of(compactHistogramEncoding(o, field.getName()));
} else {
// If not already a string, we JSON-ify the value.
// We have many fields that we expect to be coerced to string (histograms, userPrefs,
@@ -596,6 +651,76 @@ private String getAndCacheBqName(String name) {
}
}

/**
* Encode the given JSON object as a compact string.
*
* <p>We maintain a BigQuery persistent UDF that can parse a variety of formats, so any format
* used here must first be supported in:
* https://github.com/mozilla/bigquery-etl/blob/master/mozfun/hist/extract/udf.sql
*
* <p>Schema validation should generally ensure that all histograms are well-formed, but we
* perform some light sanity checking on the values and fall back to encoding the histogram
* as a JSON string if anything seems off.
*/
private TextNode compactHistogramEncoding(JsonNode o, String fieldName) {
final int histogramType = o.path("histogram_type").asInt(-1);
if (histogramType < 0 || histogramType > 5) {
incrementInvalidHistogramType();
return jsonHistogramEncoding(o);
}
final long sum = o.path("sum").asLong(-1);
if (sum < 0) {
incrementInvalidHistogramSum();
return jsonHistogramEncoding(o);
}
final JsonNode values = o.path("values");
if (histogramType == 2 && fieldName.startsWith("use_counter2")) {
// Histograms named as "use_counter" are reported as type 2 (boolean), but only ever have
// a non-zero value in the "1" (true) bucket (and the "sum" field should match this count).
// They can be encoded as a textual representation of that single number without any loss
// of information, and since use counters make up the majority of histogram data volume,
// this optimization case is the most important one.
// See https://firefox-source-docs.mozilla.org/toolkit/components/telemetry/collection/use-counters.html
if (sum == values.path("1").asLong(0)) {
return TextNode.valueOf(Long.toString(sum));
} else {
incrementInvalidHistogramUseCounter();
return jsonHistogramEncoding(o);
}
}
return jsonHistogramEncoding(o);
// TODO: Uncomment the following section and related test cases once we have transitioned
// analysis use cases to tolerate the additional encodings.
/*
else if (histogramType == 4) {
return TextNode.valueOf(Long.toString(sum));
} else if (histogramType == 2) {
// Type 2 are "boolean" histograms where bucket "0" is a count of false values and
// bucket "1" is a count of true values.
return TextNode.valueOf(
String.format("%d,%d", values.path("0").longValue(), values.path("1").longValue()));
} else {
final int bucketCount = o.path("bucket_count").asInt(-1);
final long rangeLo = o.path("range").path(0).asLong(-1);
final long rangeHi = o.path("range").path(1).asLong(-1);
final String valString = Json.asString(values) //
.replace("{", "") //
.replace("}", "") //
.replace("\"", "");
if (bucketCount <= 0 || rangeLo < 0 || rangeHi < 0) {
incrementInvalidHistogramRange();
return jsonHistogramEncoding(o);
}
return TextNode.valueOf(String.format("%d;%d;%d;%d,%d;%s", //
bucketCount, histogramType, sum, rangeLo, rangeHi, valString));
}
*/
}

private static TextNode jsonHistogramEncoding(JsonNode o) {
return TextNode.valueOf(Json.asString(o));
}

/**
* Converts a name to a BigQuery-friendly format.
*

@@ -12,6 +12,7 @@
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.junit.Ignore;
import org.junit.Test;

public class PubsubMessageToObjectNodeBeamTest {
@@ -149,6 +150,111 @@ public void testTypeCoercion() throws Exception {
assertEquals(expectedAdditionalProperties, Json.asMap(additionalProperties));
}

@Test
public void testCoerceUseCounterHistogramToString() throws Exception {
String mainPing = "{\"payload\":{\"histograms\":{\"use_counter2_css_property_all_page\":"
+ "{\"bucket_count\":3,\"histogram_type\":2,\"sum\":5,\"range\":[1,2]"
+ ",\"values\":{\"0\":0,\"1\":5,\"2\":0}}}}}";
ObjectNode parent = Json.readObjectNode(mainPing);
ObjectNode additionalProperties = Json.createObjectNode();
List<Field> bqFields = ImmutableList.of(Field.of("payload", LegacySQLTypeName.RECORD,
Field.of("histograms", LegacySQLTypeName.RECORD,
Field.of("use_counter2_css_property_all_page", LegacySQLTypeName.STRING))));
Map<String, Object> expected = ImmutableMap.of("payload",
ImmutableMap.of("histograms", ImmutableMap.of("use_counter2_css_property_all_page", "5")));
TRANSFORM.transformForBqSchema(parent, bqFields, additionalProperties);
assertEquals(expected, Json.asMap(parent));
}

@Test
public void testCoerceEmptyUseCounterHistogramToString() throws Exception {
String mainPing = "{\"payload\":{\"histograms\":{\"use_counter2_css_property_all_page\":"
+ "{\"bucket_count\":3,\"histogram_type\":2,\"sum\":0,\"range\":[1,2]"
+ ",\"values\":{}}}}}";
ObjectNode parent = Json.readObjectNode(mainPing);
ObjectNode additionalProperties = Json.createObjectNode();
List<Field> bqFields = ImmutableList.of(Field.of("payload", LegacySQLTypeName.RECORD,
Field.of("histograms", LegacySQLTypeName.RECORD,
Field.of("use_counter2_css_property_all_page", LegacySQLTypeName.STRING))));
Map<String, Object> expected = ImmutableMap.of("payload",
ImmutableMap.of("histograms", ImmutableMap.of("use_counter2_css_property_all_page", "0")));
TRANSFORM.transformForBqSchema(parent, bqFields, additionalProperties);
assertEquals(expected, Json.asMap(parent));
}

@Ignore("Waiting for full compact histogram encoding implementation; see bug 1646825")
@Test
public void testCoerceType2HistogramToString() throws Exception {
String mainPing = "{\"payload\":{\"histograms\":{\"some_histo\":"
+ "{\"bucket_count\":3,\"histogram_type\":2,\"sum\":1,\"range\":[1,2]"
+ ",\"values\":{\"0\":0,\"1\":1,\"2\":0}}}}}";

ObjectNode parent = Json.readObjectNode(mainPing);
ObjectNode additionalProperties = Json.createObjectNode();
List<Field> bqFields = ImmutableList
.of(Field.of("payload", LegacySQLTypeName.RECORD, Field.of("histograms",
LegacySQLTypeName.RECORD, Field.of("some_histo", LegacySQLTypeName.STRING))));
Map<String, Object> expected = ImmutableMap.of("payload",
ImmutableMap.of("histograms", ImmutableMap.of("some_histo", "0,1")));
TRANSFORM.transformForBqSchema(parent, bqFields, additionalProperties);
assertEquals(expected, Json.asMap(parent));
}

@Ignore("Waiting for full compact histogram encoding implementation; see bug 1646825")
@Test
public void testCoerceType4HistogramToString() throws Exception {
String mainPing = "{\"payload\":{\"histograms\":{\"some_histo\":"
+ "{\"bucket_count\":3,\"histogram_type\":4,\"sum\":3,\"range\":[1,2]"
+ ",\"values\":{\"0\":0,\"1\":3,\"2\":0}}}}}";
ObjectNode parent = Json.readObjectNode(mainPing);
ObjectNode additionalProperties = Json.createObjectNode();
List<Field> bqFields = ImmutableList
.of(Field.of("payload", LegacySQLTypeName.RECORD, Field.of("histograms",
LegacySQLTypeName.RECORD, Field.of("some_histo", LegacySQLTypeName.STRING))));
Map<String, Object> expected = ImmutableMap.of("payload",
ImmutableMap.of("histograms", ImmutableMap.of("some_histo", "3")));
TRANSFORM.transformForBqSchema(parent, bqFields, additionalProperties);
assertEquals(expected, Json.asMap(parent));
}

@Ignore("Waiting for full compact histogram encoding implementation; see bug 1646825")
@Test
public void testCoerceType1HistogramToString() throws Exception {
String mainPing = "{\"payload\":{\"histograms\":{\"some_histo\":"
+ "{\"bucket_count\":10,\"histogram_type\":1,\"sum\":2628,\"range\":[1,100]"
+ ",\"values\":{\"0\":12434,\"1\":297,\"13\":8}}}}}";
ObjectNode parent = Json.readObjectNode(mainPing);
ObjectNode additionalProperties = Json.createObjectNode();
List<Field> bqFields = ImmutableList
.of(Field.of("payload", LegacySQLTypeName.RECORD, Field.of("histograms",
LegacySQLTypeName.RECORD, Field.of("some_histo", LegacySQLTypeName.STRING))));
Map<String, Object> expected = ImmutableMap.of("payload", ImmutableMap.of("histograms",
ImmutableMap.of("some_histo", "10;1;2628;1,100;0:12434,1:297,13:8")));
TRANSFORM.transformForBqSchema(parent, bqFields, additionalProperties);
assertEquals(expected, Json.asMap(parent));
}

@Test
public void testCoerceHistogramFallback() throws Exception {
// We construct a histogram with invalid negative range to test that we fall back to
// writing out the JSON explicitly.
String mainPing = "{\"payload\":{\"histograms\":{\"some_histo\":"
+ "{\"bucket_count\":10,\"histogram_type\":1,\"sum\":2628,\"range\":[-5,100]"
+ ",\"values\":{\"0\":12434,\"1\":297,\"13\":8}}}}}";
ObjectNode parent = Json.readObjectNode(mainPing);
ObjectNode additionalProperties = Json.createObjectNode();
List<Field> bqFields = ImmutableList
.of(Field.of("payload", LegacySQLTypeName.RECORD, Field.of("histograms",
LegacySQLTypeName.RECORD, Field.of("some_histo", LegacySQLTypeName.STRING))));
Map<String, Object> expected = ImmutableMap.of("payload",
ImmutableMap.of("histograms",
ImmutableMap.of("some_histo",
"{\"bucket_count\":10,\"histogram_type\":1,\"sum\":2628,\"range\":[-5,100]"
+ ",\"values\":{\"0\":12434,\"1\":297,\"13\":8}}")));
TRANSFORM.transformForBqSchema(parent, bqFields, additionalProperties);
assertEquals(expected, Json.asMap(parent));
}

@Test
public void testUnmap() throws Exception {
ObjectNode parent = Json.createObjectNode().set("mapField",
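For reference on the commented-out TODO block in compactHistogramEncoding: the planned full compact format is bucket_count;histogram_type;sum;rangeLo,rangeHi;key:value,..., and per the javadoc any format emitted here must first be supported by the mozfun hist.extract UDF. A hypothetical decoder sketch for that format (not part of this commit; the class, field, and method names are assumptions) could look like:

import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical decoder for the planned compact histogram format; not part of this commit. */
public class CompactHistogramDecoder {

  /** Minimal holder for the decoded fields. */
  static class Histogram {
    int bucketCount;
    int histogramType;
    long sum;
    long rangeLo;
    long rangeHi;
    Map<String, Long> values = new LinkedHashMap<>();
  }

  /** Parses "bucket_count;histogram_type;sum;lo,hi;k:v,k:v,...". */
  static Histogram decode(String encoded) {
    String[] parts = encoded.split(";", -1);
    Histogram h = new Histogram();
    h.bucketCount = Integer.parseInt(parts[0]);
    h.histogramType = Integer.parseInt(parts[1]);
    h.sum = Long.parseLong(parts[2]);
    String[] range = parts[3].split(",");
    h.rangeLo = Long.parseLong(range[0]);
    h.rangeHi = Long.parseLong(range[1]);
    // The values section is a flattened JSON object with braces and quotes stripped.
    if (!parts[4].isEmpty()) {
      for (String pair : parts[4].split(",")) {
        String[] kv = pair.split(":");
        h.values.put(kv[0], Long.parseLong(kv[1]));
      }
    }
    return h;
  }

  public static void main(String[] args) {
    // Decodes the string expected by testCoerceType1HistogramToString above.
    Histogram h = decode("10;1;2628;1,100;0:12434,1:297,13:8");
    System.out.println(h.sum + " " + h.values); // 2628 {0=12434, 1=297, 13=8}
  }
}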