In [1]:
# Set some environment variables for demo purposes
%env SKEMA_HOST=http://127.0.0.1:9005
%env MIT_HOST=http://localhost:8000
%env OPENAI_KEY=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


env: SKEMA_HOST=http://127.0.0.1:9005
env: MIT_HOST=http://localhost:8000
env: OPENAI_KEY=<redacted>


# Text Reading Integration Notebook

## SKEMA Annotation Client

In [2]:
# Client code for SKEMA TR
from typing import Any, Dict, Union
import requests, os

def annotate_text_with_skema(text:Union[str, list[str]]) -> list[Dict[str, Any]]:
	"""Send one or more documents to the SKEMA text reading service.

	Args:
		text: Either the contents of a single document, or a list with the
			contents of several documents.

	Returns:
		The decoded JSON body of the service response.

	Raises:
		RuntimeError: If the service answers with any HTTP status other than 200.
		KeyError: If the `SKEMA_HOST` environment variable is not set.
	"""
	endpoint = f"{os.environ['SKEMA_HOST']}/textFileToMentions"
	# The service is always sent a list: a lone string is wrapped into a
	# one-element list, anything else is forwarded untouched.
	documents = [text] if isinstance(text, str) else text
	reply = requests.post(endpoint, json=documents, timeout=120)
	if reply.status_code != 200:
		raise RuntimeError(f"Calling {endpoint} failed with HTTP code {reply.status_code}")
	return reply.json()

## MIT Annotation Client

In [3]:
# Client code for MIT TR

def annotate_text_with_mit(texts:Union[str, list[str]], timeout:Union[float, None]=120) -> list[Dict[str, Any]]:
	"""Send one or more documents to the MIT text reading service.

	Each document is sent in its own request, one after the other.

	Args:
		texts: Either the contents of a single document, or a list with the
			contents of several documents.
		timeout: Seconds to wait for the service on each request. Defaults to
			120, the same value the SKEMA client uses. Pass `None` to wait
			without limit (the previous behavior).

	Returns:
		A list with the decoded JSON response for each input document, in
		input order.

	Raises:
		RuntimeError: If any request is answered with an HTTP status other
			than 200. Results of earlier documents are discarded.
		KeyError: If `MIT_HOST` or `OPENAI_KEY` is not set in the environment.
	"""
	endpoint = f"{os.environ['MIT_HOST']}/annotation/find_text_vars/"
	if isinstance(texts, str):
		texts = [texts] # If the text to annotate is a single string representing the contents of a document, make it a list with a single element
	
	# TODO parallelize this
	return_values = list()
	for ix, text in enumerate(texts):
		# NOTE(review): both the key and the document text are sent as URL query
		# parameters, which appears to be what the endpoint expects. Query
		# strings tend to end up in server logs - confirm this is acceptable.
		params = {
			"gpt_key": os.environ['OPENAI_KEY'],
			"text": text
		}
		# Without a timeout, requests waits forever on an unresponsive server.
		response = requests.post(endpoint, params=params, timeout=timeout)
		if response.status_code == 200:
			return_values.append(response.json())
		else:
			raise RuntimeError(f"Calling {endpoint} on the {ix}th input failed with HTTP code {response.status_code}")
	return return_values
    

## Normalization of Extractions

In [29]:
import json
from pathlib import Path
import tempfile
from typing import Optional, Dict, Any
from askem_extractions.importers import import_arizona, import_mit
from askem_extractions.importers.mit import merge_collections
from askem_extractions.data_model import AttributeCollection
import itertools as it

