Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions packages/common_client/lib/src/data_sources/fdv2/endpoints.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/// FDv2 endpoint paths.
///
/// These paths are uniform across mobile and browser SDKs; FDv2 does
/// not distinguish between platforms at the endpoint level.
abstract final class FDv2Endpoints {
/// Polling path. Used as-is for POST requests (context sent in the
/// request body) and as the prefix for GET requests via [pollingGet].
static const String polling = '/sdk/poll/eval';

/// Streaming path. Used as-is for POST requests (context sent in the
/// request body) and as the prefix for GET requests via [streamingGet].
static const String streaming = '/sdk/stream/eval';

/// Builds the polling GET path with the base64url-encoded context
/// embedded in the URL path.
static String pollingGet(String encodedContext) => '$polling/$encodedContext';

/// Builds the streaming GET path with the base64url-encoded context
/// embedded in the URL path.
static String streamingGet(String encodedContext) =>
'$streaming/$encodedContext';
}
181 changes: 181 additions & 0 deletions packages/common_client/lib/src/data_sources/fdv2/polling_base.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import 'dart:convert';

import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart';

import 'flag_eval_mapper.dart';
import 'payload.dart';
import 'protocol_handler.dart';
import 'protocol_types.dart';
import 'requestor.dart';
import 'selector.dart';
import 'source_result.dart';

/// Performs a single FDv2 poll and translates the response into an
/// [FDv2SourceResult].
///
/// Wraps an [FDv2Requestor] with FDv2 protocol semantics:
///
/// - Network errors --> [SourceState.interrupted] with a sanitized
/// message; the full error detail is logged at warn level.
/// - `x-ld-fd-fallback: true` header --> terminal error with
/// `fdv1Fallback: true`. This check takes precedence over the body
/// and over the status code: if the server signals fallback, the
/// SDK switches to FDv1 regardless of whether a `200`, `304`, or
/// error response carries the header.
/// - HTTP `304 Not Modified` --> an empty change set with
/// [PayloadType.none], confirming the cached data is current.
/// - Other 4xx/5xx --> interrupted (recoverable) or terminalError
/// (non-recoverable) based on [isHttpGloballyRecoverable].
/// - `200` --> body is parsed as an [FDv2EventsCollection] and fed
/// through an [FDv2ProtocolHandler]. The first emitted action
/// determines the result.
final class FDv2PollingBase {
final LDLogger _logger;
final FDv2Requestor _requestor;
final DateTime Function() _now;

FDv2PollingBase({
required LDLogger logger,
required FDv2Requestor requestor,
DateTime Function()? now,
}) : _logger = logger.subLogger('FDv2PollingBase'),
_requestor = requestor,
_now = now ?? DateTime.now;

/// Performs a single poll. Never throws; all failures, including
/// malformed response bodies, are reported as [StatusResult]s.
Future<FDv2SourceResult> pollOnce({Selector basis = Selector.empty}) async {
final RequestorResponse response;
try {
response = await _requestor.request(basis: basis);
} catch (err) {
_logger.warn('Polling request failed: $err');
return FDv2SourceResults.interrupted(message: _describeError(err));
}
return _processResponse(response);
}

FDv2SourceResult _processResponse(RequestorResponse response) {
// Match `x-ld-fd-fallback` case-insensitively. Servers shouldn't send
// mixed case, but it costs nothing to be lenient on input.
final fdv1Fallback =
response.headers['x-ld-fd-fallback']?.toLowerCase() == 'true';
final environmentId = response.headers['x-ld-envid'];

if (fdv1Fallback) {
return FDv2SourceResults.terminalError(
statusCode: response.status,
message: 'Server requested FDv1 fallback',
fdv1Fallback: true,
);
}

// 304 Not Modified means the SDK's cached data is confirmed current.
if (response.status == 304) {
return ChangeSetResult(
payload: const Payload(type: PayloadType.none, updates: []),
environmentId: environmentId,
freshness: _now(),
persist: true,
);
}

if (response.status >= 400) {
final message = 'Received unexpected status code: ${response.status}';
if (isHttpGloballyRecoverable(response.status)) {
_logger.warn('$message; will retry');
return FDv2SourceResults.interrupted(
statusCode: response.status,
message: message,
);
}
_logger.error('$message; will not retry');
return FDv2SourceResults.terminalError(
statusCode: response.status,
message: message,
);
}

return _parseBody(response, environmentId: environmentId);
}

FDv2SourceResult _parseBody(
RequestorResponse response, {
String? environmentId,
}) {
// The whole parse path is wrapped: jsonDecode plus the structural
// casts inside FDv2EventsCollection.fromJson and the per-event
// PutObjectEvent/DeleteObjectEvent/PayloadIntent/etc. fromJson calls
// can all throw on shapes the protocol types don't accept.
try {
final decoded = jsonDecode(response.body);
if (decoded is! Map<String, dynamic>) {
return FDv2SourceResults.interrupted(
statusCode: response.status,
message: 'Polling response was not a JSON object',
);
}

final collection = FDv2EventsCollection.fromJson(decoded);
final handler = FDv2ProtocolHandler(
objProcessors: {flagEvalKind: processFlagEval},
logger: _logger,
);

for (final event in collection.events) {
final action = handler.processEvent(event);
switch (action) {
case ActionPayload(:final payload):
return ChangeSetResult(
payload: payload,
environmentId: environmentId,
freshness: _now(),
persist: true,
);
case ActionGoodbye(:final reason):
return FDv2SourceResults.goodbyeResult(message: reason);
case ActionServerError(:final reason):
return FDv2SourceResults.interrupted(message: reason);
case ActionError(:final message):
return FDv2SourceResults.interrupted(message: message);
case ActionNone():
// Continue accumulating events until a payload-transferred or
// terminal action is reached.
break;
}
}

// The response had no payload-transferred event. The protocol
// handler is left in a partial state with nothing to emit, which
// is a protocol violation for a polling response.
return FDv2SourceResults.interrupted(
statusCode: response.status,
message: 'Polling response did not include a complete payload',
);
} catch (err) {
_logger.error('Failed to parse polling response: $err');
return FDv2SourceResults.interrupted(
statusCode: response.status,
message: 'Polling response body was malformed',
);
}
}

/// Categorizes a network exception into a fixed, sanitized message.
/// The full exception (which for `SocketException` / `TlsException`
/// can include remote address, certificate detail, and OS error
/// strings) stays in the warn log, not in the public status surface.
String _describeError(Object err) {
final type = err.runtimeType.toString();
if (type.contains('Timeout')) {
return 'Polling request timed out';
}
if (type.contains('Tls') || type.contains('Handshake')) {
return 'TLS error during polling request';
}
if (type.contains('Socket') || type.contains('HttpException')) {
return 'Network error during polling request';
}
return 'Polling request failed';
}
}
133 changes: 133 additions & 0 deletions packages/common_client/lib/src/data_sources/fdv2/requestor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import 'package:launchdarkly_dart_common/launchdarkly_dart_common.dart';

