Permalink
Cannot retrieve contributors at this time
| #!/usr/bin/env python | |
| # encoding: utf-8 | |
| # This Source Code Form is subject to the terms of the Mozilla Public | |
| # License, v. 2.0. If a copy of the MPL was not distributed with this | |
| # file, You can obtain one at http://mozilla.org/MPL/2.0/. | |
| from collections import defaultdict | |
| from moztelemetry.dataset import Dataset | |
| from moztelemetry.histogram import cached_exponential_buckets | |
| from mozaggregator.bigquery import BigQueryDataset | |
# Simple measurement, count histogram, and numeric scalars labels & prefixes

# Bucket labels for simple measurements: 50 exponential buckets over [1, 30000].
SIMPLE_MEASURES_LABELS = cached_exponential_buckets(1, 30000, 50)

# Fixed bucket labels shared by count histograms and numeric scalars
# (roughly exponential spacing from 0 up to 10000).
COUNT_HISTOGRAM_LABELS = [
    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21, 23, 25, 27, 29, 31, 34,
    37, 40, 43, 46, 50, 54, 58, 63, 68, 74, 80, 86, 93, 101, 109, 118, 128, 138, 149, 161, 174, 188,
    203, 219, 237, 256, 277, 299, 323, 349, 377, 408, 441, 477, 516, 558, 603, 652, 705, 762, 824,
    891, 963, 1041, 1125, 1216, 1315, 1422, 1537, 1662, 1797, 1943, 2101, 2271, 2455, 2654, 2869,
    3102, 3354, 3626, 3920, 4238, 4582, 4954, 5356, 5791, 6261, 6769, 7318, 7912, 8554, 9249, 10000,
]
NUMERIC_SCALARS_LABELS = COUNT_HISTOGRAM_LABELS

# Metric-name prefixes identifying how a scalar-like value was collected.
SIMPLE_MEASURES_PREFIX = 'SIMPLE_MEASURES'
COUNT_HISTOGRAM_PREFIX = '[[COUNT]]'
NUMERIC_SCALARS_PREFIX = 'SCALARS'

# Maps each prefix to the bucket labels used when aggregating its values.
SCALAR_MEASURE_MAP = {
    SIMPLE_MEASURES_PREFIX: SIMPLE_MEASURES_LABELS,
    COUNT_HISTOGRAM_PREFIX: COUNT_HISTOGRAM_LABELS,
    NUMERIC_SCALARS_PREFIX: NUMERIC_SCALARS_LABELS
}

# Telemetry process types whose histograms/scalars are aggregated.
PROCESS_TYPES = {"parent", "content", "gpu"}
def aggregate_metrics(
    sc,
    channels,
    submission_date,
    main_ping_fraction=1,
    num_reducers=10000,
    source="moztelemetry",
    project_id=None,
    dataset_id=None,
    avro_prefix=None,
):
    """Return the build-id and submission-date aggregates for a given submission date.

    :param sc: A SparkContext instance
    :param channels: Either the name of a channel or a list/tuple of names
    :param submission_date: The submission date for which the data will be aggregated
    :param main_ping_fraction: Approximate fraction of submissions to consider
        (only applied for the default "moztelemetry" source)
    :param num_reducers: Number of partitions used when aggregating by key
    :param source: One of "moztelemetry" (default), "bigquery" or "avro"
    :param project_id: BigQuery project id (required when source == "bigquery")
    :param dataset_id: BigQuery dataset id (required when source == "bigquery")
    :param avro_prefix: Prefix of the Avro files (required when source == "avro")
    :return: A (build_id_aggregates, submission_date_aggregates) pair of RDDs
    """
    # Accept a single channel name as well as a list/tuple of names.
    if not isinstance(channels, (tuple, list)):
        channels = [channels]

    if source == "bigquery" and project_id and dataset_id:
        dataset = BigQueryDataset()
        # Fennec pings are excluded from this aggregation.
        pings = dataset.load(
            project_id,
            dataset_id,
            "main",
            submission_date,
            channels,
            "normalized_app_name <> 'Fennec'"
        )
    elif source == "avro" and avro_prefix:
        dataset = BigQueryDataset()
        pings = dataset.load_avro(
            avro_prefix,
            "main",
            submission_date,
            channels,
            "normalized_app_name <> 'Fennec'"
        )
    else:
        # Default: read v4 main pings from the moztelemetry Dataset, sampled
        # by main_ping_fraction.
        pings = Dataset.from_source('telemetry') \
            .where(appUpdateChannel=lambda x: x in channels,
                   submissionDate=submission_date,
                   docType='main',
                   sourceVersion='4',
                   appName=lambda x: x != 'Fennec') \
            .records(sc, sample=main_ping_fraction)

    return _aggregate_metrics(pings, num_reducers)
