Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion dart/lib/leancode_pipe/pipe_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class PipeClient {
for (final topicSub in _registeredTopicSubscriptions)
topicSub.close(),
]);
_registeredTopicSubscriptions.clear();
onClose?.call(error);
})
..on('subscriptionResult', _onSubscriptionResult)
Expand Down Expand Up @@ -120,9 +121,15 @@ class PipeClient {
Topic<N> topic, {
void Function()? onReconnect,
}) async {
final thisTopicSubscription =
var thisTopicSubscription =
_registeredTopicSubscriptions.firstWhereOrNull((e) => e.topic == topic);

// Defensive check: if subscription is closed, remove it and treat as non-existent
if (thisTopicSubscription != null && thisTopicSubscription.isClosed) {
_registeredTopicSubscriptions.remove(thisTopicSubscription);
thisTopicSubscription = null;
}

if (thisTopicSubscription != null) {
final state = thisTopicSubscription.stateSubject.value;
switch (state) {
Expand Down
59 changes: 59 additions & 0 deletions dart/test/leancode_pipe/lean_pipe_client_connection_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,64 @@ void main() {
expect(isSubscriptionCanceled, true);
},
);

test(
'PipeClient allows subscribing to a topic after disconnect and reconnect',
() async {
// Prepare general config
prepareConnect(connection);
prepareTopic(topic);
when(() => connection.state)
.thenAnswer((_) => HubConnectionState.disconnected);

// Connect to catch on subscription result method
final onSubscriptionResultMethod = await captureOnSubscriptionResult(
connect: client.connect,
connection: connection,
);

// Prepare connection to simulate backend sending subscription confirmation
prepareSubscribeToAnswerWithData(
connection: connection,
uuid: uuid,
answerCallback: (subscribeResult) =>
onSubscriptionResultMethod([subscribeResult.toJson()]),
);

// First subscription
final subscription1 = await client.subscribe(topic);
verifySubscribeCalled(connection);
expect(subscription1, isA<PipeSubscription>());

// Simulate connected state before disconnect
when(() => connection.state)
.thenAnswer((_) => HubConnectionState.connected);

// Capture and trigger onClose
final onClose = await captureOnClose(
connect: client.connect,
connection: connection,
);
when(() => connection.stop()).thenAnswer((_) async {
await onClose(null);
});

// Disconnect
await client.disconnect();
verify(() => connection.stop()).called(1);

// Simulate disconnected state after disconnect
when(() => connection.state)
.thenAnswer((_) => HubConnectionState.disconnected);

// Reconnect
await client.connect();

// Subscribe again to the same topic - this should work
final subscription2 = await client.subscribe(topic);
verifySubscribeCalled(connection);
expect(subscription2, isA<PipeSubscription>());
},
);
});
}