diff --git a/moztelemetry/dataset.py b/moztelemetry/dataset.py
index 281bb81..4deac43 100644
--- a/moztelemetry/dataset.py
+++ b/moztelemetry/dataset.py
@@ -6,6 +6,7 @@
 import functools
 import json
 import random
+import re
 import types
 from copy import copy
 from inspect import isfunction
@@ -19,6 +20,7 @@
 from .store import S3Store
 
 MAX_CONCURRENCY = int(cpu_count() * 1.5)
+SANITIZE_PATTERN = re.compile("[^a-zA-Z0-9_.]")
 
 
 def _group_by_size_greedy(obj_list, tot_groups):
@@ -154,12 +156,27 @@ def _apply_selection(self, json_obj):
         return dict((name, path.search(json_obj))
                     for name, path in self.selection_compiled.items())
 
+    def _sanitize_dimension(self, v):
+        """Sanitize the given string by replacing illegal characters
+        with underscores.
+
+        For String conditions, we should pre-sanitize so that users of
+        the `where` function do not need to know about the nuances of how
+        S3 dimensions are sanitized during ingestion.
+
+        See https://github.com/mozilla-services/lua_sandbox_extensions/blob/master/moz_telemetry/io_modules/moz_telemetry/s3.lua#L167
+
+        :param v: a string value that should be sanitized.
+        """
+        return re.sub(SANITIZE_PATTERN, "_", v)
+
     def where(self, **kwargs):
         """Return a new Dataset refined using the given condition
 
         :param kwargs: a map of `dimension` => `condition` to filter the
             elements of the dataset. `condition` can either be an exact value or a
-            callable returning a boolean value.
+            callable returning a boolean value. If `condition` is a value, it is
+            converted to a string, then sanitized.
         """
         clauses = copy(self.clauses)
         for dimension, condition in kwargs.items():
@@ -170,7 +187,7 @@ def where(self, **kwargs):
             if isfunction(condition) or isinstance(condition, functools.partial):
                 clauses[dimension] = condition
             else:
-                clauses[dimension] = functools.partial((lambda x, y: x == y), condition)
+                clauses[dimension] = functools.partial((lambda x, y: x == y), self._sanitize_dimension(str(condition)))
 
         return Dataset(self.bucket, self.schema, store=self.store,
                        prefix=self.prefix, clauses=clauses, selection=self.selection)
diff --git a/tests/test_dataset.py b/tests/test_dataset.py
index 21e5a39..ef14dfe 100644
--- a/tests/test_dataset.py
+++ b/tests/test_dataset.py
@@ -91,11 +91,11 @@ def test_apply_selection():
 def test_where_exact_match():
     dataset = Dataset('test-bucket', ['dim1', 'dim2'], prefix='prefix/')
-    new_dataset = dataset.where(dim1='my-value')
+    new_dataset = dataset.where(dim1='myvalue')
 
     assert new_dataset is not dataset
     assert new_dataset.clauses.keys() == ['dim1']
     condition = new_dataset.clauses['dim1']
-    assert condition('my-value')
+    assert condition('myvalue')
 
 
 def test_where_wrong_dimension():
@@ -169,11 +169,11 @@ def test_scan_multiple_where_params(spark_context):
 def test_scan_multiple_params():
     dataset = Dataset('test-bucket', ['dim1', 'dim2'], prefix='prefix/')
-    new_dataset = dataset.where(dim1='my-value')
+    new_dataset = dataset.where(dim1='myvalue')
 
     assert new_dataset is not dataset
     assert new_dataset.clauses.keys() == ['dim1']
     condition = new_dataset.clauses['dim1']
-    assert condition('my-value')
+    assert condition('myvalue')
 
 
 def test_summaries(spark_context):
@@ -396,3 +396,17 @@ def test_prefix_slash(spark_context):
     summaries_filtered = dataset.where(dim1='dir1').summaries(spark_context)
     assert len(summaries_filtered) == 1
     assert summaries_filtered[0]['key'] == 'a/b/dir1/subdir1/key1'
+
+
+def test_sanitized_dimensions(spark_context):
+    bucket_name = 'test-bucket'
+    store = InMemoryStore(bucket_name)
+    store.store['dir_1/subdir1/key1'] = 'value1'
+    store.store['dir_1/subdir2/key2'] = 'value2'
+    store.store['dir_2/subdir3/key3'] = 'value3'
+    store.store['dir_3/subdir4/key4'] = 'value4'
+
+    dataset = Dataset(bucket_name, ['dim1', 'dim2'], store=store).where(dim1="dir-1")
+
+    summaries = dataset.summaries(spark_context)
+    assert len(summaries) == 2
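
A minimal sketch (not part of the patch) of the user-visible effect: it re-creates the SANITIZE_PATTERN substitution locally instead of importing moztelemetry, so it only illustrates the string transformation applied to `where` conditions, not the Dataset API itself.

import re

# Same character class as SANITIZE_PATTERN in this patch: anything outside
# [a-zA-Z0-9_.] is replaced with an underscore, mirroring how S3 dimension
# values are sanitized during ingestion (see the linked s3.lua).
SANITIZE_PATTERN = re.compile("[^a-zA-Z0-9_.]")


def sanitize_dimension(v):
    return re.sub(SANITIZE_PATTERN, "_", v)


# After this change, where(dim1="dir-1") compares against the sanitized form,
# so it matches keys stored under "dir_1/..." (as exercised by
# test_sanitized_dimensions above).
assert sanitize_dimension("dir-1") == "dir_1"
assert sanitize_dimension("Firefox 57.0") == "Firefox_57.0"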