In [46]:
# SL
import json
import pandas as pd
import ipywidgets as widgets
from IPython.display import display
import asyncio
import aiohttp

class Annotation():
    """In-memory wrapper around one annotation record and its content tree.

    ``metadata`` is the annotation dict as returned by the search endpoint;
    the content tree is attached later through :meth:`set_content`.
    """

    def __init__(self, metadata: dict) -> None:
        """
        Store the raw metadata and derive the short identifiers from it.
        :param metadata: annotation record (a dict with optional "id",
                         "queue" and "schema" keys)
        """
        self.metadata = metadata
        self.id = metadata.get("id", "na")
        # "queue"/"schema" are URLs; only the trailing path segment is kept.
        # `or "na"` also covers keys that are present but set to None.
        self.queue = (metadata.get("queue") or "na").split("/")[-1]
        self.schema = (metadata.get("schema") or "na").split("/")[-1]
        # Start with an empty tree so lookups made before set_content()
        # return empty results instead of raising AttributeError.
        self.content_data = []

    def set_content(self, annotation: list) -> None:
        """
        Attach the annotation content tree.
        :param annotation: list of content nodes (dicts)
        """
        self.content_data = annotation

    def find_by_schema_id(self, content, schema_id: str):
        """
        Return all nodes matching a schema id.
        :param content: annotation content tree (list of node dicts)
        :param schema_id: schema id to look for
        :return: the list of nodes whose "schema_id" equals ``schema_id``
        """
        accumulator = []
        for node in content or []:
            if node.get("schema_id") == schema_id:
                accumulator.append(node)
            else:
                # Only non-matching nodes are descended into; the children
                # of a matching node are not searched.
                children = node.get("children")
                if children:
                    accumulator.extend(self.find_by_schema_id(children, schema_id))

        return accumulator

    def get_id_data(self, schema_id):
        """
        Search the attached content tree for a schema id.
        :param schema_id: schema id to look for
        :return: the list of matching nodes (empty when no content is set)
        """
        return self.find_by_schema_id(self.content_data, schema_id)

    def get_positions(self, schema_id):
        """
        Collect the "position" of every node matching a schema id.
        :param schema_id: schema id to look for
        :return: one entry per matching node; ``None`` where the node has no
                 content, no position, or an empty position
        """
        return [
            (datapoint.get("content") or {}).get("position") or None
            for datapoint in self.get_id_data(schema_id)
        ]



