Skip to content

Commit

Permalink
refactor xml parsing into XmlParser class for import
Browse files Browse the repository at this point in the history
  • Loading branch information
MyPyDavid committed Feb 15, 2024
1 parent 8995ac2 commit b1acb97
Show file tree
Hide file tree
Showing 14 changed files with 245 additions and 120 deletions.
112 changes: 107 additions & 5 deletions rdmo/core/xml.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import logging
import re
from collections import OrderedDict
from dataclasses import dataclass, field
from pathlib import Path

from django.utils.translation import gettext_lazy as _

import defusedxml.ElementTree as ET
from packaging.version import parse

log = logging.getLogger(__name__)
from rdmo import __version__ as VERSION

logger = logging.getLogger(__name__)

models = {
'catalog': 'questions.catalog',
Expand All @@ -20,12 +27,100 @@
'view': 'views.view'
}

DEFAULT_RDMO_XML_VERSION = '1.11.0'


@dataclass
class XmlParser:
    """Read an RDMO XML file and expose its elements plus any parsing errors.

    Parsing runs once, from ``__post_init__``. Problems found in the file's
    content are collected in ``errors`` rather than raised, so callers are
    expected to check ``is_valid()`` before using ``parsed_elements``.

    Raises:
        ValueError: on construction, if ``file_name`` is None or the resolved
            path does not exist.
    """

    file_name: str = None
    # post init attributes
    file: Path = None  # will be set from file_name
    # Un-annotated on purpose: a plain class attribute, not a dataclass field,
    # so it is not part of the generated __init__ signature. Set to the parsed
    # root element once the root tag has been accepted.
    root = None
    errors: list = field(default_factory=list)
    parsed_elements: OrderedDict = field(default_factory=OrderedDict)

    def __post_init__(self):
        if self.file_name is None:
            raise ValueError("File name is required.")
        self.file = Path(self.file_name).resolve()
        if not self.file.exists():
            raise ValueError(f"File does not exist. {self.file}")

        elements = self.parse_xml_to_elements(self.file)
        # A failed parse returns None; keep parsed_elements a mapping so that
        # callers can always iterate it.
        self.parsed_elements = OrderedDict() if elements is None else elements
        # Most recently recorded message first.
        self.errors.reverse()

    def is_valid(self, raise_exception: bool = False) -> bool:
        """Return True when no errors were recorded.

        Args:
            raise_exception: if True and errors exist, raise instead of
                returning False.

        Raises:
            ValueError: carrying the list of errors, when ``raise_exception``
                is True and ``errors`` is not empty.
        """
        if self.errors and raise_exception:  # raise for errors
            raise ValueError(self.errors)
        return not bool(self.errors)

    def parse_xml_to_elements(self, xml_file: Path, raise_exception: bool = False):
        """Parse ``xml_file`` and return its converted, ordered elements.

        Every failure is appended to ``self.errors`` and ends the method with
        a ``None`` return; nothing is raised for invalid content.

        Args:
            xml_file: path of the XML file to read.
            raise_exception: accepted for interface compatibility; it has no
                effect in this method.

        Returns:
            The mapping produced by ``order_elements`` on success, otherwise
            ``None``.
        """
        root = None
        # step 2: parse xml
        try:
            root = read_xml_file(xml_file, raise_exception=True)
        except Exception as e:
            self.errors.append(_('XML Parsing Error') + f': {e!s}')
            logger.info('XML parsing error. Import failed.')

        if root is None:
            self.errors.append(_('The content of the xml file does not consist of well formed data or markup.'))
            return None
        if root.tag != 'rdmo':
            self.errors.append(_('This XML does not contain RDMO content.'))
            return None
        self.root = root

        # step 2.1: validate parsed xml
        root_version = root.attrib.get('version') or DEFAULT_RDMO_XML_VERSION
        try:
            parsed_version = parse(root_version)
        except ValueError as e:
            # packaging's InvalidVersion is a ValueError subclass; a malformed
            # version attribute is reported like any other bad version.
            logger.info('Import failed version validation (%s)', e)
            self.errors.append(_('This RDMO XML file does not have a valid version number.'))
            self.errors.append(f'RDMO XML Version: {root_version}')
            return None
        rdmo_version = parse(VERSION)
        if parsed_version > rdmo_version:
            logger.info('Import failed version validation (%s > %s)', parsed_version, rdmo_version)
            self.errors.append(_('This RDMO XML file does not have a valid version number.'))
            self.errors.append(f'RDMO XML Version: {root_version}')
            return None

        # step 3: create element dicts from xml
        try:
            elements = flat_xml_to_elements(root)
        except (KeyError, TypeError, AttributeError) as e:
            logger.info('Import failed with %s (%s)', type(e).__name__, e)
            self.errors.append(_('This is not a valid RDMO XML file.'))
        if self.errors:
            return None

        # step 3.1: validate elements for legacy versions
        try:
            pre_conversion_validate_legacy_elements(elements, parsed_version)
        except ValueError as e:
            logger.info('Import failed with ValueError (%s)', e)
            self.errors.append(_('This is not a valid RDMO XML file.'))
            self.errors.append(_('XML Parsing Error') + f': {e!s}')
        if self.errors:
            return None

        # step 4: convert elements from previous versions
        elements = convert_elements(elements, root_version)

        # step 5: order the elements and return
        elements = order_elements(elements)

        logger.info('XML parsing of %s success (length: %s).', Path(xml_file).name, len(elements))
        return elements