def _aggregate_metrics(pings, num_reducers=10000):
    """Aggregate a ping RDD by build-id key and by submission-date key.

    :param pings: RDD of decoded main pings
    :param num_reducers: partition count passed to aggregateByKey
    :return: A (build_id_aggregates, submission_date_aggregates) RDD pair
    """
    # Drop unsampled/malformed pings, key each ping by its dimensions, and
    # discard pings that could not be mapped (None results).
    trimmed = pings.filter(_sample_clients).map(_map_ping_to_dimensions).filter(lambda x: x)
    build_id_aggregates = trimmed.aggregateByKey(defaultdict(dict), _aggregate_ping, _aggregate_aggregates, num_reducers)
    # Remove the build-id dimension from the key and re-reduce to obtain
    # per-submission-date aggregates.
    submission_date_aggregates = build_id_aggregates.map(_map_build_id_key_to_submission_date_key).reduceByKey(_aggregate_aggregates)
    return build_id_aggregates, submission_date_aggregates
| def _map_build_id_key_to_submission_date_key(aggregate): | |
| return tuple(aggregate[0][:3] + aggregate[0][4:]), aggregate[1] | |
| def _sample_clients(ping): | |
| try: | |
| sample_id = ping.get("meta", {}).get("sampleId") | |
| if not isinstance(sample_id, (int, float)): | |
| return False | |
| # Here "aurora" is actually the dev edition. | |
| if ping.get("application", {}).get("channel") not in ("nightly", "aurora", "beta", "release"): | |
| return False | |
| return sample_id < 100 | |
| except: # noqa | |
| return False | |
def _extract_histograms(state, payload, process_type="parent"):
    """Fold all plain and keyed histograms of *payload* into *state*."""
    if not isinstance(payload, dict):
        return

    _extract_main_histograms(state, payload.get("histograms", {}), process_type)

    keyed = payload.get("keyedHistograms", {})
    if not isinstance(keyed, dict):
        return

    # See Bug 1275010 and 1275019
    skipped = ("MESSAGE_MANAGER_MESSAGE_SIZE",
               "VIDEO_DETAILED_DROPPED_FRAMES_PROPORTION")
    for name, keyed_histograms in keyed.items():
        if name not in skipped:
            _extract_keyed_histograms(state, name, keyed_histograms, process_type)
