From da8635944a0d699dfb04548afafcb142021e57ca Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Tue, 7 Dec 2021 17:18:50 -0300 Subject: [PATCH 1/2] removed old telemetry --- splitio/api/telemetry.py | 142 --------------- splitio/client/client.py | 2 +- splitio/client/factory.py | 27 +-- splitio/client/localhost.py | 34 +--- splitio/recorder/recorder.py | 19 +- splitio/storage/__init__.py | 91 ---------- splitio/storage/inmemmory.py | 106 +---------- splitio/storage/redis.py | 186 +------------------- splitio/sync/synchronizer.py | 22 +-- splitio/sync/telemetry.py | 52 ------ splitio/tasks/telemetry_sync.py | 38 ---- tests/api/test_telemetry.py | 118 ------------- tests/client/test_client.py | 43 +---- tests/client/test_factory.py | 48 +---- tests/client/test_input_validator.py | 8 +- tests/client/test_localhost.py | 34 ---- tests/helpers/mockserver.py | 3 +- tests/integration/test_client_e2e.py | 22 +-- tests/integration/test_redis_integration.py | 78 +------- tests/recorder/test_recorder.py | 13 +- tests/storage/test_inmemory_storage.py | 61 +------ tests/storage/test_redis.py | 74 +------- tests/sync/test_manager.py | 6 +- tests/sync/test_synchronizer.py | 30 ++-- tests/sync/test_telemetry_synchronizer.py | 53 ------ tests/tasks/test_telemetry_sync.py | 59 ------- 26 files changed, 58 insertions(+), 1311 deletions(-) delete mode 100644 splitio/api/telemetry.py delete mode 100644 splitio/sync/telemetry.py delete mode 100644 splitio/tasks/telemetry_sync.py delete mode 100644 tests/api/test_telemetry.py delete mode 100644 tests/sync/test_telemetry_synchronizer.py delete mode 100644 tests/tasks/test_telemetry_sync.py diff --git a/splitio/api/telemetry.py b/splitio/api/telemetry.py deleted file mode 100644 index 23ad4644..00000000 --- a/splitio/api/telemetry.py +++ /dev/null @@ -1,142 +0,0 @@ -"""Telemetry API Module.""" -import logging - -from splitio.api import APIException -from splitio.api.commons import headers_from_metadata -from splitio.api.client import HttpClientException - - -_LOGGER = logging.getLogger(__name__) - - -class TelemetryAPI(object): - """Class to handle telemetry submission to the backend.""" - - def __init__(self, client, apikey, sdk_metadata): - """ - Class constructor. - - :param client: HTTP Client responsble for issuing calls to the backend. - :type client: HttpClient - :param apikey: User apikey token. - :type apikey: string - :param sdk_metadata: SDK Version, IP & Machine name - :type sdk_metadata: splitio.client.util.SdkMetadata - """ - self._client = client - self._apikey = apikey - self._metadata = headers_from_metadata(sdk_metadata) - - @staticmethod - def _build_latencies(latencies): - """ - Build a latencies bulk as expected by the BE. - - :param latencies: Latencies to bundle. - :type latencies: dict - """ - return [ - {'name': name, 'latencies': latencies_list} - for name, latencies_list in latencies.items() - ] - - def flush_latencies(self, latencies): - """ - Submit latencies to the backend. - - :param latencies: List of latency buckets with their respective count. - :type latencies: list - """ - bulk = self._build_latencies(latencies) - try: - response = self._client.post( - 'events', - '/metrics/times', - self._apikey, - body=bulk, - extra_headers=self._metadata - ) - if not 200 <= response.status_code < 300: - raise APIException(response.body, response.status_code) - except HttpClientException as exc: - _LOGGER.error( - 'Error posting latencies because an exception was raised by the HTTPClient' - ) - _LOGGER.debug('Error: ', exc_info=True) - raise APIException('Latencies not flushed correctly.') from exc - - @staticmethod - def _build_gauges(gauges): - """ - Build a gauges bulk as expected by the BE. - - :param gauges: Gauges to bundle. - :type gauges: dict - """ - return [ - {'name': name, 'value': value} - for name, value in gauges.items() - ] - - def flush_gauges(self, gauges): - """ - Submit gauges to the backend. - - :param gauges: Gauges measured to be sent to the backend. - :type gauges: List - """ - bulk = self._build_gauges(gauges) - try: - response = self._client.post( - 'events', - '/metrics/gauge', - self._apikey, - body=bulk, - extra_headers=self._metadata - ) - if not 200 <= response.status_code < 300: - raise APIException(response.body, response.status_code) - except HttpClientException as exc: - _LOGGER.error( - 'Error posting gauges because an exception was raised by the HTTPClient' - ) - _LOGGER.debug('Error: ', exc_info=True) - raise APIException('Gauges not flushed correctly.') from exc - - @staticmethod - def _build_counters(counters): - """ - Build a counters bulk as expected by the BE. - - :param counters: Counters to bundle. - :type counters: dict - """ - return [ - {'name': name, 'delta': value} - for name, value in counters.items() - ] - - def flush_counters(self, counters): - """ - Submit counters to the backend. - - :param counters: Counters measured to be sent to the backend. - :type counters: List - """ - bulk = self._build_counters(counters) - try: - response = self._client.post( - 'events', - '/metrics/counters', - self._apikey, - body=bulk, - extra_headers=self._metadata - ) - if not 200 <= response.status_code < 300: - raise APIException(response.body, response.status_code) - except HttpClientException as exc: - _LOGGER.error( - 'Error posting counters because an exception was raised by the HTTPClient' - ) - _LOGGER.debug('Error: ', exc_info=True) - raise APIException('Counters not flushed correctly.') from exc diff --git a/splitio/client/client.py b/splitio/client/client.py index d2e8fa87..5fca7151 100644 --- a/splitio/client/client.py +++ b/splitio/client/client.py @@ -329,7 +329,7 @@ def _build_impression( # pylint: disable=too-many-arguments def _record_stats(self, impressions, start, operation): """ - Record impressions and metrics. + Record impressions. :param impressions: Generated impressions :type impressions: list[tuple[splitio.models.impression.Impression, dict]] diff --git a/splitio/client/factory.py b/splitio/client/factory.py index 3c2c7cc4..83f1004e 100644 --- a/splitio/client/factory.py +++ b/splitio/client/factory.py @@ -15,10 +15,10 @@ # Storage from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \ - InMemoryImpressionStorage, InMemoryEventStorage, InMemoryTelemetryStorage + InMemoryImpressionStorage, InMemoryEventStorage from splitio.storage.adapters import redis from splitio.storage.redis import RedisSplitStorage, RedisSegmentStorage, RedisImpressionsStorage, \ - RedisEventsStorage, RedisTelemetryStorage + RedisEventsStorage # APIs from splitio.api.client import HttpClient @@ -26,7 +26,6 @@ from splitio.api.segments import SegmentsAPI from splitio.api.impressions import ImpressionsAPI from splitio.api.events import EventsAPI -from splitio.api.telemetry import TelemetryAPI from splitio.api.auth import AuthAPI # Tasks @@ -34,7 +33,6 @@ from splitio.tasks.segment_sync import SegmentSynchronizationTask from splitio.tasks.impressions_sync import ImpressionsSyncTask, ImpressionsCountSyncTask from splitio.tasks.events_sync import EventsSyncTask -from splitio.tasks.telemetry_sync import TelemetrySynchronizationTask # Synchronizer from splitio.sync.synchronizer import SplitTasks, SplitSynchronizers, Synchronizer, \ @@ -44,14 +42,12 @@ from splitio.sync.segment import SegmentSynchronizer from splitio.sync.impression import ImpressionSynchronizer, ImpressionsCountSynchronizer from splitio.sync.event import EventSynchronizer -from splitio.sync.telemetry import TelemetrySynchronizer # Recorder from splitio.recorder.recorder import StandardRecorder, PipelinedRecorder # Localhost stuff -from splitio.client.localhost import LocalhostEventsStorage, LocalhostImpressionsStorage, \ - LocalhostTelemetryStorage +from splitio.client.localhost import LocalhostEventsStorage, LocalhostImpressionsStorage _LOGGER = logging.getLogger(__name__) @@ -259,7 +255,6 @@ def resume(self): sdk_ready_flag = threading.Event() self._sdk_internal_ready_flag = sdk_ready_flag self._sync_manager._ready_flag = sdk_ready_flag - self._get_storage('telemetry').clear() self._get_storage('impressions').clear() self._get_storage('events').clear() initialization_thread = threading.Thread( @@ -306,7 +301,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl 'segments': SegmentsAPI(http_client, api_key, sdk_metadata), 'impressions': ImpressionsAPI(http_client, api_key, sdk_metadata, cfg['impressionsMode']), 'events': EventsAPI(http_client, api_key, sdk_metadata), - 'telemetry': TelemetryAPI(http_client, api_key, sdk_metadata) } if not input_validator.validate_apikey_type(apis['segments']): @@ -317,7 +311,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl 'segments': InMemorySegmentStorage(), 'impressions': InMemoryImpressionStorage(cfg['impressionsQueueSize']), 'events': InMemoryEventStorage(cfg['eventsQueueSize']), - 'telemetry': InMemoryTelemetryStorage() } imp_manager = ImpressionsManager( @@ -331,7 +324,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl ImpressionSynchronizer(apis['impressions'], storages['impressions'], cfg['impressionsBulkSize']), EventSynchronizer(apis['events'], storages['events'], cfg['eventsBulkSize']), - TelemetrySynchronizer(apis['telemetry'], storages['telemetry']), ImpressionsCountSynchronizer(apis['impressions'], imp_manager), ) @@ -349,10 +341,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl cfg['impressionsRefreshRate'], ), EventsSyncTask(synchronizers.events_sync.synchronize_events, cfg['eventsPushRate']), - TelemetrySynchronizationTask( - synchronizers.telemetry_sync.synchronize_telemetry, - cfg['metricsRefreshRate'], - ), ImpressionsCountSyncTask(synchronizers.impressions_count_sync.synchronize_counters) ) @@ -369,7 +357,6 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl recorder = StandardRecorder( imp_manager, - storages['telemetry'], storages['events'], storages['impressions'], ) @@ -399,13 +386,11 @@ def _build_redis_factory(api_key, cfg): 'segments': RedisSegmentStorage(redis_adapter), 'impressions': RedisImpressionsStorage(redis_adapter, sdk_metadata), 'events': RedisEventsStorage(redis_adapter, sdk_metadata), - 'telemetry': RedisTelemetryStorage(redis_adapter, sdk_metadata) } recorder = PipelinedRecorder( redis_adapter.pipeline, ImpressionsManager(cfg['impressionsMode'], False, _wrap_impression_listener(cfg['impressionListener'], sdk_metadata)), - storages['telemetry'], storages['events'], storages['impressions'], ) @@ -424,19 +409,18 @@ def _build_localhost_factory(cfg): 'segments': InMemorySegmentStorage(), # not used, just to avoid possible future errors. 'impressions': LocalhostImpressionsStorage(), 'events': LocalhostEventsStorage(), - 'telemetry': LocalhostTelemetryStorage() } synchronizers = SplitSynchronizers( LocalSplitSynchronizer(cfg['splitFile'], storages['splits']), - None, None, None, None, None, + None, None, None, None, ) tasks = SplitTasks( SplitSynchronizationTask( synchronizers.split_sync.synchronize_splits, cfg['featuresRefreshRate'], - ), None, None, None, None, None, + ), None, None, None, None, ) sdk_metadata = util.get_metadata(cfg) @@ -446,7 +430,6 @@ def _build_localhost_factory(cfg): manager.start() recorder = StandardRecorder( ImpressionsManager(cfg['impressionsMode'], True, None), - storages['telemetry'], storages['events'], storages['impressions'], ) diff --git a/splitio/client/localhost.py b/splitio/client/localhost.py index 99dba5fd..dec597a9 100644 --- a/splitio/client/localhost.py +++ b/splitio/client/localhost.py @@ -2,7 +2,7 @@ import logging import re -from splitio.storage import ImpressionStorage, EventStorage, TelemetryStorage +from splitio.storage import ImpressionStorage, EventStorage _LEGACY_COMMENT_LINE_RE = re.compile(r'^#.*$') _LEGACY_DEFINITION_LINE_RE = re.compile(r'^(?[\w_-]+)\s+(?P[\w_-]+)$') @@ -41,35 +41,3 @@ def pop_many(self, *_, **__): # pylint: disable=arguments-differ def clear(self, *_, **__): # pylint: disable=arguments-differ """Accept any arguments and do nothing.""" pass - - -class LocalhostTelemetryStorage(TelemetryStorage): - """Impression storage that doesn't cache anything.""" - - def inc_latency(self, *_, **__): # pylint: disable=arguments-differ - """Accept any arguments and do nothing.""" - pass - - def inc_counter(self, *_, **__): # pylint: disable=arguments-differ - """Accept any arguments and do nothing.""" - pass - - def put_gauge(self, *_, **__): # pylint: disable=arguments-differ - """Accept any arguments and do nothing.""" - pass - - def pop_latencies(self, *_, **__): # pylint: disable=arguments-differ - """Accept any arguments and do nothing.""" - pass - - def pop_counters(self, *_, **__): # pylint: disable=arguments-differ - """Accept any arguments and do nothing.""" - pass - - def pop_gauges(self, *_, **__): # pylint: disable=arguments-differ - """Accept any arguments and do nothing.""" - pass - - def clear(self, *_, **__): # pylint: disable=arguments-differ - """Accept any arguments and do nothing.""" - pass diff --git a/splitio/recorder/recorder.py b/splitio/recorder/recorder.py index c009e1eb..cbb28f14 100644 --- a/splitio/recorder/recorder.py +++ b/splitio/recorder/recorder.py @@ -37,21 +37,18 @@ def record_track_stats(self, events): class StandardRecorder(StatsRecorder): """StandardRecorder class.""" - def __init__(self, impressions_manager, telemetry_storage, event_storage, impression_storage): + def __init__(self, impressions_manager, event_storage, impression_storage): """ Class constructor. :param impressions_manager: impression manager instance :type impressions_manager: splitio.engine.impressions.Manager - :param telemetry_storage: telemetry storage instance - :type telemetry_storage: splitio.storage.TelemetryStorage :param event_storage: event storage instance :type event_storage: splitio.storage.EventStorage :param impression_storage: impression storage instance :type impression_storage: splitio.storage.ImpressionStorage """ self._impressions_manager = impressions_manager - self._telemetry_storage = telemetry_storage self._event_sotrage = event_storage self._impression_storage = impression_storage @@ -68,10 +65,9 @@ def record_treatment_stats(self, impressions, latency, operation): """ try: impressions = self._impressions_manager.process_impressions(impressions) - if self._impression_storage.put(impressions): - self._telemetry_storage.inc_latency(operation, latency) + self._impression_storage.put(impressions) except Exception: # pylint: disable=broad-except - _LOGGER.error('Error recording impressions and metrics') + _LOGGER.error('Error recording impressions') _LOGGER.debug('Error: ', exc_info=True) def record_track_stats(self, event): @@ -87,7 +83,7 @@ def record_track_stats(self, event): class PipelinedRecorder(StatsRecorder): """PipelinedRecorder class.""" - def __init__(self, pipe, impressions_manager, telemetry_storage, event_storage, impression_storage): + def __init__(self, pipe, impressions_manager, event_storage, impression_storage): """ Class constructor. @@ -95,8 +91,6 @@ def __init__(self, pipe, impressions_manager, telemetry_storage, event_storage, :type pipe: callable :param impressions_manager: impression manager instance :type impressions_manager: splitio.engine.impressions.Manager - :param telemetry_storage: telemetry storage instance - :type telemetry_storage: splitio.storage.redis.RedisTelemetryStorage :param event_storage: event storage instance :type event_storage: splitio.storage.EventStorage :param impression_storage: impression storage instance @@ -104,7 +98,6 @@ def __init__(self, pipe, impressions_manager, telemetry_storage, event_storage, """ self._make_pipe = pipe self._impressions_manager = impressions_manager - self._telemetry_storage = telemetry_storage self._event_sotrage = event_storage self._impression_storage = impression_storage @@ -123,12 +116,12 @@ def record_treatment_stats(self, impressions, latency, operation): impressions = self._impressions_manager.process_impressions(impressions) pipe = self._make_pipe() self._impression_storage.add_impressions_to_pipe(impressions, pipe) - self._telemetry_storage.add_latency_to_pipe(operation, latency, pipe) + # self._telemetry_storage.add_latency_to_pipe(operation, latency, pipe) result = pipe.execute() if len(result) == 2: self._impression_storage.expire_key(result[0], len(impressions)) except Exception: # pylint: disable=broad-except - _LOGGER.error('Error recording impressions and metrics') + _LOGGER.error('Error recording impressions') _LOGGER.debug('Error: ', exc_info=True) def record_track_stats(self, event): diff --git a/splitio/storage/__init__.py b/splitio/storage/__init__.py index 8d5a04a6..23ccda31 100644 --- a/splitio/storage/__init__.py +++ b/splitio/storage/__init__.py @@ -283,94 +283,3 @@ def clear(self): Clear data. """ pass - - -class TelemetryStorage(object, metaclass=abc.ABCMeta): - """Telemetry storage interface.""" - - @abc.abstractmethod - def inc_latency(self, name, bucket): - """ - Add a latency. - - :param name: Name of the latency metric. - :type name: str - :param value: Value of the latency metric. - :tyoe value: int - """ - pass - - @abc.abstractmethod - def inc_counter(self, name): - """ - Increment a counter. - - :param name: Name of the counter metric. - :type name: str - """ - pass - - @abc.abstractmethod - def put_gauge(self, name, value): - """ - Add a gauge metric. - - :param name: Name of the gauge metric. - :type name: str - :param value: Value of the gauge metric. - :type value: int - """ - pass - - @abc.abstractmethod - def pop_counters(self): - """ - Get all the counters. - - :rtype: list - """ - pass - - @abc.abstractmethod - def pop_gauges(self): - """ - Get all the gauges. - - :rtype: list - - """ - pass - - @abc.abstractmethod - def pop_latencies(self): - """ - Get all latencies. - - :rtype: list - """ - pass - - @abc.abstractmethod - def clear(self): - """ - Clear data. - """ - pass - - -class TelemetryPipelinedStorage(object, metaclass=abc.ABCMeta): - """Telemetry Pipelined Storage interface.""" - - @abc.abstractmethod - def add_latency_to_pipe(self, latency, operation, pipe): - """ - Add latency operation to pipeline - - :param latency: time took for doing evaluation - :type latency: int - :param operation: operation type - :type operation: str - :param pipe: Redis pipe. - :type pipe: redis.pipe - """ - pass diff --git a/splitio/storage/inmemmory.py b/splitio/storage/inmemmory.py index 4e57e2d7..ab0b5176 100644 --- a/splitio/storage/inmemmory.py +++ b/splitio/storage/inmemmory.py @@ -5,8 +5,7 @@ from collections import Counter from splitio.models.segments import Segment -from splitio.storage import SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, \ - TelemetryStorage +from splitio.storage import SplitStorage, SegmentStorage, ImpressionStorage, EventStorage MAX_SIZE_BYTES = 5 * 1024 * 1024 @@ -420,106 +419,3 @@ def clear(self): """ with self._lock: self._events = queue.Queue(maxsize=self._queue_size) - - -class InMemoryTelemetryStorage(TelemetryStorage): - """In-Memory implementation of telemetry storage interface.""" - - def __init__(self): - """Constructor.""" - self._latencies = {} - self._gauges = {} - self._counters = {} - self._latencies_lock = threading.Lock() - self._gauges_lock = threading.Lock() - self._counters_lock = threading.Lock() - - def inc_latency(self, name, bucket): - """ - Add a latency. - - :param name: Name of the latency metric. - :type name: str - :param value: Value of the latency metric. - :tyoe value: int - """ - if not 0 <= bucket <= 21: - _LOGGER.warning('Incorect bucket "%d" for latency "%s". Ignoring.', bucket, name) - return - - with self._latencies_lock: - latencies = self._latencies.get(name, [0] * 22) - latencies[bucket] += 1 - self._latencies[name] = latencies - - def inc_counter(self, name): - """ - Increment a counter. - - :param name: Name of the counter metric. - :type name: str - """ - with self._counters_lock: - counter = self._counters.get(name, 0) - counter += 1 - self._counters[name] = counter - - def put_gauge(self, name, value): - """ - Add a gauge metric. - - :param name: Name of the gauge metric. - :type name: str - :param value: Value of the gauge metric. - :type value: int - """ - with self._gauges_lock: - self._gauges[name] = value - - def pop_counters(self): - """ - Get all the counters. - - :rtype: list - """ - with self._counters_lock: - try: - return self._counters - finally: - self._counters = {} - - def pop_gauges(self): - """ - Get all the gauges. - - :rtype: list - - """ - with self._gauges_lock: - try: - return self._gauges - finally: - self._gauges = {} - - def pop_latencies(self): - """ - Get all latencies. - - :rtype: list - """ - with self._latencies_lock: - try: - return self._latencies - finally: - self._latencies = {} - - def clear(self): - """ - Clear data. - """ - with self._latencies_lock: - self._latencies = {} - with self._gauges_lock: - self._gauges = {} - with self._counters_lock: - self._counters = {} diff --git a/splitio/storage/redis.py b/splitio/storage/redis.py index c747c1f3..cdc79b29 100644 --- a/splitio/storage/redis.py +++ b/splitio/storage/redis.py @@ -5,7 +5,7 @@ from splitio.models.impressions import Impression from splitio.models import splits, segments from splitio.storage import SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, \ - ImpressionPipelinedStorage, TelemetryStorage, TelemetryPipelinedStorage + ImpressionPipelinedStorage from splitio.storage.adapters.redis import RedisAdapterException from splitio.storage.adapters.cache_trait import decorate as add_cache, DEFAULT_MAX_AGE @@ -525,187 +525,3 @@ def clear(self): Clear data. """ raise NotImplementedError('Not supported for redis.') - - -class RedisTelemetryStorage(TelemetryStorage, TelemetryPipelinedStorage): - """Redis-based Telemetry storage.""" - - _LATENCY_KEY_TEMPLATE = "SPLITIO/{sdk}/{instance}/latency.{name}.bucket.{bucket}" - _COUNTER_KEY_TEMPLATE = "SPLITIO/{sdk}/{instance}/count.{name}" - _GAUGE_KEY_TEMPLATE = "SPLITIO/{sdk}/{instance}/gauge.{name}" - - def __init__(self, redis_client, sdk_metadata): - """ - Class constructor. - - :param redis_client: Redis client or compliant interface. - :type redis_client: splitio.storage.adapters.redis.RedisAdapter - :param sdk_metadata: SDK & Machine information. - :type sdk_metadata: splitio.client.util.SdkMetadata - """ - self._redis = redis_client - self._metadata = sdk_metadata - - def _get_latency_key(self, name, bucket): - """ - Instantiate and return the latency key template. - - :param name: Name of the latency metric. - :type name: str - :param bucket: Number of bucket. - :type bucket: int - - :return: Redis latency key. - :rtype: str - """ - return self._LATENCY_KEY_TEMPLATE.format( - sdk=self._metadata.sdk_version, - instance=self._metadata.instance_name, - name=name, - bucket=bucket - ) - - def _get_counter_key(self, name): - """ - Instantiate and return the counter key template. - - :param name: Name of the counter metric. - :type name: str - - :return: Redis counter key. - :rtype: str - """ - return self._COUNTER_KEY_TEMPLATE.format( - sdk=self._metadata.sdk_version, - instance=self._metadata.instance_name, - name=name - ) - - def _get_gauge_key(self, name): - """ - Instantiate and return the latency key template. - - :param name: Name of the latency metric. - :type name: str - - :return: Redis latency key. - :rtype: str - """ - return self._GAUGE_KEY_TEMPLATE.format( - sdk=self._metadata.sdk_version, - instance=self._metadata.instance_name, - name=name, - ) - - def _wrap_latency(self, name, bucket): - """ - Wrap latency to be stored. - - :param name: Name of the latency metric. - :type name: str - :param value: Value of the latency metric. - :tyoe value: int - - :return: Redis latency key. - :rtype: str - """ - if not 0 <= bucket <= 21: - _LOGGER.error('Incorect bucket "%d" for latency "%s". Ignoring.', bucket, name) - return None - - return self._get_latency_key(name, bucket) - - def add_latency_to_pipe(self, name, bucket, pipe): - """ - Add latency operation to pipeline - - :param name: Name of the latency metric. - :type name: str - :param value: Value of the latency metric. - :tyoe value: int - :param pipe: Redis pipe. - :type pipe: redis.pipe - """ - key = self._wrap_latency(name, bucket) - if key is None: - return - pipe.incr(key) - - def inc_latency(self, name, bucket): - """ - Add a latency. - - :param name: Name of the latency metric. - :type name: str - :param value: Value of the latency metric. - :tyoe value: int - """ - key = self._wrap_latency(name, bucket) - if key is None: - return - try: - self._redis.incr(key) - except RedisAdapterException: - _LOGGER.error('Something went wrong when trying to store latency in redis') - _LOGGER.debug('Error: ', exc_info=True) - - def inc_counter(self, name): - """ - Increment a counter. - - :param name: Name of the counter metric. - :type name: str - """ - key = self._get_counter_key(name) - try: - self._redis.incr(key) - except RedisAdapterException: - _LOGGER.error('Something went wrong when trying to increment counter in redis') - _LOGGER.debug('Error: ', exc_info=True) - - def put_gauge(self, name, value): - """ - Add a gauge metric. - - :param name: Name of the gauge metric. - :type name: str - :param value: Value of the gauge metric. - :type value: int - """ - key = self._get_gauge_key(name) - try: - self._redis.set(key, value) - except RedisAdapterException: - _LOGGER.error('Something went wrong when trying to set gauge in redis') - _LOGGER.debug('Error: ', exc_info=True) - - def pop_counters(self): - """ - Get all the counters. - - :rtype: list - """ - raise NotImplementedError('Only redis-consumer mode is supported.') - - def pop_gauges(self): - """ - Get all the gauges. - - :rtype: list - - """ - raise NotImplementedError('Only redis-consumer mode is supported.') - - def pop_latencies(self): - """ - Get all latencies. - - :rtype: list - """ - raise NotImplementedError('Only redis-consumer mode is supported.') - - def clear(self): - """ - Clear data. - """ - raise NotImplementedError('Not supported for redis.') diff --git a/splitio/sync/synchronizer.py b/splitio/sync/synchronizer.py index 0bfeb0ca..8c4fe13c 100644 --- a/splitio/sync/synchronizer.py +++ b/splitio/sync/synchronizer.py @@ -13,7 +13,7 @@ class SplitSynchronizers(object): """SplitSynchronizers.""" - def __init__(self, split_sync, segment_sync, impressions_sync, events_sync, telemetry_sync, # pylint:disable=too-many-arguments + def __init__(self, split_sync, segment_sync, impressions_sync, events_sync, # pylint:disable=too-many-arguments impressions_count_sync): """ Class constructor. @@ -26,8 +26,6 @@ def __init__(self, split_sync, segment_sync, impressions_sync, events_sync, tele :type impressions_sync: splitio.sync.impression.ImpressionSynchronizer :param events_sync: sync for events :type events_sync: splitio.sync.event.EventSynchronizer - :param telemetry_sync: sync for telemetry - :type telemetry_sync: splitio.sync.telemetry.TelemetrySynchronizer :param impressions_count_sync: sync for impression_counts :type impressions_count_sync: splitio.sync.impression.ImpressionsCountSynchronizer """ @@ -35,7 +33,6 @@ def __init__(self, split_sync, segment_sync, impressions_sync, events_sync, tele self._segment_sync = segment_sync self._impressions_sync = impressions_sync self._events_sync = events_sync - self._telemetry_sync = telemetry_sync self._impressions_count_sync = impressions_count_sync @property @@ -58,11 +55,6 @@ def events_sync(self): """Return events synchonizer.""" return self._events_sync - @property - def telemetry_sync(self): - """Return telemetry synchonizer.""" - return self._telemetry_sync - @property def impressions_count_sync(self): """Return impressions count synchonizer.""" @@ -72,7 +64,7 @@ def impressions_count_sync(self): class SplitTasks(object): """SplitTasks.""" - def __init__(self, split_task, segment_task, impressions_task, events_task, telemetry_task, # pylint:disable=too-many-arguments + def __init__(self, split_task, segment_task, impressions_task, events_task, # pylint:disable=too-many-arguments impressions_count_task): """ Class constructor. @@ -85,8 +77,6 @@ def __init__(self, split_task, segment_task, impressions_task, events_task, tele :type impressions_task: splitio.tasks.impressions_sync.ImpressionsSyncTask :param events_task: sync for events :type events_task: splitio.tasks.events_sync.EventsSyncTask - :param telemetry_task: sync for telemetry - :type telemetry_task: splitio.tasks.telemetry_sync.TelemetrySynchronizationTask :param impressions_count_task: sync for impression_counts :type impressions_count_task: splitio.tasks.impressions_sync.ImpressionsCountSyncTask """ @@ -94,7 +84,6 @@ def __init__(self, split_task, segment_task, impressions_task, events_task, tele self._segment_task = segment_task self._impressions_task = impressions_task self._events_task = events_task - self._telemetry_task = telemetry_task self._impressions_count_task = impressions_count_task @property @@ -117,11 +106,6 @@ def events_task(self): """Return events sync task.""" return self._events_task - @property - def telemetry_task(self): - """Return telemetry sync task.""" - return self._telemetry_task - @property def impressions_count_task(self): """Return impressions count sync task.""" @@ -307,7 +291,6 @@ def start_periodic_data_recording(self): _LOGGER.debug('Starting periodic data recording') self._split_tasks.impressions_task.start() self._split_tasks.events_task.start() - self._split_tasks.telemetry_task.start() self._split_tasks.impressions_count_task.start() def stop_periodic_data_recording(self, blocking): @@ -332,7 +315,6 @@ def stop_periodic_data_recording(self, blocking): self._split_tasks.impressions_task.stop() self._split_tasks.events_task.stop() self._split_tasks.impressions_count_task.stop() - self._split_tasks.telemetry_task.stop() def kill_split(self, split_name, default_treatment, change_number): """ diff --git a/splitio/sync/telemetry.py b/splitio/sync/telemetry.py deleted file mode 100644 index f0e48613..00000000 --- a/splitio/sync/telemetry.py +++ /dev/null @@ -1,52 +0,0 @@ -import logging - -from splitio.api import APIException - - -_LOGGER = logging.getLogger(__name__) - - -class TelemetrySynchronizer(object): - def __init__(self, api, storage): - """ - Class constructor. - - :param api: Telemetry API Client. - :type api: splitio.api.telemetry.TelemetryAPI - :param storage: Telemetry Storage. - :type storage: splitio.storage.InMemoryTelemetryStorage - - """ - self._api = api - self._storage = storage - - def synchronize_telemetry(self): - """ - Send latencies, counters and gauges to split BE. - - :return: True if synchronization is complete. - :rtype: bool - """ - try: - latencies = self._storage.pop_latencies() - if latencies: - self._api.flush_latencies(latencies) - except APIException: - _LOGGER.error('Failed send telemetry/latencies to split BE.') - _LOGGER.debug('Exception information: ', exc_info=True) - - try: - counters = self._storage.pop_counters() - if counters: - self._api.flush_counters(counters) - except APIException: - _LOGGER.error('Failed send telemetry/counters to split BE.') - _LOGGER.debug('Exception information: ', exc_info=True) - - try: - gauges = self._storage.pop_gauges() - if gauges: - self._api.flush_gauges(gauges) - except APIException: - _LOGGER.error('Failed send telemetry/gauges to split BE.') - _LOGGER.debug('Exception information: ', exc_info=True) diff --git a/splitio/tasks/telemetry_sync.py b/splitio/tasks/telemetry_sync.py deleted file mode 100644 index b17bc2ad..00000000 --- a/splitio/tasks/telemetry_sync.py +++ /dev/null @@ -1,38 +0,0 @@ -"""Split Synchronization task.""" - -from splitio.tasks import BaseSynchronizationTask -from splitio.tasks.util.asynctask import AsyncTask - - -class TelemetrySynchronizationTask(BaseSynchronizationTask): - """Split Synchronization task class.""" - - def __init__(self, synchronize_telemetry, period): - """ - Class constructor. - - :param synchronize_telemetry: handler. - :type synchronize_telemetry: splitio.api.telemetry.TelemetryAPI - :param period: Period of task - :type period: int - - """ - self._period = period - self._task = AsyncTask(synchronize_telemetry, period) - - def start(self): - """Start the task.""" - self._task.start() - - def stop(self, event=None): - """Stop the task. Accept an optional event to set when the task has finished.""" - self._task.stop(event) - - def is_running(self): - """ - Return whether the task is running. - - :return: True if the task is running. False otherwise. - :rtype bool - """ - return self._task.running() diff --git a/tests/api/test_telemetry.py b/tests/api/test_telemetry.py deleted file mode 100644 index abec2d50..00000000 --- a/tests/api/test_telemetry.py +++ /dev/null @@ -1,118 +0,0 @@ -"""Telemetry API tests module.""" - -import pytest -from splitio.api import telemetry, client, APIException -from splitio.client.util import SdkMetadata - - -class EventsAPITests(object): - """Impressions API test cases.""" - - def test_post_latencies(self, mocker): - """Test impressions posting API call.""" - httpclient = mocker.Mock(spec=client.HttpClient) - httpclient.post.return_value = client.HttpResponse(200, '') - sdk_metadata = SdkMetadata('python-1.2.3', 'some_machine_name', '123.123.123.123') - telemetry_api = telemetry.TelemetryAPI(httpclient, 'some_api_key', sdk_metadata) - response = telemetry_api.flush_latencies({ - 'l1': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22] - }) - - call_made = httpclient.post.mock_calls[0] - - # validate positional arguments - assert call_made[1] == ('events', '/metrics/times', 'some_api_key') - - # validate key-value args (headers) - assert call_made[2]['extra_headers'] == { - 'SplitSDKVersion': 'python-1.2.3', - 'SplitSDKMachineIP': '123.123.123.123', - 'SplitSDKMachineName': 'some_machine_name' - } - - # validate key-value args (body) - assert call_made[2]['body'] == [{ - 'name': 'l1', - 'latencies': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22] - }] - - httpclient.reset_mock() - def raise_exception(*args, **kwargs): - raise client.HttpClientException('some_message') - httpclient.post.side_effect = raise_exception - with pytest.raises(APIException) as exc_info: - response = telemetry_api.flush_latencies({ - 'l1': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22] - }) - assert exc_info.type == APIException - assert exc_info.value.message == 'some_message' - - def test_post_counters(self, mocker): - """Test impressions posting API call.""" - httpclient = mocker.Mock(spec=client.HttpClient) - httpclient.post.return_value = client.HttpResponse(200, '') - sdk_metadata = SdkMetadata('python-1.2.3', 'some_machine_name', '123.123.123.123') - telemetry_api = telemetry.TelemetryAPI(httpclient, 'some_api_key', sdk_metadata) - response = telemetry_api.flush_counters({'counter1': 1, 'counter2': 2}) - - call_made = httpclient.post.mock_calls[0] - - # validate positional arguments - assert call_made[1] == ('events', '/metrics/counters', 'some_api_key') - - # validate key-value args (headers) - assert call_made[2]['extra_headers'] == { - 'SplitSDKVersion': 'python-1.2.3', - 'SplitSDKMachineIP': '123.123.123.123', - 'SplitSDKMachineName': 'some_machine_name' - } - - # validate key-value args (body) - assert call_made[2]['body'] == [ - {'name': 'counter1', 'delta': 1}, - {'name': 'counter2', 'delta': 2} - ] - - httpclient.reset_mock() - def raise_exception(*args, **kwargs): - raise client.HttpClientException('some_message') - httpclient.post.side_effect = raise_exception - with pytest.raises(APIException) as exc_info: - response = telemetry_api.flush_counters({'counter1': 1, 'counter2': 2}) - assert exc_info.type == APIException - assert exc_info.value.message == 'some_message' - - def test_post_gauge(self, mocker): - """Test impressions posting API call.""" - httpclient = mocker.Mock(spec=client.HttpClient) - httpclient.post.return_value = client.HttpResponse(200, '') - sdk_metadata = SdkMetadata('python-1.2.3', 'some_machine_name', '123.123.123.123') - telemetry_api = telemetry.TelemetryAPI(httpclient, 'some_api_key', sdk_metadata) - response = telemetry_api.flush_gauges({'gauge1': 1, 'gauge2': 2}) - - call_made = httpclient.post.mock_calls[0] - - # validate positional arguments - assert call_made[1] == ('events', '/metrics/gauge', 'some_api_key') - - # validate key-value args (headers) - assert call_made[2]['extra_headers'] == { - 'SplitSDKVersion': 'python-1.2.3', - 'SplitSDKMachineIP': '123.123.123.123', - 'SplitSDKMachineName': 'some_machine_name' - } - - # validate key-value args (body) - assert call_made[2]['body'] == [ - {'name': 'gauge1', 'value': 1}, - {'name': 'gauge2', 'value': 2} - ] - - httpclient.reset_mock() - def raise_exception(*args, **kwargs): - raise client.HttpClientException('some_message') - httpclient.post.side_effect = raise_exception - with pytest.raises(APIException) as exc_info: - response = telemetry_api.flush_gauges({'gauge1': 1, 'gauge2': 2}) - assert exc_info.type == APIException - assert exc_info.value.message == 'some_message' diff --git a/tests/client/test_client.py b/tests/client/test_client.py index b4592d22..057a9ddc 100644 --- a/tests/client/test_client.py +++ b/tests/client/test_client.py @@ -8,10 +8,9 @@ from splitio.engine.evaluator import Evaluator from splitio.models.impressions import Impression, Label from splitio.models.events import Event, EventWrapper -from splitio.storage import EventStorage, ImpressionStorage, SegmentStorage, SplitStorage, \ - TelemetryStorage +from splitio.storage import EventStorage, ImpressionStorage, SegmentStorage, SplitStorage from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \ - InMemoryImpressionStorage, InMemoryTelemetryStorage, InMemoryEventStorage + InMemoryImpressionStorage, InMemoryEventStorage from splitio.models import splits, segments from splitio.engine.impressions import Manager as ImpressionManager @@ -28,7 +27,6 @@ def test_get_treatment(self, mocker): segment_storage = mocker.Mock(spec=SegmentStorage) impression_storage = mocker.Mock(spec=ImpressionStorage) event_storage = mocker.Mock(spec=EventStorage) - telemetry_storage = mocker.Mock(spec=TelemetryStorage) def _get_storage_mock(name): return { @@ -36,7 +34,6 @@ def _get_storage_mock(name): 'segments': segment_storage, 'impressions': impression_storage, 'events': event_storage, - 'telemetry': telemetry_storage }[name] destroyed_property = mocker.PropertyMock() @@ -51,8 +48,7 @@ def _get_storage_mock(name): mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) impmanager = mocker.Mock(spec=ImpressionManager) - recorder = StandardRecorder(impmanager, telemetry_storage, event_storage, - impression_storage) + recorder = StandardRecorder(impmanager, event_storage, impression_storage) client = Client(factory, recorder, True) client._evaluator = mocker.Mock(spec=Evaluator) client._evaluator.evaluate_feature.return_value = { @@ -69,7 +65,6 @@ def _get_storage_mock(name): assert mocker.call( [(Impression('some_key', 'some_feature', 'on', 'some_label', 123, None, 1000), None)] ) in impmanager.process_impressions.mock_calls - assert mocker.call('sdk.getTreatment', 5) in telemetry_storage.inc_latency.mock_calls assert _logger.mock_calls == [] # Test with client not ready @@ -93,7 +88,6 @@ def _raise(*_): assert mocker.call( [(Impression('some_key', 'some_feature', 'control', 'exception', -1, None, 1000), None)] ) in impmanager.process_impressions.mock_calls - assert len(telemetry_storage.inc_latency.mock_calls) == 3 def test_get_treatment_with_config(self, mocker): """Test get_treatment execution paths.""" @@ -101,7 +95,6 @@ def test_get_treatment_with_config(self, mocker): segment_storage = mocker.Mock(spec=SegmentStorage) impression_storage = mocker.Mock(spec=ImpressionStorage) event_storage = mocker.Mock(spec=EventStorage) - telemetry_storage = mocker.Mock(spec=TelemetryStorage) def _get_storage_mock(name): return { @@ -109,7 +102,6 @@ def _get_storage_mock(name): 'segments': segment_storage, 'impressions': impression_storage, 'events': event_storage, - 'telemetry': telemetry_storage }[name] destroyed_property = mocker.PropertyMock() @@ -124,8 +116,7 @@ def _get_storage_mock(name): mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) impmanager = mocker.Mock(spec=ImpressionManager) - recorder = StandardRecorder(impmanager, telemetry_storage, event_storage, - impression_storage) + recorder = StandardRecorder(impmanager, event_storage, impression_storage) client = Client(factory, recorder, True) client._evaluator = mocker.Mock(spec=Evaluator) client._evaluator.evaluate_feature.return_value = { @@ -146,7 +137,6 @@ def _get_storage_mock(name): assert mocker.call( [(Impression('some_key', 'some_feature', 'on', 'some_label', 123, None, 1000), None)] ) in impmanager.process_impressions.mock_calls - assert mocker.call('sdk.getTreatmentWithConfig', 5) in telemetry_storage.inc_latency.mock_calls assert _logger.mock_calls == [] # Test with client not ready @@ -171,7 +161,6 @@ def _raise(*_): assert mocker.call( [(Impression('some_key', 'some_feature', 'control', 'exception', -1, None, 1000), None)] ) in impmanager.process_impressions.mock_calls - assert len(telemetry_storage.inc_latency.mock_calls) == 3 def test_get_treatments(self, mocker): """Test get_treatment execution paths.""" @@ -179,7 +168,6 @@ def test_get_treatments(self, mocker): segment_storage = mocker.Mock(spec=SegmentStorage) impression_storage = mocker.Mock(spec=ImpressionStorage) event_storage = mocker.Mock(spec=EventStorage) - telemetry_storage = mocker.Mock(spec=TelemetryStorage) def _get_storage_mock(name): return { @@ -187,7 +175,6 @@ def _get_storage_mock(name): 'segments': segment_storage, 'impressions': impression_storage, 'events': event_storage, - 'telemetry': telemetry_storage }[name] destroyed_property = mocker.PropertyMock() @@ -202,8 +189,7 @@ def _get_storage_mock(name): mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) impmanager = mocker.Mock(spec=ImpressionManager) - recorder = StandardRecorder(impmanager, telemetry_storage, event_storage, - impression_storage) + recorder = StandardRecorder(impmanager, event_storage, impression_storage) client = Client(factory, recorder, True) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { @@ -225,7 +211,6 @@ def _get_storage_mock(name): impressions_called = impmanager.process_impressions.mock_calls[0][1][0] assert (Impression('key', 'f1', 'on', 'some_label', 123, None, 1000), None) in impressions_called assert (Impression('key', 'f2', 'on', 'some_label', 123, None, 1000), None) in impressions_called - assert mocker.call('sdk.getTreatments', 5) in telemetry_storage.inc_latency.mock_calls assert _logger.mock_calls == [] # Test with client not ready @@ -246,7 +231,6 @@ def _raise(*_): raise Exception('something') client._evaluator.evaluate_features.side_effect = _raise assert client.get_treatments('key', ['f1', 'f2']) == {'f1': 'control', 'f2': 'control'} - assert len(telemetry_storage.inc_latency.mock_calls) == 2 def test_get_treatments_with_config(self, mocker): """Test get_treatment execution paths.""" @@ -254,7 +238,6 @@ def test_get_treatments_with_config(self, mocker): segment_storage = mocker.Mock(spec=SegmentStorage) impression_storage = mocker.Mock(spec=ImpressionStorage) event_storage = mocker.Mock(spec=EventStorage) - telemetry_storage = mocker.Mock(spec=TelemetryStorage) def _get_storage_mock(name): return { @@ -262,7 +245,6 @@ def _get_storage_mock(name): 'segments': segment_storage, 'impressions': impression_storage, 'events': event_storage, - 'telemetry': telemetry_storage }[name] destroyed_property = mocker.PropertyMock() @@ -277,8 +259,7 @@ def _get_storage_mock(name): mocker.patch('splitio.client.client.get_latency_bucket_index', new=lambda x: 5) impmanager = mocker.Mock(spec=ImpressionManager) - recorder = StandardRecorder(impmanager, telemetry_storage, event_storage, - impression_storage) + recorder = StandardRecorder(impmanager, event_storage, impression_storage) client = Client(factory, recorder, True) client._evaluator = mocker.Mock(spec=Evaluator) evaluation = { @@ -302,7 +283,6 @@ def _get_storage_mock(name): impressions_called = impmanager.process_impressions.mock_calls[0][1][0] assert (Impression('key', 'f1', 'on', 'some_label', 123, None, 1000), None) in impressions_called assert (Impression('key', 'f2', 'on', 'some_label', 123, None, 1000), None) in impressions_called - assert mocker.call('sdk.getTreatmentsWithConfig', 5) in telemetry_storage.inc_latency.mock_calls assert _logger.mock_calls == [] # Test with client not ready @@ -326,7 +306,6 @@ def _raise(*_): 'f1': ('control', None), 'f2': ('control', None) } - assert len(telemetry_storage.inc_latency.mock_calls) == 2 def test_destroy(self, mocker): """Test that destroy/destroyed calls are forwarded to the factory.""" @@ -334,7 +313,6 @@ def test_destroy(self, mocker): segment_storage = mocker.Mock(spec=SegmentStorage) impression_storage = mocker.Mock(spec=ImpressionStorage) event_storage = mocker.Mock(spec=EventStorage) - telemetry_storage = mocker.Mock(spec=TelemetryStorage) def _get_storage_mock(name): return { @@ -342,15 +320,13 @@ def _get_storage_mock(name): 'segments': segment_storage, 'impressions': impression_storage, 'events': event_storage, - 'telemetry': telemetry_storage }[name] factory = mocker.Mock(spec=SplitFactory) destroyed_mock = mocker.PropertyMock() type(factory).destroyed = destroyed_mock impmanager = mocker.Mock(spec=ImpressionManager) - recorder = StandardRecorder(impmanager, telemetry_storage, event_storage, - impression_storage) + recorder = StandardRecorder(impmanager, event_storage, impression_storage) client = Client(factory, recorder, True) client.destroy() assert factory.destroy.mock_calls == [mocker.call()] @@ -364,7 +340,6 @@ def test_track(self, mocker): impression_storage = mocker.Mock(spec=ImpressionStorage) event_storage = mocker.Mock(spec=EventStorage) event_storage.put.return_value = True - telemetry_storage = mocker.Mock(spec=TelemetryStorage) def _get_storage_mock(name): return { @@ -372,7 +347,6 @@ def _get_storage_mock(name): 'segments': segment_storage, 'impressions': impression_storage, 'events': event_storage, - 'telemetry': telemetry_storage }[name] factory = mocker.Mock(spec=SplitFactory) factory._get_storage = _get_storage_mock @@ -384,8 +358,7 @@ def _get_storage_mock(name): mocker.patch('splitio.client.client.utctime_ms', new=lambda: 1000) impmanager = mocker.Mock(spec=ImpressionManager) - recorder = StandardRecorder(impmanager, telemetry_storage, event_storage, - impression_storage) + recorder = StandardRecorder(impmanager, event_storage, impression_storage) client = Client(factory, recorder, True) assert client.track('key', 'user', 'purchase', 12) is True assert mocker.call([ diff --git a/tests/client/test_factory.py b/tests/client/test_factory.py index 854c184e..065584e8 100644 --- a/tests/client/test_factory.py +++ b/tests/client/test_factory.py @@ -9,13 +9,12 @@ _LOGGER as _logger from splitio.client.config import DEFAULT_CONFIG from splitio.storage import redis, inmemmory -from splitio.tasks import events_sync, impressions_sync, split_sync, segment_sync, telemetry_sync +from splitio.tasks import events_sync, impressions_sync, split_sync, segment_sync from splitio.tasks.util import asynctask from splitio.api.splits import SplitsAPI from splitio.api.segments import SegmentsAPI from splitio.api.impressions import ImpressionsAPI from splitio.api.events import EventsAPI -from splitio.api.telemetry import TelemetryAPI from splitio.engine.impressions import Manager as ImpressionsManager from splitio.sync.manager import Manager from splitio.sync.synchronizer import Synchronizer, SplitSynchronizers, SplitTasks @@ -48,13 +47,11 @@ def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk assert factory._storages['impressions']._impressions.maxsize == 10000 assert isinstance(factory._storages['events'], inmemmory.InMemoryEventStorage) assert factory._storages['events']._events.maxsize == 10000 - assert isinstance(factory._storages['telemetry'], inmemmory.InMemoryTelemetryStorage) assert isinstance(factory._sync_manager, Manager) assert isinstance(factory._recorder, StandardRecorder) assert isinstance(factory._recorder._impressions_manager, ImpressionsManager) - assert isinstance(factory._recorder._telemetry_storage, inmemmory.TelemetryStorage) assert isinstance(factory._recorder._event_sotrage, inmemmory.EventStorage) assert isinstance(factory._recorder._impression_storage, inmemmory.ImpressionStorage) @@ -97,7 +94,6 @@ def test_redis_client_creation(self, mocker): assert isinstance(factory._get_storage('segments'), redis.RedisSegmentStorage) assert isinstance(factory._get_storage('impressions'), redis.RedisImpressionsStorage) assert isinstance(factory._get_storage('events'), redis.RedisEventsStorage) - assert isinstance(factory._get_storage('telemetry'), redis.RedisTelemetryStorage) assert factory._sync_manager is None @@ -105,7 +101,6 @@ def test_redis_client_creation(self, mocker): assert adapter == factory._get_storage('segments')._redis assert adapter == factory._get_storage('impressions')._redis assert adapter == factory._get_storage('events')._redis - assert adapter == factory._get_storage('telemetry')._redis assert strict_redis_mock.mock_calls == [mocker.call( host='some_host', @@ -134,7 +129,6 @@ def test_redis_client_creation(self, mocker): assert isinstance(factory._recorder, PipelinedRecorder) assert isinstance(factory._recorder._impressions_manager, ImpressionsManager) assert isinstance(factory._recorder._make_pipe(), RedisPipelineAdapter) - assert isinstance(factory._recorder._telemetry_storage, redis.RedisTelemetryStorage) assert isinstance(factory._recorder._event_sotrage, redis.RedisEventsStorage) assert isinstance(factory._recorder._impression_storage, redis.RedisImpressionsStorage) factory.block_until_ready() @@ -150,13 +144,11 @@ def test_uwsgi_forked_client_creation(self): assert factory._storages['impressions']._impressions.maxsize == 10000 assert isinstance(factory._storages['events'], inmemmory.InMemoryEventStorage) assert factory._storages['events']._events.maxsize == 10000 - assert isinstance(factory._storages['telemetry'], inmemmory.InMemoryTelemetryStorage) assert isinstance(factory._sync_manager, Manager) assert isinstance(factory._recorder, StandardRecorder) assert isinstance(factory._recorder._impressions_manager, ImpressionsManager) - assert isinstance(factory._recorder._telemetry_storage, inmemmory.TelemetryStorage) assert isinstance(factory._recorder._event_sotrage, inmemmory.EventStorage) assert isinstance(factory._recorder._impression_storage, inmemmory.ImpressionStorage) @@ -204,15 +196,6 @@ def _event_task_init_mock(self, synchronize_events, period): self._task = evt_async_task_mock mocker.patch('splitio.client.factory.EventsSyncTask.__init__', new=_event_task_init_mock) - telemetry_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask) - telemetry_async_task_mock.stop.side_effect = stop_mock - - def _telemetry_task_init_mock(self, synchronize_counters, period): - self._period = period - self._task = telemetry_async_task_mock - mocker.patch('splitio.client.factory.ImpressionsCountSyncTask.__init__', - new=_telemetry_task_init_mock) - imp_count_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask) imp_count_async_task_mock.stop.side_effect = stop_mock @@ -226,10 +209,9 @@ def _imppression_count_task_init_mock(self, synchronize_counters): segment_sync = mocker.Mock(spec=SegmentSynchronizer) segment_sync.synchronize_segments.return_values = None syncs = SplitSynchronizers(split_sync, segment_sync, mocker.Mock(), - mocker.Mock(), mocker.Mock(), mocker.Mock()) + mocker.Mock(), mocker.Mock()) tasks = SplitTasks(split_async_task_mock, segment_async_task_mock, imp_async_task_mock, - evt_async_task_mock, telemetry_async_task_mock, - imp_count_async_task_mock) + evt_async_task_mock, imp_count_async_task_mock) # Setup synchronizer def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk_matadata, sse_url=None, client_key=None): @@ -248,7 +230,6 @@ def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk factory.destroy() assert len(imp_async_task_mock.stop.mock_calls) == 1 assert len(evt_async_task_mock.stop.mock_calls) == 1 - assert len(telemetry_async_task_mock.stop.mock_calls) == 1 assert len(imp_count_async_task_mock.stop.mock_calls) == 1 assert factory.destroyed is True @@ -298,15 +279,6 @@ def _event_task_init_mock(self, synchronize_events, period): self._task = evt_async_task_mock mocker.patch('splitio.client.factory.EventsSyncTask.__init__', new=_event_task_init_mock) - telemetry_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask) - telemetry_async_task_mock.stop.side_effect = stop_mock_2 - - def _telemetry_task_init_mock(self, synchronize_counters, period): - self._period = period - self._task = telemetry_async_task_mock - mocker.patch('splitio.client.factory.ImpressionsCountSyncTask.__init__', - new=_telemetry_task_init_mock) - imp_count_async_task_mock = mocker.Mock(spec=asynctask.AsyncTask) imp_count_async_task_mock.stop.side_effect = stop_mock @@ -320,10 +292,9 @@ def _imppression_count_task_init_mock(self, synchronize_counters): segment_sync = mocker.Mock(spec=SegmentSynchronizer) segment_sync.synchronize_segments.return_values = None syncs = SplitSynchronizers(split_sync, segment_sync, mocker.Mock(), - mocker.Mock(), mocker.Mock(), mocker.Mock()) + mocker.Mock(), mocker.Mock()) tasks = SplitTasks(split_async_task_mock, segment_async_task_mock, imp_async_task_mock, - evt_async_task_mock, telemetry_async_task_mock, - imp_count_async_task_mock) + evt_async_task_mock, imp_count_async_task_mock) # Setup synchronizer def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk_matadata, sse_url=None, client_key=None): @@ -346,7 +317,6 @@ def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk assert event.is_set() assert len(imp_async_task_mock.stop.mock_calls) == 1 assert len(evt_async_task_mock.stop.mock_calls) == 1 - assert len(telemetry_async_task_mock.stop.mock_calls) == 1 assert len(imp_count_async_task_mock.stop.mock_calls) == 1 assert factory.destroyed is True @@ -470,20 +440,14 @@ def clear_impressions(): def clear_events(): clear_events._called += 1 - def clear_telemetry(): - clear_telemetry._called += 1 - clear_impressions._called = 0 clear_events._called = 0 - clear_telemetry._called = 0 split_storage = mocker.Mock(spec=inmemmory.SplitStorage) segment_storage = mocker.Mock(spec=inmemmory.SegmentStorage) impression_storage = mocker.Mock(spec=inmemmory.ImpressionStorage) impression_storage.clear.side_effect = clear_impressions event_storage = mocker.Mock(spec=inmemmory.EventStorage) event_storage.clear.side_effect = clear_events - telemetry_storage = mocker.Mock(spec=inmemmory.TelemetryStorage) - telemetry_storage.clear.side_effect = clear_telemetry def _get_storage_mock(self, name): return { @@ -491,7 +455,6 @@ def _get_storage_mock(self, name): 'segments': segment_storage, 'impressions': impression_storage, 'events': event_storage, - 'telemetry': telemetry_storage }[name] mocker.patch('splitio.client.factory.SplitFactory._get_storage', new=_get_storage_mock) @@ -520,7 +483,6 @@ def _get_storage_mock(self, name): assert clear_impressions._called == 1 assert clear_events._called == 1 - assert clear_telemetry._called == 1 def test_error_prefork(self, mocker): """Test not handling fork.""" diff --git a/tests/client/test_input_validator.py b/tests/client/test_input_validator.py index 5ad58a25..98416fe6 100644 --- a/tests/client/test_input_validator.py +++ b/tests/client/test_input_validator.py @@ -5,8 +5,7 @@ from splitio.client.client import CONTROL, Client, _LOGGER as _logger from splitio.client.manager import SplitManager from splitio.client.key import Key -from splitio.storage import SplitStorage, EventStorage, ImpressionStorage, TelemetryStorage, \ - SegmentStorage +from splitio.storage import SplitStorage, EventStorage, ImpressionStorage, SegmentStorage from splitio.models.splits import Split from splitio.client import input_validator from splitio.recorder.recorder import StandardRecorder @@ -33,7 +32,6 @@ def _get_storage_mock(storage): 'segments': mocker.Mock(spec=SegmentStorage), 'impressions': mocker.Mock(spec=ImpressionStorage), 'events': mocker.Mock(spec=EventStorage), - 'telemetry': mocker.Mock(spec=TelemetryStorage) }[storage] factory_mock = mocker.Mock(spec=SplitFactory) factory_mock._get_storage.side_effect = _get_storage_mock @@ -262,7 +260,6 @@ def _get_storage_mock(storage): 'segments': mocker.Mock(spec=SegmentStorage), 'impressions': mocker.Mock(spec=ImpressionStorage), 'events': mocker.Mock(spec=EventStorage), - 'telemetry': mocker.Mock(spec=TelemetryStorage) }[storage] factory_mock = mocker.Mock(spec=SplitFactory) factory_mock._get_storage.side_effect = _get_storage_mock @@ -526,7 +523,7 @@ def test_track(self, mocker): event_storage = mocker.Mock(spec=EventStorage) event_storage.put.return_value = True - recorder = StandardRecorder(mocker.Mock(), mocker.Mock(), event_storage, mocker.Mock()) + recorder = StandardRecorder(mocker.Mock(), event_storage, mocker.Mock()) client = Client(factory_mock, recorder) client._event_storage = event_storage _logger = mocker.Mock() @@ -788,7 +785,6 @@ def _get_storage_mock(storage): 'segments': mocker.Mock(spec=SegmentStorage), 'impressions': mocker.Mock(spec=ImpressionStorage), 'events': mocker.Mock(spec=EventStorage), - 'telemetry': mocker.Mock(spec=TelemetryStorage) }[storage] factory_mock = mocker.Mock(spec=SplitFactory) factory_mock._get_storage.side_effect = _get_storage_mock diff --git a/tests/client/test_localhost.py b/tests/client/test_localhost.py index c78e359e..d211bf2c 100644 --- a/tests/client/test_localhost.py +++ b/tests/client/test_localhost.py @@ -40,40 +40,6 @@ def test_dummy_event_storage(self): assert evt_storage.pop_many([2]) is None assert evt_storage.pop_many(object) is None - def test_dummy_telemetry_storage(self): - """Test that dummy telemetry storage never complains.""" - telemetry_storage = localhost.LocalhostTelemetryStorage() - assert telemetry_storage.inc_latency() is None - assert telemetry_storage.inc_latency('ads') is None - assert telemetry_storage.inc_latency(3) is None - assert telemetry_storage.inc_latency([2]) is None - assert telemetry_storage.inc_latency(object) is None - assert telemetry_storage.pop_latencies() is None - assert telemetry_storage.pop_latencies('ads') is None - assert telemetry_storage.pop_latencies(3) is None - assert telemetry_storage.pop_latencies([2]) is None - assert telemetry_storage.pop_latencies(object) is None - assert telemetry_storage.inc_counter() is None - assert telemetry_storage.inc_counter('ads') is None - assert telemetry_storage.inc_counter(3) is None - assert telemetry_storage.inc_counter([2]) is None - assert telemetry_storage.inc_counter(object) is None - assert telemetry_storage.pop_counters() is None - assert telemetry_storage.pop_counters('ads') is None - assert telemetry_storage.pop_counters(3) is None - assert telemetry_storage.pop_counters([2]) is None - assert telemetry_storage.pop_counters(object) is None - assert telemetry_storage.put_gauge() is None - assert telemetry_storage.put_gauge('ads') is None - assert telemetry_storage.put_gauge(3) is None - assert telemetry_storage.put_gauge([2]) is None - assert telemetry_storage.put_gauge(object) is None - assert telemetry_storage.pop_gauges() is None - assert telemetry_storage.pop_gauges('ads') is None - assert telemetry_storage.pop_gauges(3) is None - assert telemetry_storage.pop_gauges([2]) is None - assert telemetry_storage.pop_gauges(object) is None - class SplitFetchingTaskTests(object): """Localhost split fetching task test cases.""" diff --git a/tests/helpers/mockserver.py b/tests/helpers/mockserver.py index 621d40b6..9d05b3c9 100644 --- a/tests/helpers/mockserver.py +++ b/tests/helpers/mockserver.py @@ -236,8 +236,7 @@ def do_POST(self): #pylint:disable=invalid-name self._req_queue.put(Request('POST', self.path, headers, body)) if self.path in set(['/api/testImpressions/bulk', '/testImpressions/count', - '/api/events/bulk', '/metrics/times', '/metrics/count', - '/metrics/gauge']): + '/api/events/bulk']): self.send_response(200) self.send_header("Content-type", "application/json") diff --git a/tests/integration/test_client_e2e.py b/tests/integration/test_client_e2e.py index 9723f655..50ea1cae 100644 --- a/tests/integration/test_client_e2e.py +++ b/tests/integration/test_client_e2e.py @@ -9,9 +9,9 @@ from splitio.client.factory import get_factory, SplitFactory from splitio.client.util import SdkMetadata from splitio.storage.inmemmory import InMemoryEventStorage, InMemoryImpressionStorage, \ - InMemorySegmentStorage, InMemorySplitStorage, InMemoryTelemetryStorage + InMemorySegmentStorage, InMemorySplitStorage from splitio.storage.redis import RedisEventsStorage, RedisImpressionsStorage, \ - RedisSplitStorage, RedisSegmentStorage, RedisTelemetryStorage + RedisSplitStorage, RedisSegmentStorage from splitio.storage.adapters.redis import build, RedisAdapter from splitio.models import splits, segments from splitio.engine.impressions import Manager as ImpressionsManager, ImpressionsMode @@ -48,11 +48,9 @@ def setup_method(self): 'segments': segment_storage, 'impressions': InMemoryImpressionStorage(5000), 'events': InMemoryEventStorage(5000), - 'telemetry': InMemoryTelemetryStorage() } impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.DEBUG) - recorder = StandardRecorder(impmanager, storages['telemetry'], storages['events'], - storages['impressions']) + recorder = StandardRecorder(impmanager, storages['events'], storages['impressions']) self.factory = SplitFactory('some_api_key', storages, True, recorder) # pylint:disable=attribute-defined-outside-init def teardown_method(self): @@ -298,11 +296,9 @@ def setup_method(self): 'segments': segment_storage, 'impressions': InMemoryImpressionStorage(5000), 'events': InMemoryEventStorage(5000), - 'telemetry': InMemoryTelemetryStorage() } impmanager = ImpressionsManager(ImpressionsMode.OPTIMIZED, True) - recorder = StandardRecorder(impmanager, storages['telemetry'], storages['events'], - storages['impressions']) + recorder = StandardRecorder(impmanager, storages['events'], storages['impressions']) self.factory = SplitFactory('some_api_key', storages, True, recorder) # pylint:disable=attribute-defined-outside-init def _validate_last_impressions(self, client, *to_validate): @@ -518,10 +514,9 @@ def setup_method(self): 'segments': segment_storage, 'impressions': RedisImpressionsStorage(redis_client, metadata), 'events': RedisEventsStorage(redis_client, metadata), - 'telemetry': RedisTelemetryStorage(redis_client, metadata) } impmanager = ImpressionsManager(ImpressionsMode.DEBUG, False) - recorder = PipelinedRecorder(redis_client.pipeline, impmanager, storages['telemetry'], + recorder = PipelinedRecorder(redis_client.pipeline, impmanager, storages['events'], storages['impressions']) self.factory = SplitFactory('some_api_key', storages, True, recorder) # pylint:disable=attribute-defined-outside-init @@ -743,8 +738,6 @@ def test_manager_methods(self): def teardown_method(self): """Clear redis cache.""" keys_to_delete = [ - "SPLITIO/python-1.2.3/some_ip/latency.sdk.getTreatment.bucket.0", - "SPLITIO/python-1.2.3/some_ip/latency.sdk.getTreatmentWithConfig.bucket.0", "SPLITIO.segment.human_beigns", "SPLITIO.segment.employees.till", "SPLITIO.split.sample_feature", @@ -753,8 +746,6 @@ def teardown_method(self): "SPLITIO.split.all_feature", "SPLITIO.split.whitelist_feature", "SPLITIO.segment.employees", - "SPLITIO/python-1.2.3/some_ip/latency.sdk.getTreatments.bucket.0", - "SPLITIO/python-1.2.3/some_ip/latency.sdk.getTreatmentsWithConfig.bucket.0", "SPLITIO.split.regex_test", "SPLITIO.segment.human_beigns.till", "SPLITIO.split.boolean_test", @@ -800,10 +791,9 @@ def setup_method(self): 'segments': segment_storage, 'impressions': RedisImpressionsStorage(redis_client, metadata), 'events': RedisEventsStorage(redis_client, metadata), - 'telemetry': RedisTelemetryStorage(redis_client, metadata) } impmanager = ImpressionsManager(storages['impressions'].put, ImpressionsMode.DEBUG) - recorder = PipelinedRecorder(redis_client.pipeline, impmanager, storages['telemetry'], + recorder = PipelinedRecorder(redis_client.pipeline, impmanager, storages['events'], storages['impressions']) self.factory = SplitFactory('some_api_key', storages, True, recorder) # pylint:disable=attribute-defined-outside-init diff --git a/tests/integration/test_redis_integration.py b/tests/integration/test_redis_integration.py index 52b02db8..685f72c5 100644 --- a/tests/integration/test_redis_integration.py +++ b/tests/integration/test_redis_integration.py @@ -7,7 +7,7 @@ from splitio.client.util import get_metadata from splitio.models import splits, impressions, events from splitio.storage.redis import RedisSplitStorage, RedisSegmentStorage, RedisImpressionsStorage, \ - RedisEventsStorage, RedisTelemetryStorage + RedisEventsStorage from splitio.storage.adapters.redis import _build_default_client from splitio.client.config import DEFAULT_CONFIG @@ -242,79 +242,3 @@ def test_put_fetch_contains_ip_address_disabled(self): assert event['m']['n'] == 'NA' finally: adapter.delete('SPLITIO.events') - - -class TelemetryStorageTests(object): - """Redis Telemetry storage e2e tests.""" - - def test_put_fetch_contains(self): - """Test storing and retrieving splits in redis.""" - adapter = _build_default_client({}) - cfg = DEFAULT_CONFIG.copy() - cfg.update({'IPAddressesEnabled': False}) - metadata = get_metadata(cfg) - storage = RedisTelemetryStorage(adapter, metadata) - try: - - storage.inc_counter('counter1') - storage.inc_counter('counter1') - storage.inc_counter('counter2') - assert adapter.get(storage._get_counter_key('counter1')) == '2' - assert adapter.get(storage._get_counter_key('counter2')) == '1' - - storage.inc_latency('latency1', 3) - storage.inc_latency('latency1', 3) - storage.inc_latency('latency2', 6) - assert adapter.get(storage._get_latency_key('latency1', 3)) == '2' - assert adapter.get(storage._get_latency_key('latency2', 6)) == '1' - - storage.put_gauge('gauge1', 3) - storage.put_gauge('gauge2', 1) - assert adapter.get(storage._get_gauge_key('gauge1')) == '3' - assert adapter.get(storage._get_gauge_key('gauge2')) == '1' - - finally: - adapter.delete( - storage._get_counter_key('counter1'), - storage._get_counter_key('counter2'), - storage._get_latency_key('latency1', 3), - storage._get_latency_key('latency2', 6), - storage._get_gauge_key('gauge1'), - storage._get_gauge_key('gauge2') - ) - - def test_put_fetch_contains_ip_address_disabled(self): - """Test storing and retrieving splits in redis.""" - adapter = _build_default_client({}) - cfg = DEFAULT_CONFIG.copy() - cfg.update({'IPAddressesEnabled': False}) - metadata = get_metadata(cfg) - storage = RedisTelemetryStorage(adapter, metadata) - try: - - storage.inc_counter('counter1') - storage.inc_counter('counter1') - storage.inc_counter('counter2') - assert adapter.get(storage._get_counter_key('counter1')) == '2' - assert adapter.get(storage._get_counter_key('counter2')) == '1' - - storage.inc_latency('latency1', 3) - storage.inc_latency('latency1', 3) - storage.inc_latency('latency2', 6) - assert adapter.get(storage._get_latency_key('latency1', 3)) == '2' - assert adapter.get(storage._get_latency_key('latency2', 6)) == '1' - - storage.put_gauge('gauge1', 3) - storage.put_gauge('gauge2', 1) - assert adapter.get(storage._get_gauge_key('gauge1')) == '3' - assert adapter.get(storage._get_gauge_key('gauge2')) == '1' - - finally: - adapter.delete( - storage._get_counter_key('counter1'), - storage._get_counter_key('counter2'), - storage._get_latency_key('latency1', 3), - storage._get_latency_key('latency2', 6), - storage._get_gauge_key('gauge1'), - storage._get_gauge_key('gauge2') - ) diff --git a/tests/recorder/test_recorder.py b/tests/recorder/test_recorder.py index 9212b307..2e32c3fb 100644 --- a/tests/recorder/test_recorder.py +++ b/tests/recorder/test_recorder.py @@ -4,8 +4,8 @@ from splitio.recorder.recorder import StandardRecorder, PipelinedRecorder from splitio.engine.impressions import Manager as ImpressionsManager -from splitio.storage.inmemmory import TelemetryStorage, EventStorage, ImpressionStorage -from splitio.storage.redis import TelemetryPipelinedStorage, ImpressionPipelinedStorage, EventStorage +from splitio.storage.inmemmory import EventStorage, ImpressionStorage +from splitio.storage.redis import ImpressionPipelinedStorage, EventStorage from splitio.storage.adapters.redis import RedisAdapter from splitio.models.impressions import Impression @@ -20,14 +20,12 @@ def test_standard_recorder(self, mocker): ] impmanager = mocker.Mock(spec=ImpressionsManager) impmanager.process_impressions.return_value = impressions - telemetry = mocker.Mock(spec=TelemetryStorage) event = mocker.Mock(spec=EventStorage) impression = mocker.Mock(spec=ImpressionStorage) - recorder = StandardRecorder(impmanager, telemetry, event, impression) + recorder = StandardRecorder(impmanager, event, impression) recorder.record_treatment_stats(impressions, 1, 'some') assert recorder._impression_storage.put.mock_calls[0][1][0] == impressions - assert recorder._telemetry_storage.inc_latency.mock_calls == [mocker.call('some', 1)] def test_pipelined_recorder(self, mocker): impressions = [ @@ -37,12 +35,9 @@ def test_pipelined_recorder(self, mocker): redis = mocker.Mock(spec=RedisAdapter) impmanager = mocker.Mock(spec=ImpressionsManager) impmanager.process_impressions.return_value = impressions - telemetry = mocker.Mock(spec=TelemetryPipelinedStorage) event = mocker.Mock(spec=EventStorage) impression = mocker.Mock(spec=ImpressionPipelinedStorage) - recorder = PipelinedRecorder(redis, impmanager, telemetry, event, impression) + recorder = PipelinedRecorder(redis, impmanager, event, impression) recorder.record_treatment_stats(impressions, 1, 'some') assert recorder._impression_storage.add_impressions_to_pipe.mock_calls[0][1][0] == impressions - assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][0] == 'some' - assert recorder._telemetry_storage.add_latency_to_pipe.mock_calls[0][1][1] == 1 diff --git a/tests/storage/test_inmemory_storage.py b/tests/storage/test_inmemory_storage.py index f54ca329..8594a443 100644 --- a/tests/storage/test_inmemory_storage.py +++ b/tests/storage/test_inmemory_storage.py @@ -6,7 +6,7 @@ from splitio.models.events import Event, EventWrapper from splitio.storage.inmemmory import InMemorySplitStorage, InMemorySegmentStorage, \ - InMemoryImpressionStorage, InMemoryEventStorage, InMemoryTelemetryStorage + InMemoryImpressionStorage, InMemoryEventStorage class InMemorySplitStorageTests(object): @@ -392,62 +392,3 @@ def test_clear(self): assert storage._events.qsize() == 1 storage.clear() assert storage._events.qsize() == 0 - - -class InMemoryTelemetryStorageTests(object): - """In-Memory telemetry storage unit tests.""" - - def test_latencies(self): - """Test storing and retrieving latencies.""" - storage = InMemoryTelemetryStorage() - storage.inc_latency('sdk.get_treatment', -1) - storage.inc_latency('sdk.get_treatment', 0) - storage.inc_latency('sdk.get_treatment', 1) - storage.inc_latency('sdk.get_treatment', 5) - storage.inc_latency('sdk.get_treatment', 5) - storage.inc_latency('sdk.get_treatment', 22) - latencies = storage.pop_latencies() - assert latencies['sdk.get_treatment'][0] == 1 - assert latencies['sdk.get_treatment'][1] == 1 - assert latencies['sdk.get_treatment'][5] == 2 - assert len(latencies['sdk.get_treatment']) == 22 - assert storage.pop_latencies() == {} - - def test_counters(self): - """Test storing and retrieving counters.""" - storage = InMemoryTelemetryStorage() - storage.inc_counter('some_counter_1') - storage.inc_counter('some_counter_1') - storage.inc_counter('some_counter_1') - storage.inc_counter('some_counter_2') - counters = storage.pop_counters() - assert counters['some_counter_1'] == 3 - assert counters['some_counter_2'] == 1 - assert storage.pop_counters() == {} - - def test_gauges(self): - """Test storing and retrieving gauges.""" - storage = InMemoryTelemetryStorage() - storage.put_gauge('some_gauge_1', 321) - storage.put_gauge('some_gauge_2', 654) - gauges = storage.pop_gauges() - assert gauges['some_gauge_1'] == 321 - assert gauges['some_gauge_2'] == 654 - assert storage.pop_gauges() == {} - - def test_clear(self): - """Test clear.""" - storage = InMemoryTelemetryStorage() - storage.put_gauge('some_gauge_1', 321) - storage.inc_counter('some_counter_1') - storage.inc_latency('sdk.get_treatment', 5) - - assert len(storage._counters) == 1 - assert len(storage._gauges) == 1 - assert len(storage._latencies) == 1 - - storage.clear() - - assert len(storage._counters) == 0 - assert len(storage._gauges) == 0 - assert len(storage._latencies) == 0 diff --git a/tests/storage/test_redis.py b/tests/storage/test_redis.py index 35493043..2a239904 100644 --- a/tests/storage/test_redis.py +++ b/tests/storage/test_redis.py @@ -6,7 +6,7 @@ from splitio.client.util import get_metadata from splitio.storage.redis import RedisEventsStorage, RedisImpressionsStorage, \ - RedisSegmentStorage, RedisSplitStorage, RedisTelemetryStorage + RedisSegmentStorage, RedisSplitStorage from splitio.models.segments import Segment from splitio.models.impressions import Impression from splitio.models.events import Event, EventWrapper @@ -380,75 +380,3 @@ def _raise_exc(*_): raise RedisAdapterException('something') adapter.rpush.side_effect = _raise_exc assert storage.put(events) is False - - -class RedisTelemetryStorageTests(object): - """Redis-based telemetry storage test cases.""" - - def test_inc_latency(self, mocker): - """Test incrementing latency.""" - adapter = mocker.Mock(spec=RedisAdapter) - metadata = get_metadata({}) - - storage = RedisTelemetryStorage(adapter, metadata) - storage.inc_latency('some_latency', 0) - storage.inc_latency('some_latency', 1) - storage.inc_latency('some_latency', 5) - storage.inc_latency('some_latency', 5) - storage.inc_latency('some_latency', 22) - assert adapter.incr.mock_calls == [ - mocker.call('SPLITIO/' + metadata.sdk_version + '/' + metadata.instance_name + '/latency.some_latency.bucket.0'), - mocker.call('SPLITIO/' + metadata.sdk_version + '/' + metadata.instance_name + '/latency.some_latency.bucket.1'), - mocker.call('SPLITIO/' + metadata.sdk_version + '/' + metadata.instance_name + '/latency.some_latency.bucket.5'), - mocker.call('SPLITIO/' + metadata.sdk_version + '/' + metadata.instance_name + '/latency.some_latency.bucket.5') - ] - - def test_add_latency_to_pipe(self, mocker): - """Test incrementing latency.""" - adapter = mocker.Mock(spec=RedisAdapter) - metadata = get_metadata({}) - - storage = RedisTelemetryStorage(adapter, metadata) - storage.inc_latency('some_latency', 0) - storage.inc_latency('some_latency', 1) - storage.inc_latency('some_latency', 5) - storage.inc_latency('some_latency', 5) - storage.inc_latency('some_latency', 22) - assert adapter.incr.mock_calls == [ - mocker.call('SPLITIO/' + metadata.sdk_version + '/' + metadata.instance_name + '/latency.some_latency.bucket.0'), - mocker.call('SPLITIO/' + metadata.sdk_version + '/' + metadata.instance_name + '/latency.some_latency.bucket.1'), - mocker.call('SPLITIO/' + metadata.sdk_version + '/' + metadata.instance_name + '/latency.some_latency.bucket.5'), - mocker.call('SPLITIO/' + metadata.sdk_version + '/' + metadata.instance_name + '/latency.some_latency.bucket.5') - ] - - def test_inc_counter(self, mocker): - """Test incrementing latency.""" - adapter = mocker.Mock(spec=RedisAdapter) - metadata = get_metadata({}) - - storage = RedisTelemetryStorage(adapter, metadata) - storage.inc_counter('some_counter_1') - storage.inc_counter('some_counter_1') - storage.inc_counter('some_counter_1') - storage.inc_counter('some_counter_2') - storage.inc_counter('some_counter_2') - assert adapter.incr.mock_calls == [ - mocker.call('SPLITIO/' + metadata.sdk_version + '/' + metadata.instance_name + '/count.some_counter_1'), - mocker.call('SPLITIO/' + metadata.sdk_version + '/' + metadata.instance_name + '/count.some_counter_1'), - mocker.call('SPLITIO/' + metadata.sdk_version + '/' + metadata.instance_name + '/count.some_counter_1'), - mocker.call('SPLITIO/' + metadata.sdk_version + '/' + metadata.instance_name + '/count.some_counter_2'), - mocker.call('SPLITIO/' + metadata.sdk_version + '/' + metadata.instance_name + '/count.some_counter_2') - ] - - def test_inc_gauge(self, mocker): - """Test incrementing latency.""" - adapter = mocker.Mock(spec=RedisAdapter) - metadata = get_metadata({}) - - storage = RedisTelemetryStorage(adapter, metadata) - storage.put_gauge('gauge1', 123) - storage.put_gauge('gauge2', 456) - assert adapter.set.mock_calls == [ - mocker.call('SPLITIO/' + metadata.sdk_version + '/' + metadata.instance_name + '/gauge.gauge1', 123), - mocker.call('SPLITIO/' + metadata.sdk_version + '/' + metadata.instance_name + '/gauge.gauge2', 456) - ] diff --git a/tests/sync/test_manager.py b/tests/sync/test_manager.py index fbe98ba4..27c026c1 100644 --- a/tests/sync/test_manager.py +++ b/tests/sync/test_manager.py @@ -7,13 +7,11 @@ from splitio.tasks.segment_sync import SegmentSynchronizationTask from splitio.tasks.impressions_sync import ImpressionsSyncTask, ImpressionsCountSyncTask from splitio.tasks.events_sync import EventsSyncTask -from splitio.tasks.telemetry_sync import TelemetrySynchronizationTask from splitio.sync.split import SplitSynchronizer from splitio.sync.segment import SegmentSynchronizer from splitio.sync.impression import ImpressionSynchronizer, ImpressionsCountSynchronizer from splitio.sync.event import EventSynchronizer -from splitio.sync.telemetry import TelemetrySynchronizer from splitio.sync.synchronizer import Synchronizer, SplitTasks, SplitSynchronizers from splitio.sync.manager import Manager @@ -30,7 +28,7 @@ class ManagerTests(object): def test_error(self, mocker): split_task = mocker.Mock(spec=SplitSynchronizationTask) split_tasks = SplitTasks(split_task, mocker.Mock(), mocker.Mock(), mocker.Mock(), - mocker.Mock(), mocker.Mock()) + mocker.Mock()) storage = mocker.Mock(spec=SplitStorage) api = mocker.Mock() @@ -43,7 +41,7 @@ def run(x): split_sync = SplitSynchronizer(api, storage) synchronizers = SplitSynchronizers(split_sync, mocker.Mock(), mocker.Mock(), - mocker.Mock(), mocker.Mock(), mocker.Mock()) + mocker.Mock(), mocker.Mock()) synchronizer = Synchronizer(synchronizers, split_tasks) manager = Manager(threading.Event(), synchronizer, mocker.Mock(), False, SdkMetadata('1.0', 'some', '1.2.3.4')) diff --git a/tests/sync/test_synchronizer.py b/tests/sync/test_synchronizer.py index 216e8d77..43377841 100644 --- a/tests/sync/test_synchronizer.py +++ b/tests/sync/test_synchronizer.py @@ -7,12 +7,10 @@ from splitio.tasks.segment_sync import SegmentSynchronizationTask from splitio.tasks.impressions_sync import ImpressionsSyncTask, ImpressionsCountSyncTask from splitio.tasks.events_sync import EventsSyncTask -from splitio.tasks.telemetry_sync import TelemetrySynchronizationTask from splitio.sync.split import SplitSynchronizer from splitio.sync.segment import SegmentSynchronizer from splitio.sync.impression import ImpressionSynchronizer, ImpressionsCountSynchronizer from splitio.sync.event import EventSynchronizer -from splitio.sync.telemetry import TelemetrySynchronizer from splitio.storage import SegmentStorage, SplitStorage from splitio.api import APIException from splitio.models.splits import Split @@ -30,7 +28,7 @@ def run(x, c): split_sync = SplitSynchronizer(api, storage) split_synchronizers = SplitSynchronizers(split_sync, mocker.Mock(), mocker.Mock(), - mocker.Mock(), mocker.Mock(), mocker.Mock()) + mocker.Mock(), mocker.Mock()) sychronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks)) sychronizer.synchronize_splits(None) # APIExceptions are handled locally and should not be propagated! @@ -51,7 +49,7 @@ def run(x, y): segment_sync = SegmentSynchronizer(api, split_storage, storage) split_synchronizers = SplitSynchronizers(split_sync, segment_sync, mocker.Mock(), - mocker.Mock(), mocker.Mock(), mocker.Mock()) + mocker.Mock(), mocker.Mock()) sychronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks)) sychronizer.sync_all() # SyncAll should not throw! @@ -88,7 +86,7 @@ def test_sync_all(self, mocker): segment_sync = SegmentSynchronizer(segment_api, split_storage, segment_storage) split_synchronizers = SplitSynchronizers(split_sync, segment_sync, mocker.Mock(), - mocker.Mock(), mocker.Mock(), mocker.Mock()) + mocker.Mock(), mocker.Mock()) synchronizer = Synchronizer(split_synchronizers, mocker.Mock(spec=SplitTasks)) synchronizer.sync_all() @@ -106,7 +104,7 @@ def test_start_periodic_fetching(self, mocker): split_task = mocker.Mock(spec=SplitSynchronizationTask) segment_task = mocker.Mock(spec=SegmentSynchronizationTask) split_tasks = SplitTasks(split_task, segment_task, mocker.Mock(), mocker.Mock(), - mocker.Mock(), mocker.Mock()) + mocker.Mock()) synchronizer = Synchronizer(mocker.Mock(spec=SplitSynchronizers), split_tasks) synchronizer.start_periodic_fetching() @@ -118,9 +116,9 @@ def test_stop_periodic_fetching(self, mocker): segment_task = mocker.Mock(spec=SegmentSynchronizationTask) segment_sync = mocker.Mock(spec=SegmentSynchronizer) split_synchronizers = SplitSynchronizers(mocker.Mock(), segment_sync, mocker.Mock(), - mocker.Mock(), mocker.Mock(), mocker.Mock()) + mocker.Mock(), mocker.Mock()) split_tasks = SplitTasks(split_task, segment_task, mocker.Mock(), mocker.Mock(), - mocker.Mock(), mocker.Mock()) + mocker.Mock()) synchronizer = Synchronizer(split_synchronizers, split_tasks) synchronizer.stop_periodic_fetching() @@ -132,16 +130,14 @@ def test_start_periodic_data_recording(self, mocker): impression_task = mocker.Mock(spec=ImpressionsSyncTask) impression_count_task = mocker.Mock(spec=ImpressionsCountSyncTask) event_task = mocker.Mock(spec=EventsSyncTask) - telemetry_task = mocker.Mock(spec=TelemetrySynchronizationTask) split_tasks = SplitTasks(mocker.Mock(), mocker.Mock(), impression_task, event_task, - telemetry_task, impression_count_task) + impression_count_task) synchronizer = Synchronizer(mocker.Mock(spec=SplitSynchronizers), split_tasks) synchronizer.start_periodic_data_recording() assert len(impression_task.start.mock_calls) == 1 assert len(impression_count_task.start.mock_calls) == 1 assert len(event_task.start.mock_calls) == 1 - assert len(telemetry_task.start.mock_calls) == 1 def test_stop_periodic_data_recording(self, mocker): @@ -158,17 +154,14 @@ def stop_mock_2(): impression_count_task.stop.side_effect = stop_mock event_task = mocker.Mock(spec=EventsSyncTask) event_task.stop.side_effect = stop_mock - telemetry_task = mocker.Mock(spec=TelemetrySynchronizationTask) - telemetry_task.stop.side_effect = stop_mock_2 split_tasks = SplitTasks(mocker.Mock(), mocker.Mock(), impression_task, event_task, - telemetry_task, impression_count_task) + impression_count_task) synchronizer = Synchronizer(mocker.Mock(spec=SplitSynchronizers), split_tasks) synchronizer.stop_periodic_data_recording(True) assert len(impression_task.stop.mock_calls) == 1 assert len(impression_count_task.stop.mock_calls) == 1 assert len(event_task.stop.mock_calls) == 1 - assert len(telemetry_task.stop.mock_calls) == 1 def test_shutdown(self, mocker): @@ -189,15 +182,13 @@ def stop_mock_2(): impression_count_task.stop.side_effect = stop_mock event_task = mocker.Mock(spec=EventsSyncTask) event_task.stop.side_effect = stop_mock - telemetry_task = mocker.Mock(spec=TelemetrySynchronizationTask) - telemetry_task.stop.side_effect = stop_mock_2 segment_sync = mocker.Mock(spec=SegmentSynchronizer) split_synchronizers = SplitSynchronizers(mocker.Mock(), segment_sync, mocker.Mock(), - mocker.Mock(), mocker.Mock(), mocker.Mock()) + mocker.Mock(), mocker.Mock()) split_tasks = SplitTasks(split_task, segment_task, impression_task, event_task, - telemetry_task, impression_count_task) + impression_count_task) synchronizer = Synchronizer(split_synchronizers, split_tasks) synchronizer.shutdown(True) @@ -207,7 +198,6 @@ def stop_mock_2(): assert len(impression_task.stop.mock_calls) == 1 assert len(impression_count_task.stop.mock_calls) == 1 assert len(event_task.stop.mock_calls) == 1 - assert len(telemetry_task.stop.mock_calls) == 1 def test_sync_all_ok(self, mocker): """Test that 3 attempts are done before failing.""" diff --git a/tests/sync/test_telemetry_synchronizer.py b/tests/sync/test_telemetry_synchronizer.py deleted file mode 100644 index 2d831bb3..00000000 --- a/tests/sync/test_telemetry_synchronizer.py +++ /dev/null @@ -1,53 +0,0 @@ -"""Split Worker tests.""" - -import threading -import time -import pytest - -from splitio.api.client import HttpResponse -from splitio.api import APIException -from splitio.storage import TelemetryStorage -from splitio.sync.telemetry import TelemetrySynchronizer -from splitio.api.telemetry import TelemetryAPI - - -class TelemetrySynchronizerTests(object): - """Telemetry synchronizer test cases.""" - - def test_synchronize_impressions(self, mocker): - """Test normal behaviour of sync task.""" - api = mocker.Mock(spec=TelemetryAPI) - storage = mocker.Mock(spec=TelemetryStorage) - storage.pop_latencies.return_value = { - 'some_latency1': [1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], - 'some_latency2': [0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] - } - storage.pop_gauges.return_value = { - 'gauge1': 123, - 'gauge2': 456 - } - storage.pop_counters.return_value = { - 'counter1': 1, - 'counter2': 5 - } - telemetry_synchronizer = TelemetrySynchronizer(api, storage) - telemetry_synchronizer.synchronize_telemetry() - - assert mocker.call() in storage.pop_latencies.mock_calls - assert mocker.call() in storage.pop_counters.mock_calls - assert mocker.call() in storage.pop_gauges.mock_calls - - assert mocker.call({ - 'some_latency1': [1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], - 'some_latency2': [0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] - }) in api.flush_latencies.mock_calls - - assert mocker.call({ - 'gauge1': 123, - 'gauge2': 456 - }) in api.flush_gauges.mock_calls - - assert mocker.call({ - 'counter1': 1, - 'counter2': 5 - }) in api.flush_counters.mock_calls diff --git a/tests/tasks/test_telemetry_sync.py b/tests/tasks/test_telemetry_sync.py deleted file mode 100644 index aa144816..00000000 --- a/tests/tasks/test_telemetry_sync.py +++ /dev/null @@ -1,59 +0,0 @@ -"""Telemetry synchronization task unit test module.""" -# pylint: disable=no-self-use -import time -import threading -from splitio.storage import TelemetryStorage -from splitio.api.telemetry import TelemetryAPI -from splitio.tasks.telemetry_sync import TelemetrySynchronizationTask -from splitio.sync.telemetry import TelemetrySynchronizer - - -class TelemetrySyncTests(object): # pylint: disable=too-few-public-methods - """Impressions Syncrhonization task test cases.""" - - def test_normal_operation(self, mocker): - """Test normal behaviour of sync task.""" - api = mocker.Mock(spec=TelemetryAPI) - storage = mocker.Mock(spec=TelemetryStorage) - storage.pop_latencies.return_value = { - 'some_latency1': [1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], - 'some_latency2': [0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] - } - storage.pop_gauges.return_value = { - 'gauge1': 123, - 'gauge2': 456 - } - storage.pop_counters.return_value = { - 'counter1': 1, - 'counter2': 5 - } - telemtry_synchronizer = TelemetrySynchronizer(api, storage) - task = TelemetrySynchronizationTask(telemtry_synchronizer.synchronize_telemetry, 1) - task.start() - time.sleep(2) - assert task.is_running() - - stop_event = threading.Event() - task.stop(stop_event) - stop_event.wait() - - assert stop_event.is_set() - assert not task.is_running() - assert mocker.call() in storage.pop_latencies.mock_calls - assert mocker.call() in storage.pop_counters.mock_calls - assert mocker.call() in storage.pop_gauges.mock_calls - - assert mocker.call({ - 'some_latency1': [1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], - 'some_latency2': [0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] - }) in api.flush_latencies.mock_calls - - assert mocker.call({ - 'gauge1': 123, - 'gauge2': 456 - }) in api.flush_gauges.mock_calls - - assert mocker.call({ - 'counter1': 1, - 'counter2': 5 - }) in api.flush_counters.mock_calls From 384605646dbebc67596a25a4e3af6d614874b7dd Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Tue, 7 Dec 2021 19:01:42 -0300 Subject: [PATCH 2/2] changelog --- CHANGES.txt | 3 +++ splitio/version.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGES.txt b/CHANGES.txt index 26899c33..02cf103e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,6 @@ +9.2.0 (Dec XX, 2021) +- + 9.1.0 (Jul 15, 2021) - Added Cache-Control header for on-demand requests to sdk-server. - Updated the synchronization flow to be more reliable in the event of an edge case generating delay in cache purge propagation, keeping the SDK cache properly synced. diff --git a/splitio/version.py b/splitio/version.py index 8347f6c6..13006c1c 100644 --- a/splitio/version.py +++ b/splitio/version.py @@ -1 +1 @@ -__version__ = '9.1.0' +__version__ = '9.2.0-rc1'