Skip to content

Commit

Permalink
Merge 3662acd into 707edcc
Browse files Browse the repository at this point in the history
  • Loading branch information
desistefanova committed May 26, 2023
2 parents 707edcc + 3662acd commit 95bb81a
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 24 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,9 @@
## vNext (TBD)

### Enhancements

* Added `CompensatingWriteError` that contains detailed information about the writes that have been reverted by the server due to permissions or subscription view restrictions. It will be received on `syncErrorHandle` callbak, which is set to `Configuration.flexibleSync` similarly to other session errors. ([#1291](https://github.com/realm/realm-dart/pull/1291))

* Add `RealmResults.isValid` ([#1231](https://github.com/realm/realm-dart/pull/1231)).
* Support `Decimal128` datatype ([#1192](https://github.com/realm/realm-dart/pull/1192)).
* Realm logging is extended to support logging of all Realm storage level messages. (Core upgrade).
Expand Down
53 changes: 52 additions & 1 deletion lib/src/configuration.dart
Expand Up @@ -608,7 +608,14 @@ class ClientResetError extends SyncError {
/// The [ClientResetError] has error code of [SyncClientErrorCode.autoClientResetFailure]
SyncClientErrorCode get code => SyncClientErrorCode.autoClientResetFailure;
ClientResetError(String message, [this._config]) : super(message, SyncErrorCategory.client, SyncClientErrorCode.autoClientResetFailure.code);
String get originalFilePath => _userInfo?[_originalFilePathKey] ?? "";
/// The path where the backup copy of the realm will be placed once the client reset process is complete.
String get backupFilePath => _userInfo?[_backupFilePathKey] ?? "";
final Map<String, String>? _userInfo;
ClientResetError(String message, [this._config, this._userInfo]) : super(message, SyncErrorCategory.client, SyncClientErrorCode.autoClientResetFailure.code);
@override
String toString() {
Expand All @@ -622,6 +629,9 @@ class ClientResetError extends SyncError {
if (_config is! FlexibleSyncConfiguration) {
throw RealmException("The current configuration is not FlexibleSyncConfiguration.");
}
if (originalFilePath != _config?.path) {
throw RealmException("The current configuration does not match the original realm file path.");
}
final flexibleConfig = _config as FlexibleSyncConfiguration;
return realmCore.immediatelyRunFileActions(flexibleConfig.user.app, flexibleConfig.path);
}
Expand All @@ -630,6 +640,9 @@ class ClientResetError extends SyncError {
/// Thrown when an error occurs during synchronization
/// {@category Sync}
class SyncError extends RealmError {
final _originalFilePathKey = "ORIGINAL_FILE_PATH";
final _backupFilePathKey = "RECOVERY_FILE_PATH";
/// The numeric code value indicating the type of the sync error.
final int codeValue;
Expand Down Expand Up @@ -792,3 +805,41 @@ enum GeneralSyncErrorCode {
final int code;
const GeneralSyncErrorCode(this.code);
}
/// A class containing the details for a compensating write performed by the server.
class CompensatingWriteInfo {
CompensatingWriteInfo(this.objectType, this.reason, this.primaryKey);
/// The type of the object which was affected by the compensating write.
final String objectType;
/// The reason for the server to perform a compensating write.
final String reason;
/// The primary key of the object which was affected by the compensating write.
final RealmValue primaryKey;
@override
String toString() {
return "CompensatingWriteInfo: objectType: $objectType\n reason: $reason\n primaryKey: $primaryKey\n";
}
}
/// An error type that describes a compensating write error,
/// which indicates that one more object changes have been reverted
/// by the server.
/// {@category Sync}
class CompensatingWriteError extends SyncError {
/// The [CompensatingWriteError] has error code of [SyncSessionErrorCode.compensatingWrite]
SyncSessionErrorCode get code => SyncSessionErrorCode.compensatingWrite;
/// The list of the compensating writes performed by the server.
late final List<CompensatingWriteInfo> compensatingWrites;
CompensatingWriteError(String message, this.compensatingWrites) : super(message, SyncErrorCategory.session, SyncSessionErrorCode.compensatingWrite.code);
@override
String toString() {
return "CompensatingWriteError message: $message category: $category code: $code\n $compensatingWrites";
}
}
81 changes: 65 additions & 16 deletions lib/src/native/realm_core.dart
Expand Up @@ -2907,42 +2907,60 @@ extension on Pointer<realm_value_t> {
if (this == nullptr) {
throw RealmException("Can not convert nullptr realm_value to Dart value");
}
return ref.toDartValueByRef(realm);
}
}

switch (ref.type) {
extension on realm_value_t {
Object? toDartValueByRef(Realm? realm) {
switch (type) {
case realm_value_type.RLM_TYPE_NULL:
return null;
case realm_value_type.RLM_TYPE_INT:
return ref.values.integer;
return values.integer;
case realm_value_type.RLM_TYPE_BOOL:
return ref.values.boolean;
return values.boolean;
case realm_value_type.RLM_TYPE_STRING:
return ref.values.string.data.cast<Utf8>().toRealmDartString(length: ref.values.string.size)!;
return values.string.data.cast<Utf8>().toRealmDartString(length: values.string.size)!;
case realm_value_type.RLM_TYPE_FLOAT:
return ref.values.fnum;
return values.fnum;
case realm_value_type.RLM_TYPE_DOUBLE:
return ref.values.dnum;
return values.dnum;
case realm_value_type.RLM_TYPE_LINK:
final objectKey = ref.values.link.target;
final classKey = ref.values.link.target_table;
if (realm == null) {
return null;
}
final objectKey = values.link.target;
final classKey = values.link.target_table;
if (realm.metadata.getByClassKeyIfExists(classKey) == null) return null; // temprorary workaround to avoid crash on assertion
return realmCore._getObject(realm, classKey, objectKey);
case realm_value_type.RLM_TYPE_BINARY:
throw Exception("Not implemented");
case realm_value_type.RLM_TYPE_TIMESTAMP:
final seconds = ref.values.timestamp.seconds;
final nanoseconds = ref.values.timestamp.nanoseconds;
final seconds = values.timestamp.seconds;
final nanoseconds = values.timestamp.nanoseconds;
return DateTime.fromMicrosecondsSinceEpoch(seconds * _microsecondsPerSecond + nanoseconds ~/ _nanosecondsPerMicrosecond, isUtc: true);
case realm_value_type.RLM_TYPE_DECIMAL128:
var decimal = ref.values.decimal128; // NOTE: Does not copy the struct!
var decimal = values.decimal128; // NOTE: Does not copy the struct!
decimal = _realmLib.realm_dart_decimal128_copy(decimal); // This is a workaround to that
return Decimal128Internal.fromNative(decimal);
case realm_value_type.RLM_TYPE_OBJECT_ID:
return ObjectId.fromBytes(cast<Uint8>().asTypedList(12));
return ObjectId.fromBytes(values.object_id.bytes.toIntList(ObjectId.byteLength));
case realm_value_type.RLM_TYPE_UUID:
return Uuid.fromBytes(cast<Uint8>().asTypedList(16).buffer);
return Uuid.fromBytes(values.uuid.bytes.toIntList(16).buffer);
default:
throw RealmException("realm_value_type ${ref.type} not supported");
throw RealmException("realm_value_type $type not supported");
}
}
}

extension on Array<Uint8> {
Uint8List toIntList(int count) {
List<int> result = List.filled(count, this[0]);
for (var i = 1; i < count; i++) {
result[i] = this[i];
}
return Uint8List.fromList(result);
}
}

Expand Down Expand Up @@ -2993,15 +3011,46 @@ extension on Pointer<Utf8> {
}

extension on realm_sync_error {
static int length1(Pointer<Uint8> codeUnits) {
var length = 0;
while (codeUnits[length] != 0) {
length++;
}
return length;
}

SyncError toSyncError(Configuration config) {
final message = detailed_message.cast<Utf8>().toRealmDartString()!;
final SyncErrorCategory category = SyncErrorCategory.values[error_code.category];

//client reset can be requested with is_client_reset_requested disregarding the error_code.value
if (is_client_reset_requested) {
return ClientResetError(message, config);
Map<String, String> userInfoMap = {};
final ui = user_info_map.cast<realm_sync_error_user_info>();
for (int i = 0; i < user_info_length; ++i) {
final uiEntry = ui[i];
final key = uiEntry.key.cast<Utf8>().toDartString();
final value = uiEntry.value.cast<Utf8>().toDartString();
userInfoMap.addEntries([MapEntry(key, value)]);
}
return ClientResetError(message, config, userInfoMap);
}
if (category == SyncErrorCategory.session) {
final sessionErrorCode = SyncSessionErrorCode.fromInt(error_code.value);
if (sessionErrorCode == SyncSessionErrorCode.compensatingWrite) {
List<CompensatingWriteInfo> compensatingWrites = [];

final cw = compensating_writes.cast<realm_sync_error_compensating_write_info>();
for (int i = 0; i < compensating_writes_length; ++i) {
final cwi = cw[i];
final object_name = cwi.object_name.cast<Utf8>().toDartString();
final reason = cwi.reason.cast<Utf8>().toDartString();
final primary_key = cwi.primary_key.toDartValueByRef(null);
compensatingWrites.add(CompensatingWriteInfo(object_name, reason, RealmValue.from(primary_key)));
}
return CompensatingWriteError(message, compensatingWrites);
}
}

return SyncError.create(message, category, error_code.value, isFatal: is_fatal);
}
}
Expand Down
44 changes: 44 additions & 0 deletions src/realm_dart_sync.cpp
Expand Up @@ -61,28 +61,72 @@ RLM_API void realm_dart_sync_client_log_callback(realm_userdata_t userdata, real
RLM_API void realm_dart_sync_error_handler_callback(realm_userdata_t userdata, realm_sync_session_t* session, realm_sync_error_t error)
{
// the pointers in error are to stack values, we need to make copies and move them into the scheduler invocation
struct compensating_write_copy {
std::string reason;
std::string object_name;
realm_value_t primary_key;
} cw_buf;

struct error_copy {
std::string message;
std::string detailed_message;
const char* c_original_file_path_key;
const char* c_recovery_file_path_key;
bool is_fatal;
bool is_unrecognized_by_client;
bool is_client_reset_requested;
realm_sync_error_action_e server_requests_action;
std::vector<std::pair<std::string, std::string>> user_info_values;
std::vector<realm_sync_error_user_info_t> user_info;
std::vector<compensating_write_copy> compensating_writes_values;
std::vector<realm_sync_error_compensating_write_info_t> compensating_writes;
} buf;

buf.message = error.error_code.message;
buf.detailed_message = error.detailed_message;
buf.c_original_file_path_key = error.c_original_file_path_key;
buf.c_recovery_file_path_key = error.c_recovery_file_path_key;
buf.is_fatal = error.is_fatal;
buf.is_unrecognized_by_client = error.is_unrecognized_by_client;
buf.is_client_reset_requested = error.is_client_reset_requested;
buf.server_requests_action = error.server_requests_action;
buf.user_info_values.reserve(error.user_info_length);
buf.user_info.reserve(error.user_info_length);
buf.compensating_writes_values.reserve(error.compensating_writes_length);
buf.compensating_writes.reserve(error.compensating_writes_length);

for (size_t i = 0; i < error.user_info_length; i++) {
auto& [key, value] = buf.user_info_values.emplace_back(error.user_info_map[i].key, error.user_info_map[i].value);
buf.user_info.push_back({ key.c_str(), value.c_str() });
}

for (size_t i = 0; i < error.compensating_writes_length; i++) {
auto cw = error.compensating_writes[i];
cw_buf.reason = cw.reason;
cw_buf.object_name = cw.object_name;
cw_buf.primary_key = cw.primary_key;

auto& cw_new = buf.compensating_writes_values.emplace_back(cw_buf);
realm_sync_error_compensating_write_info_t cw_new_copy;
cw_new_copy.reason = cw_new.reason.c_str();
cw_new_copy.object_name = cw_new.object_name.c_str();
cw_new_copy.primary_key = cw_new.primary_key;
buf.compensating_writes.push_back(cw_new_copy);
}

auto ud = reinterpret_cast<realm_dart_userdata_async_t>(userdata);
ud->scheduler->invoke([ud, session = *session, error = std::move(error), buf = std::move(buf)]() mutable {
//we moved buf so we need to update the error pointers here.
error.error_code.message = buf.message.c_str();
error.detailed_message = buf.detailed_message.c_str();
error.c_original_file_path_key = buf.c_original_file_path_key;
error.c_recovery_file_path_key = buf.c_recovery_file_path_key;
error.is_fatal = buf.is_fatal;
error.is_unrecognized_by_client = buf.is_unrecognized_by_client;
error.is_client_reset_requested = buf.is_client_reset_requested;
error.server_requests_action = buf.server_requests_action;
error.user_info_map = buf.user_info.data();
error.compensating_writes = buf.compensating_writes.data();
(reinterpret_cast<realm_sync_error_handler_func_t>(ud->dart_callback))(ud->handle, const_cast<realm_sync_session_t*>(&session), error);
});
}
Expand Down
29 changes: 29 additions & 0 deletions test/client_reset_test.dart
Expand Up @@ -509,6 +509,35 @@ Future<void> main([List<String>? args]) async {
_checkProducts(realmA, comparer, expectedList: [task0Id, task1Id, task3Id], notExpectedList: [task2Id]);
_checkProducts(realmB, comparer, expectedList: [task0Id, task1Id, task3Id], notExpectedList: [task2Id]);
});

baasTest('ClientResetError details are received', (appConfig) async {
final app = App(appConfig);
final user = await getIntegrationUser(app);

final resetCompleter = Completer<void>();
SyncError? error;
final config = Configuration.flexibleSync(
user,
[Task.schema, Schedule.schema],
clientResetHandler: ManualRecoveryHandler((syncError) {
error = syncError;
resetCompleter.complete();
}),
);

final realm = await getRealmAsync(config);
await realm.syncSession.waitForUpload();

await triggerClientReset(realm);
await waitFutureWithTimeout(resetCompleter.future, timeoutError: "ManualRecoveryHandler is not reported.");
expect(error, isA<ClientResetError>());
final clientResetError = error?.as<ClientResetError>();
expect(clientResetError?.category, SyncErrorCategory.client);
expect(clientResetError?.code, SyncClientErrorCode.autoClientResetFailure);
expect(clientResetError?.isFatal, isTrue);
expect(clientResetError?.backupFilePath, isNotEmpty);
expect(clientResetError?.originalFilePath, isNotEmpty);
});
}

Future<Realm> _syncRealmForUser<T extends RealmObject>(FlexibleSyncConfiguration config, [List<T>? items]) async {
Expand Down
18 changes: 11 additions & 7 deletions test/subscription_test.dart
Expand Up @@ -573,18 +573,22 @@ Future<void> main([List<String>? args]) async {
});
final realm = getRealm(config);
final query = realm.query<Product>(r'name BEGINSWITH $0', [productNamePrefix]);
if (realm.subscriptions.find(query) == null) {
realm.subscriptions.update((mutableSubscriptions) => mutableSubscriptions.add(query));
}
realm.subscriptions.update((mutableSubscriptions) => mutableSubscriptions.add(query));
await realm.subscriptions.waitForSynchronization();
realm.write(() => realm.add(Product(ObjectId(), "doesn't match subscription")));

final productId = ObjectId();
realm.write(() => realm.add(Product(productId, "doesn't match subscription")));
await realm.syncSession.waitForUpload();

expect(compensatingWriteError, isA<SyncSessionError>());
final sessionError = compensatingWriteError.as<SyncSessionError>();
expect(compensatingWriteError, isA<CompensatingWriteError>());
final sessionError = compensatingWriteError.as<CompensatingWriteError>();
expect(sessionError.category, SyncErrorCategory.session);
expect(sessionError.isFatal, false);
expect(sessionError.code, SyncSessionErrorCode.compensatingWrite);
final writeReason = sessionError.compensatingWrites.first;
expect(writeReason, isNotNull);
expect(writeReason.objectType, "Product");
expect(writeReason.reason, 'write to "$productId" in table "${writeReason.objectType}" not allowed; object is outside of the current query view');
expect(writeReason.primaryKey.value, productId);
expect(sessionError.message!.startsWith('Client attempted a write that is outside of permissions or query filters'), isTrue);
});
}

0 comments on commit 95bb81a

Please sign in to comment.