| def _extract_histogram(state, histogram, histogram_name, label, process_type): | |
| if not isinstance(histogram, dict): | |
| return | |
| values = histogram.get("values") | |
| if not isinstance(values, dict): | |
| return | |
| sum = histogram.get("sum") | |
| if not isinstance(sum, int) or sum < 0: | |
| return | |
| histogram_type = histogram.get("histogram_type") | |
| if not isinstance(histogram_type, int): | |
| return | |
| if histogram_type == 4: # Count histogram | |
| return _extract_scalar_value( | |
| state, '_'.join((COUNT_HISTOGRAM_PREFIX, histogram_name)), label, | |
| sum, COUNT_HISTOGRAM_LABELS, process_type=process_type) | |
| # Note that some dimensions don't vary within a single submissions | |
| # (e.g. channel) while some do (e.g. process type). | |
| # The latter should appear within the key of a single metric. | |
| accessor = (histogram_name, label, process_type) | |
| aggregated_histogram = state[accessor]["histogram"] = state[accessor].get("histogram", {}) | |
| state[accessor]["sum"] = state[accessor].get("sum", 0) + sum | |
| state[accessor]["count"] = state[accessor].get("count", 0) + 1 | |
| for k, v in values.items(): | |
| try: | |
| int(k) | |
| except ValueError: | |
| # We have seen some histograms with non-integer bucket keys. | |
| continue | |
| v = v if isinstance(v, int) else 0 | |
| aggregated_histogram[k] = aggregated_histogram.get(k, 0) + v | |
def _extract_main_histograms(state, histograms, process_type):
    """Fold every entry of the *histograms* mapping into *state*."""
    if isinstance(histograms, dict):
        for name, data in histograms.items():
            _extract_histogram(state, data, name, "", process_type)
def _extract_keyed_histograms(state, histogram_name, histograms, process_type):
    """Fold each per-key sub-histogram of a keyed histogram into *state*."""
    if isinstance(histograms, dict):
        for label, data in histograms.items():
            _extract_histogram(state, data, histogram_name, label, process_type)
def _extract_simple_measures(state, simple, process_type="parent"):
    """Fold simpleMeasurements values into *state* as scalar metrics.

    Dict-valued entries contribute one metric per numeric sub-value;
    top-level numeric entries contribute a single metric each. Everything
    else is ignored.
    """
    if not isinstance(simple, dict):
        return
    for name, value in simple.items():
        if isinstance(value, dict):
            for sub_name, sub_value in value.items():
                if not isinstance(sub_value, (int, float)):
                    continue
                metric = "_".join((SIMPLE_MEASURES_PREFIX, name.upper(), sub_name.upper()))
                _extract_scalar_value(state, metric, "", sub_value,
                                      SIMPLE_MEASURES_LABELS, process_type)
        elif isinstance(value, (int, float)):
            metric = "_".join((SIMPLE_MEASURES_PREFIX, name.upper()))
            _extract_scalar_value(state, metric, "", value,
                                  SIMPLE_MEASURES_LABELS, process_type)
def _extract_scalars(state, process_payloads):
    """Fold plain and keyed numeric scalars for every known process type."""
    for process in PROCESS_TYPES:
        payload = process_payloads.get(process, {})
        _extract_numeric_scalars(state, payload.get("scalars", {}), process)
        _extract_keyed_numeric_scalars(state, payload.get("keyedScalars", {}), process)
def _extract_numeric_scalars(state, scalar_dict, process):
    """Fold plain numeric scalars into *state*."""
    if not isinstance(scalar_dict, dict):
        return
    for name, value in scalar_dict.items():
        if not isinstance(value, (int, float)):
            continue
        # browser.engagement.navigation scalars are deliberately excluded.
        if name.startswith("browser.engagement.navigation"):
            continue
        metric = "_".join((NUMERIC_SCALARS_PREFIX, name.upper()))
        _extract_scalar_value(state, metric, "", value, NUMERIC_SCALARS_LABELS, process)
def _extract_keyed_numeric_scalars(state, scalar_dict, process):
    """Fold keyed numeric scalars into *state*, one metric label per key."""
    if not isinstance(scalar_dict, dict):
        return
    for name, keyed_values in scalar_dict.items():
        if not isinstance(keyed_values, dict):
            continue
        # browser.engagement.navigation scalars are deliberately excluded.
        if name.startswith("browser.engagement.navigation"):
            continue
        scalar_name = "_".join((NUMERIC_SCALARS_PREFIX, name.upper()))
        for sub_name, sub_value in keyed_values.items():
            if isinstance(sub_value, (int, float)):
                _extract_scalar_value(state, scalar_name, sub_name.upper(),
                                      sub_value, NUMERIC_SCALARS_LABELS, process)
