Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
- BREAKING CHANGE: Deprecated Python2 support.
- Removed six, future and futures libs for compatibility between Python2 and Python3.
- Updated strings encoding to utf-8 by default for Redis.
- BREAKING CHANGE: Deprecated `redisCharset` config.
- Added SDK Metadata headers to streaming client.

8.4.1 (Apr 16, 2021)
- Bumped mmh3cffi dependency which now requires c99 flag to build.
Expand Down
13 changes: 11 additions & 2 deletions splitio/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,29 @@ def status_code(self):
return self._status_code


def headers_from_metadata(sdk_metadata):
def headers_from_metadata(sdk_metadata, client_key=None):
"""
Generate a dict with headers required by data-recording API endpoints.

:param sdk_metadata: SDK Metadata object, generated at sdk initialization time.
:type sdk_metadata: splitio.client.util.SdkMetadata

:param client_key: client key.
:type client_key: str

:return: A dictionary with headers.
:rtype: dict
"""
return {

metadata = {
'SplitSDKVersion': sdk_metadata.sdk_version,
'SplitSDKMachineIP': sdk_metadata.instance_ip,
'SplitSDKMachineName': sdk_metadata.instance_name
} if sdk_metadata.instance_ip != 'NA' and sdk_metadata.instance_ip != 'unknown' else {
'SplitSDKVersion': sdk_metadata.sdk_version,
}

if client_key is not None:
metadata['SplitSDKClientKey'] = client_key

return metadata
1 change: 0 additions & 1 deletion splitio/client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
'redisUnixSocketPath': None,
'redisEncoding': 'utf-8',
'redisEncodingErrors': 'strict',
'redisCharset': 'utf-8',
'redisErrors': None,
'redisDecodeResponses': True,
'redisRetryOnTimeout': False,
Expand Down
5 changes: 3 additions & 2 deletions splitio/client/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl

sdk_ready_flag = threading.Event() if not preforked_initialization else None
manager = Manager(sdk_ready_flag, synchronizer, apis['auth'], cfg['streamingEnabled'],
streaming_api_base_url)
sdk_metadata, streaming_api_base_url, api_key[-4:])

storages['events'].set_queue_full_hook(tasks.events_task.flush)
storages['impressions'].set_queue_full_hook(tasks.impressions_task.flush)
Expand Down Expand Up @@ -439,9 +439,10 @@ def _build_localhost_factory(cfg):
), None, None, None, None, None,
)

sdk_metadata = util.get_metadata(cfg)
ready_event = threading.Event()
synchronizer = LocalhostSynchronizer(synchronizers, tasks)
manager = Manager(ready_event, synchronizer, None, False)
manager = Manager(ready_event, synchronizer, None, False, sdk_metadata)
manager.start()
recorder = StandardRecorder(
ImpressionsManager(cfg['impressionsMode'], True, None),
Expand Down
12 changes: 9 additions & 3 deletions splitio/push/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
class PushManager(object): # pylint:disable=too-many-instance-attributes
"""Push notifications susbsytem manager."""

def __init__(self, auth_api, synchronizer, feedback_loop, sse_url=None):
def __init__(self, auth_api, synchronizer, feedback_loop, sdk_metadata, sse_url=None, client_key=None):
"""
Class constructor.

Expand All @@ -33,8 +33,14 @@ def __init__(self, auth_api, synchronizer, feedback_loop, sse_url=None):
:param feedback_loop: queue where push status updates are published.
:type feedback_loop: queue.Queue

:param sdk_metadata: SDK version & machine name & IP.
:type sdk_metadata: splitio.client.util.SdkMetadata

:param sse_url: streaming base url.
:type sse_url: str

:param client_key: client key.
:type client_key: str
"""
self._auth_api = auth_api
self._feedback_loop = feedback_loop
Expand All @@ -52,8 +58,8 @@ def __init__(self, auth_api, synchronizer, feedback_loop, sse_url=None):
}

kwargs = {} if sse_url is None else {'base_url': sse_url}
self._sse_client = SplitSSEClient(self._event_handler, self._handle_connection_ready,
self._handle_connection_end, **kwargs)
self._sse_client = SplitSSEClient(self._event_handler, sdk_metadata, self._handle_connection_ready,
self._handle_connection_end, client_key, **kwargs)
self._running = False
self._next_refresh = Timer(0, lambda: 0)

Expand Down
16 changes: 13 additions & 3 deletions splitio/push/splitsse.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from enum import Enum
from splitio.push.sse import SSEClient, SSE_EVENT_ERROR
from splitio.util.threadutil import EventGroup
from splitio.api import headers_from_metadata


