Skip to content

Commit

Permalink
Use async open from OS (#1859)
Browse files Browse the repository at this point in the history
* Ignore the failing test (+5 squashed commits)
Squashed commits:
[ffc0d1d] Try to fix a sporadic test failure
[69c21e9] Use ROS 3.21.1
[c7a9714] Update OS
[697bbe6] Try to build android
[0e6f7cd] Use async open from OS

* wip

* wip

* still wip

* Fix double free-ing the gchandle

* Disable android tests

* wire up some native methods

* Address comments

* Revert jenkinsfile changes

* remove rogue launchsettings

* Update Realm/Realm/Realm.cs

Co-Authored-By: Yavor Georgiev <fealebenpae@users.noreply.github.com>

* Update Tests/Realm.Tests/Sync/SyncTestBase.cs

Co-Authored-By: Yavor Georgiev <fealebenpae@users.noreply.github.com>

* Apply suggestions from code review

Co-Authored-By: Yavor Georgiev <fealebenpae@users.noreply.github.com>

* address comments

* Don't use default
  • Loading branch information
nirinchev committed Aug 26, 2019
1 parent e9a4b2c commit 325d736
Show file tree
Hide file tree
Showing 33 changed files with 603 additions and 249 deletions.
3 changes: 2 additions & 1 deletion Realm/Realm/Configurations/InMemoryConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
////////////////////////////////////////////////////////////////////////////

using System;
using System.Threading;
using System.Threading.Tasks;
using Realms.Native;
using Realms.Schema;
Expand Down Expand Up @@ -63,7 +64,7 @@ internal override Realm CreateRealm(RealmSchema schema)
return new Realm(new SharedRealmHandle(srPtr), this, schema);
}

internal override Task<Realm> CreateRealmAsync(RealmSchema schema)
internal override Task<Realm> CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken)
{
return Task.FromResult(CreateRealm(schema));
}
Expand Down
8 changes: 8 additions & 0 deletions Realm/Realm/Configurations/QueryBasedSyncConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
////////////////////////////////////////////////////////////////////////////

using System;
using System.Threading;
using System.Threading.Tasks;
using Realms.Schema;

namespace Realms.Sync
Expand Down Expand Up @@ -65,6 +67,12 @@ public QueryBasedSyncConfiguration(Uri serverUri = null, User user = null, strin
{
}

internal override Task<Realm> CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken)
{
schema = RealmSchema.CreateSchemaForClasses(_queryBasedPermissionTypes, schema);
return base.CreateRealmAsync(schema, cancellationToken);
}

internal override Realm CreateRealm(RealmSchema schema)
{
schema = RealmSchema.CreateSchemaForClasses(_queryBasedPermissionTypes, schema);
Expand Down
7 changes: 4 additions & 3 deletions Realm/Realm/Configurations/RealmConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Realms.Exceptions;
using Realms.Helpers;
Expand Down Expand Up @@ -166,7 +167,7 @@ internal override Realm CreateRealm(RealmSchema schema)
return new Realm(srHandle, this, schema);
}

internal override Task<Realm> CreateRealmAsync(RealmSchema schema)
internal override Task<Realm> CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken)
{
// Can't use async/await due to mono inliner bugs
// If we are on UI thread will be set but often also set on long-lived workers to use Post back to UI thread.
Expand All @@ -177,7 +178,7 @@ internal override Task<Realm> CreateRealmAsync(RealmSchema schema)
using (CreateRealm(schema))
{
}
}).ContinueWith(_ => CreateRealm(schema), scheduler);
}, cancellationToken).ContinueWith(_ => CreateRealm(schema), scheduler);
}

return Task.FromResult(CreateRealm(schema));
Expand All @@ -198,4 +199,4 @@ private static bool ShouldCompactOnLaunchCallback(IntPtr delegatePtr, ulong tota
}
}
}
}
}
5 changes: 3 additions & 2 deletions Realm/Realm/Configurations/RealmConfigurationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Realms.Schema;

