1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,5 +1,6 @@
8.3.2 (Dec XX, 2020)
- Added RecordStats to support pipelined recording in Redis when a treatment call is made.
- Added hooks support for UWSGI.

8.3.1 (Nov 20, 2020)
- Fixed error handling when split server fails, so that it doesn't bring streaming down.
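To see how these two new entries fit together, here is a minimal usage sketch for a uWSGI application. The preforkedInitialization config key and handle_post_fork() method come from this diff; get_factory and uwsgidecorators.postfork are the usual SDK and uWSGI entry points, and the SDK key is a placeholder.

# Hypothetical uWSGI app module -- a sketch, not part of this PR.
from uwsgidecorators import postfork
from splitio import get_factory

# Built in the uWSGI master: with preforkedInitialization enabled the factory
# performs a single synchronous fetch and then waits to be revived in each
# worker, instead of starting sync threads that would not survive fork().
factory = get_factory('YOUR_SDK_KEY', config={'preforkedInitialization': True})

@postfork
def restart_split_sync():
    # Per worker: recreate the sync manager, clear the pre-fork storages and
    # start periodic/streaming synchronization (see handle_post_fork below).
    factory.handle_post_fork()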
9 changes: 9 additions & 0 deletions splitio/client/client.py
@@ -90,6 +90,9 @@ def _make_evaluation(self, key, feature, attributes, method_name, metric_name):
if self.destroyed:
_LOGGER.error("Client has already been destroyed - no calls possible")
return CONTROL, None
if self._factory._waiting_fork():
_LOGGER.error("Client is not ready - no calls possible")
return CONTROL, None

start = int(round(time.time() * 1000))

@@ -143,6 +146,9 @@ def _make_evaluations(self, key, features, attributes, method_name, metric_name)
if self.destroyed:
_LOGGER.error("Client has already been destroyed - no calls possible")
return input_validator.generate_control_treatments(features, method_name)
if self._factory._waiting_fork():
_LOGGER.error("Client is not ready - no calls possible")
return input_validator.generate_control_treatments(features, method_name)

start = int(round(time.time() * 1000))

@@ -363,6 +369,9 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
if self.destroyed:
_LOGGER.error("Client has already been destroyed - no calls possible")
return False
if self._factory._waiting_fork():
_LOGGER.error("Client is not ready - no calls possible")
return False

key = input_validator.validate_track_key(key)
event_type = input_validator.validate_event_type(event_type)
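A short sketch of the behavior these guards produce while the factory is still waiting for the fork (factory as in the usage sketch above; return values follow the code in this diff):

# Illustrative only -- calls made before handle_post_fork() runs in the worker.
client = factory.client()
client.get_treatment('user-1', 'some_feature')         # 'control' (CONTROL), plus the error log above
client.get_treatments('user-1', ['feat_a', 'feat_b'])  # {'feat_a': 'control', 'feat_b': 'control'}
client.track('user-1', 'user', 'purchase', 9.99)       # False

factory.handle_post_fork()  # after this (and readiness), evaluations behave normally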
3 changes: 2 additions & 1 deletion splitio/client/config.py
@@ -55,7 +55,8 @@
'redisMaxConnections': None,
'machineName': None,
'machineIp': None,
'splitFile': os.path.join(os.path.expanduser('~'), '.split')
'splitFile': os.path.join(os.path.expanduser('~'), '.split'),
'preforkedInitialization': False,
}


70 changes: 64 additions & 6 deletions splitio/client/factory.py
@@ -71,6 +71,7 @@ class Status(Enum):
NOT_INITIALIZED = 'NOT_INITIALIZED'
READY = 'READY'
DESTROYED = 'DESTROYED'
WAITING_FORK = 'WAITING_FORK'


class TimeoutException(Exception):
@@ -90,6 +91,7 @@ def __init__( # pylint: disable=too-many-arguments
recorder,
sync_manager=None,
sdk_ready_flag=None,
preforked_initialization=False,
):
"""
Class constructor.
@@ -106,18 +108,29 @@ def __init__( # pylint: disable=too-many-arguments
:type sdk_ready_flag: threading.Event
:param recorder: StatsRecorder instance
:type recorder: StatsRecorder
:param preforked_initialization: Whether the factory should be instantiated as preforked or not.
:type preforked_initialization: bool
"""
self._apikey = apikey
self._storages = storages
self._labels_enabled = labels_enabled
self._sync_manager = sync_manager
self._sdk_internal_ready_flag = sdk_ready_flag
self._sdk_ready_flag = threading.Event()
self._recorder = recorder
self._preforked_initialization = preforked_initialization
self._start_status_updater()

def _start_status_updater(self):
"""
Set the initial factory status and start the ready-flag updater when applicable.
"""
if self._preforked_initialization:
self._status = Status.WAITING_FORK
return
# If we have a ready flag, it means we have sync tasks that need to finish
# before the SDK client becomes ready.
if self._sdk_internal_ready_flag is not None:
self._sdk_ready_flag = threading.Event()
self._status = Status.NOT_INITIALIZED
# add a listener that updates the status to READY once the flag is set.
ready_updater = threading.Thread(target=self._update_status_when_ready,
@@ -232,6 +245,38 @@ def destroyed(self):
"""
return self._status == Status.DESTROYED

def _waiting_fork(self):
"""
Return whether the factory is waiting to be recreated by forking or not.

:return: True if the factory is waiting to be recreated by forking. False otherwise.
:rtype: bool
"""
return self._status == Status.WAITING_FORK

def handle_post_fork(self):
"""
Function in charge of starting periodic/realtime synchronization after a fork.
"""
if not self._waiting_fork():
_LOGGER.warning('Cannot call handle_post_fork')
return
self._sync_manager.recreate()
sdk_ready_flag = threading.Event()
self._sdk_internal_ready_flag = sdk_ready_flag
self._sync_manager._ready_flag = sdk_ready_flag
self._get_storage('telemetry').clear()
self._get_storage('impressions').clear()
self._get_storage('events').clear()
initialization_thread = threading.Thread(
target=self._sync_manager.start,
name="SDKInitializer",
)
initialization_thread.setDaemon(True)
initialization_thread.start()
self._preforked_initialization = False # reset for status updater
self._start_status_updater()


def _wrap_impression_listener(listener, metadata):
"""
@@ -319,14 +364,12 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl

synchronizer = Synchronizer(synchronizers, tasks)

sdk_ready_flag = threading.Event()
preforked_initialization = cfg.get('preforkedInitialization', False)

sdk_ready_flag = threading.Event() if not preforked_initialization else None
manager = Manager(sdk_ready_flag, synchronizer, apis['auth'], cfg['streamingEnabled'],
streaming_api_base_url)

initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer")
initialization_thread.setDaemon(True)
initialization_thread.start()

storages['events'].set_queue_full_hook(tasks.events_task.flush)
storages['impressions'].set_queue_full_hook(tasks.impressions_task.flush)

@@ -336,6 +379,17 @@
storages['events'],
storages['impressions'],
)

if preforked_initialization:
synchronizer.sync_all()
synchronizer._split_synchronizers._segment_sync.shutdown()
return SplitFactory(api_key, storages, cfg['labelsEnabled'],
recorder, manager, preforked_initialization=preforked_initialization)

initialization_thread = threading.Thread(target=manager.start, name="SDKInitializer")
initialization_thread.setDaemon(True)
initialization_thread.start()

return SplitFactory(api_key, storages, cfg['labelsEnabled'],
recorder, manager, sdk_ready_flag)

@@ -387,6 +441,10 @@ def _build_uwsgi_factory(api_key, cfg):
storages['events'],
storages['impressions'],
)
_LOGGER.warning(
"Beware: uwsgi-cache based operation mode is soon to be deprecated. Please consider " +
"redis if you need a centralized point of syncrhonization, or in-memory (with preforking " +
"support enabled) if running uwsgi with a master and several http workers)")
return SplitFactory(
api_key,
storages,
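Putting the factory changes together, the lifecycle gains one extra state in prefork mode. The sketch below uses the Status enum and private members from this diff purely for illustration:

# Illustrative only -- inspects private state to show the transitions.
from splitio.client.factory import Status

# Master process, preforkedInitialization=True: sync_all() has run once,
# segment sync is shut down, and no background threads are alive yet.
assert factory._status == Status.WAITING_FORK
assert factory._waiting_fork() is True

# Worker process, inside the postfork hook:
factory.handle_post_fork()
# The status moves to Status.NOT_INITIALIZED and, once the recreated sync
# manager signals readiness, the updater thread flips it to Status.READY.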
12 changes: 12 additions & 0 deletions splitio/client/localhost.py
@@ -30,6 +30,10 @@ def pop_many(self, *_, **__): # pylint: disable=arguments-differ
"""Accept any arguments and do nothing."""
pass

def clear(self, *_, **__): # pylint: disable=arguments-differ
"""Accept any arguments and do nothing."""
pass


class LocalhostEventsStorage(EventStorage):
"""Impression storage that doesn't cache anything."""
@@ -42,6 +46,10 @@ def pop_many(self, *_, **__): # pylint: disable=arguments-differ
"""Accept any arguments and do nothing."""
pass

def clear(self, *_, **__): # pylint: disable=arguments-differ
"""Accept any arguments and do nothing."""
pass


class LocalhostTelemetryStorage(TelemetryStorage):
"""Impression storage that doesn't cache anything."""
@@ -69,3 +77,7 @@ def pop_counters(self, *_, **__): # pylint: disable=arguments-differ
def pop_gauges(self, *_, **__): # pylint: disable=arguments-differ
"""Accept any arguments and do nothing."""
pass

def clear(self, *_, **__): # pylint: disable=arguments-differ
"""Accept any arguments and do nothing."""
pass
11 changes: 10 additions & 1 deletion splitio/client/manager.py
@@ -32,6 +32,9 @@ def split_names(self):
if self._factory.destroyed:
_LOGGER.error("Client has already been destroyed - no calls possible.")
return []
if self._factory._waiting_fork():
_LOGGER.error("Client is not ready - no calls possible")
return []

if not self._factory.ready:
_LOGGER.warning(
@@ -51,6 +54,9 @@ def splits(self):
if self._factory.destroyed:
_LOGGER.error("Client has already been destroyed - no calls possible.")
return []
if self._factory._waiting_fork():
_LOGGER.error("Client is not ready - no calls possible")
return []

if not self._factory.ready:
_LOGGER.warning(
@@ -72,7 +78,10 @@ def split(self, feature_name):
"""
if self._factory.destroyed:
_LOGGER.error("Client has already been destroyed - no calls possible.")
return []
return None
if self._factory._waiting_fork():
_LOGGER.error("Client is not ready - no calls possible")
return None

feature_name = input_validator.validate_manager_feature_name(
feature_name,
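The manager gets the same guard, plus a corrected return value for split(). A small sketch of what callers see pre-fork, again assuming the factory from the earlier sketch:

# Illustrative only -- manager calls while the factory is in WAITING_FORK.
manager = factory.manager()
manager.split_names()       # []
manager.splits()            # []
manager.split('feature_x')  # None (the destroyed path was also fixed to return None instead of [])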
21 changes: 21 additions & 0 deletions splitio/storage/__init__.py
@@ -237,6 +237,13 @@ def pop_many(self, count):
"""
pass

@abc.abstractmethod
def clear(self):
"""
Clear data.
"""
pass


@add_metaclass(abc.ABCMeta)
class ImpressionPipelinedStorage(object):
@@ -279,6 +286,13 @@ def pop_many(self, count):
"""
pass

@abc.abstractmethod
def clear(self):
"""
Clear data.
"""
pass


@add_metaclass(abc.ABCMeta)
class TelemetryStorage(object):
@@ -346,6 +360,13 @@ def pop_latencies(self):
"""
pass

@abc.abstractmethod
def clear(self):
"""
Clear data.
"""
pass


@add_metaclass(abc.ABCMeta)
class TelemetryPipelinedStorage(object):
27 changes: 27 additions & 0 deletions splitio/storage/inmemmory.py
@@ -291,6 +291,7 @@ def __init__(self, queue_size):

:param eventsQueueSize: How many events to queue before forcing a submission
"""
self._queue_size = queue_size
self._impressions = queue.Queue(maxsize=queue_size)
self._lock = threading.Lock()
self._queue_full_hook = None
@@ -339,6 +340,13 @@ def pop_many(self, count):
count -= 1
return impressions

def clear(self):
"""
Clear data.
"""
with self._lock:
self._impressions = queue.Queue(maxsize=self._queue_size)


class InMemoryEventStorage(EventStorage):
"""
@@ -353,6 +361,7 @@ def __init__(self, eventsQueueSize):

:param eventsQueueSize: How many events to queue before forcing a submission
"""
self._queue_size = eventsQueueSize
self._lock = threading.Lock()
self._events = queue.Queue(maxsize=eventsQueueSize)
self._queue_full_hook = None
@@ -407,6 +416,13 @@ def pop_many(self, count):
self._size = 0
return events

def clear(self):
"""
Clear data.
"""
with self._lock:
self._events = queue.Queue(maxsize=self._queue_size)


class InMemoryTelemetryStorage(TelemetryStorage):
"""In-Memory implementation of telemetry storage interface."""
@@ -498,3 +514,14 @@ def pop_latencies(self):
return self._latencies
finally:
self._latencies = {}

def clear(self):
"""
Clear data.
"""
with self._latencies_lock:
self._latencies = {}
with self._gauges_lock:
self._gauges = {}
with self._counters_lock:
self._counters = {}
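A minimal sketch of the new clear() on the in-memory storages (class names and constructor signatures as in this diff); handle_post_fork() relies on this to drop whatever accumulated in the master before fork():

# Illustrative only.
from splitio.storage.inmemmory import InMemoryImpressionStorage, InMemoryEventStorage

impressions = InMemoryImpressionStorage(100)
events = InMemoryEventStorage(100)
# ... data queued in the master process before fork() ...
impressions.clear()  # swaps in a fresh queue.Queue with the original max size
events.clear()
assert impressions.pop_many(10) == []
assert events.pop_many(10) == []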
18 changes: 18 additions & 0 deletions splitio/storage/redis.py
@@ -453,6 +453,12 @@ def pop_many(self, count):
"""
raise NotImplementedError('Only redis-consumer mode is supported.')

def clear(self):
"""
Clear data.
"""
raise NotImplementedError('Not supported for redis.')


class RedisEventsStorage(EventStorage):
"""Redis based event storage class."""
@@ -517,6 +523,12 @@ def pop_many(self, count):
"""
raise NotImplementedError('Only redis-consumer mode is supported.')

def clear(self):
"""
Clear data.
"""
raise NotImplementedError('Not supported for redis.')


class RedisTelemetryStorage(TelemetryStorage, TelemetryPipelinedStorage):
"""Redis-based Telemetry storage."""
@@ -694,3 +706,9 @@ def pop_latencies(self):
:rtype: list
"""
raise NotImplementedError('Only redis-consumer mode is supported.')

def clear(self):
"""
Clear data.
"""
raise NotImplementedError('Not supported for redis.')
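For the Redis-backed storages the data lives in Redis rather than in the forked process, so clear() is deliberately unsupported. A hedged sketch of what callers should expect; the constructor arguments are placeholders, not the real wiring:

# Illustrative only.
from splitio.storage.redis import RedisEventsStorage

storage = RedisEventsStorage(redis_client, sdk_metadata)  # placeholder arguments
try:
    storage.clear()
except NotImplementedError:
    pass  # expected: 'Not supported for redis.'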