In [None]:
import dotenv
import importlib.util
import logging
import json
import os
import pathlib
import typing

import airbyte as ab
import pathway as pw
from pathway.xpacks.llm import embedders, parsers, splitters, vector_store
from unstructured.chunking.title import chunk_by_title
# from unstructured.documents.elements import Title

In [None]:
# Make sure libmagic is available
# Fail fast when libmagic is unusable. Importing `magic` (python-magic) is a stronger
# check than `importlib.util.find_spec`: find_spec only proves the Python package is
# installed, while python-magic raises ImportError on import when the native libmagic
# library cannot be loaded. An explicit raise also survives `python -O`, which strips
# `assert` statements.
try:
    import magic  # noqa: F401
    LIBMAGIC_AVAILABLE = True
except (ImportError, OSError):
    LIBMAGIC_AVAILABLE = False

if not LIBMAGIC_AVAILABLE:
    raise ImportError(
        "libmagic is required for file-type detection: install the native library "
        "(e.g. `brew install libmagic`) and `python-magic` (or `python-magic-bin`)."
    )

In [None]:
# Module-level logger for this notebook (currently only referenced by commented-out debug code).
logger = logging.getLogger(__name__)

In [None]:
# Key/value pairs read from the local `.env` file (returned as a dict; os.environ is not modified).
config = dotenv.dotenv_values('.env')

## Parameters

In [None]:
# Boardroom protocol to ingest (its `cname`) and the proposal ordering used by the
# (currently commented-out) HTTP proposals query.
protocol_name = 'aave'
prop_order_by = 'asc'

# Prefer the `.env` value, fall back to the process environment, and fail with an
# actionable message instead of a bare KeyError when neither provides the key.
api_key = config.get('BOARDROOM_API_KEY') or os.environ.get('BOARDROOM_API_KEY')
if not api_key:
    raise KeyError(
        "BOARDROOM_API_KEY is not set: add it to `.env` or export it as an environment variable."
    )

## Schemas

In [None]:
class AirbyteSchema(pw.Schema):
    """Bookkeeping columns PyAirbyte attaches to every cached record.

    They default to ``None`` so schemas derived from this one can also be used with
    plain ``pw.io.http.read`` calls, whose responses do not contain these fields.
    """

    _airbyte_raw_id: str | None = pw.column_definition(default_value=None)
    _airbyte_extracted_at: pw.DateTimeNaive | None = pw.column_definition(default_value=None)
    _airbyte_meta: dict | None = pw.column_definition(default_value=None)


class BoardroomAPI(AirbyteSchema):
    """One page of a Boardroom API response.

    ``data`` holds the page payload. ``nextcursor`` is the pagination cursor, which is
    null on the last page. ``stream`` is the Airbyte stream name and is only set by the
    Airbyte connector; HTTP reads leave it ``None``.
    """

    stream: str | None = pw.column_definition(default_value=None)
    data: pw.Json
    nextcursor: str | None = pw.column_definition(default_value=None)


## Data Extractor

In [None]:
from airbyte.caches import CacheBase
from airbyte.sources import Source, get_source
import pandas as pd
from pathway.io.python import ConnectorSubject

