diff --git a/splitio/client/config.py b/splitio/client/config.py index 32992174..6b40a2c7 100644 --- a/splitio/client/config.py +++ b/splitio/client/config.py @@ -6,6 +6,7 @@ _LOGGER = logging.getLogger(__name__) +DEFAULT_DATA_SAMPLING = 1 DEFAULT_CONFIG = { @@ -52,6 +53,7 @@ 'machineIp': None, 'splitFile': os.path.join(os.path.expanduser('~'), '.split'), 'preforkedInitialization': False, + 'dataSampling': DEFAULT_DATA_SAMPLING, } diff --git a/splitio/client/factory.py b/splitio/client/factory.py index 83f1004e..40a44dfd 100644 --- a/splitio/client/factory.py +++ b/splitio/client/factory.py @@ -8,7 +8,7 @@ from splitio.client.client import Client from splitio.client import input_validator from splitio.client.manager import SplitManager -from splitio.client.config import sanitize as sanitize_config +from splitio.client.config import sanitize as sanitize_config, DEFAULT_DATA_SAMPLING from splitio.client import util from splitio.client.listener import ImpressionListenerWrapper from splitio.engine.impressions import Manager as ImpressionsManager @@ -53,6 +53,7 @@ _LOGGER = logging.getLogger(__name__) _INSTANTIATED_FACTORIES = Counter() _INSTANTIATED_FACTORIES_LOCK = threading.RLock() +_MIN_DEFAULT_DATA_SAMPLING_ALLOWED = 0.1 # 10% class Status(Enum): @@ -387,12 +388,18 @@ def _build_redis_factory(api_key, cfg): 'impressions': RedisImpressionsStorage(redis_adapter, sdk_metadata), 'events': RedisEventsStorage(redis_adapter, sdk_metadata), } + data_sampling = cfg.get('dataSampling', DEFAULT_DATA_SAMPLING) + if data_sampling < _MIN_DEFAULT_DATA_SAMPLING_ALLOWED: + _LOGGER.warning("dataSampling cannot be less than %f, defaulting to minimum", + _MIN_DEFAULT_DATA_SAMPLING_ALLOWED) + data_sampling = _MIN_DEFAULT_DATA_SAMPLING_ALLOWED recorder = PipelinedRecorder( redis_adapter.pipeline, ImpressionsManager(cfg['impressionsMode'], False, _wrap_impression_listener(cfg['impressionListener'], sdk_metadata)), storages['events'], storages['impressions'], + data_sampling, ) return SplitFactory( api_key, diff --git a/splitio/recorder/recorder.py b/splitio/recorder/recorder.py index cbb28f14..0f818037 100644 --- a/splitio/recorder/recorder.py +++ b/splitio/recorder/recorder.py @@ -1,6 +1,10 @@ """Stats Recorder.""" import abc import logging +import random + + +from splitio.client.config import DEFAULT_DATA_SAMPLING _LOGGER = logging.getLogger(__name__) @@ -83,7 +87,8 @@ def record_track_stats(self, event): class PipelinedRecorder(StatsRecorder): """PipelinedRecorder class.""" - def __init__(self, pipe, impressions_manager, event_storage, impression_storage): + def __init__(self, pipe, impressions_manager, event_storage, + impression_storage, data_sampling=DEFAULT_DATA_SAMPLING): """ Class constructor. @@ -95,11 +100,14 @@ def __init__(self, pipe, impressions_manager, event_storage, impression_storage) :type event_storage: splitio.storage.EventStorage :param impression_storage: impression storage instance :type impression_storage: splitio.storage.redis.RedisImpressionsStorage + :param data_sampling: data sampling factor + :type data_sampling: number """ self._make_pipe = pipe self._impressions_manager = impressions_manager self._event_sotrage = event_storage self._impression_storage = impression_storage + self._data_sampling = data_sampling def record_treatment_stats(self, impressions, latency, operation): """ @@ -113,11 +121,21 @@ def record_treatment_stats(self, impressions, latency, operation): :type operation: str """ try: + # TODO @matias.melograno + # Changing logic until TelemetryV2 released to avoid using pipelined operations + # Deprecated Old Telemetry + if self._data_sampling < DEFAULT_DATA_SAMPLING: + rnumber = random.uniform(0, 1) + if self._data_sampling < rnumber: + return impressions = self._impressions_manager.process_impressions(impressions) - pipe = self._make_pipe() - self._impression_storage.add_impressions_to_pipe(impressions, pipe) + # pipe = self._make_pipe() + # self._impression_storage.add_impressions_to_pipe(impressions, pipe) # self._telemetry_storage.add_latency_to_pipe(operation, latency, pipe) - result = pipe.execute() + # result = pipe.execute() + # if len(result) == 2: + # self._impression_storage.expire_key(result[0], len(impressions)) + result = self._impression_storage.put(impressions) if len(result) == 2: self._impression_storage.expire_key(result[0], len(impressions)) except Exception: # pylint: disable=broad-except diff --git a/tests/recorder/test_recorder.py b/tests/recorder/test_recorder.py index 2e32c3fb..5e559f82 100644 --- a/tests/recorder/test_recorder.py +++ b/tests/recorder/test_recorder.py @@ -36,8 +36,35 @@ def test_pipelined_recorder(self, mocker): impmanager = mocker.Mock(spec=ImpressionsManager) impmanager.process_impressions.return_value = impressions event = mocker.Mock(spec=EventStorage) - impression = mocker.Mock(spec=ImpressionPipelinedStorage) + impression = mocker.Mock(spec=ImpressionStorage) recorder = PipelinedRecorder(redis, impmanager, event, impression) recorder.record_treatment_stats(impressions, 1, 'some') + assert recorder._impression_storage.put.mock_calls[0][1][0] == impressions + + # TODO @matias.melograno Commented until we implement TelemetryV2 + # assert recorder._impression_storage.add_impressions_to_pipe.mock_calls[0][1][0] == impressions + # assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][0] == 'some' + # assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][1] == 1 + + + def test_sampled_recorder(self, mocker): + impressions = [ + Impression('k1', 'f1', 'on', 'l1', 123, None, None), + Impression('k1', 'f2', 'on', 'l1', 123, None, None) + ] + redis = mocker.Mock(spec=RedisAdapter) + impmanager = mocker.Mock(spec=ImpressionsManager) + impmanager.process_impressions.return_value = impressions + event = mocker.Mock(spec=EventStorage) + impression = mocker.Mock(spec=ImpressionStorage) + recorder = PipelinedRecorder(redis, impmanager, event, impression, 0.5) + + def put(x): + return + + recorder._impression_storage.put.side_effect = put - assert recorder._impression_storage.add_impressions_to_pipe.mock_calls[0][1][0] == impressions + for _ in range(100): + recorder.record_treatment_stats(impressions, 1, 'some') + print(recorder._impression_storage.put.call_count) + assert recorder._impression_storage.put.call_count < 80