import 'endpoints.dart';
import 'selector.dart';

typedef HttpClientFactory = HttpClient Function(HttpProperties httpProperties);

HttpClient _defaultHttpClientFactory(HttpProperties httpProperties) {
return HttpClient(httpProperties: httpProperties);
}

/// The shape of a completed HTTP response from the FDv2 polling endpoint.
typedef RequestorResponse = ({
int status,
Map<String, String> headers,
String body,
});

/// Issues a single HTTP poll against the FDv2 polling endpoint.
///
/// Pure HTTP layer: builds the URL, sends the request, tracks `ETag`
/// across calls on the same instance, and returns the raw response. It
/// does no FDv2 protocol parsing or error classification -- that is the
/// responsibility of the caller (see [FDv2PollingBase]).
///
/// One [FDv2Requestor] is bound to a single evaluation context. Switching
/// contexts requires a fresh instance so a previous context's `ETag`
/// can never leak into a request for a different context.
///
/// Calls to [request] are not safe to interleave on a single instance --
/// `ETag` tracking assumes serial requests. Callers (the polling
/// synchronizer) must wait for each [request] to complete before issuing
/// the next.
final class FDv2Requestor {
final LDLogger _logger;
final HttpClient _client;
final Uri _baseUri;
final String _contextEncoded;
final String _contextJson;
final bool _usePost;
final bool _withReasons;
String? _lastEtag;

FDv2Requestor({
required LDLogger logger,
required ServiceEndpoints endpoints,
required String contextEncoded,
required String contextJson,
required bool usePost,
required bool withReasons,
required HttpProperties httpProperties,
HttpClientFactory httpClientFactory = _defaultHttpClientFactory,
}) : _logger = logger.subLogger('FDv2Requestor'),
_baseUri = Uri.parse(endpoints.polling),
_contextEncoded = contextEncoded,
_contextJson = contextJson,
_usePost = usePost,
_withReasons = withReasons,
_client = httpClientFactory(usePost
? httpProperties.withHeaders({'content-type': 'application/json'})
: httpProperties);

/// Sends a single poll request, optionally including a [basis] selector
/// for delta updates. Throws on network errors; otherwise returns the
/// response. Tracks `ETag` across successful (`200`) responses on this
/// instance.
Future<RequestorResponse> request({Selector basis = Selector.empty}) async {
final uri = _buildUri(basis: basis);
final method = _usePost ? RequestMethod.post : RequestMethod.get;
final additionalHeaders = <String, String>{};
if (_lastEtag != null) {
additionalHeaders['if-none-match'] = _lastEtag!;
}

// Avoid logging the full URI -- in GET mode it embeds the
// base64url-encoded context, which is reversible PII.
_logger.debug('FDv2 poll: method=$method, hasEtag=${_lastEtag != null}, '
'hasBasis=${basis.isNotEmpty}');

final response = await _client.request(
method,
uri,
additionalHeaders: additionalHeaders.isEmpty ? null : additionalHeaders,
body: _usePost ? _contextJson : null,
);

// Only persist the ETag from a successful response. Non-200 responses
// could carry stale or hostile ETag values that would taint future
// conditional requests. A 304 confirms the existing ETag still matches,
// so leaving the stored value alone is correct.
if (response.statusCode == 200) {
final etag = response.headers['etag'];
if (etag != null) {
_lastEtag = etag;
}
}

return (
status: response.statusCode,
headers: response.headers,
body: response.body,
);
}

Uri _buildUri({required Selector basis}) {
final addedPath = _usePost
? FDv2Endpoints.polling
: FDv2Endpoints.pollingGet(_contextEncoded);

// Compose against the parsed base URI so a custom polling URL
// carrying its own query parameters (e.g. a relay proxy with a token)
// is preserved correctly. String concatenation against `_baseUri`
// would land the appended path inside the query component.
final basePath = _baseUri.path.endsWith('/')
? _baseUri.path.substring(0, _baseUri.path.length - 1)
: _baseUri.path;
final mergedPath = '$basePath$addedPath';

final mergedQuery = <String, String>{};
mergedQuery.addAll(_baseUri.queryParameters);
if (_withReasons) {
mergedQuery['withReasons'] = 'true';
}
if (basis.isNotEmpty && basis.state!.isNotEmpty) {
mergedQuery['basis'] = basis.state!;
}

return _baseUri.replace(
path: mergedPath,
queryParameters: mergedQuery.isEmpty ? null : mergedQuery,
);
}
}
44 changes: 44 additions & 0 deletions packages/common_client/lib/src/data_sources/fdv2/source.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import 'selector.dart';
import 'source_result.dart';

