[Enhancement]: langchain milvus col.flush slowly #31407

Closed
1 task done
JamesBonddu opened this issue Mar 19, 2024 · 3 comments
Labels
kind/enhancement (issues or changes related to enhancement), stale (indicates no updates for 30 days)

Comments

JamesBonddu commented Mar 19, 2024

Is there an existing issue for this?

  • I have searched the existing issues

What would you like to be added?

def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        timeout: Optional[int] = None,
        batch_size: int = 1000,
        **kwargs: Any,
    ) -> List[str]:
        """Insert text data into Milvus.

        Inserting data when the collection has not been made yet will result
        in creating a new Collection. The data of the first entity decides
        the schema of the new collection, the dim is extracted from the first
        embedding and the columns are decided by the first metadata dict.
        Metadata keys will need to be present for all inserted values. At
        the moment there is no None equivalent in Milvus.

        Args:
            texts (Iterable[str]): The texts to embed, it is assumed
                that they all fit in memory.
            metadatas (Optional[List[dict]]): Metadata dicts attached to each of
                the texts. Defaults to None.
            timeout (Optional[int]): Timeout for each batch insert. Defaults
                to None.
            batch_size (int, optional): Batch size to use for insertion.
                Defaults to 1000.

        Raises:
            MilvusException: Failure to add texts

        Returns:
            List[str]: The resulting keys for each inserted element.
        """
        from pymilvus import Collection, MilvusException

        texts = list(texts)

        try:
            embeddings = self.embedding_func.embed_documents(texts)
        except NotImplementedError:
            embeddings = [self.embedding_func.embed_query(x) for x in texts]

        if len(embeddings) == 0:
            logger.debug("Nothing to insert, skipping.")
            return []

        # If the collection hasn't been initialized yet, perform all steps to do so
        if not isinstance(self.col, Collection):
            kwargs = {"embeddings": embeddings, "metadatas": metadatas}
            if self.partition_names:
                kwargs["partition_names"] = self.partition_names
            if self.replica_number:
                kwargs["replica_number"] = self.replica_number
            if self.timeout:
                kwargs["timeout"] = self.timeout
            self._init(**kwargs)

        # Dict to hold all insert columns
        insert_dict: dict[str, list] = {
            self._text_field: texts,
            self._vector_field: embeddings,
        }

        if self._metadata_field is not None:
            for d in metadatas:
                insert_dict.setdefault(self._metadata_field, []).append(d)
        else:
            # Collect the metadata into the insert dict.
            if metadatas is not None:
                for d in metadatas:
                    for key, value in d.items():
                        if key in self.fields:
                            insert_dict.setdefault(key, []).append(value)

        # Total insert count
        vectors: list = insert_dict[self._vector_field]
        total_count = len(vectors)

        pks: list[str] = []

        assert isinstance(self.col, Collection)
        for i in range(0, total_count, batch_size):
            # Grab end index
            end = min(i + batch_size, total_count)
            # Convert dict to list of lists batch for insertion
            insert_list = [insert_dict[x][i:end] for x in self.fields]
            # Insert into the collection.
            try:
                res: Collection
                res = self.col.insert(insert_list, timeout=timeout, **kwargs)
                pks.extend(res.primary_keys)
            except MilvusException as e:
                logger.error(
                    "Failed to insert batch starting at entity: %s/%s", i, total_count
                )
                raise e
        self.col.flush()
        return pks

I have tried langchain 0.0.332 and 0.1.12, but col.insert is still slow in both.
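Since the title mentions col.flush while the text above mentions col.insert, it may help to measure the two calls separately before changing anything. The following is only a minimal sketch: `FakeCollection`, `timed`, and `profile_add` are hypothetical names introduced for illustration, and the fake `flush` simply sleeps to imitate a slow call. With a real `pymilvus.Collection` the same `timed` wrapper could be placed around `self.col.insert` and `self.col.flush`.

```python
import time
from typing import Any, Callable, Dict


def timed(label: str, fn: Callable[..., Any], timings: Dict[str, float],
          *args: Any, **kwargs: Any) -> Any:
    """Call fn and add its wall-clock duration to timings[label]."""
    start = time.perf_counter()
    try:
        return fn(*args, **kwargs)
    finally:
        timings[label] = timings.get(label, 0.0) + (time.perf_counter() - start)


class FakeCollection:
    """Hypothetical stand-in for a Milvus collection (no server needed)."""

    def __init__(self) -> None:
        self.rows = 0

    def insert(self, batch: list) -> None:
        # batch is a list of columns; every column has one value per row.
        self.rows += len(batch[0])

    def flush(self) -> None:
        time.sleep(0.05)  # assumption: pretend flush is the slow step


def profile_add(col: Any, columns: list, batch_size: int = 1000) -> Dict[str, float]:
    """Insert column data in batches, flush once, and report time per call type."""
    timings: Dict[str, float] = {}
    total = len(columns[0])
    for i in range(0, total, batch_size):
        batch = [c[i:i + batch_size] for c in columns]
        timed("insert", col.insert, timings, batch)
    timed("flush", col.flush, timings)
    return timings
```

Comparing `timings["insert"]` with `timings["flush"]` shows which call dominates the total time.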

Software versions:
os: ubuntu 20.04
python: 3.10
milvus: 2.3.11-gpu
milvus-client: 2.3.6

Maybe we can rely on Milvus's own automatic flush instead of calling flush explicitly:

https://milvus.io/docs/configure_quota_limits.md#quotaAndLimitsflushRateenabled

Why is this needed?

No response

Anything else?

No response

@JamesBonddu JamesBonddu added the kind/enhancement Issues or changes related to enhancement label Mar 19, 2024
xiaofan-luan (Contributor) commented

Is there a special reason we need to flush?

  1. Just FYI, even if you don't flush, your data can still be visible.
  2. Flush triggers index building as soon as possible. Usually you only need to call it once (when batch insertion finishes) or never.
  3. Flush does take time (often up to several seconds), so you can also run it asynchronously.
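The three points above can be sketched as a flush policy on the insert loop: never flush, flush once after the last batch, or hand the flush to a background thread. This is a hedged illustration rather than the langchain or pymilvus implementation. `insert_batches`, its `flush` argument, and `FakeCollection` are hypothetical, and the sketch assumes the collection object can safely be used from a second thread.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from types import SimpleNamespace
from typing import Any, List, Optional, Tuple

# One worker is enough: flushes are queued and run one after another.
_flush_executor = ThreadPoolExecutor(max_workers=1)


class FakeCollection:
    """Hypothetical stand-in for a Milvus collection (no server needed)."""

    def __init__(self) -> None:
        self.next_pk = 0
        self.flush_calls = 0

    def insert(self, batch: list) -> SimpleNamespace:
        n = len(batch[0])
        keys = list(range(self.next_pk, self.next_pk + n))
        self.next_pk += n
        return SimpleNamespace(primary_keys=keys)

    def flush(self) -> None:
        self.flush_calls += 1


def insert_batches(col: Any, columns: list, batch_size: int = 1000,
                   flush: str = "never") -> Tuple[List[Any], Optional[Future]]:
    """Insert in batches; flush is 'never', 'sync' (once at the end) or 'background'."""
    if flush not in ("never", "sync", "background"):
        raise ValueError(f"unknown flush policy: {flush!r}")
    pks: List[Any] = []
    total = len(columns[0])
    for i in range(0, total, batch_size):
        batch = [c[i:i + batch_size] for c in columns]
        pks.extend(col.insert(batch).primary_keys)
    if flush == "sync":
        col.flush()  # a single flush after all batches, not one per batch
        return pks, None
    if flush == "background":
        # The caller gets the keys immediately and may wait on the future later.
        return pks, _flush_executor.submit(col.flush)
    return pks, None
```

With `flush="background"` the caller can ignore the returned future, or call `.result()` on it when it needs to know that the flush has finished.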

JamesBonddu (Author) commented Mar 20, 2024

I opened a pull request against langchain:

langchain-ai/langchain#19300

baskaryan added a commit to langchain-ai/langchain that referenced this issue Mar 26, 2024
rahul-trip pushed a commit to daxa-ai/langchain that referenced this issue Mar 27, 2024
bechbd pushed a commit to bechbd/langchain that referenced this issue Mar 29, 2024
gkorland pushed a commit to FalkorDB/langchain that referenced this issue Mar 30, 2024
chrispy-snps pushed a commit to chrispy-snps/langchain that referenced this issue Mar 30, 2024
chrispy-snps pushed a commit to chrispy-snps/langchain that referenced this issue Mar 30, 2024
stale bot commented Apr 20, 2024

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Rotten issues close after 30d of inactivity. Reopen the issue with /reopen.

@stale stale bot added the stale (indicates no updates for 30 days) label Apr 20, 2024
hinthornw pushed a commit to langchain-ai/langchain that referenced this issue Apr 26, 2024
@stale stale bot closed this as completed May 5, 2024