Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions packages/realtime_client/lib/src/realtime_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,27 @@ class RealtimeChannel {
}
}

bool _shouldEnablePresence() {
return (_bindings['presence']?.isNotEmpty == true) ||
(params['config']['presence']['enabled'] == true);
}

void _handlePresenceUpdate() {
if (joinedOnce && isJoined) {
final currentPresenceEnabled = params['config']['presence']['enabled'];
final shouldEnablePresence = _shouldEnablePresence();

if (!currentPresenceEnabled && shouldEnablePresence) {
final config = Map<String, dynamic>.from(params['config']);
config['presence'] = Map<String, dynamic>.from(config['presence']);
config['presence']['enabled'] = true;
params['config'] = config;
updateJoinPayload({'config': config});
rejoin();
}
}
}

/// Subscribes to receive real-time changes
///
/// Pass a [callback] to react to different status changes.
Expand All @@ -130,10 +151,12 @@ class RealtimeChannel {
if (callback != null) callback(RealtimeSubscribeStatus.closed, null);
});

final presenceEnabled = _shouldEnablePresence();

final accessTokenPayload = <String, String>{};
final config = <String, dynamic>{
'broadcast': broadcast,
'presence': presence,
'presence': {...presence, 'enabled': presenceEnabled},
'postgres_changes':
_bindings['postgres_changes']?.map((r) => r.filter).toList() ?? [],
'private': isPrivate == true,
Expand Down Expand Up @@ -348,7 +371,7 @@ class RealtimeChannel {
RealtimeChannel onPresenceSync(
void Function(RealtimePresenceSyncPayload payload) callback,
) {
return onEvents(
final result = onEvents(
'presence',
ChannelFilter(
event: PresenceEvent.sync.name,
Expand All @@ -358,6 +381,8 @@ class RealtimeChannel {
Map<String, dynamic>.from(payload)));
},
);
_handlePresenceUpdate();
return result;
}

/// Sets up a listener for realtime presence join event.
Expand All @@ -374,7 +399,7 @@ class RealtimeChannel {
RealtimeChannel onPresenceJoin(
void Function(RealtimePresenceJoinPayload payload) callback,
) {
return onEvents(
final result = onEvents(
'presence',
ChannelFilter(
event: PresenceEvent.join.name,
Expand All @@ -384,6 +409,8 @@ class RealtimeChannel {
Map<String, dynamic>.from(payload)));
},
);
_handlePresenceUpdate();
return result;
}

/// Sets up a listener for realtime presence leave event.
Expand All @@ -400,7 +427,7 @@ class RealtimeChannel {
RealtimeChannel onPresenceLeave(
void Function(RealtimePresenceLeavePayload payload) callback,
) {
return onEvents(
final result = onEvents(
'presence',
ChannelFilter(
event: PresenceEvent.leave.name,
Expand All @@ -410,6 +437,8 @@ class RealtimeChannel {
Map<String, dynamic>.from(payload)));
},
);
_handlePresenceUpdate();
return result;
}

/// Sets up a listener for realtime system events for debugging purposes.
Expand Down
5 changes: 5 additions & 0 deletions packages/realtime_client/lib/src/types.dart
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,17 @@ class RealtimeChannelConfig {
/// [key] option is used to track presence payload across clients
final String key;

/// Enables presence even without presence bindings
final bool enabled;

/// Defines if the channel is private or not and if RLS policies will be used to check data
final bool private;

const RealtimeChannelConfig({
this.ack = false,
this.self = false,
this.key = '',
this.enabled = false,
this.private = false,
});

Expand All @@ -167,6 +171,7 @@ class RealtimeChannelConfig {
},
'presence': {
'key': key,
'enabled': enabled,
},
'private': private,
}
Expand Down
234 changes: 232 additions & 2 deletions packages/realtime_client/test/channel_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ void main() {
expect(channel.params, {
'config': {
'broadcast': {'ack': false, 'self': false},
'presence': {'key': ''},
'presence': {'key': '', 'enabled': false},
'private': false,
}
});
Expand All @@ -54,7 +54,7 @@ void main() {
expect(joinPush.payload, {
'config': {
'broadcast': {'ack': false, 'self': false},
'presence': {'key': ''},
'presence': {'key': '', 'enabled': false},
'private': true,
},
});
Expand Down Expand Up @@ -386,4 +386,234 @@ void main() {
expect(leaveCalled, isTrue);
});
});

group('presence enabled', () {
setUp(() {
socket = RealtimeClient('', timeout: const Duration(milliseconds: 1234));
});

test(
'should enable presence when config.presence.enabled is true even without bindings',
() {
channel = RealtimeChannel(
'topic',
socket,
params: const RealtimeChannelConfig(enabled: true),
);

channel.subscribe();

final joinPayload = channel.joinPush.payload;
expect(joinPayload['config']['presence']['enabled'], isTrue);
});

test('should enable presence when presence listeners exist', () {
channel = RealtimeChannel(
'topic',
socket,
params: const RealtimeChannelConfig(),
);

channel.onPresenceSync((payload) {});
channel.subscribe();

final joinPayload = channel.joinPush.payload;
expect(joinPayload['config']['presence']['enabled'], isTrue);
});

test(
'should enable presence when both bindings exist and config.presence.enabled is true',
() {
channel = RealtimeChannel(
'topic',
socket,
params: const RealtimeChannelConfig(enabled: true),
);

channel.onPresenceSync((payload) {});
channel.subscribe();

final joinPayload = channel.joinPush.payload;
expect(joinPayload['config']['presence']['enabled'], isTrue);
});

test(
'should not enable presence when neither bindings exist nor config.presence.enabled is true',
() {
channel = RealtimeChannel(
'topic',
socket,
params: const RealtimeChannelConfig(),
);

channel.subscribe();

final joinPayload = channel.joinPush.payload;
expect(joinPayload['config']['presence']['enabled'], isFalse);
});

test('should enable presence when join listener exists', () {
channel = RealtimeChannel(
'topic',
socket,
params: const RealtimeChannelConfig(),
);

channel.onPresenceJoin((payload) {});
channel.subscribe();

final joinPayload = channel.joinPush.payload;
expect(joinPayload['config']['presence']['enabled'], isTrue);
});

test('should enable presence when leave listener exists', () {
channel = RealtimeChannel(
'topic',
socket,
params: const RealtimeChannelConfig(),
);

channel.onPresenceLeave((payload) {});
channel.subscribe();

final joinPayload = channel.joinPush.payload;
expect(joinPayload['config']['presence']['enabled'], isTrue);
});
});

group('presence resubscription', () {
setUp(() {
socket = RealtimeClient('', timeout: const Duration(milliseconds: 1234));
});

test(
'should resubscribe when presence callback added to subscribed channel without initial presence',
() {
channel = RealtimeChannel(
'topic',
socket,
params: const RealtimeChannelConfig(),
);

channel.subscribe();
channel.joinPush.trigger('ok', {});
expect(channel.params['config']['presence']['enabled'], isFalse);

channel.onPresenceSync((payload) {});

expect(channel.params['config']['presence']['enabled'], isTrue);
});

test(
'should not resubscribe when presence callback added to channel with existing presence',
() {
channel = RealtimeChannel(
'topic',
socket,
params: const RealtimeChannelConfig(enabled: true),
);

channel.subscribe();
channel.joinPush.trigger('ok', {});
final initialPayload = Map.from(channel.params);

channel.onPresenceSync((payload) {});

expect(channel.params['config']['presence']['enabled'], isTrue);
expect(channel.params, equals(initialPayload));
});

test('should only resubscribe once when multiple presence callbacks added',
() {
channel = RealtimeChannel(
'topic',
socket,
params: const RealtimeChannelConfig(),
);

channel.subscribe();
channel.joinPush.trigger('ok', {});
expect(channel.params['config']['presence']['enabled'], isFalse);

channel.onPresenceSync((payload) {});
expect(channel.params['config']['presence']['enabled'], isTrue);

final payloadAfterFirst = Map.from(channel.params);

channel.onPresenceJoin((payload) {});
channel.onPresenceLeave((payload) {});

expect(channel.params, equals(payloadAfterFirst));
});

test(
'should not resubscribe when presence callback added to unsubscribed channel',
() {
channel = RealtimeChannel(
'topic',
socket,
params: const RealtimeChannelConfig(),
);

expect(channel.joinedOnce, isFalse);

channel.onPresenceSync((payload) {});

expect(channel.params['config']['presence']['enabled'], isFalse);
});

test(
'should receive presence events after resubscription triggered by adding callback',
() {
channel = RealtimeChannel(
'topic',
socket,
params: const RealtimeChannelConfig(),
);

channel.subscribe();
channel.joinPush.trigger('ok', {});

bool syncCalled = false;
channel.onPresenceSync((payload) {
syncCalled = true;
});

channel.trigger('presence', {'event': 'sync'}, '1');

expect(syncCalled, isTrue);
});

test('should handle presence join callback resubscription', () {
channel = RealtimeChannel(
'topic',
socket,
params: const RealtimeChannelConfig(),
);

channel.subscribe();
channel.joinPush.trigger('ok', {});
expect(channel.params['config']['presence']['enabled'], isFalse);

channel.onPresenceJoin((payload) {});

expect(channel.params['config']['presence']['enabled'], isTrue);
});

test('should handle presence leave callback resubscription', () {
channel = RealtimeChannel(
'topic',
socket,
params: const RealtimeChannelConfig(),
);

channel.subscribe();
channel.joinPush.trigger('ok', {});
expect(channel.params['config']['presence']['enabled'], isFalse);

channel.onPresenceLeave((payload) {});

expect(channel.params['config']['presence']['enabled'], isTrue);
});
});
}
2 changes: 1 addition & 1 deletion packages/realtime_client/test/socket_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ void main() {
expect(channel.params, {
'config': {
'broadcast': {'ack': false, 'self': false},
'presence': {'key': ''},
'presence': {'key': '', 'enabled': false},
'private': false,
}
});
Expand Down
Loading