Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use async open from OS #1859

Merged
merged 15 commits into from
Aug 26, 2019
5 changes: 3 additions & 2 deletions Realm/Realm/Configurations/InMemoryConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
////////////////////////////////////////////////////////////////////////////

using System;
using System.Threading;
using System.Threading.Tasks;
using Realms.Native;
using Realms.Schema;
Expand Down Expand Up @@ -63,9 +64,9 @@ internal override Realm CreateRealm(RealmSchema schema)
return new Realm(new SharedRealmHandle(srPtr), this, schema);
}

internal override Task<Realm> CreateRealmAsync(RealmSchema schema)
internal override Task<Realm> CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken)
{
return Task.FromResult(CreateRealm(schema));
}
}
}
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
}
10 changes: 9 additions & 1 deletion Realm/Realm/Configurations/QueryBasedSyncConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
////////////////////////////////////////////////////////////////////////////

using System;
using System.Threading;
using System.Threading.Tasks;
using Realms.Schema;

namespace Realms.Sync
Expand Down Expand Up @@ -65,10 +67,16 @@ public QueryBasedSyncConfiguration(Uri serverUri = null, User user = null, strin
{
}

internal override Task<Realm> CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken)
{
schema = RealmSchema.CreateSchemaForClasses(_queryBasedPermissionTypes, schema);
return base.CreateRealmAsync(schema, cancellationToken);
}

internal override Realm CreateRealm(RealmSchema schema)
{
schema = RealmSchema.CreateSchemaForClasses(_queryBasedPermissionTypes, schema);
return base.CreateRealm(schema);
}
}
}
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
}
5 changes: 3 additions & 2 deletions Realm/Realm/Configurations/RealmConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Realms.Exceptions;
using Realms.Helpers;
Expand Down Expand Up @@ -166,7 +167,7 @@ internal override Realm CreateRealm(RealmSchema schema)
return new Realm(srHandle, this, schema);
}

internal override Task<Realm> CreateRealmAsync(RealmSchema schema)
internal override Task<Realm> CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken)
{
// Can't use async/await due to mono inliner bugs
// If we are on UI thread will be set but often also set on long-lived workers to use Post back to UI thread.
Expand All @@ -177,7 +178,7 @@ internal override Task<Realm> CreateRealmAsync(RealmSchema schema)
using (CreateRealm(schema))
{
}
}).ContinueWith(_ => CreateRealm(schema), scheduler);
}, cancellationToken).ContinueWith(_ => CreateRealm(schema), scheduler);
}

return Task.FromResult(CreateRealm(schema));
Expand Down
3 changes: 2 additions & 1 deletion Realm/Realm/Configurations/RealmConfigurationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Realms.Schema;

Expand Down Expand Up @@ -141,6 +142,6 @@ internal RealmConfigurationBase Clone()

internal abstract Realm CreateRealm(RealmSchema schema);

internal abstract Task<Realm> CreateRealmAsync(RealmSchema schema);
internal abstract Task<Realm> CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken);
}
}
63 changes: 52 additions & 11 deletions Realm/Realm/Configurations/SyncConfigurationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using System.Linq;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Realms.Helpers;
using Realms.Schema;
Expand Down Expand Up @@ -207,26 +208,66 @@ internal override Realm CreateRealm(RealmSchema schema)
return new Realm(srHandle, this, schema);
}

internal override async Task<Realm> CreateRealmAsync(RealmSchema schema)
internal override async Task<Realm> CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken)
{
var session = new Session(SharedRealmHandleExtensions.GetSession(DatabasePath, ToNative(), EncryptionKey));
IDisposable subscription = null;
var configuration = new Realms.Native.Configuration
{
Path = DatabasePath,
schema_version = SchemaVersion,
enable_cache = EnableCache
};

var tcs = new TaskCompletionSource<ThreadSafeReferenceHandle>();
var tcsHandle = GCHandle.Alloc(tcs);
ProgressNotificationToken progressToken = null;
try
{
if (OnProgress != null)
using (var handle = SharedRealmHandleExtensions.OpenWithSyncAsync(configuration, ToNative(), schema, EncryptionKey, tcsHandle))
{
var observer = new Observer<SyncProgress>(OnProgress);
subscription = session.GetProgressObservable(ProgressDirection.Download, ProgressMode.ForCurrentlyOutstandingWork).Subscribe(observer);
cancellationToken.Register(() =>
{
if (!handle.IsClosed)
{
handle.Cancel();
tcs.TrySetCanceled();
}
});

if (OnProgress != null)
{
progressToken = new ProgressNotificationToken(
observer: (progress) =>
{
OnProgress(progress);
},
register: handle.RegisterProgressNotifier,
unregister: (token) =>
{
if (!handle.IsClosed)
{
handle.UnregisterProgressNotifier(token);
}
});
}

using (var realmReference = await tcs.Task)
{
var realmPtr = SharedRealmHandle.ResolveFromReference(realmReference);
var sharedRealmHandle = new SharedRealmHandle(realmPtr);
if (IsDynamic && !schema.Any())
{
sharedRealmHandle.GetSchema(nativeSchema => schema = RealmSchema.CreateFromObjectStoreSchema(nativeSchema));
}

return new Realm(sharedRealmHandle, this, schema);
}
}
await session.WaitForDownloadAsync();
}
finally
{
subscription?.Dispose();
session.CloseHandle();
tcsHandle.Free();
progressToken?.Dispose();
}

return CreateRealm(schema);
}

