Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: re-enable sync outbox #1759

Merged
merged 1 commit into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions lib/services/window_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import 'dart:ui';

import 'package:lotti/database/settings_db.dart';
import 'package:lotti/get_it.dart';
import 'package:lotti/sync/outbox/outbox_service.dart';
import 'package:lotti/utils/platform.dart';
import 'package:window_manager/window_manager.dart';

Expand Down Expand Up @@ -50,7 +49,7 @@ class WindowService implements WindowListener {

@override
void onWindowFocus() {
getIt<OutboxService>().restartRunner();
//getIt<OutboxService>().restartRunner();
}

@override
Expand Down
15 changes: 8 additions & 7 deletions lib/sync/matrix/matrix_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,16 @@ class MatrixService {
Future<void> startKeyVerificationListener() =>
listenForKeyVerificationRequests(service: this);

Future<void> sendMatrixMsg(
Future<bool> sendMatrixMsg(
SyncMessage syncMessage, {
String? myRoomId,
}) =>
sendMessage(
syncMessage,
service: this,
myRoomId: myRoomId,
);
}) async {
return sendMessage(
syncMessage,
service: this,
myRoomId: myRoomId,
);
}

Future<void> logout() async {
if (_client.isLogged()) {
Expand Down
8 changes: 5 additions & 3 deletions lib/sync/matrix/send_message.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import 'package:matrix/matrix.dart';
/// Also updates some stats on sent message counts on the [service].
/// The send function will terminate early (and thus refuse to send anything)
/// when there are users with unverified device in the room.
Future<void> sendMessage(
Future<bool> sendMessage(
SyncMessage syncMessage, {
required MatrixService service,
required String? myRoomId,
Expand Down Expand Up @@ -51,7 +51,7 @@ Future<void> sendMessage(
domain: 'MATRIX_SERVICE',
subDomain: 'sendMatrixMsg',
);
return;
return false;
}

if (roomId == null) {
Expand All @@ -60,7 +60,7 @@ Future<void> sendMessage(
domain: 'MATRIX_SERVICE',
subDomain: 'sendMatrixMsg',
);
return;
return false;
}

loggingDb.captureEvent(
Expand Down Expand Up @@ -158,6 +158,7 @@ Future<void> sendMessage(
orElse: () {},
);
}
return true;
} catch (e, stackTrace) {
debugPrint('MATRIX: Error sending message: $e');
loggingDb.captureException(
Expand All @@ -167,4 +168,5 @@ Future<void> sendMessage(
stackTrace: stackTrace,
);
}
return false;
}
115 changes: 100 additions & 15 deletions lib/sync/outbox/outbox_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import 'dart:convert';
import 'dart:io';

import 'package:drift/drift.dart';
import 'package:flutter/foundation.dart';
import 'package:lotti/blocs/sync/outbox_state.dart';
import 'package:lotti/classes/journal_entities.dart';
import 'package:lotti/classes/sync_message.dart';
Expand All @@ -12,21 +11,27 @@ import 'package:lotti/database/logging_db.dart';
import 'package:lotti/database/sync_db.dart';
import 'package:lotti/get_it.dart';
import 'package:lotti/services/vector_clock_service.dart';
import 'package:lotti/sync/client_runner.dart';
import 'package:lotti/sync/matrix/matrix_service.dart';
import 'package:lotti/utils/audio_utils.dart';
import 'package:lotti/utils/consts.dart';
import 'package:lotti/utils/file_utils.dart';
import 'package:lotti/utils/image_utils.dart';

class OutboxService {
OutboxService() {
_startRunner();
}
final LoggingDb _loggingDb = getIt<LoggingDb>();
final SyncDatabase _syncDatabase = getIt<SyncDatabase>();

Future<void> restartRunner() async {
_loggingDb.captureEvent(
'Restarting',
domain: 'OUTBOX',
subDomain: 'restartRunner()',
late ClientRunner<int> _clientRunner;

void _startRunner() {
_clientRunner = ClientRunner<int>(
callback: (event) async {
await sendNext();
},
);
}

Expand All @@ -36,14 +41,6 @@ class OutboxService {

Future<void> enqueueMessage(SyncMessage syncMessage) async {
try {
final enableMatrix = await getIt<JournalDb>().getConfigFlag(
enableMatrixFlag,
);

if (enableMatrix) {
await getIt<MatrixService>().sendMatrixMsg(syncMessage);
}

final vectorClockService = getIt<VectorClockService>();
final hostHash = await vectorClockService.getHostHash();
final host = await vectorClockService.getHost();
Expand Down Expand Up @@ -111,8 +108,8 @@ class OutboxService {
),
);
}
await enqueueNextSendRequest();
} catch (exception, stackTrace) {
debugPrint('enqueueMessage $exception \n$stackTrace');
_loggingDb.captureException(
exception,
domain: 'OUTBOX',
Expand All @@ -121,4 +118,92 @@ class OutboxService {
);
}
}

Future<void> sendNext() async {
try {
final enableMatrix = await getIt<JournalDb>().getConfigFlag(
enableMatrixFlag,
);

if (!enableMatrix) {
return;
}

final unprocessed = await getNextItems();
if (unprocessed.isNotEmpty) {
final nextPending = unprocessed.first;

_loggingDb.captureEvent(
'trying ${nextPending.subject} ',
domain: 'OUTBOX',
subDomain: 'sendNext()',
);

try {
final syncMessage = SyncMessage.fromJson(
json.decode(nextPending.message) as Map<String, dynamic>,
);

final success =
await getIt<MatrixService>().sendMatrixMsg(syncMessage);

if (success) {
await _syncDatabase.updateOutboxItem(
OutboxCompanion(
id: Value(nextPending.id),
status: Value(OutboxStatus.sent.index),
updatedAt: Value(DateTime.now()),
),
);
if (unprocessed.length > 1) {
await enqueueNextSendRequest();
}
} else {
await enqueueNextSendRequest(
delay: const Duration(seconds: 15),
);
}

_loggingDb.captureEvent(
'${nextPending.subject} done',
domain: 'OUTBOX',
subDomain: 'sendNext()',
);
} catch (e) {
await _syncDatabase.updateOutboxItem(
OutboxCompanion(
id: Value(nextPending.id),
status: Value(
nextPending.retries < 10
? OutboxStatus.pending.index
: OutboxStatus.error.index,
),
retries: Value(nextPending.retries + 1),
updatedAt: Value(DateTime.now()),
),
);
await enqueueNextSendRequest(delay: const Duration(seconds: 15));
}
}
} catch (exception, stackTrace) {
_loggingDb.captureException(
exception,
domain: 'OUTBOX',
subDomain: 'sendNext',
stackTrace: stackTrace,
);
await enqueueNextSendRequest(delay: const Duration(seconds: 15));
}
}

Future<void> enqueueNextSendRequest({
Duration delay = const Duration(milliseconds: 1),
}) async {
unawaited(
Future<void>.delayed(delay).then((_) {
_clientRunner.enqueueRequest(DateTime.now().millisecondsSinceEpoch);
_loggingDb.captureEvent('enqueueRequest() done', domain: 'OUTBOX');
}),
);
}
}
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: lotti
description: Achieve your goals and keep your data private with Lotti.
publish_to: 'none'
version: 0.9.470+2540
version: 0.9.471+2541

msix_config:
display_name: LottiApp
Expand Down
Loading