_LOGGER = logging.getLogger(__name__)
Expand All @@ -20,14 +21,18 @@ class _Status(Enum):
ERRORED = 2
CONNECTED = 3

def __init__(self, event_callback, first_event_callback=None,
connection_closed_callback=None, base_url='https://streaming.split.io'):
def __init__(self, event_callback, sdk_metadata, first_event_callback=None,
connection_closed_callback=None, client_key=None,
base_url='https://streaming.split.io'):
"""
Construct a split sse client.

:param callback: fuction to call when an event is received.
:type callback: callable

:param sdk_metadata: SDK version & machine name & IP.
:type sdk_metadata: splitio.client.util.SdkMetadata

:param first_event_callback: function to call when the first event is received.
:type first_event_callback: callable

Expand All @@ -36,6 +41,9 @@ def __init__(self, event_callback, first_event_callback=None,

:param base_url: scheme + :// + host
:type base_url: str

:param client_key: client key.
:type client_key: str
"""
self._client = SSEClient(self._raw_event_handler)
self._callback = event_callback
Expand All @@ -45,6 +53,7 @@ def __init__(self, event_callback, first_event_callback=None,
self._status = SplitSSEClient._Status.IDLE
self._sse_first_event = None
self._sse_connection_closed = None
self._metadata = headers_from_metadata(sdk_metadata, client_key)

def _raw_event_handler(self, event):
"""
Expand Down Expand Up @@ -117,7 +126,8 @@ def start(self, token):
def connect(url):
"""Connect to sse in a blocking manner."""
try:
self._client.start(url, timeout=self.KEEPALIVE_TIMEOUT)
self._client.start(url, timeout=self.KEEPALIVE_TIMEOUT,
extra_headers=self._metadata)
finally:
self._status = SplitSSEClient._Status.IDLE
self._sse_connection_closed.set()
Expand Down
2 changes: 0 additions & 2 deletions splitio/storage/adapters/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,6 @@ def _build_default_client(config): # pylint: disable=too-many-locals
unix_socket_path = config.get('redisUnixSocketPath', None)
encoding = config.get('redisEncoding', 'utf-8')
encoding_errors = config.get('redisEncodingErrors', 'strict')
charset = config.get('redisCharset', 'utf-8')
errors = config.get('redisErrors', None)
decode_responses = config.get('redisDecodeResponses', True)
retry_on_timeout = config.get('redisRetryOnTimeout', False)
Expand All @@ -378,7 +377,6 @@ def _build_default_client(config): # pylint: disable=too-many-locals
unix_socket_path=unix_socket_path,
encoding=encoding,
encoding_errors=encoding_errors,
charset=charset,
errors=errors,
decode_responses=decode_responses,
retry_on_timeout=retry_on_timeout,
Expand Down
13 changes: 11 additions & 2 deletions splitio/sync/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Manager(object): # pylint:disable=too-many-instance-attributes

_CENTINEL_EVENT = object()

def __init__(self, ready_flag, synchronizer, auth_api, streaming_enabled, sse_url=None): # pylint:disable=too-many-arguments
def __init__(self, ready_flag, synchronizer, auth_api, streaming_enabled, sdk_metadata, sse_url=None, client_key=None): # pylint:disable=too-many-arguments
"""
Construct Manager.

Expand All @@ -29,8 +29,17 @@ def __init__(self, ready_flag, synchronizer, auth_api, streaming_enabled, sse_ur
:param auth_api: Authentication api client
:type auth_api: splitio.api.auth.AuthAPI

:param sdk_metadata: SDK version & machine name & IP.
:type sdk_metadata: splitio.client.util.SdkMetadata

:param streaming_enabled: whether to use streaming or not
:type streaming_enabled: bool

:param sse_url: streaming base url.
:type sse_url: str

:param client_key: client key.
:type client_key: str
"""
self._streaming_enabled = streaming_enabled
self._ready_flag = ready_flag
Expand All @@ -39,7 +48,7 @@ def __init__(self, ready_flag, synchronizer, auth_api, streaming_enabled, sse_ur
self._push_status_handler_active = True
self._backoff = Backoff()
self._queue = Queue()
self._push = PushManager(auth_api, synchronizer, self._queue, sse_url)
self._push = PushManager(auth_api, synchronizer, self._queue, sdk_metadata, sse_url, client_key)
self._push_status_handler = Thread(target=self._streaming_feedback_handler,
name='PushStatusHandler')
self._push_status_handler.setDaemon(True)
Expand Down
2 changes: 1 addition & 1 deletion splitio/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '9.0.0-all'
__version__ = '9.0.0-rc1'
39 changes: 39 additions & 0 deletions tests/api/test_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Split API tests module."""

import pytest

from splitio.api import headers_from_metadata

from splitio.client.util import SdkMetadata


class UtilTests(object):
"""Util test cases."""

def test_headers_from_metadata(self, mocker):
"""Test headers from metadata call."""
metadata = headers_from_metadata(SdkMetadata('1.0', 'some', '1.2.3.4'))
assert metadata['SplitSDKVersion'] == '1.0'
assert metadata['SplitSDKMachineIP'] == '1.2.3.4'
assert metadata['SplitSDKMachineName'] == 'some'
assert 'SplitSDKClientKey' not in metadata

metadata = headers_from_metadata(SdkMetadata('1.0', 'some', '1.2.3.4'), 'abcd')
assert metadata['SplitSDKVersion'] == '1.0'
assert metadata['SplitSDKMachineIP'] == '1.2.3.4'
assert metadata['SplitSDKMachineName'] == 'some'
assert metadata['SplitSDKClientKey'] == 'abcd'

metadata = headers_from_metadata(SdkMetadata('1.0', 'some', 'NA'))
assert metadata['SplitSDKVersion'] == '1.0'
assert 'SplitSDKMachineIP' not in metadata
assert 'SplitSDKMachineName' not in metadata
assert 'SplitSDKClientKey' not in metadata

metadata = headers_from_metadata(SdkMetadata('1.0', 'some', 'unknown'))
assert metadata['SplitSDKVersion'] == '1.0'
assert 'SplitSDKMachineIP' not in metadata
assert 'SplitSDKMachineName' not in metadata
assert 'SplitSDKClientKey' not in metadata


7 changes: 3 additions & 4 deletions tests/client/test_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_inmemory_client_creation_streaming_false(self, mocker):
"""Test that a client with in-memory storage is created correctly."""

# Setup synchronizer
def _split_synchronizer(self, ready_flag, synchronizer, auth_api, streaming_enabled, sse_url=None):
def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk_matadata, sse_url=None, client_key=None):
synchronizer = mocker.Mock(spec=Synchronizer)
synchronizer.sync_all.return_values = None
self._ready_flag = ready_flag
Expand Down Expand Up @@ -120,7 +120,6 @@ def test_redis_client_creation(self, mocker):
unix_socket_path='/some_path',
encoding='utf-8',
encoding_errors='non-strict',
charset='utf-8',
errors=True,
decode_responses=True,
retry_on_timeout=True,
Expand Down Expand Up @@ -233,7 +232,7 @@ def _imppression_count_task_init_mock(self, synchronize_counters):
imp_count_async_task_mock)