internal Native.SyncConfiguration ToNative()
Expand Down
69 changes: 69 additions & 0 deletions Realm/Realm/Handles/AsyncOpenTaskHandle.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2019 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////

using System;
using System.Runtime.InteropServices;

namespace Realms
{
internal class AsyncOpenTaskHandle : RealmHandle
{
private static class NativeMethods
{
[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_async_open_task_destroy", CallingConvention = CallingConvention.Cdecl)]
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
public static extern void destroy(IntPtr asyncTaskHandle);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_async_open_task_cancel", CallingConvention = CallingConvention.Cdecl)]
public static extern void cancel(AsyncOpenTaskHandle handle, out NativeException ex);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_async_open_task_register_progress_notifier", CallingConvention = CallingConvention.Cdecl)]
public static extern ulong register_progress_notifier(AsyncOpenTaskHandle handle, IntPtr token_ptr, out NativeException ex);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_async_open_task_unregister_progress_notifier", CallingConvention = CallingConvention.Cdecl)]
public static extern void unregister_progress_notifier(AsyncOpenTaskHandle handle, ulong token, out NativeException ex);
}

public AsyncOpenTaskHandle(IntPtr handle) : base(null, handle)
{
}

public void Cancel()
{
NativeMethods.cancel(this, out var ex);
ex.ThrowIfNecessary();
}

protected override void Unbind()
{
NativeMethods.destroy(handle);
}

public ulong RegisterProgressNotifier(GCHandle managedHandle)
{
var token = NativeMethods.register_progress_notifier(this, GCHandle.ToIntPtr(managedHandle), out var ex);
ex.ThrowIfNecessary();
return token;
}

public void UnregisterProgressNotifier(ulong token)
{
NativeMethods.unregister_progress_notifier(this, token, out var ex);
ex.ThrowIfNecessary();
}
}
}
4 changes: 2 additions & 2 deletions Realm/Realm/Handles/NotifiableObjectHandleBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected NotifiableObjectHandleBase(RealmHandle root, IntPtr handle) : base(roo

public IntPtr DestroyNotificationToken(IntPtr token)
{
var result = NativeMethods.destroy_notificationtoken(token, out NativeException nativeException);
var result = NativeMethods.destroy_notificationtoken(token, out var nativeException);
nativeException.ThrowIfNecessary();
return result;
}
Expand All @@ -66,4 +66,4 @@ public IntPtr DestroyNotificationToken(IntPtr token)

public abstract ThreadSafeReferenceHandle GetThreadSafeReference();
}
}
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
}
13 changes: 6 additions & 7 deletions Realm/Realm/Handles/SessionHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private static class NativeMethods

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncsession_wait", CallingConvention = CallingConvention.Cdecl)]
[return: MarshalAs(UnmanagedType.I1)]
public static extern bool wait(SessionHandle session, IntPtr task_completion_source, ProgressDirection direction, out NativeException ex);
public static extern void wait(SessionHandle session, IntPtr task_completion_source, ProgressDirection direction, out NativeException ex);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncsession_report_error_for_testing", CallingConvention = CallingConvention.Cdecl)]
public static extern void report_error_for_testing(SessionHandle session, int error_code, [MarshalAs(UnmanagedType.LPWStr)] string message, IntPtr message_len, [MarshalAs(UnmanagedType.I1)] bool is_fatal);
Expand Down Expand Up @@ -131,10 +131,10 @@ public void RefreshAccessToken(string accessToken, string serverPath)
ex.ThrowIfNecessary();
}

public ulong RegisterProgressNotifier(IntPtr tokenPtr, ProgressDirection direction, ProgressMode mode)
public ulong RegisterProgressNotifier(GCHandle managedHandle, ProgressDirection direction, ProgressMode mode)
{
var isStreaming = mode == ProgressMode.ReportIndefinitely;
var token = NativeMethods.register_progress_notifier(this, tokenPtr, direction, isStreaming, out var ex);
var token = NativeMethods.register_progress_notifier(this, GCHandle.ToIntPtr(managedHandle), direction, isStreaming, out var ex);
ex.ThrowIfNecessary();
return token;
}
Expand All @@ -145,12 +145,11 @@ public void UnregisterProgressNotifier(ulong token)
ex.ThrowIfNecessary();
}