class AsyncRequestClient():
    """Small async HTTP client for the annotation API with a per-URL cache.

    Successful (HTTP 200) JSON responses are remembered in
    ``request_cache`` keyed by full URL, together with the JSON body that
    was sent, and are reused when the same URL is requested again with an
    equal body.
    """

    BASE_URL = "https://elis.rossum.ai/api"
    HEADERS = {
        "Content-Type": "application/json",
    }
    SEARCH_ENDPOINT = "/annotations/search"

    def __init__(self, token, domain=None):
        """
        :param token: bearer token sent in the Authorization header
        :param domain: API base URL; falls back to ``BASE_URL`` when empty
        """
        self.token = token
        self.base_url = domain or AsyncRequestClient.BASE_URL
        self.request_cache = {}
        # Previously assigned lazily inside _search(); kept as an attribute
        # for backward compatibility.
        self.endpoint = AsyncRequestClient.SEARCH_ENDPOINT

    def reset_inputs(self, token, domain):
        """
        Replace the token and base URL. The request cache is left untouched.
        :param token: new bearer token
        :param domain: new API base URL; falls back to ``BASE_URL`` when empty
        """
        self.token = token
        self.base_url = domain or AsyncRequestClient.BASE_URL

    async def _make_request(self, method, endpoint, headers=None, data=None, json=None, files=None, cache_on=True, ready_url=None):
        """
        Send one request and return the decoded JSON body.
        :param method: HTTP method name
        :param endpoint: path appended to ``{base_url}/v1``
        :param headers: optional extra headers (never mutated)
        :param data: raw request body
        :param json: JSON request body; also part of the cache comparison
        :param files: accepted for signature compatibility; currently unused
        :param cache_on: when true, a cached response for the same URL and
                         JSON body is returned without a network call
        :param ready_url: full URL that overrides ``endpoint`` (pagination)
        :return: decoded JSON response
        :raises aiohttp.ClientResponseError: when the status is not 200
        """
        url = ready_url or f"{self.base_url}/v1{endpoint}"
        # Copy before adding the token: writing into the class-level HEADERS
        # (or the caller's dict) would leak the token into shared state.
        headers = dict(headers or AsyncRequestClient.HEADERS)
        headers["Authorization"] = f"Bearer {self.token}"

        cached = self.request_cache.get(url)
        if cache_on and cached and cached["json"] == json:
            print(f"Cached {url}")
            return cached["response"]

        async with aiohttp.ClientSession() as session:
            try:
                async with session.request(method, url, headers=headers, data=data, json=json) as response:
                    print(url, "  Request")
                    if response.status != 200:
                        print(response)
                        # ClientResponseError needs request_info and history;
                        # raising the bare class fails with TypeError instead.
                        raise aiohttp.ClientResponseError(
                            response.request_info,
                            response.history,
                            status=response.status,
                            message=response.reason or "",
                            headers=response.headers,
                        )
                    payload = await response.json()
            except aiohttp.ClientResponseError as e:
                print("ClientResponseError occurred:", e)
                # Propagate: returning None here only moves the failure to
                # the first caller that indexes into the response.
                raise

        self.request_cache[url] = {"response": payload, "json": json}
        return payload

    async def _get_annotation_content(self, annotation_id):
        """
        Fetch the content tree of one annotation.
        :param annotation_id: id of the annotation
        :return: decoded JSON response of the content endpoint
        """
        endpoint = f"/annotations/{annotation_id}/content"
        return await self._make_request("GET", endpoint, cache_on=True)

    async def _search(self, params=None, next_page=None):
        """
        Fetch one page of search results.
        :param params: JSON search query
        :param next_page: full URL of a follow-up page; the first page is
                          requested when omitted
        :return: tuple ``(next_page_url, results)``; the first element is
                 falsy when there is no further page
        """
        response = await self._make_request(
            "POST",
            AsyncRequestClient.SEARCH_ENDPOINT,
            json=params,
            cache_on=True,
            ready_url=next_page or None,
        )
        pagination = response["pagination"]
        return pagination.get("next", False), response["results"]

    async def search_with_query(self, query: dict, allPages: bool = False) -> dict:
        """
        Run a search and wrap every result in an ``Annotation``.
        :param query: JSON search query
        :param allPages: follow pagination until exhausted when true;
                         otherwise only the first page is loaded
        :return: dict mapping annotation id to ``Annotation``
        """
        annotation_library = {}
        next_page, results = await self._search(params=query)
        for result in results:
            annotation_library[result["id"]] = Annotation(result)

        while allPages and next_page:
            next_page, results = await self._search(params=query, next_page=next_page)
            for result in results:
                annotation_library[result["id"]] = Annotation(result)

        return annotation_library

def form_dataset(obj: "Annotation", key, field_id: str) -> pd.DataFrame:
    """
    Build a one-column frame with the values of one field in one annotation.
    :param obj: annotation whose content tree is searched
    :param key: label used for every row of the "IDs" index
    :param field_id: schema id to collect; also the name of the value column
    :return: frame indexed by "IDs" with one row per matching node, or an
             empty frame when nothing matches
    """
    rows = [
        # Matched nodes without a "content" dict (or without "value") yield
        # None instead of raising KeyError.
        {"IDs": key, field_id: (node.get("content") or {}).get("value")}
        for node in obj.find_by_schema_id(obj.content_data, field_id)
    ]
    if not rows:
        return pd.DataFrame()
    return pd.DataFrame(rows).set_index("IDs")

