-
Notifications
You must be signed in to change notification settings - Fork 38
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
* Changes to agent merging and uniqueness rules * Simplify uniqueness constructions * Misc improvement * Handle _id not being added to django models * Ignore cases where parent_id is undefined * Handle ObjectDoesNotExist later * Infer institution to global level * Renable accession agent tests * Revert previous commit * Add unit test for merging within collecting event * Typo fix and only ids fetch in unit test * Revert linting savepoint * Remove uncommitable file * Add ordering to dedup * Fix failing test * Reverse collector order for unit tests * Add unit test for rollback on exception * Minor typo fix in unit test * Stricten unit test internal error * Use resolve record merge utility * Improvements to sync error and unit test * Revert to using client post for rollback * formatting changes --------- Co-authored-by: alec_dev <acwhite211@gmail.com>
- Loading branch information
1 parent
3b6a6ab
commit 792d6ef
Showing
3 changed files
with
448 additions
and
96 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,81 +1,180 @@ | ||
import json | ||
from django.core.exceptions import ObjectDoesNotExist | ||
from typing import Dict, List, Union | ||
from typing import Dict, List, Union, Tuple | ||
from specifyweb.specify import models | ||
from .orm_signal_handler import orm_signal_handler | ||
from .exceptions import BusinessRuleException | ||
from specifyweb.middleware.general import serialize_django_obj | ||
|
||
def make_uniqueness_rule(model_name, parent_field, unique_field): | ||
|
||
def make_uniqueness_rule(model_name, | ||
rule_fields: Tuple[Tuple[str], Tuple[str]]): | ||
model = getattr(models, model_name) | ||
table_name = models.datamodel.get_table(model_name).name | ||
if parent_field is None: | ||
# uniqueness is global | ||
@orm_signal_handler('pre_save', model_name) | ||
def check_unique(instance): | ||
value = getattr(instance, unique_field) | ||
if value is None: return | ||
conflicts = model.objects.only('id').filter(**{ | ||
unique_field: value}) | ||
if instance.id is not None: | ||
conflicts = conflicts.exclude(id=instance.id) | ||
if conflicts: | ||
raise BusinessRuleException( | ||
"{} must have unique {}".format(table_name, unique_field), | ||
{"table" : table_name, | ||
"localizationKey" : "fieldNotUnique", | ||
"fieldName" : unique_field, | ||
"fieldData" : (unique_field, serialize_django_obj(value)), | ||
"conflicting" : list(conflicts.values_list('id', flat=True)[:100])}) | ||
else: | ||
@orm_signal_handler('pre_save', model_name) | ||
def check_unique(instance): | ||
if isinstance(parent_field, dict): return | ||
base_fields = rule_fields[0] | ||
parent_fields = rule_fields[1] | ||
all_fields = [field for partition in rule_fields for field in partition] | ||
|
||
def get_matchable(instance): | ||
def best_match_or_none(field_name): | ||
try: | ||
parent = getattr(instance, parent_field + '_id', None) | ||
object_or_field = getattr(instance, field_name, None) | ||
if object_or_field is None: | ||
return None | ||
if not hasattr(object_or_field, 'id'): | ||
return field_name, object_or_field | ||
if hasattr(instance, field_name+'_id'): | ||
return field_name+'_id', object_or_field.id | ||
|
||
except ObjectDoesNotExist: | ||
parent = None | ||
|
||
if parent is None: return | ||
value = getattr(instance, unique_field) | ||
if value is None: return | ||
conflicts = model.objects.only('id').filter(**{ | ||
parent_field + '_id': parent, | ||
unique_field: value}) | ||
if instance.id is not None: | ||
conflicts = conflicts.exclude(id=instance.id) | ||
if conflicts: | ||
raise BusinessRuleException( | ||
"{} must have unique {} in {}".format(table_name, unique_field, parent_field), | ||
{"table" : table_name, | ||
"localizationKey" : "childFieldNotUnique", | ||
"fieldName" : unique_field, | ||
"fieldData" : (unique_field, serialize_django_obj(value)), | ||
"parentField" : parent_field, | ||
"parentData" : f"{parent_field}: id={parent}", | ||
"conflicting" : list(conflicts.values_list('id', flat=True)[:100])}) | ||
pass | ||
return None | ||
|
||
matchable = {} | ||
field_mapping = {} | ||
for field in all_fields: | ||
matched_or_none = best_match_or_none(field) | ||
if matched_or_none is not None: | ||
field_mapping[field] = matched_or_none[0] | ||
matchable[matched_or_none[0]] = matched_or_none[1] | ||
|
||
if len(matchable) != len(all_fields): | ||
# if any field is missing, pass | ||
return None | ||
|
||
return field_mapping, matchable | ||
|
||
def get_exception(conflicts, matchable, field_map): | ||
error_message = '{} must have unique {}'.format(table_name, | ||
join_with_and(base_fields)) | ||
|
||
response = {"table": table_name, | ||
"localizationKey": "fieldNotUnique" | ||
if len(parent_fields) == 0 | ||
else "childFieldNotUnique", | ||
"fieldName": ','.join(base_fields), | ||
"fieldData": serialize_multiple_django(matchable, field_map, base_fields), | ||
} | ||
|
||
if len(parent_fields) > 0: | ||
error_message += ' in {}'.format(join_with_and(parent_fields)) \ | ||
if len(parent_fields) > 0 else '' | ||
response.update({ | ||
"parentField": ','.join(parent_fields), | ||
"parentData": serialize_multiple_django(matchable, field_map, parent_fields) | ||
}) | ||
response['conflicting'] = list( | ||
conflicts.values_list('id', flat=True)[:100]) | ||
return BusinessRuleException(error_message, response) | ||
|
||
@orm_signal_handler('pre_save', model_name) | ||
def check_unique(instance): | ||
match_result = get_matchable(instance) | ||
if match_result is None: | ||
return | ||
field_map, matchable = match_result | ||
conflicts = model.objects.only('id').filter(**matchable) | ||
if instance.id is not None: | ||
conflicts = conflicts.exclude(id=instance.id) | ||
if conflicts: | ||
raise get_exception(conflicts, matchable, field_map) | ||
return check_unique | ||
|
||
RAW_UNIQUENESS_RULES: Dict[str, Dict[str, List[Union[Dict[str, Union[str, list]], str, None]]]] = \ | ||
json.load(open('specifyweb/frontend/js_src/lib/components/DataModel/uniquness_rules.json')) | ||
|
||
def join_with_and(fields): | ||
return ' and '.join(fields) | ||
|
||
|
||
RAW_UNIQUENESS_RULES: Dict[ | ||
str, Dict[str, List[Union[Dict[str, Union[str, list]], str, None]]]] = \ | ||
json.load(open( | ||
'specifyweb/frontend/js_src/lib/components/DataModel/uniquness_rules.json')) | ||
|
||
''' | ||
The current definition of uniqueness rules are rather inconvenient. | ||
For example, a definition like | ||
"AccessionAgent":{ | ||
"role":[ | ||
{ | ||
"field":"accession", | ||
"otherFields":[ | ||
"agent" | ||
] | ||
}, | ||
{ | ||
"field":"repositoryagreement", | ||
"otherFields":[ | ||
"agent" | ||
] | ||
} | ||
], | ||
"agent":[ | ||
{ | ||
"field":"accession", | ||
"otherFields":[ | ||
"role" | ||
] | ||
}, | ||
{ | ||
"field":"repositoryagreement", | ||
"otherFields":[ | ||
"role" | ||
] | ||
} | ||
] | ||
} | ||
can simply be | ||
"AccessionAgent": [ | ||
(("role", "agent"), ("accession")), | ||
(("role", "agent"), ("repositoryagreement")) | ||
] | ||
The second format also makes it much easier to construct django queries. | ||
So, parse_uniqueness_rules() automatically converts the current representation | ||
TODO: Refactor front-end to use this instead | ||
''' | ||
|
||
|
||
def parse_uniqueness_rules(): | ||
PARSED_UNIQUENESS_RULES = {} | ||
for table, rules in RAW_UNIQUENESS_RULES.items(): | ||
table = table.lower().capitalize() | ||
if hasattr(models, table): | ||
PARSED_UNIQUENESS_RULES[table] = {} | ||
PARSED_UNIQUENESS_RULES[table] = [] | ||
for field_name, rule in rules.items(): | ||
# The Specify Model field names are always in lowercase | ||
field_name = field_name.lower() | ||
PARSED_UNIQUENESS_RULES[table][field_name] = rule | ||
|
||
for rule_fields in rule: | ||
child, parent = resolve_child_parent(field_name, | ||
rule_fields) | ||
matching_rule = [matched_rule | ||
for matched_rule in | ||
PARSED_UNIQUENESS_RULES[table] | ||
if matched_rule == (child, parent)] | ||
if len(matching_rule) == 0: | ||
PARSED_UNIQUENESS_RULES[table].append((child, parent)) | ||
return PARSED_UNIQUENESS_RULES | ||
|
||
UNIQUENESS_RULES = parse_uniqueness_rules() | ||
|
||
def resolve_child_parent(field, rule_instance): | ||
child = [field] | ||
parent = [] | ||
if isinstance(rule_instance, dict): | ||
parent.append(rule_instance['field']) | ||
child.extend(rule_instance['otherFields']) | ||
else: | ||
if rule_instance is not None and rule_instance != 'institution': | ||
parent.append(rule_instance) | ||
child.sort() | ||
return tuple(child), tuple(parent) | ||
|
||
|
||
def serialize_multiple_django(matchable, field_map, fields): | ||
return {field: serialize_django_obj(matchable[field_map[field]]) | ||
for field in fields} | ||
|
||
UNIQUENESS_RULES = parse_uniqueness_rules() | ||
|
||
uniqueness_rules = [make_uniqueness_rule(model, parent_field, unique_field) | ||
uniqueness_rules = [make_uniqueness_rule(model, rule_field) | ||
for model, rules in list(UNIQUENESS_RULES.items()) | ||
for unique_field, parent_fields in list(rules.items()) | ||
for parent_field in parent_fields] | ||
for rule_field in rules | ||
] |
Oops, something went wrong.