In [None]:
# Uncomment to install the package in editable mode from the parent directory:
# %pip install -e ../

In [None]:
%pip show cag

In [None]:
import os

from cag.framework.annotator.pipeline.pipeline_base import Pipeline
from cag.utils.config import Config
from tqdm import tqdm

In [None]:
# Make sure ArangoDB is up and running before executing this cell.
# The connection settings are read from environment variables so that real
# credentials never have to be written into (or committed with) the notebook.
# When a variable is not set, the local development value is used instead.
my_config = Config(
    url=os.environ.get("ARANGO_URL", "http://127.0.0.1:8529"),
    user=os.environ.get("ARANGO_USER", "root"),
    password=os.environ.get("ARANGO_PASSWORD", "root"),
    database=os.environ.get("ARANGO_DATABASE", "_system"),
    graph=os.environ.get("ARANGO_GRAPH", "MyCagGraph"),
)

## Create your first CAG Annotation Pipeline

In [None]:
# Extends the CAG Pipeline and implements the two methods: process_input and init_and_run
class MyFirstPipeline(Pipeline):
    """Example annotation pipeline with two selectable sets of pipes.

    ``exec_transformer_based`` decides which set ``init_and_run`` registers:

    * ``True``  -> ``sentencizer``, ``EmotionPipeOrchestrator``,
      ``ToxicityPipeOrchestrator``
    * ``False`` -> ``tok2vec``, ``NamedEntityPipeOrchestrator``, ``mpqa_parser``,
      ``MpqaPipeOrchestrator``, ``EmpathPipeOrchestrator``
    """

    # Class-level default so that ``init_and_run`` can always read the flag,
    # even on an instance for which ``__post_init__`` has not been executed.
    # NOTE(review): Python only calls ``__post_init__`` automatically for
    # dataclasses - confirm that the CAG ``Pipeline`` base class invokes it.
    exec_transformer_based = False

    # Keyword arguments passed to ``add_annotation_pipe``, in registration order.
    _TRANSFORMER_PIPES = (
        dict(name="sentencizer", save_output=False, is_spacy=True, is_native=True),
        dict(name="EmotionPipeOrchestrator", save_output=True, is_spacy=True),
        # Currently disabled:
        # dict(name="HedgePipeOrchestrator", save_output=True, is_spacy=True),
        dict(name="ToxicityPipeOrchestrator", save_output=True, is_spacy=True),
    )
    _STANDARD_PIPES = (
        # tok2vec is mandatory for NER
        dict(name="tok2vec", save_output=False, is_spacy=True, is_native=True),
        dict(name="NamedEntityPipeOrchestrator", save_output=True, is_spacy=True),
        dict(name="mpqa_parser", save_output=False, is_spacy=True, is_native=True),
        dict(name="MpqaPipeOrchestrator", save_output=True, is_spacy=True),
        dict(name="EmpathPipeOrchestrator", save_output=True, is_spacy=True),
    )

    def __post_init__(self):
        """Set the output file name and select the non-transformer pipes."""
        print("In POST INIT")
        self.out_path = "myfirstpipeline_output.csv"
        self.exec_transformer_based = False

    def process_input(self) -> list:
        """Pre-process the input data before annotating.

        Returns:
            One ``(text, {"_key": key})`` tuple per node in ``self.input``,
            where ``text`` is the node's text and ``key`` is its ``_key``
            (the unique id of the node).
        """
        return [
            (txt_node.text, {"_key": txt_node._key}) for txt_node in tqdm(self.input)
        ]

    def init_and_run(self):
        """Register the selected set of pipes, then call ``init_pipe_stack``."""
        if self.exec_transformer_based:
            selected_pipes = self._TRANSFORMER_PIPES
        else:
            selected_pipes = self._STANDARD_PIPES

        for pipe_kwargs in selected_pipes:
            self.add_annotation_pipe(**pipe_kwargs)

        self.init_pipe_stack()

## Initialize your first CAG Pipeline

In [None]:
cag_pipeline = MyFirstPipeline(my_config)
# When a spaCy pipe is used, this flag can be set to enable multiprocessing.
# NOTE: If you are using spaCy with transformer-based features, keep the flag at 1,
# otherwise the pipeline will freeze (this is a spaCy bug and not related to cag).
cag_pipeline.spacy_n_processors = 1

## Fetch the TextNodes, Annotate & Save

In [None]:
# Loop over your data in batches: annotate and save each batch.
# Every page is fetched with the same size, so the batch boundaries are uniform.
BATCH_SIZE = 100

cag_pipeline.exec_transformer_based = False
cag_pipeline.init_and_run()

coll = cag_pipeline.database_config.db["TextNode"]
docs = coll.fetchAll(limit=BATCH_SIZE)
fetched = len(docs)
# An empty page means there is nothing left to process.
while len(docs) > 0:
    ## annotate

    # Set the INPUT - this is expected to run the input pre-processing hook
    # (process_input in MyFirstPipeline), so make sure it is implemented.
    cag_pipeline.reset_input_output()
    cag_pipeline.set_input(docs)

    cag_pipeline.annotate()
    cag_pipeline.save()

    # Drop the batch that was just saved before fetching the next one.
    cag_pipeline.reset_input_output()
    print(f"Processed {fetched} docs")

    # Next page: skip everything that has already been fetched.
    docs = coll.fetchAll(limit=BATCH_SIZE, skip=fetched)
    fetched = fetched + len(docs)

### Transformer-based Features

In [None]:
# Switch the existing pipeline object to the transformer-based pipes and point
# out_path at a separate file name.
# NOTE(review): init_and_run() has already been called once on this object -
# confirm that the pipe stack is rebuilt rather than extended by a second call.
cag_pipeline.exec_transformer_based = True
cag_pipeline.out_path = "transformer_based_features.csv"
cag_pipeline.init_and_run()

coll = cag_pipeline.database_config.db["TextNode"]
docs = coll.fetchAll(limit=100)
fetched = len(docs)
while True:
    # Stop as soon as a fetch comes back empty.
    if docs is None or len(docs) == 0:
        break

    # Clear the previous batch, then hand over the new one as INPUT.
    cag_pipeline.reset_input_output()
    cag_pipeline.set_input(docs)

    # Annotate the batch and persist the result.
    cag_pipeline.annotate()
    cag_pipeline.save()

    print(f"Processed {fetched} docs")

    # Next page: skip everything that has been fetched so far.
    docs = coll.fetchAll(limit=100, skip=fetched)
    fetched += len(docs)