In [None]:
class AirbyteAPIConector(ConnectorSubject):
    """Pathway Python connector that pushes records read from a PyAirbyte source.

    The source is created, checked and its streams selected at construction time.
    When the connector runs, the selected streams are read (through ``cache`` when
    given). Each record is emitted with the originating stream name in ``stream``
    plus every column of the Airbyte record.

    Args:
        name: Airbyte connector name passed to ``airbyte.get_source``.
        config: connector configuration.
        *args: forwarded to ``ConnectorSubject``.
        streams: stream name(s) to read; all streams are selected when omitted.
        version: connector version, forwarded to ``get_source``.
        source_manifest: declarative (YAML) manifest for the source.
        install_if_missing: install the connector when it is not available.
        install_root: directory the connector is installed into.
        cache: PyAirbyte cache used by ``Source.read``.
        force_full_refresh: forwarded to ``Source.read``.
        skip_validation: forwarded to ``Source.read``.
        **kwargs: forwarded to ``ConnectorSubject``.
    """

    source: Source

    def __init__(
        self,
        # Airbyte specific
        name: str,
        config: dict[str, typing.Any] | None = None,
        *args,
        streams: str | list[str] | None = None,
        version: str | None = None,
        source_manifest: bool | dict | pathlib.Path | str = False,
        install_if_missing: bool = True,
        install_root: pathlib.Path | None = None,
        # Airbyte source.read specific
        cache: CacheBase | None = None,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
        # TODO: streaming vs static mode / refresh interval are not supported yet.
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.source = get_source(
            name=name,
            config=config,
            streams=streams,
            version=version,  # previously accepted but silently ignored
            source_manifest=source_manifest,
            install_if_missing=install_if_missing,
            install_root=install_root,
        )
        # Validate the source definition (e.g. the YAML manifest) before reading.
        self.source.check()

        if streams:
            self.source.select_streams(streams)
        else:
            self.source.select_all_streams()

        # Parameters for Source.read, used in run().
        self.cache = cache
        self.force_full_refresh = force_full_refresh
        self.skip_validation = skip_validation

    @staticmethod
    def _drop_stale_last_pages(frame: pd.DataFrame) -> pd.DataFrame:
        """Keep every complete page plus at most one page that has a null field.

        Workaround: repeated runs (e.g. with ``force_full_refresh``) leave several copies
        of the last page, whose ``nextcursor`` is null, in the cache.
        TODO: clean up the cache so these duplicate records are never stored.
        """
        has_null = frame.isna().any(axis=1)
        pages = frame[~has_null]
        null_pages = frame[has_null]
        # Previously `iloc[[0]]` raised IndexError when no page had a null field.
        if not null_pages.empty:
            # NOTE(review): keeps the first such row in cache order; confirm it is the latest page.
            pages = pd.concat([pages, null_pages.iloc[[0], :]])
        # Replace NaN/NA with None so Optional columns reach Pathway as proper nulls.
        return pages.astype(object).where(pages.notna(), None)

    def run(self):
        """Read the selected streams and emit one Pathway row per cached record."""
        result = self.source.read(
            cache=self.cache,
            force_full_refresh=self.force_full_refresh,
            skip_validation=self.skip_validation,
        )
        for stream_name, dataset in result.streams.items():
            pages = self._drop_stale_last_pages(dataset.to_pandas())
            for row in pages.to_dict(orient='records'):
                self.next(stream=stream_name, **row)

    def on_stop(self):
        """Nothing to release: the PyAirbyte source and cache need no explicit shutdown here."""
        pass
        


## Parser Config

### Custom Parser UDF

In [None]:
from collections.abc import Callable
from io import BytesIO
from pathway.optional_import import optional_imports
# from typing import TYPE_CHECKING, Any, Literal

from typing import Any


In [None]:
class CustomParseUnstructured(pw.UDF):
    """
    Parse documents using `https://unstructured.io/ <https://unstructured.io/>`_.

    Variant of ``pathway.xpacks.llm.parsers.ParseUnstructured`` that returns plain
    dicts instead of ``(text, metadata)`` tuples, so results can be flattened and
    accessed as JSON downstream.

    All arguments can be overridden during UDF application.

    Args:
        - mode: single, elements or paged.
          When single, each document is parsed as one long text string and a single
          ``{"text": ..., "metadata": ...}`` dict is returned.
          When elements, each document is split into unstructured's elements and every
          element is returned as produced by ``Element.to_dict()``.
          When paged, each page's text is extracted separately and one
          ``{"text": ..., "metadata": ...}`` dict is returned per page.
        - post_processors: list of callables that will be applied to all extracted texts.
        - **unstructured_kwargs: extra kwargs to be passed to unstructured.io's `partition` function
    """

    _VALID_MODES = {"single", "elements", "paged"}

    def __init__(
        self,
        mode: str = "single",
        post_processors: list[Callable] | None = None,
        **unstructured_kwargs: Any,
    ):
        with optional_imports("xpack-llm-docs"):
            import unstructured.partition.auto  # noqa:F401

        super().__init__()
        if mode not in self._VALID_MODES:
            raise ValueError(
                f"Got {mode} for `mode`, but should be one of `{self._VALID_MODES}`"
            )

        self.kwargs = dict(
            mode=mode,
            post_processors=post_processors or [],
            unstructured_kwargs=unstructured_kwargs,
        )

    @staticmethod
    def _element_metadata(element) -> dict:
        """Return an element's metadata as a dict (``{}`` when it has none)."""
        if hasattr(element, "metadata"):
            return element.metadata.to_dict()
        return {}

    @staticmethod
    def _combine_metadata(left: dict, right: dict) -> dict:
        """Merge the metadata of two elements that end up in the same output document.

        ``links`` are concatenated and ``languages`` unioned (sorted for determinism).
        ``coordinates``, ``parent_id`` and ``category_depth`` are element specific and
        are dropped. The input dicts are not modified.
        """
        left, right = dict(left), dict(right)
        links = (left.pop("links", None) or []) + (right.pop("links", None) or [])
        languages = sorted(
            set((left.pop("languages", None) or []) + (right.pop("languages", None) or []))
        )
        result = {**left, **right, "links": links, "languages": languages}
        for key in ("coordinates", "parent_id", "category_depth"):
            result.pop(key, None)
        return result

    def _paged_docs(self, elements) -> list[dict]:
        """Group element texts by ``page_number`` (page 1 when absent), one dict per page."""
        texts: dict[int, str] = {}
        metas: dict[int, dict] = {}
        for element in elements:
            metadata = self._element_metadata(element)
            page = metadata.get("page_number", 1)
            if page in texts:
                texts[page] += str(element) + "\n\n"
                metas[page] = self._combine_metadata(metas[page], metadata)
            else:
                texts[page] = str(element) + "\n\n"
                metas[page] = metadata
        return [{"text": texts[page], "metadata": metas[page]} for page in texts]

    def _single_doc(self, elements) -> list[dict]:
        """Join all element texts into one document with merged metadata."""
        metadata: dict = {}
        for element in elements:
            if hasattr(element, "metadata"):
                metadata = self._combine_metadata(metadata, element.metadata.to_dict())
        text = "\n\n".join(str(el) for el in elements)
        return [{"text": text, "metadata": metadata}]

    def __wrapped__(self, contents: bytes, **kwargs) -> list[dict]:
        """
        Parse the given document:

        Args:
            - contents: document contents
            - **kwargs: override for defaults set in the constructor

        Returns:
            a list of dicts. In ``elements`` mode each dict is an unstructured element
            (``Element.to_dict()``: type, element_id, text, metadata). In ``single`` and
            ``paged`` mode each dict has ``text`` and ``metadata`` keys, and element
            specific metadata fields (e.g. ``category_depth``) are removed.
            See the `Unstructured documentation <https://unstructured-io.github.io/unstructured/metadata.html>`
            for possible metadata values.

        Raises:
            ValueError: on unknown keyword arguments or an unsupported ``mode``.
        """
        import unstructured.partition.auto

        kwargs = {**self.kwargs, **kwargs}
        mode = kwargs.pop("mode")
        post_processors = kwargs.pop("post_processors") or []
        unstructured_kwargs = kwargs.pop("unstructured_kwargs")

        # Validate before partitioning so bad arguments fail without doing the
        # (potentially expensive) parsing work.
        if kwargs:
            raise ValueError(f"Unknown arguments: {', '.join(kwargs.keys())}")
        if mode not in self._VALID_MODES:
            raise ValueError(f"mode of {mode} not supported.")

        elements = unstructured.partition.auto.partition(
            file=BytesIO(contents), **unstructured_kwargs
        )
        for element in elements:
            for post_processor in post_processors:
                element.apply(post_processor)

        if mode == "elements":
            return [el.to_dict() for el in elements]
        if mode == "paged":
            return self._paged_docs(elements)
        return self._single_doc(elements)

    def __call__(self, contents: pw.ColumnExpression, **kwargs) -> pw.ColumnExpression:
        """
        Parse the given document.

        Args:
            - contents: document contents
            - **kwargs: override for defaults set in the constructor

        Returns:
            A column with, for each row, the list of dicts produced by ``__wrapped__``
            (unstructured elements in ``elements`` mode, ``{"text", "metadata"}`` dicts
            otherwise).
        """
        return super().__call__(contents, **kwargs)

In [None]:
# If installing `pillow-heif` fails with a missing-module error, retry after:
#   export CFLAGS="-Wno-nullability-completeness"
# libmagic is required for file-type detection:
# - brew install libmagic
# - pip install python-magic-bin

# parser = parsers.ParseUnstructured(mode="elements")
# `elements` mode: one dict per unstructured element for every parsed document.
parser = CustomParseUnstructured(mode="elements") # TODO: do we need extra cleaning function as post_processors ?

### Custom Embedder UDF

In [None]:
import asyncio
import contextvars
from functools import partial
from litellm import (
    client,
    embedding,
    exception_type,
)
from litellm.utils import EmbeddingResponse

from pathway.xpacks.llm.embedders import BaseEmbedder, _monkeypatch_openai_async
from pathway.internals import udfs
from pathway.optional_import import optional_imports

In [None]:
@client
async def aembedding(*args, **kwargs) -> EmbeddingResponse:
    """
    Asynchronously call litellm's synchronous `embedding` in the default executor.

    Adapted from `litellm.aembedding`. Instead of deriving the provider with
    `get_llm_provider` (from the model name / `api_base`), the provider is read from
    the `custom_llm_provider` kwarg. This means arbitrary model names served by an
    OpenAI-compatible endpoint can be used.

    Parameters:
    - `args` (tuple): Positional arguments to be passed to the `embedding` function.
    - `kwargs` (dict): Keyword arguments to be passed to the `embedding` function;
      `model` is required unless it is passed as the first positional argument.

    Returns:
    - `response` (EmbeddingResponse): the embedding response. When it has
      `_hidden_params`, `_hidden_params["custom_llm_provider"]` is set.

    Raises:
    - any error raised by `embedding`, mapped through litellm's `exception_type`.
    """
    loop = asyncio.get_running_loop()
    model = args[0] if len(args) > 0 else kwargs["model"]
    ### PASS ARGS TO Embedding ###
    kwargs["aembedding"] = True
    # NOTICE: provider comes from the caller instead of get_llm_provider(...)
    custom_llm_provider = kwargs.get("custom_llm_provider", None)

    try:
        # Run the blocking call in the default executor while preserving context vars.
        func = partial(embedding, *args, **kwargs)
        ctx = contextvars.copy_context()
        func_with_context = partial(ctx.run, func)
        init_response = await loop.run_in_executor(None, func_with_context)

        response = init_response
        if custom_llm_provider == "openai":
            # The OpenAI handler may hand back a raw dict, a cached EmbeddingResponse, or
            # a coroutine that still needs awaiting. Any other type is returned unchanged
            # (previously it left `response` unbound and raised UnboundLocalError).
            if isinstance(init_response, dict):
                response = EmbeddingResponse(**init_response)
            elif asyncio.iscoroutine(init_response):
                response = await init_response

        if response is not None and hasattr(response, "_hidden_params"):
            response._hidden_params["custom_llm_provider"] = custom_llm_provider
        return response
    except Exception as e:
        custom_llm_provider = custom_llm_provider or "openai"
        raise exception_type(
            model=model,
            custom_llm_provider=custom_llm_provider,
            original_exception=e,
            completion_kwargs=args,
            extra_kwargs=kwargs,
        )



In [None]:
class CustomLiteLLMEmbedder(BaseEmbedder):
    """Pathway embedder that calls this notebook's patched `aembedding` coroutine.

    Differs from `pathway.xpacks.llm.embedders.LiteLLMEmbedder` only in routing the call
    through `aembedding`. That coroutine takes the provider from `custom_llm_provider`
    instead of inferring it from the model name.

    Model has to be specified either in constructor call or in each application, no default
    is provided. The capacity, retry_strategy and cache_strategy need to be specified
    during object construction. All other arguments can be overridden during application.

    Args:
        - capacity: Maximum number of concurrent operations allowed.
            Defaults to None, indicating no specific limit.
        - retry_strategy: Strategy for handling retries in case of failures.
            Defaults to None, meaning no retries.
        - cache_strategy: Defines the caching mechanism. To enable caching,
            a valid `CacheStrategy` should be provided.
            See `Cache strategy <https://pathway.com/developers/api-docs/udfs#pathway.udfs.CacheStrategy>`_
            for more information. Defaults to None.
        - model: The embedding model to use.
        - timeout: The timeout value for the API call, default 10 mins
        - litellm_call_id: The call ID for litellm logging.
        - litellm_logging_obj: The litellm logging object.
        - logger_fn: The logger function.
        - api_base: Optional. The base URL for the API.
        - api_version: Optional. The version of the API.
        - api_key: Optional. The API key to use.
        - api_type: Optional. The type of the API.
        - custom_llm_provider: The custom llm provider (e.g. "openai" for any
            OpenAI-compatible endpoint).

    Any arguments can be provided either to the constructor or in the UDF call.
    To specify the `model` in the UDF call, set it to None in the constructor.

    Example:

        embedder = CustomLiteLLMEmbedder(
            model="Nomic-embed-text-v1.5",
            api_base="https://llama3.gaianet.network/v1",
            api_key="empty-api-key",
            custom_llm_provider="openai",
        )
        vectors = table.select(embedding=embedder(pw.this.text))
    """

    def __init__(
        self,
        *,
        capacity: int | None = None,
        retry_strategy: udfs.AsyncRetryStrategy | None = None,
        cache_strategy: udfs.CacheStrategy | None = None,
        model: str | None = None,
        **llmlite_kwargs,
    ):
        with optional_imports("xpack-llm"):
            import litellm  # noqa:F401

        _monkeypatch_openai_async()
        executor = udfs.async_executor(capacity=capacity, retry_strategy=retry_strategy)
        super().__init__(
            executor=executor,
            cache_strategy=cache_strategy,
        )
        self.kwargs = dict(llmlite_kwargs)
        if model is not None:
            self.kwargs["model"] = model

    async def __wrapped__(self, input, **kwargs) -> list[float]:
        """Embed one document.

        Args:
            - input: mandatory, the string to embed.
            - **kwargs: optional parameters, if unset defaults from the constructor
              will be taken.

        Returns:
            The embedding vector of ``input``.

        Raises:
            ValueError: when no model is set in the constructor nor in the call.
        """
        kwargs = {**self.kwargs, **kwargs}
        if kwargs.get("model") is None:
            raise ValueError(
                "`model` must be set either in the constructor or in the UDF call"
            )
        # Empty/None input is replaced by "." (presumably because providers reject
        # empty strings).
        ret = await aembedding(input=[input or "."], **kwargs)
        return ret.data[0]["embedding"]



### Ad-hoc UDFs

In [None]:
@pw.udf
def to_json(val: pw.Json) -> pw.Json:
    """Decode the JSON text held inside ``val`` into structured ``pw.Json``.

    With Airbyte the ``data`` column arrives as a JSON-encoded string, so it must be
    decoded before it can be indexed or flattened.
    """
    encoded_text = val.as_str()
    decoded = json.loads(encoded_text)
    return pw.Json(decoded)

In [None]:
# @pw.udf(executor=pw.udfs.async_executor())

@pw.udf
def filter_document(document: pw.Json, fields: list[str]) -> pw.Json:
    """Return a copy of the JSON object ``document`` without the keys in ``fields``.

    Remaining keys keep their original order; listed keys that are absent are ignored.
    """
    return {
        key: value
        for key, value in document.as_dict().items()
        if key not in fields
    }

In [None]:
# u_logger = logging.getLogger("unstructured")
# u_logger.setLevel(logging.INFO)

### Custom reducers

In [None]:
class JSONAccumulator(pw.BaseCustomAccumulator):
    """Reducer state that collects the JSON object of every row in a group.

    ``compute_result`` returns the list of all collected objects. Order follows the order
    in which Pathway merges partial accumulators.
    """

    def __init__(self, initialData: pw.Json):
        # This row's own value; kept as an attribute for backward compatibility.
        self.value: dict = {**initialData.as_dict()}
        # Values accumulated so far, starting with this row. Previously the list started
        # empty, so every group silently lost one value (single-row groups returned []).
        self.data: list[dict] = [self.value]

    @classmethod
    def from_row(cls, row):
        [val] = row
        return cls(val)

    def update(self, other):
        # `other` may already be a merged accumulator, so take all of its values,
        # not just its first row.
        self.data.extend(other.data)

    def compute_result(self) -> list[dict]:
        # Return a copy so later updates cannot mutate an already emitted result.
        return list(self.data)


json_acc = pw.reducers.udf_reducer(JSONAccumulator)

## Protocol

In [None]:
class Protocol(pw.Schema):
    # One protocol summary row, as produced by `protocol_mapper` from the Boardroom
    # `/v1/protocols/{cname}` response (camelCase API keys renamed to snake_case).
    cname: str
    name: str
    categories: str  # the API's `categories` list, comma-joined
    is_enabled: bool
    active_on_website: bool
    total_proposals: int
    total_votes: int
    unique_voters: int
    # tokens: list[object]
    ptype: str  # the API's `type` field
    # delegated_support: dict


In [None]:
def protocol_mapper(raw_data: bytes) -> bytes:
    """Reshape a Boardroom `/protocols/{cname}` response body into a `Protocol` row.

    Only the fields declared on `Protocol` are kept. The API's camelCase keys are renamed
    to snake_case and the `categories` list is joined with commas.

    Args:
        raw_data: HTTP response body: JSON with a top-level `data` object.

    Returns:
        The reshaped record, JSON-encoded as bytes.
    """
    protocol_info = json.loads(raw_data.decode())["data"]
    keep = lambda value: value  # noqa: E731
    # (output column, API key, converter), in output order.
    columns = (
        ("cname", "cname", keep),
        ("name", "name", keep),
        ("categories", "categories", ",".join),
        ("is_enabled", "isEnabled", keep),
        ("active_on_website", "activeOnWebsite", keep),
        ("total_proposals", "totalProposals", keep),
        ("total_votes", "totalVotes", keep),
        ("unique_voters", "uniqueVoters", keep),
        ("ptype", "type", keep),
    )
    record = {column: convert(protocol_info[key]) for column, key, convert in columns}
    return json.dumps(record).encode()


In [None]:

# Fetch the protocol summary; `protocol_mapper` reshapes the JSON body to the `Protocol`
# schema. NOTE: the API key is sent in the query string, so avoid logging this URL.
protocol = pw.io.http.read(
    f"https://api.boardroom.info/v1/protocols/{protocol_name}?key={api_key}",
    method='GET',
    headers={"Accept": "application/json"},
    # format="raw",
    schema=Protocol,
    response_mapper=protocol_mapper
)

In [None]:
# Column names and types of the protocol table.
protocol.schema

In [None]:
# Display the protocol table.
protocol

## Proposals

### Extraction

In [None]:
@pw.udf
def append_parent_id(content: pw.Json, parent_id: str) -> pw.Json:
    """Return the JSON object ``content`` with ``parent_id`` set to the given proposal id.

    ``parent_id`` stays the first key. unstructured's element metadata may already hold
    its own element-level ``parent_id``. Previously that value silently overrode the
    proposal id; it is now replaced so every chunk links back to its proposal.
    """
    fields = {key: value for key, value in content.as_dict().items() if key != "parent_id"}
    return {"parent_id": parent_id, **fields}

In [None]:
# Placeholder: assigned by one of the extraction options below (HTTP or Airbyte).
proposals = None

#### Using Pathway HTTP connector (data exploration/test mode)

In [None]:
# NOTICE: need to override base schema
# class BoardroomAPI(pw.Schema):
#     data: pw.Json
#     nextCursor: str

In [None]:
# proposals = pw.io.http.read(
#     f"https://api.boardroom.info/v1/protocols/{protocol_name}/proposals?key={api_key}&orderByIndexedAt{prop_order_by}",
#     method='GET',
#     headers={"Accept": "application/json"},
#     format="json",
#     schema=BoardroomAPI
#     # schema=Proposal,
#     # response_mapper=proposal_mapper
# )
# proposals = proposals.flatten(proposals.data)

#### Using Airbyte Connector

In [None]:
# Local PyAirbyte cache stored under ./boardroom_ab_cache; `cleanup=False` keeps its
# files between runs.
cache = ab.caches.new_local_cache(
    cache_name='boardroom_cache',
    cache_dir='./boardroom_ab_cache',
    cleanup=False # NOTICE: CLI param
)

In [None]:
# Airbyte source for the Boardroom API, defined by a local declarative manifest and
# restricted to the `proposals` stream.
api_connector = AirbyteAPIConector(
    name='boardroom-api',
    cache=cache,
    config={
        "api_key": api_key, # NOTICE: CLI param
        "cname": protocol_name, # NOTICE: CLI param
        "page_size": 1 # TODO: currently not used
    },
    source_manifest=pathlib.Path("../boardroom/connector.yaml"), # NOTICE: CLI param
    streams="proposals", # NOTICE: reading a single stream only
    force_full_refresh=False, # NOTICE: CLI param
)

In [None]:
# Read raw Airbyte pages, decode the JSON-encoded `data` payload (Airbyte stores it as
# text), then flatten the payload list into one row per proposal.
proposals = (
    pw.io.python.read(api_connector, schema=BoardroomAPI)
    .with_columns(data=to_json(pw.this.data))
    .flatten(pw.this.data)
)

#### Continue pre-processing

In [None]:
# Promote refId and title to top-level columns (empty string when missing). Every other
# proposal field except content goes into the `metadata` JSON.
proposals = proposals.with_columns(
    refId=pw.this.data.get("refId", default=pw.Json("")).as_str(),
    title=pw.this.data.get("title", default=pw.Json("")).as_str(),
    metadata=filter_document(pw.this.data, ["refId", "title", "content"]),
)

In [None]:
# Column names and types of the proposals table.
proposals.schema

In [None]:
# One "Title" row per proposal: the title as text, the proposal's remaining fields as
# metadata, and refId as element_id.
proposals_table = proposals.select(
    element_id=pw.this.refId,
    text=pw.this.title,
    metadata=pw.this.metadata,
    type="Title"
)

In [None]:
# Column names and types of the Title rows.
proposals_table.schema

In [None]:
# Display the Title rows.
proposals_table

### Partitioning content

In [None]:
# Placeholder; assigned in the next cell.
proposal_contents = None

In [None]:
# Proposal content
proposal_contents = proposals.select(
    refId=pw.this.refId,
    # content=pw.apply_with_type(lambda x: f"{x}".encode() if x else b"", bytes, pw.this.data.get("content", default=None)),
    content=parser(pw.apply_with_type(lambda x: f"{x.as_str()}".encode() if x else b"", bytes, pw.this.data.get("content", default=None))),
)
proposal_contents = proposal_contents.flatten(pw.this.content)
# # proposals = proposals.select(refId=pw.this.refId, title=pw.this.title, text=pw.this.content[0], metadata=pw.this.content[1])
# # proposals = proposals.select(refId=pw.this.refId, title=pw.this.title, text=pw.this.document['text'].as_str(), document=pw.this.document)
# proposals = proposals.select(refId=pw.this.refId, document=pw.this.content)

In [None]:
# Column names and types of the partitioned elements.
proposal_contents.schema

#### Partition analysis

* Which filetype is detected?

In [None]:
from unstructured.file_utils.filetype import detect_filetype, is_json_processable

In [None]:
# Check which file type unstructured detects for each proposal body (the detection that
# `partition` relies on inside the parser).

@pw.udf
def detect(data: pw.Json | None) -> str:
    """Return unstructured's detected file type for a proposal body, as a string.

    A missing body (``content`` absent, so ``None``) is treated as empty bytes, the same
    fallback the parser cells use. Previously ``None.as_str()`` raised AttributeError.
    """
    encoded = data.as_str().encode() if data else b""
    filetype = detect_filetype(file=BytesIO(encoded))
    return str(filetype)


meta = proposals.select(
    metadata=pw.this.data.get("content", default=None)
)
meta = meta.with_columns(
    filetype=detect(pw.this.metadata),
)
meta.schema

In [None]:
# TODO: every proposal body is currently detected as plain text (txt) instead of
# markdown (md); consider giving the parser an explicit file-type hint.
meta

- Grouping partitioned elements by proposal

In [None]:
# Collect every partitioned element of a proposal into one `contents` list.
partitions_by_proposal = proposal_contents.groupby(proposal_contents.refId)
grouped_p = partitions_by_proposal.reduce(
    proposal_contents.refId,
    contents=json_acc(proposal_contents.content),
)

In [None]:
# Column names and types of the per-proposal element lists.
grouped_p.schema

In [None]:
# Write the per-proposal partitions to JSON Lines (output is produced when `pw.run()` executes).
pw.io.jsonlines.write(grouped_p, "proposals-partitioned.jsonl")

### Partition + Chunking content

In [None]:
from typing import Optional

In [None]:
# Details on chunking techniques
# - basic -> combines sequential elements to maximally fill each chunk
# - by_title -> preserves section boundaries and optionally page boundaries
# https://docs.unstructured.io/open-source/core-functionality/chunking

# Chunk Parameters 
# - Common params
include_orig_elements: Optional[bool] = None # Default to True
max_characters: Optional[int] = None # hard-max chars per chunk
new_after_n_chars: Optional[int] = None # soft-max chars per chunk. Cuts off new sections once they reach a length of n characters (soft max). Defaults to  `max_characters` when not specified, which effectively disables any soft window.
overlap: Optional[int] = None # specifies the length of a string ("tail") to be drawn from each chunk and prefixed to the next chunk as a context-preserving mechanism. Must be <= `max_characters`
overlap_all: Optional[bool] = None # Default to False. Apply overlap between "normal" chunks formed from whole elements and not subject to text-splitting. Could produce `pollution` on clean semantic chunks
# - by_title specific params
combine_text_under_n_chars: Optional[int] = None # Default to `max_characters`. Combines elements until a section reaches a length of n characters. Capped at `new_after_n_chars`
multipage_sections: Optional[bool] = None # If True, sections can span multiple pages. Defaults to True

In [None]:
# Partition + chunk in one pass: every keyword after `post_processors` ends up in
# `unstructured_kwargs` and is forwarded to `unstructured.partition.auto.partition`.
# TODO: consider other chunking parameters
parser_apply_chunking = CustomParseUnstructured(
    mode="elements",
    post_processors=None,  # optional callables applied to every resulting element
    chunking_strategy="by_title",
    include_orig_elements=include_orig_elements,
    max_characters=max_characters,
    new_after_n_chars=new_after_n_chars,
    overlap=overlap,
    overlap_all=overlap_all,
    combine_text_under_n_chars=combine_text_under_n_chars,
    # Was misspelled `multpage_sections`, which unstructured does not recognise, so the
    # setting never reached the chunker.
    multipage_sections=multipage_sections,
)

In [None]:
# Placeholder; assigned in the next cell. Was misspelled `proposals_chunks`, which no
# other cell uses.
proposal_chunks = None

In [None]:
# Partition and chunk each proposal body (missing body -> empty bytes), then emit one
# row per chunk.
proposal_chunks = proposals.select(
    refId=pw.this.refId,
    content=parser_apply_chunking(
        pw.apply_with_type(
            lambda body: body.as_str().encode() if body else b"",
            bytes,
            pw.this.data.get("content", default=None),
        )
    ),
).flatten(pw.this.content)

In [None]:
# Column names and types of the chunk rows.
proposal_chunks.schema

#### Chunking Analysis

- Grouping chunks by proposal

In [None]:
# Collect every chunk of a proposal into one `contents` list.
chunks_by_proposal = proposal_chunks.groupby(proposal_chunks.refId)
grouped_c = chunks_by_proposal.reduce(
    proposal_chunks.refId,
    contents=json_acc(proposal_chunks.content),
)
grouped_c.schema

In [None]:
# Write the per-proposal chunks to JSON Lines (output is produced when `pw.run()` executes).
pw.io.jsonlines.write(grouped_c, "proposals-chunked.jsonl")

### Flattening contents

In [None]:
# Turn each element/chunk JSON into the element_id/text/metadata/type columns used by
# `proposals_table`; `append_parent_id` adds the proposal's refId to the metadata.

# Using partition only
# proposal_contents_splitted = proposal_contents.select(
#     element_id=pw.this.content.get("element_id", default=pw.Json("")).as_str(),
#     text=pw.this.content.get("text", default=pw.Json("")).as_str(),
#     metadata=append_parent_id(pw.this.content["metadata"], pw.this.refId),
#     type=pw.this.content.get("type", default=pw.Json("")).as_str(),
# )

# Using partition + chunking
proposal_contents_splitted = proposal_chunks.select(
    element_id=pw.this.content.get("element_id", default=pw.Json("")).as_str(),
    text=pw.this.content.get("text", default=pw.Json("")).as_str(),
    metadata=append_parent_id(pw.this.content["metadata"], pw.this.refId),
    type=pw.this.content.get("type", default=pw.Json("")).as_str(),
)

In [None]:
# Column names and types of the flattened chunk rows.
proposal_contents_splitted.schema

In [None]:
# Display the flattened chunk rows.
proposal_contents_splitted

### Joining results

In [None]:
# Append the content chunks to the per-proposal Title rows; both tables have the
# element_id/text/metadata/type columns.
# NOTE: this rebinds `proposals_table`, so re-running this cell without first re-running
# the cell that builds the Title rows appends the chunks a second time.
proposals_table = proposals_table.concat_reindex(proposal_contents_splitted)
proposals_table.schema

#### Intermediate storage

In [None]:
# Persist the combined Title + chunk rows as JSON Lines (written when `pw.run()` executes).
pw.io.jsonlines.write(proposals_table, "proposals.jsonl")

### Embeddings

In [None]:
def _env_setting(name: str, default: str) -> str:
    """Return `name` from `.env`, else from the process environment, else `default`."""
    return config.get(name) or os.environ.get(name) or default


# OpenAI-compatible embeddings endpoint; overridable via `.env` or environment variables.
# The defaults reproduce the previously hard-coded values.
# NOTICE: the key can't be empty, otherwise the OpenAI Python client throws an error.
EMBEDDINGS_API_KEY = _env_setting("EMBEDDINGS_API_KEY", "empty-api-key")
EMBEDDINGS_MODEL = _env_setting("EMBEDDINGS_MODEL", "Nomic-embed-text-v1.5")
EMBEDDINGS_API_BASE = _env_setting("EMBEDDINGS_API_BASE", "https://llama3.gaianet.network/v1")

In [None]:
# Embedder for the endpoint configured above. `custom_llm_provider="openai"` makes the
# patched `aembedding` treat it as an OpenAI-compatible API.
embedder = CustomLiteLLMEmbedder(
    api_base=EMBEDDINGS_API_BASE,
    api_key=EMBEDDINGS_API_KEY,
    custom_llm_provider="openai", # litellm will use the .llms.openai.OpenAIChatCompletion to make the request
    model=EMBEDDINGS_MODEL,
    # NOTICE: tune parallelization
    # capacity=5,
    # retry_strategy=pw.asynchronous.udfs.FixedDelayRetryStrategy(delay_ms=10000),
    # cache_strategy=pw.udfs.DefaultCache(),
)

In [None]:
# Embed the text of every row (Title rows and content chunks), keeping its metadata.
proposals_vector = proposals_table.select(
    text=pw.this.text,
    embedding=embedder(pw.this.text),
    metadata=pw.this.metadata,
)
proposals_vector.schema

In [None]:
# Display the embedded rows.
proposals_vector

### Vector Store

In [None]:
# Notes: data sources should match schema (data: bytes, _metadata: any)

# doc_store = VectorStoreServer(
#     # *data_sources(configuration["sources"]),
#     # *data_sources, # TODO:
#     embedder=embedder,
#     # splitter=splitters.TokenCountSplitter(max_tokens=400),
#     parser=parser,
# )

In [None]:
# index = KNNIndex(
#     enriched_documents.vector, enriched_documents, n_dimensions=embedding_dimension
# )
# ...
# query += query.select(
#     vector=embedder(pw.this.query),
# )

# query_context = query + index.get_nearest_items(
#     query.vector, k=3, collapse_rows=True
# ).select(documents_list=pw.this.chunk)

## Voter

In [None]:
# class Voters(pw.Schema):
#     data: pw.Json

# TODO: fix this voter -> voters as data : voters[] @santiago
# NOTE(review): a single request is made; no cursor is passed, so only the first
# response page appears to be read.
voters_raw = pw.io.http.read(
    f"https://api.boardroom.info/v1/protocols/{protocol_name}/voters?key={api_key}",
    method='GET',
    headers={"Accept": "application/json"},
    format="json",
    schema=BoardroomAPI,
    # response_mapper=voter_mapper
)

In [None]:
# One row per voter in the response's `data` list.
voters_splitted = voters_raw.flatten(voters_raw.data)
voters_splitted

In [None]:
def map_all_protocols(protocols: pw.Json):
    return "".join(protocol["protocol"].as_str() + ", " for protocol in protocols)

def mapper(protocols: pw.Json):
    for protocol in protocols:
        if protocol["protocol"].as_str() == protocol_name:
            return protocol

# Per voter: address, the voter's entry for `protocol_name` (via `mapper`), and the
# names of every protocol the voter appears in.
enhanced_voters = voters_splitted.select(
    address=pw.this.data["address"],
    voter_protocol_data=pw.apply(mapper, pw.this.data["protocols"]), 
    all_protocols=pw.apply(map_all_protocols, pw.this.data["protocols"]),
)

In [None]:
# Debug helper: compute the voters table and print it.
pw.debug.compute_and_print(enhanced_voters)

## Delegates

In [None]:
# Delegations for the protocol: one row per entry in `data`, mapping delegator
# (`address`) to delegate (`addressDelegatedTo`).
# NOTE(review): a single request without a cursor; presumably only the first page is read.
delegations_raw = pw.io.http.read(
    f"https://api.boardroom.info/v1/delegates/getDelegatorsByProtocol/{protocol_name}?key={api_key}",
    method='GET',
    headers={"Accept": "application/json"},
    format="json",
    schema=BoardroomAPI
)

delegations_flattened = delegations_raw.flatten(delegations_raw.data)

delegations = delegations_flattened.select(
    adapter=pw.this.data["adapter"],
    delegatedFrom=pw.this.data["address"],
    delegatedTo=pw.this.data["addressDelegatedTo"],
    protocol=pw.this.data["protocol"],
)

## Delegate Pitches

In [None]:

# Delegation pitches for the protocol.
# NOTE(review): a single request without a cursor; presumably only the first page is read.
delegation_pitches_raw = pw.io.http.read(
    f"https://api.boardroom.info/v1/getDelegationPitchesByProtocol/{protocol_name}?key={api_key}",
    method="GET",
    headers={"Accept": "application/json"},
    format="json",
    schema=BoardroomAPI,
)

# `data.delegationPitches` is a list of pitch objects. The column was previously
# (misleadingly) named `address`.
delegation_pitches_data = delegation_pitches_raw.select(
    pitches=pw.this.data["delegationPitches"]
)

# One row per pitch.
delegation_pitches_flattened = delegation_pitches_data.flatten(delegation_pitches_data.pitches)

delegation_pitches = delegation_pitches_flattened.select(
    address=pw.this.pitches["address"],
    delegate_pitch=pw.this.pitches["delegationPitch"],
    protocol=pw.this.pitches["protocol"],  # was misspelled `protoocol`
)


In [None]:
# Debug helper: compute the delegation pitches table and print it.
pw.debug.compute_and_print(delegation_pitches)

## Run workflow

In [None]:
#%%capture --no-display
# Start the Pathway computation: connectors start reading and the jsonlines outputs
# defined above are written.
pw.run()