Skip to content
This repository has been archived by the owner on May 13, 2023. It is now read-only.

fix!: stream replaces the correct row #82

Merged
merged 4 commits into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions example/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ Future<void> main() async {
// stream
final streamSubscription = client
.from('countries')
.stream()
.stream(['id'])
.order('name')
.limit(10)
.execute()
.listen((snapshot) {
print('snapshot: $snapshot');
});
print('snapshot: $snapshot');
});

// remember to remove subscription
streamSubscription.cancel();
Expand Down
15 changes: 10 additions & 5 deletions lib/src/supabase_query_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,22 @@ class SupabaseQueryBuilder extends PostgrestQueryBuilder {

/// Notifies of data at the queried table
///
/// [uniqueColumns] can be either the primary key or a combination of unique columns.
///
/// ```dart
/// supabase.from('chats').stream().execute().listen(_onChatsReceived);
/// supabase.from('chats').stream(['my_primary_key']).execute().listen(_onChatsReceived);
/// ```
///
/// `eq`, `order`, `limit` filter are available to limit the data being queried.
///
/// ```dart
/// supabase.from('chats:room_id=eq.123').stream().order('created_at').limit(20).execute().listen(_onChatsReceived);
/// supabase.from('chats:room_id=eq.123').stream(['my_primary_key']).order('created_at').limit(20).execute().listen(_onChatsReceived);
/// ```
///
SupabaseStreamBuilder stream() {
return SupabaseStreamBuilder(this, streamFilter: _streamFilter);
SupabaseStreamBuilder stream(List<String> uniqueColumns) {
return SupabaseStreamBuilder(
this,
streamFilter: _streamFilter,
uniqueColumns: uniqueColumns,
);
}
}
9 changes: 8 additions & 1 deletion lib/src/supabase_realtime_payload.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ class SupabaseRealtimePayload {
final Map<String, dynamic>? oldRecord;

/// List of columns that are set as primary key
@Deprecated(
"The new RLS real-time server no longer sends the required data",
)
final List<String> primaryKeys;

SupabaseRealtimePayload({
Expand All @@ -24,7 +27,10 @@ class SupabaseRealtimePayload {
required this.table,
required this.newRecord,
required this.oldRecord,
required this.primaryKeys,
@Deprecated(
"The new RLS real-time server no longer sends the required data",
)
required this.primaryKeys,
});

factory SupabaseRealtimePayload.fromJson(Map<String, dynamic> json) {
Expand Down Expand Up @@ -60,6 +66,7 @@ class SupabaseRealtimePayload {
eventType: eventType,
newRecord: newRecord,
oldRecord: oldRecord,
// ignore: deprecated_member_use_from_same_package
primaryKeys: primaryKeys,
);
}
Expand Down
23 changes: 11 additions & 12 deletions lib/src/supabase_stream_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,16 @@ class SupabaseStreamBuilder {
/// `eq` filter used for both postgrest and realtime
late final StreamPostgrestFilter? _streamFilter;

/// Used to identify which row has changed
final List<String> _uniqueColumns;

SupabaseStreamBuilder(
SupabaseQueryBuilder queryBuilder, {
required StreamPostgrestFilter? streamFilter,
required List<String> uniqueColumns,
}) : _queryBuilder = queryBuilder,
_streamFilter = streamFilter;
_streamFilter = streamFilter,
_uniqueColumns = uniqueColumns;

/// Which column to order by and whether it's ascending
_Order? _orderBy;
Expand All @@ -57,7 +62,7 @@ class SupabaseStreamBuilder {
/// When `ascending` value is true, the result will be in ascending order.
///
/// ```dart
/// supabase.from('users').stream().order('username', ascending: false);
/// supabase.from('users').stream(['id']).order('username', ascending: false);
/// ```
SupabaseStreamBuilder order(String column, {bool ascending = false}) {
_orderBy = _Order(column: column, ascending: ascending);
Expand All @@ -67,7 +72,7 @@ class SupabaseStreamBuilder {
/// Limits the result with the specified `count`.
///
/// ```dart
/// supabase.from('users').stream().limit(10);
/// supabase.from('users').stream(['id']).limit(10);
/// ```
SupabaseStreamBuilder limit(int count) {
_limit = count;
Expand Down Expand Up @@ -145,7 +150,7 @@ class SupabaseStreamBuilder {
_addStream();
}

static bool _isTargetRecord({
bool _isTargetRecord({
required Map<String, dynamic> record,
required SupabaseRealtimePayload payload,
}) {
Expand All @@ -155,14 +160,8 @@ class SupabaseStreamBuilder {
} else if (payload.eventType == 'DELETE') {
targetRecord = payload.oldRecord!;
}

bool isTarget = true;
for (final primaryKey in payload.primaryKeys) {
if (record[primaryKey] != targetRecord[primaryKey]) {
isTarget = false;
}
}
return isTarget;
return _uniqueColumns
.every((column) => record[column] == targetRecord[column]);
Vinzent03 marked this conversation as resolved.
Show resolved Hide resolved
}

void _sortData() {
Expand Down
14 changes: 6 additions & 8 deletions test/mock_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,16 @@ void main() {
'type': 'INSERT',
'columns': [
{
'flags': ['key'],
'name': 'id',
'type': 'int4',
'type_modifier': 4294967295
'type_modifier': 4294967295,
},
{
'flags': [],
'name': 'task',
'type': 'text',
'type_modifier': 4294967295
},
{
'flags': [],
'name': 'status',
'type': 'bool',
'type_modifier': 4294967295
Expand Down Expand Up @@ -139,7 +136,7 @@ void main() {
});

test('stream() emits data', () {
final stream = client.from('todos').stream().execute();
final stream = client.from('todos').stream(['id']).execute();
expect(
stream,
emitsInOrder([
Expand All @@ -157,7 +154,7 @@ void main() {
});

test('Can filter stream results with eq', () {
final stream = client.from('todos:status=eq.true').stream().execute();
final stream = client.from('todos:status=eq.true').stream(['id']).execute();
expect(
stream,
emitsInOrder([
Expand All @@ -173,7 +170,7 @@ void main() {
});

test('stream() with order', () {
final stream = client.from('todos').stream().order('id').execute();
final stream = client.from('todos').stream(['id']).order('id').execute();
expect(
stream,
emitsInOrder([
Expand All @@ -191,7 +188,8 @@ void main() {
});

test('stream() with limit', () {
final stream = client.from('todos').stream().order('id').limit(2).execute();
final stream =
client.from('todos').stream(['id']).order('id').limit(2).execute();
expect(
stream,
emitsInOrder([
Expand Down