Skip to content

Commit

Permalink
Use async open from OS
Browse files Browse the repository at this point in the history
  • Loading branch information
nirinchev committed May 13, 2019
1 parent a0407d3 commit 0e6f7cd
Show file tree
Hide file tree
Showing 16 changed files with 195 additions and 123 deletions.
28 changes: 14 additions & 14 deletions Realm/Realm/Configurations/SyncConfigurationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,24 +209,24 @@ internal override Realm CreateRealm(RealmSchema schema)

internal override async Task<Realm> CreateRealmAsync(RealmSchema schema)
{
var session = new Session(SharedRealmHandleExtensions.GetSession(DatabasePath, ToNative(), EncryptionKey));
IDisposable subscription = null;
try
var configuration = new Realms.Native.Configuration
{
if (OnProgress != null)
{
var observer = new Observer<SyncProgress>(OnProgress);
subscription = session.GetProgressObservable(ProgressDirection.Download, ProgressMode.ForCurrentlyOutstandingWork).Subscribe(observer);
}
await session.WaitForDownloadAsync();
}
finally
Path = DatabasePath,
schema_version = SchemaVersion,
enable_cache = EnableCache
};

// Keep that until we open the Realm on the foreground.
var backgroundHandle = await SharedRealmHandleExtensions.OpenWithSyncAsync(configuration, ToNative(), schema, EncryptionKey);

var srHandle = SharedRealmHandleExtensions.OpenWithSync(configuration, ToNative(), schema, EncryptionKey);
backgroundHandle.Close();
if (IsDynamic && !schema.Any())
{
subscription?.Dispose();
session.CloseHandle();
srHandle.GetSchema(nativeSchema => schema = RealmSchema.CreateFromObjectStoreSchema(nativeSchema));
}

return CreateRealm(schema);
return new Realm(srHandle, this, schema);
}

internal Native.SyncConfiguration ToNative()
Expand Down
2 changes: 1 addition & 1 deletion Realm/Realm/DataBinding/WovenSetterMethodInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
//
////////////////////////////////////////////////////////////////////////////

using Realms.Helpers;
using System;
using System.Globalization;
using System.Reflection;
using Realms.Helpers;

namespace Realms.DataBinding
{
Expand Down
7 changes: 3 additions & 4 deletions Realm/Realm/Handles/SessionHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private static class NativeMethods

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncsession_wait", CallingConvention = CallingConvention.Cdecl)]
[return: MarshalAs(UnmanagedType.I1)]
public static extern bool wait(SessionHandle session, IntPtr task_completion_source, ProgressDirection direction, out NativeException ex);
public static extern void wait(SessionHandle session, IntPtr task_completion_source, ProgressDirection direction, out NativeException ex);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncsession_report_error_for_testing", CallingConvention = CallingConvention.Cdecl)]
public static extern void report_error_for_testing(SessionHandle session, int error_code, [MarshalAs(UnmanagedType.LPWStr)] string message, IntPtr message_len, [MarshalAs(UnmanagedType.I1)] bool is_fatal);
Expand Down Expand Up @@ -145,12 +145,11 @@ public void UnregisterProgressNotifier(ulong token)
ex.ThrowIfNecessary();
}

public bool Wait(TaskCompletionSource<object> tcs, ProgressDirection direction)
public void Wait(TaskCompletionSource<object> tcs, ProgressDirection direction)
{
var tcsHandle = GCHandle.Alloc(tcs);
var result = NativeMethods.wait(this, GCHandle.ToIntPtr(tcsHandle), direction, out var ex);
NativeMethods.wait(this, GCHandle.ToIntPtr(tcsHandle), direction, out var ex);
ex.ThrowIfNecessary();
return result;
}

public IntPtr GetRawPointer()
Expand Down
72 changes: 67 additions & 5 deletions Realm/Realm/Handles/SharedRealmHandleExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ namespace Realms.Sync
{
internal static class SharedRealmHandleExtensions
{
// This is int, because Interlocked.Exchange cannot work with narrower types such as bool.
private static int _fileSystemConfigured;

// We only save it to avoid allocating the GCHandle multiple times.
private static readonly NativeMethods.LogMessageCallback _logCallback;

// This is int, because Interlocked.Exchange cannot work with narrower types such as bool.
private static int _fileSystemConfigured;

private static class NativeMethods
{
[DllImport(InteropConfig.DLL_NAME, EntryPoint = "shared_realm_open_with_sync", CallingConvention = CallingConvention.Cdecl)]
Expand All @@ -48,6 +48,14 @@ private static class NativeMethods
byte[] encryptionKey,
out NativeException ex);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "shared_realm_open_with_sync_async", CallingConvention = CallingConvention.Cdecl)]
public static extern void open_with_sync_async(Configuration configuration, Native.SyncConfiguration sync_configuration,
[MarshalAs(UnmanagedType.LPArray), In] SchemaObject[] objects, int objects_length,
[MarshalAs(UnmanagedType.LPArray), In] SchemaProperty[] properties,
byte[] encryptionKey,
IntPtr task_completion_source,
out NativeException ex);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void RefreshAccessTokenCallbackDelegate(IntPtr session_handle_ptr);

