From e5ff6b44a4748e1bf187f2e04481f743c7505ddc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Kopeck=C3=BD?= Date: Wed, 2 Dec 2020 11:42:35 +0100 Subject: [PATCH 1/3] Addded pre and post processing --- example/post_processors.py | 6 +++ example/pre_processors.py | 6 +++ oarepo_oai_pmh_harvester/decorators.py | 14 +++++ oarepo_oai_pmh_harvester/ext.py | 57 +++++++++++++++++---- oarepo_oai_pmh_harvester/synchronization.py | 12 ++++- tests/conftest.py | 8 ++- 6 files changed, 91 insertions(+), 12 deletions(-) create mode 100644 example/post_processors.py create mode 100644 example/pre_processors.py diff --git a/example/post_processors.py b/example/post_processors.py new file mode 100644 index 0000000..1686300 --- /dev/null +++ b/example/post_processors.py @@ -0,0 +1,6 @@ +from oarepo_oai_pmh_harvester.decorators import post_processor + + +@post_processor("uk", "xoai") +def post_processor_1(data): + return data diff --git a/example/pre_processors.py b/example/pre_processors.py new file mode 100644 index 0000000..8e4f733 --- /dev/null +++ b/example/pre_processors.py @@ -0,0 +1,6 @@ +from oarepo_oai_pmh_harvester.decorators import pre_processor + + +@pre_processor("uk", "xoai") +def pre_processor_1(data): + return data diff --git a/oarepo_oai_pmh_harvester/decorators.py b/oarepo_oai_pmh_harvester/decorators.py index 5ea8cd1..409dfca 100644 --- a/oarepo_oai_pmh_harvester/decorators.py +++ b/oarepo_oai_pmh_harvester/decorators.py @@ -21,3 +21,17 @@ def wrapper(func): current_oai_client.add_endpoint_handler(func, provider, parser) return wrapper + + +def pre_processor(provider, parser): + def wrapper(func): + current_oai_client.add_pre_processor(func, provider, parser) + + return wrapper + + +def post_processor(provider, parser): + def wrapper(func): + current_oai_client.add_post_processor(func, provider, parser) + + return wrapper diff --git a/oarepo_oai_pmh_harvester/ext.py b/oarepo_oai_pmh_harvester/ext.py index 482b63a..01500ca 100644 --- 
a/oarepo_oai_pmh_harvester/ext.py +++ b/oarepo_oai_pmh_harvester/ext.py @@ -25,7 +25,8 @@ def __call__(cls, *args, **kwargs): class OArepoOAIClientState(metaclass=Singleton): def __init__(self, app, _rules: defaultdict = None, _parsers: defaultdict = None, _providers: dict = None, _synchronizers=None, transformer_class=OAITransformer, - _endpoints=None, endpoint_handlers: dict = None): + _endpoints=None, endpoint_handlers: dict = None, _pre_processors: dict = None, + _post_processors: dict = None): self.app = app self._rules = _rules self._parsers = _parsers @@ -34,6 +35,8 @@ def __init__(self, app, _rules: defaultdict = None, _parsers: defaultdict = None self._synchronizers = _synchronizers self.transformer_class = transformer_class self._endpoints = _endpoints + self._pre_processors = _pre_processors + self._post_processors = _post_processors @property def providers(self): @@ -53,6 +56,12 @@ def parsers(self): self._load_parsers() return self._parsers + @property + def endpoints(self): + if not self._endpoints: + self.load_endpoints() + return self._endpoints + @property def endpoint_handlers(self): if self._endpoint_handlers is None: @@ -60,10 +69,16 @@ def endpoint_handlers(self): return self._endpoint_handlers @property - def endpoints(self): - if not self._endpoints: - self.load_endpoints() - return self._endpoints + def pre_processors(self): + if self._pre_processors is None: + self._load_pre_processors() + return self._pre_processors + + @property + def post_processors(self): + if self._post_processors is None: + self._load_post_processors() + return self._post_processors def load_endpoints(self): res = {} @@ -86,6 +101,14 @@ def _load_endpoint_handlers(self): for ep in iter_entry_points('oarepo_oai_pmh_harvester.mappings'): ep.load() + def _load_pre_processors(self): + for ep in iter_entry_points('oarepo_oai_pmh_harvester.pre_processors'): + ep.load() + + def _load_post_processors(self): + for ep in 
iter_entry_points('oarepo_oai_pmh_harvester.post_processors'): + ep.load() + def create_providers(self): providers = self.app.config.get("OAREPO_OAI_PROVIDERS") if providers: @@ -107,15 +130,25 @@ def add_rule(self, func, provider, parser_name, path, phase): self._rules = infinite_dd() self._rules[provider][parser_name][path][phase] = func + def add_parser(self, func, name): + if not self._parsers: + self._parsers = infinite_dd() + self._parsers[name] = func + def add_endpoint_handler(self, func, provider, parser_name): if not self._endpoint_handlers: self._endpoint_handlers = infinite_dd() self._endpoint_handlers[provider][parser_name] = func - def add_parser(self, func, name): - if not self._parsers: - self._parsers = infinite_dd() - self._parsers[name] = func + def add_pre_processor(self, func, provider, parser_name): + if not self._pre_processors: + self._pre_processors = defaultdict(lambda: defaultdict(list)) + self._pre_processors[provider][parser_name].append(func) + + def add_post_processor(self, func, provider, parser_name): + if not self._post_processors: + self._post_processors = defaultdict(lambda: defaultdict(list)) + self._post_processors[provider][parser_name].append(func) def create_synchronizer(self, provider_code, config): return OAISynchronizer( @@ -133,7 +166,11 @@ def create_synchronizer(self, provider_code, config): endpoint_mapping=config.get("endpoint_mapping", {}), from_=config.get("from"), endpoint_handler=self.endpoint_handlers, - bulk=config.get("bulk", True) + bulk=config.get("bulk", True), + pre_processors=self.pre_processors[provider_code][ + config["metadata_prefix"]] if self.pre_processors else None, + post_processors=self.post_processors[provider_code][ + config["metadata_prefix"]] if self.post_processors else None ) def run(self, providers_codes: List[str] = None, synchronizers_codes: List[str] = None, diff --git a/oarepo_oai_pmh_harvester/synchronization.py b/oarepo_oai_pmh_harvester/synchronization.py index eed100f..68cccf2 
100644 --- a/oarepo_oai_pmh_harvester/synchronization.py +++ b/oarepo_oai_pmh_harvester/synchronization.py @@ -43,7 +43,9 @@ def __init__( pid_field=None, from_: str = None, endpoint_handler: dict = None, - bulk: bool = True + bulk: bool = True, + pre_processors: dict = None, + post_processors: dict = None ): # Counters @@ -78,6 +80,8 @@ def __init__( self.from_ = from_ self.endpoint_handler = endpoint_handler self.bulk = bulk + self.pre_processors = pre_processors + self.post_processors = post_processors @property def from_(self): @@ -288,7 +292,13 @@ def create_or_update(self, oai_identifier, datestamp: str, oai_rec=None, xml: _E if not xml: xml = self.get_xml(oai_identifier) parsed = self.parse(xml) + if self.pre_processors: + for processor in self.pre_processors: + parsed = processor(parsed) transformed = self.transform(parsed) + if self.post_processors: + for processor in self.post_processors: + transformed = processor(transformed) transformed.update(self.constant_fields) if oai_rec is None: diff --git a/tests/conftest.py b/tests/conftest.py index c600688..a27ce59 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -172,10 +172,16 @@ def load_entry_points(): entry_point = pkg_resources.EntryPoint.parse('xoai = example.parser', dist=distribution) entry_point2 = pkg_resources.EntryPoint.parse('rule = example.rules.uk.rule', dist=distribution) entry_point3 = pkg_resources.EntryPoint.parse('handler = example.mapping', dist=distribution) + entry_point4 = pkg_resources.EntryPoint.parse('pre_processor = example.pre_processors', + dist=distribution) + entry_point5 = pkg_resources.EntryPoint.parse('post_processor = example.post_processors', + dist=distribution) distribution._ep_map = { 'oarepo_oai_pmh_harvester.parsers': {'xoai': entry_point}, 'oarepo_oai_pmh_harvester.rules': {'rule': entry_point2}, - 'oarepo_oai_pmh_harvester.mappings': {'handler': entry_point3} + 'oarepo_oai_pmh_harvester.mappings': {'handler': entry_point3}, + 
'oarepo_oai_pmh_harvester.pre_processors': {'pre_processor': entry_point4}, + 'oarepo_oai_pmh_harvester.post_processors': {'post_processor': entry_point5}, } pkg_resources.working_set.add(distribution) From 20583ecfb70daa2264dd26092e69472f13f74dcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Kopeck=C3=BD?= Date: Wed, 2 Dec 2020 12:04:52 +0100 Subject: [PATCH 2/3] Updated README --- README.md | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/README.md b/README.md index 3f68422..31da6af 100644 --- a/README.md +++ b/README.md @@ -111,6 +111,8 @@ Successful data collection requires several steps, which consist of: 1. **Configuration** (see configuration chapter) 1. **Parser**: function that converts XML into JSON 1. **Rules**: functions that convert raw JSON (from parser) into final JSON +1. **Processors (optional)**: two types of functions (pre and post) that allow the data to be changed either before + transformation (pre_processor) or after transformation (post_processor) ### Parsers @@ -196,6 +198,58 @@ def rule(el, **kwargs): value_ = el[0]["value"][0] return {"title": value_} ``` +### Processors +The downloaded XML is first converted to JSON, and this JSON is then remapped to JSON according to our model. +Sometimes it is necessary to modify the input or output JSON; the **Processors** are used for this purpose. +There are two types of processors: pre and post. + +#### Pre-processor +It is used for updating data before the transformation. A pre-processor is registered in the same way as the other components: it is + necessary to register an entry point and mark the function with the decorator. 
+ + ```python +entry_points={ + 'oarepo_oai_pmh_harvester.pre_processors': [ + 'pre_processor = example.pre_processors', + ], + } +``` + + Example of a pre_processor: + +```python +from oarepo_oai_pmh_harvester.decorators import pre_processor + + +@pre_processor("provider_name", "metadata_prefix") +def pre_processor_1(data): + data.update({"some_change": "change"}) + return data +``` + +#### Post-processor +It is used for updating data after transformation. + + ```python +entry_points={ + 'oarepo_oai_pmh_harvester.post_processors': [ + 'post_processor = example.post_processors', + ], + } +``` + + Example of a post_processor: + +```python +from oarepo_oai_pmh_harvester.decorators import post_processor + + +@post_processor("provider_name", "metadata_prefix") +def post_processor_1(data): + data.update({"some_change_2": "change_2"}) + return data +``` + ### CLI If all components (config, parser, rules) are set, the program can be run via the CLI: From ac728396e97d698c229fb3e3ceeac1f141f88d57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Kopeck=C3=BD?= Date: Wed, 2 Dec 2020 12:05:43 +0100 Subject: [PATCH 3/3] Upgrade version --- oarepo_oai_pmh_harvester/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oarepo_oai_pmh_harvester/version.py b/oarepo_oai_pmh_harvester/version.py index 732ce2b..5b02c24 100644 --- a/oarepo_oai_pmh_harvester/version.py +++ b/oarepo_oai_pmh_harvester/version.py @@ -13,4 +13,4 @@ from __future__ import absolute_import, print_function -__version__ = '2.0.0a14' +__version__ = '2.0.0a15'