def read_xml_file(file_name, raise_exception=False):
    """Parse an XML file and return its root element.

    Args:
        file_name: path of the file handed to ``ET.parse``.
        raise_exception: if True, re-raise the parsing error after logging
            it; otherwise the error is only logged.

    Returns:
        The root element, or ``None`` when parsing failed and
        ``raise_exception`` is False.
    """
    try:
        return ET.parse(file_name).getroot()
    except Exception as e:
        logger.error('Xml parsing error: %s', e)
        if raise_exception:
            # Bare raise keeps the original traceback without making the
            # exception its own __cause__.
            raise
    return None

Expand All @@ -34,7 +129,7 @@ def parse_xml_string(string):
try:
return ET.fromstring(string)
except Exception as e:
log.error('Xml parsing error: ' + str(e))
logger.error('Xml parsing error: ' + str(e))


def flat_xml_to_elements(root):
Expand Down Expand Up @@ -121,7 +216,7 @@ def strip_ns(tag, ns_map):

def convert_elements(elements, version):
parsed_version = parse('1.11.0') if version is None else parse(version)

pre_conversion_validate_legacy_elements(elements, parsed_version)
if parsed_version < parse('2.0.0'):
elements = convert_legacy_elements(elements)

Expand All @@ -131,6 +226,13 @@ def convert_elements(elements, version):
return elements


def pre_conversion_validate_legacy_elements(elements, parsed_version):
    """Check that a pre-2.0.0 file holds at least one element with a 'key'.

    Does nothing for versions that are not below 2.0.0.

    Args:
        elements: mapping whose values are the element containers to inspect.
        parsed_version: already parsed version of the XML file.

    Raises:
        ValueError: if the version is below 2.0.0 and no element contains
            'key'.
    """
    if not parsed_version < parse('2.0.0'):
        return

    has_legacy_key = any('key' in element for element in elements.values())
    if has_legacy_key:
        return

    raise ValueError(f"Missing legacy elements, elements containing 'key' were expected for this XML with version {parsed_version}.")  # noqa: E501


def convert_legacy_elements(elements):
# first pass: identify pages
for uri, element in elements.items():
Expand Down Expand Up @@ -230,7 +332,7 @@ def convert_additional_input(elements):


def order_elements(elements):
    """Return a new OrderedDict built from ``elements``.

    Each ``(uri, element)`` pair is handed to ``append_element`` together
    with the target mapping and the full source mapping.
    NOTE(review): the resulting order is decided by ``append_element``, which
    is defined elsewhere in this module - confirm there.
    """
    ordered_elements = OrderedDict()
    for uri, element in elements.items():
        append_element(ordered_elements, elements, uri, element)
    return ordered_elements
Expand Down
6 changes: 3 additions & 3 deletions rdmo/management/imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@
}


def import_elements(uploaded_elements: List[Dict], save: bool = True, request: Optional[HttpRequest] = None):
def import_elements(uploaded_elements: Dict, save: bool = True, request: Optional[HttpRequest] = None) -> List[Dict]:
imported_elements = []
uploaded_uris = {i.get('uri') for i in uploaded_elements}
uploaded_uris = set(uploaded_elements.keys())
current_site = get_current_site(request)
questions_widget_types = get_widget_types()
for uploaded_element in uploaded_elements:
for _uri, uploaded_element in uploaded_elements.items():
element = import_element(element=uploaded_element, save=save, uploaded_uris=uploaded_uris,
request=request, current_site=current_site,
questions_widget_types=questions_widget_types)
Expand Down
29 changes: 14 additions & 15 deletions rdmo/management/management/commands/import.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import logging

