diff --git a/CHANGES.txt b/CHANGES.txt index 2a5076d5..26899c33 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,7 @@ +9.1.0 (Jul 15, 2021) +- Added Cache-Control header for on-demand requests to sdk-server. +- Updated the synchronization flow to be more reliable in the event of an edge case generating delay in cache purge propagation, keeping the SDK cache properly synced. + 9.0.0 (May 3, 2021) - BREAKING CHANGE: Removed splitSdkMachineIp and splitSdkMachineName configs. - BREAKING CHANGE: Deprecated `redisCharset` config. diff --git a/LICENSE.txt b/LICENSE.txt index fb8a05ab..18dec1b3 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -1,4 +1,4 @@ -Copyright © 2020 Split Software, Inc. +Copyright © 2021 Split Software, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/splitio/api/__init__.py b/splitio/api/__init__.py index aff06a51..33f1e588 100644 --- a/splitio/api/__init__.py +++ b/splitio/api/__init__.py @@ -13,31 +13,3 @@ def __init__(self, custom_message, status_code=None): def status_code(self): """Return HTTP status code.""" return self._status_code - - -def headers_from_metadata(sdk_metadata, client_key=None): - """ - Generate a dict with headers required by data-recording API endpoints. - - :param sdk_metadata: SDK Metadata object, generated at sdk initialization time. - :type sdk_metadata: splitio.client.util.SdkMetadata - - :param client_key: client key. - :type client_key: str - - :return: A dictionary with headers. - :rtype: dict - """ - - metadata = { - 'SplitSDKVersion': sdk_metadata.sdk_version, - 'SplitSDKMachineIP': sdk_metadata.instance_ip, - 'SplitSDKMachineName': sdk_metadata.instance_name - } if sdk_metadata.instance_ip != 'NA' and sdk_metadata.instance_ip != 'unknown' else { - 'SplitSDKVersion': sdk_metadata.sdk_version, - } - - if client_key is not None: - metadata['SplitSDKClientKey'] = client_key - - return metadata diff --git a/splitio/api/auth.py b/splitio/api/auth.py index 652c241c..0c1a3d12 100644 --- a/splitio/api/auth.py +++ b/splitio/api/auth.py @@ -3,7 +3,8 @@ import logging import json -from splitio.api import APIException, headers_from_metadata +from splitio.api import APIException +from splitio.api.commons import headers_from_metadata from splitio.api.client import HttpClientException from splitio.models.token import from_raw diff --git a/splitio/api/commons.py b/splitio/api/commons.py new file mode 100644 index 00000000..53019427 --- /dev/null +++ b/splitio/api/commons.py @@ -0,0 +1,95 @@ +"""Commons module.""" + + +_CACHE_CONTROL = 'Cache-Control' +_CACHE_CONTROL_NO_CACHE = 'no-cache' + + +def headers_from_metadata(sdk_metadata, client_key=None): + """ + Generate a dict with headers required by data-recording API endpoints. + + :param sdk_metadata: SDK Metadata object, generated at sdk initialization time. + :type sdk_metadata: splitio.client.util.SdkMetadata + + :param client_key: client key. + :type client_key: str + + :return: A dictionary with headers. + :rtype: dict + """ + + metadata = { + 'SplitSDKVersion': sdk_metadata.sdk_version, + 'SplitSDKMachineIP': sdk_metadata.instance_ip, + 'SplitSDKMachineName': sdk_metadata.instance_name + } if sdk_metadata.instance_ip != 'NA' and sdk_metadata.instance_ip != 'unknown' else { + 'SplitSDKVersion': sdk_metadata.sdk_version, + } + + if client_key is not None: + metadata['SplitSDKClientKey'] = client_key + + return metadata + + +class FetchOptions(object): + """Fetch Options object.""" + + def __init__(self, cache_control_headers=False, change_number=None): + """ + Class constructor. + + :param cache_control_headers: Flag for Cache-Control header + :type cache_control_headers: bool + + :param change_number: ChangeNumber to use for bypassing CDN in request. + :type change_number: int + """ + self._cache_control_headers = cache_control_headers + self._change_number = change_number + + @property + def cache_control_headers(self): + """Return cache control headers.""" + return self._cache_control_headers + + @property + def change_number(self): + """Return change number.""" + return self._change_number + + def __eq__(self, other): + """Match between other options.""" + if self._cache_control_headers != other._cache_control_headers: + return False + if self._change_number != other._change_number: + return False + return True + + +def build_fetch(change_number, fetch_options, metadata): + """ + Build fetch with new flags if that is the case. + + :param change_number: Last known timestamp of definition. + :type change_number: int + + :param fetch_options: Fetch options for getting definitions. + :type fetch_options: splitio.api.commons.FetchOptions + + :param metadata: Metadata Headers. + :type metadata: dict + + :return: Objects for fetch + :rtype: dict, dict + """ + query = {'since': change_number} + extra_headers = metadata + if fetch_options is None: + return query, extra_headers + if fetch_options.cache_control_headers: + extra_headers[_CACHE_CONTROL] = _CACHE_CONTROL_NO_CACHE + if fetch_options.change_number is not None: + query['till'] = fetch_options.change_number + return query, extra_headers diff --git a/splitio/api/events.py b/splitio/api/events.py index 84b0f52c..b8ddda36 100644 --- a/splitio/api/events.py +++ b/splitio/api/events.py @@ -1,8 +1,9 @@ """Events API module.""" import logging -from splitio.api import APIException, headers_from_metadata +from splitio.api import APIException from splitio.api.client import HttpClientException +from splitio.api.commons import headers_from_metadata _LOGGER = logging.getLogger(__name__) diff --git a/splitio/api/impressions.py b/splitio/api/impressions.py index ffc1452c..02206a1e 100644 --- a/splitio/api/impressions.py +++ b/splitio/api/impressions.py @@ -3,8 +3,9 @@ import logging from itertools import groupby -from splitio.api import APIException, headers_from_metadata +from splitio.api import APIException from splitio.api.client import HttpClientException +from splitio.api.commons import headers_from_metadata from splitio.engine.impressions import ImpressionsMode diff --git a/splitio/api/segments.py b/splitio/api/segments.py index 3221d0d1..ebc65a7e 100644 --- a/splitio/api/segments.py +++ b/splitio/api/segments.py @@ -3,7 +3,8 @@ import json import logging -from splitio.api import APIException, headers_from_metadata +from splitio.api import APIException +from splitio.api.commons import headers_from_metadata, build_fetch from splitio.api.client import HttpClientException @@ -29,25 +30,30 @@ def __init__(self, http_client, apikey, sdk_metadata): self._apikey = apikey self._metadata = headers_from_metadata(sdk_metadata) - def fetch_segment(self, segment_name, change_number): + def fetch_segment(self, segment_name, change_number, fetch_options): """ Fetch splits from backend. :param segment_name: Name of the segment to fetch changes for. :type segment_name: str - :param change_number: Last known timestamp of a split modification. + + :param change_number: Last known timestamp of a segment modification. :type change_number: int + :param fetch_options: Fetch options for getting segment definitions. + :type fetch_options: splitio.api.commons.FetchOptions + :return: Json representation of a segmentChange response. :rtype: dict """ try: + query, extra_headers = build_fetch(change_number, fetch_options, self._metadata) response = self._client.get( 'sdk', '/segmentChanges/{segment_name}'.format(segment_name=segment_name), self._apikey, - extra_headers=self._metadata, - query={'since': change_number} + extra_headers=extra_headers, + query=query, ) if 200 <= response.status_code < 300: diff --git a/splitio/api/splits.py b/splitio/api/splits.py index 84e3dcfc..e395d454 100644 --- a/splitio/api/splits.py +++ b/splitio/api/splits.py @@ -3,7 +3,8 @@ import logging import json -from splitio.api import APIException, headers_from_metadata +from splitio.api import APIException +from splitio.api.commons import headers_from_metadata, build_fetch from splitio.api.client import HttpClientException @@ -28,23 +29,27 @@ def __init__(self, client, apikey, sdk_metadata): self._apikey = apikey self._metadata = headers_from_metadata(sdk_metadata) - def fetch_splits(self, change_number): + def fetch_splits(self, change_number, fetch_options): """ Fetch splits from backend. - :param changeNumber: Last known timestamp of a split modification. - :type changeNumber: int + :param change_number: Last known timestamp of a split modification. + :type change_number: int + + :param fetch_options: Fetch options for getting split definitions. + :type fetch_options: splitio.api.commons.FetchOptions :return: Json representation of a splitChanges response. :rtype: dict """ try: + query, extra_headers = build_fetch(change_number, fetch_options, self._metadata) response = self._client.get( 'sdk', '/splitChanges', self._apikey, - extra_headers=self._metadata, - query={'since': change_number} + extra_headers=extra_headers, + query=query, ) if 200 <= response.status_code < 300: return json.loads(response.body) diff --git a/splitio/api/telemetry.py b/splitio/api/telemetry.py index db4ae682..23ad4644 100644 --- a/splitio/api/telemetry.py +++ b/splitio/api/telemetry.py @@ -1,7 +1,8 @@ """Telemetry API Module.""" import logging -from splitio.api import APIException, headers_from_metadata +from splitio.api import APIException +from splitio.api.commons import headers_from_metadata from splitio.api.client import HttpClientException diff --git a/splitio/client/input_validator.py b/splitio/client/input_validator.py index b2478c6c..2ca42e1f 100644 --- a/splitio/client/input_validator.py +++ b/splitio/client/input_validator.py @@ -5,6 +5,7 @@ import math from splitio.api import APIException +from splitio.api.commons import FetchOptions from splitio.client.key import Key from splitio.engine.evaluator import CONTROL @@ -458,7 +459,7 @@ def validate_apikey_type(segment_api): _logger = logging.getLogger('splitio.api.segments') try: _logger.addFilter(api_messages_filter) # pylint: disable=protected-access - segment_api.fetch_segment('__SOME_INVALID_SEGMENT__', -1) + segment_api.fetch_segment('__SOME_INVALID_SEGMENT__', -1, FetchOptions()) except APIException as exc: if exc.status_code == 403: _LOGGER.error('factory instantiation: you passed a browser type ' diff --git a/splitio/push/splitsse.py b/splitio/push/splitsse.py index e80e1549..f16a317f 100644 --- a/splitio/push/splitsse.py +++ b/splitio/push/splitsse.py @@ -4,7 +4,7 @@ from enum import Enum from splitio.push.sse import SSEClient, SSE_EVENT_ERROR from splitio.util.threadutil import EventGroup -from splitio.api import headers_from_metadata +from splitio.api.commons import headers_from_metadata _LOGGER = logging.getLogger(__name__) diff --git a/splitio/sync/segment.py b/splitio/sync/segment.py index e2b08b25..37e453bf 100644 --- a/splitio/sync/segment.py +++ b/splitio/sync/segment.py @@ -1,13 +1,21 @@ import logging +import time from splitio.api import APIException +from splitio.api.commons import FetchOptions from splitio.tasks.util import workerpool from splitio.models import segments +from splitio.util.backoff import Backoff _LOGGER = logging.getLogger(__name__) +_ON_DEMAND_FETCH_BACKOFF_BASE = 10 # backoff base starting at 10 seconds +_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT = 60 # don't sleep for more than 1 minute +_ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10 + + class SegmentSynchronizer(object): def __init__(self, segment_api, split_storage, segment_storage): """ @@ -28,6 +36,9 @@ def __init__(self, segment_api, split_storage, segment_storage): self._segment_storage = segment_storage self._worker_pool = workerpool.WorkerPool(10, self.synchronize_segment) self._worker_pool.start() + self._backoff = Backoff( + _ON_DEMAND_FETCH_BACKOFF_BASE, + _ON_DEMAND_FETCH_BACKOFF_MAX_WAIT) def recreate(self): """ @@ -44,27 +55,33 @@ def shutdown(self): """ self._worker_pool.stop() - def synchronize_segment(self, segment_name, till=None): + def _fetch_until(self, segment_name, fetch_options, till=None): """ - Update a segment from queue + Hit endpoint, update storage and return when since==till. :param segment_name: Name of the segment to update. :type segment_name: str - :param till: ChangeNumber received. + :param fetch_options Fetch options for getting segment definitions. + :type fetch_options splitio.api.FetchOptions + + :param till: Passed till from Streaming. :type till: int + :return: last change number + :rtype: int """ - while True: + while True: # Fetch until since==till change_number = self._segment_storage.get_change_number(segment_name) if change_number is None: change_number = -1 if till is not None and till < change_number: # the passed till is less than change_number, no need to perform updates - return + return change_number try: - segment_changes = self._api.fetch_segment(segment_name, change_number) + segment_changes = self._api.fetch_segment(segment_name, change_number, + fetch_options) except APIException as exc: _LOGGER.error('Exception raised while fetching segment %s', segment_name) _LOGGER.debug('Exception information: ', exc_info=True) @@ -82,7 +99,63 @@ def synchronize_segment(self, segment_name, till=None): ) if segment_changes['till'] == segment_changes['since']: - return + return segment_changes['till'] + + def _attempt_segment_sync(self, segment_name, fetch_options, till=None): + """ + Hit endpoint, update storage and return True if sync is complete. + + :param segment_name: Name of the segment to update. + :type segment_name: str + + :param fetch_options Fetch options for getting split definitions. + :type fetch_options splitio.api.FetchOptions + + :param till: Passed till from Streaming. + :type till: int + + :return: Flags to check if it should perform bypass or operation ended + :rtype: bool, int, int + """ + self._backoff.reset() + remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES + while True: + remaining_attempts -= 1 + change_number = self._fetch_until(segment_name, fetch_options, till) + if till is None or till <= change_number: + return True, remaining_attempts, change_number + elif remaining_attempts <= 0: + return False, remaining_attempts, change_number + how_long = self._backoff.get() + time.sleep(how_long) + + def synchronize_segment(self, segment_name, till=None): + """ + Update a segment from queue + + :param segment_name: Name of the segment to update. + :type segment_name: str + + :param till: ChangeNumber received. + :type till: int + + """ + fetch_options = FetchOptions(True) # Set Cache-Control to no-cache + successful_sync, remaining_attempts, change_number = self._attempt_segment_sync(segment_name, fetch_options, till) + attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts + if successful_sync: # succedeed sync + _LOGGER.debug('Refresh completed in %d attempts.', attempts) + return + with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN + without_cdn_successful_sync, remaining_attempts, change_number = self._attempt_segment_sync(segment_name, with_cdn_bypass, till) + without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts + if without_cdn_successful_sync: + _LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.', + without_cdn_attempts) + return + else: + _LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.', + without_cdn_attempts) def synchronize_segments(self): """ diff --git a/splitio/sync/split.py b/splitio/sync/split.py index a86ca9c9..5331f556 100644 --- a/splitio/sync/split.py +++ b/splitio/sync/split.py @@ -3,9 +3,12 @@ import re import itertools import yaml +import time from splitio.api import APIException +from splitio.api.commons import FetchOptions from splitio.models import splits +from splitio.util.backoff import Backoff _LEGACY_COMMENT_LINE_RE = re.compile(r'^#.*$') @@ -15,6 +18,11 @@ _LOGGER = logging.getLogger(__name__) +_ON_DEMAND_FETCH_BACKOFF_BASE = 10 # backoff base starting at 10 seconds +_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT = 60 # don't sleep for more than 1 minute +_ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10 + + class SplitSynchronizer(object): """Split changes synchronizer.""" @@ -30,24 +38,33 @@ def __init__(self, split_api, split_storage): """ self._api = split_api self._split_storage = split_storage + self._backoff = Backoff( + _ON_DEMAND_FETCH_BACKOFF_BASE, + _ON_DEMAND_FETCH_BACKOFF_MAX_WAIT) - def synchronize_splits(self, till=None): + def _fetch_until(self, fetch_options, till=None): """ - Hit endpoint, update storage and return True if sync is complete. + Hit endpoint, update storage and return when since==till. + + :param fetch_options Fetch options for getting split definitions. + :type fetch_options splitio.api.FetchOptions :param till: Passed till from Streaming. :type till: int + + :return: last change number + :rtype: int """ - while True: + while True: # Fetch until since==till change_number = self._split_storage.get_change_number() if change_number is None: change_number = -1 if till is not None and till < change_number: # the passed till is less than change_number, no need to perform updates - return + return change_number try: - split_changes = self._api.fetch_splits(change_number) + split_changes = self._api.fetch_splits(change_number, fetch_options) except APIException as exc: _LOGGER.error('Exception raised while fetching splits') _LOGGER.debug('Exception information: ', exc_info=True) @@ -60,9 +77,58 @@ def synchronize_splits(self, till=None): self._split_storage.remove(split['name']) self._split_storage.set_change_number(split_changes['till']) - if split_changes['till'] == split_changes['since'] \ - and (till is None or split_changes['till'] >= till): - return + if split_changes['till'] == split_changes['since']: + return split_changes['till'] + + def _attempt_split_sync(self, fetch_options, till=None): + """ + Hit endpoint, update storage and return True if sync is complete. + + :param fetch_options Fetch options for getting split definitions. + :type fetch_options splitio.api.FetchOptions + + :param till: Passed till from Streaming. + :type till: int + + :return: Flags to check if it should perform bypass or operation ended + :rtype: bool, int, int + """ + self._backoff.reset() + remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES + while True: + remaining_attempts -= 1 + change_number = self._fetch_until(fetch_options, till) + if till is None or till <= change_number: + return True, remaining_attempts, change_number + elif remaining_attempts <= 0: + return False, remaining_attempts, change_number + how_long = self._backoff.get() + time.sleep(how_long) + + def synchronize_splits(self, till=None): + """ + Hit endpoint, update storage and return True if sync is complete. + + :param till: Passed till from Streaming. + :type till: int + """ + fetch_options = FetchOptions(True) # Set Cache-Control to no-cache + successful_sync, remaining_attempts, change_number = self._attempt_split_sync(fetch_options, + till) + attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts + if successful_sync: # succedeed sync + _LOGGER.debug('Refresh completed in %d attempts.', attempts) + return + with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN + without_cdn_successful_sync, remaining_attempts, change_number = self._attempt_split_sync(with_cdn_bypass, till) + without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts + if without_cdn_successful_sync: + _LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.', + without_cdn_attempts) + return + else: + _LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.', + without_cdn_attempts) def kill_split(self, split_name, default_treatment, change_number): """ diff --git a/splitio/util/backoff.py b/splitio/util/backoff.py index d26ffe50..f1e47324 100644 --- a/splitio/util/backoff.py +++ b/splitio/util/backoff.py @@ -6,14 +6,18 @@ class Backoff(object): MAX_ALLOWED_WAIT = 30 * 60 # half an hour - def __init__(self, base=1): + def __init__(self, base=1, max_allowed=MAX_ALLOWED_WAIT): """ Class constructor. :param base: basic unit to be multiplied on each iteration (seconds) :param base: float + + :param max_allowed: max seconds to wait + :param max_allowed: int """ self._base = base + self._max_allowed = max_allowed self._attempt = 0 def get(self): @@ -23,7 +27,7 @@ def get(self): :returns: time to wait until next retry. :rtype: float """ - to_return = min(self._base * (2 ** self._attempt), self.MAX_ALLOWED_WAIT) + to_return = min(self._base * (2 ** self._attempt), self._max_allowed) self._attempt += 1 return to_return diff --git a/splitio/version.py b/splitio/version.py index 33de8d16..8347f6c6 100644 --- a/splitio/version.py +++ b/splitio/version.py @@ -1 +1 @@ -__version__ = '9.0.0' +__version__ = '9.1.0' diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py index 5f145a21..52213128 100644 --- a/tests/api/test_auth.py +++ b/tests/api/test_auth.py @@ -1,6 +1,7 @@ """Split API tests module.""" import pytest + from splitio.api import auth, client, APIException from splitio.client.util import get_metadata from splitio.client.config import DEFAULT_CONFIG diff --git a/tests/api/test_segments_api.py b/tests/api/test_segments_api.py index 0d08e343..1998469a 100644 --- a/tests/api/test_segments_api.py +++ b/tests/api/test_segments_api.py @@ -1,7 +1,9 @@ """Segment API tests module.""" import pytest + from splitio.api import segments, client, APIException +from splitio.api.commons import FetchOptions from splitio.client.util import SdkMetadata @@ -13,8 +15,8 @@ def test_fetch_segment_changes(self, mocker): httpclient = mocker.Mock(spec=client.HttpClient) httpclient.get.return_value = client.HttpResponse(200, '{"prop1": "value1"}') segment_api = segments.SegmentsAPI(httpclient, 'some_api_key', SdkMetadata('1.0', 'some', '1.2.3.4')) - response = segment_api.fetch_segment('some_segment', 123) + response = segment_api.fetch_segment('some_segment', 123, FetchOptions()) assert response['prop1'] == 'value1' assert httpclient.get.mock_calls == [mocker.call('sdk', '/segmentChanges/some_segment', 'some_api_key', extra_headers={ @@ -24,11 +26,35 @@ def test_fetch_segment_changes(self, mocker): }, query={'since': 123})] + httpclient.reset_mock() + response = segment_api.fetch_segment('some_segment', 123, FetchOptions(True)) + assert response['prop1'] == 'value1' + assert httpclient.get.mock_calls == [mocker.call('sdk', '/segmentChanges/some_segment', 'some_api_key', + extra_headers={ + 'SplitSDKVersion': '1.0', + 'SplitSDKMachineIP': '1.2.3.4', + 'SplitSDKMachineName': 'some', + 'Cache-Control': 'no-cache' + }, + query={'since': 123})] + + httpclient.reset_mock() + response = segment_api.fetch_segment('some_segment', 123, FetchOptions(True, 123)) + assert response['prop1'] == 'value1' + assert httpclient.get.mock_calls == [mocker.call('sdk', '/segmentChanges/some_segment', 'some_api_key', + extra_headers={ + 'SplitSDKVersion': '1.0', + 'SplitSDKMachineIP': '1.2.3.4', + 'SplitSDKMachineName': 'some', + 'Cache-Control': 'no-cache' + }, + query={'since': 123, 'till': 123})] + httpclient.reset_mock() def raise_exception(*args, **kwargs): raise client.HttpClientException('some_message') httpclient.get.side_effect = raise_exception with pytest.raises(APIException) as exc_info: - response = segment_api.fetch_segment('some_segment', 123) + response = segment_api.fetch_segment('some_segment', 123, FetchOptions()) assert exc_info.type == APIException assert exc_info.value.message == 'some_message' diff --git a/tests/api/test_splits_api.py b/tests/api/test_splits_api.py index 7a01483c..5e914712 100644 --- a/tests/api/test_splits_api.py +++ b/tests/api/test_splits_api.py @@ -1,7 +1,9 @@ """Split API tests module.""" import pytest + from splitio.api import splits, client, APIException +from splitio.api.commons import FetchOptions from splitio.client.util import SdkMetadata @@ -13,22 +15,46 @@ def test_fetch_split_changes(self, mocker): httpclient = mocker.Mock(spec=client.HttpClient) httpclient.get.return_value = client.HttpResponse(200, '{"prop1": "value1"}') split_api = splits.SplitsAPI(httpclient, 'some_api_key', SdkMetadata('1.0', 'some', '1.2.3.4')) - response = split_api.fetch_splits(123) + response = split_api.fetch_splits(123, FetchOptions()) assert response['prop1'] == 'value1' - assert httpclient.get.mock_calls == [mocker.call('sdk', '/splitChanges', 'some_api_key', + assert httpclient.get.mock_calls == [mocker.call('sdk', '/splitChanges', 'some_api_key', extra_headers={ - 'SplitSDKVersion': '1.0', - 'SplitSDKMachineIP': '1.2.3.4', + 'SplitSDKVersion': '1.0', + 'SplitSDKMachineIP': '1.2.3.4', 'SplitSDKMachineName': 'some' }, query={'since': 123})] + httpclient.reset_mock() + response = split_api.fetch_splits(123, FetchOptions(True)) + assert response['prop1'] == 'value1' + assert httpclient.get.mock_calls == [mocker.call('sdk', '/splitChanges', 'some_api_key', + extra_headers={ + 'SplitSDKVersion': '1.0', + 'SplitSDKMachineIP': '1.2.3.4', + 'SplitSDKMachineName': 'some', + 'Cache-Control': 'no-cache' + }, + query={'since': 123})] + + httpclient.reset_mock() + response = split_api.fetch_splits(123, FetchOptions(True, 123)) + assert response['prop1'] == 'value1' + assert httpclient.get.mock_calls == [mocker.call('sdk', '/splitChanges', 'some_api_key', + extra_headers={ + 'SplitSDKVersion': '1.0', + 'SplitSDKMachineIP': '1.2.3.4', + 'SplitSDKMachineName': 'some', + 'Cache-Control': 'no-cache' + }, + query={'since': 123, 'till': 123})] + httpclient.reset_mock() def raise_exception(*args, **kwargs): raise client.HttpClientException('some_message') httpclient.get.side_effect = raise_exception with pytest.raises(APIException) as exc_info: - response = split_api.fetch_splits(123) + response = split_api.fetch_splits(123, FetchOptions()) assert exc_info.type == APIException assert exc_info.value.message == 'some_message' diff --git a/tests/api/test_util.py b/tests/api/test_util.py index a3c7a20b..c245c157 100644 --- a/tests/api/test_util.py +++ b/tests/api/test_util.py @@ -2,8 +2,7 @@ import pytest -from splitio.api import headers_from_metadata - +from splitio.api.commons import headers_from_metadata from splitio.client.util import SdkMetadata diff --git a/tests/sync/test_segments_synchronizer.py b/tests/sync/test_segments_synchronizer.py index 1f58f3d9..1b4c9539 100644 --- a/tests/sync/test_segments_synchronizer.py +++ b/tests/sync/test_segments_synchronizer.py @@ -1,13 +1,9 @@ """Split Worker tests.""" -import threading -import time -import pytest - +from splitio.util.backoff import Backoff from splitio.api import APIException +from splitio.api.commons import FetchOptions from splitio.storage import SplitStorage, SegmentStorage -from splitio.models.splits import Split -from splitio.sync.segment import SegmentSynchronizer from splitio.models.segments import Segment @@ -28,6 +24,7 @@ def run(x): raise APIException("something broke") api.fetch_segment.side_effect = run + from splitio.sync.segment import SegmentSynchronizer segments_synchronizer = SegmentSynchronizer(api, split_storage, storage) assert not segments_synchronizer.synchronize_segments() @@ -57,7 +54,7 @@ def change_number_mock(segment_name): storage.get_change_number.side_effect = change_number_mock # Setup a mocked segment api to return segments mentioned before. - def fetch_segment_mock(segment_name, change_number): + def fetch_segment_mock(segment_name, change_number, fetch_options): if segment_name == 'segmentA' and fetch_segment_mock._count_a == 0: fetch_segment_mock._count_a = 1 return {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [], @@ -78,16 +75,17 @@ def fetch_segment_mock(segment_name, change_number): api = mocker.Mock() api.fetch_segment.side_effect = fetch_segment_mock + from splitio.sync.segment import SegmentSynchronizer segments_synchronizer = SegmentSynchronizer(api, split_storage, storage) assert segments_synchronizer.synchronize_segments() api_calls = [call for call in api.fetch_segment.mock_calls] - assert mocker.call('segmentA', -1) in api_calls - assert mocker.call('segmentB', -1) in api_calls - assert mocker.call('segmentC', -1) in api_calls - assert mocker.call('segmentA', 123) in api_calls - assert mocker.call('segmentB', 123) in api_calls - assert mocker.call('segmentC', 123) in api_calls + assert mocker.call('segmentA', -1, FetchOptions(True)) in api_calls + assert mocker.call('segmentB', -1, FetchOptions(True)) in api_calls + assert mocker.call('segmentC', -1, FetchOptions(True)) in api_calls + assert mocker.call('segmentA', 123, FetchOptions(True)) in api_calls + assert mocker.call('segmentB', 123, FetchOptions(True)) in api_calls + assert mocker.call('segmentC', 123, FetchOptions(True)) in api_calls segment_put_calls = storage.put.mock_calls segments_to_validate = set(['segmentA', 'segmentB', 'segmentC']) @@ -111,7 +109,7 @@ def change_number_mock(segment_name): change_number_mock._count_a = 0 storage.get_change_number.side_effect = change_number_mock - def fetch_segment_mock(segment_name, change_number): + def fetch_segment_mock(segment_name, change_number, fetch_options): if fetch_segment_mock._count_a == 0: fetch_segment_mock._count_a = 1 return {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [], @@ -122,15 +120,68 @@ def fetch_segment_mock(segment_name, change_number): api = mocker.Mock() api.fetch_segment.side_effect = fetch_segment_mock + from splitio.sync.segment import SegmentSynchronizer segments_synchronizer = SegmentSynchronizer(api, split_storage, storage) segments_synchronizer.synchronize_segment('segmentA') api_calls = [call for call in api.fetch_segment.mock_calls] - assert mocker.call('segmentA', -1) in api_calls - assert mocker.call('segmentA', 123) in api_calls + assert mocker.call('segmentA', -1, FetchOptions(True)) in api_calls + assert mocker.call('segmentA', 123, FetchOptions(True)) in api_calls + + def test_synchronize_segment_cdn(self, mocker): + """Test particular segment update cdn bypass.""" + mocker.patch('splitio.sync.segment._ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES', new=3) + from splitio.sync.segment import SegmentSynchronizer + + split_storage = mocker.Mock(spec=SplitStorage) + storage = mocker.Mock(spec=SegmentStorage) + + def change_number_mock(segment_name): + change_number_mock._count_a += 1 + if change_number_mock._count_a == 1: + return -1 + elif change_number_mock._count_a >= 2 and change_number_mock._count_a <= 3: + return 123 + elif change_number_mock._count_a <= 7: + return 1234 + return 12345 # Return proper cn for CDN Bypass + + change_number_mock._count_a = 0 + storage.get_change_number.side_effect = change_number_mock + + def fetch_segment_mock(segment_name, change_number, fetch_options): + fetch_segment_mock._count_a += 1 + if fetch_segment_mock._count_a == 1: + return {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [], + 'since': -1, 'till': 123} + elif fetch_segment_mock._count_a == 2: + return {'added': [], 'removed': [], 'since': 123, 'till': 123} + elif fetch_segment_mock._count_a == 3: + return {'added': [], 'removed': [], 'since': 123, 'till': 1234} + elif fetch_segment_mock._count_a >= 4 and fetch_segment_mock._count_a <= 6: + return {'added': [], 'removed': [], 'since': 1234, 'till': 1234} + elif fetch_segment_mock._count_a == 7: + return {'added': [], 'removed': [], 'since': 1234, 'till': 12345} + return {'added': [], 'removed': [], 'since': 12345, 'till': 12345} + fetch_segment_mock._count_a = 0 + + api = mocker.Mock() + api.fetch_segment.side_effect = fetch_segment_mock + + segments_synchronizer = SegmentSynchronizer(api, split_storage, storage) + segments_synchronizer.synchronize_segment('segmentA') + + assert mocker.call('segmentA', -1, FetchOptions(True)) in api.fetch_segment.mock_calls + assert mocker.call('segmentA', 123, FetchOptions(True)) in api.fetch_segment.mock_calls + + segments_synchronizer._backoff = Backoff(1, 0.1) + segments_synchronizer.synchronize_segment('segmentA', 12345) + assert mocker.call('segmentA', 12345, FetchOptions(True, 1234)) in api.fetch_segment.mock_calls + assert len(api.fetch_segment.mock_calls) == 8 # 2 ok + BACKOFF(2 since==till + 2 re-attempts) + CDN(2 since==till) def test_recreate(self, mocker): """Test recreate logic.""" + from splitio.sync.segment import SegmentSynchronizer segments_synchronizer = SegmentSynchronizer(mocker.Mock(), mocker.Mock(), mocker.Mock()) current_pool = segments_synchronizer._worker_pool segments_synchronizer.recreate() diff --git a/tests/sync/test_splits_synchronizer.py b/tests/sync/test_splits_synchronizer.py index 34074577..3b295d5b 100644 --- a/tests/sync/test_splits_synchronizer.py +++ b/tests/sync/test_splits_synchronizer.py @@ -1,14 +1,12 @@ """Split Worker tests.""" -import threading -import time import pytest +from splitio.util.backoff import Backoff from splitio.api import APIException -from splitio.tasks import split_sync +from splitio.api.commons import FetchOptions from splitio.storage import SplitStorage from splitio.models.splits import Split -from splitio.sync.split import SplitSynchronizer class SplitsSynchronizerTests(object): @@ -19,12 +17,13 @@ def test_synchronize_splits_error(self, mocker): storage = mocker.Mock(spec=SplitStorage) api = mocker.Mock() - def run(x): + def run(x, c): raise APIException("something broke") run._calls = 0 api.fetch_splits.side_effect = run storage.get_change_number.return_value = -1 + from splitio.sync.split import SplitSynchronizer split_synchronizer = SplitSynchronizer(api, storage) with pytest.raises(APIException): @@ -94,13 +93,14 @@ def get_changes(*args, **kwargs): 'till': 123 } get_changes.called = 0 - api.fetch_splits.side_effect = get_changes + + from splitio.sync.split import SplitSynchronizer split_synchronizer = SplitSynchronizer(api, storage) split_synchronizer.synchronize_splits() - assert mocker.call(-1) in api.fetch_splits.mock_calls - assert mocker.call(123) in api.fetch_splits.mock_calls + assert mocker.call(-1, FetchOptions(True)) in api.fetch_splits.mock_calls + assert mocker.call(123, FetchOptions(True)) in api.fetch_splits.mock_calls inserted_split = storage.put.mock_calls[0][1][0] assert isinstance(inserted_split, Split) @@ -123,7 +123,95 @@ def get_changes(*args, **kwargs): api = mocker.Mock() api.fetch_splits.side_effect = get_changes + from splitio.sync.split import SplitSynchronizer split_synchronizer = SplitSynchronizer(api, storage) split_synchronizer.synchronize_splits(1) assert get_changes.called == 0 + + def test_synchronize_splits_cdn(self, mocker): + """Test split sync with bypassing cdn.""" + mocker.patch('splitio.sync.split._ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES', new=3) + from splitio.sync.split import SplitSynchronizer + + storage = mocker.Mock(spec=SplitStorage) + + def change_number_mock(): + change_number_mock._calls += 1 + if change_number_mock._calls == 1: + return -1 + elif change_number_mock._calls >= 2 and change_number_mock._calls <= 3: + return 123 + elif change_number_mock._calls <= 7: + return 1234 + return 12345 # Return proper cn for CDN Bypass + change_number_mock._calls = 0 + storage.get_change_number.side_effect = change_number_mock + + api = mocker.Mock() + splits = [{ + 'changeNumber': 123, + 'trafficTypeName': 'user', + 'name': 'some_name', + 'trafficAllocation': 100, + 'trafficAllocationSeed': 123456, + 'seed': 321654, + 'status': 'ACTIVE', + 'killed': False, + 'defaultTreatment': 'off', + 'algo': 2, + 'conditions': [ + { + 'partitions': [ + {'treatment': 'on', 'size': 50}, + {'treatment': 'off', 'size': 50} + ], + 'contitionType': 'WHITELIST', + 'label': 'some_label', + 'matcherGroup': { + 'matchers': [ + { + 'matcherType': 'WHITELIST', + 'whitelistMatcherData': { + 'whitelist': ['k1', 'k2', 'k3'] + }, + 'negate': False, + } + ], + 'combiner': 'AND' + } + } + ] + }] + + def get_changes(*args, **kwargs): + get_changes.called += 1 + if get_changes.called == 1: + return { 'splits': splits, 'since': -1, 'till': 123 } + elif get_changes.called == 2: + return { 'splits': [], 'since': 123, 'till': 123 } + elif get_changes.called == 3: + return { 'splits': [], 'since': 123, 'till': 1234 } + elif get_changes.called >= 4 and get_changes.called <= 6: + return { 'splits': [], 'since': 1234, 'till': 1234 } + elif get_changes.called == 7: + return { 'splits': [], 'since': 1234, 'till': 12345 } + return { 'splits': [], 'since': 12345, 'till': 12345 } + get_changes.called = 0 + api.fetch_splits.side_effect = get_changes + + split_synchronizer = SplitSynchronizer(api, storage) + split_synchronizer._backoff = Backoff(1, 1) + split_synchronizer.synchronize_splits() + + assert mocker.call(-1, FetchOptions(True)) in api.fetch_splits.mock_calls + assert mocker.call(123, FetchOptions(True)) in api.fetch_splits.mock_calls + + split_synchronizer._backoff = Backoff(1, 0.1) + split_synchronizer.synchronize_splits(12345) + assert mocker.call(12345, FetchOptions(True, 1234)) in api.fetch_splits.mock_calls + assert len(api.fetch_splits.mock_calls) == 8 # 2 ok + BACKOFF(2 since==till + 2 re-attempts) + CDN(2 since==till) + + inserted_split = storage.put.mock_calls[0][1][0] + assert isinstance(inserted_split, Split) + assert inserted_split.name == 'some_name' diff --git a/tests/sync/test_synchronizer.py b/tests/sync/test_synchronizer.py index 0a2f7c3b..216e8d77 100644 --- a/tests/sync/test_synchronizer.py +++ b/tests/sync/test_synchronizer.py @@ -24,7 +24,7 @@ def test_sync_all_failed_splits(self, mocker): api = mocker.Mock() storage = mocker.Mock() - def run(x): + def run(x, c): raise APIException("something broke") api.fetch_splits.side_effect = run diff --git a/tests/tasks/test_segment_sync.py b/tests/tasks/test_segment_sync.py index aaad4673..91482a40 100644 --- a/tests/tasks/test_segment_sync.py +++ b/tests/tasks/test_segment_sync.py @@ -2,7 +2,7 @@ import threading import time -from splitio.api import APIException +from splitio.api.commons import FetchOptions from splitio.tasks import segment_sync from splitio.storage import SegmentStorage, SplitStorage from splitio.models.splits import Split @@ -41,7 +41,7 @@ def change_number_mock(segment_name): storage.get_change_number.side_effect = change_number_mock # Setup a mocked segment api to return segments mentioned before. - def fetch_segment_mock(segment_name, change_number): + def fetch_segment_mock(segment_name, change_number, fetch_options): if segment_name == 'segmentA' and fetch_segment_mock._count_a == 0: fetch_segment_mock._count_a = 1 return {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [], @@ -60,6 +60,7 @@ def fetch_segment_mock(segment_name, change_number): fetch_segment_mock._count_c = 0 api = mocker.Mock() + fetch_options = FetchOptions(True) api.fetch_segment.side_effect = fetch_segment_mock segments_synchronizer = SegmentSynchronizer(api, split_storage, storage) @@ -76,12 +77,12 @@ def fetch_segment_mock(segment_name, change_number): assert not task.is_running() api_calls = [call for call in api.fetch_segment.mock_calls] - assert mocker.call('segmentA', -1) in api_calls - assert mocker.call('segmentB', -1) in api_calls - assert mocker.call('segmentC', -1) in api_calls - assert mocker.call('segmentA', 123) in api_calls - assert mocker.call('segmentB', 123) in api_calls - assert mocker.call('segmentC', 123) in api_calls + assert mocker.call('segmentA', -1, fetch_options) in api_calls + assert mocker.call('segmentB', -1, fetch_options) in api_calls + assert mocker.call('segmentC', -1, fetch_options) in api_calls + assert mocker.call('segmentA', 123, fetch_options) in api_calls + assert mocker.call('segmentB', 123, fetch_options) in api_calls + assert mocker.call('segmentC', 123, fetch_options) in api_calls segment_put_calls = storage.put.mock_calls segments_to_validate = set(['segmentA', 'segmentB', 'segmentC']) diff --git a/tests/tasks/test_split_sync.py b/tests/tasks/test_split_sync.py index 81693f76..adc90724 100644 --- a/tests/tasks/test_split_sync.py +++ b/tests/tasks/test_split_sync.py @@ -3,6 +3,7 @@ import threading import time from splitio.api import APIException +from splitio.api.commons import FetchOptions from splitio.tasks import split_sync from splitio.storage import SplitStorage from splitio.models.splits import Split @@ -77,6 +78,7 @@ def get_changes(*args, **kwargs): } get_changes.called = 0 + fetch_options = FetchOptions(True) api.fetch_splits.side_effect = get_changes split_synchronizer = SplitSynchronizer(api, storage) task = split_sync.SplitSynchronizationTask(split_synchronizer.synchronize_splits, 0.5) @@ -87,8 +89,8 @@ def get_changes(*args, **kwargs): task.stop(stop_event) stop_event.wait() assert not task.is_running() - assert mocker.call(-1) in api.fetch_splits.mock_calls - assert mocker.call(123) in api.fetch_splits.mock_calls + assert mocker.call(-1, fetch_options) in api.fetch_splits.mock_calls + assert mocker.call(123, fetch_options) in api.fetch_splits.mock_calls inserted_split = storage.put.mock_calls[0][1][0] assert isinstance(inserted_split, Split)