Skip to content

Commit

Permalink
Refactor SessionHandle
Browse files Browse the repository at this point in the history
  • Loading branch information
nielsenko committed Apr 13, 2024
1 parent 1786d09 commit a8cecc6
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 98 deletions.
36 changes: 36 additions & 0 deletions packages/realm_dart/lib/src/native/object_handle.dart
Expand Up @@ -12,4 +12,40 @@ class ObjectHandle extends RootedHandleBase<realm_object> {
}

int getClassKey() => realmLib.realm_object_get_table(pointer);

/*
Object? getProperty(int propertyKey) {
return using((Arena arena) {
final realmValue = arena<realm_value_t>();
invokeGetBool(() => realmLib.realm_get_value(pointer, propertyKey, realmValue));
return realmValue.toDartValue(
object.realm,
() => realmLib.realm_get_list(pointer, propertyKey),
() => realmLib.realm_get_dictionary(pointer, propertyKey),
);
});
}
*/

void setProperty(int propertyKey, Object? value, bool isDefault) {
using((Arena arena) {
final realmValue = _toRealmValue(value, arena);
invokeGetBool(() => realmLib.realm_set_value(pointer, propertyKey, realmValue.ref, isDefault));
});
}

/*
void objectSetCollection(int propertyKey, RealmValue value) {
_createCollection(object.realm, value, () => realmLib.realm_set_list(pointer, propertyKey),
() => realmLib.realm_set_dictionary(pointer, propertyKey));
}
*/

String objectToString() {
return realmLib.realm_object_to_string(pointer).cast<Utf8>().toRealmDartString(freeRealmMemory: true)!;
}

void delete() {
invokeGetBool(() => realmLib.realm_object_delete(pointer));
}
}
91 changes: 2 additions & 89 deletions packages/realm_dart/lib/src/native/realm_core.dart
Expand Up @@ -53,6 +53,7 @@ part 'realm_handle.dart';
part 'results_handle.dart';
part 'rooted_handle.dart';
part 'schema_handle.dart';
part 'session_handle.dart';
part 'subscription_handle.dart';
part 'subscription_set_handle.dart';
part 'user_handle.dart';
Expand Down Expand Up @@ -615,18 +616,12 @@ class _RealmCore {
() => realmLib.realm_set_dictionary(object.handle.pointer, propertyKey));
}

String objectToString(RealmObjectBase object) {
return realmLib.realm_object_to_string(object.handle.pointer).cast<Utf8>().toRealmDartString(freeRealmMemory: true)!;
}
String objectToString(RealmObjectBase object) => object.handle.objectToString();

// For debugging
// ignore: unused_element
int get _threadId => realmLib.realm_dart_get_thread_id();

void deleteRealmObject(RealmObjectBase object) {
invokeGetBool(() => realmLib.realm_object_delete(object.handle.pointer));
}

