From bd7e52b15728046e15d48a2253180da713caf3d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Lopez=20Zelaya?= Date: Fri, 11 Jun 2021 16:33:55 -0300 Subject: [PATCH 1/7] Update LICENSE.txt --- LICENSE.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LICENSE.txt b/LICENSE.txt index fb8a05ab..18dec1b3 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -1,4 +1,4 @@ -Copyright © 2020 Split Software, Inc. +Copyright © 2021 Split Software, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From c2b280173208f4bf4f6e67b2ffadaeb757fdb74e Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Mon, 12 Jul 2021 14:54:27 -0300 Subject: [PATCH 2/7] added logic for bypassing cdn --- CHANGES.txt | 4 + splitio/api/__init__.py | 35 ++++++++ splitio/api/splits.py | 41 +++++++-- splitio/sync/split.py | 103 ++++++++++++++++------ splitio/util/backoff.py | 8 +- splitio/version.py | 2 +- tests/api/test_splits_api.py | 36 ++++++-- tests/sync/test_splits_synchronizer.py | 113 +++++++++++++++++++++++-- tests/sync/test_synchronizer.py | 2 +- tests/tasks/test_split_sync.py | 7 +- 10 files changed, 302 insertions(+), 49 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 2a5076d5..8086ad3f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,7 @@ +9.1.0 (Jul XX, 2021) +- Added Cache-Control header for on-demand requests to sdk-server. +- Updated the synchronization flow to be more reliable in the event of an edge case generating delay in cache purge propagation, keeping the SDK cache properly synced. + 9.0.0 (May 3, 2021) - BREAKING CHANGE: Removed splitSdkMachineIp and splitSdkMachineName configs. - BREAKING CHANGE: Deprecated `redisCharset` config. diff --git a/splitio/api/__init__.py b/splitio/api/__init__.py index aff06a51..33349504 100644 --- a/splitio/api/__init__.py +++ b/splitio/api/__init__.py @@ -41,3 +41,38 @@ def headers_from_metadata(sdk_metadata, client_key=None): metadata['SplitSDKClientKey'] = client_key return metadata + + +class FetchOptions(object): + """Fetch Options object.""" + + def __init__(self, cache_control_headers=False, change_number=None): + """ + Class constructor. + + :param cache_control_headers: Flag for Cache-Control header + :type cache_control_headers: bool + + :param change_number: ChangeNumber to use for bypassing CDN in request. + :type change_number: int + """ + self._cache_control_headers = cache_control_headers + self._change_number = change_number + + @property + def cache_control_headers(self): + """Return cache control headers.""" + return self._cache_control_headers + + @property + def change_number(self): + """Return change number.""" + return self._change_number + + def __eq__(self, other): + """Match between other options.""" + if self._cache_control_headers != other._cache_control_headers: + return False + if self._change_number != other._change_number: + return False + return True diff --git a/splitio/api/splits.py b/splitio/api/splits.py index 84e3dcfc..51b7a260 100644 --- a/splitio/api/splits.py +++ b/splitio/api/splits.py @@ -10,6 +10,10 @@ _LOGGER = logging.getLogger(__name__) +_CACHE_CONTROL = 'Cache-Control' +_CACHE_CONTROL_NO_CACHE = 'no-cache' + + class SplitsAPI(object): # pylint: disable=too-few-public-methods """Class that uses an httpClient to communicate with the splits API.""" @@ -28,23 +32,50 @@ def __init__(self, client, apikey, sdk_metadata): self._apikey = apikey self._metadata = headers_from_metadata(sdk_metadata) - def fetch_splits(self, change_number): + def _build_fetch(self, change_number, fetch_options): + """ + Build fetch with new flags if that is the case. + + :param change_number: Last known timestamp of a split modification. + :type change_number: int + + :param fetch_options: Fetch options for getting split definitions. + :type fetch_options: splitio.api.FetchOptions + + :return: Objects for fetch + :rtype: dict, dict + """ + query = {'since': change_number} + extra_headers = self._metadata + if fetch_options is None: + return query, extra_headers + if fetch_options.cache_control_headers: + extra_headers[_CACHE_CONTROL] = _CACHE_CONTROL_NO_CACHE + if fetch_options.change_number is not None: + query['till'] = fetch_options.change_number + return query, extra_headers + + def fetch_splits(self, change_number, fetch_options): """ Fetch splits from backend. - :param changeNumber: Last known timestamp of a split modification. - :type changeNumber: int + :param change_number: Last known timestamp of a split modification. + :type change_number: int + + :param fetch_options: Fetch options for getting split definitions. + :type fetch_options: splitio.api.FetchOptions :return: Json representation of a splitChanges response. :rtype: dict """ try: + query, extra_headers = self._build_fetch(change_number, fetch_options) response = self._client.get( 'sdk', '/splitChanges', self._apikey, - extra_headers=self._metadata, - query={'since': change_number} + extra_headers=extra_headers, + query=query, ) if 200 <= response.status_code < 300: return json.loads(response.body) diff --git a/splitio/sync/split.py b/splitio/sync/split.py index a86ca9c9..2c59bc90 100644 --- a/splitio/sync/split.py +++ b/splitio/sync/split.py @@ -3,9 +3,11 @@ import re import itertools import yaml +import time -from splitio.api import APIException +from splitio.api import APIException, FetchOptions from splitio.models import splits +from splitio.util.backoff import Backoff _LEGACY_COMMENT_LINE_RE = re.compile(r'^#.*$') @@ -15,6 +17,11 @@ _LOGGER = logging.getLogger(__name__) +_ON_DEMAND_FETCH_BACKOFF_BASE = 10 # backoff base starting at 10 seconds +_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT = 60 # don't sleep for more than 1 minute +_ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10 + + class SplitSynchronizer(object): """Split changes synchronizer.""" @@ -30,39 +37,83 @@ def __init__(self, split_api, split_storage): """ self._api = split_api self._split_storage = split_storage + self._backoff = Backoff( + _ON_DEMAND_FETCH_BACKOFF_BASE, + _ON_DEMAND_FETCH_BACKOFF_MAX_WAIT) - def synchronize_splits(self, till=None): + def attempt_split_sync(self, fetch_options, till=None): """ Hit endpoint, update storage and return True if sync is complete. + :param fetch_options Fetch options for getting split definitions. + :type fetch_options splitio.api.FetchOptions + :param till: Passed till from Streaming. :type till: int + + :return: Flags to check if it should perform bypass or operation ended + :rtype: bool, int, int """ + self._backoff.reset() + remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES while True: - change_number = self._split_storage.get_change_number() - if change_number is None: - change_number = -1 - if till is not None and till < change_number: - # the passed till is less than change_number, no need to perform updates - return - - try: - split_changes = self._api.fetch_splits(change_number) - except APIException as exc: - _LOGGER.error('Exception raised while fetching splits') - _LOGGER.debug('Exception information: ', exc_info=True) - raise exc - - for split in split_changes.get('splits', []): - if split['status'] == splits.Status.ACTIVE.value: - self._split_storage.put(splits.from_raw(split)) - else: - self._split_storage.remove(split['name']) - - self._split_storage.set_change_number(split_changes['till']) - if split_changes['till'] == split_changes['since'] \ - and (till is None or split_changes['till'] >= till): - return + remaining_attempts -= 1 + while True: # Fetch until since==till + change_number = self._split_storage.get_change_number() + if change_number is None: + change_number = -1 + if till is not None and till < change_number: + # the passed till is less than change_number, no need to perform updates + break + + try: + split_changes = self._api.fetch_splits(change_number, fetch_options) + except APIException as exc: + _LOGGER.error('Exception raised while fetching splits') + _LOGGER.debug('Exception information: ', exc_info=True) + raise exc + + for split in split_changes.get('splits', []): + if split['status'] == splits.Status.ACTIVE.value: + self._split_storage.put(splits.from_raw(split)) + else: + self._split_storage.remove(split['name']) + + self._split_storage.set_change_number(split_changes['till']) + if split_changes['till'] == split_changes['since']: + break + + if till is None or till <= change_number: + return True, remaining_attempts, change_number + elif remaining_attempts <= 0: + return False, remaining_attempts, change_number + how_long = self._backoff.get() + time.sleep(how_long) + + def synchronize_splits(self, till=None): + """ + Hit endpoint, update storage and return True if sync is complete. + + :param till: Passed till from Streaming. + :type till: int + """ + fetch_options = FetchOptions(True) # Set Cache-Control to no-cache + successful_sync, remaining_attempts, change_number = self.attempt_split_sync(fetch_options, + till) + attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts + if successful_sync: # succedeed sync + _LOGGER.debug('Refresh completed in %s attempts.', attempts) + return + with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN + without_cdn_successful_sync, remaining_attempts, change_number = self.attempt_split_sync(with_cdn_bypass, till) + without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts + if without_cdn_successful_sync: + _LOGGER.debug('Refresh completed bypassing the CDN in %s attempts.', + without_cdn_attempts) + return + else: + _LOGGER.debug('No changes fetched after %s attempts with CDN bypassed.', + without_cdn_attempts) def kill_split(self, split_name, default_treatment, change_number): """ diff --git a/splitio/util/backoff.py b/splitio/util/backoff.py index d26ffe50..f1e47324 100644 --- a/splitio/util/backoff.py +++ b/splitio/util/backoff.py @@ -6,14 +6,18 @@ class Backoff(object): MAX_ALLOWED_WAIT = 30 * 60 # half an hour - def __init__(self, base=1): + def __init__(self, base=1, max_allowed=MAX_ALLOWED_WAIT): """ Class constructor. :param base: basic unit to be multiplied on each iteration (seconds) :param base: float + + :param max_allowed: max seconds to wait + :param max_allowed: int """ self._base = base + self._max_allowed = max_allowed self._attempt = 0 def get(self): @@ -23,7 +27,7 @@ def get(self): :returns: time to wait until next retry. :rtype: float """ - to_return = min(self._base * (2 ** self._attempt), self.MAX_ALLOWED_WAIT) + to_return = min(self._base * (2 ** self._attempt), self._max_allowed) self._attempt += 1 return to_return diff --git a/splitio/version.py b/splitio/version.py index 33de8d16..a8c6e862 100644 --- a/splitio/version.py +++ b/splitio/version.py @@ -1 +1 @@ -__version__ = '9.0.0' +__version__ = '9.1.0-rc1' diff --git a/tests/api/test_splits_api.py b/tests/api/test_splits_api.py index 7a01483c..ac96e12a 100644 --- a/tests/api/test_splits_api.py +++ b/tests/api/test_splits_api.py @@ -1,7 +1,7 @@ """Split API tests module.""" import pytest -from splitio.api import splits, client, APIException +from splitio.api import splits, client, APIException, FetchOptions from splitio.client.util import SdkMetadata @@ -13,22 +13,46 @@ def test_fetch_split_changes(self, mocker): httpclient = mocker.Mock(spec=client.HttpClient) httpclient.get.return_value = client.HttpResponse(200, '{"prop1": "value1"}') split_api = splits.SplitsAPI(httpclient, 'some_api_key', SdkMetadata('1.0', 'some', '1.2.3.4')) - response = split_api.fetch_splits(123) + response = split_api.fetch_splits(123, FetchOptions()) assert response['prop1'] == 'value1' - assert httpclient.get.mock_calls == [mocker.call('sdk', '/splitChanges', 'some_api_key', + assert httpclient.get.mock_calls == [mocker.call('sdk', '/splitChanges', 'some_api_key', extra_headers={ - 'SplitSDKVersion': '1.0', - 'SplitSDKMachineIP': '1.2.3.4', + 'SplitSDKVersion': '1.0', + 'SplitSDKMachineIP': '1.2.3.4', 'SplitSDKMachineName': 'some' }, query={'since': 123})] + httpclient.reset_mock() + response = split_api.fetch_splits(123, FetchOptions(True)) + assert response['prop1'] == 'value1' + assert httpclient.get.mock_calls == [mocker.call('sdk', '/splitChanges', 'some_api_key', + extra_headers={ + 'SplitSDKVersion': '1.0', + 'SplitSDKMachineIP': '1.2.3.4', + 'SplitSDKMachineName': 'some', + 'Cache-Control': 'no-cache' + }, + query={'since': 123})] + + httpclient.reset_mock() + response = split_api.fetch_splits(123, FetchOptions(True, 123)) + assert response['prop1'] == 'value1' + assert httpclient.get.mock_calls == [mocker.call('sdk', '/splitChanges', 'some_api_key', + extra_headers={ + 'SplitSDKVersion': '1.0', + 'SplitSDKMachineIP': '1.2.3.4', + 'SplitSDKMachineName': 'some', + 'Cache-Control': 'no-cache' + }, + query={'since': 123, 'till': 123})] + httpclient.reset_mock() def raise_exception(*args, **kwargs): raise client.HttpClientException('some_message') httpclient.get.side_effect = raise_exception with pytest.raises(APIException) as exc_info: - response = split_api.fetch_splits(123) + response = split_api.fetch_splits(123, FetchOptions()) assert exc_info.type == APIException assert exc_info.value.message == 'some_message' diff --git a/tests/sync/test_splits_synchronizer.py b/tests/sync/test_splits_synchronizer.py index 34074577..c53d868d 100644 --- a/tests/sync/test_splits_synchronizer.py +++ b/tests/sync/test_splits_synchronizer.py @@ -1,14 +1,14 @@ """Split Worker tests.""" +from splitio.util.backoff import Backoff import threading import time import pytest -from splitio.api import APIException +from splitio.api import APIException, FetchOptions from splitio.tasks import split_sync from splitio.storage import SplitStorage from splitio.models.splits import Split -from splitio.sync.split import SplitSynchronizer class SplitsSynchronizerTests(object): @@ -19,12 +19,13 @@ def test_synchronize_splits_error(self, mocker): storage = mocker.Mock(spec=SplitStorage) api = mocker.Mock() - def run(x): + def run(x, c): raise APIException("something broke") run._calls = 0 api.fetch_splits.side_effect = run storage.get_change_number.return_value = -1 + from splitio.sync.split import SplitSynchronizer split_synchronizer = SplitSynchronizer(api, storage) with pytest.raises(APIException): @@ -96,11 +97,12 @@ def get_changes(*args, **kwargs): get_changes.called = 0 api.fetch_splits.side_effect = get_changes + from splitio.sync.split import SplitSynchronizer split_synchronizer = SplitSynchronizer(api, storage) split_synchronizer.synchronize_splits() - assert mocker.call(-1) in api.fetch_splits.mock_calls - assert mocker.call(123) in api.fetch_splits.mock_calls + assert mocker.call(-1, FetchOptions(True)) in api.fetch_splits.mock_calls + assert mocker.call(123, FetchOptions(True)) in api.fetch_splits.mock_calls inserted_split = storage.put.mock_calls[0][1][0] assert isinstance(inserted_split, Split) @@ -123,7 +125,108 @@ def get_changes(*args, **kwargs): api = mocker.Mock() api.fetch_splits.side_effect = get_changes + from splitio.sync.split import SplitSynchronizer split_synchronizer = SplitSynchronizer(api, storage) split_synchronizer.synchronize_splits(1) assert get_changes.called == 0 + + def test_synchronize_splits_cdn(self, mocker): + """Test split sync with bypassing cdn.""" + mocker.patch('splitio.sync.split._ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES', new=3) + from splitio.sync.split import SplitSynchronizer + + storage = mocker.Mock(spec=SplitStorage) + + def change_number_mock(): + change_number_mock._calls += 1 + if change_number_mock._calls == 1: + return -1 + elif change_number_mock._calls < 8: + return 123 + return 12345 # Return proper cn for CDN Bypass + change_number_mock._calls = 0 + storage.get_change_number.side_effect = change_number_mock + + api = mocker.Mock() + splits = [{ + 'changeNumber': 123, + 'trafficTypeName': 'user', + 'name': 'some_name', + 'trafficAllocation': 100, + 'trafficAllocationSeed': 123456, + 'seed': 321654, + 'status': 'ACTIVE', + 'killed': False, + 'defaultTreatment': 'off', + 'algo': 2, + 'conditions': [ + { + 'partitions': [ + {'treatment': 'on', 'size': 50}, + {'treatment': 'off', 'size': 50} + ], + 'contitionType': 'WHITELIST', + 'label': 'some_label', + 'matcherGroup': { + 'matchers': [ + { + 'matcherType': 'WHITELIST', + 'whitelistMatcherData': { + 'whitelist': ['k1', 'k2', 'k3'] + }, + 'negate': False, + } + ], + 'combiner': 'AND' + } + } + ] + }] + + def get_changes(*args, **kwargs): + get_changes.called += 1 + + if get_changes.called == 1: + return { + 'splits': splits, + 'since': -1, + 'till': 123 + } + elif get_changes.called == 2: + return { + 'splits': [], + 'since': 123, + 'till': 123 + } + elif get_changes.called > 2 and get_changes.called < 5: # Simulating condition to equal since and till + return { + 'splits': [], + 'since': 123, + 'till': 12345 + } + else: + return { + 'splits': [], + 'since': 12345, + 'till': 12345 + } + get_changes.called = 0 + + api.fetch_splits.side_effect = get_changes + split_synchronizer = SplitSynchronizer(api, storage) + split_synchronizer._backoff = Backoff(1, 1) + split_synchronizer.synchronize_splits() + + assert mocker.call(-1, FetchOptions(True)) in api.fetch_splits.mock_calls + assert mocker.call(123, FetchOptions(True)) in api.fetch_splits.mock_calls + + split_synchronizer._backoff = Backoff(1, 0.1) + split_synchronizer.synchronize_splits(12345) + + assert mocker.call(12345, FetchOptions(True, 123)) in api.fetch_splits.mock_calls + assert len(api.fetch_splits.mock_calls) == 8 # 2 ok + 2 since == till + 3 backoff + 1 CDN + + inserted_split = storage.put.mock_calls[0][1][0] + assert isinstance(inserted_split, Split) + assert inserted_split.name == 'some_name' diff --git a/tests/sync/test_synchronizer.py b/tests/sync/test_synchronizer.py index 0a2f7c3b..216e8d77 100644 --- a/tests/sync/test_synchronizer.py +++ b/tests/sync/test_synchronizer.py @@ -24,7 +24,7 @@ def test_sync_all_failed_splits(self, mocker): api = mocker.Mock() storage = mocker.Mock() - def run(x): + def run(x, c): raise APIException("something broke") api.fetch_splits.side_effect = run diff --git a/tests/tasks/test_split_sync.py b/tests/tasks/test_split_sync.py index 81693f76..dc58bc56 100644 --- a/tests/tasks/test_split_sync.py +++ b/tests/tasks/test_split_sync.py @@ -2,7 +2,7 @@ import threading import time -from splitio.api import APIException +from splitio.api import APIException, FetchOptions from splitio.tasks import split_sync from splitio.storage import SplitStorage from splitio.models.splits import Split @@ -77,6 +77,7 @@ def get_changes(*args, **kwargs): } get_changes.called = 0 + fetch_options = FetchOptions(True) api.fetch_splits.side_effect = get_changes split_synchronizer = SplitSynchronizer(api, storage) task = split_sync.SplitSynchronizationTask(split_synchronizer.synchronize_splits, 0.5) @@ -87,8 +88,8 @@ def get_changes(*args, **kwargs): task.stop(stop_event) stop_event.wait() assert not task.is_running() - assert mocker.call(-1) in api.fetch_splits.mock_calls - assert mocker.call(123) in api.fetch_splits.mock_calls + assert mocker.call(-1, fetch_options) in api.fetch_splits.mock_calls + assert mocker.call(123, fetch_options) in api.fetch_splits.mock_calls inserted_split = storage.put.mock_calls[0][1][0] assert isinstance(inserted_split, Split) From 2049b14699e9e7233f6213115f9639cdfd7117d2 Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Mon, 12 Jul 2021 16:04:25 -0300 Subject: [PATCH 3/7] improvements in attempting splits sync --- splitio/sync/split.py | 72 +++++++++++++++----------- tests/sync/test_splits_synchronizer.py | 28 +++++++--- 2 files changed, 63 insertions(+), 37 deletions(-) diff --git a/splitio/sync/split.py b/splitio/sync/split.py index 2c59bc90..ac677b1b 100644 --- a/splitio/sync/split.py +++ b/splitio/sync/split.py @@ -41,7 +41,45 @@ def __init__(self, split_api, split_storage): _ON_DEMAND_FETCH_BACKOFF_BASE, _ON_DEMAND_FETCH_BACKOFF_MAX_WAIT) - def attempt_split_sync(self, fetch_options, till=None): + def _fetch_until(self, fetch_options, till=None): + """ + Hit endpoint, update storage and return when since==till. + + :param fetch_options Fetch options for getting split definitions. + :type fetch_options splitio.api.FetchOptions + + :param till: Passed till from Streaming. + :type till: int + + :return: last change number + :rtype: int + """ + while True: # Fetch until since==till + change_number = self._split_storage.get_change_number() + if change_number is None: + change_number = -1 + if till is not None and till < change_number: + # the passed till is less than change_number, no need to perform updates + return change_number + + try: + split_changes = self._api.fetch_splits(change_number, fetch_options) + except APIException as exc: + _LOGGER.error('Exception raised while fetching splits') + _LOGGER.debug('Exception information: ', exc_info=True) + raise exc + + for split in split_changes.get('splits', []): + if split['status'] == splits.Status.ACTIVE.value: + self._split_storage.put(splits.from_raw(split)) + else: + self._split_storage.remove(split['name']) + + self._split_storage.set_change_number(split_changes['till']) + if split_changes['till'] == split_changes['since']: + return split_changes['till'] + + def _attempt_split_sync(self, fetch_options, till=None): """ Hit endpoint, update storage and return True if sync is complete. @@ -58,31 +96,7 @@ def attempt_split_sync(self, fetch_options, till=None): remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES while True: remaining_attempts -= 1 - while True: # Fetch until since==till - change_number = self._split_storage.get_change_number() - if change_number is None: - change_number = -1 - if till is not None and till < change_number: - # the passed till is less than change_number, no need to perform updates - break - - try: - split_changes = self._api.fetch_splits(change_number, fetch_options) - except APIException as exc: - _LOGGER.error('Exception raised while fetching splits') - _LOGGER.debug('Exception information: ', exc_info=True) - raise exc - - for split in split_changes.get('splits', []): - if split['status'] == splits.Status.ACTIVE.value: - self._split_storage.put(splits.from_raw(split)) - else: - self._split_storage.remove(split['name']) - - self._split_storage.set_change_number(split_changes['till']) - if split_changes['till'] == split_changes['since']: - break - + change_number = self._fetch_until(fetch_options, till) if till is None or till <= change_number: return True, remaining_attempts, change_number elif remaining_attempts <= 0: @@ -98,14 +112,14 @@ def synchronize_splits(self, till=None): :type till: int """ fetch_options = FetchOptions(True) # Set Cache-Control to no-cache - successful_sync, remaining_attempts, change_number = self.attempt_split_sync(fetch_options, - till) + successful_sync, remaining_attempts, change_number = self._attempt_split_sync(fetch_options, + till) attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts if successful_sync: # succedeed sync _LOGGER.debug('Refresh completed in %s attempts.', attempts) return with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN - without_cdn_successful_sync, remaining_attempts, change_number = self.attempt_split_sync(with_cdn_bypass, till) + without_cdn_successful_sync, remaining_attempts, change_number = self._attempt_split_sync(with_cdn_bypass, till) without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts if without_cdn_successful_sync: _LOGGER.debug('Refresh completed bypassing the CDN in %s attempts.', diff --git a/tests/sync/test_splits_synchronizer.py b/tests/sync/test_splits_synchronizer.py index c53d868d..2713a0e1 100644 --- a/tests/sync/test_splits_synchronizer.py +++ b/tests/sync/test_splits_synchronizer.py @@ -142,8 +142,10 @@ def change_number_mock(): change_number_mock._calls += 1 if change_number_mock._calls == 1: return -1 - elif change_number_mock._calls < 8: + elif change_number_mock._calls >= 2 and change_number_mock._calls <= 3: return 123 + elif change_number_mock._calls <= 7: + return 1234 return 12345 # Return proper cn for CDN Bypass change_number_mock._calls = 0 storage.get_change_number.side_effect = change_number_mock @@ -199,18 +201,29 @@ def get_changes(*args, **kwargs): 'since': 123, 'till': 123 } - elif get_changes.called > 2 and get_changes.called < 5: # Simulating condition to equal since and till + elif get_changes.called == 3: return { 'splits': [], 'since': 123, - 'till': 12345 + 'till': 1234 } - else: + elif get_changes.called >= 4 and get_changes.called <= 6: + return { + 'splits': [], + 'since': 1234, + 'till': 1234 + } + elif get_changes.called == 7: return { 'splits': [], - 'since': 12345, + 'since': 1234, 'till': 12345 } + return { + 'splits': [], + 'since': 12345, + 'till': 12345 + } get_changes.called = 0 api.fetch_splits.side_effect = get_changes @@ -223,9 +236,8 @@ def get_changes(*args, **kwargs): split_synchronizer._backoff = Backoff(1, 0.1) split_synchronizer.synchronize_splits(12345) - - assert mocker.call(12345, FetchOptions(True, 123)) in api.fetch_splits.mock_calls - assert len(api.fetch_splits.mock_calls) == 8 # 2 ok + 2 since == till + 3 backoff + 1 CDN + assert mocker.call(12345, FetchOptions(True, 1234)) in api.fetch_splits.mock_calls + assert len(api.fetch_splits.mock_calls) == 8 # 2 ok + BACKOFF(2 since==till + 2 re-attempts) + CDN(2 since==till) inserted_split = storage.put.mock_calls[0][1][0] assert isinstance(inserted_split, Split) From 4abe8d08d5b028cfa23f782b93c5f37542c36eec Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Tue, 13 Jul 2021 15:24:17 -0300 Subject: [PATCH 4/7] added cdn bypass segments --- splitio/api/__init__.py | 31 +++++++++ splitio/api/segments.py | 15 ++-- splitio/api/splits.py | 31 +-------- splitio/client/input_validator.py | 4 +- splitio/sync/segment.py | 88 +++++++++++++++++++++--- tests/api/test_segments_api.py | 30 +++++++- tests/sync/test_segments_synchronizer.py | 84 +++++++++++++++++----- tests/sync/test_splits_synchronizer.py | 46 +++---------- tests/tasks/test_segment_sync.py | 17 ++--- 9 files changed, 237 insertions(+), 109 deletions(-) diff --git a/splitio/api/__init__.py b/splitio/api/__init__.py index 33349504..fe698d23 100644 --- a/splitio/api/__init__.py +++ b/splitio/api/__init__.py @@ -1,6 +1,10 @@ """Split API module.""" +_CACHE_CONTROL = 'Cache-Control' +_CACHE_CONTROL_NO_CACHE = 'no-cache' + + class APIException(Exception): """Exception to raise when an API call fails.""" @@ -76,3 +80,30 @@ def __eq__(self, other): if self._change_number != other._change_number: return False return True + + +def build_fetch(change_number, fetch_options, metadata): + """ + Build fetch with new flags if that is the case. + + :param change_number: Last known timestamp of definition. + :type change_number: int + + :param fetch_options: Fetch options for getting definitions. + :type fetch_options: splitio.api.FetchOptions + + :param metadata: Metadata Headers. + :type metadata: dict + + :return: Objects for fetch + :rtype: dict, dict + """ + query = {'since': change_number} + extra_headers = metadata + if fetch_options is None: + return query, extra_headers + if fetch_options.cache_control_headers: + extra_headers[_CACHE_CONTROL] = _CACHE_CONTROL_NO_CACHE + if fetch_options.change_number is not None: + query['till'] = fetch_options.change_number + return query, extra_headers diff --git a/splitio/api/segments.py b/splitio/api/segments.py index 3221d0d1..591ab838 100644 --- a/splitio/api/segments.py +++ b/splitio/api/segments.py @@ -3,7 +3,7 @@ import json import logging -from splitio.api import APIException, headers_from_metadata +from splitio.api import APIException, headers_from_metadata, build_fetch from splitio.api.client import HttpClientException @@ -29,25 +29,30 @@ def __init__(self, http_client, apikey, sdk_metadata): self._apikey = apikey self._metadata = headers_from_metadata(sdk_metadata) - def fetch_segment(self, segment_name, change_number): + def fetch_segment(self, segment_name, change_number, fetch_options): """ Fetch splits from backend. :param segment_name: Name of the segment to fetch changes for. :type segment_name: str - :param change_number: Last known timestamp of a split modification. + + :param change_number: Last known timestamp of a segment modification. :type change_number: int + :param fetch_options: Fetch options for getting segment definitions. + :type fetch_options: splitio.api.FetchOptions + :return: Json representation of a segmentChange response. :rtype: dict """ try: + query, extra_headers = build_fetch(change_number, fetch_options, self._metadata) response = self._client.get( 'sdk', '/segmentChanges/{segment_name}'.format(segment_name=segment_name), self._apikey, - extra_headers=self._metadata, - query={'since': change_number} + extra_headers=extra_headers, + query=query, ) if 200 <= response.status_code < 300: diff --git a/splitio/api/splits.py b/splitio/api/splits.py index 51b7a260..83378691 100644 --- a/splitio/api/splits.py +++ b/splitio/api/splits.py @@ -3,17 +3,13 @@ import logging import json -from splitio.api import APIException, headers_from_metadata +from splitio.api import APIException, headers_from_metadata, build_fetch from splitio.api.client import HttpClientException _LOGGER = logging.getLogger(__name__) -_CACHE_CONTROL = 'Cache-Control' -_CACHE_CONTROL_NO_CACHE = 'no-cache' - - class SplitsAPI(object): # pylint: disable=too-few-public-methods """Class that uses an httpClient to communicate with the splits API.""" @@ -32,29 +28,6 @@ def __init__(self, client, apikey, sdk_metadata): self._apikey = apikey self._metadata = headers_from_metadata(sdk_metadata) - def _build_fetch(self, change_number, fetch_options): - """ - Build fetch with new flags if that is the case. - - :param change_number: Last known timestamp of a split modification. - :type change_number: int - - :param fetch_options: Fetch options for getting split definitions. - :type fetch_options: splitio.api.FetchOptions - - :return: Objects for fetch - :rtype: dict, dict - """ - query = {'since': change_number} - extra_headers = self._metadata - if fetch_options is None: - return query, extra_headers - if fetch_options.cache_control_headers: - extra_headers[_CACHE_CONTROL] = _CACHE_CONTROL_NO_CACHE - if fetch_options.change_number is not None: - query['till'] = fetch_options.change_number - return query, extra_headers - def fetch_splits(self, change_number, fetch_options): """ Fetch splits from backend. @@ -69,7 +42,7 @@ def fetch_splits(self, change_number, fetch_options): :rtype: dict """ try: - query, extra_headers = self._build_fetch(change_number, fetch_options) + query, extra_headers = build_fetch(change_number, fetch_options, self._metadata) response = self._client.get( 'sdk', '/splitChanges', diff --git a/splitio/client/input_validator.py b/splitio/client/input_validator.py index b2478c6c..3b1ef228 100644 --- a/splitio/client/input_validator.py +++ b/splitio/client/input_validator.py @@ -4,7 +4,7 @@ import re import math -from splitio.api import APIException +from splitio.api import APIException, FetchOptions from splitio.client.key import Key from splitio.engine.evaluator import CONTROL @@ -458,7 +458,7 @@ def validate_apikey_type(segment_api): _logger = logging.getLogger('splitio.api.segments') try: _logger.addFilter(api_messages_filter) # pylint: disable=protected-access - segment_api.fetch_segment('__SOME_INVALID_SEGMENT__', -1) + segment_api.fetch_segment('__SOME_INVALID_SEGMENT__', -1, FetchOptions()) except APIException as exc: if exc.status_code == 403: _LOGGER.error('factory instantiation: you passed a browser type ' diff --git a/splitio/sync/segment.py b/splitio/sync/segment.py index e2b08b25..15d89656 100644 --- a/splitio/sync/segment.py +++ b/splitio/sync/segment.py @@ -1,13 +1,20 @@ import logging +import time -from splitio.api import APIException +from splitio.api import APIException, FetchOptions from splitio.tasks.util import workerpool from splitio.models import segments +from splitio.util.backoff import Backoff _LOGGER = logging.getLogger(__name__) +_ON_DEMAND_FETCH_BACKOFF_BASE = 10 # backoff base starting at 10 seconds +_ON_DEMAND_FETCH_BACKOFF_MAX_WAIT = 60 # don't sleep for more than 1 minute +_ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10 + + class SegmentSynchronizer(object): def __init__(self, segment_api, split_storage, segment_storage): """ @@ -28,6 +35,9 @@ def __init__(self, segment_api, split_storage, segment_storage): self._segment_storage = segment_storage self._worker_pool = workerpool.WorkerPool(10, self.synchronize_segment) self._worker_pool.start() + self._backoff = Backoff( + _ON_DEMAND_FETCH_BACKOFF_BASE, + _ON_DEMAND_FETCH_BACKOFF_MAX_WAIT) def recreate(self): """ @@ -44,27 +54,33 @@ def shutdown(self): """ self._worker_pool.stop() - def synchronize_segment(self, segment_name, till=None): + def _fetch_until(self, segment_name, fetch_options, till=None): """ - Update a segment from queue + Hit endpoint, update storage and return when since==till. :param segment_name: Name of the segment to update. :type segment_name: str - :param till: ChangeNumber received. + :param fetch_options Fetch options for getting segment definitions. + :type fetch_options splitio.api.FetchOptions + + :param till: Passed till from Streaming. :type till: int + :return: last change number + :rtype: int """ - while True: + while True: # Fetch until since==till change_number = self._segment_storage.get_change_number(segment_name) if change_number is None: change_number = -1 if till is not None and till < change_number: # the passed till is less than change_number, no need to perform updates - return + return change_number try: - segment_changes = self._api.fetch_segment(segment_name, change_number) + segment_changes = self._api.fetch_segment(segment_name, change_number, + fetch_options) except APIException as exc: _LOGGER.error('Exception raised while fetching segment %s', segment_name) _LOGGER.debug('Exception information: ', exc_info=True) @@ -82,7 +98,63 @@ def synchronize_segment(self, segment_name, till=None): ) if segment_changes['till'] == segment_changes['since']: - return + return segment_changes['till'] + + def _attempt_segment_sync(self, segment_name, fetch_options, till=None): + """ + Hit endpoint, update storage and return True if sync is complete. + + :param segment_name: Name of the segment to update. + :type segment_name: str + + :param fetch_options Fetch options for getting split definitions. + :type fetch_options splitio.api.FetchOptions + + :param till: Passed till from Streaming. + :type till: int + + :return: Flags to check if it should perform bypass or operation ended + :rtype: bool, int, int + """ + self._backoff.reset() + remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES + while True: + remaining_attempts -= 1 + change_number = self._fetch_until(segment_name, fetch_options, till) + if till is None or till <= change_number: + return True, remaining_attempts, change_number + elif remaining_attempts <= 0: + return False, remaining_attempts, change_number + how_long = self._backoff.get() + time.sleep(how_long) + + def synchronize_segment(self, segment_name, till=None): + """ + Update a segment from queue + + :param segment_name: Name of the segment to update. + :type segment_name: str + + :param till: ChangeNumber received. + :type till: int + + """ + fetch_options = FetchOptions(True) # Set Cache-Control to no-cache + successful_sync, remaining_attempts, change_number = self._attempt_segment_sync(segment_name, fetch_options, till) + attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts + if successful_sync: # succedeed sync + _LOGGER.debug('Refresh completed in %s attempts.', attempts) + return + with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN + without_cdn_successful_sync, remaining_attempts, change_number = self._attempt_segment_sync(segment_name, with_cdn_bypass, till) + without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts + if without_cdn_successful_sync: + _LOGGER.debug('Refresh completed bypassing the CDN in %s attempts.', + without_cdn_attempts) + return + else: + _LOGGER.debug('No changes fetched after %s attempts with CDN bypassed.', + without_cdn_attempts) def synchronize_segments(self): """ diff --git a/tests/api/test_segments_api.py b/tests/api/test_segments_api.py index 0d08e343..ed21c0d2 100644 --- a/tests/api/test_segments_api.py +++ b/tests/api/test_segments_api.py @@ -1,7 +1,7 @@ """Segment API tests module.""" import pytest -from splitio.api import segments, client, APIException +from splitio.api import segments, client, APIException, FetchOptions from splitio.client.util import SdkMetadata @@ -13,8 +13,8 @@ def test_fetch_segment_changes(self, mocker): httpclient = mocker.Mock(spec=client.HttpClient) httpclient.get.return_value = client.HttpResponse(200, '{"prop1": "value1"}') segment_api = segments.SegmentsAPI(httpclient, 'some_api_key', SdkMetadata('1.0', 'some', '1.2.3.4')) - response = segment_api.fetch_segment('some_segment', 123) + response = segment_api.fetch_segment('some_segment', 123, FetchOptions()) assert response['prop1'] == 'value1' assert httpclient.get.mock_calls == [mocker.call('sdk', '/segmentChanges/some_segment', 'some_api_key', extra_headers={ @@ -24,11 +24,35 @@ def test_fetch_segment_changes(self, mocker): }, query={'since': 123})] + httpclient.reset_mock() + response = segment_api.fetch_segment('some_segment', 123, FetchOptions(True)) + assert response['prop1'] == 'value1' + assert httpclient.get.mock_calls == [mocker.call('sdk', '/segmentChanges/some_segment', 'some_api_key', + extra_headers={ + 'SplitSDKVersion': '1.0', + 'SplitSDKMachineIP': '1.2.3.4', + 'SplitSDKMachineName': 'some', + 'Cache-Control': 'no-cache' + }, + query={'since': 123})] + + httpclient.reset_mock() + response = segment_api.fetch_segment('some_segment', 123, FetchOptions(True, 123)) + assert response['prop1'] == 'value1' + assert httpclient.get.mock_calls == [mocker.call('sdk', '/segmentChanges/some_segment', 'some_api_key', + extra_headers={ + 'SplitSDKVersion': '1.0', + 'SplitSDKMachineIP': '1.2.3.4', + 'SplitSDKMachineName': 'some', + 'Cache-Control': 'no-cache' + }, + query={'since': 123, 'till': 123})] + httpclient.reset_mock() def raise_exception(*args, **kwargs): raise client.HttpClientException('some_message') httpclient.get.side_effect = raise_exception with pytest.raises(APIException) as exc_info: - response = segment_api.fetch_segment('some_segment', 123) + response = segment_api.fetch_segment('some_segment', 123, FetchOptions()) assert exc_info.type == APIException assert exc_info.value.message == 'some_message' diff --git a/tests/sync/test_segments_synchronizer.py b/tests/sync/test_segments_synchronizer.py index 1f58f3d9..2ff84205 100644 --- a/tests/sync/test_segments_synchronizer.py +++ b/tests/sync/test_segments_synchronizer.py @@ -1,13 +1,8 @@ """Split Worker tests.""" -import threading -import time -import pytest - -from splitio.api import APIException +from splitio.util.backoff import Backoff +from splitio.api import APIException, FetchOptions from splitio.storage import SplitStorage, SegmentStorage -from splitio.models.splits import Split -from splitio.sync.segment import SegmentSynchronizer from splitio.models.segments import Segment @@ -28,6 +23,7 @@ def run(x): raise APIException("something broke") api.fetch_segment.side_effect = run + from splitio.sync.segment import SegmentSynchronizer segments_synchronizer = SegmentSynchronizer(api, split_storage, storage) assert not segments_synchronizer.synchronize_segments() @@ -57,7 +53,7 @@ def change_number_mock(segment_name): storage.get_change_number.side_effect = change_number_mock # Setup a mocked segment api to return segments mentioned before. - def fetch_segment_mock(segment_name, change_number): + def fetch_segment_mock(segment_name, change_number, fetch_options): if segment_name == 'segmentA' and fetch_segment_mock._count_a == 0: fetch_segment_mock._count_a = 1 return {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [], @@ -78,16 +74,17 @@ def fetch_segment_mock(segment_name, change_number): api = mocker.Mock() api.fetch_segment.side_effect = fetch_segment_mock + from splitio.sync.segment import SegmentSynchronizer segments_synchronizer = SegmentSynchronizer(api, split_storage, storage) assert segments_synchronizer.synchronize_segments() api_calls = [call for call in api.fetch_segment.mock_calls] - assert mocker.call('segmentA', -1) in api_calls - assert mocker.call('segmentB', -1) in api_calls - assert mocker.call('segmentC', -1) in api_calls - assert mocker.call('segmentA', 123) in api_calls - assert mocker.call('segmentB', 123) in api_calls - assert mocker.call('segmentC', 123) in api_calls + assert mocker.call('segmentA', -1, FetchOptions(True)) in api_calls + assert mocker.call('segmentB', -1, FetchOptions(True)) in api_calls + assert mocker.call('segmentC', -1, FetchOptions(True)) in api_calls + assert mocker.call('segmentA', 123, FetchOptions(True)) in api_calls + assert mocker.call('segmentB', 123, FetchOptions(True)) in api_calls + assert mocker.call('segmentC', 123, FetchOptions(True)) in api_calls segment_put_calls = storage.put.mock_calls segments_to_validate = set(['segmentA', 'segmentB', 'segmentC']) @@ -111,7 +108,7 @@ def change_number_mock(segment_name): change_number_mock._count_a = 0 storage.get_change_number.side_effect = change_number_mock - def fetch_segment_mock(segment_name, change_number): + def fetch_segment_mock(segment_name, change_number, fetch_options): if fetch_segment_mock._count_a == 0: fetch_segment_mock._count_a = 1 return {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [], @@ -122,15 +119,68 @@ def fetch_segment_mock(segment_name, change_number): api = mocker.Mock() api.fetch_segment.side_effect = fetch_segment_mock + from splitio.sync.segment import SegmentSynchronizer segments_synchronizer = SegmentSynchronizer(api, split_storage, storage) segments_synchronizer.synchronize_segment('segmentA') api_calls = [call for call in api.fetch_segment.mock_calls] - assert mocker.call('segmentA', -1) in api_calls - assert mocker.call('segmentA', 123) in api_calls + assert mocker.call('segmentA', -1, FetchOptions(True)) in api_calls + assert mocker.call('segmentA', 123, FetchOptions(True)) in api_calls + + def test_synchronize_segment_cdn(self, mocker): + """Test particular segment update cdn bypass.""" + mocker.patch('splitio.sync.segment._ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES', new=3) + from splitio.sync.segment import SegmentSynchronizer + + split_storage = mocker.Mock(spec=SplitStorage) + storage = mocker.Mock(spec=SegmentStorage) + + def change_number_mock(segment_name): + change_number_mock._count_a += 1 + if change_number_mock._count_a == 1: + return -1 + elif change_number_mock._count_a >= 2 and change_number_mock._count_a <= 3: + return 123 + elif change_number_mock._count_a <= 7: + return 1234 + return 12345 # Return proper cn for CDN Bypass + + change_number_mock._count_a = 0 + storage.get_change_number.side_effect = change_number_mock + + def fetch_segment_mock(segment_name, change_number, fetch_options): + fetch_segment_mock._count_a += 1 + if fetch_segment_mock._count_a == 1: + return {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [], + 'since': -1, 'till': 123} + elif fetch_segment_mock._count_a == 2: + return {'added': [], 'removed': [], 'since': 123, 'till': 123} + elif fetch_segment_mock._count_a == 3: + return {'added': [], 'removed': [], 'since': 123, 'till': 1234} + elif fetch_segment_mock._count_a >= 4 and fetch_segment_mock._count_a <= 6: + return {'added': [], 'removed': [], 'since': 1234, 'till': 1234} + elif fetch_segment_mock._count_a == 7: + return {'added': [], 'removed': [], 'since': 1234, 'till': 12345} + return {'added': [], 'removed': [], 'since': 12345, 'till': 12345} + fetch_segment_mock._count_a = 0 + + api = mocker.Mock() + api.fetch_segment.side_effect = fetch_segment_mock + + segments_synchronizer = SegmentSynchronizer(api, split_storage, storage) + segments_synchronizer.synchronize_segment('segmentA') + + assert mocker.call('segmentA', -1, FetchOptions(True)) in api.fetch_segment.mock_calls + assert mocker.call('segmentA', 123, FetchOptions(True)) in api.fetch_segment.mock_calls + + segments_synchronizer._backoff = Backoff(1, 0.1) + segments_synchronizer.synchronize_segment('segmentA', 12345) + assert mocker.call('segmentA', 12345, FetchOptions(True, 1234)) in api.fetch_segment.mock_calls + assert len(api.fetch_segment.mock_calls) == 8 # 2 ok + BACKOFF(2 since==till + 2 re-attempts) + CDN(2 since==till) def test_recreate(self, mocker): """Test recreate logic.""" + from splitio.sync.segment import SegmentSynchronizer segments_synchronizer = SegmentSynchronizer(mocker.Mock(), mocker.Mock(), mocker.Mock()) current_pool = segments_synchronizer._worker_pool segments_synchronizer.recreate() diff --git a/tests/sync/test_splits_synchronizer.py b/tests/sync/test_splits_synchronizer.py index 2713a0e1..1b05250c 100644 --- a/tests/sync/test_splits_synchronizer.py +++ b/tests/sync/test_splits_synchronizer.py @@ -1,12 +1,9 @@ """Split Worker tests.""" -from splitio.util.backoff import Backoff -import threading -import time import pytest +from splitio.util.backoff import Backoff from splitio.api import APIException, FetchOptions -from splitio.tasks import split_sync from splitio.storage import SplitStorage from splitio.models.splits import Split @@ -95,8 +92,8 @@ def get_changes(*args, **kwargs): 'till': 123 } get_changes.called = 0 - api.fetch_splits.side_effect = get_changes + from splitio.sync.split import SplitSynchronizer split_synchronizer = SplitSynchronizer(api, storage) split_synchronizer.synchronize_splits() @@ -188,45 +185,20 @@ def change_number_mock(): def get_changes(*args, **kwargs): get_changes.called += 1 - if get_changes.called == 1: - return { - 'splits': splits, - 'since': -1, - 'till': 123 - } + return { 'splits': splits, 'since': -1, 'till': 123 } elif get_changes.called == 2: - return { - 'splits': [], - 'since': 123, - 'till': 123 - } + return { 'splits': [], 'since': 123, 'till': 123 } elif get_changes.called == 3: - return { - 'splits': [], - 'since': 123, - 'till': 1234 - } + return { 'splits': [], 'since': 123, 'till': 1234 } elif get_changes.called >= 4 and get_changes.called <= 6: - return { - 'splits': [], - 'since': 1234, - 'till': 1234 - } + return { 'splits': [], 'since': 1234, 'till': 1234 } elif get_changes.called == 7: - return { - 'splits': [], - 'since': 1234, - 'till': 12345 - } - return { - 'splits': [], - 'since': 12345, - 'till': 12345 - } + return { 'splits': [], 'since': 1234, 'till': 12345 } + return { 'splits': [], 'since': 12345, 'till': 12345 } get_changes.called = 0 - api.fetch_splits.side_effect = get_changes + split_synchronizer = SplitSynchronizer(api, storage) split_synchronizer._backoff = Backoff(1, 1) split_synchronizer.synchronize_splits() diff --git a/tests/tasks/test_segment_sync.py b/tests/tasks/test_segment_sync.py index aaad4673..ebe20465 100644 --- a/tests/tasks/test_segment_sync.py +++ b/tests/tasks/test_segment_sync.py @@ -2,7 +2,7 @@ import threading import time -from splitio.api import APIException +from splitio.api import APIException, FetchOptions from splitio.tasks import segment_sync from splitio.storage import SegmentStorage, SplitStorage from splitio.models.splits import Split @@ -41,7 +41,7 @@ def change_number_mock(segment_name): storage.get_change_number.side_effect = change_number_mock # Setup a mocked segment api to return segments mentioned before. - def fetch_segment_mock(segment_name, change_number): + def fetch_segment_mock(segment_name, change_number, fetch_options): if segment_name == 'segmentA' and fetch_segment_mock._count_a == 0: fetch_segment_mock._count_a = 1 return {'name': 'segmentA', 'added': ['key1', 'key2', 'key3'], 'removed': [], @@ -60,6 +60,7 @@ def fetch_segment_mock(segment_name, change_number): fetch_segment_mock._count_c = 0 api = mocker.Mock() + fetch_options = FetchOptions(True) api.fetch_segment.side_effect = fetch_segment_mock segments_synchronizer = SegmentSynchronizer(api, split_storage, storage) @@ -76,12 +77,12 @@ def fetch_segment_mock(segment_name, change_number): assert not task.is_running() api_calls = [call for call in api.fetch_segment.mock_calls] - assert mocker.call('segmentA', -1) in api_calls - assert mocker.call('segmentB', -1) in api_calls - assert mocker.call('segmentC', -1) in api_calls - assert mocker.call('segmentA', 123) in api_calls - assert mocker.call('segmentB', 123) in api_calls - assert mocker.call('segmentC', 123) in api_calls + assert mocker.call('segmentA', -1, fetch_options) in api_calls + assert mocker.call('segmentB', -1, fetch_options) in api_calls + assert mocker.call('segmentC', -1, fetch_options) in api_calls + assert mocker.call('segmentA', 123, fetch_options) in api_calls + assert mocker.call('segmentB', 123, fetch_options) in api_calls + assert mocker.call('segmentC', 123, fetch_options) in api_calls segment_put_calls = storage.put.mock_calls segments_to_validate = set(['segmentA', 'segmentB', 'segmentC']) From a9e8a8faff982b88aaceaccd093bc4dad48b0127 Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Tue, 13 Jul 2021 15:46:06 -0300 Subject: [PATCH 5/7] fixed logs format --- splitio/sync/segment.py | 6 +++--- splitio/sync/split.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/splitio/sync/segment.py b/splitio/sync/segment.py index 15d89656..02c7a181 100644 --- a/splitio/sync/segment.py +++ b/splitio/sync/segment.py @@ -143,17 +143,17 @@ def synchronize_segment(self, segment_name, till=None): successful_sync, remaining_attempts, change_number = self._attempt_segment_sync(segment_name, fetch_options, till) attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts if successful_sync: # succedeed sync - _LOGGER.debug('Refresh completed in %s attempts.', attempts) + _LOGGER.debug('Refresh completed in %d attempts.', attempts) return with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN without_cdn_successful_sync, remaining_attempts, change_number = self._attempt_segment_sync(segment_name, with_cdn_bypass, till) without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts if without_cdn_successful_sync: - _LOGGER.debug('Refresh completed bypassing the CDN in %s attempts.', + _LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.', without_cdn_attempts) return else: - _LOGGER.debug('No changes fetched after %s attempts with CDN bypassed.', + _LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.', without_cdn_attempts) def synchronize_segments(self): diff --git a/splitio/sync/split.py b/splitio/sync/split.py index ac677b1b..4a8bc671 100644 --- a/splitio/sync/split.py +++ b/splitio/sync/split.py @@ -116,17 +116,17 @@ def synchronize_splits(self, till=None): till) attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts if successful_sync: # succedeed sync - _LOGGER.debug('Refresh completed in %s attempts.', attempts) + _LOGGER.debug('Refresh completed in %d attempts.', attempts) return with_cdn_bypass = FetchOptions(True, change_number) # Set flag for bypassing CDN without_cdn_successful_sync, remaining_attempts, change_number = self._attempt_split_sync(with_cdn_bypass, till) without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts if without_cdn_successful_sync: - _LOGGER.debug('Refresh completed bypassing the CDN in %s attempts.', + _LOGGER.debug('Refresh completed bypassing the CDN in %d attempts.', without_cdn_attempts) return else: - _LOGGER.debug('No changes fetched after %s attempts with CDN bypassed.', + _LOGGER.debug('No changes fetched after %d attempts with CDN bypassed.', without_cdn_attempts) def kill_split(self, split_name, default_treatment, change_number): From 0700dbcde10827d90c13d8c0602e96fb1f0cc53a Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Wed, 14 Jul 2021 10:12:19 -0300 Subject: [PATCH 6/7] moved data to commons --- splitio/api/__init__.py | 94 ----------------------- splitio/api/auth.py | 3 +- splitio/api/commons.py | 95 ++++++++++++++++++++++++ splitio/api/events.py | 3 +- splitio/api/impressions.py | 3 +- splitio/api/segments.py | 5 +- splitio/api/splits.py | 5 +- splitio/api/telemetry.py | 3 +- splitio/client/input_validator.py | 3 +- splitio/push/splitsse.py | 2 +- splitio/sync/segment.py | 3 +- splitio/sync/split.py | 3 +- tests/api/test_auth.py | 1 + tests/api/test_segments_api.py | 4 +- tests/api/test_splits_api.py | 4 +- tests/api/test_util.py | 3 +- tests/sync/test_segments_synchronizer.py | 3 +- tests/sync/test_splits_synchronizer.py | 3 +- tests/tasks/test_segment_sync.py | 2 +- tests/tasks/test_split_sync.py | 3 +- 20 files changed, 131 insertions(+), 114 deletions(-) create mode 100644 splitio/api/commons.py diff --git a/splitio/api/__init__.py b/splitio/api/__init__.py index fe698d23..33f1e588 100644 --- a/splitio/api/__init__.py +++ b/splitio/api/__init__.py @@ -1,10 +1,6 @@ """Split API module.""" -_CACHE_CONTROL = 'Cache-Control' -_CACHE_CONTROL_NO_CACHE = 'no-cache' - - class APIException(Exception): """Exception to raise when an API call fails.""" @@ -17,93 +13,3 @@ def __init__(self, custom_message, status_code=None): def status_code(self): """Return HTTP status code.""" return self._status_code - - -def headers_from_metadata(sdk_metadata, client_key=None): - """ - Generate a dict with headers required by data-recording API endpoints. - - :param sdk_metadata: SDK Metadata object, generated at sdk initialization time. - :type sdk_metadata: splitio.client.util.SdkMetadata - - :param client_key: client key. - :type client_key: str - - :return: A dictionary with headers. - :rtype: dict - """ - - metadata = { - 'SplitSDKVersion': sdk_metadata.sdk_version, - 'SplitSDKMachineIP': sdk_metadata.instance_ip, - 'SplitSDKMachineName': sdk_metadata.instance_name - } if sdk_metadata.instance_ip != 'NA' and sdk_metadata.instance_ip != 'unknown' else { - 'SplitSDKVersion': sdk_metadata.sdk_version, - } - - if client_key is not None: - metadata['SplitSDKClientKey'] = client_key - - return metadata - - -class FetchOptions(object): - """Fetch Options object.""" - - def __init__(self, cache_control_headers=False, change_number=None): - """ - Class constructor. - - :param cache_control_headers: Flag for Cache-Control header - :type cache_control_headers: bool - - :param change_number: ChangeNumber to use for bypassing CDN in request. - :type change_number: int - """ - self._cache_control_headers = cache_control_headers - self._change_number = change_number - - @property - def cache_control_headers(self): - """Return cache control headers.""" - return self._cache_control_headers - - @property - def change_number(self): - """Return change number.""" - return self._change_number - - def __eq__(self, other): - """Match between other options.""" - if self._cache_control_headers != other._cache_control_headers: - return False - if self._change_number != other._change_number: - return False - return True - - -def build_fetch(change_number, fetch_options, metadata): - """ - Build fetch with new flags if that is the case. - - :param change_number: Last known timestamp of definition. - :type change_number: int - - :param fetch_options: Fetch options for getting definitions. - :type fetch_options: splitio.api.FetchOptions - - :param metadata: Metadata Headers. - :type metadata: dict - - :return: Objects for fetch - :rtype: dict, dict - """ - query = {'since': change_number} - extra_headers = metadata - if fetch_options is None: - return query, extra_headers - if fetch_options.cache_control_headers: - extra_headers[_CACHE_CONTROL] = _CACHE_CONTROL_NO_CACHE - if fetch_options.change_number is not None: - query['till'] = fetch_options.change_number - return query, extra_headers diff --git a/splitio/api/auth.py b/splitio/api/auth.py index 652c241c..0c1a3d12 100644 --- a/splitio/api/auth.py +++ b/splitio/api/auth.py @@ -3,7 +3,8 @@ import logging import json -from splitio.api import APIException, headers_from_metadata +from splitio.api import APIException +from splitio.api.commons import headers_from_metadata from splitio.api.client import HttpClientException from splitio.models.token import from_raw diff --git a/splitio/api/commons.py b/splitio/api/commons.py new file mode 100644 index 00000000..53019427 --- /dev/null +++ b/splitio/api/commons.py @@ -0,0 +1,95 @@ +"""Commons module.""" + + +_CACHE_CONTROL = 'Cache-Control' +_CACHE_CONTROL_NO_CACHE = 'no-cache' + + +def headers_from_metadata(sdk_metadata, client_key=None): + """ + Generate a dict with headers required by data-recording API endpoints. + + :param sdk_metadata: SDK Metadata object, generated at sdk initialization time. + :type sdk_metadata: splitio.client.util.SdkMetadata + + :param client_key: client key. + :type client_key: str + + :return: A dictionary with headers. + :rtype: dict + """ + + metadata = { + 'SplitSDKVersion': sdk_metadata.sdk_version, + 'SplitSDKMachineIP': sdk_metadata.instance_ip, + 'SplitSDKMachineName': sdk_metadata.instance_name + } if sdk_metadata.instance_ip != 'NA' and sdk_metadata.instance_ip != 'unknown' else { + 'SplitSDKVersion': sdk_metadata.sdk_version, + } + + if client_key is not None: + metadata['SplitSDKClientKey'] = client_key + + return metadata + + +class FetchOptions(object): + """Fetch Options object.""" + + def __init__(self, cache_control_headers=False, change_number=None): + """ + Class constructor. + + :param cache_control_headers: Flag for Cache-Control header + :type cache_control_headers: bool + + :param change_number: ChangeNumber to use for bypassing CDN in request. + :type change_number: int + """ + self._cache_control_headers = cache_control_headers + self._change_number = change_number + + @property + def cache_control_headers(self): + """Return cache control headers.""" + return self._cache_control_headers + + @property + def change_number(self): + """Return change number.""" + return self._change_number + + def __eq__(self, other): + """Match between other options.""" + if self._cache_control_headers != other._cache_control_headers: + return False + if self._change_number != other._change_number: + return False + return True + + +def build_fetch(change_number, fetch_options, metadata): + """ + Build fetch with new flags if that is the case. + + :param change_number: Last known timestamp of definition. + :type change_number: int + + :param fetch_options: Fetch options for getting definitions. + :type fetch_options: splitio.api.commons.FetchOptions + + :param metadata: Metadata Headers. + :type metadata: dict + + :return: Objects for fetch + :rtype: dict, dict + """ + query = {'since': change_number} + extra_headers = metadata + if fetch_options is None: + return query, extra_headers + if fetch_options.cache_control_headers: + extra_headers[_CACHE_CONTROL] = _CACHE_CONTROL_NO_CACHE + if fetch_options.change_number is not None: + query['till'] = fetch_options.change_number + return query, extra_headers diff --git a/splitio/api/events.py b/splitio/api/events.py index 84b0f52c..b8ddda36 100644 --- a/splitio/api/events.py +++ b/splitio/api/events.py @@ -1,8 +1,9 @@ """Events API module.""" import logging -from splitio.api import APIException, headers_from_metadata +from splitio.api import APIException from splitio.api.client import HttpClientException +from splitio.api.commons import headers_from_metadata _LOGGER = logging.getLogger(__name__) diff --git a/splitio/api/impressions.py b/splitio/api/impressions.py index ffc1452c..02206a1e 100644 --- a/splitio/api/impressions.py +++ b/splitio/api/impressions.py @@ -3,8 +3,9 @@ import logging from itertools import groupby -from splitio.api import APIException, headers_from_metadata +from splitio.api import APIException from splitio.api.client import HttpClientException +from splitio.api.commons import headers_from_metadata from splitio.engine.impressions import ImpressionsMode diff --git a/splitio/api/segments.py b/splitio/api/segments.py index 591ab838..ebc65a7e 100644 --- a/splitio/api/segments.py +++ b/splitio/api/segments.py @@ -3,7 +3,8 @@ import json import logging -from splitio.api import APIException, headers_from_metadata, build_fetch +from splitio.api import APIException +from splitio.api.commons import headers_from_metadata, build_fetch from splitio.api.client import HttpClientException @@ -40,7 +41,7 @@ def fetch_segment(self, segment_name, change_number, fetch_options): :type change_number: int :param fetch_options: Fetch options for getting segment definitions. - :type fetch_options: splitio.api.FetchOptions + :type fetch_options: splitio.api.commons.FetchOptions :return: Json representation of a segmentChange response. :rtype: dict diff --git a/splitio/api/splits.py b/splitio/api/splits.py index 83378691..e395d454 100644 --- a/splitio/api/splits.py +++ b/splitio/api/splits.py @@ -3,7 +3,8 @@ import logging import json -from splitio.api import APIException, headers_from_metadata, build_fetch +from splitio.api import APIException +from splitio.api.commons import headers_from_metadata, build_fetch from splitio.api.client import HttpClientException @@ -36,7 +37,7 @@ def fetch_splits(self, change_number, fetch_options): :type change_number: int :param fetch_options: Fetch options for getting split definitions. - :type fetch_options: splitio.api.FetchOptions + :type fetch_options: splitio.api.commons.FetchOptions :return: Json representation of a splitChanges response. :rtype: dict diff --git a/splitio/api/telemetry.py b/splitio/api/telemetry.py index db4ae682..23ad4644 100644 --- a/splitio/api/telemetry.py +++ b/splitio/api/telemetry.py @@ -1,7 +1,8 @@ """Telemetry API Module.""" import logging -from splitio.api import APIException, headers_from_metadata +from splitio.api import APIException +from splitio.api.commons import headers_from_metadata from splitio.api.client import HttpClientException diff --git a/splitio/client/input_validator.py b/splitio/client/input_validator.py index 3b1ef228..2ca42e1f 100644 --- a/splitio/client/input_validator.py +++ b/splitio/client/input_validator.py @@ -4,7 +4,8 @@ import re import math -from splitio.api import APIException, FetchOptions +from splitio.api import APIException +from splitio.api.commons import FetchOptions from splitio.client.key import Key from splitio.engine.evaluator import CONTROL diff --git a/splitio/push/splitsse.py b/splitio/push/splitsse.py index e80e1549..f16a317f 100644 --- a/splitio/push/splitsse.py +++ b/splitio/push/splitsse.py @@ -4,7 +4,7 @@ from enum import Enum from splitio.push.sse import SSEClient, SSE_EVENT_ERROR from splitio.util.threadutil import EventGroup -from splitio.api import headers_from_metadata +from splitio.api.commons import headers_from_metadata _LOGGER = logging.getLogger(__name__) diff --git a/splitio/sync/segment.py b/splitio/sync/segment.py index 02c7a181..37e453bf 100644 --- a/splitio/sync/segment.py +++ b/splitio/sync/segment.py @@ -1,7 +1,8 @@ import logging import time -from splitio.api import APIException, FetchOptions +from splitio.api import APIException +from splitio.api.commons import FetchOptions from splitio.tasks.util import workerpool from splitio.models import segments from splitio.util.backoff import Backoff diff --git a/splitio/sync/split.py b/splitio/sync/split.py index 4a8bc671..5331f556 100644 --- a/splitio/sync/split.py +++ b/splitio/sync/split.py @@ -5,7 +5,8 @@ import yaml import time -from splitio.api import APIException, FetchOptions +from splitio.api import APIException +from splitio.api.commons import FetchOptions from splitio.models import splits from splitio.util.backoff import Backoff diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py index 5f145a21..52213128 100644 --- a/tests/api/test_auth.py +++ b/tests/api/test_auth.py @@ -1,6 +1,7 @@ """Split API tests module.""" import pytest + from splitio.api import auth, client, APIException from splitio.client.util import get_metadata from splitio.client.config import DEFAULT_CONFIG diff --git a/tests/api/test_segments_api.py b/tests/api/test_segments_api.py index ed21c0d2..1998469a 100644 --- a/tests/api/test_segments_api.py +++ b/tests/api/test_segments_api.py @@ -1,7 +1,9 @@ """Segment API tests module.""" import pytest -from splitio.api import segments, client, APIException, FetchOptions + +from splitio.api import segments, client, APIException +from splitio.api.commons import FetchOptions from splitio.client.util import SdkMetadata diff --git a/tests/api/test_splits_api.py b/tests/api/test_splits_api.py index ac96e12a..5e914712 100644 --- a/tests/api/test_splits_api.py +++ b/tests/api/test_splits_api.py @@ -1,7 +1,9 @@ """Split API tests module.""" import pytest -from splitio.api import splits, client, APIException, FetchOptions + +from splitio.api import splits, client, APIException +from splitio.api.commons import FetchOptions from splitio.client.util import SdkMetadata diff --git a/tests/api/test_util.py b/tests/api/test_util.py index a3c7a20b..c245c157 100644 --- a/tests/api/test_util.py +++ b/tests/api/test_util.py @@ -2,8 +2,7 @@ import pytest -from splitio.api import headers_from_metadata - +from splitio.api.commons import headers_from_metadata from splitio.client.util import SdkMetadata diff --git a/tests/sync/test_segments_synchronizer.py b/tests/sync/test_segments_synchronizer.py index 2ff84205..1b4c9539 100644 --- a/tests/sync/test_segments_synchronizer.py +++ b/tests/sync/test_segments_synchronizer.py @@ -1,7 +1,8 @@ """Split Worker tests.""" from splitio.util.backoff import Backoff -from splitio.api import APIException, FetchOptions +from splitio.api import APIException +from splitio.api.commons import FetchOptions from splitio.storage import SplitStorage, SegmentStorage from splitio.models.segments import Segment diff --git a/tests/sync/test_splits_synchronizer.py b/tests/sync/test_splits_synchronizer.py index 1b05250c..3b295d5b 100644 --- a/tests/sync/test_splits_synchronizer.py +++ b/tests/sync/test_splits_synchronizer.py @@ -3,7 +3,8 @@ import pytest from splitio.util.backoff import Backoff -from splitio.api import APIException, FetchOptions +from splitio.api import APIException +from splitio.api.commons import FetchOptions from splitio.storage import SplitStorage from splitio.models.splits import Split diff --git a/tests/tasks/test_segment_sync.py b/tests/tasks/test_segment_sync.py index ebe20465..91482a40 100644 --- a/tests/tasks/test_segment_sync.py +++ b/tests/tasks/test_segment_sync.py @@ -2,7 +2,7 @@ import threading import time -from splitio.api import APIException, FetchOptions +from splitio.api.commons import FetchOptions from splitio.tasks import segment_sync from splitio.storage import SegmentStorage, SplitStorage from splitio.models.splits import Split diff --git a/tests/tasks/test_split_sync.py b/tests/tasks/test_split_sync.py index dc58bc56..adc90724 100644 --- a/tests/tasks/test_split_sync.py +++ b/tests/tasks/test_split_sync.py @@ -2,7 +2,8 @@ import threading import time -from splitio.api import APIException, FetchOptions +from splitio.api import APIException +from splitio.api.commons import FetchOptions from splitio.tasks import split_sync from splitio.storage import SplitStorage from splitio.models.splits import Split From 843227a46bd05fc07df7cda7bbdde8030957beee Mon Sep 17 00:00:00 2001 From: Matias Melograno Date: Thu, 15 Jul 2021 13:29:23 -0300 Subject: [PATCH 7/7] prep release --- CHANGES.txt | 2 +- splitio/version.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 8086ad3f..26899c33 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,4 @@ -9.1.0 (Jul XX, 2021) +9.1.0 (Jul 15, 2021) - Added Cache-Control header for on-demand requests to sdk-server. - Updated the synchronization flow to be more reliable in the event of an edge case generating delay in cache purge propagation, keeping the SDK cache properly synced. diff --git a/splitio/version.py b/splitio/version.py index a8c6e862..8347f6c6 100644 --- a/splitio/version.py +++ b/splitio/version.py @@ -1 +1 @@ -__version__ = '9.1.0-rc1' +__version__ = '9.1.0'