Expand Down Expand Up @@ -141,6 +142,6 @@ internal RealmConfigurationBase Clone()

internal abstract Realm CreateRealm(RealmSchema schema);

internal abstract Task<Realm> CreateRealmAsync(RealmSchema schema);
internal abstract Task<Realm> CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken);
}
}
}
65 changes: 53 additions & 12 deletions Realm/Realm/Configurations/SyncConfigurationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using System.Linq;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Realms.Helpers;
using Realms.Schema;
Expand Down Expand Up @@ -207,26 +208,66 @@ internal override Realm CreateRealm(RealmSchema schema)
return new Realm(srHandle, this, schema);
}

internal override async Task<Realm> CreateRealmAsync(RealmSchema schema)
internal override async Task<Realm> CreateRealmAsync(RealmSchema schema, CancellationToken cancellationToken)
{
var session = new Session(SharedRealmHandleExtensions.GetSession(DatabasePath, ToNative(), EncryptionKey));
IDisposable subscription = null;
var configuration = new Realms.Native.Configuration
{
Path = DatabasePath,
schema_version = SchemaVersion,
enable_cache = EnableCache
};

var tcs = new TaskCompletionSource<ThreadSafeReferenceHandle>();
var tcsHandle = GCHandle.Alloc(tcs);
ProgressNotificationToken progressToken = null;
try
{
if (OnProgress != null)
using (var handle = SharedRealmHandleExtensions.OpenWithSyncAsync(configuration, ToNative(), schema, EncryptionKey, tcsHandle))
{
var observer = new Observer<SyncProgress>(OnProgress);
subscription = session.GetProgressObservable(ProgressDirection.Download, ProgressMode.ForCurrentlyOutstandingWork).Subscribe(observer);
cancellationToken.Register(() =>
{
if (!handle.IsClosed)
{
handle.Cancel();
tcs.TrySetCanceled();
}
});

if (OnProgress != null)
{
progressToken = new ProgressNotificationToken(
observer: (progress) =>
{
OnProgress(progress);
},
register: handle.RegisterProgressNotifier,
unregister: (token) =>
{
if (!handle.IsClosed)
{
handle.UnregisterProgressNotifier(token);
}
});
}

using (var realmReference = await tcs.Task)
{
var realmPtr = SharedRealmHandle.ResolveFromReference(realmReference);
var sharedRealmHandle = new SharedRealmHandle(realmPtr);
if (IsDynamic && !schema.Any())
{
sharedRealmHandle.GetSchema(nativeSchema => schema = RealmSchema.CreateFromObjectStoreSchema(nativeSchema));
}

return new Realm(sharedRealmHandle, this, schema);
}
}
await session.WaitForDownloadAsync();
}
finally
{
subscription?.Dispose();
session.CloseHandle();
tcsHandle.Free();
progressToken?.Dispose();
}

return CreateRealm(schema);
}

internal Native.SyncConfiguration ToNative()
Expand Down Expand Up @@ -254,4 +295,4 @@ internal static string GetSDKUserAgent()
return $"RealmDotNet/{version} ({RuntimeInformation.FrameworkDescription})";
}
}
}
}
69 changes: 69 additions & 0 deletions Realm/Realm/Handles/AsyncOpenTaskHandle.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2019 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////

using System;
using System.Runtime.InteropServices;

