Skip to content

Commit

Permalink
Merge pull request #17 from oarepo/develop
Browse files Browse the repository at this point in the history
Added pre and post processors
  • Loading branch information
Semtexcz committed Dec 2, 2020
2 parents 6fd7777 + ac72839 commit e433214
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 13 deletions.
54 changes: 54 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ Successful data collection requires several steps, which consist of:
1. **Configuration** (see configuration chapter)
1. **Parser**: function that converts XML into JSON
1. **Rules**: functions that convert raw JSON (from parser) into final JSON
1. **Processors (optional)**: two types of functions (pre and post) that allow the data to be changed either before
transformation (pre_processor) or after transformation (post_processor)

### Parsers

Expand Down Expand Up @@ -196,6 +198,58 @@ def rule(el, **kwargs):
value_ = el[0]["value"][0]
return {"title": value_}
```
### Processors
The downloaded XML is first converted to JSON, and then this JSON is remapped to JSON according to our model.
Sometimes it is necessary to modify the input or output JSON, and the **Processors** are used for this purpose.
There are two types of processors: pre and post.

#### Pre-processor
It is used for updating data before transformation. A pre-processor is registered in the same way as the other
components: it is necessary to register an entry point and mark the function with the decorator.

```python
entry_points={
'oarepo_oai_pmh_harvester.pre_processors': [
'<name> = example.pre_processors',
],
}
```

Example of a pre_processor:

```python
from oarepo_oai_pmh_harvester.decorators import pre_processor


@pre_processor("provider_name", "metadata_prefix")
def pre_processor_1(data):
    """Add an example key to the parsed record before it is transformed.

    ``dict.update`` mutates the dictionary in place and returns ``None``, so
    its result must not be assigned back to ``data``.

    :param data: dictionary to modify
    :return: the same dictionary, with ``"some_change"`` set
    """
    data.update({"some_change": "change"})
    return data
```

#### Post-processor
It is used for updating data after transformation.

```python
entry_points={
'oarepo_oai_pmh_harvester.post_processors': [
'<name> = example.post_processors',
],
}
```

Example of a post_processor:

```python
from oarepo_oai_pmh_harvester.decorators import post_processor


@post_processor("provider_name", "metadata_prefix")
def pre_processor_1(data):
    """Add an example key to the record after it has been transformed.

    ``dict.update`` mutates the dictionary in place and returns ``None``, so
    its result must not be assigned back to ``data``.

    :param data: dictionary to modify
    :return: the same dictionary, with ``"some_change_2"`` set
    """
    data.update({"some_change_2": "change_2"})
    return data