/// A function that returns the current selector for a data source.
///
/// The orchestrator owns the SDK's current selector. Sources read it
/// lazily on each request or reconnect via this getter, so they always
/// see the latest value across mode switches and recoveries.
typedef SelectorGetter = Selector Function();

/// A function that performs a single FDv2 poll and returns the result.
///
/// Used by streaming sources to handle legacy `ping` events: when a ping
/// is received, the streaming source invokes the ping handler to fetch
/// the current payload via polling.
typedef PingHandler = Future<FDv2SourceResult> Function();

/// A one-shot data source that produces a single result.
///
/// Used during initialization to bring the SDK into a usable state from
/// cache, polling, or a streaming connection's first payload.
abstract interface class Initializer {
/// Runs the initializer, producing a single result. If [close] is called
/// before a result is produced, the returned future completes with a
/// shutdown [StatusResult].
Future<FDv2SourceResult> run();

/// Cancels in-progress work. Idempotent.
void close();
}

/// A long-lived data source that produces a stream of results.
///
/// Used during steady-state operation to keep the SDK current via polling
/// or streaming.
abstract interface class Synchronizer {
/// Single-subscription stream of results. Cancelling the subscription
/// stops the synchronizer; starting a new subscription is not supported.
Stream<FDv2SourceResult> get results;

/// Cancels active work. Idempotent. A shutdown [StatusResult] is
/// emitted to any active subscriber before the stream closes.
void close();
}
Loading
Loading