From 6b485b4ac44d27d20ebb7c00d8056afd45cdc8c7 Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Tue, 7 Dec 2021 10:28:55 -0300 Subject: [PATCH 1/3] throtling imps --- splitio/client/config.py | 1 + splitio/recorder/recorder.py | 26 ++++++++++++++++++-------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/splitio/client/config.py b/splitio/client/config.py index 32992174..44839e53 100644 --- a/splitio/client/config.py +++ b/splitio/client/config.py @@ -52,6 +52,7 @@ 'machineIp': None, 'splitFile': os.path.join(os.path.expanduser('~'), '.split'), 'preforkedInitialization': False, + 'dataThrotling': 1, } diff --git a/splitio/recorder/recorder.py b/splitio/recorder/recorder.py index c009e1eb..19d6a9c4 100644 --- a/splitio/recorder/recorder.py +++ b/splitio/recorder/recorder.py @@ -1,9 +1,11 @@ """Stats Recorder.""" import abc import logging +import random _LOGGER = logging.getLogger(__name__) +_MIN_THROTLING = 1 class StatsRecorder(object, metaclass=abc.ABCMeta): @@ -87,7 +89,7 @@ def record_track_stats(self, event): class PipelinedRecorder(StatsRecorder): """PipelinedRecorder class.""" - def __init__(self, pipe, impressions_manager, telemetry_storage, event_storage, impression_storage): + def __init__(self, pipe, impressions_manager, telemetry_storage, event_storage, impression_storage, data_throtling): """ Class constructor. @@ -107,6 +109,7 @@ def __init__(self, pipe, impressions_manager, telemetry_storage, event_storage, self._telemetry_storage = telemetry_storage self._event_sotrage = event_storage self._impression_storage = impression_storage + self._data_trothling = data_throtling def record_treatment_stats(self, impressions, latency, operation): """ @@ -120,15 +123,22 @@ def record_treatment_stats(self, impressions, latency, operation): :type operation: str """ try: + # Changing logic until TelemetryV2 released to avoid using pipelined operations + # Deprecated Old Telemetry + if self._data_trothling < _MIN_THROTLING: + rnumber = random.uniform(0, 1) + if self._data_trothling > rnumber: + return impressions = self._impressions_manager.process_impressions(impressions) - pipe = self._make_pipe() - self._impression_storage.add_impressions_to_pipe(impressions, pipe) - self._telemetry_storage.add_latency_to_pipe(operation, latency, pipe) - result = pipe.execute() - if len(result) == 2: - self._impression_storage.expire_key(result[0], len(impressions)) + self._impression_storage.put(impressions) + # pipe = self._make_pipe() + # self._impression_storage.add_impressions_to_pipe(impressions, pipe) + # self._telemetry_storage.add_latency_to_pipe(operation, latency, pipe) + # result = pipe.execute() + # if len(result) == 2: + # self._impression_storage.expire_key(result[0], len(impressions)) except Exception: # pylint: disable=broad-except - _LOGGER.error('Error recording impressions and metrics') + _LOGGER.error('Error recording impressions') _LOGGER.debug('Error: ', exc_info=True) def record_track_stats(self, event): From b4ced4fd8efd121deceafa2b77733ae737619337 Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Tue, 7 Dec 2021 15:52:38 -0300 Subject: [PATCH 2/3] data_throttlong for imp --- splitio/client/config.py | 3 ++- splitio/client/factory.py | 9 ++++++++- splitio/recorder/recorder.py | 16 +++++++++++----- tests/recorder/test_recorder.py | 10 ++++++---- 4 files changed, 27 insertions(+), 11 deletions(-) diff --git a/splitio/client/config.py b/splitio/client/config.py index 44839e53..a4e776dc 100644 --- a/splitio/client/config.py +++ b/splitio/client/config.py @@ -6,6 +6,7 @@ _LOGGER = logging.getLogger(__name__) +DEFAULT_DATA_THROTTLING = 1 DEFAULT_CONFIG = { @@ -52,7 +53,7 @@ 'machineIp': None, 'splitFile': os.path.join(os.path.expanduser('~'), '.split'), 'preforkedInitialization': False, - 'dataThrotling': 1, + 'dataThrottling': DEFAULT_DATA_THROTTLING, } diff --git a/splitio/client/factory.py b/splitio/client/factory.py index 3c2c7cc4..344ecf48 100644 --- a/splitio/client/factory.py +++ b/splitio/client/factory.py @@ -8,7 +8,7 @@ from splitio.client.client import Client from splitio.client import input_validator from splitio.client.manager import SplitManager -from splitio.client.config import sanitize as sanitize_config +from splitio.client.config import sanitize as sanitize_config, DEFAULT_DATA_THROTTLING from splitio.client import util from splitio.client.listener import ImpressionListenerWrapper from splitio.engine.impressions import Manager as ImpressionsManager @@ -57,6 +57,7 @@ _LOGGER = logging.getLogger(__name__) _INSTANTIATED_FACTORIES = Counter() _INSTANTIATED_FACTORIES_LOCK = threading.RLock() +_MIN_DEFAULT_DATA_THROTTLING_ALLOWED = 0.1 # 10% class Status(Enum): @@ -401,6 +402,11 @@ def _build_redis_factory(api_key, cfg): 'events': RedisEventsStorage(redis_adapter, sdk_metadata), 'telemetry': RedisTelemetryStorage(redis_adapter, sdk_metadata) } + data_throttling = cfg.get('dataThrottling', DEFAULT_DATA_THROTTLING) + if data_throttling < _MIN_DEFAULT_DATA_THROTTLING_ALLOWED: + _LOGGER.warning("dataThrottling cannot be less than %f, defaulting to minimum", + _MIN_DEFAULT_DATA_THROTTLING_ALLOWED) + data_throttling = _MIN_DEFAULT_DATA_THROTTLING_ALLOWED recorder = PipelinedRecorder( redis_adapter.pipeline, ImpressionsManager(cfg['impressionsMode'], False, @@ -408,6 +414,7 @@ def _build_redis_factory(api_key, cfg): storages['telemetry'], storages['events'], storages['impressions'], + data_throttling, ) return SplitFactory( api_key, diff --git a/splitio/recorder/recorder.py b/splitio/recorder/recorder.py index 19d6a9c4..9abebde5 100644 --- a/splitio/recorder/recorder.py +++ b/splitio/recorder/recorder.py @@ -4,8 +4,10 @@ import random +from splitio.client.config import DEFAULT_DATA_THROTTLING + + _LOGGER = logging.getLogger(__name__) -_MIN_THROTLING = 1 class StatsRecorder(object, metaclass=abc.ABCMeta): @@ -89,7 +91,8 @@ def record_track_stats(self, event): class PipelinedRecorder(StatsRecorder): """PipelinedRecorder class.""" - def __init__(self, pipe, impressions_manager, telemetry_storage, event_storage, impression_storage, data_throtling): + def __init__(self, pipe, impressions_manager, telemetry_storage, event_storage, + impression_storage, data_throttling=DEFAULT_DATA_THROTTLING): """ Class constructor. @@ -103,13 +106,15 @@ def __init__(self, pipe, impressions_manager, telemetry_storage, event_storage, :type event_storage: splitio.storage.EventStorage :param impression_storage: impression storage instance :type impression_storage: splitio.storage.redis.RedisImpressionsStorage + :param data_throttling: data throttling factor + :type data_throttling: number """ self._make_pipe = pipe self._impressions_manager = impressions_manager self._telemetry_storage = telemetry_storage self._event_sotrage = event_storage self._impression_storage = impression_storage - self._data_trothling = data_throtling + self._data_throttling = data_throttling def record_treatment_stats(self, impressions, latency, operation): """ @@ -123,11 +128,12 @@ def record_treatment_stats(self, impressions, latency, operation): :type operation: str """ try: + # TODO @matias.melograno # Changing logic until TelemetryV2 released to avoid using pipelined operations # Deprecated Old Telemetry - if self._data_trothling < _MIN_THROTLING: + if self._data_throttling < DEFAULT_DATA_THROTTLING: rnumber = random.uniform(0, 1) - if self._data_trothling > rnumber: + if self._data_throttling > rnumber: return impressions = self._impressions_manager.process_impressions(impressions) self._impression_storage.put(impressions) diff --git a/tests/recorder/test_recorder.py b/tests/recorder/test_recorder.py index 9212b307..770597a3 100644 --- a/tests/recorder/test_recorder.py +++ b/tests/recorder/test_recorder.py @@ -39,10 +39,12 @@ def test_pipelined_recorder(self, mocker): impmanager.process_impressions.return_value = impressions telemetry = mocker.Mock(spec=TelemetryPipelinedStorage) event = mocker.Mock(spec=EventStorage) - impression = mocker.Mock(spec=ImpressionPipelinedStorage) + impression = mocker.Mock(spec=ImpressionStorage) recorder = PipelinedRecorder(redis, impmanager, telemetry, event, impression) recorder.record_treatment_stats(impressions, 1, 'some') - assert recorder._impression_storage.add_impressions_to_pipe.mock_calls[0][1][0] == impressions - assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][0] == 'some' - assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][1] == 1 + # TODO @matias.melograno Commented until we implement TelemetryV2 + # assert recorder._impression_storage.add_impressions_to_pipe.mock_calls[0][1][0] == impressions + # assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][0] == 'some' + # assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][1] == 1 + assert recorder._impression_storage.put.mock_calls[0][1][0] == impressions From c867c774a06edbc674533fc65bd596a71b05762c Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Tue, 7 Dec 2021 18:57:15 -0300 Subject: [PATCH 3/3] renamed and added test --- splitio/client/config.py | 4 ++-- splitio/client/factory.py | 16 ++++++++-------- splitio/recorder/recorder.py | 14 +++++++------- tests/recorder/test_recorder.py | 30 ++++++++++++++++++++++++++++++ 4 files changed, 47 insertions(+), 17 deletions(-) diff --git a/splitio/client/config.py b/splitio/client/config.py index a4e776dc..6b40a2c7 100644 --- a/splitio/client/config.py +++ b/splitio/client/config.py @@ -6,7 +6,7 @@ _LOGGER = logging.getLogger(__name__) -DEFAULT_DATA_THROTTLING = 1 +DEFAULT_DATA_SAMPLING = 1 DEFAULT_CONFIG = { @@ -53,7 +53,7 @@ 'machineIp': None, 'splitFile': os.path.join(os.path.expanduser('~'), '.split'), 'preforkedInitialization': False, - 'dataThrottling': DEFAULT_DATA_THROTTLING, + 'dataSampling': DEFAULT_DATA_SAMPLING, } diff --git a/splitio/client/factory.py b/splitio/client/factory.py index 344ecf48..eea1f9f7 100644 --- a/splitio/client/factory.py +++ b/splitio/client/factory.py @@ -8,7 +8,7 @@ from splitio.client.client import Client from splitio.client import input_validator from splitio.client.manager import SplitManager -from splitio.client.config import sanitize as sanitize_config, DEFAULT_DATA_THROTTLING +from splitio.client.config import sanitize as sanitize_config, DEFAULT_DATA_SAMPLING from splitio.client import util from splitio.client.listener import ImpressionListenerWrapper from splitio.engine.impressions import Manager as ImpressionsManager @@ -57,7 +57,7 @@ _LOGGER = logging.getLogger(__name__) _INSTANTIATED_FACTORIES = Counter() _INSTANTIATED_FACTORIES_LOCK = threading.RLock() -_MIN_DEFAULT_DATA_THROTTLING_ALLOWED = 0.1 # 10% +_MIN_DEFAULT_DATA_SAMPLING_ALLOWED = 0.1 # 10% class Status(Enum): @@ -402,11 +402,11 @@ def _build_redis_factory(api_key, cfg): 'events': RedisEventsStorage(redis_adapter, sdk_metadata), 'telemetry': RedisTelemetryStorage(redis_adapter, sdk_metadata) } - data_throttling = cfg.get('dataThrottling', DEFAULT_DATA_THROTTLING) - if data_throttling < _MIN_DEFAULT_DATA_THROTTLING_ALLOWED: - _LOGGER.warning("dataThrottling cannot be less than %f, defaulting to minimum", - _MIN_DEFAULT_DATA_THROTTLING_ALLOWED) - data_throttling = _MIN_DEFAULT_DATA_THROTTLING_ALLOWED + data_sampling = cfg.get('dataSampling', DEFAULT_DATA_SAMPLING) + if data_sampling < _MIN_DEFAULT_DATA_SAMPLING_ALLOWED: + _LOGGER.warning("dataSampling cannot be less than %f, defaulting to minimum", + _MIN_DEFAULT_DATA_SAMPLING_ALLOWED) + data_sampling = _MIN_DEFAULT_DATA_SAMPLING_ALLOWED recorder = PipelinedRecorder( redis_adapter.pipeline, ImpressionsManager(cfg['impressionsMode'], False, @@ -414,7 +414,7 @@ def _build_redis_factory(api_key, cfg): storages['telemetry'], storages['events'], storages['impressions'], - data_throttling, + data_sampling, ) return SplitFactory( api_key, diff --git a/splitio/recorder/recorder.py b/splitio/recorder/recorder.py index 9abebde5..b45243b4 100644 --- a/splitio/recorder/recorder.py +++ b/splitio/recorder/recorder.py @@ -4,7 +4,7 @@ import random -from splitio.client.config import DEFAULT_DATA_THROTTLING +from splitio.client.config import DEFAULT_DATA_SAMPLING _LOGGER = logging.getLogger(__name__) @@ -92,7 +92,7 @@ class PipelinedRecorder(StatsRecorder): """PipelinedRecorder class.""" def __init__(self, pipe, impressions_manager, telemetry_storage, event_storage, - impression_storage, data_throttling=DEFAULT_DATA_THROTTLING): + impression_storage, data_sampling=DEFAULT_DATA_SAMPLING): """ Class constructor. @@ -106,15 +106,15 @@ def __init__(self, pipe, impressions_manager, telemetry_storage, event_storage, :type event_storage: splitio.storage.EventStorage :param impression_storage: impression storage instance :type impression_storage: splitio.storage.redis.RedisImpressionsStorage - :param data_throttling: data throttling factor - :type data_throttling: number + :param data_sampling: data sampling factor + :type data_sampling: number """ self._make_pipe = pipe self._impressions_manager = impressions_manager self._telemetry_storage = telemetry_storage self._event_sotrage = event_storage self._impression_storage = impression_storage - self._data_throttling = data_throttling + self._data_sampling = data_sampling def record_treatment_stats(self, impressions, latency, operation): """ @@ -131,9 +131,9 @@ def record_treatment_stats(self, impressions, latency, operation): # TODO @matias.melograno # Changing logic until TelemetryV2 released to avoid using pipelined operations # Deprecated Old Telemetry - if self._data_throttling < DEFAULT_DATA_THROTTLING: + if self._data_sampling < DEFAULT_DATA_SAMPLING: rnumber = random.uniform(0, 1) - if self._data_throttling > rnumber: + if self._data_sampling < rnumber: return impressions = self._impressions_manager.process_impressions(impressions) self._impression_storage.put(impressions) diff --git a/tests/recorder/test_recorder.py b/tests/recorder/test_recorder.py index 770597a3..bd7017d7 100644 --- a/tests/recorder/test_recorder.py +++ b/tests/recorder/test_recorder.py @@ -48,3 +48,33 @@ def test_pipelined_recorder(self, mocker): # assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][0] == 'some' # assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][1] == 1 assert recorder._impression_storage.put.mock_calls[0][1][0] == impressions + + def test_sampled_recorder(self, mocker): + impressions = [ + Impression('k1', 'f1', 'on', 'l1', 123, None, None), + Impression('k1', 'f2', 'on', 'l1', 123, None, None) + ] + redis = mocker.Mock(spec=RedisAdapter) + impmanager = mocker.Mock(spec=ImpressionsManager) + impmanager.process_impressions.return_value = impressions + telemetry = mocker.Mock(spec=TelemetryPipelinedStorage) + event = mocker.Mock(spec=EventStorage) + impression = mocker.Mock(spec=ImpressionStorage) + recorder = PipelinedRecorder(redis, impmanager, telemetry, event, impression, 0.5) + + def put(x): + return + + recorder._impression_storage.put.side_effect = put + + for _ in range(100): + recorder.record_treatment_stats(impressions, 1, 'some') + print(recorder._impression_storage.put.call_count) + assert recorder._impression_storage.put.call_count < 80 + + + # TODO @matias.melograno Commented until we implement TelemetryV2 + # assert recorder._impression_storage.add_impressions_to_pipe.mock_calls[0][1][0] == impressions + # assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][0] == 'some' + # assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][1] == 1 + # assert recorder._impression_storage.put.mock_calls[0][1][0] == impressions