Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions splitio/client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@


_LOGGER = logging.getLogger(__name__)
DEFAULT_DATA_SAMPLING = 1


DEFAULT_CONFIG = {
Expand Down Expand Up @@ -52,6 +53,7 @@
'machineIp': None,
'splitFile': os.path.join(os.path.expanduser('~'), '.split'),
'preforkedInitialization': False,
'dataSampling': DEFAULT_DATA_SAMPLING,
}


Expand Down
9 changes: 8 additions & 1 deletion splitio/client/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from splitio.client.client import Client
from splitio.client import input_validator
from splitio.client.manager import SplitManager
from splitio.client.config import sanitize as sanitize_config
from splitio.client.config import sanitize as sanitize_config, DEFAULT_DATA_SAMPLING
from splitio.client import util
from splitio.client.listener import ImpressionListenerWrapper
from splitio.engine.impressions import Manager as ImpressionsManager
Expand Down Expand Up @@ -53,6 +53,7 @@
_LOGGER = logging.getLogger(__name__)
_INSTANTIATED_FACTORIES = Counter()
_INSTANTIATED_FACTORIES_LOCK = threading.RLock()
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED = 0.1 # 10%


class Status(Enum):
Expand Down Expand Up @@ -387,12 +388,18 @@ def _build_redis_factory(api_key, cfg):
'impressions': RedisImpressionsStorage(redis_adapter, sdk_metadata),
'events': RedisEventsStorage(redis_adapter, sdk_metadata),
}
data_sampling = cfg.get('dataSampling', DEFAULT_DATA_SAMPLING)
if data_sampling < _MIN_DEFAULT_DATA_SAMPLING_ALLOWED:
_LOGGER.warning("dataSampling cannot be less than %f, defaulting to minimum",
_MIN_DEFAULT_DATA_SAMPLING_ALLOWED)
data_sampling = _MIN_DEFAULT_DATA_SAMPLING_ALLOWED
recorder = PipelinedRecorder(
redis_adapter.pipeline,
ImpressionsManager(cfg['impressionsMode'], False,
_wrap_impression_listener(cfg['impressionListener'], sdk_metadata)),
storages['events'],
storages['impressions'],
data_sampling,
)
return SplitFactory(
api_key,
Expand Down
26 changes: 22 additions & 4 deletions splitio/recorder/recorder.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
"""Stats Recorder."""
import abc
import logging
import random


from splitio.client.config import DEFAULT_DATA_SAMPLING


_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -83,7 +87,8 @@ def record_track_stats(self, event):
class PipelinedRecorder(StatsRecorder):
"""PipelinedRecorder class."""

def __init__(self, pipe, impressions_manager, event_storage, impression_storage):
def __init__(self, pipe, impressions_manager, event_storage,
impression_storage, data_sampling=DEFAULT_DATA_SAMPLING):
"""
Class constructor.

Expand All @@ -95,11 +100,14 @@ def __init__(self, pipe, impressions_manager, event_storage, impression_storage)
:type event_storage: splitio.storage.EventStorage
:param impression_storage: impression storage instance
:type impression_storage: splitio.storage.redis.RedisImpressionsStorage
:param data_sampling: data sampling factor
:type data_sampling: number
"""
self._make_pipe = pipe
self._impressions_manager = impressions_manager
self._event_sotrage = event_storage
self._impression_storage = impression_storage
self._data_sampling = data_sampling

def record_treatment_stats(self, impressions, latency, operation):
"""
Expand All @@ -113,11 +121,21 @@ def record_treatment_stats(self, impressions, latency, operation):
:type operation: str
"""
try:
# TODO @matias.melograno
# Changing logic until TelemetryV2 released to avoid using pipelined operations
# Deprecated Old Telemetry
if self._data_sampling < DEFAULT_DATA_SAMPLING:
rnumber = random.uniform(0, 1)
if self._data_sampling < rnumber:
return
impressions = self._impressions_manager.process_impressions(impressions)
pipe = self._make_pipe()
self._impression_storage.add_impressions_to_pipe(impressions, pipe)
# pipe = self._make_pipe()
# self._impression_storage.add_impressions_to_pipe(impressions, pipe)
# self._telemetry_storage.add_latency_to_pipe(operation, latency, pipe)
result = pipe.execute()
# result = pipe.execute()
# if len(result) == 2:
# self._impression_storage.expire_key(result[0], len(impressions))
result = self._impression_storage.put(impressions)
if len(result) == 2:
self._impression_storage.expire_key(result[0], len(impressions))
except Exception: # pylint: disable=broad-except
Expand Down
31 changes: 29 additions & 2 deletions tests/recorder/test_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,35 @@ def test_pipelined_recorder(self, mocker):
impmanager = mocker.Mock(spec=ImpressionsManager)
impmanager.process_impressions.return_value = impressions
event = mocker.Mock(spec=EventStorage)
impression = mocker.Mock(spec=ImpressionPipelinedStorage)
impression = mocker.Mock(spec=ImpressionStorage)
recorder = PipelinedRecorder(redis, impmanager, event, impression)
recorder.record_treatment_stats(impressions, 1, 'some')
assert recorder._impression_storage.put.mock_calls[0][1][0] == impressions

# TODO @matias.melograno Commented until we implement TelemetryV2
# assert recorder._impression_storage.add_impressions_to_pipe.mock_calls[0][1][0] == impressions
# assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][0] == 'some'
# assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][1] == 1


def test_sampled_recorder(self, mocker):
impressions = [
Impression('k1', 'f1', 'on', 'l1', 123, None, None),
Impression('k1', 'f2', 'on', 'l1', 123, None, None)
]
redis = mocker.Mock(spec=RedisAdapter)
impmanager = mocker.Mock(spec=ImpressionsManager)
impmanager.process_impressions.return_value = impressions
event = mocker.Mock(spec=EventStorage)
impression = mocker.Mock(spec=ImpressionStorage)
recorder = PipelinedRecorder(redis, impmanager, event, impression, 0.5)

def put(x):
return

recorder._impression_storage.put.side_effect = put

assert recorder._impression_storage.add_impressions_to_pipe.mock_calls[0][1][0] == impressions
for _ in range(100):
recorder.record_treatment_stats(impressions, 1, 'some')
print(recorder._impression_storage.put.call_count)
assert recorder._impression_storage.put.call_count < 80