Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ldclient/datasystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
StreamingDataSource,
StreamingDataSourceBuilder
)
from ldclient.impl.integrations.files.file_data_sourcev2 import (
_FileDataSourceV2
)
from ldclient.interfaces import (
DataStoreMode,
FeatureStore,
Expand Down Expand Up @@ -125,6 +128,13 @@ def builder(config: LDConfig) -> StreamingDataSource:
return builder


def file_ds_builder(paths: List[str]) -> Builder[Initializer]:
def builder(_: LDConfig) -> Initializer:
return _FileDataSourceV2(paths)

return builder


def default() -> ConfigBuilder:
"""
Default is LaunchDarkly's recommended flag data acquisition strategy.
Expand Down
5 changes: 3 additions & 2 deletions ldclient/impl/datasystem/fdv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,10 @@ def _run_initializers(self, set_on_ready: Event):
# Apply the basis to the store
self._store.apply(basis.change_set, basis.persist)

# Set ready event
if not set_on_ready.is_set():
# Set ready event if an only if a selector is defined for the changeset
if basis.change_set.selector is not None and basis.change_set.selector.is_defined():
set_on_ready.set()
return
except Exception as e:
log.error("Initializer failed with exception: %s", e)

Expand Down
113 changes: 113 additions & 0 deletions ldclient/testing/impl/datasystem/test_fdv2_datasystem.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
# pylint: disable=missing-docstring

import os
import tempfile
from threading import Event
from typing import List

from mock import Mock

from ldclient.config import Config, DataSystemConfig
from ldclient.datasystem import file_ds_builder
from ldclient.impl.datasystem import DataAvailability
from ldclient.impl.datasystem.fdv2 import FDv2
from ldclient.integrations.test_datav2 import TestDataV2
Expand Down Expand Up @@ -432,3 +435,113 @@ def test_fdv2_stays_on_fdv1_after_fallback():
store = fdv2.store
flag = store.get(FEATURES, "fdv1-flag", lambda x: x)
assert flag is not None


def test_fdv2_initializer_should_run_until_success():
"""
Test that FDv2 initializers will run in order until a successful run. Then
the datasystem is expected to transition to run synchronizers.
"""
initial_flag_data = '''
{
"flags": {
"feature-flag": {
"key": "feature-flag",
"version": 0,
"on": false,
"fallthrough": {
"variation": 0
},
"variations": ["off", "on"]
}
}
}
'''
f, path = tempfile.mkstemp(suffix='.json')
try:
os.write(f, initial_flag_data.encode("utf-8"))
os.close(f)

td_initializer = TestDataV2.data_source()
td_initializer.update(td_initializer.flag("feature-flag").on(True))

# We actually do not care what this synchronizer does.
td_synchronizer = TestDataV2.data_source()

data_system_config = DataSystemConfig(
initializers=[file_ds_builder([path]), td_initializer.build_initializer],
primary_synchronizer=td_synchronizer.build_synchronizer,
)

set_on_ready = Event()
synchronizer_ran = Event()
fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config)
count = 0

def listener(_: FlagChange):
nonlocal count
count += 1
if count == 3:
synchronizer_ran.set()

fdv2.flag_tracker.add_listener(listener)

fdv2.start(set_on_ready)
assert set_on_ready.wait(1), "Data system did not become ready in time"
assert synchronizer_ran.wait(1), "Data system did not transition to synchronizer"
finally:
os.remove(path)


def test_fdv2_should_finish_initialization_on_first_successful_initializer():
"""
Test that when a FDv2 initializer returns a basis and selector that the rest
of the intializers will be skipped and the client starts synchronizing phase.
"""
initial_flag_data = '''
{
"flags": {
"feature-flag": {
"key": "feature-flag",
"version": 0,
"on": false,
"fallthrough": {
"variation": 0
},
"variations": ["off", "on"]
}
}
}
'''
f, path = tempfile.mkstemp(suffix='.json')
try:
os.write(f, initial_flag_data.encode("utf-8"))
os.close(f)

td_initializer = TestDataV2.data_source()
td_initializer.update(td_initializer.flag("feature-flag").on(True))

# We actually do not care what this synchronizer does.
td_synchronizer = TestDataV2.data_source()

data_system_config = DataSystemConfig(
initializers=[td_initializer.build_initializer, file_ds_builder([path])],
primary_synchronizer=None,
)

set_on_ready = Event()
fdv2 = FDv2(Config(sdk_key="dummy"), data_system_config)
count = 0

def listener(_: FlagChange):
nonlocal count
count += 1

fdv2.flag_tracker.add_listener(listener)

fdv2.start(set_on_ready)
assert set_on_ready.wait(1), "Data system did not become ready in time"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here. Is it possible the change handler could run twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So assuming that we don't count the synchronizer, the change handler should only run once as the file data source will be skipped in this instance so if the change handler runs more than once during the initialization phase then it is a problem.

assert count == 1, "Invalid initializer process"
fdv2.stop()
finally:
os.remove(path)