def show_results(field_ids, annotations_collection, base_url) -> None:
    """
    Display a table of field values for every annotation in the collection.

    With more than one field id, each annotation contributes an "Address"
    column (``{base_url}/{id}``, rendered as a link) outer-merged with one
    column per field. With a single field id only that field's column is
    shown.
    :param field_ids: list of schema ids to show (must not be empty when the
                      collection is non-empty)
    :param annotations_collection: dict mapping annotation id to annotation
    :param base_url: prefix used to build each annotation's address
    :return: None; the styled table is rendered through ``display``
    """
    multiple_fields = len(field_ids) > 1

    # Collect per-annotation frames and concatenate once at the end rather
    # than re-copying the growing result on every iteration.
    frames = []
    for key, obj in annotations_collection.items():
        if multiple_fields:
            merged = pd.DataFrame([{"IDs": key, "Address": f"{base_url}/{key}"}]).set_index("IDs")
            for field_id in field_ids:
                merged = merged.merge(form_dataset(obj, key, field_id), how='outer', left_index=True, right_index=True)
            frames.append(merged)
        else:
            frames.append(form_dataset(obj, key, field_ids[0]))

    # pd.concat rejects an empty list, so fall back to an empty frame.
    output = pd.concat(frames) if frames else pd.DataFrame()

    def make_clickable(url):
        return f'<a href="{url}" target="_blank">link</a>'

    styled_output = output.style.format({'Address': make_clickable})
    display(styled_output)


def create_input_widgets():
    """
    Build the notebook's input controls.
    :return: tuple of (token text area, custom-domain text area, filter-query
             text area, field-id text area, "load all pages" toggle,
             environment dropdown)
    """
    # Default search query shown in the filter box.
    default_query = '{\n    "query": {\n        "$and": [\n            {\n                "queue": {\n                    "$in": [\n                        "https://elis.rossum.ai/api/v1/queues/XXXXXX",\n                        "https://elis.rossum.ai/api/v1/queues/XXXXXX",\n                        "https://elis.rossum.ai/api/v1/queues/XXXXXX"\n                    ]\n                }\n            },\n            {\n                "field.document_id.string": {\n                    "$emptyOrMissing": false\n                }\n            },\n            {\n                "status": {\n                    "$in": [\n                        "confirmed",\n                        "exported"\n                    ]\n                }\n            }\n        ]\n    }\n}'

    # Dropdown label -> value. The 'prod-eu2' value is only a host suffix.
    environments = {
        'prod-eu': 'https://elis.rossum.ai',
        'prod-jp': 'https://shared-jp.app.rossum.ai',
        'prod-eu2': '.rossum.app',
        'prod-us': 'https://us.app.rossum.ai',
    }

    token_box = widgets.Textarea(value="", description="TOKEN:")
    domain_box = widgets.Textarea(value="", description="Custom Domain:")
    field_box = widgets.Textarea(value="document_id", description="Field ID to check")
    query_box = widgets.Textarea(
        value=default_query,
        description='Filter Query',
        # Width is 80% of the available space.
        layout={'width': '80%', 'height': '500px'},
    )
    all_pages_toggle = widgets.ToggleButtons(
        options=[True, False],
        description='Load all pages of annotations:',
        tooltips=['True', 'False'],
        value=False,  # Default value
    )
    environment_picker = widgets.Dropdown(
        options=environments,
        description='Environment:',
    )

    return token_box, domain_box, query_box, field_box, all_pages_toggle, environment_picker