Expand All @@ -63,13 +71,19 @@ private static class NativeMethods
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public unsafe delegate void LogMessageCallback(byte* message_buf, IntPtr message_len, LogLevel logLevel);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public unsafe delegate void OpenRealmCallback(IntPtr task_completion_source, IntPtr shared_realm, int error_code, byte* message_buf, IntPtr message_len);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncmanager_configure", CallingConvention = CallingConvention.Cdecl)]
public static extern unsafe void configure([MarshalAs(UnmanagedType.LPWStr)] string base_path, IntPtr base_path_length,
[MarshalAs(UnmanagedType.LPWStr)] string user_agent, IntPtr user_agent_length,
UserPersistenceMode* userPersistence, byte[] encryptionKey,
[MarshalAs(UnmanagedType.I1)] bool resetMetadataOnError,
out NativeException exception);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_install_syncmanager_callbacks", CallingConvention = CallingConvention.Cdecl)]
public static extern void install_syncmanager_callbacks(OpenRealmCallback open_callback);

[DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_install_syncsession_callbacks", CallingConvention = CallingConvention.Cdecl)]
public static extern void install_syncsession_callbacks(RefreshAccessTokenCallbackDelegate refresh_callback, SessionErrorCallback error_callback, SessionProgressCallback progress_callback, SessionWaitCallback wait_callback);

Expand Down Expand Up @@ -133,6 +147,12 @@ static unsafe SharedRealmHandleExtensions()

NativeMethods.install_syncsession_callbacks(refresh, error, progress, wait);

NativeMethods.OpenRealmCallback openRealm = HandleOpenRealmCallback;

GCHandle.Alloc(openRealm);

NativeMethods.install_syncmanager_callbacks(openRealm);

_logCallback = HandleLogMessage;
GCHandle.Alloc(_logCallback);
}
Expand All @@ -149,6 +169,22 @@ public static SharedRealmHandle OpenWithSync(Configuration configuration, Native
return new SharedRealmHandle(result);
}

public static Task<SharedRealmHandle> OpenWithSyncAsync(Configuration configuration, Native.SyncConfiguration syncConfiguration, RealmSchema schema, byte[] encryptionKey)
{
System.Diagnostics.Debug.WriteLine("Thread on Open: " + Environment.CurrentManagedThreadId);

DoInitialFileSystemConfiguration();

var marshaledSchema = new SharedRealmHandle.SchemaMarshaler(schema);

var tcs = new TaskCompletionSource<SharedRealmHandle>();
var tcsHandle = GCHandle.Alloc(tcs);
NativeMethods.open_with_sync_async(configuration, syncConfiguration, marshaledSchema.Objects, marshaledSchema.Objects.Length, marshaledSchema.Properties, encryptionKey, GCHandle.ToIntPtr(tcsHandle), out var nativeException);
nativeException.ThrowIfNecessary();

return tcs.Task;
}

public static string GetRealmPath(User user, Uri serverUri)
{
DoInitialFileSystemConfiguration();
Expand Down Expand Up @@ -349,8 +385,34 @@ private static unsafe void HandleSessionWaitCallback(IntPtr taskCompletionSource
else
{
var inner = new SessionException(Encoding.UTF8.GetString(messageBuffer, (int)messageLength), (ErrorCode)error_code);
const string outerMessage = "A system error occurred while waiting for completion. See InnerException for more details";
tcs.TrySetException(new RealmException(outerMessage, inner));
const string OuterMessage = "A system error occurred while waiting for completion. See InnerException for more details";
tcs.TrySetException(new RealmException(OuterMessage, inner));
}
}
finally
{
handle.Free();
}
}