```


### CLI
If all components (config, parser, rules) are set, the program can be run via the CLI:
Expand Down
6 changes: 6 additions & 0 deletions example/post_processors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from oarepo_oai_pmh_harvester.decorators import post_processor


@post_processor("uk", "xoai")
def post_processor_1(data):
    """Example post-processor: hand the received data back unchanged."""
    return data
6 changes: 6 additions & 0 deletions example/pre_processors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from oarepo_oai_pmh_harvester.decorators import pre_processor


@pre_processor("uk", "xoai")
def pre_processor_1(data):
    """Example pre-processor: hand the received data back unchanged."""
    return data
14 changes: 14 additions & 0 deletions oarepo_oai_pmh_harvester/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,17 @@ def wrapper(func):
current_oai_client.add_endpoint_handler(func, provider, parser)

return wrapper


def pre_processor(provider, parser):
    """Build a decorator that registers a function as a pre-processor.

    The decorated function is handed to
    ``current_oai_client.add_pre_processor`` together with ``provider`` and
    ``parser``.

    :param provider: provider key the function is registered under
    :param parser: parser key the function is registered under
    :return: decorator that registers the function and returns it unchanged
    """
    def wrapper(func):
        current_oai_client.add_pre_processor(func, provider, parser)
        # Return the function so the decorated name stays bound to it
        # instead of being rebound to None.
        return func

    return wrapper


def post_processor(provider, parser):
    """Build a decorator that registers a function as a post-processor.

    The decorated function is handed to
    ``current_oai_client.add_post_processor`` together with ``provider`` and
    ``parser``.

    :param provider: provider key the function is registered under
    :param parser: parser key the function is registered under
    :return: decorator that registers the function and returns it unchanged
    """
    def wrapper(func):
        current_oai_client.add_post_processor(func, provider, parser)
        # Return the function so the decorated name stays bound to it
        # instead of being rebound to None.
        return func

    return wrapper
57 changes: 47 additions & 10 deletions oarepo_oai_pmh_harvester/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ def __call__(cls, *args, **kwargs):
class OArepoOAIClientState(metaclass=Singleton):
def __init__(self, app, _rules: defaultdict = None, _parsers: defaultdict = None,
_providers: dict = None, _synchronizers=None, transformer_class=OAITransformer,
_endpoints=None, endpoint_handlers: dict = None):
_endpoints=None, endpoint_handlers: dict = None, _pre_processors: dict = None,
_post_processors: dict = None):
self.app = app
self._rules = _rules
self._parsers = _parsers
Expand All @@ -34,6 +35,8 @@ def __init__(self, app, _rules: defaultdict = None, _parsers: defaultdict = None
self._synchronizers = _synchronizers
self.transformer_class = transformer_class
self._endpoints = _endpoints
self._pre_processors = _pre_processors
self._post_processors = _post_processors

@property
def providers(self):
Expand All @@ -53,17 +56,29 @@ def parsers(self):
self._load_parsers()
return self._parsers

@property
def endpoints(self):
if not self._endpoints:
self.load_endpoints()
return self._endpoints

@property
def endpoint_handlers(self):
if self._endpoint_handlers is None:
self._load_endpoint_handlers()
return self._endpoint_handlers

@property
def endpoints(self):
if not self._endpoints:
self.load_endpoints()
return self._endpoints
def pre_processors(self):
    """Return ``_pre_processors``, running ``_load_pre_processors`` first while it is ``None``."""
    registry = self._pre_processors
    if registry is None:
        self._load_pre_processors()
        # Re-read: loading may have populated the attribute through registration.
        registry = self._pre_processors
    return registry

@property
def post_processors(self):
    """Return ``_post_processors``, running ``_load_post_processors`` first while it is ``None``."""
    registry = self._post_processors
    if registry is None:
        self._load_post_processors()
        # Re-read: loading may have populated the attribute through registration.
        registry = self._post_processors
    return registry

def load_endpoints(self):
res = {}
Expand All @@ -86,6 +101,14 @@ def _load_endpoint_handlers(self):
for ep in iter_entry_points('oarepo_oai_pmh_harvester.mappings'):
ep.load()

def _load_pre_processors(self):
    """Load every entry point in the ``oarepo_oai_pmh_harvester.pre_processors`` group."""
    group = 'oarepo_oai_pmh_harvester.pre_processors'
    for entry_point in iter_entry_points(group):
        entry_point.load()

def _load_post_processors(self):
    """Load every entry point in the ``oarepo_oai_pmh_harvester.post_processors`` group."""
    group = 'oarepo_oai_pmh_harvester.post_processors'
    for entry_point in iter_entry_points(group):
        entry_point.load()

def create_providers(self):
providers = self.app.config.get("OAREPO_OAI_PROVIDERS")
if providers:
Expand All @@ -107,15 +130,25 @@ def add_rule(self, func, provider, parser_name, path, phase):
self._rules = infinite_dd()
self._rules[provider][parser_name][path][phase] = func

def add_parser(self, func, name):
if not self._parsers:
self._parsers = infinite_dd()
self._parsers[name] = func

def add_endpoint_handler(self, func, provider, parser_name):
if not self._endpoint_handlers:
self._endpoint_handlers = infinite_dd()
self._endpoint_handlers[provider][parser_name] = func

def add_parser(self, func, name):
if not self._parsers:
self._parsers = infinite_dd()
self._parsers[name] = func
def add_pre_processor(self, func, provider, parser_name):
    """Append ``func`` to the pre-processor list stored under ``provider`` and ``parser_name``."""
    registry = self._pre_processors
    if not registry:
        # Nothing stored yet: start a fresh provider -> parser name -> list mapping.
        registry = defaultdict(lambda: defaultdict(list))
        self._pre_processors = registry
    registry[provider][parser_name].append(func)

def add_post_processor(self, func, provider, parser_name):
    """Append ``func`` to the post-processor list stored under ``provider`` and ``parser_name``."""
    registry = self._post_processors
    if not registry:
        # Nothing stored yet: start a fresh provider -> parser name -> list mapping.
        registry = defaultdict(lambda: defaultdict(list))
        self._post_processors = registry
    registry[provider][parser_name].append(func)

def create_synchronizer(self, provider_code, config):
return OAISynchronizer(
Expand All @@ -133,7 +166,11 @@ def create_synchronizer(self, provider_code, config):
endpoint_mapping=config.get("endpoint_mapping", {}),
from_=config.get("from"),
endpoint_handler=self.endpoint_handlers,
bulk=config.get("bulk", True)
bulk=config.get("bulk", True),
pre_processors=self.pre_processors[provider_code][
config["metadata_prefix"]] if self.pre_processors else None,
post_processors=self.post_processors[provider_code][
config["metadata_prefix"]] if self.post_processors else None
)

def run(self, providers_codes: List[str] = None, synchronizers_codes: List[str] = None,
Expand Down
12 changes: 11 additions & 1 deletion oarepo_oai_pmh_harvester/synchronization.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ def __init__(
pid_field=None,
from_: str = None,
endpoint_handler: dict = None,
bulk: bool = True
bulk: bool = True,
pre_processors: dict = None,
post_processors: dict = None
):

# Counters
Expand Down Expand Up @@ -78,6 +80,8 @@ def __init__(
self.from_ = from_
self.endpoint_handler = endpoint_handler
self.bulk = bulk
self.pre_processors = pre_processors
self.post_processors = post_processors

@property
def from_(self):
Expand Down Expand Up @@ -288,7 +292,13 @@ def create_or_update(self, oai_identifier, datestamp: str, oai_rec=None, xml: _E
if not xml:
xml = self.get_xml(oai_identifier)
parsed = self.parse(xml)
if self.pre_processors:
for processor in self.pre_processors:
parsed = processor(parsed)
transformed = self.transform(parsed)
if self.post_processors:
for processor in self.post_processors:
transformed = processor(transformed)
transformed.update(self.constant_fields)

if oai_rec is None:
Expand Down
2 changes: 1 addition & 1 deletion oarepo_oai_pmh_harvester/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@

from __future__ import absolute_import, print_function

__version__ = '2.0.0a14'
__version__ = '2.0.0a15'
8 changes: 7 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,16 @@ def load_entry_points():
entry_point = pkg_resources.EntryPoint.parse('xoai = example.parser', dist=distribution)
entry_point2 = pkg_resources.EntryPoint.parse('rule = example.rules.uk.rule', dist=distribution)
entry_point3 = pkg_resources.EntryPoint.parse('handler = example.mapping', dist=distribution)
entry_point4 = pkg_resources.EntryPoint.parse('pre_processor = example.pre_processors',
dist=distribution)
entry_point5 = pkg_resources.EntryPoint.parse('post_processor = example.post_processors',
dist=distribution)
distribution._ep_map = {
'oarepo_oai_pmh_harvester.parsers': {'xoai': entry_point},
'oarepo_oai_pmh_harvester.rules': {'rule': entry_point2},
'oarepo_oai_pmh_harvester.mappings': {'handler': entry_point3}
'oarepo_oai_pmh_harvester.mappings': {'handler': entry_point3},
'oarepo_oai_pmh_harvester.pre_processors': {'pre_processor': entry_point4},
'oarepo_oai_pmh_harvester.post_processors': {'post_processor': entry_point5},
}
pkg_resources.working_set.add(distribution)

Expand Down

0 comments on commit e433214

Please sign in to comment.