Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use async open from OS #1859

Merged
merged 15 commits into from
Aug 26, 2019
3 changes: 2 additions & 1 deletion Realm/Realm/Configurations/InMemoryConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
////////////////////////////////////////////////////////////////////////////

using System;
using System.Threading;
using System.Threading.Tasks;
using Realms.Native;
using Realms.Schema;
Expand Down Expand Up @@ -63,7 +64,7 @@ internal override Realm CreateRealm(RealmSchema schema)
return new Realm(new SharedRealmHandle(srPtr), this, schema);
}

internal override Task<Realm> CreateRealmAsync(RealmSchema schema)
internal override Task<Realm> CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken)
{
return Task.FromResult(CreateRealm(schema));
}
Expand Down
8 changes: 8 additions & 0 deletions Realm/Realm/Configurations/QueryBasedSyncConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
////////////////////////////////////////////////////////////////////////////

using System;
using System.Threading;
using System.Threading.Tasks;
using Realms.Schema;

namespace Realms.Sync
Expand Down Expand Up @@ -65,6 +67,12 @@ public QueryBasedSyncConfiguration(Uri serverUri = null, User user = null, strin
{
}

internal override Task<Realm> CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken)
{
schema = RealmSchema.CreateSchemaForClasses(_queryBasedPermissionTypes, schema);
return base.CreateRealmAsync(schema, cancellationToken);
}

internal override Realm CreateRealm(RealmSchema schema)
{
schema = RealmSchema.CreateSchemaForClasses(_queryBasedPermissionTypes, schema);
Expand Down
7 changes: 4 additions & 3 deletions Realm/Realm/Configurations/RealmConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Realms.Exceptions;
using Realms.Helpers;
Expand Down Expand Up @@ -166,7 +167,7 @@ internal override Realm CreateRealm(RealmSchema schema)
return new Realm(srHandle, this, schema);
}

internal override Task<Realm> CreateRealmAsync(RealmSchema schema)
internal override Task<Realm> CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken)
{
// Can't use async/await due to mono inliner bugs
// If we are on UI thread will be set but often also set on long-lived workers to use Post back to UI thread.
Expand All @@ -177,7 +178,7 @@ internal override Task<Realm> CreateRealmAsync(RealmSchema schema)
using (CreateRealm(schema))
{
}
}).ContinueWith(_ => CreateRealm(schema), scheduler);
}, cancellationToken).ContinueWith(_ => CreateRealm(schema), scheduler);
}

return Task.FromResult(CreateRealm(schema));
Expand All @@ -198,4 +199,4 @@ private static bool ShouldCompactOnLaunchCallback(IntPtr delegatePtr, ulong tota
}
}
}
}
}
5 changes: 3 additions & 2 deletions Realm/Realm/Configurations/RealmConfigurationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Realms.Schema;

Expand Down Expand Up @@ -141,6 +142,6 @@ internal RealmConfigurationBase Clone()

internal abstract Realm CreateRealm(RealmSchema schema);

internal abstract Task<Realm> CreateRealmAsync(RealmSchema schema);
internal abstract Task<Realm> CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken);
}
}
}
65 changes: 53 additions & 12 deletions Realm/Realm/Configurations/SyncConfigurationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using System.Linq;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Realms.Helpers;
using Realms.Schema;
Expand Down Expand Up @@ -207,26 +208,66 @@ internal override Realm CreateRealm(RealmSchema schema)
return new Realm(srHandle, this, schema);
}

internal override async Task<Realm> CreateRealmAsync(RealmSchema schema)
internal override async Task<Realm> CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken)
{
var session = new Session(SharedRealmHandleExtensions.GetSession(DatabasePath, ToNative(), EncryptionKey));
IDisposable subscription = null;
var configuration = new Realms.Native.Configuration
{
Path = DatabasePath,
schema_version = SchemaVersion,
enable_cache = EnableCache
};

var tcs = new TaskCompletionSource<ThreadSafeReferenceHandle>();
var tcsHandle = GCHandle.Alloc(tcs);
ProgressNotificationToken progressToken = null;
try
{
if (OnProgress != null)
using (var handle = SharedRealmHandleExtensions.OpenWithSyncAsync(configuration, ToNative(), schema, EncryptionKey, tcsHandle))
{
var observer = new Observer<SyncProgress>(OnProgress);
subscription = session.GetProgressObservable(ProgressDirection.Download, ProgressMode.ForCurrentlyOutstandingWork).Subscribe(observer);
cancellationToken.Register(() =>
{
if (!handle.IsClosed)
{
handle.Cancel();
tcs.TrySetCanceled();
}
});

if (OnProgress != null)
{
progressToken = new ProgressNotificationToken(
observer: (progress) =>
{
OnProgress(progress);
},
register: handle.RegisterProgressNotifier,
unregister: (token) =>
{
if (!handle.IsClosed)
{
handle.UnregisterProgressNotifier(token);
}
});
}

using (var realmReference = await tcs.Task)
{
var realmPtr = SharedRealmHandle.ResolveFromReference(realmReference);
var sharedRealmHandle = new SharedRealmHandle(realmPtr);
if (IsDynamic && !schema.Any())
{
sharedRealmHandle.GetSchema(nativeSchema => schema = RealmSchema.CreateFromObjectStoreSchema(nativeSchema));
}

return new Realm(sharedRealmHandle, this, schema);
}
}
await session.WaitForDownloadAsync();
}
finally
{
subscription?.Dispose();
session.CloseHandle();
tcsHandle.Free();
progressToken?.Dispose();
}

return CreateRealm(schema);
}