[NativeCallback(typeof(NativeMethods.OpenRealmCallback))]
private static unsafe void HandleOpenRealmCallback(IntPtr taskCompletionSource, IntPtr shared_realm, int error_code, byte* messageBuffer, IntPtr messageLength)
{
var handle = GCHandle.FromIntPtr(taskCompletionSource);
var tcs = (TaskCompletionSource<SharedRealmHandle>)handle.Target;

try
{
if (error_code == 0)
{
System.Diagnostics.Debug.WriteLine("Thread on Opened: " + Environment.CurrentManagedThreadId);
tcs.TrySetResult(new SharedRealmHandle(shared_realm));
}
else
{
var inner = new SessionException(Encoding.UTF8.GetString(messageBuffer, (int)messageLength), (ErrorCode)error_code);
const string OuterMessage = "A system error occurred while opening a Realm. See InnerException for more details";
tcs.TrySetException(new RealmException(OuterMessage, inner));
}
}
finally
Expand Down
2 changes: 1 addition & 1 deletion Realm/Realm/Realm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ async Task doWorkAsync()
var didRefresh = await RefreshAsync();

// TODO: figure out why this assertion fails in `AsyncTests.AsyncWrite_ShouldExecuteOnWorkerThread`
//System.Diagnostics.Debug.Assert(didRefresh, "Expected RefreshAsync to return true.");
// System.Diagnostics.Debug.Assert(didRefresh, "Expected RefreshAsync to return true.");
}
return doWorkAsync();
}
Expand Down
2 changes: 1 addition & 1 deletion Realm/Realm/RealmCollectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public abstract class RealmCollectionBase<T>
ISchemaSource,
IThreadConfined
{
protected static readonly PropertyType _argumentType = PropertyTypeEx.ToPropertyType(typeof(T), out _);
protected static readonly PropertyType _argumentType = typeof(T).ToPropertyType(out _);

private readonly List<NotificationCallbackDelegate<T>> _callbacks = new List<NotificationCallbackDelegate<T>>();

Expand Down
22 changes: 15 additions & 7 deletions Realm/Realm/Schema/RealmSchema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,24 @@ internal static RealmSchema CreateSchemaForClasses(IEnumerable<Type> classes, Re

foreach (var @class in classes)
{
var typeInfo = @class.GetTypeInfo();
var objectSchema = ObjectSchema.FromType(@class.GetTypeInfo());
if (!classNames.Add(objectSchema.Name))

if (classNames.Add(objectSchema.Name))
{
builder.Add(objectSchema);
}
else
{
var duplicateType = builder.Single(s => s.Name == objectSchema.Name).Type;
var errorMessage = "The names (without namespace) of objects persisted in Realm must be unique." +
$"The duplicate types are {@class.FullName} and {duplicateType.FullName}. Either rename one" +
" of them or explicitly specify ObjectClasses on your RealmConfiguration.";
throw new NotSupportedException(errorMessage);
var duplicateType = builder.FirstOrDefault(s => s.Name == objectSchema.Name).Type;
if (typeInfo.FullName != duplicateType.FullName)
{
var errorMessage = "The names (without namespace) of objects persisted in Realm must be unique." +
$"The duplicate types are {@class.FullName} and {duplicateType.FullName}. Either rename one" +
" of them or explicitly specify ObjectClasses on your RealmConfiguration.";
throw new NotSupportedException(errorMessage);
}
}
builder.Add(objectSchema);
}

return builder.Build();
Expand Down
2 changes: 0 additions & 2 deletions Realm/Realm/Server/Notifier/Notifier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ internal static unsafe void OnStarted(IntPtr managedInstance, int errorCode, byt
var message = Encoding.UTF8.GetString(messageBuffer, (int)messageLength);
notifier.OnStarted(new NotifierStartException(errorCode, message));
}

}

internal static void OnCalculationCompleted(IntPtr details_ptr, IntPtr managedCallbackPtr)
Expand Down Expand Up @@ -168,7 +167,6 @@ internal void CalculateChanges(string path, IntPtr calculation_ptr)
if (_isDisposed == 0)
{
_processor.Enqueue(path, calculation_ptr);

}
}