async def process_annotations(client, token_input, url_input, query, field_ids, bool_toggle, dropdown):
    """
    Run the search and attach the content tree to every annotation found.
    :param client: object providing ``reset_inputs``, ``search_with_query``
                   and ``_get_annotation_content``
    :param token_input: API token, either as a plain string or as a widget
                        exposing it through ``.value``
    :param url_input: widget whose ``.value`` is the custom sub-domain; only
                      used for the "prod-eu2" environment
    :param query: search query, as a dict, a JSON string, or a widget
                  exposing either through ``.value``
    :param field_ids: unused here; kept for signature compatibility
    :param bool_toggle: widget whose ``.value`` enables loading all pages
    :param dropdown: environment dropdown (``.label`` and ``.value`` are read)
    :return: dict mapping annotation id to annotation, with content attached
    :raises json.JSONDecodeError: when the query is a string that is not
                                  valid JSON
    """
    # Accept both raw values and widgets: url_input, bool_toggle and dropdown
    # are already read through .value, so token and query follow suit.
    token = getattr(token_input, "value", token_input)
    search_query = getattr(query, "value", query)
    if isinstance(search_query, str):
        search_query = json.loads(search_query)

    if dropdown.label == "prod-eu2":
        # The dropdown value is only a host suffix for this environment.
        # Stray whitespace from the text area would make the host invalid.
        url = f'https://{url_input.value.strip()}{dropdown.value}'
    else:
        url = f'{dropdown.value}'
    client.reset_inputs(token, f'{url}/api')

    annotations_collection = await client.search_with_query(search_query, allPages=bool_toggle.value)

    # Fetch every annotation's content concurrently. gather() returns the
    # results in the order the awaitables were passed, so they line up with
    # the dict's iteration order.
    annotation_contents = await asyncio.gather(
        *(client._get_annotation_content(key) for key in annotations_collection)
    )

    for obj, annotation_content in zip(annotations_collection.values(), annotation_contents):
        obj.set_content(annotation_content["content"])

    return annotations_collection


In [12]:
# Create the client with empty credentials; an empty domain makes the
# constructor fall back to the default base URL.
client = AsyncRequestClient(token='', domain='')

# Build all input widgets, then show only the environment picker, the custom
# domain box and the "load all pages" toggle.
widget_set = create_input_widgets()
token_input, url_input, query, field_id, bool_toggle, dropdown = widget_set
display(dropdown, url_input, bool_toggle)