def normalize_extractions(
        arizona_extractions:Optional[Dict[str, Any]],
        mit_extractions:Optional[Dict]
    ) -> AttributeCollection:
    """Convert raw SKEMA (Arizona) and MIT extractions into one AttributeCollection.

    Each non-empty input is written to a temporary JSON file and loaded with
    the matching `askem_extractions` importer. When both imports succeed, the
    MIT service is asked for a mapping between the two files and the
    collections are merged with `merge_collections`. Otherwise (or when the
    mapping request does not return HTTP 200) the attributes of whichever
    collections were imported are simply concatenated, Arizona first.

    Args:
        arizona_extractions: Raw SKEMA output for one document, or None/empty
            to skip it.
        mit_extractions: Raw MIT output for one document, or None/empty to
            skip it.

    Returns:
        The merged collection, or a collection holding the concatenated
        attributes. It is empty when nothing could be imported.

    Raises:
        KeyError: If `OPENAI_KEY` or `MIT_HOST` is missing from the
            environment when the mapping request is made.
    """
    # Kept in separate variables (instead of positions in a list) so that a
    # failed import can never shift one collection into the other one's slot.
    canonical_arizona = None
    canonical_mit = None

    with tempfile.TemporaryDirectory() as tmpdirname:

        skema_path = os.path.join(tmpdirname, "skema.json")
        mit_path = os.path.join(tmpdirname, "mit.json")

        if arizona_extractions:
            try:
                with open(skema_path, "w") as f:
                    json.dump(arizona_extractions, f)
                canonical_arizona = import_arizona(Path(skema_path))
            except Exception as ex:
                # Best effort: report the problem and go on without this source
                print(ex)
        if mit_extractions:
            try:
                with open(mit_path, "w") as f:
                    json.dump(mit_extractions, f)
                canonical_mit = import_mit(Path(mit_path))
            except Exception as ex:
                # Best effort: report the problem and go on without this source
                print(ex)

        # Only attempt the merge when BOTH imports actually succeeded; a
        # successful import also guarantees that both JSON files exist.
        if canonical_arizona is not None and canonical_mit is not None:
            # Merge both with some de-duplication
            params = {
                "gpt_key": os.environ['OPENAI_KEY']
            }

            with open(mit_path) as mit_file, open(skema_path) as arizona_file:
                data = {
                    "mit_file": mit_file.read(),
                    "arizona_file": arizona_file.read()
                }
            # NOTE(review): the file contents are sent as form fields; confirm
            # the endpoint does not expect a multipart upload (`files=`).
            response = requests.post(f"{os.environ['MIT_HOST']}/integration/get_mapping", params=params, data=data)

            if response.status_code == 200:
                # `text` is a property holding the decoded body, not a method
                map_data = response.text
                map_path = os.path.join(tmpdirname, "mapping.txt")
                with open(map_path, 'w') as f:
                    f.write(map_data)

                # Return the merged collection here. The temporary directory
                # is only removed after merge_collections has finished.
                return merge_collections(
                    a_collection=canonical_arizona,
                    m_collection=canonical_mit,
                    map_path=Path(map_path)
                )
            # Any other status falls through to plain concatenation below.

    # Merge the collections into a single attribute collection
    collections = [c for c in (canonical_arizona, canonical_mit) if c is not None]
    attributes = list(it.chain.from_iterable(c.attributes for c in collections))

    return AttributeCollection(attributes=attributes)


## Integrated Workflow

In [31]:
def integrated_text_extractions(texts:Union[str, list[str]]) -> list[AttributeCollection]:
    """Run both text reading services and normalize their output per document.

    Args:
        texts: Either the contents of a single document, or a list with the
            contents of several documents.

    Returns:
        A list of normalized collections, one per pair of SKEMA/MIT results.
        Note that a list is returned even when `texts` is a single string.
    """
    skema_extractions = annotate_text_with_skema(texts)
    mit_extractions = annotate_text_with_mit(texts)

    # Results are paired by position; zip stops at the shorter of the two lists.
    # NOTE(review): this assumes the SKEMA service returns one entry per input
    # document, in input order - confirm against the service.
    return [
        normalize_extractions(arizona_extractions=skema, mit_extractions=mit)
        for skema, mit in zip(skema_extractions, mit_extractions)
    ]

## Example

In [33]:
# Read a paper from the example directory.
# The `with` block closes the file handle as soon as the text has been read.
with open("../inputs/1-s2.0-S2211379721005490-main.txt") as paper_file:
    test_text = paper_file.read()

# Run the integrated pipeline (a single string yields a one-element list)
extractions = integrated_text_extractions(test_text)

# Print the result
for attribute in extractions[0].attributes:
    print(attribute)

type=<AttributeType.anchored_extraction: 'anchored_extraction'> amr_element_id=None payload=AnchoredExtraction(id=ID(id='E:-1690295439'), names=[Name(id=ID(id='T:447086538'), name='E', extraction_source=TextExtraction(page=0, block=0, char_start=17814, char_end=17815, document_reference=ID(id='N/A')), provenance=Provenance(method='Skema TR Pipeline rules', timestamp=datetime.datetime(2023, 6, 26, 23, 48, 37, 617833)))], descriptions=[Description(id=ID(id='T:-1178270728'), source='Matouk AE', grounding=[], extraction_source=TextExtraction(page=0, block=0, char_start=17817, char_end=17826, document_reference=ID(id='N/A')), provenance=Provenance(method='Skema TR Pipeline rules', timestamp=datetime.datetime(2023, 6, 26, 23, 48, 37, 617833)))], value_specs=[], groundings=[Grounding(grounding_text='LolC/E', grounding_id='vo:0010921', source=[], score=1.0000001192092896, provenance=Provenance(method='SKEMA-TR-Embedding', timestamp=datetime.datetime(2023, 6, 26, 23, 48, 37, 617943)))], data_co

# TODOS

- Parameterize the endpoint URLs
- Fetch the OpenAI key (`OPENAI_KEY`) from the environment
- Make the annotations asynchronous and add flags to control which to run
- Make equivalent functions that read COSMOS json/binaries instead of text