Expand Down
12 changes: 2 additions & 10 deletions Realm/Realm/Sync/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,7 @@ public IObservable<SyncProgress> GetProgressObservable(ProgressDirection directi
public Task WaitForUploadAsync()
{
var tcs = new TaskCompletionSource<object>();
if (!Handle.Wait(tcs, ProgressDirection.Upload))
{
throw new InvalidOperationException("Cannot register a wait callback on a session in the Error state");
}

Handle.Wait(tcs, ProgressDirection.Upload);
return tcs.Task;
}

Expand All @@ -128,11 +124,7 @@ public Task WaitForUploadAsync()
public Task WaitForDownloadAsync()
{
var tcs = new TaskCompletionSource<object>();
if (!Handle.Wait(tcs, ProgressDirection.Download))
{
throw new InvalidOperationException("Cannot register a wait callback on a session in the Error state");
}

Handle.Wait(tcs, ProgressDirection.Download);
return tcs.Task;
}

Expand Down
2 changes: 1 addition & 1 deletion Tests/Realm.Tests/Sync/PermissionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ private static async Task AssertPermissions(User granter, User receiver, string
[Test]
public void WriteToReadOnlyRealm_ThrowsPermissionDenied()
{
SyncTestHelpers.RunRosTestAsync(async() =>
SyncTestHelpers.RunRosTestAsync(async () =>
{
var alice = await SyncTestHelpers.GetUserAsync();
var bob = await SyncTestHelpers.GetUserAsync();
Expand Down
3 changes: 2 additions & 1 deletion wrappers/src/results_cs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "schema_cs.hpp"
#include <realm/parser/parser.hpp>
#include <realm/parser/query_builder.hpp>
#include "keypath_helpers.hpp"

using namespace realm;
using namespace realm::binding;
Expand Down Expand Up @@ -138,7 +139,7 @@ REALM_EXPORT Results* results_get_filtered_results(const Results& results, uint1
parser::ParserResult result = parser::parse(query_string.to_string());

parser::KeyPathMapping mapping;
alias_backlinks(mapping, realm);
realm::alias_backlinks(mapping, *realm);

query_builder::NoArguments no_args;
query_builder::apply_predicate(query, result.predicate, no_args, mapping);
Expand Down
15 changes: 0 additions & 15 deletions wrappers/src/schema_cs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,4 @@ REALM_FORCEINLINE SchemaObject SchemaObject::for_marshalling(const ObjectSchema&

util::Optional<Schema> create_schema(SchemaObject* objects, int objects_length, SchemaProperty* properties);

REALM_FORCEINLINE void alias_backlinks(parser::KeyPathMapping &mapping, const SharedRealm &realm)
{
const Schema &schema = realm->schema();
for (auto it = schema.begin(); it != schema.end(); ++it) {
for (const Property &property : it->computed_properties) {
if (property.type == PropertyType::LinkingObjects) {
auto target_object_schema = schema.find(property.object_type);
const TableRef table = ObjectStore::table_for_object_type(realm->read_group(), it->name);
const TableRef target_table = ObjectStore::table_for_object_type(realm->read_group(), target_object_schema->name);
std::string native_name = "@links." + std::string(target_table->get_name()) + "." + property.link_origin_property_name;
mapping.add_mapping(table, property.name, native_name);
}
}
}
}
#endif /* defined(SCHEMA_CS_HPP) */
26 changes: 9 additions & 17 deletions wrappers/src/subscription_cs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "schema_cs.hpp"
#include <realm/parser/parser.hpp>
#include <realm/parser/query_builder.hpp>
#include "keypath_helpers.hpp"

using namespace realm;
using namespace realm::binding;
Expand All @@ -37,24 +38,15 @@ REALM_EXPORT Subscription* realm_subscription_create(Results& results, uint16_t*
auto name = name_len >= 0 ? util::Optional<std::string>(Utf16StringAccessor(name_buf, name_len).to_string()) : none;
auto optional_ttl = time_to_live >= 0 ? util::Optional<int64_t>(time_to_live) : none;

IncludeDescriptor inclusion_paths;
if (inclusions_len >= 0) {
DescriptorOrdering combined_orderings;
parser::KeyPathMapping mapping;
alias_backlinks(mapping, results.get_realm());

for (auto i = 0; i < inclusions_len; i++) {
auto inclusion_path = inclusions[i].value;
DescriptorOrdering ordering;
parser::DescriptorOrderingState ordering_state = parser::parse_include_path(inclusion_path);
query_builder::apply_ordering(ordering, results.get_query().get_table(), ordering_state, mapping);
combined_orderings.append_include(ordering.compile_included_backlinks());
}

if (combined_orderings.will_apply_include()) {
inclusion_paths = combined_orderings.compile_included_backlinks();
}
std::vector<StringData> paths;
for (auto i = 0; i < inclusions_len; i++) {
paths.emplace_back(inclusions[i].value);
}

parser::KeyPathMapping mapping;
realm::alias_backlinks(mapping, *results.get_realm());

auto inclusion_paths = realm::generate_include_from_keypaths(paths, *results.get_realm(), results.get_object_schema(), mapping);

realm::partial_sync::SubscriptionOptions options {
name,
Expand Down
Loading

0 comments on commit 0e6f7cd

Please sign in to comment.