# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import division, print_function

import functools
import heapq
import json
import random
import re
import types
from copy import copy
from inspect import isfunction
from itertools import chain
from multiprocessing import cpu_count

from six.moves import copyreg
from pyspark.sql import Row

import jmespath
from concurrent import futures

from .heka import message_parser
from .store import S3Store

DEFAULT_MAX_CONCURRENCY = int(cpu_count() * 1.5)
SANITIZE_PATTERN = re.compile("[^a-zA-Z0-9_.]")


def _group_by_size_greedy(obj_list, tot_groups):
    """Partition a list of objects into evenly sized buckets.

    The bucket for each object is chosen in a round-robin fashion. The list
    of objects is sorted by size beforehand to also keep the total size in
    bytes of each bucket as balanced as possible.

    :param obj_list: a list of dict-like objects with a 'size' property
    :param tot_groups: number of partitions to split the data into
    :return: a list of lists, one for each partition
    """
    sorted_list = sorted(obj_list, key=lambda x: x['size'], reverse=True)
    groups = [[] for _ in range(tot_groups)]
    for index, obj in enumerate(sorted_list):
        current_group = groups[index % len(groups)]
        current_group.append(obj)
    return groups
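
# Illustrative only (not part of the original module): with sizes [5, 4, 3, 2, 1]
# and tot_groups=2, the objects are dealt round-robin after sorting by size, so
# the buckets end up holding sizes [5, 3, 1] and [4, 2]:
#
#     _group_by_size_greedy([{'size': s} for s in [3, 5, 1, 4, 2]], 2)
#     # -> [[{'size': 5}, {'size': 3}, {'size': 1}], [{'size': 4}, {'size': 2}]]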


def _group_by_equal_size(obj_list, tot_groups, threshold=pow(2, 32)):
    """Partition a list of objects evenly by file size.

    Each file is placed, largest first, into the bucket that is currently
    smallest. If a file is larger than the given threshold, it is placed in a
    new bucket by itself.

    :param obj_list: a list of dict-like objects with a 'size' property
    :param tot_groups: number of partitions to split the data into
    :param threshold: the maximum size of each bucket
    :return: a list of lists, one for each partition
    """
    sorted_obj_list = sorted([(obj['size'], obj) for obj in obj_list], reverse=True)
    groups = [(random.random(), []) for _ in range(tot_groups)]

    if tot_groups <= 1:
        return _group_by_size_greedy(obj_list, tot_groups)

    heapq.heapify(groups)
    for obj in sorted_obj_list:
        if obj[0] > threshold:
            # Oversized files get a bucket of their own.
            heapq.heappush(groups, (obj[0], [obj[1]]))
        else:
            size, files = heapq.heappop(groups)
            size += obj[0]
            files.append(obj[1])
            heapq.heappush(groups, (size, files))
    return [group[1] for group in groups]
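
# Illustrative only (not part of the original module): with sizes [8, 7, 2, 1],
# tot_groups=2 and a large threshold, files are placed largest-first into the
# currently smallest bucket, yielding buckets of sizes [8, 1] and [7, 2]:
#
#     _group_by_equal_size([{'size': s} for s in [8, 7, 2, 1]], 2)
#     # -> buckets holding sizes {8, 1} and {7, 2} (bucket order may vary)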


def _pickle_method(m):
    """Make instance methods picklable.

    See http://stackoverflow.com/questions/1816958/cant-pickle-type-instancemethod-when-using-pythons-multiprocessing-pool-ma/1816969#1816969
    """
    # Use __self__/__func__ rather than im_self/im_func so this also works on Python 3.
    if m.__self__ is None:
        # Unbound methods only exist on Python 2, where im_class is available.
        return getattr, (m.im_class, m.__func__.__name__)
    else:
        return getattr, (m.__self__, m.__func__.__name__)


copyreg.pickle(types.MethodType, _pickle_method)
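
# Illustrative only (not part of the original module): with the copyreg
# registration above, a bound method is serialised as a
# (getattr, (instance, method_name)) pair, so unpickling simply looks the
# method up again on the (re-pickled) instance. For a class defined at module
# level, so that its instances are themselves picklable:
#
#     import pickle
#
#     class Foo(object):
#         def bar(self):
#             return 42
#
#     pickle.loads(pickle.dumps(Foo().bar))()  # -> 42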


class Dataset:
    """Represents a collection of objects on S3.

    A Dataset can have zero, one or many filters, which are refined using the
    `where` method. The result of refining a Dataset is a Dataset itself, so
    it's possible to chain multiple `where` clauses together.

    The actual data retrieval is triggered by the `records` method, which returns
    a Spark RDD containing the list of records retrieved. To call `records`
    a SparkContext object must be provided.

    Usage example::

        bucket = 'test-bucket'
        schema = ['submissionDate', 'docType', 'platform']

        records = Dataset(bucket, schema) \\
            .select(
                'clientId',
                os_name='environment.system.os.name',
                first_paint='payload.simpleMeasurements.firstPaint',
                # Take the first 2 stacks for each thread hang.
                stack_list='payload.threadHangStats[].hangs[].stack[0:2]'
            ).where(
                docType='main',
                appUpdateChannel='nightly',
                submissionDate=lambda x: x.startswith('201607'),
            ).records(sc)

    For convenience, Dataset objects can be created using the factory method
    `from_source`, which takes a source name (e.g. 'telemetry') and returns a
    new Dataset instance. The instance created will be aware of the list of
    dimensions, available on its `schema` attribute for inspection.
    """

    def __init__(self,
                 bucket,
                 schema,
                 store=None,
                 prefix=None,
                 clauses=None,
                 selection=None,
                 max_concurrency=None):
        """Initialize a Dataset for the provided bucket and schema.

        :param bucket: bucket name
        :param schema: a list of fields describing the structure of the dataset
        :param store: an instance of S3Store, potentially reused among several
            datasets
        :param prefix: an S3 key prefix under which the dataset is stored
        :param clauses: mapping of fields -> callables to refine the dataset
        :param selection: mapping of property names -> JMESPath expressions used
            to extract properties from each record
        :param max_concurrency: number of threads to spawn when collecting S3 summaries,
            defaults to 1.5 * cpu_count
        """
        self.bucket = bucket
        self.schema = schema
        self.prefix = prefix or ''
        self.clauses = clauses or {}
        self.store = store or S3Store(self.bucket)
        self.selection = selection or {}
        self.selection_compiled = {}
        self.max_concurrency = max_concurrency or DEFAULT_MAX_CONCURRENCY

    def __repr__(self):
        params = ['bucket', 'schema', 'store', 'prefix', 'clauses', 'selection', 'max_concurrency']
        stmts = ['{}={!r}'.format(param, getattr(self, param)) for param in params]
        return 'Dataset({})'.format(', '.join(stmts))

    def _copy(self,
              bucket=None,
              schema=None,
              store=None,
              prefix=None,
              clauses=None,
              selection=None,
              max_concurrency=None):
        return Dataset(
            bucket=bucket or self.bucket,
            schema=schema or self.schema,
            store=store or self.store,
            prefix=prefix or self.prefix,
            clauses=clauses or self.clauses,
            selection=selection or self.selection,
            max_concurrency=max_concurrency or self.max_concurrency)

    def select(self, *properties, **aliased_properties):
        """Specify which properties of the dataset must be returned.

        Property extraction is based on `JMESPath <http://jmespath.org>`_ expressions.
        This method returns a new Dataset narrowed down by the given selection.

        :param properties: JMESPath expressions to use for the property extraction.
            The JMESPath string will be used as a key in the output dictionary.
        :param aliased_properties: same as properties, but the output dictionary will
            contain the parameter name instead of the JMESPath string.
        """
        if not (properties or aliased_properties):
            return self
        merged_properties = dict(zip(properties, properties))
        merged_properties.update(aliased_properties)
        for prop_name in merged_properties.keys():
            if prop_name in self.selection:
                raise Exception('The property {} has already been selected'.format(prop_name))
        new_selection = self.selection.copy()
        new_selection.update(merged_properties)
        return self._copy(selection=new_selection)
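
    # Illustrative only (not part of the original module): positional and keyword
    # selections can be mixed; a keyword name becomes the key in the extracted
    # record, while a positional JMESPath string is used verbatim as the key:
    #
    #     ds = Dataset('a-bucket', ['submissionDate', 'docType'])
    #     ds = ds.select('clientId', os='environment.system.os.name')
    #     # ds.selection -> {'clientId': 'clientId',
    #     #                  'os': 'environment.system.os.name'}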

    def _compile_selection(self):
        if not self.selection_compiled:
            self.selection_compiled = dict((name, jmespath.compile(path))
                                           for name, path in self.selection.items())

    def _apply_selection(self, json_obj):
        if not self.selection:
            return json_obj

        # This is mainly for testing purposes.
        # For performance reasons the selection should be compiled
        # outside of this function.
        if not self.selection_compiled:
            self._compile_selection()

        return dict((name, path.search(json_obj))
                    for name, path in self.selection_compiled.items())
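
    # Illustrative only (not part of the original module): applying a selection
    # to a decoded ping extracts just the selected fields:
    #
    #     ds = Dataset('a-bucket', ['dim']).select('clientId', os='environment.system.os.name')
    #     ds._apply_selection({'clientId': 'abc',
    #                          'environment': {'system': {'os': {'name': 'Linux'}}}})
    #     # -> {'clientId': 'abc', 'os': 'Linux'}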

    def _sanitize_dimension(self, v):
        """Sanitize the given string by replacing illegal characters
        with underscores.

        For string conditions, we should pre-sanitize so that users of
        the `where` function do not need to know about the nuances of how
        S3 dimensions are sanitized during ingestion.

        See https://github.com/mozilla-services/lua_sandbox_extensions/blob/master/moz_telemetry/io_modules/moz_telemetry/s3.lua#L167

        :param v: a string value that should be sanitized
        """
        return re.sub(SANITIZE_PATTERN, "_", v)
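
    # Illustrative only (not part of the original module): characters outside
    # [a-zA-Z0-9_.] are replaced with underscores, matching how dimensions are
    # sanitized during ingestion:
    #
    #     Dataset('a-bucket', ['dim'])._sanitize_dimension('Windows NT')  # -> 'Windows_NT'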

    def where(self, **kwargs):
        """Return a new Dataset refined using the given condition.

        :param kwargs: a map of `dimension` => `condition` to filter the elements
            of the dataset. `condition` can either be an exact value or a
            callable returning a boolean value. If `condition` is a value, it is
            converted to a string, then sanitized. If `condition` is a callable, note
            that it will be passed sanitized values -- i.e., characters outside
            [a-zA-Z0-9_.] are converted to `_`.
        """
        clauses = copy(self.clauses)
        for dimension, condition in kwargs.items():
            if dimension in self.clauses:
                raise Exception('There should be only one clause for {}'.format(dimension))
            if dimension not in self.schema:
                raise Exception('The dimension {} doesn\'t exist'.format(dimension))
            if isfunction(condition) or isinstance(condition, functools.partial):
                clauses[dimension] = condition
            else:
                clauses[dimension] = functools.partial((lambda x, y: x == y), self._sanitize_dimension(str(condition)))

        return self._copy(clauses=clauses)
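
    # Illustrative only (not part of the original module): exact values and
    # callables can be mixed, and each dimension may only be constrained once:
    #
    #     ds = Dataset('a-bucket', ['submissionDate', 'docType'])
    #     ds = ds.where(docType='main',
    #                   submissionDate=lambda d: d.startswith('2016'))
    #     # ds.where(docType='crash') would raise, since docType already has a clause.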

    def _scan(self, dimensions, prefixes, clauses, executor):
        if not dimensions or not clauses:
            return prefixes
        else:
            dimension = dimensions[0]
            clause = clauses.get(dimension)
            matched = executor.map(self.store.list_folders, prefixes)
            # Using chain to flatten the results of map
            matched = chain(*matched)
            if clause:
                matched = [x for x in matched if clause(x.strip('/').split('/')[-1])]
                del clauses[dimension]

            return self._scan(dimensions[1:], matched, clauses, executor)
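
    # Illustrative only (not part of the original module): for a schema of
    # ['submissionDate', 'docType'] and a clause on docType, the scan walks the
    # "directory" levels of the bucket one dimension at a time, keeping only the
    # prefixes whose last path component satisfies the clause, e.g.
    #
    #     telemetry/20160701/        (no clause on submissionDate: kept)
    #     telemetry/20160701/main/   (docType clause matches: kept)
    #     telemetry/20160701/crash/  (docType clause fails: dropped)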

    def summaries(self, sc, limit=None):
        """Return a summary of the files contained in the current dataset.

        Every item in the summary is a dict containing a key name and the
        corresponding size of the key item in bytes, e.g.::

            {'key': 'full/path/to/my/key', 'size': 200}

        :param sc: a SparkContext object
        :param limit: maximum number of objects to retrieve
        :return: a list of summaries
        """
        clauses = copy(self.clauses)
        schema = self.schema

        if self.prefix:
            schema = ['prefix'] + schema
            # Add a clause for the prefix that always returns True, in case
            # the output is not filtered at all (so that we do a scan/filter
            # on the prefix directory)
            clauses['prefix'] = lambda x: True

        with futures.ThreadPoolExecutor(self.max_concurrency) as executor:
            scanned = self._scan(schema, [self.prefix], clauses, executor)
        keys = sc.parallelize(scanned).flatMap(self.store.list_keys)
        return keys.take(limit) if limit else keys.collect()
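
    # Illustrative only (not part of the original module): each summary pairs an
    # S3 key with its size in bytes, so the output can be fed back into
    # `records(..., summaries=...)` to avoid listing the bucket twice; assuming
    # `ds` is a Dataset and `sc` an existing SparkContext:
    #
    #     summaries = ds.summaries(sc, limit=2)
    #     # -> [{'key': 'telemetry/20160701/main/...', 'size': 2744041}, ...]
    #     ds.records(sc, summaries=summaries)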

    def records(self, sc, group_by='greedy', limit=None, sample=1, seed=42, decode=None, summaries=None):
        """Retrieve the elements of a Dataset.

        :param sc: a SparkContext object
        :param group_by: specifies a partition strategy for the objects
        :param limit: maximum number of objects to retrieve
        :param decode: an optional transformation to apply to the objects retrieved
        :param sample: percentage of results to return. Useful to return a sample
            of the dataset. This parameter is ignored when `limit` is set.
        :param seed: initialize the internal state of the random number generator (42 by default).
            This is used to make the dataset sampling reproducible. It can be set to None to obtain
            different samples.
        :param summaries: an iterable containing a summary for each item in the dataset. If None,
            it will be computed by calling the `summaries` method.
        :return: a Spark RDD containing the elements retrieved
        """
        decode = decode or message_parser.parse_heka_message
        summaries = summaries or self.summaries(sc, limit)

        # Calculate the sample if summaries is not empty and limit is not set
        if summaries and limit is None and sample != 1:
            if sample < 0 or sample > 1:
                raise ValueError('sample must be between 0 and 1')
            print(
                "WARNING: THIS IS NOT A REPRESENTATIVE SAMPLE.\n"
                "This 'sampling' is based on s3 files and is highly\n"
                "susceptible to skew. Use only for quicker performance\n"
                "while prototyping."
            )
            # We want this sample to be reproducible.
            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1318681
            seed_state = random.getstate()
            try:
                random.seed(seed)
                summaries = random.sample(summaries,
                                          int(len(summaries) * sample))
            finally:
                random.setstate(seed_state)

        # Obtain size in MB
        total_size = functools.reduce(lambda acc, item: acc + item['size'], summaries, 0)
        total_size_mb = total_size / float(1 << 20)
        print("fetching %.5fMB in %s files..." % (total_size_mb, len(summaries)))

        if group_by == 'equal_size':
            groups = _group_by_equal_size(summaries, 10 * sc.defaultParallelism)
        elif group_by == 'greedy':
            groups = _group_by_size_greedy(summaries, 10 * sc.defaultParallelism)
        else:
            raise Exception("group_by specification is invalid")

        self._compile_selection()

        keys = (
            sc.parallelize(groups, len(groups))
            .flatMap(lambda x: x)
            .map(lambda x: x['key'])
        )
        file_handles = keys.map(self.store.get_key)
        # decode(fp: file-object) -> list[dict]
        data = file_handles.flatMap(decode)
        return data.map(self._apply_selection)
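
    # Illustrative only (not part of the original module): a small sampled read,
    # assuming `sc` is an existing SparkContext:
    #
    #     pings = (Dataset.from_source('telemetry')
    #              .where(docType='main', submissionDate='20160701')
    #              .select('clientId')
    #              .records(sc, sample=0.01))
    #     pings.count()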

    def dataframe(self, spark, group_by='greedy', limit=None, sample=1, seed=42, decode=None, summaries=None, schema=None, table_name=None):
        """Convert the RDD returned by the `records` function to a DataFrame.

        :param spark: a SparkSession object
        :param group_by: specifies a partition strategy for the objects
        :param limit: maximum number of objects to retrieve
        :param decode: an optional transformation to apply to the objects retrieved
        :param sample: percentage of results to return. Useful to return a sample
            of the dataset. This parameter is ignored when `limit` is set.
        :param seed: initialize the internal state of the random number generator (42 by default).
            This is used to make the dataset sampling reproducible. It can be set to None to obtain
            different samples.
        :param summaries: an iterable containing a summary for each item in the dataset. If None,
            they will be computed by calling the `summaries` method.
        :param schema: a Spark schema that overrides the automatic conversion to a DataFrame
        :param table_name: registers the resulting DataFrame as a temporary view so it can
            easily be queried using SparkSQL
        :return: a Spark DataFrame
        """
        rdd = self.records(spark.sparkContext, group_by, limit, sample, seed, decode, summaries)

        if not schema:
            df = rdd.map(lambda d: Row(**d)).toDF()
        else:
            df = spark.createDataFrame(rdd, schema=schema)

        if table_name:
            df.createOrReplaceTempView(table_name)

        return df
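
    # Illustrative only (not part of the original module): registering the frame
    # as a temporary view makes it queryable with SparkSQL; assumes `spark` is an
    # existing SparkSession and that 'application.channel' exists in the pings:
    #
    #     df = (Dataset.from_source('telemetry')
    #           .where(docType='main', submissionDate='20160701')
    #           .select('clientId', channel='application.channel')
    #           .dataframe(spark, sample=0.01, table_name='main_pings'))
    #     spark.sql('SELECT COUNT(DISTINCT clientId) FROM main_pings').show()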

    @staticmethod
    def from_source(source_name):
        """Create a Dataset configured for the given source_name.

        This is particularly convenient when the user doesn't know
        the list of dimensions or the bucket name, but only the source name.

        Usage example::

            records = Dataset.from_source('telemetry').where(
                docType='main',
                submissionDate='20160701',
                appUpdateChannel='nightly'
            )
        """
        meta_bucket = 'net-mozaws-prod-us-west-2-pipeline-metadata'
        store = S3Store(meta_bucket)

        try:
            source = json.loads(store.get_key('sources.json').read().decode('utf-8'))[source_name]
        except KeyError:
            raise Exception('Unknown source {}'.format(source_name))

        schema = store.get_key('{}/schema.json'.format(source['metadata_prefix'])).read().decode('utf-8')
        dimensions = [f['field_name'] for f in json.loads(schema)['dimensions']]
        return Dataset(source['bucket'], dimensions, prefix=source['prefix'])