from django.core.management.base import BaseCommand, CommandError
from django.utils.translation import gettext_lazy as _

from rdmo.core.xml import convert_elements, flat_xml_to_elements, order_elements, read_xml_file
from rdmo.core.xml import XmlParser
from rdmo.management.imports import import_elements

logger = logging.getLogger(__name__)
Expand All @@ -15,16 +14,16 @@ def add_arguments(self, parser):
parser.add_argument('xmlfile', action='store', default=False, help='RDMO XML export file')

def handle(self, *args, **options):
    """Import the elements of the RDMO XML file given as ``xmlfile``.

    Raises:
        CommandError: if the parser cannot be constructed for the file, or
            if the parser recorded validation errors.
    """
    try:
        xml_parser = XmlParser(file_name=options['xmlfile'])
    except ValueError as e:
        # XmlParser signals a missing file name or a non-existent file with
        # ValueError; turn it into the error type management commands report.
        logger.info('Import failed with XML parsing errors.')
        raise CommandError(str(e)) from e

    # step 7: check if valid
    if not xml_parser.is_valid():
        logger.info('Import failed with XML validation errors.')
        raise CommandError(" ".join(map(str, xml_parser.errors)))

    import_elements(xml_parser.parsed_elements)
44 changes: 29 additions & 15 deletions rdmo/management/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,36 @@
from rdmo.core.xml import convert_elements, flat_xml_to_elements, order_elements, read_xml_file

from collections import OrderedDict
from typing import Dict, List, Tuple

from rdmo.core.xml import XmlParser

xml_error_files = [
('file-does-not-exist.xml', 'may not be blank'),
('xml/error.xml', 'syntax error'),
('xml/error-version.xml', 'RDMO XML Version: 99'),
('xml/elements/legacy/catalog-error-key.xml', 'Missing legacy elements'),
]

def read_xml_and_parse_to_elements(xml_file):
    """Parse ``xml_file`` with XmlParser for use in tests.

    Returns:
        A ``(parsed_elements, root)`` tuple taken from the parser.

    Raises:
        ValueError: if the parser recorded any errors; the message lists
            them one per line.
    """
    xml_parser = XmlParser(file_name=xml_file)
    if xml_parser.errors:
        # The errors include lazily translated strings, which str.join()
        # rejects; convert each entry to str first.
        _msg = "\n".join(map(str, xml_parser.errors))
        raise ValueError(f"This test function should NOT raise any Exceptions. {_msg!s}")
    return xml_parser.parsed_elements, xml_parser.root

def _test_helper_change_fields_elements(elements, update_dict=None, n=3) -> Tuple[Dict, List]:
""" xml test preparation function """

update_dict = update_dict if update_dict is not None else {}
_default_update_dict = {'comment': "this is a test comment {}"}
update_dict.update(**_default_update_dict)

if len(elements) < n:
raise ValueError("Length of elements should not be smaller than n.")
_new_elements = []
_changed_elements = []
for _n,_element in enumerate(elements):
_new_elements = OrderedDict()
_changed_elements = OrderedDict()
for _n,(_uri, _element) in enumerate(elements.items()):
if _n <= n-1:
updated_and_changed = {}
changed_element = _element
Expand All @@ -31,6 +41,10 @@ def change_fields_elements(elements, update_dict=None, n=3):
_element[k] = val
if updated_and_changed:
changed_element['updated_and_changed'] = updated_and_changed
_changed_elements.append(changed_element)
_new_elements.append(_element)
return _new_elements, _changed_elements
_changed_elements[_uri] = changed_element
_new_elements[_uri] = _element
return _new_elements, list(_changed_elements.values())

def _test_helper_filter_updated_and_changed(elements: List[Dict]) -> List[Dict]:
filtered_elements = filter(lambda x: x.get('updated_and_changed', False), elements)
return list(filtered_elements)
2 changes: 1 addition & 1 deletion rdmo/management/tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def test_import_error(db, settings):
with pytest.raises(CommandError) as e:
call_command('import', xml_file, stdout=stdout, stderr=stderr)

assert str(e.value) == 'The content of the xml file does not consist of well formed data or markup.'
assert str(e.value).startswith('The content of the xml file does not consist of well formed data or markup.')


def test_import_error2(db, settings):
Expand Down
10 changes: 7 additions & 3 deletions rdmo/management/tests/test_import_conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
from rdmo.conditions.models import Condition
from rdmo.management.imports import import_elements

