Skip to content

Commit

Permalink
Related to #250 and #281, refactoring. Updated comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolay-r committed May 27, 2022
1 parent 1222f7d commit 181d33a
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 35 deletions.
13 changes: 13 additions & 0 deletions arekit/common/data/input/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,28 @@ def text_opinions_iter_pipeline(parse_news_func, iter_doc_opins,
""" Opinion collection generation pipeline.
NOTE: Here we do not perform IDs assignation!
"""
# TODO. #250, separate. Part 1. parameters.
assert(callable(parse_news_func))
assert(callable(iter_doc_opins))

# TODO. Part separate. Part 2. parameters.
assert(callable(value_to_group_id_func))
assert(isinstance(terms_per_context, int))

return BasePipeline([

### TODO. #250. Separate
### TODO. #250. Separate PART 1. (Related to ids -> (parsed_news, opinions)

# (id) -> (id, opinions)
MapPipelineItem(map_func=lambda doc_id: (doc_id, list(iter_doc_opins(doc_id)))),

# (id, opinions) -> (parsed_news, opinions).
MapPipelineItem(map_func=lambda data: (parse_news_func(data[0]), data[1])),

### TODO. Separate
### TODO. Separate PART 2. (parsed_news, opinions) -> linkages[]

# (parsed_news, opinions) -> (opins_provider, entities_provider, opinions).
MapPipelineItem(map_func=lambda data: (
ParsedNewsService(
Expand All @@ -75,6 +85,9 @@ def text_opinions_iter_pipeline(parse_news_func, iter_doc_opins,
text_opinion=text_opinion,
window_size=terms_per_context))),

### TODO. #250. Separate
### TODO. #250. Separate Part 3. Flatten. linkage[] -> linkages.

# linkages[] -> linkages.
FlattenIterPipelineItem()
])
16 changes: 3 additions & 13 deletions arekit/common/data/input/providers/opinions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from arekit.common.data.input.pipeline import text_opinions_iter_pipeline
from arekit.common.linkage.text_opinions import TextOpinionsLinkage
from arekit.common.pipeline.base import BasePipeline
from arekit.common.text_opinions.base import TextOpinion
Expand All @@ -7,24 +6,15 @@
class InputTextOpinionProvider(object):

def __init__(self, pipeline):
    """ Wraps a text-opinion extraction pipeline.
        NOTE: it is important that running the wrapped pipeline
        yields TextOpinionsLinkage instances as its output.
    """
    assert(isinstance(pipeline, BasePipeline))
    # No linkage has been processed yet, hence no current ID.
    self.__current_id = None
    self.__pipeline = pipeline

# endregion

@classmethod
def create(cls, iter_doc_opins, value_to_group_id_func,
parse_news_func, terms_per_context):

pipeline = text_opinions_iter_pipeline(
parse_news_func=parse_news_func,
value_to_group_id_func=value_to_group_id_func,
iter_doc_opins=iter_doc_opins,
terms_per_context=terms_per_context)

return cls(pipeline)

def __assign_ids(self, linkage):
""" Perform IDs assignation.
"""
Expand Down
9 changes: 9 additions & 0 deletions arekit/common/experiment/api/ops_opin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ def LabelsFormatter(self):

# region extraction

# TODO. #248 remove doc_ops (see details in the issue description).
def iter_annot_collections(self, exp_ctx, doc_ops, data_type):
    """ Iterates (doc_id, annotated_collection) pairs produced by the
        experiment-context annotator for the given `data_type`.
        NOTE: `self` is passed as `opin_ops`, i.e. the annotator draws
        opinions from this operations instance.
    """
    collections_it = exp_ctx.Annotator.iter_annotated_collections(
        data_type=data_type, doc_ops=doc_ops, opin_ops=self)

    # Delegate iteration directly; the explicit per-pair loop was redundant.
    yield from collections_it

def iter_opinions_for_extraction(self, doc_id, data_type):
""" providing opinions for further context-level opinion extraction process.
in terms of sentiment attitude extraction, this is a general method
Expand Down
20 changes: 14 additions & 6 deletions arekit/contrib/bert/handlers/serializer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from arekit.common.data.input.pipeline import text_opinions_iter_pipeline
from arekit.common.data.input.providers.columns.opinion import OpinionColumnsProvider
from arekit.common.data.input.providers.columns.sample import SampleColumnsProvider
from arekit.common.data.input.providers.opinions import InputTextOpinionProvider
Expand Down Expand Up @@ -60,14 +61,18 @@ def __handle_iteration(self, data_type):
rows_provider=sample_rows_provider,
storage=BaseRowsStorage())

# Create opinion provider
opinion_provider = InputTextOpinionProvider.create(
value_to_group_id_func=self.__value_to_group_id_func,
# TODO. #250. Expand this pipeline with the annotation (in advance).
# TODO. Check out the same comment at NetworkInputHelper.
pipeline = text_opinions_iter_pipeline(
parse_news_func=lambda doc_id: self.__doc_ops.parse_doc(doc_id),
value_to_group_id_func=self.__value_to_group_id_func,
iter_doc_opins=lambda doc_id: self.__opin_ops.iter_opinions_for_extraction(
doc_id=doc_id, data_type=data_type),
terms_per_context=self.__exp_ctx.TermsPerContext)

# Create opinion provider
opinion_provider = InputTextOpinionProvider(pipeline)

# Populate repositories
opinions_repo.populate(opinion_provider=opinion_provider,
doc_ids=list(self.__doc_ops.iter_doc_ids(data_type)),
Expand Down Expand Up @@ -102,8 +107,11 @@ def on_iteration(self, iter_index):
def on_before_iteration(self):
for data_type in self.__exp_ctx.DataFolding.iter_supported_data_types():

collections_it = self.__exp_ctx.Annotator.iter_annotated_collections(
data_type=data_type, opin_ops=self.__opin_ops, doc_ops=self.__doc_ops)
# TODO. #250. A part of the further pipeline.
# TODO. This might be included in InputTextOpinionProvider, as an initial operation
# TODO. In a whole pipeline. This code duplicates the one in NetworkInputHelper.
collections_it = self.__opin_ops.iter_annot_collections(
exp_ctx=self.__exp_ctx, doc_ops=self.__doc_ops, data_type=data_type)

for doc_id, collection in collections_it:

Expand All @@ -116,4 +124,4 @@ def on_before_iteration(self):
target=target,
labels_formatter=self.__annot_label_formatter)

# endregion
# endregion
34 changes: 18 additions & 16 deletions arekit/contrib/networks/core/input/helper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import collections
import logging

from arekit.common.data.input.pipeline import text_opinions_iter_pipeline
from arekit.common.data.input.providers.columns.opinion import OpinionColumnsProvider
from arekit.common.data.input.providers.columns.sample import SampleColumnsProvider
from arekit.common.data.input.providers.opinions import InputTextOpinionProvider
Expand Down Expand Up @@ -92,15 +93,12 @@ def __perform_writing(exp_ctx, exp_io, doc_ops, data_type, opinion_provider,
# TODO. This is a particular implementation, which is considered to be
# TODO. Implemented at iter_opins_for_extraction (OpinionOperations)
@staticmethod
def __perform_annotation(exp_ctx, exp_io, doc_ops, opin_ops, data_type):
collections_it = exp_ctx.Annotator.iter_annotated_collections(
data_type=data_type, doc_ops=doc_ops, opin_ops=opin_ops)

def __save_annotation(exp_io, labels_fmt, data_type, collections_it):
    """ Persists every annotated opinion collection from `collections_it`.
        For each (doc_id, collection) pair a storage target is resolved
        through `exp_io` and the collection is written there, formatted
        with `labels_fmt`.
    """
    for doc_id, collection in collections_it:
        target = exp_io.create_opinion_collection_target(
            doc_id=doc_id, data_type=data_type)
        exp_io.write_opinion_collection(collection=collection,
                                        labels_formatter=labels_fmt,
                                        target=target)

# endregion

Expand All @@ -127,18 +125,22 @@ def prepare(exp_ctx, exp_io, doc_ops, opin_ops, terms_per_context, balance, valu
for data_type in exp_ctx.DataFolding.iter_supported_data_types():

# Perform annotation
NetworkInputHelper.__perform_annotation(exp_ctx=exp_ctx,
exp_io=exp_io,
doc_ops=doc_ops,
opin_ops=opin_ops,
data_type=data_type)

# Compose opinion provider
opinion_provider = InputTextOpinionProvider.create(
value_to_group_id_func=value_to_group_id_func,
# TODO. #250. This should be transformed into pipeline element.
# TODO. And then combined (embedded) into pipeline below.
NetworkInputHelper.__save_annotation(
exp_io=exp_io,
labels_fmt=opin_ops.LabelsFormatter,
data_type=data_type,
collections_it=opin_ops.iter_annot_collections())

# TODO. #250. Organize a complete pipeline.
# TODO. Now InputTextOpinionProvider has only a part from annotated opinions
# TODO. We need to extend our pipeline with the related pre-processing (annotation).
# TODO. See text_opinions_iter_pipeline method.
pipeline = text_opinions_iter_pipeline(
parse_news_func=lambda doc_id: doc_ops.parse_doc(doc_id),
value_to_group_id_func=value_to_group_id_func,
iter_doc_opins=lambda doc_id:
# TODO. Perform annotation here.
opin_ops.iter_opinions_for_extraction(doc_id=doc_id, data_type=data_type),
terms_per_context=terms_per_context)

Expand All @@ -147,7 +149,7 @@ def prepare(exp_ctx, exp_io, doc_ops, opin_ops, terms_per_context, balance, valu
exp_io=exp_io,
doc_ops=doc_ops,
data_type=data_type,
opinion_provider=opinion_provider,
opinion_provider=InputTextOpinionProvider(pipeline),
terms_per_context=terms_per_context,
balance=balance,
text_provider=text_provider)
Expand Down

0 comments on commit 181d33a

Please sign in to comment.