21 changes: 19 additions & 2 deletions moztelemetry/dataset.py
@@ -6,6 +6,7 @@
 import functools
 import json
 import random
+import re
 import types
 from copy import copy
 from inspect import isfunction
@@ -19,6 +20,7 @@
 from .store import S3Store
 
 MAX_CONCURRENCY = int(cpu_count() * 1.5)
+SANITIZE_PATTERN = re.compile("[^a-zA-Z0-9_.]")
 
 
 def _group_by_size_greedy(obj_list, tot_groups):
@@ -154,12 +156,27 @@ def _apply_selection(self, json_obj):
         return dict((name, path.search(json_obj))
                     for name, path in self.selection_compiled.items())
 
+    def _sanitize_dimension(self, v):
+        """Sanitize the given string by replacing illegal characters
+        with underscores.
+
+        For string conditions, we should pre-sanitize so that users of
+        the `where` function do not need to know about the nuances of how
+        S3 dimensions are sanitized during ingestion.
+
+        See https://github.com/mozilla-services/lua_sandbox_extensions/blob/master/moz_telemetry/io_modules/moz_telemetry/s3.lua#L167
+
+        :param v: a string value that should be sanitized.
+        """
+        return re.sub(SANITIZE_PATTERN, "_", v)
+
     def where(self, **kwargs):
         """Return a new Dataset refined using the given condition
 
         :param kwargs: a map of `dimension` => `condition` to filter the elements
             of the dataset. `condition` can either be an exact value or a
-            callable returning a boolean value.
+            callable returning a boolean value. If `condition` is a value, it is
+            converted to a string, then sanitized.
         """
         clauses = copy(self.clauses)
         for dimension, condition in kwargs.items():
@@ -170,7 +187,7 @@ def where(self, **kwargs):
             if isfunction(condition) or isinstance(condition, functools.partial):
                 clauses[dimension] = condition
             else:
-                clauses[dimension] = functools.partial((lambda x, y: x == y), condition)
+                clauses[dimension] = functools.partial((lambda x, y: x == y), self._sanitize_dimension(str(condition)))
         return Dataset(self.bucket, self.schema, store=self.store,
                        prefix=self.prefix, clauses=clauses, selection=self.selection)
 
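To make the change concrete, here is a minimal standalone sketch of the sanitization logic added above; the helper name and the sample values are illustrative, not part of the PR:

```python
import re

# Same pattern as SANITIZE_PATTERN in the diff: any character other than
# ASCII letters, digits, underscores, and dots is considered illegal.
SANITIZE_PATTERN = re.compile("[^a-zA-Z0-9_.]")


def sanitize_dimension(v):
    # Mirrors Dataset._sanitize_dimension: each illegal character
    # is replaced with an underscore.
    return re.sub(SANITIZE_PATTERN, "_", v)


print(sanitize_dimension("dir-1"))         # dir_1
print(sanitize_dimension("nightly/2017"))  # nightly_2017
print(sanitize_dimension("a.b_c9"))        # a.b_c9 (already legal, unchanged)
```

Because `where` now routes exact-value conditions through this helper, callers can pass the raw dimension value and still match the underscored form that the ingestion pipeline writes to S3.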
22 changes: 18 additions & 4 deletions tests/test_dataset.py
@@ -91,11 +91,11 @@ def test_apply_selection():
 
 def test_where_exact_match():
     dataset = Dataset('test-bucket', ['dim1', 'dim2'], prefix='prefix/')
-    new_dataset = dataset.where(dim1='my-value')
+    new_dataset = dataset.where(dim1='myvalue')
     assert new_dataset is not dataset
     assert new_dataset.clauses.keys() == ['dim1']
     condition = new_dataset.clauses['dim1']
-    assert condition('my-value')
+    assert condition('myvalue')
 
 
 def test_where_wrong_dimension():
@@ -169,11 +169,11 @@ def test_scan_multiple_where_params(spark_context):
 
 def test_scan_multiple_params():
     dataset = Dataset('test-bucket', ['dim1', 'dim2'], prefix='prefix/')
-    new_dataset = dataset.where(dim1='my-value')
+    new_dataset = dataset.where(dim1='myvalue')
     assert new_dataset is not dataset
     assert new_dataset.clauses.keys() == ['dim1']
     condition = new_dataset.clauses['dim1']
-    assert condition('my-value')
+    assert condition('myvalue')
 
 
 def test_summaries(spark_context):
@@ -396,3 +396,17 @@ def test_prefix_slash(spark_context):
     summaries_filtered = dataset.where(dim1='dir1').summaries(spark_context)
     assert len(summaries_filtered) == 1
     assert summaries_filtered[0]['key'] == 'a/b/dir1/subdir1/key1'
+
+
+def test_sanitized_dimensions(spark_context):
+    bucket_name = 'test-bucket'
+    store = InMemoryStore(bucket_name)
+    store.store['dir_1/subdir1/key1'] = 'value1'
+    store.store['dir_1/subdir2/key2'] = 'value2'
+    store.store['dir_2/subdir3/key3'] = 'value3'
+    store.store['dir_3/subdir4/key4'] = 'value4'
+
+    dataset = Dataset(bucket_name, ['dim1', 'dim2'], store=store).where(dim1="dir-1")
+
+    summaries = dataset.summaries(spark_context)
+    assert len(summaries) == 2
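The new test passes because `where(dim1="dir-1")` sanitizes the value to `dir_1` before the clause is built, so it matches the two `dir_1/...` keys and nothing else. A small sketch of that interaction, assuming the `Dataset` and `InMemoryStore` setup from the test above:

```python
# Illustrative only: reuses bucket_name/store from test_sanitized_dimensions.
dataset = Dataset(bucket_name, ['dim1', 'dim2'], store=store)
filtered = dataset.where(dim1='dir-1')  # sanitized to 'dir_1' internally

condition = filtered.clauses['dim1']
assert condition('dir_1')        # the sanitized form matches
assert not condition('dir-1')    # the raw form no longer does
```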