from . import change_fields_elements, read_xml_and_parse_to_elements
from . import (
_test_helper_change_fields_elements,
_test_helper_filter_updated_and_changed,
read_xml_and_parse_to_elements,
)

imported_update_changes = [
None,
Expand Down Expand Up @@ -44,9 +48,9 @@ def test_update_conditions_with_changed_fields(db, settings, update_dict):
xml_file = Path(settings.BASE_DIR) / 'xml' / 'elements' / 'conditions.xml'

elements, root = read_xml_and_parse_to_elements(xml_file)
elements, changed_elements = change_fields_elements(elements, update_dict=update_dict, n=7)
elements, changed_elements = _test_helper_change_fields_elements(elements, update_dict=update_dict, n=7)
imported_elements = import_elements(elements)
imported_and_changed = [i for i in elements if i['updated_and_changed']]
imported_and_changed = _test_helper_filter_updated_and_changed(imported_elements)
assert len(root) == len(imported_elements) == 15
assert all(element['created'] is False for element in imported_elements)
assert all(element['updated'] is True for element in imported_elements)
Expand Down
11 changes: 8 additions & 3 deletions rdmo/management/tests/test_import_domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
from rdmo.domain.models import Attribute
from rdmo.management.imports import import_elements

from . import change_fields_elements, read_xml_and_parse_to_elements
from . import (
_test_helper_change_fields_elements,
_test_helper_filter_updated_and_changed,
read_xml_and_parse_to_elements,
)


def test_create_domain(db, settings):
Expand Down Expand Up @@ -34,9 +38,10 @@ def test_update_attributes_with_changed_fields(db, settings):
xml_file = Path(settings.BASE_DIR) / 'xml' / 'elements' / 'attributes.xml'

elements, root = read_xml_and_parse_to_elements(xml_file)
elements, changed_elements = change_fields_elements(elements, n=50)
_change_count = Attribute.objects.count() / 2
elements, changed_elements = _test_helper_change_fields_elements(elements, n=_change_count)
imported_elements = import_elements(elements)
imported_and_changed = [i for i in elements if i['updated_and_changed']]
imported_and_changed = _test_helper_filter_updated_and_changed(imported_elements)
assert len(root) == len(imported_elements) == 86
assert all(element['created'] is False for element in imported_elements)
assert all(element['updated'] is True for element in imported_elements)
Expand Down
15 changes: 10 additions & 5 deletions rdmo/management/tests/test_import_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
from rdmo.management.imports import import_elements
from rdmo.options.models import Option, OptionSet

from . import change_fields_elements, read_xml_and_parse_to_elements
from . import (
_test_helper_change_fields_elements,
_test_helper_filter_updated_and_changed,
read_xml_and_parse_to_elements,
)

imported_update_changes = [None]

Expand Down Expand Up @@ -45,10 +49,11 @@ def test_update_optionsets_with_changed_fields(db, settings, update_dict):
imported_elements = import_elements(elements)
assert len(root) == len(imported_elements) == 13
# start test with fresh options in db
elements, changed_elements = change_fields_elements(elements, update_dict=update_dict, n=7)
_n_change = int(Option.objects.count() / 2)
elements, changed_elements = _test_helper_change_fields_elements(elements, update_dict=update_dict, n=7)
imported_elements = import_elements(elements)
assert len(root) == len(imported_elements) == 13
imported_and_changed = [i for i in elements if i['updated_and_changed']]
imported_and_changed = _test_helper_filter_updated_and_changed(imported_elements)
assert all(element['created'] is False for element in imported_elements)
assert all(element['updated'] is True for element in imported_elements)
assert len(imported_and_changed) == len(changed_elements)
Expand Down Expand Up @@ -91,9 +96,9 @@ def test_update_options_with_changed_fields(db, settings, update_dict):
imported_elements = import_elements(elements)
assert len(root) == len(imported_elements) == 9
# start test with fresh options in db
elements, changed_elements = change_fields_elements(elements, update_dict=update_dict, n=4)
elements, changed_elements = _test_helper_change_fields_elements(elements, update_dict=update_dict, n=4)
imported_elements = import_elements(elements)
imported_and_changed = [i for i in elements if i['updated_and_changed']]
imported_and_changed = _test_helper_filter_updated_and_changed(imported_elements)
assert len(root) == len(imported_elements) == 9
assert all(element['created'] is False for element in imported_elements)
assert all(element['updated'] is True for element in imported_elements)
Expand Down

0 comments on commit b1acb97

Please sign in to comment.