diff --git a/Realm/Realm/Configurations/SyncConfigurationBase.cs b/Realm/Realm/Configurations/SyncConfigurationBase.cs index 11dd3d8045..9c46725739 100644 --- a/Realm/Realm/Configurations/SyncConfigurationBase.cs +++ b/Realm/Realm/Configurations/SyncConfigurationBase.cs @@ -209,24 +209,24 @@ internal override Realm CreateRealm(RealmSchema schema) internal override async Task CreateRealmAsync(RealmSchema schema) { - var session = new Session(SharedRealmHandleExtensions.GetSession(DatabasePath, ToNative(), EncryptionKey)); - IDisposable subscription = null; - try + var configuration = new Realms.Native.Configuration { - if (OnProgress != null) - { - var observer = new Observer(OnProgress); - subscription = session.GetProgressObservable(ProgressDirection.Download, ProgressMode.ForCurrentlyOutstandingWork).Subscribe(observer); - } - await session.WaitForDownloadAsync(); - } - finally + Path = DatabasePath, + schema_version = SchemaVersion, + enable_cache = EnableCache + }; + + // Keep that until we open the Realm on the foreground. + var backgroundHandle = await SharedRealmHandleExtensions.OpenWithSyncAsync(configuration, ToNative(), schema, EncryptionKey); + + var foregroundHandle = SharedRealmHandleExtensions.OpenWithSync(configuration, ToNative(), schema, EncryptionKey); + backgroundHandle.Close(); + if (IsDynamic && !schema.Any()) { - subscription?.Dispose(); - session.CloseHandle(); + foregroundHandle.GetSchema(nativeSchema => schema = RealmSchema.CreateFromObjectStoreSchema(nativeSchema)); } - return CreateRealm(schema); + return new Realm(foregroundHandle, this, schema); } internal Native.SyncConfiguration ToNative() diff --git a/Realm/Realm/Handles/SessionHandle.cs b/Realm/Realm/Handles/SessionHandle.cs index 7489180027..7777bf7e37 100644 --- a/Realm/Realm/Handles/SessionHandle.cs +++ b/Realm/Realm/Handles/SessionHandle.cs @@ -65,7 +65,7 @@ private static class NativeMethods [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncsession_wait", CallingConvention = CallingConvention.Cdecl)] [return: MarshalAs(UnmanagedType.I1)] - public static extern bool wait(SessionHandle session, IntPtr task_completion_source, ProgressDirection direction, out NativeException ex); + public static extern void wait(SessionHandle session, IntPtr task_completion_source, ProgressDirection direction, out NativeException ex); [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncsession_report_error_for_testing", CallingConvention = CallingConvention.Cdecl)] public static extern void report_error_for_testing(SessionHandle session, int error_code, [MarshalAs(UnmanagedType.LPWStr)] string message, IntPtr message_len, [MarshalAs(UnmanagedType.I1)] bool is_fatal); @@ -145,12 +145,11 @@ public void UnregisterProgressNotifier(ulong token) ex.ThrowIfNecessary(); } - public bool Wait(TaskCompletionSource tcs, ProgressDirection direction) + public void Wait(TaskCompletionSource tcs, ProgressDirection direction) { var tcsHandle = GCHandle.Alloc(tcs); - var result = NativeMethods.wait(this, GCHandle.ToIntPtr(tcsHandle), direction, out var ex); + NativeMethods.wait(this, GCHandle.ToIntPtr(tcsHandle), direction, out var ex); ex.ThrowIfNecessary(); - return result; } public IntPtr GetRawPointer() diff --git a/Realm/Realm/Handles/SharedRealmHandleExtensions.cs b/Realm/Realm/Handles/SharedRealmHandleExtensions.cs index 0b284ba85e..94ab68ee4f 100644 --- a/Realm/Realm/Handles/SharedRealmHandleExtensions.cs +++ b/Realm/Realm/Handles/SharedRealmHandleExtensions.cs @@ -33,12 +33,12 @@ namespace Realms.Sync { internal static class SharedRealmHandleExtensions { - // This is int, because Interlocked.Exchange cannot work with narrower types such as bool. - private static int _fileSystemConfigured; - // We only save it to avoid allocating the GCHandle multiple times. private static readonly NativeMethods.LogMessageCallback _logCallback; + // This is int, because Interlocked.Exchange cannot work with narrower types such as bool. + private static int _fileSystemConfigured; + private static class NativeMethods { [DllImport(InteropConfig.DLL_NAME, EntryPoint = "shared_realm_open_with_sync", CallingConvention = CallingConvention.Cdecl)] @@ -48,6 +48,14 @@ private static class NativeMethods byte[] encryptionKey, out NativeException ex); + [DllImport(InteropConfig.DLL_NAME, EntryPoint = "shared_realm_open_with_sync_async", CallingConvention = CallingConvention.Cdecl)] + public static extern void open_with_sync_async(Configuration configuration, Native.SyncConfiguration sync_configuration, + [MarshalAs(UnmanagedType.LPArray), In] SchemaObject[] objects, int objects_length, + [MarshalAs(UnmanagedType.LPArray), In] SchemaProperty[] properties, + byte[] encryptionKey, + IntPtr task_completion_source, + out NativeException ex); + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] public delegate void RefreshAccessTokenCallbackDelegate(IntPtr session_handle_ptr); @@ -63,6 +71,9 @@ private static class NativeMethods [UnmanagedFunctionPointer(CallingConvention.Cdecl)] public unsafe delegate void LogMessageCallback(byte* message_buf, IntPtr message_len, LogLevel logLevel); + [UnmanagedFunctionPointer(CallingConvention.Cdecl)] + public unsafe delegate void OpenRealmCallback(IntPtr task_completion_source, IntPtr shared_realm, int error_code, byte* message_buf, IntPtr message_len); + [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_syncmanager_configure", CallingConvention = CallingConvention.Cdecl)] public static extern unsafe void configure([MarshalAs(UnmanagedType.LPWStr)] string base_path, IntPtr base_path_length, [MarshalAs(UnmanagedType.LPWStr)] string user_agent, IntPtr user_agent_length, @@ -70,6 +81,9 @@ private static class NativeMethods [MarshalAs(UnmanagedType.I1)] bool resetMetadataOnError, out NativeException exception); + [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_install_syncmanager_callbacks", CallingConvention = CallingConvention.Cdecl)] + public static extern void install_syncmanager_callbacks(OpenRealmCallback open_callback); + [DllImport(InteropConfig.DLL_NAME, EntryPoint = "realm_install_syncsession_callbacks", CallingConvention = CallingConvention.Cdecl)] public static extern void install_syncsession_callbacks(RefreshAccessTokenCallbackDelegate refresh_callback, SessionErrorCallback error_callback, SessionProgressCallback progress_callback, SessionWaitCallback wait_callback); @@ -133,6 +147,12 @@ static unsafe SharedRealmHandleExtensions() NativeMethods.install_syncsession_callbacks(refresh, error, progress, wait); + NativeMethods.OpenRealmCallback openRealm = HandleOpenRealmCallback; + + GCHandle.Alloc(openRealm); + + NativeMethods.install_syncmanager_callbacks(openRealm); + _logCallback = HandleLogMessage; GCHandle.Alloc(_logCallback); } @@ -149,6 +169,22 @@ public static SharedRealmHandle OpenWithSync(Configuration configuration, Native return new SharedRealmHandle(result); } + public static Task OpenWithSyncAsync(Configuration configuration, Native.SyncConfiguration syncConfiguration, RealmSchema schema, byte[] encryptionKey) + { + System.Diagnostics.Debug.WriteLine("Thread on Open: " + Environment.CurrentManagedThreadId); + + DoInitialFileSystemConfiguration(); + + var marshaledSchema = new SharedRealmHandle.SchemaMarshaler(schema); + + var tcs = new TaskCompletionSource(); + var tcsHandle = GCHandle.Alloc(tcs); + NativeMethods.open_with_sync_async(configuration, syncConfiguration, marshaledSchema.Objects, marshaledSchema.Objects.Length, marshaledSchema.Properties, encryptionKey, GCHandle.ToIntPtr(tcsHandle), out var nativeException); + nativeException.ThrowIfNecessary(); + + return tcs.Task; + } + public static string GetRealmPath(User user, Uri serverUri) { DoInitialFileSystemConfiguration(); @@ -349,8 +385,34 @@ private static unsafe void HandleSessionWaitCallback(IntPtr taskCompletionSource else { var inner = new SessionException(Encoding.UTF8.GetString(messageBuffer, (int)messageLength), (ErrorCode)error_code); - const string outerMessage = "A system error occurred while waiting for completion. See InnerException for more details"; - tcs.TrySetException(new RealmException(outerMessage, inner)); + const string OuterMessage = "A system error occurred while waiting for completion. See InnerException for more details"; + tcs.TrySetException(new RealmException(OuterMessage, inner)); + } + } + finally + { + handle.Free(); + } + } + + [NativeCallback(typeof(NativeMethods.OpenRealmCallback))] + private static unsafe void HandleOpenRealmCallback(IntPtr taskCompletionSource, IntPtr shared_realm, int error_code, byte* messageBuffer, IntPtr messageLength) + { + var handle = GCHandle.FromIntPtr(taskCompletionSource); + var tcs = (TaskCompletionSource)handle.Target; + + try + { + if (error_code == 0) + { + System.Diagnostics.Debug.WriteLine("Thread on Opened: " + Environment.CurrentManagedThreadId); + tcs.TrySetResult(new SharedRealmHandle(shared_realm)); + } + else + { + var inner = new SessionException(Encoding.UTF8.GetString(messageBuffer, (int)messageLength), (ErrorCode)error_code); + const string OuterMessage = "A system error occurred while opening a Realm. See InnerException for more details"; + tcs.TrySetException(new RealmException(OuterMessage, inner)); } } finally diff --git a/Realm/Realm/RealmCollectionBase.cs b/Realm/Realm/RealmCollectionBase.cs index 933666b8d1..f15c445fe5 100644 --- a/Realm/Realm/RealmCollectionBase.cs +++ b/Realm/Realm/RealmCollectionBase.cs @@ -38,7 +38,7 @@ public abstract class RealmCollectionBase ISchemaSource, IThreadConfined { - protected static readonly PropertyType _argumentType = PropertyTypeEx.ToPropertyType(typeof(T), out _); + protected static readonly PropertyType _argumentType = typeof(T).ToPropertyType(out _); private readonly List> _callbacks = new List>(); diff --git a/Realm/Realm/Sync/Session.cs b/Realm/Realm/Sync/Session.cs index dedd4a8b60..c6234ed0e4 100644 --- a/Realm/Realm/Sync/Session.cs +++ b/Realm/Realm/Sync/Session.cs @@ -112,11 +112,7 @@ public IObservable GetProgressObservable(ProgressDirection directi public Task WaitForUploadAsync() { var tcs = new TaskCompletionSource(); - if (!Handle.Wait(tcs, ProgressDirection.Upload)) - { - throw new InvalidOperationException("Cannot register a wait callback on a session in the Error state"); - } - + Handle.Wait(tcs, ProgressDirection.Upload); return tcs.Task; } @@ -128,11 +124,7 @@ public Task WaitForUploadAsync() public Task WaitForDownloadAsync() { var tcs = new TaskCompletionSource(); - if (!Handle.Wait(tcs, ProgressDirection.Download)) - { - throw new InvalidOperationException("Cannot register a wait callback on a session in the Error state"); - } - + Handle.Wait(tcs, ProgressDirection.Download); return tcs.Task; } diff --git a/Tests/Realm.Tests/Sync/SessionTests.cs b/Tests/Realm.Tests/Sync/SessionTests.cs index aa81492147..3be3decd87 100644 --- a/Tests/Realm.Tests/Sync/SessionTests.cs +++ b/Tests/Realm.Tests/Sync/SessionTests.cs @@ -107,6 +107,7 @@ public void Session_Error_ShouldPassCorrectSession() } [Test] + [Ignore("This is no longer relevant with the automatic client reset recovery.")] public void Session_DivergingHistories_ShouldRaiseClientResetException() { TestHelpers.RunAsyncTest(async () => diff --git a/wrappers/src/results_cs.cpp b/wrappers/src/results_cs.cpp index f94900d9d8..15f88ea791 100644 --- a/wrappers/src/results_cs.cpp +++ b/wrappers/src/results_cs.cpp @@ -29,6 +29,7 @@ #include "schema_cs.hpp" #include #include +#include "keypath_helpers.hpp" using namespace realm; using namespace realm::binding; @@ -138,7 +139,7 @@ REALM_EXPORT Results* results_get_filtered_results(const Results& results, uint1 parser::ParserResult result = parser::parse(query_string.to_string()); parser::KeyPathMapping mapping; - alias_backlinks(mapping, realm); + realm::alias_backlinks(mapping, *realm); query_builder::NoArguments no_args; query_builder::apply_predicate(query, result.predicate, no_args, mapping); diff --git a/wrappers/src/schema_cs.hpp b/wrappers/src/schema_cs.hpp index c6727a51b2..e25592e0a9 100644 --- a/wrappers/src/schema_cs.hpp +++ b/wrappers/src/schema_cs.hpp @@ -90,19 +90,4 @@ REALM_FORCEINLINE SchemaObject SchemaObject::for_marshalling(const ObjectSchema& util::Optional create_schema(SchemaObject* objects, int objects_length, SchemaProperty* properties); -REALM_FORCEINLINE void alias_backlinks(parser::KeyPathMapping &mapping, const SharedRealm &realm) -{ - const Schema &schema = realm->schema(); - for (auto it = schema.begin(); it != schema.end(); ++it) { - for (const Property &property : it->computed_properties) { - if (property.type == PropertyType::LinkingObjects) { - auto target_object_schema = schema.find(property.object_type); - const TableRef table = ObjectStore::table_for_object_type(realm->read_group(), it->name); - const TableRef target_table = ObjectStore::table_for_object_type(realm->read_group(), target_object_schema->name); - std::string native_name = "@links." + std::string(target_table->get_name()) + "." + property.link_origin_property_name; - mapping.add_mapping(table, property.name, native_name); - } - } - } -} #endif /* defined(SCHEMA_CS_HPP) */ diff --git a/wrappers/src/subscription_cs.cpp b/wrappers/src/subscription_cs.cpp index 51e1866692..18593aebdd 100644 --- a/wrappers/src/subscription_cs.cpp +++ b/wrappers/src/subscription_cs.cpp @@ -16,6 +16,7 @@ #include "schema_cs.hpp" #include #include +#include "keypath_helpers.hpp" using namespace realm; using namespace realm::binding; @@ -36,32 +37,23 @@ REALM_EXPORT Subscription* realm_subscription_create(Results& results, uint16_t* return handle_errors(ex, [&]() { auto name = name_len >= 0 ? util::Optional(Utf16StringAccessor(name_buf, name_len).to_string()) : none; auto optional_ttl = time_to_live >= 0 ? util::Optional(time_to_live) : none; - - IncludeDescriptor inclusion_paths; - if (inclusions_len >= 0) { - DescriptorOrdering combined_orderings; - parser::KeyPathMapping mapping; - alias_backlinks(mapping, results.get_realm()); - - for (auto i = 0; i < inclusions_len; i++) { - auto inclusion_path = inclusions[i].value; - DescriptorOrdering ordering; - parser::DescriptorOrderingState ordering_state = parser::parse_include_path(inclusion_path); - query_builder::apply_ordering(ordering, results.get_query().get_table(), ordering_state, mapping); - combined_orderings.append_include(ordering.compile_included_backlinks()); - } - - if (combined_orderings.will_apply_include()) { - inclusion_paths = combined_orderings.compile_included_backlinks(); - } + + std::vector paths; + for (auto i = 0; i < inclusions_len; i++) { + paths.emplace_back(inclusions[i].value); } + parser::KeyPathMapping mapping; + realm::alias_backlinks(mapping, *results.get_realm()); + + auto inclusion_paths = realm::generate_include_from_keypaths(paths, *results.get_realm(), results.get_object_schema(), mapping); + realm::partial_sync::SubscriptionOptions options; options.user_provided_name = name; options.time_to_live_ms = optional_ttl; options.update = update; options.inclusions = inclusion_paths; - + auto result = realm::partial_sync::subscribe(results, options); return new Subscription(std::move(result)); }); diff --git a/wrappers/src/sync_manager_cs.cpp b/wrappers/src/sync_manager_cs.cpp index b6ee8e7351..758da9485d 100644 --- a/wrappers/src/sync_manager_cs.cpp +++ b/wrappers/src/sync_manager_cs.cpp @@ -44,6 +44,9 @@ using LogMessageDelegate = void(const char* message, size_t message_len, util::L namespace realm { namespace binding { + void (*s_open_realm_callback)(void* task_completion_source, SharedRealm* realm, int32_t error_code, const char* message, size_t message_len); + + class SyncLogger : public util::RootLogger { public: SyncLogger(LogMessageDelegate* delegate) @@ -76,12 +79,61 @@ namespace binding { }; } +Realm::Config get_shared_realm_config(Configuration configuration, SyncConfiguration sync_configuration, SchemaObject* objects, int objects_length, SchemaProperty* properties, uint8_t* encryption_key) +{ + Realm::Config config; + config.schema_mode = SchemaMode::Additive; + + if (objects_length > 0) { + config.schema = create_schema(objects, objects_length, properties); + } + + config.schema_version = configuration.schema_version; + + std::string realm_url(Utf16StringAccessor(sync_configuration.url, sync_configuration.url_len)); + + config.sync_config = std::make_shared(*sync_configuration.user, realm_url); + config.sync_config->bind_session_handler = bind_session; + config.sync_config->error_handler = handle_session_error; + config.path = Utf16StringAccessor(configuration.path, configuration.path_len); + + // by definition the key is only allowed to be 64 bytes long, enforced by C# code + if (encryption_key) { + auto& key = *reinterpret_cast*>(encryption_key); + + config.encryption_key = std::vector(key.begin(), key.end()); + config.sync_config->realm_encryption_key = key; + } + +#if !REALM_PLATFORM_APPLE + if (sync_configuration.trusted_ca_path) { + Utf16StringAccessor trusted_ca_path(sync_configuration.trusted_ca_path, sync_configuration.trusted_ca_path_len); + config.sync_config->ssl_trust_certificate_path = trusted_ca_path.to_string(); + } +#endif + + config.sync_config->client_validate_ssl = sync_configuration.client_validate_ssl; + config.sync_config->is_partial = sync_configuration.is_partial; + + if (sync_configuration.partial_sync_identifier) { + Utf16StringAccessor partial_sync_identifier(sync_configuration.partial_sync_identifier, sync_configuration.partial_sync_identifier_len); + config.sync_config->custom_partial_sync_identifier = partial_sync_identifier.to_string(); + } + + return config; +} + } using SharedSyncUser = std::shared_ptr; extern "C" { +REALM_EXPORT void realm_install_syncmanager_callbacks(decltype(s_open_realm_callback) open_callback) +{ + s_open_realm_callback = open_callback; +} + REALM_EXPORT void realm_syncmanager_configure(const uint16_t* base_path_buf, size_t base_path_len, const uint16_t* user_agent_buf, size_t user_agent_len, const SyncManager::MetadataMode* mode, const char* encryption_key_buf, bool reset_on_error, @@ -135,48 +187,31 @@ REALM_EXPORT util::Logger::Level realm_syncmanager_get_log_level() { return SyncManager::shared().log_level(); } - -REALM_EXPORT SharedRealm* shared_realm_open_with_sync(Configuration configuration, SyncConfiguration sync_configuration, SchemaObject* objects, int objects_length, SchemaProperty* properties, uint8_t* encryption_key, NativeException::Marshallable& ex) + +REALM_EXPORT void shared_realm_open_with_sync_async(Configuration configuration, SyncConfiguration sync_configuration, SchemaObject* objects, int objects_length, SchemaProperty* properties, uint8_t* encryption_key, void* task_completion_source, NativeException::Marshallable& ex) { - return handle_errors(ex, [&]() { - Realm::Config config; - config.schema_mode = SchemaMode::Additive; + handle_errors(ex, [&]() { + auto config = get_shared_realm_config(configuration, sync_configuration, objects, objects_length, properties, encryption_key); - if (objects_length > 0) { - config.schema = create_schema(objects, objects_length, properties); - } - - config.schema_version = configuration.schema_version; - - std::string realm_url(Utf16StringAccessor(sync_configuration.url, sync_configuration.url_len)); - - config.sync_config = std::make_shared(*sync_configuration.user, realm_url); - config.sync_config->bind_session_handler = bind_session; - config.sync_config->error_handler = handle_session_error; - config.path = Utf16StringAccessor(configuration.path, configuration.path_len); - - // by definition the key is only allowed to be 64 bytes long, enforced by C# code - if (encryption_key) { - auto& key = *reinterpret_cast*>(encryption_key); - - config.encryption_key = std::vector(key.begin(), key.end()); - config.sync_config->realm_encryption_key = key; - } + Realm::get_shared_realm(config, [task_completion_source](SharedRealm realm, std::exception_ptr error) { + if (error) { + try { + std::rethrow_exception(error); + } catch (const std::system_error& system_error) { + const std::error_code& ec = system_error.code(); + s_open_realm_callback(task_completion_source, nullptr, ec.value(), ec.message().c_str(), ec.message().length()); + } + } else { + s_open_realm_callback(task_completion_source, new SharedRealm(realm), 0, nullptr, 0); + } + }); + }); +} -#if !REALM_PLATFORM_APPLE - if (sync_configuration.trusted_ca_path) { - Utf16StringAccessor trusted_ca_path(sync_configuration.trusted_ca_path, sync_configuration.trusted_ca_path_len); - config.sync_config->ssl_trust_certificate_path = trusted_ca_path.to_string(); - } -#endif - - config.sync_config->client_validate_ssl = sync_configuration.client_validate_ssl; - config.sync_config->is_partial = sync_configuration.is_partial; - - if (sync_configuration.partial_sync_identifier) { - Utf16StringAccessor partial_sync_identifier(sync_configuration.partial_sync_identifier, sync_configuration.partial_sync_identifier_len); - config.sync_config->custom_partial_sync_identifier = partial_sync_identifier.to_string(); - } +REALM_EXPORT SharedRealm* shared_realm_open_with_sync(Configuration configuration, SyncConfiguration sync_configuration, SchemaObject* objects, int objects_length, SchemaProperty* properties, uint8_t* encryption_key, NativeException::Marshallable& ex) +{ + return handle_errors(ex, [&]() { + auto config = get_shared_realm_config(configuration, sync_configuration, objects, objects_length, properties, encryption_key); auto realm = Realm::get_shared_realm(config); if (!configuration.read_only) diff --git a/wrappers/src/sync_session_cs.cpp b/wrappers/src/sync_session_cs.cpp index fc9e7f424d..3faf7c4cd4 100644 --- a/wrappers/src/sync_session_cs.cpp +++ b/wrappers/src/sync_session_cs.cpp @@ -160,17 +160,17 @@ REALM_EXPORT void realm_syncsession_unregister_progress_notifier(const SharedSyn }); } -REALM_EXPORT bool realm_syncsession_wait(const SharedSyncSession& session, void* task_completion_source, CSharpNotifierType direction, NativeException::Marshallable& ex) +REALM_EXPORT void realm_syncsession_wait(const SharedSyncSession& session, void* task_completion_source, CSharpNotifierType direction, NativeException::Marshallable& ex) { - return handle_errors(ex, [&] { + handle_errors(ex, [&] { auto waiter = [task_completion_source](std::error_code error) { s_wait_callback(task_completion_source, error.value(), error.message().c_str(), error.message().length()); }; if (direction == CSharpNotifierType::Upload) { - return session->wait_for_upload_completion(waiter); + session->wait_for_upload_completion(waiter); } else { - return session->wait_for_download_completion(waiter); + session->wait_for_download_completion(waiter); } }); }