diff --git a/Realm/Realm/Configurations/InMemoryConfiguration.cs b/Realm/Realm/Configurations/InMemoryConfiguration.cs index 6707786a61..0e95879400 100644 --- a/Realm/Realm/Configurations/InMemoryConfiguration.cs +++ b/Realm/Realm/Configurations/InMemoryConfiguration.cs @@ -17,6 +17,7 @@ //////////////////////////////////////////////////////////////////////////// using System; +using System.Threading; using System.Threading.Tasks; using Realms.Native; using Realms.Schema; @@ -63,7 +64,7 @@ internal override Realm CreateRealm(RealmSchema schema) return new Realm(new SharedRealmHandle(srPtr), this, schema); } - internal override Task CreateRealmAsync(RealmSchema schema) + internal override Task CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken) { return Task.FromResult(CreateRealm(schema)); } diff --git a/Realm/Realm/Configurations/QueryBasedSyncConfiguration.cs b/Realm/Realm/Configurations/QueryBasedSyncConfiguration.cs index 3b73e34128..028287ba25 100644 --- a/Realm/Realm/Configurations/QueryBasedSyncConfiguration.cs +++ b/Realm/Realm/Configurations/QueryBasedSyncConfiguration.cs @@ -17,6 +17,8 @@ //////////////////////////////////////////////////////////////////////////// using System; +using System.Threading; +using System.Threading.Tasks; using Realms.Schema; namespace Realms.Sync @@ -65,6 +67,12 @@ public QueryBasedSyncConfiguration(Uri serverUri = null, User user = null, strin { } + internal override Task CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken) + { + schema = RealmSchema.CreateSchemaForClasses(_queryBasedPermissionTypes, schema); + return base.CreateRealmAsync(schema, cancellationToken); + } + internal override Realm CreateRealm(RealmSchema schema) { schema = RealmSchema.CreateSchemaForClasses(_queryBasedPermissionTypes, schema); diff --git a/Realm/Realm/Configurations/RealmConfiguration.cs b/Realm/Realm/Configurations/RealmConfiguration.cs index 9ac49eaaa9..e01af80f4a 100644 --- a/Realm/Realm/Configurations/RealmConfiguration.cs +++ b/Realm/Realm/Configurations/RealmConfiguration.cs @@ -19,6 +19,7 @@ using System; using System.Linq; using System.Runtime.InteropServices; +using System.Threading; using System.Threading.Tasks; using Realms.Exceptions; using Realms.Helpers; @@ -166,7 +167,7 @@ internal override Realm CreateRealm(RealmSchema schema) return new Realm(srHandle, this, schema); } - internal override Task CreateRealmAsync(RealmSchema schema) + internal override Task CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken) { // Can't use async/await due to mono inliner bugs // If we are on UI thread will be set but often also set on long-lived workers to use Post back to UI thread. @@ -177,7 +178,7 @@ internal override Task CreateRealmAsync(RealmSchema schema) using (CreateRealm(schema)) { } - }).ContinueWith(_ => CreateRealm(schema), scheduler); + }, cancellationToken).ContinueWith(_ => CreateRealm(schema), scheduler); } return Task.FromResult(CreateRealm(schema)); @@ -198,4 +199,4 @@ private static bool ShouldCompactOnLaunchCallback(IntPtr delegatePtr, ulong tota } } } -} \ No newline at end of file +} diff --git a/Realm/Realm/Configurations/RealmConfigurationBase.cs b/Realm/Realm/Configurations/RealmConfigurationBase.cs index d5b9c00c01..5d7aee9102 100644 --- a/Realm/Realm/Configurations/RealmConfigurationBase.cs +++ b/Realm/Realm/Configurations/RealmConfigurationBase.cs @@ -18,6 +18,7 @@ using System; using System.IO; +using System.Threading; using System.Threading.Tasks; using Realms.Schema; @@ -141,6 +142,6 @@ internal RealmConfigurationBase Clone() internal abstract Realm CreateRealm(RealmSchema schema); - internal abstract Task CreateRealmAsync(RealmSchema schema); + internal abstract Task CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken); } -} \ No newline at end of file +} diff --git a/Realm/Realm/Configurations/SyncConfigurationBase.cs b/Realm/Realm/Configurations/SyncConfigurationBase.cs index 11dd3d8045..f999183576 100644 --- a/Realm/Realm/Configurations/SyncConfigurationBase.cs +++ b/Realm/Realm/Configurations/SyncConfigurationBase.cs @@ -21,6 +21,7 @@ using System.Linq; using System.Reflection; using System.Runtime.InteropServices; +using System.Threading; using System.Threading.Tasks; using Realms.Helpers; using Realms.Schema; @@ -207,26 +208,66 @@ internal override Realm CreateRealm(RealmSchema schema) return new Realm(srHandle, this, schema); } - internal override async Task CreateRealmAsync(RealmSchema schema) + internal override async Task CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken) { - var session = new Session(SharedRealmHandleExtensions.GetSession(DatabasePath, ToNative(), EncryptionKey)); - IDisposable subscription = null; + var configuration = new Realms.Native.Configuration + { + Path = DatabasePath, + schema_version = SchemaVersion, + enable_cache = EnableCache + }; + + var tcs = new TaskCompletionSource(); + var tcsHandle = GCHandle.Alloc(tcs); + ProgressNotificationToken progressToken = null; try { - if (OnProgress != null) + using (var handle = SharedRealmHandleExtensions.OpenWithSyncAsync(configuration, ToNative(), schema, EncryptionKey, tcsHandle)) { - var observer = new Observer(OnProgress); - subscription = session.GetProgressObservable(ProgressDirection.Download, ProgressMode.ForCurrentlyOutstandingWork).Subscribe(observer); + cancellationToken.Register(() => + { + if (!handle.IsClosed) + { + handle.Cancel(); + tcs.TrySetCanceled(); + } + }); + + if (OnProgress != null) + { + progressToken = new ProgressNotificationToken( + observer: (progress) => + { + OnProgress(progress); + }, + register: handle.RegisterProgressNotifier, + unregister: (token) => + { + if (!handle.IsClosed) + { + handle.UnregisterProgressNotifier(token); + } + }); + } + + using (var realmReference = await tcs.Task) + { + var realmPtr = SharedRealmHandle.ResolveFromReference(realmReference); + var sharedRealmHandle = new SharedRealmHandle(realmPtr); + if (IsDynamic && !schema.Any()) + { + sharedRealmHandle.GetSchema(nativeSchema => schema = RealmSchema.CreateFromObjectStoreSchema(nativeSchema)); + } + + return new Realm(sharedRealmHandle, this, schema); + } } - await session.WaitForDownloadAsync(); } finally { - subscription?.Dispose(); - session.CloseHandle(); + tcsHandle.Free(); + progressToken?.Dispose(); } - - return CreateRealm(schema); } internal Native.SyncConfiguration ToNative() @@ -254,4 +295,4 @@ internal static string GetSDKUserAgent() return $"RealmDotNet/{version} ({RuntimeInformation.FrameworkDescription})"; } } -} \ No newline at end of file +} diff --git a/Realm/Realm/Handles/AsyncOpenTaskHandle.cs b/Realm/Realm/Handles/AsyncOpenTaskHandle.cs new file mode 100644 index 0000000000..77362e37cf --- /dev/null +++ b/Realm/Realm/Handles/AsyncOpenTaskHandle.cs @@ -0,0 +1,69 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2019 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +using System; +using System.Runtime.InteropServices; + +namespace Realms +{ + internal class AsyncOpenTaskHandle : RealmHandle + { + private static class NativeMethods + { + [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_asyncopentask_destroy", CallingConvention = CallingConvention.Cdecl)] + public static extern void destroy(IntPtr asyncTaskHandle); + + [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_asyncopentask_cancel", CallingConvention = CallingConvention.Cdecl)] + public static extern void cancel(AsyncOpenTaskHandle handle, out NativeException ex); + + [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_asyncopentask_register_progress_notifier", CallingConvention = CallingConvention.Cdecl)] + public static extern ulong register_progress_notifier(AsyncOpenTaskHandle handle, IntPtr token_ptr, out NativeException ex); + + [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_asyncopentask_unregister_progress_notifier", CallingConvention = CallingConvention.Cdecl)] + public static extern void unregister_progress_notifier(AsyncOpenTaskHandle handle, ulong token, out NativeException ex); + } + + public AsyncOpenTaskHandle(IntPtr handle) : base(null, handle) + { + } + + public void Cancel() + { + NativeMethods.cancel(this, out var ex); + ex.ThrowIfNecessary(); + } + + protected override void Unbind() + { + NativeMethods.destroy(handle); + } + + public ulong RegisterProgressNotifier(GCHandle managedHandle) + { + var token = NativeMethods.register_progress_notifier(this, GCHandle.ToIntPtr(managedHandle), out var ex); + ex.ThrowIfNecessary(); + return token; + } + + public void UnregisterProgressNotifier(ulong token) + { + NativeMethods.unregister_progress_notifier(this, token, out var ex); + ex.ThrowIfNecessary(); + } + } +} diff --git a/Realm/Realm/Handles/NotifiableObjectHandleBase.cs b/Realm/Realm/Handles/NotifiableObjectHandleBase.cs index c96e0da60b..faff2e39f2 100644 --- a/Realm/Realm/Handles/NotifiableObjectHandleBase.cs +++ b/Realm/Realm/Handles/NotifiableObjectHandleBase.cs @@ -57,7 +57,7 @@ protected NotifiableObjectHandleBase(RealmHandle root, IntPtr handle) : base(roo public IntPtr DestroyNotificationToken(IntPtr token) { - var result = NativeMethods.destroy_notificationtoken(token, out NativeException nativeException); + var result = NativeMethods.destroy_notificationtoken(token, out var nativeException); nativeException.ThrowIfNecessary(); return result; } diff --git a/Realm/Realm/Handles/SessionHandle.cs b/Realm/Realm/Handles/SessionHandle.cs index 7489180027..92f6744245 100644 --- a/Realm/Realm/Handles/SessionHandle.cs +++ b/Realm/Realm/Handles/SessionHandle.cs @@ -65,7 +65,7 @@ private static class NativeMethods [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncsession_wait", CallingConvention = CallingConvention.Cdecl)] [return: MarshalAs(UnmanagedType.I1)] - public static extern bool wait(SessionHandle session, IntPtr task_completion_source, ProgressDirection direction, out NativeException ex); + public static extern void wait(SessionHandle session, IntPtr task_completion_source, ProgressDirection direction, out NativeException ex); [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncsession_report_error_for_testing", CallingConvention = CallingConvention.Cdecl)] public static extern void report_error_for_testing(SessionHandle session, int error_code, [MarshalAs(UnmanagedType.LPWStr)] string message, IntPtr message_len, [MarshalAs(UnmanagedType.I1)] bool is_fatal); @@ -131,10 +131,10 @@ public void RefreshAccessToken(string accessToken, string serverPath) ex.ThrowIfNecessary(); } - public ulong RegisterProgressNotifier(IntPtr tokenPtr, ProgressDirection direction, ProgressMode mode) + public ulong RegisterProgressNotifier(GCHandle managedHandle, ProgressDirection direction, ProgressMode mode) { var isStreaming = mode == ProgressMode.ReportIndefinitely; - var token = NativeMethods.register_progress_notifier(this, tokenPtr, direction, isStreaming, out var ex); + var token = NativeMethods.register_progress_notifier(this, GCHandle.ToIntPtr(managedHandle), direction, isStreaming, out var ex); ex.ThrowIfNecessary(); return token; } @@ -145,12 +145,11 @@ public void UnregisterProgressNotifier(ulong token) ex.ThrowIfNecessary(); } - public bool Wait(TaskCompletionSource tcs, ProgressDirection direction) + public void Wait(TaskCompletionSource tcs, ProgressDirection direction) { var tcsHandle = GCHandle.Alloc(tcs); - var result = NativeMethods.wait(this, GCHandle.ToIntPtr(tcsHandle), direction, out var ex); + NativeMethods.wait(this, GCHandle.ToIntPtr(tcsHandle), direction, out var ex); ex.ThrowIfNecessary(); - return result; } public IntPtr GetRawPointer() diff --git a/Realm/Realm/Handles/SharedRealmHandle.cs b/Realm/Realm/Handles/SharedRealmHandle.cs index f6a6b64e80..6bbb82af7f 100644 --- a/Realm/Realm/Handles/SharedRealmHandle.cs +++ b/Realm/Realm/Handles/SharedRealmHandle.cs @@ -39,9 +39,9 @@ private static class NativeMethods public delegate void GetNativeSchemaCallback(Native.Schema schema, IntPtr managed_callback); [DllImport(InteropConfig.DLL_NAME, EntryPoint = "shared_realm_open", CallingConvention = CallingConvention.Cdecl)] - public static extern IntPtr open(Native.Configuration configuration, - [MarshalAs(UnmanagedType.LPArray), In] Native.SchemaObject[] objects, int objects_length, - [MarshalAs(UnmanagedType.LPArray), In] Native.SchemaProperty[] properties, + public static extern IntPtr open(Configuration configuration, + [MarshalAs(UnmanagedType.LPArray), In] SchemaObject[] objects, int objects_length, + [MarshalAs(UnmanagedType.LPArray), In] SchemaProperty[] properties, byte[] encryptionKey, out NativeException ex); @@ -94,6 +94,9 @@ private static class NativeMethods [DllImport(InteropConfig.DLL_NAME, EntryPoint = "shared_realm_resolve_query_reference", CallingConvention = CallingConvention.Cdecl)] public static extern IntPtr resolve_query_reference(SharedRealmHandle sharedRealm, ThreadSafeReferenceHandle referenceHandle, out NativeException ex); + [DllImport(InteropConfig.DLL_NAME, EntryPoint = "shared_realm_resolve_realm_reference", CallingConvention = CallingConvention.Cdecl)] + public static extern IntPtr resolve_realm_reference(ThreadSafeReferenceHandle referenceHandle, out NativeException ex); + [DllImport(InteropConfig.DLL_NAME, EntryPoint = "shared_realm_write_copy", CallingConvention = CallingConvention.Cdecl)] public static extern void write_copy(SharedRealmHandle sharedRealm, [MarshalAs(UnmanagedType.LPWStr)] string path, IntPtr path_len, byte[] encryptionKey, out NativeException ex); @@ -146,11 +149,18 @@ protected override void Unbind() NativeMethods.destroy(handle); } - public static IntPtr Open(Native.Configuration configuration, RealmSchema schema, byte[] encryptionKey) + public static IntPtr Open(Configuration configuration, RealmSchema schema, byte[] encryptionKey) { var marshaledSchema = new SchemaMarshaler(schema); - var result = NativeMethods.open(configuration, marshaledSchema.Objects, marshaledSchema.Objects.Length, marshaledSchema.Properties, encryptionKey, out NativeException nativeException); + var result = NativeMethods.open(configuration, marshaledSchema.Objects, marshaledSchema.Objects.Length, marshaledSchema.Properties, encryptionKey, out var nativeException); + nativeException.ThrowIfNecessary(); + return result; + } + + public static IntPtr ResolveFromReference(ThreadSafeReferenceHandle referenceHandle) + { + var result = NativeMethods.resolve_realm_reference(referenceHandle, out var nativeException); nativeException.ThrowIfNecessary(); return result; } @@ -248,12 +258,15 @@ public IntPtr ResolveReference(ThreadSafeReference reference) case ThreadSafeReference.Type.Object: result = NativeMethods.resolve_object_reference(this, reference.Handle, out nativeException); break; + case ThreadSafeReference.Type.List: result = NativeMethods.resolve_list_reference(this, reference.Handle, out nativeException); break; + case ThreadSafeReference.Type.Query: result = NativeMethods.resolve_query_reference(this, reference.Handle, out nativeException); break; + default: throw new NotSupportedException(); } @@ -296,9 +309,11 @@ public ObjectHandle CreateObjectWithPrimaryKey(Property pkProperty, object prima case PropertyType.String: var stringKey = (string)primaryKey; return CreateObjectWithPrimaryKey(table, stringKey, update, out isNew); + case PropertyType.Int: var longKey = primaryKey == null ? (long?)null : Convert.ToInt64(primaryKey); return CreateObjectWithPrimaryKey(table, longKey, pkProperty.Type.IsNullable(), update, out isNew); + default: throw new NotSupportedException($"Unexpected primary key of type: {pkProperty.Type}"); } @@ -341,12 +356,12 @@ public static void NotifyRealmChanged(IntPtr stateHandle) public class SchemaMarshaler { - public readonly Native.SchemaObject[] Objects; - public readonly Native.SchemaProperty[] Properties; + public readonly SchemaObject[] Objects; + public readonly SchemaProperty[] Properties; public SchemaMarshaler(RealmSchema schema) { - var properties = new List(); + var properties = new List(); Objects = schema.Select(@object => { @@ -354,7 +369,7 @@ public SchemaMarshaler(RealmSchema schema) properties.AddRange(@object.Select(ForMarshalling)); - return new Native.SchemaObject + return new SchemaObject { name = @object.Name, properties_start = start, @@ -364,9 +379,9 @@ public SchemaMarshaler(RealmSchema schema) Properties = properties.ToArray(); } - public static Native.SchemaProperty ForMarshalling(Property property) + public static SchemaProperty ForMarshalling(Property property) { - return new Native.SchemaProperty + return new SchemaProperty { name = property.Name, type = property.Type, diff --git a/Realm/Realm/Handles/SharedRealmHandleExtensions.cs b/Realm/Realm/Handles/SharedRealmHandleExtensions.cs index 937aa8c9d8..e953d7e881 100644 --- a/Realm/Realm/Handles/SharedRealmHandleExtensions.cs +++ b/Realm/Realm/Handles/SharedRealmHandleExtensions.cs @@ -33,19 +33,27 @@ namespace Realms.Sync { internal static class SharedRealmHandleExtensions { - // This is int, because Interlocked.Exchange cannot work with narrower types such as bool. - private static int _fileSystemConfigured; - // We only save it to avoid allocating the GCHandle multiple times. private static readonly NativeMethods.LogMessageCallback _logCallback; + // This is int, because Interlocked.Exchange cannot work with narrower types such as bool. + private static int _fileSystemConfigured; + private static class NativeMethods { [DllImport(InteropConfig.DLL_NAME, EntryPoint = "shared_realm_open_with_sync", CallingConvention = CallingConvention.Cdecl)] - public static extern IntPtr open_with_sync(Configuration configuration, Native.SyncConfiguration sync_configuration, + public static extern IntPtr open_with_sync(Configuration configuration, SyncConfiguration sync_configuration, + [MarshalAs(UnmanagedType.LPArray), In] SchemaObject[] objects, int objects_length, + [MarshalAs(UnmanagedType.LPArray), In] SchemaProperty[] properties, + byte[] encryptionKey, + out NativeException ex); + + [DllImport(InteropConfig.DLL_NAME, EntryPoint = "shared_realm_open_with_sync_async", CallingConvention = CallingConvention.Cdecl)] + public static extern IntPtr open_with_sync_async(Configuration configuration, SyncConfiguration sync_configuration, [MarshalAs(UnmanagedType.LPArray), In] SchemaObject[] objects, int objects_length, [MarshalAs(UnmanagedType.LPArray), In] SchemaProperty[] properties, byte[] encryptionKey, + IntPtr task_completion_source, out NativeException ex); [UnmanagedFunctionPointer(CallingConvention.Cdecl)] @@ -63,6 +71,9 @@ private static class NativeMethods [UnmanagedFunctionPointer(CallingConvention.Cdecl)] public unsafe delegate void LogMessageCallback(byte* message_buf, IntPtr message_len, LogLevel logLevel); + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + public unsafe delegate void OpenRealmCallback(IntPtr task_completion_source, IntPtr shared_realm, int error_code, byte* message_buf, IntPtr message_len); + [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncmanager_configure", CallingConvention = CallingConvention.Cdecl)] public static extern unsafe void configure([MarshalAs(UnmanagedType.LPWStr)] string base_path, IntPtr base_path_length, [MarshalAs(UnmanagedType.LPWStr)] string user_agent, IntPtr user_agent_length, @@ -70,6 +81,9 @@ private static class NativeMethods [MarshalAs(UnmanagedType.I1)] bool resetMetadataOnError, out NativeException exception); + [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_install_syncmanager_callbacks", CallingConvention = CallingConvention.Cdecl)] + public static extern void install_syncmanager_callbacks(OpenRealmCallback open_callback); + [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_install_syncsession_callbacks", CallingConvention = CallingConvention.Cdecl)] public static extern void install_syncsession_callbacks(RefreshAccessTokenCallbackDelegate refresh_callback, SessionErrorCallback error_callback, SessionProgressCallback progress_callback, SessionWaitCallback wait_callback); @@ -84,7 +98,7 @@ private static class NativeMethods public static extern void reconnect(); [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncmanager_get_session", CallingConvention = CallingConvention.Cdecl)] - public static extern IntPtr get_session([MarshalAs(UnmanagedType.LPWStr)] string path, IntPtr path_len, Native.SyncConfiguration configuration, byte[] encryptionKey, out NativeException ex); + public static extern IntPtr get_session([MarshalAs(UnmanagedType.LPWStr)] string path, IntPtr path_len, SyncConfiguration configuration, byte[] encryptionKey, out NativeException ex); [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncmanager_set_log_level", CallingConvention = CallingConvention.Cdecl)] public static extern unsafe void set_log_level(LogLevel* level, out NativeException exception); @@ -129,11 +143,17 @@ static unsafe SharedRealmHandleExtensions() NativeMethods.install_syncsession_callbacks(refresh, error, progress, wait); + NativeMethods.OpenRealmCallback openRealm = HandleOpenRealmCallback; + + GCHandle.Alloc(openRealm); + + NativeMethods.install_syncmanager_callbacks(openRealm); + _logCallback = HandleLogMessage; GCHandle.Alloc(_logCallback); } - public static SharedRealmHandle OpenWithSync(Configuration configuration, Native.SyncConfiguration syncConfiguration, RealmSchema schema, byte[] encryptionKey) + public static SharedRealmHandle OpenWithSync(Configuration configuration, SyncConfiguration syncConfiguration, RealmSchema schema, byte[] encryptionKey) { DoInitialFileSystemConfiguration(); @@ -145,6 +165,18 @@ public static SharedRealmHandle OpenWithSync(Configuration configuration, Native return new SharedRealmHandle(result); } + public static AsyncOpenTaskHandle OpenWithSyncAsync(Configuration configuration, SyncConfiguration syncConfiguration, RealmSchema schema, byte[] encryptionKey, GCHandle tcsHandle) + { + DoInitialFileSystemConfiguration(); + + var marshaledSchema = new SharedRealmHandle.SchemaMarshaler(schema); + + var asyncTaskPtr = NativeMethods.open_with_sync_async(configuration, syncConfiguration, marshaledSchema.Objects, marshaledSchema.Objects.Length, marshaledSchema.Properties, encryptionKey, GCHandle.ToIntPtr(tcsHandle), out var nativeException); + nativeException.ThrowIfNecessary(); + var asyncTaskHandle = new AsyncOpenTaskHandle(asyncTaskPtr); + return asyncTaskHandle; + } + public static string GetRealmPath(User user, Uri serverUri) { DoInitialFileSystemConfiguration(); @@ -242,7 +274,7 @@ public static void ReconnectSessions() NativeMethods.reconnect(); } - public static SessionHandle GetSession(string path, Native.SyncConfiguration configuration, byte[] encryptionKey) + public static SessionHandle GetSession(string path, SyncConfiguration configuration, byte[] encryptionKey) { DoInitialFileSystemConfiguration(); @@ -318,7 +350,7 @@ private static unsafe void HandleSessionError(IntPtr sessionHandlePtr, ErrorCode [MonoPInvokeCallback(typeof(NativeMethods.SessionProgressCallback))] private static void HandleSessionProgress(IntPtr tokenPtr, ulong transferredBytes, ulong transferableBytes) { - var token = (SyncProgressObservable.ProgressNotificationToken)GCHandle.FromIntPtr(tokenPtr).Target; + var token = (ProgressNotificationToken)GCHandle.FromIntPtr(tokenPtr).Target; token.Notify(transferredBytes, transferableBytes); } @@ -337,8 +369,8 @@ private static unsafe void HandleSessionWaitCallback(IntPtr taskCompletionSource else { var inner = new SessionException(Encoding.UTF8.GetString(messageBuffer, (int)messageLength), (ErrorCode)error_code); - const string outerMessage = "A system error occurred while waiting for completion. See InnerException for more details"; - tcs.TrySetException(new RealmException(outerMessage, inner)); + const string OuterMessage = "A system error occurred while waiting for completion. See InnerException for more details"; + tcs.TrySetException(new RealmException(OuterMessage, inner)); } } finally @@ -347,6 +379,24 @@ private static unsafe void HandleSessionWaitCallback(IntPtr taskCompletionSource } } + [MonoPInvokeCallback(typeof(NativeMethods.OpenRealmCallback))] + private static unsafe void HandleOpenRealmCallback(IntPtr taskCompletionSource, IntPtr realm_reference, int error_code, byte* messageBuffer, IntPtr messageLength) + { + var handle = GCHandle.FromIntPtr(taskCompletionSource); + var tcs = (TaskCompletionSource)handle.Target; + + if (error_code == 0) + { + tcs.TrySetResult(new ThreadSafeReferenceHandle(realm_reference, isRealmReference: true)); + } + else + { + var inner = new SessionException(Encoding.UTF8.GetString(messageBuffer, (int)messageLength), (ErrorCode)error_code); + const string OuterMessage = "A system error occurred while opening a Realm. See InnerException for more details"; + tcs.TrySetException(new RealmException(OuterMessage, inner)); + } + } + [MonoPInvokeCallback(typeof(NativeMethods.LogMessageCallback))] private static unsafe void HandleLogMessage(byte* messageBuffer, IntPtr messageLength, LogLevel level) { @@ -369,4 +419,4 @@ private static unsafe void HandleLogMessage(byte* messageBuffer, IntPtr messageL } } } -} \ No newline at end of file +} diff --git a/Realm/Realm/Handles/ThreadSafeReferenceHandle.cs b/Realm/Realm/Handles/ThreadSafeReferenceHandle.cs index 0760ca0a70..81c921e6d2 100644 --- a/Realm/Realm/Handles/ThreadSafeReferenceHandle.cs +++ b/Realm/Realm/Handles/ThreadSafeReferenceHandle.cs @@ -27,16 +27,30 @@ private static class NativeMethods { [DllImport(InteropConfig.DLL_NAME, EntryPoint = "thread_safe_reference_destroy", CallingConvention = CallingConvention.Cdecl)] public static extern void destroy(IntPtr handle); + + [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_thread_safe_reference_destroy", CallingConvention = CallingConvention.Cdecl)] + public static extern void destroy_realm_reference(IntPtr handle); } + private bool _isRealmReference; + [Preserve] - public ThreadSafeReferenceHandle(IntPtr handle) : base(null, handle) + public ThreadSafeReferenceHandle(IntPtr handle, bool isRealmReference = false) : base(null, handle) { + _isRealmReference = isRealmReference; } protected override unsafe void Unbind() { - NativeMethods.destroy(handle); + // This is a bit awkward because ThreadSafeReference doesn't inherit from ThreadSafeReferenceBase + if (_isRealmReference) + { + NativeMethods.destroy_realm_reference(handle); + } + else + { + NativeMethods.destroy(handle); + } } } } diff --git a/Realm/Realm/Realm.cs b/Realm/Realm/Realm.cs index 56ac1883ee..ff1c4ccdf9 100644 --- a/Realm/Realm/Realm.cs +++ b/Realm/Realm/Realm.cs @@ -93,7 +93,8 @@ public static Realm GetInstance(RealmConfigurationBase config = null) /// /// A that is completed once the remote realm is fully synchronized or immediately if it's a local realm. /// A configuration object that describes the realm. - public static Task GetInstanceAsync(RealmConfigurationBase config = null) + /// An optional cancellation token that can be used to cancel the work. + public static Task GetInstanceAsync(RealmConfigurationBase config = null, CancellationToken cancellationToken = default) { if (config == null) { @@ -114,7 +115,7 @@ public static Task GetInstanceAsync(RealmConfigurationBase config = null) schema = RealmSchema.Default; } - return config.CreateRealmAsync(schema); + return config.CreateRealmAsync(schema, cancellationToken); } internal static Realm GetInstance(RealmConfigurationBase config, RealmSchema schema) @@ -187,7 +188,7 @@ private static bool IsRealmOpen(string path) state.GetLiveRealms().Any(); } - #endregion + #endregion static private State _state; @@ -943,7 +944,7 @@ public IList ResolveReference(ThreadSafeReference.List reference) return new RealmResults(this, reference.Metadata, resultsHandle); } - #endregion + #endregion Thread Handover /// /// Removes a persistent object from this Realm, effectively deleting it. @@ -1080,7 +1081,7 @@ internal void ExecuteOutsideTransaction(Action action) } } - #endregion + #endregion Transactions internal class State { diff --git a/Realm/Realm/RealmCollectionBase.cs b/Realm/Realm/RealmCollectionBase.cs index 28d9649f09..f48f9f862b 100644 --- a/Realm/Realm/RealmCollectionBase.cs +++ b/Realm/Realm/RealmCollectionBase.cs @@ -38,7 +38,7 @@ public abstract class RealmCollectionBase ISchemaSource, IThreadConfined { - protected static readonly PropertyType _argumentType = PropertyTypeEx.ToPropertyType(typeof(T), out _); + protected static readonly PropertyType _argumentType = typeof(T).ToPropertyType(out _); private readonly List> _callbacks = new List>(); @@ -143,12 +143,15 @@ internal RealmCollectionBase(Realm realm, RealmObject.Metadata metadata) } return Operator.Convert(Realm.MakeObject(Metadata, objectHandle)); + case PropertyType.String: case PropertyType.String | PropertyType.Nullable: return Operator.Convert(Handle.Value.GetStringAtIndex(index)); + case PropertyType.Data: case PropertyType.Data | PropertyType.Nullable: return Operator.Convert(Handle.Value.GetByteArrayAtIndex(index)); + default: return Handle.Value.GetPrimitiveAtIndex(index, _argumentType).Get(); } @@ -483,4 +486,4 @@ public void Dispose() } } } -} \ No newline at end of file +} diff --git a/Realm/Realm/Server/Notifier/NotifierRealmConfiguration.cs b/Realm/Realm/Server/Notifier/NotifierRealmConfiguration.cs index ede04ca182..9b426c73ce 100755 --- a/Realm/Realm/Server/Notifier/NotifierRealmConfiguration.cs +++ b/Realm/Realm/Server/Notifier/NotifierRealmConfiguration.cs @@ -17,6 +17,7 @@ //////////////////////////////////////////////////////////////////////////// using System; +using System.Threading; using System.Threading.Tasks; using Realms.Schema; @@ -41,7 +42,7 @@ internal override Realm CreateRealm(RealmSchema schema) return new Realm(handle, this, schema); } - internal override Task CreateRealmAsync(RealmSchema schema) + internal override Task CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken) { return Task.FromResult(CreateRealm(schema)); } diff --git a/Realm/Realm/Sync/ProgressNotifications/ProgressNotificationToken.cs b/Realm/Realm/Sync/ProgressNotifications/ProgressNotificationToken.cs new file mode 100644 index 0000000000..680bf42861 --- /dev/null +++ b/Realm/Realm/Sync/ProgressNotifications/ProgressNotificationToken.cs @@ -0,0 +1,70 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2019 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +using System; +using System.Runtime.InteropServices; +using System.Threading.Tasks; + +namespace Realms.Sync +{ + internal class ProgressNotificationToken : IDisposable + { + private readonly ulong _nativeToken; + private readonly GCHandle _gcHandle; + private readonly Action _observer; + private readonly Action _unregister; + + private bool isDisposed; + + public ProgressNotificationToken(Action observer, Func register, Action unregister) + { + _observer = observer; + _gcHandle = GCHandle.Alloc(this); + _unregister = unregister; + try + { + _nativeToken = register(_gcHandle); + } + catch + { + _gcHandle.Free(); + throw; + } + } + + public void Notify(ulong transferredBytes, ulong transferableBytes) + { + Task.Run(() => + { + _observer(new SyncProgress(transferredBytes, transferableBytes)); + }); + } + + public void Dispose() + { + if (!isDisposed) + { + GC.SuppressFinalize(this); + + isDisposed = true; + _unregister(_nativeToken); + _gcHandle.Free(); + } + } + } +} diff --git a/Realm/Realm/Sync/ProgressNotifications/SyncProgress.cs b/Realm/Realm/Sync/ProgressNotifications/SyncProgress.cs index e4e3084012..1d9ef85f7d 100644 --- a/Realm/Realm/Sync/ProgressNotifications/SyncProgress.cs +++ b/Realm/Realm/Sync/ProgressNotifications/SyncProgress.cs @@ -43,5 +43,7 @@ internal SyncProgress(ulong transferred, ulong transferable) TransferredBytes = transferred; TransferableBytes = transferable; } + + internal bool IsComplete => TransferableBytes == TransferredBytes; } } diff --git a/Realm/Realm/Sync/ProgressNotifications/SyncProgressObservable.cs b/Realm/Realm/Sync/ProgressNotifications/SyncProgressObservable.cs index 97c1a61757..3536f15260 100644 --- a/Realm/Realm/Sync/ProgressNotifications/SyncProgressObservable.cs +++ b/Realm/Realm/Sync/ProgressNotifications/SyncProgressObservable.cs @@ -17,8 +17,6 @@ //////////////////////////////////////////////////////////////////////////// using System; -using System.Runtime.InteropServices; -using System.Threading.Tasks; namespace Realms.Sync { @@ -37,67 +35,14 @@ public SyncProgressObservable(Session session, ProgressDirection direction, Prog public IDisposable Subscribe(IObserver observer) { - return new ProgressNotificationToken(_session, observer, _direction, _mode); - } - - public class ProgressNotificationToken : IDisposable - { - private readonly ulong _nativeToken; - private readonly ProgressMode _mode; - private readonly GCHandle _gcHandle; - - private bool isDisposed; - private Session _session; - private IObserver _observer; - - public ProgressNotificationToken(Session session, - IObserver observer, - ProgressDirection direction, - ProgressMode mode) + return new ProgressNotificationToken(progress => { - _session = session; - _observer = observer; - _mode = mode; - _gcHandle = GCHandle.Alloc(this); - try + observer.OnNext(progress); + if (_mode == ProgressMode.ForCurrentlyOutstandingWork && progress.IsComplete) { - _nativeToken = _session.Handle.RegisterProgressNotifier(GCHandle.ToIntPtr(_gcHandle), direction, mode); - } - catch - { - _gcHandle.Free(); - throw; - } - } - - public void Notify(ulong transferredBytes, ulong transferableBytes) - { - Task.Run(() => - { - _observer.OnNext(new SyncProgress(transferredBytes, transferableBytes)); - - if (_mode == ProgressMode.ForCurrentlyOutstandingWork && - transferredBytes >= transferableBytes) - { - _observer.OnCompleted(); - } - }); - } - - public void Dispose() - { - if (!isDisposed) - { - GC.SuppressFinalize(this); - - isDisposed = true; - - _session.Handle.UnregisterProgressNotifier(_nativeToken); - _gcHandle.Free(); - _session = null; - _observer = null; + observer.OnCompleted(); } - } + }, handle => _session.Handle.RegisterProgressNotifier(handle, _direction, _mode), _session.Handle.UnregisterProgressNotifier); } } -} \ No newline at end of file +} diff --git a/Realm/Realm/Sync/Session.cs b/Realm/Realm/Sync/Session.cs index dedd4a8b60..c6234ed0e4 100644 --- a/Realm/Realm/Sync/Session.cs +++ b/Realm/Realm/Sync/Session.cs @@ -112,11 +112,7 @@ public IObservable GetProgressObservable(ProgressDirection directi public Task WaitForUploadAsync() { var tcs = new TaskCompletionSource(); - if (!Handle.Wait(tcs, ProgressDirection.Upload)) - { - throw new InvalidOperationException("Cannot register a wait callback on a session in the Error state"); - } - + Handle.Wait(tcs, ProgressDirection.Upload); return tcs.Task; } @@ -128,11 +124,7 @@ public Task WaitForUploadAsync() public Task WaitForDownloadAsync() { var tcs = new TaskCompletionSource(); - if (!Handle.Wait(tcs, ProgressDirection.Download)) - { - throw new InvalidOperationException("Cannot register a wait callback on a session in the Error state"); - } - + Handle.Wait(tcs, ProgressDirection.Download); return tcs.Task; } diff --git a/Tests/Realm.Tests/Database/InstanceTests.cs b/Tests/Realm.Tests/Database/InstanceTests.cs index bd895150d4..549b758501 100644 --- a/Tests/Realm.Tests/Database/InstanceTests.cs +++ b/Tests/Realm.Tests/Database/InstanceTests.cs @@ -21,9 +21,7 @@ using System.Linq; using System.Reflection; using System.Threading.Tasks; -using Nito.AsyncEx; using NUnit.Framework; -using Realms; using Realms.Exceptions; using Realms.Schema; @@ -535,6 +533,7 @@ public void UsingDisposedRealm_ShouldThrowObjectDisposedException() #if WINDOWS_UWP [Ignore("Locks on .NET Native")] #endif + [Test] public void GetInstanceAsync_ExecutesMigrationsInBackground() { @@ -800,4 +799,4 @@ private static void AddDummyData(Realm realm) } } } -} \ No newline at end of file +} diff --git a/Tests/Realm.Tests/Realm.Tests.csproj b/Tests/Realm.Tests/Realm.Tests.csproj index b54c3e8067..a7ba0dd364 100644 --- a/Tests/Realm.Tests/Realm.Tests.csproj +++ b/Tests/Realm.Tests/Realm.Tests.csproj @@ -72,4 +72,10 @@ + + + + + + \ No newline at end of file diff --git a/Tests/Realm.Tests/Server/ShouldHandleTests.cs b/Tests/Realm.Tests/Server/ShouldHandleTests.cs index 4ab23972f4..619a1fab3d 100644 --- a/Tests/Realm.Tests/Server/ShouldHandleTests.cs +++ b/Tests/Realm.Tests/Server/ShouldHandleTests.cs @@ -18,7 +18,6 @@ using System.Collections.Generic; using System.Linq; -using System.Threading.Tasks; using NUnit.Framework; using Realms.Server; using Realms.Tests.Sync; @@ -68,7 +67,7 @@ public void ShouldHandle_WhenRealmExists_InvokedOnStart() var containsNewRealm = await TestHelpers.EnsureAsync(() => paths.Contains($"/{userId5}/newlyaddedrealm"), retryDelay: 100, - attempts: 10); // 1 second + attempts: 100); // 10 seconds Assert.True(containsNewRealm); } diff --git a/Tests/Realm.Tests/Sync/SessionTests.cs b/Tests/Realm.Tests/Sync/SessionTests.cs index aa81492147..3be3decd87 100644 --- a/Tests/Realm.Tests/Sync/SessionTests.cs +++ b/Tests/Realm.Tests/Sync/SessionTests.cs @@ -107,6 +107,7 @@ public void Session_Error_ShouldPassCorrectSession() } [Test] + [Ignore("This is no longer relevant with the automatic client reset recovery.")] public void Session_DivergingHistories_ShouldRaiseClientResetException() { TestHelpers.RunAsyncTest(async () => diff --git a/Tests/Realm.Tests/Sync/SyncTestBase.cs b/Tests/Realm.Tests/Sync/SyncTestBase.cs index 5ee5afb0ab..b01ac958a1 100644 --- a/Tests/Realm.Tests/Sync/SyncTestBase.cs +++ b/Tests/Realm.Tests/Sync/SyncTestBase.cs @@ -18,6 +18,7 @@ using System.Collections.Generic; using System.IO; +using System.Threading; using System.Threading.Tasks; using Realms.Sync; @@ -106,12 +107,12 @@ protected Realm GetRealm(RealmConfigurationBase config) return result; } - protected async Task GetRealmAsync(RealmConfigurationBase config, bool openAsync = true) + protected async Task GetRealmAsync(RealmConfigurationBase config, bool openAsync = true, CancellationToken cancellationToken = default(CancellationToken)) { Realm result; if (openAsync) { - result = await Realm.GetInstanceAsync(config); + result = await Realm.GetInstanceAsync(config, cancellationToken); } else { diff --git a/Tests/Realm.Tests/Sync/SynchronizedInstanceTests.cs b/Tests/Realm.Tests/Sync/SynchronizedInstanceTests.cs index 5012857d49..e575ea15cb 100644 --- a/Tests/Realm.Tests/Sync/SynchronizedInstanceTests.cs +++ b/Tests/Realm.Tests/Sync/SynchronizedInstanceTests.cs @@ -19,8 +19,8 @@ using System; using System.IO; using System.Linq; +using System.Threading; using System.Threading.Tasks; -using Nito.AsyncEx; using NUnit.Framework; using Realms.Exceptions; using Realms.Schema; @@ -30,11 +30,12 @@ namespace Realms.Tests.Sync { - using ExplicitAttribute = NUnit.Framework.ExplicitAttribute; - [TestFixture, Preserve(AllMembers = true)] public class SynchronizedInstanceTests : SyncTestBase { + private const int OneMegabyte = 1024 * 1024; + private const int NumberOfObjects = 20; + [TestCase(true, true)] [TestCase(true, false)] [TestCase(false, true)] @@ -178,45 +179,62 @@ public void GetInstanceAsync_ReportsProgress() { SyncTestHelpers.RunRosTestAsync(async () => { - var realmPath = Guid.NewGuid().ToString(); - var user = await SyncTestHelpers.GetUserAsync(); - var config = new FullSyncConfiguration(new Uri($"/~/{realmPath}", UriKind.Relative), user, Guid.NewGuid().ToString()); - const int ObjectSize = 1000000; - const int ObjectsToRecord = 20; - using (var realm = GetRealm(config)) - { - for (var i = 0; i < ObjectsToRecord; i++) - { - realm.Write(() => - { - realm.Add(new HugeSyncObject(ObjectSize)); - }); - } - - await SyncTestHelpers.WaitForSyncAsync(realm); - } + var config = await SyncTestHelpers.GetIntegrationConfigAsync("foo"); + await PopulateData(config); var callbacksInvoked = 0; var lastProgress = default(SyncProgress); - config = new FullSyncConfiguration(new Uri($"/~/{realmPath}", UriKind.Relative), user, Guid.NewGuid().ToString()) + config = new FullSyncConfiguration(config.ServerUri, config.User, config.DatabasePath + "1") { OnProgress = (progress) => { callbacksInvoked++; - lastProgress = progress; + lastProgress = progress; } }; using (var realm = await GetRealmAsync(config)) { - Assert.That(realm.All().Count(), Is.EqualTo(ObjectsToRecord)); + Assert.That(realm.All().Count(), Is.EqualTo(NumberOfObjects)); Assert.That(callbacksInvoked, Is.GreaterThan(0)); Assert.That(lastProgress.TransferableBytes, Is.EqualTo(lastProgress.TransferredBytes)); } }); } + [Test] + public void GetInstanceAsync_Cancel_ShouldCancelWait() + { + SyncTestHelpers.RunRosTestAsync(async () => + { + var config = await SyncTestHelpers.GetIntegrationConfigAsync("foo"); + await PopulateData(config); + // Update config to make sure we're not opening the same Realm file. + config = new FullSyncConfiguration(config.ServerUri, config.User, config.DatabasePath + "1"); + + using (var cts = new CancellationTokenSource()) + { + var _ = Task.Run(async () => + { + await Task.Delay(1); + cts.Cancel(); + }); + + try + { + var realm = await Realm.GetInstanceAsync(config, cts.Token); + CleanupOnTearDown(realm); + Assert.Fail("Expected task to be cancelled."); + } + catch (Exception ex) + { + Assert.That(ex, Is.InstanceOf()); + } + } + }); + } + [Test] public void GetInstance_WhenDynamic_ReadsSchemaFromDisk() { @@ -317,6 +335,22 @@ private static void AddDummyData(Realm realm, bool singleTransaction) } } + private async Task PopulateData(FullSyncConfiguration config) + { + using (var realm = GetRealm(config)) + { + realm.Write(() => + { + for (var i = 0; i < NumberOfObjects; i++) + { + realm.Add(new HugeSyncObject(OneMegabyte)); + } + }); + + await GetSession(realm).WaitForUploadAsync(); + } + } + /* Code to generate the legacy Realm private static async Task GenerateLegacyRealm(bool encrypt) { @@ -342,4 +376,4 @@ private static async Task GenerateLegacyRealm(bool encrypt) return config.DatabasePath; }*/ } -} \ No newline at end of file +} diff --git a/wrappers/src/CMakeLists.txt b/wrappers/src/CMakeLists.txt index c3963c539c..e177150700 100644 --- a/wrappers/src/CMakeLists.txt +++ b/wrappers/src/CMakeLists.txt @@ -27,6 +27,7 @@ set(HEADERS if(REALM_ENABLE_SYNC) list(APPEND SOURCES + async_open_task_cs.cpp sync_manager_cs.cpp sync_session_cs.cpp sync_user_cs.cpp diff --git a/wrappers/src/async_open_task_cs.cpp b/wrappers/src/async_open_task_cs.cpp new file mode 100644 index 0000000000..fb581a588e --- /dev/null +++ b/wrappers/src/async_open_task_cs.cpp @@ -0,0 +1,59 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2019 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#include +#include "error_handling.hpp" +#include "marshalling.hpp" +#include "realm_export_decls.hpp" +#include "sync/async_open_task.hpp" +#include "sync_session_cs.hpp" + +using namespace realm; +using namespace realm::binding; +using SharedAsyncOpenTask = std::shared_ptr; + +extern "C" { +REALM_EXPORT void realm_asyncopentask_destroy(SharedAsyncOpenTask* task) +{ + delete task; +} + +REALM_EXPORT void realm_asyncopentask_cancel(SharedAsyncOpenTask& task, NativeException::Marshallable& ex) +{ + handle_errors(ex, [&] { + task->cancel(); + }); +} + +REALM_EXPORT uint64_t realm_asyncopentask_register_progress_notifier(const SharedAsyncOpenTask& task, void* managed_state, NativeException::Marshallable& ex) +{ + return handle_errors(ex, [&] { + return task->register_download_progress_notifier([managed_state](uint64_t transferred, uint64_t transferable) { + s_progress_callback(managed_state, transferred, transferable); + }); + }); +} + +REALM_EXPORT void realm_asyncopentask_unregister_progress_notifier(const SharedAsyncOpenTask& task, uint64_t token, NativeException::Marshallable& ex) +{ + return handle_errors(ex, [&] { + task->unregister_download_progress_notifier(token); + }); +} + +} diff --git a/wrappers/src/results_cs.cpp b/wrappers/src/results_cs.cpp index 26418642bc..6029277829 100644 --- a/wrappers/src/results_cs.cpp +++ b/wrappers/src/results_cs.cpp @@ -29,6 +29,7 @@ #include "schema_cs.hpp" #include #include +#include "keypath_helpers.hpp" using namespace realm; using namespace realm::binding; @@ -138,7 +139,7 @@ REALM_EXPORT Results* results_get_filtered_results(const Results& results, uint1 parser::ParserResult result = parser::parse(query_string.to_string()); parser::KeyPathMapping mapping; - alias_backlinks(mapping, realm); + realm::alias_backlinks(mapping, *realm); query_builder::NoArguments no_args; query_builder::apply_predicate(query, result.predicate, no_args, mapping); @@ -172,12 +173,12 @@ REALM_EXPORT Results* results_snapshot(const Results& results, NativeException:: REALM_EXPORT size_t results_find_object(Results& results, const Object& object_ptr, NativeException::Marshallable& ex) { - return handle_errors(ex, [&]() { - if (results.get_realm() != object_ptr.realm()) { - throw ObjectManagedByAnotherRealmException("Can't look up index of an object that belongs to a different Realm."); - } - return results.index_of(object_ptr.row()); - }); + return handle_errors(ex, [&]() { + if (results.get_realm() != object_ptr.realm()) { + throw ObjectManagedByAnotherRealmException("Can't look up index of an object that belongs to a different Realm."); + } + return results.index_of(object_ptr.row()); + }); } } // extern "C" diff --git a/wrappers/src/schema_cs.hpp b/wrappers/src/schema_cs.hpp index c6727a51b2..e25592e0a9 100644 --- a/wrappers/src/schema_cs.hpp +++ b/wrappers/src/schema_cs.hpp @@ -90,19 +90,4 @@ REALM_FORCEINLINE SchemaObject SchemaObject::for_marshalling(const ObjectSchema& util::Optional create_schema(SchemaObject* objects, int objects_length, SchemaProperty* properties); -REALM_FORCEINLINE void alias_backlinks(parser::KeyPathMapping &mapping, const SharedRealm &realm) -{ - const Schema &schema = realm->schema(); - for (auto it = schema.begin(); it != schema.end(); ++it) { - for (const Property &property : it->computed_properties) { - if (property.type == PropertyType::LinkingObjects) { - auto target_object_schema = schema.find(property.object_type); - const TableRef table = ObjectStore::table_for_object_type(realm->read_group(), it->name); - const TableRef target_table = ObjectStore::table_for_object_type(realm->read_group(), target_object_schema->name); - std::string native_name = "@links." + std::string(target_table->get_name()) + "." + property.link_origin_property_name; - mapping.add_mapping(table, property.name, native_name); - } - } - } -} #endif /* defined(SCHEMA_CS_HPP) */ diff --git a/wrappers/src/shared_realm_cs.cpp b/wrappers/src/shared_realm_cs.cpp index b63cc89f68..95a00ae9bf 100644 --- a/wrappers/src/shared_realm_cs.cpp +++ b/wrappers/src/shared_realm_cs.cpp @@ -255,10 +255,22 @@ REALM_EXPORT Results* shared_realm_resolve_query_reference(SharedRealm* realm, T }); } +REALM_EXPORT SharedRealm* shared_realm_resolve_realm_reference(ThreadSafeReference& reference, NativeException::Marshallable& ex) +{ + return handle_errors(ex, [&]() { + return new SharedRealm(Realm::get_shared_realm(std::move(reference))); + }); +} + REALM_EXPORT void thread_safe_reference_destroy(ThreadSafeReferenceBase* reference) { delete reference; } + +REALM_EXPORT void realm_thread_safe_reference_destroy(ThreadSafeReference* reference) +{ + delete reference; +} REALM_EXPORT void shared_realm_write_copy(SharedRealm* realm, uint16_t* path, size_t path_len, char* encryption_key, NativeException::Marshallable& ex) { diff --git a/wrappers/src/subscription_cs.cpp b/wrappers/src/subscription_cs.cpp index 51e1866692..94ee0509af 100644 --- a/wrappers/src/subscription_cs.cpp +++ b/wrappers/src/subscription_cs.cpp @@ -1,10 +1,20 @@ +//////////////////////////////////////////////////////////////////////////// // -// subscription_cs.cpp -// wrappers-sync +// Copyright 2019 Realm Inc. // -// Created by Nikola Irinchev on 3/23/18. -// Copyright © 2018 Realm. All rights reserved. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// #include #include @@ -16,6 +26,7 @@ #include "schema_cs.hpp" #include #include +#include "keypath_helpers.hpp" using namespace realm; using namespace realm::binding; @@ -36,26 +47,17 @@ REALM_EXPORT Subscription* realm_subscription_create(Results& results, uint16_t* return handle_errors(ex, [&]() { auto name = name_len >= 0 ? util::Optional(Utf16StringAccessor(name_buf, name_len).to_string()) : none; auto optional_ttl = time_to_live >= 0 ? util::Optional(time_to_live) : none; - - IncludeDescriptor inclusion_paths; - if (inclusions_len >= 0) { - DescriptorOrdering combined_orderings; - parser::KeyPathMapping mapping; - alias_backlinks(mapping, results.get_realm()); - - for (auto i = 0; i < inclusions_len; i++) { - auto inclusion_path = inclusions[i].value; - DescriptorOrdering ordering; - parser::DescriptorOrderingState ordering_state = parser::parse_include_path(inclusion_path); - query_builder::apply_ordering(ordering, results.get_query().get_table(), ordering_state, mapping); - combined_orderings.append_include(ordering.compile_included_backlinks()); - } - - if (combined_orderings.will_apply_include()) { - inclusion_paths = combined_orderings.compile_included_backlinks(); - } + + std::vector paths; + for (auto i = 0; i < inclusions_len; i++) { + paths.emplace_back(inclusions[i].value); } + parser::KeyPathMapping mapping; + realm::alias_backlinks(mapping, *results.get_realm()); + + auto inclusion_paths = realm::generate_include_from_keypaths(paths, *results.get_realm(), results.get_object_schema(), mapping); + realm::partial_sync::SubscriptionOptions options; options.user_provided_name = name; options.time_to_live_ms = optional_ttl; diff --git a/wrappers/src/sync_manager_cs.cpp b/wrappers/src/sync_manager_cs.cpp index d3b2cd9d29..61b8004fab 100644 --- a/wrappers/src/sync_manager_cs.cpp +++ b/wrappers/src/sync_manager_cs.cpp @@ -32,6 +32,8 @@ #include "sync_session_cs.hpp" #include "sync/impl/sync_metadata.hpp" #include "sync/partial_sync.hpp" +#include "sync/async_open_task.hpp" +#include "thread_safe_reference.hpp" #if REALM_WINDOWS #include @@ -41,9 +43,12 @@ using namespace realm; using namespace realm::binding; using LogMessageDelegate = void(const char* message, size_t message_len, util::Logger::Level level); +using SharedAsyncOpenTask = std::shared_ptr; namespace realm { namespace binding { + void (*s_open_realm_callback)(void* task_completion_source, ThreadSafeReference* ref, int32_t error_code, const char* message, size_t message_len); + class SyncLogger : public util::RootLogger { public: SyncLogger(LogMessageDelegate* delegate) @@ -76,12 +81,61 @@ namespace binding { }; } +Realm::Config get_shared_realm_config(Configuration configuration, SyncConfiguration sync_configuration, SchemaObject* objects, int objects_length, SchemaProperty* properties, uint8_t* encryption_key) +{ + Realm::Config config; + config.schema_mode = SchemaMode::Additive; + + if (objects_length > 0) { + config.schema = create_schema(objects, objects_length, properties); + } + + config.schema_version = configuration.schema_version; + + std::string realm_url(Utf16StringAccessor(sync_configuration.url, sync_configuration.url_len)); + + config.sync_config = std::make_shared(*sync_configuration.user, realm_url); + config.sync_config->bind_session_handler = bind_session; + config.sync_config->error_handler = handle_session_error; + config.path = Utf16StringAccessor(configuration.path, configuration.path_len); + + // by definition the key is only allowed to be 64 bytes long, enforced by C# code + if (encryption_key) { + auto& key = *reinterpret_cast*>(encryption_key); + + config.encryption_key = std::vector(key.begin(), key.end()); + config.sync_config->realm_encryption_key = key; + } + +#if !REALM_PLATFORM_APPLE + if (sync_configuration.trusted_ca_path) { + Utf16StringAccessor trusted_ca_path(sync_configuration.trusted_ca_path, sync_configuration.trusted_ca_path_len); + config.sync_config->ssl_trust_certificate_path = trusted_ca_path.to_string(); + } +#endif + + config.sync_config->client_validate_ssl = sync_configuration.client_validate_ssl; + config.sync_config->is_partial = sync_configuration.is_partial; + + if (sync_configuration.partial_sync_identifier) { + Utf16StringAccessor partial_sync_identifier(sync_configuration.partial_sync_identifier, sync_configuration.partial_sync_identifier_len); + config.sync_config->custom_partial_sync_identifier = partial_sync_identifier.to_string(); + } + + return config; +} + } using SharedSyncUser = std::shared_ptr; extern "C" { +REALM_EXPORT void realm_install_syncmanager_callbacks(decltype(s_open_realm_callback) open_callback) +{ + s_open_realm_callback = open_callback; +} + REALM_EXPORT void realm_syncmanager_configure(const uint16_t* base_path_buf, size_t base_path_len, const uint16_t* user_agent_buf, size_t user_agent_len, const SyncManager::MetadataMode* mode, const char* encryption_key_buf, bool reset_on_error, @@ -136,47 +190,33 @@ REALM_EXPORT util::Logger::Level realm_syncmanager_get_log_level() return SyncManager::shared().log_level(); } -REALM_EXPORT SharedRealm* shared_realm_open_with_sync(Configuration configuration, SyncConfiguration sync_configuration, SchemaObject* objects, int objects_length, SchemaProperty* properties, uint8_t* encryption_key, NativeException::Marshallable& ex) +REALM_EXPORT SharedAsyncOpenTask* shared_realm_open_with_sync_async(Configuration configuration, SyncConfiguration sync_configuration, SchemaObject* objects, int objects_length, SchemaProperty* properties, uint8_t* encryption_key, void* task_completion_source, NativeException::Marshallable& ex) { - return handle_errors(ex, [&]() { - Realm::Config config; - config.schema_mode = SchemaMode::Additive; - - if (objects_length > 0) { - config.schema = create_schema(objects, objects_length, properties); - } - - config.schema_version = configuration.schema_version; - - std::string realm_url(Utf16StringAccessor(sync_configuration.url, sync_configuration.url_len)); + return handle_errors(ex, [&]() { + auto config = get_shared_realm_config(configuration, sync_configuration, objects, objects_length, properties, encryption_key); - config.sync_config = std::make_shared(*sync_configuration.user, realm_url); - config.sync_config->bind_session_handler = bind_session; - config.sync_config->error_handler = handle_session_error; - config.path = Utf16StringAccessor(configuration.path, configuration.path_len); + auto task = Realm::get_synchronized_realm(config); + task->start([task_completion_source](ThreadSafeReference ref, std::exception_ptr error) { + if (error) { + try { + std::rethrow_exception(error); + } catch (const std::system_error& system_error) { + const std::error_code& ec = system_error.code(); + s_open_realm_callback(task_completion_source, nullptr, ec.value(), ec.message().c_str(), ec.message().length()); + } + } else { + s_open_realm_callback(task_completion_source, new ThreadSafeReference(std::move(ref)), 0, nullptr, 0); + } + }); - // by definition the key is only allowed to be 64 bytes long, enforced by C# code - if (encryption_key) { - auto& key = *reinterpret_cast*>(encryption_key); - - config.encryption_key = std::vector(key.begin(), key.end()); - config.sync_config->realm_encryption_key = key; - } + return new SharedAsyncOpenTask(task); + }); +} -#if !REALM_PLATFORM_APPLE - if (sync_configuration.trusted_ca_path) { - Utf16StringAccessor trusted_ca_path(sync_configuration.trusted_ca_path, sync_configuration.trusted_ca_path_len); - config.sync_config->ssl_trust_certificate_path = trusted_ca_path.to_string(); - } -#endif - - config.sync_config->client_validate_ssl = sync_configuration.client_validate_ssl; - config.sync_config->is_partial = sync_configuration.is_partial; - - if (sync_configuration.partial_sync_identifier) { - Utf16StringAccessor partial_sync_identifier(sync_configuration.partial_sync_identifier, sync_configuration.partial_sync_identifier_len); - config.sync_config->custom_partial_sync_identifier = partial_sync_identifier.to_string(); - } +REALM_EXPORT SharedRealm* shared_realm_open_with_sync(Configuration configuration, SyncConfiguration sync_configuration, SchemaObject* objects, int objects_length, SchemaProperty* properties, uint8_t* encryption_key, NativeException::Marshallable& ex) +{ + return handle_errors(ex, [&]() { + auto config = get_shared_realm_config(configuration, sync_configuration, objects, objects_length, properties, encryption_key); auto realm = Realm::get_shared_realm(config); if (!configuration.read_only) @@ -264,4 +304,3 @@ REALM_EXPORT uint8_t realm_syncmanager_get_object_privileges(SharedRealm& shared } } - diff --git a/wrappers/src/sync_session_cs.cpp b/wrappers/src/sync_session_cs.cpp index fc9e7f424d..e0637dfb40 100644 --- a/wrappers/src/sync_session_cs.cpp +++ b/wrappers/src/sync_session_cs.cpp @@ -34,7 +34,7 @@ namespace realm { namespace binding { void (*s_refresh_access_token_callback)(std::shared_ptr*); void (*s_session_error_callback)(std::shared_ptr*, int32_t error_code, const char* message, size_t message_len, std::pair* user_info_pairs, int user_info_pairs_len); - void (*s_progress_callback)(size_t, uint64_t transferred_bytes, uint64_t transferrable_bytes); + void (*s_progress_callback)(void*, uint64_t transferred_bytes, uint64_t transferrable_bytes); void (*s_wait_callback)(void* task_completion_source, int32_t error_code, const char* message, size_t message_len); void bind_session(const std::string&, const realm::SyncConfig& config, std::shared_ptr session) @@ -140,7 +140,7 @@ enum class CSharpNotifierType : uint8_t { Download = 1 }; -REALM_EXPORT uint64_t realm_syncsession_register_progress_notifier(const SharedSyncSession& session, size_t managed_state, CSharpNotifierType direction, bool is_streaming, NativeException::Marshallable& ex) +REALM_EXPORT uint64_t realm_syncsession_register_progress_notifier(const SharedSyncSession& session, void* managed_state, CSharpNotifierType direction, bool is_streaming, NativeException::Marshallable& ex) { return handle_errors(ex, [&] { auto notifier_direction = direction == CSharpNotifierType::Upload @@ -160,17 +160,17 @@ REALM_EXPORT void realm_syncsession_unregister_progress_notifier(const SharedSyn }); } -REALM_EXPORT bool realm_syncsession_wait(const SharedSyncSession& session, void* task_completion_source, CSharpNotifierType direction, NativeException::Marshallable& ex) +REALM_EXPORT void realm_syncsession_wait(const SharedSyncSession& session, void* task_completion_source, CSharpNotifierType direction, NativeException::Marshallable& ex) { - return handle_errors(ex, [&] { + handle_errors(ex, [&] { auto waiter = [task_completion_source](std::error_code error) { s_wait_callback(task_completion_source, error.value(), error.message().c_str(), error.message().length()); }; if (direction == CSharpNotifierType::Upload) { - return session->wait_for_upload_completion(waiter); + session->wait_for_upload_completion(waiter); } else { - return session->wait_for_download_completion(waiter); + session->wait_for_download_completion(waiter); } }); } diff --git a/wrappers/src/sync_session_cs.hpp b/wrappers/src/sync_session_cs.hpp index 8a5b69b79e..218554c8f5 100644 --- a/wrappers/src/sync_session_cs.hpp +++ b/wrappers/src/sync_session_cs.hpp @@ -24,8 +24,10 @@ namespace realm { namespace binding { - REALM_EXPORT void bind_session(const std::string&, const realm::SyncConfig& config, std::shared_ptr session); - REALM_EXPORT void handle_session_error(std::shared_ptr session, SyncError error); + extern void (*s_progress_callback)(void*, uint64_t transferred_bytes, uint64_t transferrable_bytes); + + void bind_session(const std::string&, const realm::SyncConfig& config, std::shared_ptr session); + void handle_session_error(std::shared_ptr session, SyncError error); } }