public bool Wait(TaskCompletionSource<object> tcs, ProgressDirection direction)
public void Wait(TaskCompletionSource<object> tcs, ProgressDirection direction)
{
var tcsHandle = GCHandle.Alloc(tcs);
var result = NativeMethods.wait(this, GCHandle.ToIntPtr(tcsHandle), direction, out var ex);
NativeMethods.wait(this, GCHandle.ToIntPtr(tcsHandle), direction, out var ex);
ex.ThrowIfNecessary();
return result;
}

public IntPtr GetRawPointer()
Expand Down Expand Up @@ -187,4 +186,4 @@ protected override void Unbind()
NativeMethods.destroy(handle);
}
}
}
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
}
32 changes: 21 additions & 11 deletions Realm/Realm/Handles/SharedRealmHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ private static class NativeMethods
public delegate void GetNativeSchemaCallback(Native.Schema schema, IntPtr managed_callback);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "shared_realm_open", CallingConvention = CallingConvention.Cdecl)]
public static extern IntPtr open(Native.Configuration configuration,
[MarshalAs(UnmanagedType.LPArray), In] Native.SchemaObject[] objects, int objects_length,
[MarshalAs(UnmanagedType.LPArray), In] Native.SchemaProperty[] properties,
public static extern IntPtr open(Configuration configuration,
[MarshalAs(UnmanagedType.LPArray), In] SchemaObject[] objects, int objects_length,
[MarshalAs(UnmanagedType.LPArray), In] SchemaProperty[] properties,
byte[] encryptionKey,
out NativeException ex);

Expand Down Expand Up @@ -94,6 +94,9 @@ private static class NativeMethods
[DllImport(InteropConfig.DLL_NAME, EntryPoint = "shared_realm_resolve_query_reference", CallingConvention = CallingConvention.Cdecl)]
public static extern IntPtr resolve_query_reference(SharedRealmHandle sharedRealm, ThreadSafeReferenceHandle referenceHandle, out NativeException ex);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "shared_realm_resolve_realm_reference", CallingConvention = CallingConvention.Cdecl)]
public static extern IntPtr resolve_realm_reference(ThreadSafeReferenceHandle referenceHandle, out NativeException ex);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "shared_realm_write_copy", CallingConvention = CallingConvention.Cdecl)]
public static extern void write_copy(SharedRealmHandle sharedRealm, [MarshalAs(UnmanagedType.LPWStr)] string path, IntPtr path_len, byte[] encryptionKey, out NativeException ex);

Expand Down Expand Up @@ -146,11 +149,18 @@ protected override void Unbind()
NativeMethods.destroy(handle);
}

public static IntPtr Open(Native.Configuration configuration, RealmSchema schema, byte[] encryptionKey)
public static IntPtr Open(Configuration configuration, RealmSchema schema, byte[] encryptionKey)
{
var marshaledSchema = new SchemaMarshaler(schema);

var result = NativeMethods.open(configuration, marshaledSchema.Objects, marshaledSchema.Objects.Length, marshaledSchema.Properties, encryptionKey, out NativeException nativeException);
var result = NativeMethods.open(configuration, marshaledSchema.Objects, marshaledSchema.Objects.Length, marshaledSchema.Properties, encryptionKey, out var nativeException);
nativeException.ThrowIfNecessary();
return result;
}

public static IntPtr ResolveFromReference(ThreadSafeReferenceHandle referenceHandle)
{
var result = NativeMethods.resolve_realm_reference(referenceHandle, out var nativeException);
nativeException.ThrowIfNecessary();
return result;
}
Expand Down Expand Up @@ -341,20 +351,20 @@ public static void NotifyRealmChanged(IntPtr stateHandle)

public class SchemaMarshaler
{
public readonly Native.SchemaObject[] Objects;
public readonly Native.SchemaProperty[] Properties;
public readonly SchemaObject[] Objects;
public readonly SchemaProperty[] Properties;

public SchemaMarshaler(RealmSchema schema)
{
var properties = new List<Native.SchemaProperty>();
var properties = new List<SchemaProperty>();

Objects = schema.Select(@object =>
{
var start = properties.Count;

properties.AddRange(@object.Select(ForMarshalling));

return new Native.SchemaObject
return new SchemaObject
{
name = @object.Name,
properties_start = start,
Expand All @@ -364,9 +374,9 @@ public SchemaMarshaler(RealmSchema schema)
Properties = properties.ToArray();
}

public static Native.SchemaProperty ForMarshalling(Property property)
public static SchemaProperty ForMarshalling(Property property)
{
return new Native.SchemaProperty
return new SchemaProperty
{
name = property.Name,
type = property.Type,
Expand Down
Loading