Dropdown(description='Environment:', options={'prod-eu': 'https://elis.rossum.ai', 'prod-jp': 'https://shared-…

Textarea(value='', description='Custom Domain:')

ToggleButtons(description='Load all pages of annotations:', index=1, options=(True, False), tooltips=('True', …

In [48]:
token_input = "abf6c9c6bcbb003039a77ab681b4d1a008c85ec6"
query = {
    "query": {
        "$and": [
            # {
            #     "queue": {
            #         "$in": [
            #             "https://elis.rossum.ai/api/v1/queues/XXXXXX",
            #             "https://elis.rossum.ai/api/v1/queues/XXXXXX",
            #             "https://elis.rossum.ai/api/v1/queues/XXXXXX"
            #         ]
            #     }
            # },
            # {
            #     "field.document_id.string": {
            #         "$emptyOrMissing": False
            #     }
            # },
            {
                "status": {
                    "$in": [
                        "confirmed",
                        "exported",
                        "to_review"
                    ]
                }
            }
        ]
    }
}
field_id = ['item_description']
annotations_collection = await process_annotations(client, token_input, url_input, query, field_id, bool_toggle, dropdown)

Cached https://levchenko.rossum.app/api/v1/annotations/search
Cached https://levchenko.rossum.app/api/v1/annotations/search?search_after=eyJxdWVyeV9oYXNoIjogIjAwNjg5ZmFmMmViM2Y5Nzk0MjkwYmM1Y2Q3YTI2MDI2IiwgInNlYXJjaF9hZnRlciI6IFsyNzk1MzldLCAicmV2ZXJzZWQiOiBmYWxzZX0%3D%3AUJBTkbO8LIcjjuruckihRSmUtnAr_V6YDI0MfJ3kc3s
Cached https://levchenko.rossum.app/api/v1/annotations/search?search_after=eyJxdWVyeV9oYXNoIjogIjAwNjg5ZmFmMmViM2Y5Nzk0MjkwYmM1Y2Q3YTI2MDI2IiwgInNlYXJjaF9hZnRlciI6IFs2NDA0NDZdLCAicmV2ZXJzZWQiOiBmYWxzZX0%3D%3AJTrOqIsy83-WNYfL8A8LZm0NuW8Dyy1yRBTWgGSCwio
Cached https://levchenko.rossum.app/api/v1/annotations/search?search_after=eyJxdWVyeV9oYXNoIjogIjAwNjg5ZmFmMmViM2Y5Nzk0MjkwYmM1Y2Q3YTI2MDI2IiwgInNlYXJjaF9hZnRlciI6IFs2NDA0ODFdLCAicmV2ZXJzZWQiOiBmYWxzZX0%3D%3A4x61fSdhrWHzXvE2QeASYHiZx7xl8BfwfY_sLBTfIO4
Cached https://levchenko.rossum.app/api/v1/annotations/search?search_after=eyJxdWVyeV9oYXNoIjogIjAwNjg5ZmFmMmViM2Y5Nzk0MjkwYmM1Y2Q3YTI2MDI2IiwgInNlYXJjaF9hZnRlciI6IFs2NDA1MDJdLCAicmV

In [49]:
# Print each annotation id followed by the positions of its
# "item_description" datapoints.
for annotation_id in annotations_collection:
    annotation = annotations_collection[annotation_id]
    print(annotation_id, annotation.get_positions("item_description"))

279516 [[306.0, 726.0, 443.0, 755.0], [306.0, 792.0, 443.0, 815.0], [306.0, 846.0, 443.0, 875.0]]
279517 [[132.0, 626.0, 239.0, 653.0], [132.0, 666.0, 239.0, 695.0]]
279518 [[404.0, 588.0, 617.0, 611.0]]
279519 [[198.0, 1099.0, 449.0, 1145.0], [198.0, 1169.0, 449.0, 1193.0]]
279520 [[444.0, 1026.0, 576.0, 1055.0]]
279521 []
279522 [[264, 816, 393, 872], [264, 936, 396, 975], [264, 996, 442, 1057], [264, 1074, 402, 1121], [264, 1134, 454, 1205], [264, 1224, 455, 1257]]
279523 [[332.0, 746.0, 719.0, 793.0], [332.0, 794.0, 719.0, 841.0], [332.0, 842.0, 719.0, 889.0], [332.0, 890.0, 719.0, 937.0], [331.0, 938.0, 719.0, 985.0], [332.0, 986.0, 719.0, 1039.0], [332.0, 1040.0, 719.0, 1081.0], [331.0, 1082.0, 719.0, 1134.0], [332.0, 1134.0, 719.0, 1182.0], [332.0, 1182.0, 719.0, 1231.0], [332.0, 1232.0, 719.0, 1279.0], [332.0, 1280.0, 719.0, 1327.0], [332.0, 1328.0, 719.0, 1375.0], [332.0, 1376.0, 719.0, 1423.0], [332.0, 1424.0, 719.0, 1471.0], [332.0, 1472.0, 719.0, 1523.0], [332.0, 1530.0, 71

{'id': 118391551, 'category': 'datapoint', 'schema_id': 'item_description', 'time_spent': 0.0, 'time_spent_overall': 0.0, 'validation_sources': [], 'content': {'value': 'Sendvičovač ETA Sorento 3151 90010\nčerný/nerez', 'page': 1, 'position': [205.0, 734.0, 515.0, 768.0], 'rir_text': 'Sendvičovač ETA Sorento 3151 90010\nčerný/nerez', 'rir_raw_text': 'Sendvičovač ETA Sorento 3151 90010\nčerný/nerez', 'ocr_text': None, 'ocr_raw_text': None, 'rir_page': 1, 'rir_position': [205.0, 734.0, 515.0, 768.0], 'ocr_position': None, 'rir_confidence': 0.7973150204943078, 'connector_position': None, 'connector_text': None, 'normalized_value': None}, 'url': 'https://levchenko.rossum.app/api/v1/annotations/1298371/content/118391551'}
{'id': 118391563, 'category': 'datapoint', 'schema_id': 'item_description', 'time_spent': 0.0, 'time_spent_overall': 0.0, 'validation_sources': [], 'content': {'value': 'Recyklační příspěvek 2-11 (malá bílá - aku\nvysavače)', 'page': 1, 'position': [210.0, 769.0, 515.0, 81