Skip to content

Commit

Permalink
feat: re-enable sync outbox
Browse files Browse the repository at this point in the history
  • Loading branch information
matthiasn committed May 30, 2024
1 parent 56d409d commit bc17b84
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 28 deletions.
3 changes: 1 addition & 2 deletions lib/services/window_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import 'dart:ui';

import 'package:lotti/database/settings_db.dart';
import 'package:lotti/get_it.dart';
import 'package:lotti/sync/outbox/outbox_service.dart';
import 'package:lotti/utils/platform.dart';
import 'package:window_manager/window_manager.dart';

Expand Down Expand Up @@ -50,7 +49,7 @@ class WindowService implements WindowListener {

@override
void onWindowFocus() {
getIt<OutboxService>().restartRunner();
//getIt<OutboxService>().restartRunner();
}

@override
Expand Down
15 changes: 8 additions & 7 deletions lib/sync/matrix/matrix_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,16 @@ class MatrixService {
Future<void> startKeyVerificationListener() =>
listenForKeyVerificationRequests(service: this);

Future<void> sendMatrixMsg(
Future<bool> sendMatrixMsg(
SyncMessage syncMessage, {
String? myRoomId,
}) =>
sendMessage(
syncMessage,
service: this,
myRoomId: myRoomId,
);
}) async {
return sendMessage(
syncMessage,
service: this,
myRoomId: myRoomId,
);
}

Future<void> logout() async {
if (_client.isLogged()) {
Expand Down
8 changes: 5 additions & 3 deletions lib/sync/matrix/send_message.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import 'package:matrix/matrix.dart';
/// Also updates some stats on sent message counts on the [service].
/// The send function will terminate early (and thus refuse to send anything)
/// when there are users with unverified device in the room.
Future<void> sendMessage(
Future<bool> sendMessage(
SyncMessage syncMessage, {
required MatrixService service,
required String? myRoomId,
Expand Down Expand Up @@ -51,7 +51,7 @@ Future<void> sendMessage(
domain: 'MATRIX_SERVICE',
subDomain: 'sendMatrixMsg',
);
return;
return false;
}

if (roomId == null) {
Expand All @@ -60,7 +60,7 @@ Future<void> sendMessage(
domain: 'MATRIX_SERVICE',
subDomain: 'sendMatrixMsg',
);
return;
return false;
}

loggingDb.captureEvent(
Expand Down Expand Up @@ -158,6 +158,7 @@ Future<void> sendMessage(
orElse: () {},
);
}
return true;
} catch (e, stackTrace) {
debugPrint('MATRIX: Error sending message: $e');
loggingDb.captureException(
Expand All @@ -167,4 +168,5 @@ Future<void> sendMessage(
stackTrace: stackTrace,
);
}
return false;
}
115 changes: 100 additions & 15 deletions lib/sync/outbox/outbox_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import 'dart:convert';
import 'dart:io';

import 'package:drift/drift.dart';
import 'package:flutter/foundation.dart';
import 'package:lotti/blocs/sync/outbox_state.dart';
import 'package:lotti/classes/journal_entities.dart';
import 'package:lotti/classes/sync_message.dart';
Expand All @@ -12,21 +11,27 @@ import 'package:lotti/database/logging_db.dart';
import 'package:lotti/database/sync_db.dart';
import 'package:lotti/get_it.dart';
import 'package:lotti/services/vector_clock_service.dart';
import 'package:lotti/sync/client_runner.dart';
import 'package:lotti/sync/matrix/matrix_service.dart';
import 'package:lotti/utils/audio_utils.dart';
import 'package:lotti/utils/consts.dart';
import 'package:lotti/utils/file_utils.dart';
import 'package:lotti/utils/image_utils.dart';

class OutboxService {
OutboxService() {
_startRunner();
}
final LoggingDb _loggingDb = getIt<LoggingDb>();
final SyncDatabase _syncDatabase = getIt<SyncDatabase>();

Future<void> restartRunner() async {
_loggingDb.captureEvent(
'Restarting',
domain: 'OUTBOX',
subDomain: 'restartRunner()',
late ClientRunner<int> _clientRunner;

void _startRunner() {
_clientRunner = ClientRunner<int>(
callback: (event) async {
await sendNext();
},
);
}

Expand All @@ -36,14 +41,6 @@ class OutboxService {

Future<void> enqueueMessage(SyncMessage syncMessage) async {
try {
final enableMatrix = await getIt<JournalDb>().getConfigFlag(
enableMatrixFlag,
);

if (enableMatrix) {
await getIt<MatrixService>().sendMatrixMsg(syncMessage);
}

final vectorClockService = getIt<VectorClockService>();
final hostHash = await vectorClockService.getHostHash();
final host = await vectorClockService.getHost();
Expand Down Expand Up @@ -111,8 +108,8 @@ class OutboxService {
),
);
}
await enqueueNextSendRequest();
} catch (exception, stackTrace) {
debugPrint('enqueueMessage $exception \n$stackTrace');
_loggingDb.captureException(
exception,
domain: 'OUTBOX',
Expand All @@ -121,4 +118,92 @@ class OutboxService {
);
}
}

Future<void> sendNext() async {
try {
final enableMatrix = await getIt<JournalDb>().getConfigFlag(
enableMatrixFlag,
);

if (!enableMatrix) {
return;
}

final unprocessed = await getNextItems();
if (unprocessed.isNotEmpty) {
final nextPending = unprocessed.first;

_loggingDb.captureEvent(
'trying ${nextPending.subject} ',
domain: 'OUTBOX',
subDomain: 'sendNext()',
);

try {
final syncMessage = SyncMessage.fromJson(
json.decode(nextPending.message) as Map<String, dynamic>,
);

final success =
await getIt<MatrixService>().sendMatrixMsg(syncMessage);

if (success) {
await _syncDatabase.updateOutboxItem(
OutboxCompanion(
id: Value(nextPending.id),
status: Value(OutboxStatus.sent.index),
updatedAt: Value(DateTime.now()),
),
);
if (unprocessed.length > 1) {
await enqueueNextSendRequest();
}
} else {
await enqueueNextSendRequest(
delay: const Duration(seconds: 15),
);
}

_loggingDb.captureEvent(
'${nextPending.subject} done',
domain: 'OUTBOX',
subDomain: 'sendNext()',
);
} catch (e) {
await _syncDatabase.updateOutboxItem(
OutboxCompanion(
id: Value(nextPending.id),
status: Value(
nextPending.retries < 10
? OutboxStatus.pending.index
: OutboxStatus.error.index,
),
retries: Value(nextPending.retries + 1),
updatedAt: Value(DateTime.now()),
),
);
await enqueueNextSendRequest(delay: const Duration(seconds: 15));
}
}
} catch (exception, stackTrace) {
_loggingDb.captureException(
exception,
domain: 'OUTBOX',
subDomain: 'sendNext',
stackTrace: stackTrace,
);
await enqueueNextSendRequest(delay: const Duration(seconds: 15));
}
}

Future<void> enqueueNextSendRequest({
Duration delay = const Duration(milliseconds: 1),
}) async {
unawaited(
Future<void>.delayed(delay).then((_) {
_clientRunner.enqueueRequest(DateTime.now().millisecondsSinceEpoch);
_loggingDb.captureEvent('enqueueRequest() done', domain: 'OUTBOX');
}),
);
}
}
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: lotti
description: Achieve your goals and keep your data private with Lotti.
publish_to: 'none'
version: 0.9.470+2540
version: 0.9.471+2541

msix_config:
display_name: LottiApp
Expand Down

0 comments on commit bc17b84

Please sign in to comment.