Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[submodule "providers/openfeature-provider-flagd/test-harness"]
path = providers/openfeature-provider-flagd/openfeature/test-harness
url = git@github.com:open-feature/flagd-testbed.git
branch = v2.2.0
branch = v2.5.0
[submodule "providers/openfeature-provider-flagd/spec"]
path = providers/openfeature-provider-flagd/openfeature/spec
url = https://github.com/open-feature/spec
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
import warnings

from openfeature.evaluation_context import EvaluationContext
from openfeature.event import ProviderEventDetails
from openfeature.flag_evaluation import FlagResolutionDetails
from openfeature.hook import Hook
from openfeature.provider import AbstractProvider
from openfeature.provider.metadata import Metadata

from .config import CacheType, Config, ResolverType
from .resolvers import AbstractResolver, GrpcResolver, InProcessResolver
from .sync_metadata_hook import SyncMetadataHook

T = typing.TypeVar("T")

Expand Down Expand Up @@ -96,8 +99,16 @@ def __init__( # noqa: PLR0913
max_cache_size=max_cache_size,
cert_path=cert_path,
)
self.enriched_context: dict = {}

self.resolver = self.setup_resolver()
self.hooks: list[Hook] = [SyncMetadataHook(self.get_enriched_context)]

def get_enriched_context(self) -> EvaluationContext:
return EvaluationContext(attributes=self.enriched_context)

def get_provider_hooks(self) -> list[Hook]:
return self.hooks

def setup_resolver(self) -> AbstractResolver:
if self.config.resolver == ResolverType.RPC:
Expand All @@ -114,7 +125,7 @@ def setup_resolver(self) -> AbstractResolver:
):
return InProcessResolver(
self.config,
self.emit_provider_ready,
self.emit_provider_ready_with_context,
self.emit_provider_error,
self.emit_provider_stale,
self.emit_provider_configuration_changed,
Expand Down Expand Up @@ -184,3 +195,10 @@ def resolve_object_details(
return self.resolver.resolve_object_details(
key, default_value, evaluation_context
)

def emit_provider_ready_with_context(
self, details: ProviderEventDetails, context: dict
) -> None:
self.enriched_context = context
self.emit_provider_ready(details)
pass
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,10 @@ def monitor(self) -> None:

def _state_change_callback(self, new_state: ChannelConnectivity) -> None:
logger.debug(f"gRPC state change: {new_state}")
if new_state == ChannelConnectivity.READY:
if (
new_state == grpc.ChannelConnectivity.READY
or new_state == grpc.ChannelConnectivity.IDLE
):
if not self.thread or not self.thread.is_alive():
self.thread = threading.Thread(
target=self.listen,
Expand Down Expand Up @@ -276,7 +279,7 @@ def _resolve( # noqa: PLR0915 C901
return cached_flag

context = self._convert_context(evaluation_context)
call_args = {"timeout": self.deadline}
call_args = {"timeout": self.deadline, "wait_for_ready": True}
try:
request: Message
if flag_type == FlagType.BOOLEAN:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class InProcessResolver:
def __init__(
self,
config: Config,
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
emit_provider_ready: typing.Callable[[ProviderEventDetails, dict], None],
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
emit_provider_stale: typing.Callable[[ProviderEventDetails], None],
emit_provider_configuration_changed: typing.Callable[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(
self,
config: Config,
flag_store: FlagStore,
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
emit_provider_ready: typing.Callable[[ProviderEventDetails, dict], None],
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
):
if config.offline_flag_source_path is None:
Expand Down Expand Up @@ -94,7 +94,8 @@ def _load_data(self, modified_time: typing.Optional[float] = None) -> None:
self.emit_provider_ready(
ProviderEventDetails(
message="Reloading file contents recovered from error state"
)
),
{},
)
self.should_emit_ready_on_success = False

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import typing

import grpc
from google.protobuf.json_format import MessageToDict

from openfeature.evaluation_context import EvaluationContext
from openfeature.event import ProviderEventDetails
Expand All @@ -26,7 +27,7 @@ def __init__(
self,
config: Config,
flag_store: FlagStore,
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
emit_provider_ready: typing.Callable[[ProviderEventDetails, dict], None],
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
emit_provider_stale: typing.Callable[[ProviderEventDetails], None],
):
Expand Down Expand Up @@ -106,7 +107,10 @@ def monitor(self) -> None:

def _state_change_callback(self, new_state: grpc.ChannelConnectivity) -> None:
logger.debug(f"gRPC state change: {new_state}")
if new_state == grpc.ChannelConnectivity.READY:
if (
new_state == grpc.ChannelConnectivity.READY
or new_state == grpc.ChannelConnectivity.IDLE
):
if not self.thread or not self.thread.is_alive():
self.thread = threading.Thread(
target=self.listen,
Expand Down Expand Up @@ -157,6 +161,12 @@ def listen(self) -> None:

while self.active:
try:
context_values_request = sync_pb2.GetMetadataRequest()
context_values_response: sync_pb2.GetMetadataResponse = (
self.stub.GetMetadata(context_values_request, wait_for_ready=True)
)
context_values = MessageToDict(context_values_response)

request = sync_pb2.SyncFlagsRequest(**request_args)

logger.debug("Setting up gRPC sync flags connection")
Expand All @@ -173,7 +183,8 @@ def listen(self) -> None:
self.emit_provider_ready(
ProviderEventDetails(
message="gRPC sync connection established"
)
),
context_values["metadata"],
)
self.connected = True

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import typing

from openfeature.evaluation_context import EvaluationContext
from openfeature.hook import Hook, HookContext, HookHints


class SyncMetadataHook(Hook):
def __init__(self, context_supplier: typing.Callable[[], EvaluationContext]):
self.context_supplier = context_supplier

def before(
self, hook_context: HookContext, hints: HookHints
) -> typing.Optional[EvaluationContext]:
return self.context_supplier()
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"~sync",
"~caching",
"~grace",
"~contextEnrichment",
}


Expand Down