namespace Realms
{
internal class AsyncOpenTaskHandle : RealmHandle
{
private static class NativeMethods
{
[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_asyncopentask_destroy", CallingConvention = CallingConvention.Cdecl)]
public static extern void destroy(IntPtr asyncTaskHandle);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_asyncopentask_cancel", CallingConvention = CallingConvention.Cdecl)]
public static extern void cancel(AsyncOpenTaskHandle handle, out NativeException ex);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_asyncopentask_register_progress_notifier", CallingConvention = CallingConvention.Cdecl)]
public static extern ulong register_progress_notifier(AsyncOpenTaskHandle handle, IntPtr token_ptr, out NativeException ex);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_asyncopentask_unregister_progress_notifier", CallingConvention = CallingConvention.Cdecl)]
public static extern void unregister_progress_notifier(AsyncOpenTaskHandle handle, ulong token, out NativeException ex);
}

public AsyncOpenTaskHandle(IntPtr handle) : base(null, handle)
{
}

public void Cancel()
{
NativeMethods.cancel(this, out var ex);
ex.ThrowIfNecessary();
}

protected override void Unbind()
{
NativeMethods.destroy(handle);
}

public ulong RegisterProgressNotifier(GCHandle managedHandle)
{
var token = NativeMethods.register_progress_notifier(this, GCHandle.ToIntPtr(managedHandle), out var ex);
ex.ThrowIfNecessary();
return token;
}

public void UnregisterProgressNotifier(ulong token)
{
NativeMethods.unregister_progress_notifier(this, token, out var ex);
ex.ThrowIfNecessary();
}
}
}
2 changes: 1 addition & 1 deletion Realm/Realm/Handles/NotifiableObjectHandleBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected NotifiableObjectHandleBase(RealmHandle root, IntPtr handle) : base(roo

public IntPtr DestroyNotificationToken(IntPtr token)
{
var result = NativeMethods.destroy_notificationtoken(token, out NativeException nativeException);
var result = NativeMethods.destroy_notificationtoken(token, out var nativeException);
nativeException.ThrowIfNecessary();
return result;
}
Expand Down
11 changes: 5 additions & 6 deletions Realm/Realm/Handles/SessionHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private static class NativeMethods

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncsession_wait", CallingConvention = CallingConvention.Cdecl)]
[return: MarshalAs(UnmanagedType.I1)]
public static extern bool wait(SessionHandle session, IntPtr task_completion_source, ProgressDirection direction, out NativeException ex);
public static extern void wait(SessionHandle session, IntPtr task_completion_source, ProgressDirection direction, out NativeException ex);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncsession_report_error_for_testing", CallingConvention = CallingConvention.Cdecl)]
public static extern void report_error_for_testing(SessionHandle session, int error_code, [MarshalAs(UnmanagedType.LPWStr)] string message, IntPtr message_len, [MarshalAs(UnmanagedType.I1)] bool is_fatal);
Expand Down Expand Up @@ -131,10 +131,10 @@ public void RefreshAccessToken(string accessToken, string serverPath)
ex.ThrowIfNecessary();
}

public ulong RegisterProgressNotifier(IntPtr tokenPtr, ProgressDirection direction, ProgressMode mode)
public ulong RegisterProgressNotifier(GCHandle managedHandle, ProgressDirection direction, ProgressMode mode)
{
var isStreaming = mode == ProgressMode.ReportIndefinitely;
var token = NativeMethods.register_progress_notifier(this, tokenPtr, direction, isStreaming, out var ex);
var token = NativeMethods.register_progress_notifier(this, GCHandle.ToIntPtr(managedHandle), direction, isStreaming, out var ex);
ex.ThrowIfNecessary();
return token;
}
Expand All @@ -145,12 +145,11 @@ public void UnregisterProgressNotifier(ulong token)
ex.ThrowIfNecessary();
}

public bool Wait(TaskCompletionSource<object> tcs, ProgressDirection direction)
public void Wait(TaskCompletionSource<object> tcs, ProgressDirection direction)
{
var tcsHandle = GCHandle.Alloc(tcs);
var result = NativeMethods.wait(this, GCHandle.ToIntPtr(tcsHandle), direction, out var ex);
NativeMethods.wait(this, GCHandle.ToIntPtr(tcsHandle), direction, out var ex);
ex.ThrowIfNecessary();
return result;
}

public IntPtr GetRawPointer()
Expand Down
Loading

0 comments on commit 325d736

Please sign in to comment.