
#539 done. Pipelines are now much more flexible: the source can be customized.
#535 related
nicolay-r committed Dec 28, 2023
1 parent 958084c commit 2460001
Showing 31 changed files with 212 additions and 146 deletions.
59 changes: 48 additions & 11 deletions arekit/common/docs/parser.py
@@ -1,24 +1,61 @@
 from arekit.common.docs.base import Document
 from arekit.common.docs.parsed.base import ParsedDocument
+from arekit.common.pipeline.base import BasePipeline
+from arekit.common.pipeline.batching import BatchingPipeline
 from arekit.common.pipeline.context import PipelineContext
-from arekit.common.text.parser import BaseTextParser
+from arekit.common.pipeline.utils import BatchIterator
+from arekit.common.text.parsed import BaseParsedText
 
 
-class DocumentParser(object):
+class DocumentParsers(object):
 
     @staticmethod
-    def __get_sent(doc, sent_ind):
-        return doc.get_sentence(sent_ind)
+    def parse(doc, pipeline_items, parent_ppl_ctx=None):
+        """ This document parser is based on single text parts (sentences)
+            that pass sequentially through the pipeline of transformations.
+        """
+        assert(isinstance(doc, Document))
+        assert(isinstance(pipeline_items, list))
+        assert(isinstance(parent_ppl_ctx, PipelineContext) or parent_ppl_ctx is None)
+
+        pipeline = BasePipeline(pipeline_items)
+
+        parsed_sentences = []
+        for sent_ind in range(doc.SentencesCount):
+
+            # Composing the context from a single sentence.
+            ctx = PipelineContext({"input": doc.get_sentence(sent_ind)}, parent_ctx=parent_ppl_ctx)
+
+            # Apply all the operations.
+            pipeline.run(ctx)
+
+            # Collecting the result.
+            parsed_sentences.append(BaseParsedText(terms=ctx.provide("result")))
+
+        return ParsedDocument(doc_id=doc.ID, parsed_sentences=parsed_sentences)
 
     @staticmethod
-    def parse(doc, text_parser, parent_ppl_ctx=None):
+    def parse_batch(doc, pipeline_items, batch_size, parent_ppl_ctx=None):
+        """ This document parser is based on a batch of sentences.
+        """
+        assert(isinstance(batch_size, int) and batch_size > 0)
         assert(isinstance(doc, Document))
-        assert(isinstance(text_parser, BaseTextParser))
+        assert(isinstance(pipeline_items, list))
         assert(isinstance(parent_ppl_ctx, PipelineContext) or parent_ppl_ctx is None)
 
-        parsed_sentences = [text_parser.run(params_dict={"input": DocumentParser.__get_sent(doc, sent_ind)},
-                                            parent_ctx=parent_ppl_ctx)
-                            for sent_ind in range(doc.SentencesCount)]
+        pipeline = BatchingPipeline(pipeline_items)
+
+        parsed_sentences = []
+        for batch in BatchIterator(lst=list(range(doc.SentencesCount)), batch_size=batch_size):
+
+            # Composing the context from a batch of sentences.
+            ctx = PipelineContext({"input": [doc.get_sentence(s_ind) for s_ind in batch]},
+                                  parent_ctx=parent_ppl_ctx)
+
+            # Apply all the operations.
+            pipeline.run(ctx)
+
+            # Collecting the result.
+            parsed_sentences += [BaseParsedText(terms=result) for result in ctx.provide("result")]
 
-        return ParsedDocument(doc_id=doc.ID,
-                              parsed_sentences=parsed_sentences)
+        return ParsedDocument(doc_id=doc.ID, parsed_sentences=parsed_sentences)
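For orientation, a minimal usage sketch of the two new entry points (the `doc` and `items` arguments are hypothetical placeholders, not part of the commit):

```python
from arekit.common.docs.parser import DocumentParsers

def parse_document(doc, items, batch_size=16):
    """ Sketch only: `doc` is assumed to be an arekit Document and
        `items` a list of BasePipelineItem instances. """
    # Sentence-by-sentence mode: a plain BasePipeline over each sentence.
    parsed = DocumentParsers.parse(doc=doc, pipeline_items=items)
    # Batched mode: sentence indices are grouped via BatchIterator
    # and handled by a BatchingPipeline.
    parsed_batched = DocumentParsers.parse_batch(doc=doc, pipeline_items=items,
                                                 batch_size=batch_size)
    return parsed, parsed_batched
```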
6 changes: 3 additions & 3 deletions arekit/common/pipeline/base.py
@@ -6,12 +6,12 @@ class BasePipeline(object):
 
     def __init__(self, pipeline):
         assert(isinstance(pipeline, list))
-        self.__pipeline = pipeline
+        self._pipeline = pipeline
 
     def run(self, pipeline_ctx):
         assert(isinstance(pipeline_ctx, PipelineContext))
 
-        for item in filter(lambda itm: itm is not None, self.__pipeline):
+        for item in filter(lambda itm: itm is not None, self._pipeline):
             assert(isinstance(item, BasePipelineItem))
             item_result = item.apply(input_data=item.get_source(pipeline_ctx), pipeline_ctx=pipeline_ctx)
             pipeline_ctx.update(param=item.ResultKey, value=item_result, is_new_key=False)
@@ -20,4 +20,4 @@ def run(self, pipeline_ctx):
 
     def append(self, item):
         assert(isinstance(item, BasePipelineItem))
-        self.__pipeline.append(item)
+        self._pipeline.append(item)
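The `__pipeline` → `_pipeline` rename matters because Python name-mangles double-underscore attributes per class, so a subclass such as the new `BatchingPipeline` below could not reach `self.__pipeline`. A generic illustration of the mechanism (not from the repository):

```python
class Base:
    def __init__(self):
        self.__items = []    # stored as _Base__items

class Child(Base):
    def read(self):
        return self.__items  # resolves to _Child__items

Child().read()  # AttributeError: 'Child' object has no attribute '_Child__items'
```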
32 changes: 32 additions & 0 deletions arekit/common/pipeline/batching.py
@@ -0,0 +1,32 @@
from arekit.common.pipeline.base import BasePipeline
from arekit.common.pipeline.context import PipelineContext
from arekit.common.pipeline.items.base import BasePipelineItem


class BatchingPipeline(BasePipeline):

    def run(self, pipeline_ctx):
        assert(isinstance(pipeline_ctx, PipelineContext))

        for item in filter(lambda itm: itm is not None, self._pipeline):
            assert(isinstance(item, BasePipelineItem))

            # Handle the content of the batch or the batch itself.
            if item.SupportBatching:
                handled_batch = item.get_source(pipeline_ctx)
            else:
                content = item.get_source(pipeline_ctx, call_func=False)
                handled_batch = [item._src_func(i) if item._src_func is not None else i for i in content]

            # At present, each batch represents a list of contents.
            assert(isinstance(handled_batch, list))

            batch_result = []
            input_data_iter = [handled_batch] if item.SupportBatching else handled_batch
            for input_data in input_data_iter:
                item_result = item.apply(input_data=input_data, pipeline_ctx=pipeline_ctx)
                batch_result.append(item_result)

            pipeline_ctx.update(param=item.ResultKey, value=batch_result, is_new_key=False)

        return pipeline_ctx
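A sketch of the non-batching branch in action, using a hypothetical item (it assumes `BasePipelineItem` dispatches `apply` to an `apply_core` override, as elsewhere in arekit):

```python
from arekit.common.pipeline.batching import BatchingPipeline
from arekit.common.pipeline.context import PipelineContext
from arekit.common.pipeline.items.base import BasePipelineItem

class LowercaseItem(BasePipelineItem):
    """ Hypothetical item without batching support: BatchingPipeline
        therefore applies it to every element of the batch. """
    def apply_core(self, input_data, pipeline_ctx):
        return input_data.lower()

ctx = PipelineContext({"input": ["Foo", "BAR"]})
BatchingPipeline([LowercaseItem(src_key="input")]).run(ctx)
print(ctx.provide("result"))  # ['foo', 'bar']
```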
14 changes: 10 additions & 4 deletions arekit/common/pipeline/items/base.py
@@ -9,14 +9,20 @@ def __init__(self, src_key="result", result_key="result", src_func=None):
         assert(isinstance(src_key, str) or src_key is None)
         assert(callable(src_func) or src_func is None)
         self.__src_key = src_key
-        self.__src_func = src_func
+        self._src_func = src_func
         self.__result_key = result_key
 
     @property
     def ResultKey(self):
         return self.__result_key
 
-    def get_source(self, src_ctx):
+    @property
+    def SupportBatching(self):
+        """ By default pipeline item is not designed for batching.
+        """
+        return False
+
+    def get_source(self, src_ctx, call_func=True):
         """ Extract input element for processing.
         """
         assert(isinstance(src_ctx, PipelineContext))
@@ -27,8 +33,8 @@ def get_source(self, src_ctx):
 
         # Extracting actual source.
         src_data = src_ctx.provide(self.__src_key)
-        if self.__src_func is not None:
-            src_data = self.__src_func(src_data)
+        if self._src_func is not None and call_func:
+            src_data = self._src_func(src_data)
 
         return src_data
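For items that can consume a whole batch at once, the new property is meant to be overridden; a hypothetical sketch:

```python
from arekit.common.pipeline.items.base import BasePipelineItem

class WhitespaceTokenizerItem(BasePipelineItem):
    """ Hypothetical batch-aware item: BatchingPipeline passes it the
        entire batch in a single apply call. """

    @property
    def SupportBatching(self):
        return True

    def apply_core(self, input_data, pipeline_ctx):
        # input_data is the whole batch -- a list of sentence strings.
        return [text.split() for text in input_data]
```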
19 changes: 19 additions & 0 deletions arekit/common/pipeline/utils.py
@@ -0,0 +1,19 @@
class BatchIterator:

    def __init__(self, lst, batch_size):
        assert(isinstance(lst, list))
        assert(isinstance(batch_size, int) and batch_size > 0)
        self.__lst = lst
        self.__index = 0
        self.__batch_size = batch_size

    def __iter__(self):
        return self

    def __next__(self):
        if self.__index < len(self.__lst):
            batch = self.__lst[self.__index:self.__index + self.__batch_size]
            self.__index += self.__batch_size
            return batch
        else:
            raise StopIteration
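Expected behavior of the iterator (a doctest-style sketch); the final batch may be shorter than `batch_size`:

```python
>>> list(BatchIterator(lst=[1, 2, 3, 4, 5], batch_size=2))
[[1, 2], [3, 4], [5]]
```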
11 changes: 0 additions & 11 deletions arekit/common/text/parser.py

This file was deleted.

22 changes: 11 additions & 11 deletions arekit/contrib/source/ruattitudes/reader.py
@@ -99,7 +99,7 @@ def iter_docs(input_file, get_doc_index_func):
            assert(title_terms_count == t_len or title_terms_count is None)
            reset = True

        if RuAttitudesFormatReader.STEXT_KEY in line and line.index(RuAttitudesFormatReader.STEXT_KEY) == 0:
            sentence = RuAttitudesSentence(is_title=False,
                                           text=RuAttitudesFormatReader.__parse_sentence(line, False),
                                           sentence_opins=opinions_list,
@@ -173,19 +173,19 @@ def __parse_sentence(line, is_title):
    def __parse_sentence_opin(line):
        line = line[len(RuAttitudesFormatReader.OPINION_KEY):]

        s_from = line.index('b:(')
        s_to = line.index(')', s_from)
        label = int(line[s_from + 3:s_to])

        o_from = line.index('oi:[')
        o_to = line.index(']', o_from)
        source_object_id_in_sentence, target_object_id_in_sentence = line[o_from + 4:o_to].split(',')

        source_object_id_in_sentence = int(source_object_id_in_sentence)
        target_object_id_in_sentence = int(target_object_id_in_sentence)

        s_from = line.index('si:{')
        s_to = line.index('}', s_from)
        opninion_key = line[s_from+4:s_to]

        sentence_opin = SentenceOpinion(source_id=source_object_id_in_sentence,
@@ -254,15 +254,15 @@ def __try_get_type(line):
        template = 'type:'
        if template in line:
            is_auth = RuAttitudesFormatReader.AUTH_LABEL in line
            t_from = line.index(template)
            t_to = line.index(RuAttitudesFormatReader.AUTH_LABEL[0], t_from) if is_auth else len(line)
            return line[t_from + len(template):t_to].strip()

        # Tag, utilized in RuAttitudes-1.* format.
        template = 't:['
        if template in line:
            t_from = line.index(template)
            t_to = line.index(']', t_from)
            return line[t_from + len(template):t_to].strip()

    # endregion
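To make the slicing above concrete, here is a hypothetical opinion payload consistent with the fields the reader extracts (the exact RuAttitudes serialization may differ):

```python
line = "b:(1) oi:[0,2] si:{src<->tgt}"  # hypothetical payload, OPINION_KEY already stripped

s_from = line.index('b:(')
label = int(line[s_from + 3:line.index(')', s_from)])                        # -> 1
o_from = line.index('oi:[')
source_id, target_id = line[o_from + 4:line.index(']', o_from)].split(',')  # -> '0', '2'
k_from = line.index('si:{')
opinion_key = line[k_from + 4:line.index('}', k_from)]                      # -> 'src<->tgt'
```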
2 changes: 1 addition & 1 deletion arekit/contrib/utils/lexicons/relation.py
@@ -14,7 +14,7 @@ def load(cls, filepath, separator=','):

    @staticmethod
    def __check(df):
        for index in df.index:
            relation = df.loc[index][0]
            assert(len(relation.split('<->')) == 2)
1 change: 0 additions & 1 deletion arekit/contrib/utils/pipelines/items/sampling/base.py
@@ -3,7 +3,6 @@
 from arekit.common.experiment.api.base_samples_io import BaseSamplesIO
 from arekit.common.experiment.data_type import DataType
 from arekit.common.pipeline.base import BasePipeline
-from arekit.common.pipeline.context import PipelineContext
 from arekit.common.pipeline.items.base import BasePipelineItem
 from arekit.contrib.utils.serializer import InputDataSerializationHelper
1 change: 0 additions & 1 deletion arekit/contrib/utils/pipelines/items/text/tokenizer.py
@@ -1,7 +1,6 @@
 import logging
 
 from arekit.common.context.token import Token
-from arekit.common.pipeline.context import PipelineContext
 from arekit.common.pipeline.items.base import BasePipelineItem
 from arekit.common.utils import split_by_whitespaces
 from arekit.contrib.utils.processing.text.tokens import Tokens
2 changes: 1 addition & 1 deletion arekit/contrib/utils/pipelines/items/text/translator.py
@@ -63,7 +63,7 @@ def __optionally_register(prts):
        for entity_index, translated_value in enumerate(translated_parts[1:]):
            entity_placeholder_instance = entity_placeholder_template.format(entity_index)
            # Cropping text part.
            to_ind = text.index(entity_placeholder_instance)

            if self.__do_translate_entity:
                origin_entities[entity_index].set_display_value(translated_value.strip())
@@ -40,17 +40,17 @@ def create_text_relation_extraction_pipeline(nerel_version,
     predefined_annot = PredefinedTextOpinionAnnotator(doc_ops, label_formatter)
 
     pipelines = {
-        DataType.Train: text_opinion_extraction_pipeline(text_parser=text_parser,
+        DataType.Train: text_opinion_extraction_pipeline(pipeline_items=text_parser,
                                                          get_doc_by_id_func=doc_ops.by_id,
                                                          annotators=[predefined_annot],
                                                          entity_index_func=lambda brat_entity: brat_entity.ID,
                                                          text_opinion_filters=text_opinion_filters),
-        DataType.Test: text_opinion_extraction_pipeline(text_parser=text_parser,
+        DataType.Test: text_opinion_extraction_pipeline(pipeline_items=text_parser,
                                                         get_doc_by_id_func=doc_ops.by_id,
                                                         annotators=[predefined_annot],
                                                         entity_index_func=lambda brat_entity: brat_entity.ID,
                                                         text_opinion_filters=text_opinion_filters),
-        DataType.Dev: text_opinion_extraction_pipeline(text_parser=text_parser,
+        DataType.Dev: text_opinion_extraction_pipeline(pipeline_items=text_parser,
                                                        get_doc_by_id_func=doc_ops.by_id,
                                                        annotators=[predefined_annot],
                                                        entity_index_func=lambda brat_entity: brat_entity.ID,
@@ -39,17 +39,17 @@ def create_text_relation_extraction_pipeline(nerel_bio_version,
     predefined_annot = PredefinedTextOpinionAnnotator(doc_ops, label_formatter)
 
     pipelines = {
-        DataType.Train: text_opinion_extraction_pipeline(text_parser=text_parser,
+        DataType.Train: text_opinion_extraction_pipeline(pipeline_items=text_parser,
                                                          get_doc_by_id_func=doc_ops.by_id,
                                                          annotators=[predefined_annot],
                                                          entity_index_func=lambda brat_entity: brat_entity.ID,
                                                          text_opinion_filters=text_opinion_filters),
-        DataType.Test: text_opinion_extraction_pipeline(text_parser=text_parser,
+        DataType.Test: text_opinion_extraction_pipeline(pipeline_items=text_parser,
                                                         get_doc_by_id_func=doc_ops.by_id,
                                                         annotators=[predefined_annot],
                                                         entity_index_func=lambda brat_entity: brat_entity.ID,
                                                         text_opinion_filters=text_opinion_filters),
-        DataType.Dev: text_opinion_extraction_pipeline(text_parser=text_parser,
+        DataType.Dev: text_opinion_extraction_pipeline(pipeline_items=text_parser,
                                                        get_doc_by_id_func=doc_ops.by_id,
                                                        annotators=[predefined_annot],
                                                        entity_index_func=lambda brat_entity: brat_entity.ID,
@@ -60,6 +60,6 @@ def create_text_opinion_extraction_pipeline(text_parser,
         text_opinion_filters=custom_text_opinion_filters,
         get_doc_by_id_func=doc_provider.by_id,
         entity_index_func=lambda brat_entity: brat_entity.ID,
-        text_parser=text_parser)
+        pipeline_items=text_parser)
 
     return pipeline
@@ -62,7 +62,7 @@ def create_text_opinion_extraction_pipeline(rusentrel_version,
         text_opinion_filters=text_opinion_filters,
         get_doc_by_id_func=doc_provider.by_id,
         entity_index_func=lambda brat_entity: brat_entity.ID,
-        text_parser=text_parser)
+        pipeline_items=text_parser)
 
     return pipeline
@@ -154,7 +154,7 @@ def create_main_pipeline(text_parser, doc_provider, annotators, text_opinion_fil
     """
     return text_opinion_extraction_pipeline(
         get_doc_by_id_func=doc_provider.by_id,
-        text_parser=text_parser,
+        pipeline_items=text_parser,
         annotators=annotators,
         entity_index_func=lambda brat_entity: brat_entity.ID,
         text_opinion_filters=text_opinion_filters)
10 changes: 4 additions & 6 deletions arekit/contrib/utils/pipelines/text_opinion/extraction.py
@@ -3,12 +3,11 @@
 from arekit.common.docs.parsed.base import ParsedDocument
 from arekit.common.docs.parsed.providers.entity_service import EntityServiceProvider
 from arekit.common.docs.parsed.service import ParsedDocumentService
-from arekit.common.docs.parser import DocumentParser
+from arekit.common.docs.parser import DocumentParsers
 from arekit.common.pipeline.base import BasePipeline
 from arekit.common.pipeline.items.flatten import FlattenIterPipelineItem
 from arekit.common.pipeline.items.map import MapPipelineItem
 from arekit.common.pipeline.items.map_nested import MapNestedPipelineItem
-from arekit.common.text.parser import BaseTextParser
 from arekit.common.text_opinions.base import TextOpinion
 from arekit.contrib.utils.pipelines.text_opinion.filters.base import TextOpinionFilter
 from arekit.contrib.utils.pipelines.text_opinion.filters.limitation import FrameworkLimitationsTextOpinionFilter
@@ -64,9 +63,8 @@ def __to_id(text_opinion):
         yield MetaEmptyLinkedDataWrapper(doc_id=parsed_doc.RelatedDocID)
 
 
-def text_opinion_extraction_pipeline(text_parser, get_doc_by_id_func, annotators, entity_index_func,
+def text_opinion_extraction_pipeline(pipeline_items, get_doc_by_id_func, annotators, entity_index_func,
                                      text_opinion_filters=None, use_meta_between_docs=True):
-    assert(isinstance(text_parser, BaseTextParser))
     assert(callable(get_doc_by_id_func))
     assert(isinstance(annotators, list))
     assert(isinstance(text_opinion_filters, list) or text_opinion_filters is None)
@@ -80,8 +78,8 @@ def text_opinion_extraction_pipeline(text_parser, get_doc_by_id_func, annotators
         MapPipelineItem(map_func=lambda doc_id: get_doc_by_id_func(doc_id)),
 
         # (doc, ppl_ctx) -> (parsed_doc)
-        MapNestedPipelineItem(map_func=lambda doc, ppl_ctx: DocumentParser.parse(
-            doc=doc, text_parser=text_parser, parent_ppl_ctx=ppl_ctx)),
+        MapNestedPipelineItem(map_func=lambda doc, ppl_ctx: DocumentParsers.parse(
+            doc=doc, pipeline_items=pipeline_items, parent_ppl_ctx=ppl_ctx)),
 
         # (parsed_doc) -> (text_opinions)
         MapPipelineItem(map_func=lambda parsed_doc: __iter_text_opinion_linkages(
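A call sketch against the new signature (hypothetical `items`, `doc_provider`, and `annot`; it mirrors the NEREL wiring above):

```python
pipeline = text_opinion_extraction_pipeline(
    pipeline_items=items,                    # formerly text_parser=...
    get_doc_by_id_func=doc_provider.by_id,
    annotators=[annot],
    entity_index_func=lambda brat_entity: brat_entity.ID)
```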
2 changes: 1 addition & 1 deletion tests/README.md
@@ -21,7 +21,7 @@ python -m pytest --html=pytest_report.html --self-contained-html --continue-on-c
 ## Publishing Release
 
 Navigate to the root folder of this project and execute:
-```python
+```bash
 python3 setup.py sdist bdist_wheel
 twine check ./dist/*
 twine upload ./dist/*