Stream loses randomly it's row filter #604

Closed
KirioXX opened this issue Aug 24, 2023 · 7 comments
Labels: bug, realtime

Comments

KirioXX commented Aug 24, 2023

Describe the bug
We randomly see the stream lose its row filter, which causes all events to be returned.
This seems to be related to an exception that we get at pretty much the same time our app starts to behave strangely.
I included the stack trace in the "Additional context" section.

To Reproduce
I was not able to reproduce this issue in development, but we saw it today in production when a user brought the app back from the background.

Expected behavior
The stream only returns changes for the filtered rows.

Version (please complete the following information):
├── supabase_flutter 1.10.14
│ ├── supabase 1.11.1
│ │ ├── functions_client 1.3.2
│ │ ├── gotrue 1.11.2
│ │ ├── postgrest 1.5.0
│ │ ├── realtime_client 1.2.1
│ │ ├── storage_client 1.5.1

Additional context

data:text/text;charset=utf-8,
{}

When the exception was thrown, this was the stack:
0		SupabaseStreamBuilder._addException (package:supabase/src/supabase_stream_builder.dart:442:0)
1		SupabaseStreamBuilder._getStreamData.<fn> (package:supabase/src/supabase_stream_builder.dart:348:0)
2		RealtimeChannel.subscribe.<fn> (package:realtime_client/src/realtime_channel.dart:202:0)
3		RealtimeChannel.onError.<fn> (package:realtime_client/src/realtime_channel.dart:331:0)
4		RealtimeChannel.trigger (package:realtime_client/src/realtime_channel.dart:569:0)
5		RealtimeClient.onConnMessage.<fn>.<fn> (package:realtime_client/src/realtime_client.dart:316:0)
6		Iterable.forEach (dart:core/iterable.dart:347:0)
7		RealtimeClient.onConnMessage.<fn> (package:realtime_client/src/realtime_client.dart:316:0)
8		new RealtimeClient.<fn> (package:realtime_client/src/realtime_client.dart:116:0)
9		RealtimeClient.onConnMessage (package:realtime_client/src/realtime_client.dart:299:0)
10		RealtimeClient.connect.<fn> (package:realtime_client/src/realtime_client.dart:146:0)
11		_rootRunUnary (dart:async/zone.dart:1407:0)
12		_CustomZone.runUnary (dart:async/zone.dart:1308:0)
13		_CustomZone.runUnaryGuarded (dart:async/zone.dart:1217:0)
14		_BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:339:0)
15		_BufferingStreamSubscription._add (dart:async/stream_impl.dart:271:0)
16		_ForwardingStreamSubscription._add (dart:async/stream_pipe.dart:123:0)
17		_HandleErrorStream._handleData (dart:async/stream_pipe.dart:253:0)
18		_ForwardingStreamSubscription._handleData (dart:async/stream_pipe.dart:153:0)
19		_rootRunUnary (dart:async/zone.dart:1407:0)
20		_CustomZone.runUnary (dart:async/zone.dart:1308:0)
21		_CustomZone.runUnaryGuarded (dart:async/zone.dart:1217:0)
22		_BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:339:0)
23		_BufferingStreamSubscription._add (dart:async/stream_impl.dart:271:0)
24		_SyncStreamControllerDispatch._sendData (dart:async/stream_controller.dart:784:0)
25		_StreamController._add (dart:async/stream_controller.dart:658:0)
26		_rootRunUnary (dart:async/zone.dart:1407:0)
27		_CustomZone.runUnary (dart:async/zone.dart:1308:0)
28		_CustomZone.runUnaryGuarded (dart:async/zone.dart:1217:0)
29		_BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:339:0)
30		_BufferingStreamSubscription._add (dart:async/stream_impl.dart:271:0)
31		_SyncStreamControllerDispatch._sendData (dart:async/stream_controller.dart:784:0)
32		_StreamController._add (dart:async/stream_controller.dart:658:0)
33		_StreamController.add (dart:async/stream_controller.dart:606:0)
34		new _WebSocketImpl._fromSocket.<fn> (dart:_http/websocket_impl.dart:1144:0)
35		_rootRunUnary (dart:async/zone.dart:1407:0)
36		_CustomZone.runUnary (dart:async/zone.dart:1308:0)
37		_CustomZone.runUnaryGuarded (dart:async/zone.dart:1217:0)
38		_BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:339:0)
39		_BufferingStreamSubscription._add (dart:async/stream_impl.dart:271:0)
40		_SinkTransformerStreamSubscription._add (dart:async/stream_transformers.dart:63:0)
41		_EventSinkWrapper.add (dart:async/stream_transformers.dart:13:0)
42		_WebSocketProtocolTransformer._messageFrameEnd (dart:_http/websocket_impl.dart:332:0)
43		_WebSocketProtocolTransformer.add (dart:_http/websocket_impl.dart:226:0)
44		_SinkTransformerStreamSubscription._handleData (dart:async/stream_transformers.dart:111:0)
45		_rootRunUnary (dart:async/zone.dart:1407:0)
46		_CustomZone.runUnary (dart:async/zone.dart:1308:0)
47		_CustomZone.runUnaryGuarded (dart:async/zone.dart:1217:0)
48		_BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:339:0)
49		_BufferingStreamSubscription._add (dart:async/stream_impl.dart:271:0)
50		_SyncStreamControllerDispatch._sendData (dart:async/stream_controller.dart:784:0)
51		_StreamController._add (dart:async/stream_controller.dart:658:0)
52		_StreamController.add (dart:async/stream_controller.dart:606:0)
53		_Socket._onData (dart:io-patch/socket_patch.dart:2445:0)
54		_rootRunUnary (dart:async/zone.dart:1407:0)
55		_CustomZone.runUnary (dart:async/zone.dart:1308:0)
56		_CustomZone.runUnaryGuarded (dart:async/zone.dart:1217:0)
57		_BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:339:0)
58		_BufferingStreamSubscription._add (dart:async/stream_impl.dart:271:0)
59		_SyncStreamControllerDispatch._sendData (dart:async/stream_controller.dart:784:0)
60		_StreamController._add (dart:async/stream_controller.dart:658:0)
61		_StreamController.add (dart:async/stream_controller.dart:606:0)
62		_RawSecureSocket._sendReadEvent (dart:io/secure_socket.dart:1114:0)
63		_rootRun (dart:async/zone.dart:1391:0)
64		_CustomZone.run (dart:async/zone.dart:1301:0)
65		_CustomZone.runGuarded (dart:async/zone.dart:1209:0)
66		_CustomZone.bindCallbackGuarded.<fn> (dart:async/zone.dart:1249:0)
67		_rootRun (dart:async/zone.dart:1399:0)
68		_CustomZone.run (dart:async/zone.dart:1301:0)
69		_CustomZone.bindCallback.<fn> (dart:async/zone.dart:1233:0)
70		Timer._createTimer.<fn> (dart:async-patch/timer_patch.dart:18:0)
71		_Timer._runTimers (dart:isolate-patch/timer_impl.dart:398:0)
72		_Timer._handleMessage (dart:isolate-patch/timer_impl.dart:429:0)
73		_RawReceivePort._handleMessage (dart:isolate-patch/isolate_patch.dart:189:0)
KirioXX added the "bug" label on Aug 24, 2023
KirioXX changed the title from "Stream loses randomly it's filter" to "Stream loses randomly it's row filter" on Aug 24, 2023
dshukertjr added the "realtime" label on Aug 25, 2023
Member

dshukertjr commented Aug 25, 2023

Hi @KirioXX

Would you be able to add the code you are using on the client side to listen to realtime streams, and the definition of the table you are listening to?

Also regarding

We randomly see the stream lose its row filter, which causes all events to be returned.

How are you observing that all rows are being returned here? Do you have logs that show rows that don't meet the filtering criteria being returned from the stream?
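
One way to capture that kind of evidence is a small client-side check inside the stream's listen callback. This is only a minimal sketch: `rowsOutsideFilter` is a hypothetical helper, not part of supabase_flutter, and it assumes each row is a plain map that carries the filtered column.

```dart
/// Hypothetical helper (not part of supabase_flutter): returns the rows
/// whose [column] value differs from the [expected] filter value.
List<Map<String, dynamic>> rowsOutsideFilter(
  List<Map<String, dynamic>> rows,
  String column,
  Object expected,
) {
  return rows.where((row) => row[column] != expected).toList();
}

/// Builds a log line for a payload, or returns null when every row
/// satisfies the filter and there is nothing to report.
String? describeFilterViolation(
  List<Map<String, dynamic>> rows,
  String column,
  Object expected,
) {
  final unexpected = rowsOutsideFilter(rows, column, expected);
  if (unexpected.isEmpty) return null;
  return 'Stream delivered ${unexpected.length} row(s) where '
      '$column != $expected: $unexpected';
}
```

Calling `describeFilterViolation(data, 'job_card_id', jobCardId)` at the top of the listen callback and logging any non-null result would show directly whether rows outside the filter ever reach the client.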

Author

KirioXX commented Aug 25, 2023

Hi @dshukertjr,

Thanks for the quick response.
This is the code that we use to stream job changes:

import 'dart:async';

import 'package:feature_jobs/src/domain/i_jobs_repository.dart';
import 'package:feature_jobs/src/domain/job_card.dart';
import 'package:feature_jobs/src/domain/jobs_failure.dart';
import 'package:feature_process_checks/process_checks.dart';
import 'package:flutter/material.dart';
import 'package:freezed_annotation/freezed_annotation.dart';
import 'package:hydrated_bloc/hydrated_bloc.dart';
import 'package:package_core/core.dart';
import 'package:package_auth/auth.dart';
import 'package:supabase_flutter/supabase_flutter.dart';

part 'job_cubit.freezed.dart';
part 'job_cubit.g.dart';

part 'job_state.dart';

class JobCubit extends HydratedCubit<JobState> {
  JobCubit({
    required this.repository,
    required this.inProcessChecksRepository,
    required this.authenticationRepository,
  }) : super(JobState());

  final IJobsRepository repository;
  final IInProcessChecksRepository inProcessChecksRepository;
  final AuthenticationRepository authenticationRepository;
  StreamSubscription<List<Map<String, dynamic>>>? _streamSubscription;

  @override
  JobState? fromJson(Map<String, dynamic> json) =>
      json.containsKey('job') ? JobState.fromJson(json['job']) : null;

  @override
  Map<String, dynamic>? toJson(JobState state) => {'job': state.toJson()};

  handleRequest(String jobCardId) async {
    // Only fetch if new job card id passed
    if (jobCardId != state.jobCard?.id) {
      emit(state.copyWith(
        jobCard: null,
        status: JobCubitStatus.loading,
        streamStatus: JobCubitStreamStatus.initial,
      ));
      final response = await repository.getJobCard(jobCardId);
      response.fold(
        (failure) => emit(state.copyWith(
          status: JobCubitStatus.failure,
          errorMessage: failure.message,
        )),
        (jobCard) {
          emit(state.copyWith(
            jobCard: jobCard,
            status: JobCubitStatus.success,
          ));
        },
      );
    }
    // Always start the stream
    handleStreamStart(jobCardId);
  }

  handleStreamStart(String jobCardId) async {
    _streamSubscription?.cancel();
    await Future.delayed(const Duration(seconds: 1));
    _streamSubscription = Supabase.instance.client
        .from('job_cards')
        .stream(primaryKey: ['job_card_id'])
        .eq('job_card_id', jobCardId)
        .limit(1)
        .listen((data) {
          if (data.isNotEmpty) {
            final newJobCard = JobCard.fromJson(
              data.last,
            );
            // TODO: This is a hack to prevent the stream from updating the job card when the checks are too big.
            // The better solution would be to fetch the checks separately.
            // supabase issue https://github.com/supabase/supabase-flutter/issues/272
            // trunk issue: https://github.com/teamtrunk/trunk-apps/issues/468#issuecomment-1438202824
            emit(state.copyWith(
              jobCard: state.jobCard == null
                  ? newJobCard.copyWith(
                      workOrder: state.jobCard?.workOrder,
                    )
                  : newJobCard.copyWith(
                      inProcessChecks: (newJobCard.inProcessChecks == null ||
                                  newJobCard.inProcessChecks!.isEmpty) &&
                              state.jobCard?.inProcessChecks != null
                          ? state.jobCard?.inProcessChecks
                          : newJobCard.inProcessChecks,
                      workOrder: state.jobCard?.workOrder,
                    ),
              status: JobCubitStatus.updated,
              streamStatus: JobCubitStreamStatus.open,
            ));
          }
        }, onError: (error, stackTrace) async {
          debugPrintStack(
            stackTrace: stackTrace,
            label: error.toString(),
          );
          super.onError(error, stackTrace);
          // restart the stream
          emit(state.copyWith(streamStatus: JobCubitStreamStatus.error));
          _streamSubscription?.cancel();
          await Future.delayed(const Duration(seconds: 1));
          handleStreamStart(jobCardId);
        });
  }

  handleJobRestart() async {
    emit(state.copyWith(status: JobCubitStatus.loading));
    final deviceAuthId = authenticationRepository.currentUser?.id;
    if (deviceAuthId == null) {
      emit(state.copyWith(
        errorMessage: 'User is not logged in.',
        status: JobCubitStatus.loggedOut,
      ));
      return;
    }
    final response = await repository.restartJob(state.jobCard!, deviceAuthId);
    response.fold(
      (f) => emit(
        state.copyWith(
          errorMessage: _mapFailureToMessage(f),
          status: JobCubitStatus.failure,
        ),
      ),
      (r) => emit(
        state.copyWith(
          jobCard: r,
          errorMessage: null,
          status: JobCubitStatus.optimisticUpdated,
        ),
      ),
    );
  }

  handleJobDowntime() async {
    emit(state.copyWith(status: JobCubitStatus.loading));
    final deviceAuthId = authenticationRepository.currentUser?.id;
    if (deviceAuthId == null) {
      emit(state.copyWith(
        errorMessage: 'User is not logged in.',
        status: JobCubitStatus.loggedOut,
      ));
      return;
    }
    final response =
        await repository.startJobDowntime(state.jobCard!, deviceAuthId);
    response.fold(
      (f) => emit(
        state.copyWith(
          errorMessage: _mapFailureToMessage(f),
          status: JobCubitStatus.failure,
        ),
      ),
      (r) => emit(
        state.copyWith(
          jobCard: r,
          errorMessage: null,
          status: JobCubitStatus.optimisticUpdated,
        ),
      ),
    );
  }

  handleJobBreak() async {
    emit(state.copyWith(status: JobCubitStatus.loading));
    final deviceAuthId = authenticationRepository.currentUser?.id;
    if (deviceAuthId == null) {
      emit(state.copyWith(
        errorMessage: 'User is not logged in.',
        status: JobCubitStatus.loggedOut,
      ));
      return;
    }
    final response =
        await repository.startJobBreak(state.jobCard!, deviceAuthId);
    response.fold(
      (f) => emit(
        state.copyWith(
          errorMessage: _mapFailureToMessage(f),
          status: JobCubitStatus.failure,
        ),
      ),
      (r) => emit(
        state.copyWith(
          jobCard: r,
          errorMessage: null,
          status: JobCubitStatus.optimisticUpdated,
        ),
      ),
    );
  }

  handleJobClockOut() async {
    emit(state.copyWith(status: JobCubitStatus.loading));
    final deviceAuthId = authenticationRepository.currentUser?.id;
    if (deviceAuthId == null) {
      emit(state.copyWith(
        errorMessage: 'User is not logged in.',
        status: JobCubitStatus.loggedOut,
      ));
      return;
    }
    final response = await repository.clockJobOut(state.jobCard!, deviceAuthId);
    response.fold(
      (f) => emit(
        state.copyWith(
          errorMessage: _mapFailureToMessage(f),
          status: JobCubitStatus.failure,
        ),
      ),
      (r) => emit(
        state.copyWith(
          jobCard: r,
          errorMessage: null,
          status: JobCubitStatus.optimisticUpdated,
        ),
      ),
    );
  }

  handleJobCancel() async {
    emit(state.copyWith(status: JobCubitStatus.loading));
    _streamSubscription?.pause();
    final deviceAuthId = authenticationRepository.currentUser?.id;
    if (deviceAuthId == null) {
      emit(state.copyWith(
        errorMessage: 'User is not logged in.',
        status: JobCubitStatus.loggedOut,
      ));
      return;
    }
    final response = await repository.cancelJob(state.jobCard!, deviceAuthId);
    response.fold(
      (f) {
        _streamSubscription?.resume();
        emit(
          state.copyWith(
            errorMessage: _mapFailureToMessage(f),
            status: JobCubitStatus.failure,
          ),
        );
      },
      (r) async {
        _streamSubscription?.cancel();
        await Future.delayed(const Duration(seconds: 1));
        emit(
          state.copyWith(
            jobCard: null,
            errorMessage: null,
            status: JobCubitStatus.success,
            streamStatus: JobCubitStreamStatus.initial,
          ),
        );
      },
    );
  }

  handleJobComplete() async {
    emit(state.copyWith(status: JobCubitStatus.jobCompletionInProgress));
    final deviceAuthId = authenticationRepository.currentUser?.id;
    if (deviceAuthId == null) {
      emit(state.copyWith(
        errorMessage: 'User is not logged in.',
        status: JobCubitStatus.loggedOut,
      ));
      return;
    }
    final response = await repository.completeJob(state.jobCard!, deviceAuthId);
    response.fold(
      (f) => emit(
        state.copyWith(
          errorMessage: _mapFailureToMessage(f),
          status: JobCubitStatus.failure,
        ),
      ),
      (r) => emit(
        state.copyWith(
          jobCard: r,
          errorMessage: null,
          status: JobCubitStatus.optimisticUpdated,
        ),
      ),
    );
  }

  handleJobPause() async {
    emit(state.copyWith(status: JobCubitStatus.loading));
    if (authenticationRepository.currentUser == null) {
      emit(state.copyWith(
        errorMessage: 'User is not logged in.',
        status: JobCubitStatus.loggedOut,
      ));
      return;
    }
    final response = await repository.pauseJob(state.jobCard!);
    response.fold(
      (f) => emit(
        state.copyWith(
          errorMessage: _mapFailureToMessage(f),
          status: JobCubitStatus.failure,
        ),
      ),
      (r) => emit(
        state.copyWith(
          jobCard: r,
          errorMessage: null,
          status: JobCubitStatus.optimisticUpdated,
        ),
      ),
    );
  }

  void submitNewInProcessCheck(ProcessCheck newProcessCheck) async {
    if (state.jobCard != null) {
      if (!state.inProcessCheckSubmissions.contains(newProcessCheck.order)) {
        emit(
          state.copyWith(
            inProcessCheckSubmissions: [
              ...state.inProcessCheckSubmissions,
              newProcessCheck.order
            ],
          ),
        );
      }
      final response =
          await inProcessChecksRepository.submitInProcessCheckChanged(
        state.jobCard!.id,
        newProcessCheck,
      );
      List<int> newInProcessCheckSubmissions =
          List.from(state.inProcessCheckSubmissions);
      newInProcessCheckSubmissions.remove(newProcessCheck.order);
      response.fold(
        (fail) {
          emit(state.copyWith(
            inProcessCheckSubmissions: newInProcessCheckSubmissions,
            errorMessage: fail.error,
            status: JobCubitStatus.failure,
          ));
        },
        (check) {
          List<ProcessCheck> newChecks =
              List.from(state.jobCard?.inProcessChecks ?? []);
          if (newChecks.isNotEmpty) {
            newChecks = newChecks.map((e) {
              if (e.order == check.order) {
                return check;
              }
              return e;
            }).toList();
          }
          emit(state.copyWith(
            jobCard: state.jobCard!.copyWith(inProcessChecks: newChecks),
            inProcessCheckSubmissions: newInProcessCheckSubmissions,
            errorMessage: null,
            status: JobCubitStatus.success,
          ));
        },
      );
    }
  }

  _mapFailureToMessage(JobsFailure failure) {
    return failure.map(
      unexpected: (f) {
        super.onError({
          "message": f.message,
          "code": f.code,
          "details": f.details,
          "hint": f.hint,
        }, StackTrace.current);
        return MessageConstants.unexpected(code: f.code);
      },
    );
  }
}

And the table definition:

CREATE TABLE "public"."job_cards" (
    "job_card_id" uuid NOT NULL DEFAULT gen_random_uuid(),
    "operation_id" uuid NOT NULL,
    "work_order_id" uuid NOT NULL,
    "partner_name" text NOT NULL,
    "factory_name" text,
    "zone_name" text,
    "station_name" text,
    "job_card_order" int4 NOT NULL,
    "job_card_current_status" text NOT NULL DEFAULT 'scheduled'::text,
    "job_card_operation_name" text,
    "job_card_takt_time" interval NOT NULL DEFAULT '00:00:00'::interval,
    "job_card_cycle_time" interval NOT NULL DEFAULT '00:00:00'::interval,
    "job_card_downtime" interval NOT NULL DEFAULT '00:00:00'::interval,
    "job_card_overrun_time" interval NOT NULL DEFAULT '00:00:00'::interval,
    "job_card_created_at" timestamptz NOT NULL DEFAULT now(),
    "job_card_started_at" timestamptz,
    "job_card_completed_at" timestamptz,
    "job_card_expected_headcount" int4,
    "job_card_latest_headcount" int4,
    "operation_version" int4 NOT NULL,
    "job_card_average_actual_headcount" float8,
    "worker_id" uuid,
    "job_card_updated_at" timestamptz NOT NULL DEFAULT now(),
    "job_card_in_process_checks" jsonb,
    "device_auth_id" uuid,
    CONSTRAINT "job_cards_job_card_current_status_fkey" FOREIGN KEY ("job_card_current_status") REFERENCES "public"."job_status_types"("job_card_status_type"),
    CONSTRAINT "job_cards_work_order_id_fkey" FOREIGN KEY ("work_order_id") REFERENCES "public"."work_orders"("work_order_id") ON DELETE CASCADE,
    CONSTRAINT "job_cards_partner_name_factory_name_zone_name_station_name_fkey" FOREIGN KEY ("partner_name","factory_name","zone_name","station_name") REFERENCES "public"."stations"("partner_name","factory_name","zone_name","station_name") ON UPDATE CASCADE,
    CONSTRAINT "job_cards_operation_id_operation_version_fkey" FOREIGN KEY ("operation_id","operation_version") REFERENCES "public"."operation_versions"("operation_id","operation_version") ON UPDATE CASCADE,
    CONSTRAINT "job_cards_device_auth_id_fkey" FOREIGN KEY ("device_auth_id") REFERENCES "public"."devices"("device_auth_id"),
    CONSTRAINT "job_cards_worker_id_fkey" FOREIGN KEY ("worker_id") REFERENCES "public"."workers"("worker_id") ON DELETE SET NULL,
    PRIMARY KEY ("job_card_id")
);

And yes, we have Instabug bug reports that include the application logs.
In those logs we can see that at some point the app starts to update randomly within a couple of seconds, which sends the navigation off the rails. What I can't see is the stream configuration and whether the filter is still included.

Member

dshukertjr commented Aug 25, 2023

Thanks for this @KirioXX

Could it be the

emit(state.copyWith(streamStatus: JobCubitStreamStatus.error));

inside onError that is causing the blinking behavior? Or does the UI suggest that JobCubitStreamStatus.error is not being shown?

Also, did this only happen after upgrading to a newer version of supabase_flutter, or is this page itself new and didn't exist before upgrading to a newer version of supabase_flutter?

Author

KirioXX commented Aug 25, 2023

The streamStatus is just a marker we set in the state for logging purposes. For some reason I can't see the state in the logs at the moment.
The flickering actually comes from the navigation trying to keep up with the quick state updates.

We have not experienced this behaviour on the old supabase versions, but I can't tell you when it started since we haven't had active users for quite some time. The screen and the cubit have existed for quite some time.

Author

KirioXX commented Aug 25, 2023

What I just noticed: it could be that we never experienced this bug before because we used to refresh the stream on every user interaction to get around the 15-minute timeout.

Member

encima commented Sep 5, 2023

@KirioXX this is a fairly complex one to reproduce and help debug. If you have a minimal, reproducible example, this would help a lot.

Have you tried setting distinct on the stream subscription or checking if multiple events are firing when they shouldn't be?
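
As a rough illustration of the distinct idea, assuming it refers to Dart's `Stream.distinct`: lists and maps compare by identity in Dart, so a custom equality is needed for it to have any effect on row payloads. The names `samePayload` and `dropRepeats` below are hypothetical, and comparing JSON encodings is a simplification that assumes the rows contain only JSON-encodable values with a stable key order.

```dart
import 'dart:convert';

/// Hypothetical equality: two payloads count as equal when their JSON
/// encodings match. Assumes JSON-encodable values and stable key order.
bool samePayload(
  List<Map<String, dynamic>> previous,
  List<Map<String, dynamic>> next,
) =>
    jsonEncode(previous) == jsonEncode(next);

/// Drops consecutive duplicate payloads from [source] using Stream.distinct.
Stream<List<Map<String, dynamic>>> dropRepeats(
  Stream<List<Map<String, dynamic>>> source,
) =>
    source.distinct(samePayload);
```

Wrapping the stream this way before calling listen would suppress back-to-back identical emissions. It would not hide rows that genuinely fall outside the filter, so it helps separate duplicate events from a dropped filter.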

@dshukertjr
Member

I'm going to close this issue as it is hard to conclude that the filter set on the .stream() method is dropped by the SDK. If anyone is experiencing something similar, feel free to comment and we can reopen this issue.


3 participants