# Setup synchronizer
def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sse_url=None):
def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk_matadata, sse_url=None, client_key=None):
synchronizer = Synchronizer(syncs, tasks)
self._ready_flag = ready_flag
self._synchronizer = synchronizer
Expand Down Expand Up @@ -327,7 +326,7 @@ def _imppression_count_task_init_mock(self, synchronize_counters):
imp_count_async_task_mock)

# Setup synchronizer
def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sse_url=None):
def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk_matadata, sse_url=None, client_key=None):
synchronizer = Synchronizer(syncs, tasks)
self._ready_flag = ready_flag
self._synchronizer = synchronizer
Expand Down
24 changes: 14 additions & 10 deletions tests/push/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@
#pylint:disable=no-self-use,protected-access
from threading import Thread
from queue import Queue

from splitio.api.auth import APIException
from splitio.push.sse import SSEEvent

from splitio.models.token import Token

from splitio.push.sse import SSEEvent
from splitio.push.parser import parse_incoming_event, EventType, ControlType, ControlMessage, \
OccupancyMessage, SplitChangeUpdate, SplitKillUpdate, SegmentChangeUpdate
from splitio.push.processor import MessageProcessor
from splitio.push.status_tracker import PushStatusTracker
from splitio.push.manager import PushManager, _TOKEN_REFRESH_GRACE_PERIOD
from splitio.push.splitsse import SplitSSEClient
from splitio.push.status_tracker import Status

from tests.helpers import Any


Expand All @@ -30,7 +34,7 @@ def test_connection_success(self, mocker):
mocker.patch('splitio.push.manager.Timer', new=timer_mock)
mocker.patch('splitio.push.manager.SplitSSEClient', new=sse_constructor_mock)
feedback_loop = Queue()
manager = PushManager(api_mock, mocker.Mock(), feedback_loop)
manager = PushManager(api_mock, mocker.Mock(), feedback_loop, mocker.Mock())

