Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mixpanel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from .flags.remote_feature_flags import RemoteFeatureFlagsProvider
from .flags.types import LocalFlagsConfig, RemoteFlagsConfig

__version__ = '5.0.0b2'
__version__ = '5.0.0'

logger = logging.getLogger(__name__)

Expand Down
84 changes: 58 additions & 26 deletions mixpanel/flags/local_feature_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def start_polling_for_definitions(self):
)
self._sync_polling_task.start()
else:
logging.warning("A polling task is already running")
logger.warning("A polling task is already running")

def stop_polling_for_definitions(self):
"""
Expand All @@ -90,7 +90,7 @@ def stop_polling_for_definitions(self):
self._sync_stop_event.set()
self._sync_polling_task = None
else:
logging.info("There is no polling task to cancel.")
logger.info("There is no polling task to cancel.")

async def astart_polling_for_definitions(self):
"""
Expand All @@ -105,7 +105,7 @@ async def astart_polling_for_definitions(self):
self._astart_continuous_polling()
)
else:
logging.error("A polling task is already running")
logger.error("A polling task is already running")

async def astop_polling_for_definitions(self):
"""
Expand All @@ -115,21 +115,21 @@ async def astop_polling_for_definitions(self):
self._async_polling_task.cancel()
self._async_polling_task = None
else:
logging.info("There is no polling task to cancel.")
logger.info("There is no polling task to cancel.")

async def _astart_continuous_polling(self):
logging.info(
logger.info(
f"Initialized async polling for flag definition updates every '{self._config.polling_interval_in_seconds}' seconds"
)
try:
while True:
await asyncio.sleep(self._config.polling_interval_in_seconds)
await self._afetch_flag_definitions()
except asyncio.CancelledError:
logging.info("Async polling was cancelled")
logger.info("Async polling was cancelled")

def _start_continuous_polling(self):
logging.info(
logger.info(
f"Initialized sync polling for flag definition updates every '{self._config.polling_interval_in_seconds}' seconds"
)
while not self._sync_stop_event.is_set():
Expand All @@ -146,6 +146,22 @@ def are_flags_ready(self) -> bool:
"""
return self._are_flags_ready

def get_all_variants(self, context: Dict[str, Any]) -> Dict[str, SelectedVariant]:
"""
Gets the selected variant for all feature flags that the current user context is in the rollout for.
Exposure events are not automatically tracked when this method is used.
:param Dict[str, Any] context: The user context to evaluate against the feature flags
"""
variants: Dict[str, SelectedVariant] = {}
fallback = SelectedVariant(variant_key=None, variant_value=None)

for flag_key in self._flag_definitions.keys():
variant = self.get_variant(flag_key, fallback, context, report_exposure=False)
if variant.variant_key is not None:
variants[flag_key] = variant

return variants

def get_variant_value(
self, flag_key: str, fallback_value: Any, context: Dict[str, Any]
) -> Any:
Expand Down Expand Up @@ -206,16 +222,28 @@ def get_variant(
flag_definition, context_value, flag_key, rollout
)

if report_exposure and selected_variant is not None:
end_time = time.perf_counter()
self._track_exposure(flag_key, selected_variant, end_time - start_time, context)
if selected_variant is not None:
if report_exposure:
end_time = time.perf_counter()
self._track_exposure(flag_key, selected_variant, context, end_time - start_time)
return selected_variant

logger.info(
logger.debug(
f"{flag_definition.context} context {context_value} not eligible for any rollout for flag: {flag_key}"
)
return fallback_value

def track_exposure_event(self, flag_key: str, variant: SelectedVariant, context: Dict[str, Any]):
"""
Manually tracks a feature flagging exposure event to Mixpanel.
This is intended to provide flexibility for when individual exposure events are reported when using `get_all_variants` for the user at once with exposure event reporting

:param str flag_key: The key of the feature flag
:param SelectedVariant variant: The selected variant for the feature flag
:param Dict[str, Any] context: The user context used to evaluate the feature flag
"""
self._track_exposure(flag_key, variant, context)

def _get_variant_override_for_test_user(
self, flag_definition: ExperimentationFlag, context: Dict[str, Any]
) -> Optional[SelectedVariant]:
Expand Down Expand Up @@ -244,10 +272,9 @@ def _get_assigned_variant(
):
return variant


hash_input = str(context_value) + flag_name

variant_hash = normalized_hash(hash_input, "variant")
stored_salt = flag_definition.hash_salt if flag_definition.hash_salt is not None else ""
salt = flag_name + stored_salt + "variant"
variant_hash = normalized_hash(str(context_value), salt)

variants = [variant.model_copy(deep=True) for variant in flag_definition.ruleset.variants]
if rollout.variant_splits:
Expand Down Expand Up @@ -275,13 +302,16 @@ def _get_assigned_rollout(
context_value: Any,
context: Dict[str, Any],
) -> Optional[Rollout]:
hash_input = str(context_value) + flag_definition.key
for index, rollout in enumerate(flag_definition.ruleset.rollout):
salt = None
if flag_definition.hash_salt is not None:
salt = flag_definition.key + flag_definition.hash_salt + str(index)
else:
salt = flag_definition.key + "rollout"

rollout_hash = normalized_hash(hash_input, "rollout")
rollout_hash = normalized_hash(str(context_value), salt)

for rollout in flag_definition.ruleset.rollout:
if (
rollout_hash < rollout.rollout_percentage
if (rollout_hash < rollout.rollout_percentage
and self._is_runtime_evaluation_satisfied(rollout, context)
):
return rollout
Expand Down Expand Up @@ -352,7 +382,7 @@ def _handle_response(
self, response: httpx.Response, start_time: datetime, end_time: datetime
) -> None:
request_duration: timedelta = end_time - start_time
logging.info(
logger.debug(
f"Request started at '{start_time.isoformat()}', completed at '{end_time.isoformat()}', duration: '{request_duration.total_seconds():.3f}s'"
)

Expand All @@ -378,24 +408,26 @@ def _track_exposure(
self,
flag_key: str,
variant: SelectedVariant,
latency_in_seconds: float,
context: Dict[str, Any],
latency_in_seconds: Optional[float]=None,
):
if distinct_id := context.get("distinct_id"):
properties = {
"Experiment name": flag_key,
"Variant name": variant.variant_key,
"$experiment_type": "feature_flag",
"Flag evaluation mode": "local",
"Variant fetch latency (ms)": latency_in_seconds * 1000,
"$experiment_id": variant.experiment_id,
"$is_experiment_active": variant.is_experiment_active,
"$is_qa_tester": variant.is_qa_tester,
}

if latency_in_seconds is not None:
properties["Variant fetch latency (ms)"] = latency_in_seconds * 1000

self._tracker(distinct_id, EXPOSURE_EVENT, properties)
else:
logging.error(
logger.error(
"Cannot track exposure event without a distinct_id in the context"
)

Expand All @@ -406,11 +438,11 @@ def __enter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
logging.info("Exiting the LocalFeatureFlagsProvider and cleaning up resources")
logger.info("Exiting the LocalFeatureFlagsProvider and cleaning up resources")
await self.astop_polling_for_definitions()
await self._async_client.aclose()

def __exit__(self, exc_type, exc_val, exc_tb):
logging.info("Exiting the LocalFeatureFlagsProvider and cleaning up resources")
logger.info("Exiting the LocalFeatureFlagsProvider and cleaning up resources")
self.stop_polling_for_definitions()
self._sync_client.close()
Loading