Skip to content

Commit

Permalink
feat: add refreshed feature for sync task
Browse files Browse the repository at this point in the history
  • Loading branch information
wuchuheng committed Nov 3, 2022
1 parent cf01971 commit abb1a1c
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 145 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,7 @@ _ feat: extended 2 parameters: hasLocalCache and hasOnlineCache in the connector
## 1.0.20

- chore: update isolate_channel dependence.

## 1.0.21

- feat: add refreshed feature for sync task.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import ` 'package:wuchuheng_imap_cache/wuchuheng_imap_cache.dart'`;
## Usage

```dart
import 'package:wuchuheng_imap_cache/index.dart';
import 'package:wuchuheng_imap_cache/sync_service.dart';
void main() async {
final ImapCache cacheServiceInstance = await ImapCache().connectToServer(
Expand Down
2 changes: 1 addition & 1 deletion lib/src/middleware/index.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import 'package:wuchuheng_imap_cache/src/dto/before_unset/result_data/index.dart
import 'package:wuchuheng_imap_cache/src/dto/callback_data/index.dart';
import 'package:wuchuheng_imap_cache/src/dto/channel_name.dart';
import 'package:wuchuheng_imap_cache/src/dto/set_data/index.dart';
import 'package:wuchuheng_imap_cache/src/service/imap_cache_service/index.dart';
import 'package:wuchuheng_imap_cache/src/service/imap_cache_service/imap_cache_service.dart';
import 'package:wuchuheng_isolate_channel/wuchuheng_isolate_channel.dart';

import '../../wuchuheng_imap_cache.dart';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import 'package:wuchuheng_imap_cache/src/subscription/subscription_imp.dart';
import 'package:wuchuheng_logger/wuchuheng_logger.dart';

import '../../dao/db.dart';
import '../sync_service/index.dart';
import '../sync_service/sync_service.dart';
import '../sync_service/sync_service_abstract.dart';

class ImapCacheServiceI implements ImapCacheService {
hook.SubjectHook<Duration> afterSyncSubject = hook.SubjectHook();
Expand Down Expand Up @@ -60,6 +60,7 @@ class ImapCacheServiceI implements ImapCacheService {
value = await _subscriptionImp.beforeSetSubscribeConsume(key: key, value: value, from: from);
await _localCacheService.set(key: key, value: value);
_subscriptionImp.afterSetSubscribeConsume(key: key, value: value, from: from);
if (from == From.local) _syncService.refresh();
Logger.info('After setting the cache. key:$key value: $value');
}

Expand Down
2 changes: 1 addition & 1 deletion lib/src/service/sync_service/IMAP_sync_service/index.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import 'package:wuchuheng_logger/wuchuheng_logger.dart';

import '../../../../wuchuheng_imap_cache.dart';
import '../../../dao/db.dart';
import '../../imap_cache_service/index.dart';
import '../../imap_cache_service/imap_cache_service.dart';
import '../../imap_directory_service/index.dart';
import 'IMAP_sync_service.dart';

Expand Down
134 changes: 0 additions & 134 deletions lib/src/service/sync_service/index.dart

This file was deleted.

2 changes: 0 additions & 2 deletions lib/src/service/sync_service/set_sync_interval.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import 'sync_event.dart';

abstract class SetSyncInterval {
Future<void> setSyncInterval(int second);
}
156 changes: 152 additions & 4 deletions lib/src/service/sync_service/sync_service.dart
Original file line number Diff line number Diff line change
@@ -1,8 +1,156 @@
import 'set_sync_interval.dart';
import 'dart:async';

import 'package:wuchuheng_hooks/wuchuheng_hooks.dart';
import 'package:wuchuheng_imap_cache/src/service/imap_client_service/index.dart';
import 'package:wuchuheng_imap_cache/src/service/imap_directory_service/index.dart';
import 'package:wuchuheng_imap_cache/src/service/sync_service/sync_service_abstract.dart';
import 'package:wuchuheng_logger/wuchuheng_logger.dart';

import '../../dao/db.dart';
import '../../dto/connect_config/index.dart';
import '../imap_cache_service/imap_cache_service.dart';
import 'IMAP_sync_service/index.dart';
import 'sync_event.dart';

abstract class SyncService implements SyncEvent, SetSyncInterval {
Future<void> start();
/// 同步状态
enum SyncStatus {
processing,
pending,
}

class SyncServiceI implements SyncService {
late ImapDirectoryService _imapDirectoryService;
final ConnectConfig _config;
final LocalSQLite _localSQLite;
late final ImapCacheServiceI _imapCache;
final beforeStartSubject = SubjectHook<Duration>();
final afterCompletedSubject = SubjectHook<Duration>();
final onUpdateSubject = SubjectHook<void>();
final onUpdatedSubject = SubjectHook<void>();
final onDownloadSubject = SubjectHook<void>();
final onDownloadedSubject = SubjectHook<void>();
bool _isInit = false;
bool _isRunning = false;
late ImapClientService _imapClientService;
late int _syncIntervalSeconds;
Completer<void> _syncBlock = Completer();
Timer? _syncTimer;
SyncStatus _syncStatus = SyncStatus.pending;

SyncServiceI(this._config, this._localSQLite, this._imapCache) {
_syncIntervalSeconds = _config.syncIntervalSeconds;
}

Future<void> _init(ConnectConfig config) async {
_imapClientService = ImapClientService(
userName: config.userName,
password: config.password,
imapServerHost: config.imapServerHost,
imapServerPort: config.imapServerPort,
isImapServerSecure: config.isImapServerSecure,
);
final imapDirectoryService = ImapDirectoryService(
path: config.boxName,
imapClientService: _imapClientService,
localSQLite: _localSQLite,
);
_imapDirectoryService = imapDirectoryService;
if (!await _imapDirectoryService.exists()) {
await _imapDirectoryService.create();
await Future.delayed(Duration(seconds: 5));
}
}

/// Start synchronizing data
@override
Future<void> start() async {
_isRunning = true;
Completer<void> completer = Completer();
if (!_isInit) {
await _init(_config);
_isInit = true;
completer.complete();
await _imapDirectoryService.selectPath();
}
syncData() async {
if (!_isRunning) return;
try {
_syncStatus = SyncStatus.processing;
beforeStartSubject.next(Duration(seconds: _syncIntervalSeconds));
await IMAPSyncServiceI(
imapDirectoryService: _imapDirectoryService,
localSQLite: _localSQLite,
imapCache: _imapCache,
onUpdate: () => onUpdateSubject.next(null),
onUpdated: () => onUpdatedSubject.next(null),
onDownload: () => onDownloadSubject.next(null),
onDownloaded: () => onDownloadedSubject.next(null),
).start();
afterCompletedSubject.next(Duration(seconds: _syncIntervalSeconds));
} catch (e, stack) {
Logger.error(e.toString());
try {
Logger.info('Try to re-establish the connection.');
_imapCache.disconnect();
await Future.delayed(Duration(seconds: 1));
await _imapCache.connectToServer(_config);
} catch (e) {
Logger.error('Retry connection failed.');
// TODO: the callback that triggered the connection failure is reported here.
}
}
_syncBlock = Completer();
_syncTimer?.cancel();
_syncTimer = Timer(Duration(seconds: _syncIntervalSeconds), () => _syncBlock.complete());
_syncStatus = SyncStatus.pending;
await _syncBlock.future;
await syncData();
}

syncData();

return completer.future;
}

@override
Future<void> stop() async {
_isRunning = false;
_syncTimer?.cancel();
_isInit = false;
final client = await _imapClientService.getClient();
client.disconnect();
Logger.info('Stop data synchronization');
}

@override
Unsubscribe afterSync(AfterSyncCallback callback) => afterCompletedSubject.subscribe((v, _) => callback(v));

@override
Unsubscribe beforeSync(BeforeSyncCallback callback) {
return beforeStartSubject.subscribe((value, cancel) => callback(value));
}

@override
Future<void> setSyncInterval(int second) async => _syncIntervalSeconds = second;

@override
Unsubscribe onUpdate(void Function() callback) => onUpdateSubject.subscribe((value, _) => callback());

@override
Unsubscribe onUpdated(void Function() callback) => onUpdatedSubject.subscribe((value, _) => callback());

@override
Unsubscribe onDownload(void Function() callback) => onDownloadSubject.subscribe((value, _) => callback());

@override
Unsubscribe onDownloaded(void Function() callback) => onDownloadedSubject.subscribe((value, _) => callback());

Future<void> stop();
@override
void refresh() {
if (_syncStatus == SyncStatus.pending) {
Logger.info('Refresh data synchronization.');
_syncTimer?.cancel();
_syncBlock.complete();
}
}
}
14 changes: 14 additions & 0 deletions lib/src/service/sync_service/sync_service_abstract.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import 'set_sync_interval.dart';
import 'sync_event.dart';

abstract class SyncService implements SyncEvent, SetSyncInterval {
/// Start syncing tasks.
Future<void> start();

/// Stop syncing tasks.
Future<void> stop();

/// Refreshing synchronization tasks.
/// Works when the sync task is blocking.
void refresh();
}
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: wuchuheng_imap_cache
description: wuchuheng_imap_cache is a data-driven caching library based on the IMAP protocol.
version: 1.0.20
version: 1.0.21
homepage: https://github.com/wuchuheng/imap_cache_dart

environment:
Expand Down

0 comments on commit abb1a1c

Please sign in to comment.