internal Native.SyncConfiguration ToNative()
Expand Down Expand Up @@ -254,4 +295,4 @@ internal static string GetSDKUserAgent()
return $"RealmDotNet/{version} ({RuntimeInformation.FrameworkDescription})";
}
}
}
}
69 changes: 69 additions & 0 deletions Realm/Realm/Handles/AsyncOpenTaskHandle.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2019 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////

using System;
using System.Runtime.InteropServices;

namespace Realms
{
internal class AsyncOpenTaskHandle : RealmHandle
{
private static class NativeMethods
{
[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_asyncopentask_destroy", CallingConvention = CallingConvention.Cdecl)]
public static extern void destroy(IntPtr asyncTaskHandle);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_asyncopentask_cancel", CallingConvention = CallingConvention.Cdecl)]
public static extern void cancel(AsyncOpenTaskHandle handle, out NativeException ex);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_asyncopentask_register_progress_notifier", CallingConvention = CallingConvention.Cdecl)]
public static extern ulong register_progress_notifier(AsyncOpenTaskHandle handle, IntPtr token_ptr, out NativeException ex);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_asyncopentask_unregister_progress_notifier", CallingConvention = CallingConvention.Cdecl)]
public static extern void unregister_progress_notifier(AsyncOpenTaskHandle handle, ulong token, out NativeException ex);
}

public AsyncOpenTaskHandle(IntPtr handle) : base(null, handle)
{
}

public void Cancel()
{
NativeMethods.cancel(this, out var ex);
ex.ThrowIfNecessary();
}

protected override void Unbind()
{
NativeMethods.destroy(handle);
}

public ulong RegisterProgressNotifier(GCHandle managedHandle)
{
var token = NativeMethods.register_progress_notifier(this, GCHandle.ToIntPtr(managedHandle), out var ex);
ex.ThrowIfNecessary();
return token;
}

public void UnregisterProgressNotifier(ulong token)
{
NativeMethods.unregister_progress_notifier(this, token, out var ex);
ex.ThrowIfNecessary();
}
}
}
2 changes: 1 addition & 1 deletion Realm/Realm/Handles/NotifiableObjectHandleBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected NotifiableObjectHandleBase(RealmHandle root, IntPtr handle) : base(roo

public IntPtr DestroyNotificationToken(IntPtr token)
{
var result = NativeMethods.destroy_notificationtoken(token, out NativeException nativeException);
var result = NativeMethods.destroy_notificationtoken(token, out var nativeException);
nativeException.ThrowIfNecessary();
return result;
}
Expand Down
11 changes: 5 additions & 6 deletions Realm/Realm/Handles/SessionHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private static class NativeMethods

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncsession_wait", CallingConvention = CallingConvention.Cdecl)]
[return: MarshalAs(UnmanagedType.I1)]
public static extern bool wait(SessionHandle session, IntPtr task_completion_source, ProgressDirection direction, out NativeException ex);
public static extern void wait(SessionHandle session, IntPtr task_completion_source, ProgressDirection direction, out NativeException ex);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncsession_report_error_for_testing", CallingConvention = CallingConvention.Cdecl)]
public static extern void report_error_for_testing(SessionHandle session, int error_code, [MarshalAs(UnmanagedType.LPWStr)] string message, IntPtr message_len, [MarshalAs(UnmanagedType.I1)] bool is_fatal);
Expand Down Expand Up @@ -131,10 +131,10 @@ public void RefreshAccessToken(string accessToken, string serverPath)
ex.ThrowIfNecessary();
}

public ulong RegisterProgressNotifier(IntPtr tokenPtr, ProgressDirection direction, ProgressMode mode)
public ulong RegisterProgressNotifier(GCHandle managedHandle, ProgressDirection direction, ProgressMode mode)
{
var isStreaming = mode == ProgressMode.ReportIndefinitely;
var token = NativeMethods.register_progress_notifier(this, tokenPtr, direction, isStreaming, out var ex);
var token = NativeMethods.register_progress_notifier(this, GCHandle.ToIntPtr(managedHandle), direction, isStreaming, out var ex);
ex.ThrowIfNecessary();
return token;
}
Expand All @@ -145,12 +145,11 @@ public void UnregisterProgressNotifier(ulong token)
ex.ThrowIfNecessary();
}

public bool Wait(TaskCompletionSource<object> tcs, ProgressDirection direction)
public void Wait(TaskCompletionSource<object> tcs, ProgressDirection direction)
{
var tcsHandle = GCHandle.Alloc(tcs);
var result = NativeMethods.wait(this, GCHandle.ToIntPtr(tcsHandle), direction, out var ex);
NativeMethods.wait(this, GCHandle.ToIntPtr(tcsHandle), direction, out var ex);
ex.ThrowIfNecessary();
return result;
}

public IntPtr GetRawPointer()
Expand Down
Loading