def new_start(*args, **kwargs): # pylint: disable=unused-argument
"""splitsse.start mock."""
Expand Down Expand Up @@ -63,7 +67,7 @@ def test_connection_failure(self, mocker):
mocker.patch('splitio.push.manager.Timer', new=timer_mock)
mocker.patch('splitio.push.manager.SplitSSEClient', new=sse_constructor_mock)
feedback_loop = Queue()
manager = PushManager(api_mock, mocker.Mock(), feedback_loop)
manager = PushManager(api_mock, mocker.Mock(), feedback_loop, mocker.Mock())

def new_start(*args, **kwargs): # pylint: disable=unused-argument
"""splitsse.start mock."""
Expand All @@ -90,7 +94,7 @@ def test_push_disabled(self, mocker):
mocker.patch('splitio.push.manager.Timer', new=timer_mock)
mocker.patch('splitio.push.manager.SplitSSEClient', new=sse_constructor_mock)
feedback_loop = Queue()
manager = PushManager(api_mock, mocker.Mock(), feedback_loop)
manager = PushManager(api_mock, mocker.Mock(), feedback_loop, mocker.Mock())
manager.start()
assert feedback_loop.get() == Status.PUSH_NONRETRYABLE_ERROR
assert timer_mock.mock_calls == [mocker.call(0, Any())]
Expand All @@ -109,7 +113,7 @@ def test_auth_apiexception(self, mocker):
mocker.patch('splitio.push.manager.SplitSSEClient', new=sse_constructor_mock)

feedback_loop = Queue()
manager = PushManager(api_mock, mocker.Mock(), feedback_loop)
manager = PushManager(api_mock, mocker.Mock(), feedback_loop, mocker.Mock())
manager.start()
assert feedback_loop.get() == Status.PUSH_RETRYABLE_ERROR
assert timer_mock.mock_calls == [mocker.call(0, Any())]
Expand All @@ -126,7 +130,7 @@ def test_split_change(self, mocker):
processor_mock = mocker.Mock(spec=MessageProcessor)
mocker.patch('splitio.push.manager.MessageProcessor', new=processor_mock)

manager = PushManager(mocker.Mock(), mocker.Mock(), mocker.Mock())
manager = PushManager(mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock())
manager._event_handler(sse_event)
assert parse_event_mock.mock_calls == [mocker.call(sse_event)]
assert processor_mock.mock_calls == [
Expand All @@ -145,7 +149,7 @@ def test_split_kill(self, mocker):
processor_mock = mocker.Mock(spec=MessageProcessor)
mocker.patch('splitio.push.manager.MessageProcessor', new=processor_mock)

manager = PushManager(mocker.Mock(), mocker.Mock(), mocker.Mock())
manager = PushManager(mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock())
manager._event_handler(sse_event)
assert parse_event_mock.mock_calls == [mocker.call(sse_event)]
assert processor_mock.mock_calls == [
Expand All @@ -164,7 +168,7 @@ def test_segment_change(self, mocker):
processor_mock = mocker.Mock(spec=MessageProcessor)
mocker.patch('splitio.push.manager.MessageProcessor', new=processor_mock)

manager = PushManager(mocker.Mock(), mocker.Mock(), mocker.Mock())
manager = PushManager(mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock())
manager._event_handler(sse_event)
assert parse_event_mock.mock_calls == [mocker.call(sse_event)]
assert processor_mock.mock_calls == [
Expand All @@ -183,7 +187,7 @@ def test_control_message(self, mocker):
status_tracker_mock = mocker.Mock(spec=PushStatusTracker)
mocker.patch('splitio.push.manager.PushStatusTracker', new=status_tracker_mock)

manager = PushManager(mocker.Mock(), mocker.Mock(), mocker.Mock())
manager = PushManager(mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock())
manager._event_handler(sse_event)
assert parse_event_mock.mock_calls == [mocker.call(sse_event)]
assert status_tracker_mock.mock_calls == [
Expand All @@ -202,7 +206,7 @@ def test_occupancy_message(self, mocker):
status_tracker_mock = mocker.Mock(spec=PushStatusTracker)
mocker.patch('splitio.push.manager.PushStatusTracker', new=status_tracker_mock)

manager = PushManager(mocker.Mock(), mocker.Mock(), mocker.Mock())
manager = PushManager(mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock())
manager._event_handler(sse_event)
assert parse_event_mock.mock_calls == [mocker.call(sse_event)]
assert status_tracker_mock.mock_calls == [
Expand Down
Loading