ResultsHandle queryList(RealmList target, String query, List<Object?> args) {
return using((arena) {
final length = args.length;
Expand Down Expand Up @@ -1363,46 +1358,6 @@ class _RealmCore {
return AuthProviderTypeInternal.getByValue(provider);
}

String sessionGetPath(Session session) {
return realmLib.realm_sync_session_get_file_path(session.handle.pointer).cast<Utf8>().toRealmDartString()!;
}

SessionState sessionGetState(Session session) {
final value = realmLib.realm_sync_session_get_state(session.handle.pointer);
return _convertCoreSessionState(value);
}

ConnectionState sessionGetConnectionState(Session session) {
final value = realmLib.realm_sync_session_get_connection_state(session.handle.pointer);
return ConnectionState.values[value];
}

UserHandle sessionGetUser(Session session) {
return UserHandle._(realmLib.realm_sync_session_get_user(session.handle.pointer));
}

SessionState _convertCoreSessionState(int value) {
switch (value) {
case 0: // RLM_SYNC_SESSION_STATE_ACTIVE
case 1: // RLM_SYNC_SESSION_STATE_DYING
return SessionState.active;
case 2: // RLM_SYNC_SESSION_STATE_INACTIVE
case 3: // RLM_SYNC_SESSION_STATE_WAITING_FOR_ACCESS_TOKEN
case 4: // RLM_SYNC_SESSION_STATE_PAUSED
return SessionState.inactive;
default:
throw Exception("Unexpected SessionState: $value");
}
}

void sessionPause(Session session) {
realmLib.realm_sync_session_pause(session.handle.pointer);
}

void sessionResume(Session session) {
realmLib.realm_sync_session_resume(session.handle.pointer);
}

RealmSyncSessionConnectionStateNotificationTokenHandle sessionRegisterProgressNotifier(
Session session, ProgressDirection direction, ProgressMode mode, SessionProgressNotificationsController controller) {
final isStreaming = mode == ProgressMode.reportIndefinitely;
Expand Down Expand Up @@ -1442,41 +1397,6 @@ class _RealmCore {
controller.onConnectionStateChange(ConnectionState.values[oldState], ConnectionState.values[newState]);
}

Future<void> sessionWaitForUpload(Session session, [CancellationToken? cancellationToken]) {
final completer = CancellableCompleter<void>(cancellationToken);
if (!completer.isCancelled) {
final callback = Pointer.fromFunction<Void Function(Handle, Pointer<realm_error_t>)>(_sessionWaitCompletionCallback);
final userdata = realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle.pointer);
realmLib.realm_sync_session_wait_for_upload_completion(session.handle.pointer, realmLib.addresses.realm_dart_sync_wait_for_completion_callback,
userdata.cast(), realmLib.addresses.realm_dart_userdata_async_free);
}
return completer.future;
}

Future<void> sessionWaitForDownload(Session session, [CancellationToken? cancellationToken]) {
final completer = CancellableCompleter<void>(cancellationToken);
if (!completer.isCancelled) {
final callback = Pointer.fromFunction<Void Function(Handle, Pointer<realm_error_t>)>(_sessionWaitCompletionCallback);
final userdata = realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle.pointer);
realmLib.realm_sync_session_wait_for_download_completion(session.handle.pointer, realmLib.addresses.realm_dart_sync_wait_for_completion_callback,
userdata.cast(), realmLib.addresses.realm_dart_userdata_async_free);
}
return completer.future;
}

static void _sessionWaitCompletionCallback(Object userdata, Pointer<realm_error_t> errorCode) {
final completer = userdata as CancellableCompleter<void>;
if (completer.isCancelled) {
return;
}
if (errorCode != nullptr) {
// Throw RealmException instead of RealmError to be recoverable by the user.
completer.completeError(RealmException(errorCode.toDart().toString()));
} else {
completer.complete();
}
}

String _getAppDirectoryFromPlugin() {
assert(isFlutterPlatform);

Expand Down Expand Up @@ -1813,13 +1733,6 @@ class RealmAsyncOpenTaskProgressNotificationTokenHandle extends HandleBase<realm
RealmAsyncOpenTaskProgressNotificationTokenHandle._(Pointer<realm_async_open_task_progress_notification_token_t> pointer) : super(pointer, 40);
}

class SessionHandle extends RootedHandleBase<realm_sync_session_t> {
@override
bool get shouldRoot => true;

SessionHandle._(Pointer<realm_sync_session_t> pointer, RealmHandle root) : super(root, pointer, 24);
}

extension on List<int> {
Pointer<Char> toCharPtr(Allocator allocator) {
return toUint8Ptr(allocator).cast();
Expand Down
6 changes: 6 additions & 0 deletions packages/realm_dart/lib/src/native/scheduler_handle.dart
@@ -1,6 +1,12 @@
// Copyright 2024 MongoDB, Inc.
// SPDX-License-Identifier: Apache-2.0

import 'dart:ffi';

import 'handle_base.dart';
import 'realm_bindings.dart';
import 'realm_library.dart';

class SchedulerHandle extends HandleBase<realm_scheduler> {
SchedulerHandle._(Pointer<realm_scheduler> pointer) : super(pointer, 24);

Expand Down
94 changes: 94 additions & 0 deletions packages/realm_dart/lib/src/native/session_handle.dart
@@ -0,0 +1,94 @@
// Copyright 2024 MongoDB, Inc.
// SPDX-License-Identifier: Apache-2.0

part of 'realm_core.dart';

class SessionHandle extends RootedHandleBase<realm_sync_session_t> {
@override
bool get shouldRoot => true;

SessionHandle._(Pointer<realm_sync_session_t> pointer, RealmHandle root) : super(root, pointer, 24);

String get path {
return realmLib.realm_sync_session_get_file_path(pointer).cast<Utf8>().toRealmDartString()!;
}

ConnectionState get connectionState {
final value = realmLib.realm_sync_session_get_connection_state(pointer);
return ConnectionState.values[value];
}

UserHandle get user {
return UserHandle._(realmLib.realm_sync_session_get_user(pointer));
}

SessionState get state {
final value = realmLib.realm_sync_session_get_state(pointer);
return _convertCoreSessionState(value);
}

SessionState _convertCoreSessionState(int value) {
switch (value) {
case 0: // RLM_SYNC_SESSION_STATE_ACTIVE
case 1: // RLM_SYNC_SESSION_STATE_DYING
return SessionState.active;
case 2: // RLM_SYNC_SESSION_STATE_INACTIVE
case 3: // RLM_SYNC_SESSION_STATE_WAITING_FOR_ACCESS_TOKEN
case 4: // RLM_SYNC_SESSION_STATE_PAUSED
return SessionState.inactive;
default:
throw Exception("Unexpected SessionState: $value");
}
}

void pause() {
realmLib.realm_sync_session_pause(pointer);
}

void resume() {
realmLib.realm_sync_session_resume(pointer);
}

Future<void> waitForUpload([CancellationToken? cancellationToken]) {
final completer = CancellableCompleter<void>(cancellationToken);
if (!completer.isCancelled) {
final callback = Pointer.fromFunction<Void Function(Handle, Pointer<realm_error_t>)>(_waitCompletionCallback);
final userdata = realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle.pointer);
realmLib.realm_sync_session_wait_for_upload_completion(
pointer,
realmLib.addresses.realm_dart_sync_wait_for_completion_callback,
userdata.cast(),
realmLib.addresses.realm_dart_userdata_async_free,
);
}
return completer.future;
}

Future<void> waitForDownload([CancellationToken? cancellationToken]) {
final completer = CancellableCompleter<void>(cancellationToken);
if (!completer.isCancelled) {
final callback = Pointer.fromFunction<Void Function(Handle, Pointer<realm_error_t>)>(_waitCompletionCallback);
final userdata = realmLib.realm_dart_userdata_async_new(completer, callback.cast(), scheduler.handle.pointer);
realmLib.realm_sync_session_wait_for_download_completion(
pointer,
realmLib.addresses.realm_dart_sync_wait_for_completion_callback,
userdata.cast(),
realmLib.addresses.realm_dart_userdata_async_free,
);
}
return completer.future;
}

static void _waitCompletionCallback(Object userdata, Pointer<realm_error_t> errorCode) {
final completer = userdata as CancellableCompleter<void>;
if (completer.isCancelled) {
return;
}
if (errorCode != nullptr) {
// Throw RealmException instead of RealmError to be recoverable by the user.
completer.completeError(RealmException(errorCode.toDart().toString()));
} else {
completer.complete();
}
}
}
2 changes: 1 addition & 1 deletion packages/realm_dart/lib/src/realm_class.dart
Expand Up @@ -317,7 +317,7 @@ class Realm implements Finalizable {

_ensureManagedByThis(object, 'delete object from Realm');

realmCore.deleteRealmObject(object);
object.handle.delete();
}

/// Deletes many [RealmObject]s from this `Realm`.
Expand Down
16 changes: 8 additions & 8 deletions packages/realm_dart/lib/src/session.dart
Expand Up @@ -18,38 +18,38 @@ class Session implements Finalizable {
final SessionHandle _handle;

/// The on-disk path of the file backing the [Realm] this [Session] represents
String get realmPath => realmCore.sessionGetPath(this);
String get realmPath => this.handle.path;

/// The session’s current state. This is different from [connectionState] since a
/// session may be active, even if the connection is disconnected (e.g. due to the device
/// being offline).
SessionState get state => realmCore.sessionGetState(this);
SessionState get state => this.handle.state;

/// The session’s current connection state. This is the physical state of the connection
/// and is different from the session's logical state, which is returned by [state].
ConnectionState get connectionState => realmCore.sessionGetConnectionState(this);
ConnectionState get connectionState => this.handle.connectionState;

/// The [User] that owns the [Realm] this [Session] is synchronizing.
User get user => UserInternal.create(realmCore.sessionGetUser(this));
User get user => UserInternal.create(this.handle.user);

Session._(this._handle);

/// Pauses any synchronization with the server until the Realm is re-opened again
/// after fully closing it or [resume] is called.
void pause() => realmCore.sessionPause(this);
void pause() => this.handle.pause();

/// Attempts to resume the session and enable synchronization with the server.
/// All sessions are active by default and calling this method is only necessary
/// if [pause] was called previously.
void resume() => realmCore.sessionResume(this);
void resume() => this.handle.resume();

/// Waits for the [Session] to finish all pending uploads.
/// An optional [cancellationToken] can be used to cancel the wait operation.
Future<void> waitForUpload([CancellationToken? cancellationToken]) => realmCore.sessionWaitForUpload(this, cancellationToken);
Future<void> waitForUpload([CancellationToken? cancellationToken]) => this.handle.waitForUpload(cancellationToken);

/// Waits for the [Session] to finish all pending downloads.
/// An optional [cancellationToken] can be used to cancel the wait operation.
Future<void> waitForDownload([CancellationToken? cancellationToken]) => realmCore.sessionWaitForDownload(this, cancellationToken);
Future<void> waitForDownload([CancellationToken? cancellationToken]) => this.handle.waitForDownload(cancellationToken);

/// Gets a [Stream] of [SyncProgress] that can be used to track upload or download progress.
Stream<SyncProgress> getProgressStream(ProgressDirection direction, ProgressMode mode) {
Expand Down

0 comments on commit a8cecc6

Please sign in to comment.