| def _extract_scalar_value(state, name, label, value, bucket_labels, process_type="parent"): | |
| if value < 0: # Afaik we are collecting only positive values | |
| return | |
| accessor = (name, label, process_type) | |
| aggregated_histogram = state[accessor]["histogram"] = state[accessor].get("histogram", {}) | |
| state[accessor]["sum"] = state[accessor].get("sum", 0) + value | |
| state[accessor]["count"] = state[accessor].get("count", 0) + 1 | |
| insert_bucket = bucket_labels[0] # Initialized to underflow bucket | |
| for bucket in reversed(bucket_labels): | |
| if value >= bucket: | |
| insert_bucket = bucket | |
| break | |
| aggregated_histogram[str(insert_bucket)] = aggregated_histogram.get(str(insert_bucket), 0) + 1 | |
def _extract_child_payloads(state, child_payloads):
    """Fold histograms and simple measures from legacy child payloads.

    Everything found here is recorded under the "content" process type.

    :param state: mutable aggregation state
    :param child_payloads: list/tuple of child payload dicts (anything else
        is ignored)
    """
    if not isinstance(child_payloads, (list, tuple)):
        return
    for child in child_payloads:
        # Bug fix: a non-dict child previously raised AttributeError on
        # `child.get` below; skip malformed entries instead.
        if not isinstance(child, dict):
            continue
        _extract_histograms(state, child, "content")
        _extract_simple_measures(state, child.get("simpleMeasurements", {}), "content")
| def _aggregate_ping(state, ping): | |
| if not isinstance(ping, dict): | |
| return | |
| _extract_scalars(state, ping.get("payload", {}).get("processes", {})) | |
| _extract_histograms(state, ping.get("payload", {})) | |
| _extract_simple_measures(state, ping.get("payload", {}).get("simpleMeasurements", {})) | |
| _extract_child_payloads(state, ping.get("payload", {}).get("childPayloads", {})) | |
| _extract_histograms(state, ping.get("payload", {}).get("processes", {}).get("content", {}), "content") | |
| _extract_histograms(state, ping.get("payload", {}).get("processes", {}).get("gpu", {}), "gpu") | |
| return state | |
| def _aggregate_aggregates(agg1, agg2): | |
| for metric, payload in agg2.items(): | |
| if metric not in agg1: | |
| agg1[metric] = payload | |
| continue | |
| agg1[metric]["count"] += payload["count"] | |
| agg1[metric]["sum"] += payload["sum"] | |
| for k, v in payload["histogram"].items(): | |
| agg1[metric]["histogram"][k] = agg1[metric]["histogram"].get(k, 0) + v | |
| return agg1 | |
| def _trim_payload(payload): | |
| return {k: v for k, v in payload.items() | |
| if k in ["histograms", "keyedHistograms", "simpleMeasurements", "processes"]} | |
| def _map_ping_to_dimensions(ping): | |
| try: | |
| submission_date = ping["meta"]["submissionDate"] | |
| channel = ping["application"]["channel"] | |
| version = ping["application"]["version"].split('.')[0] | |
| build_id = ping["application"]["buildId"][:8] | |
| application = ping["application"]["name"] | |
| architecture = ping["application"]["architecture"] | |
| os = ping["environment"]["system"]["os"]["name"] | |
| os_version = ping["environment"]["system"]["os"]["version"] | |
| if os == "Linux": | |
| os_version = str(os_version)[:3] | |
| try: | |
| int(build_id) | |
| except ValueError: | |
| return None | |
| subset = {} | |
| subset["payload"] = _trim_payload(ping["payload"]) | |
| subset["payload"]["childPayloads"] = [_trim_payload(c) for c in ping["payload"].get("childPayloads", [])] | |
| # Note that some dimensions don't vary within a single submissions | |
| # (e.g. channel) while some do (e.g. process type). | |
| # Dimensions that don't vary should appear in the submission key, while | |
| # the ones that do vary should appear within the key of a single metric. | |
| return ((submission_date, channel, version, build_id, application, architecture, os, os_version), subset) | |
| except: # noqa | |
| return None |