[Bug]: Filtering by partition key fails to return results for bulk imported data #33237

Closed · 1 task done
cstub opened this issue May 21, 2024 · 7 comments

Labels: kind/bug, triage/accepted

@cstub
cstub commented May 21, 2024

Is there an existing issue for this?

  • I have searched the existing issues

Environment

- Milvus version: 2.4.1
- Deployment mode(standalone or cluster): standalone
- MQ type(rocksmq, pulsar or kafka): -
- SDK version(e.g. pymilvus v2.0.0rc2): pymilvus 2.4.3
- OS(Ubuntu or CentOS): Ubuntu 18.04
- CPU/Memory: CPU (AMD Ryzen Threadripper 1920X 12-Core Processor) / 64 GB memory
- GPU: -
- Others: -

Current Behavior

I import data using the bulk import feature with a parquet file containing the following fields:

  • item_id (primary key)
  • partition_id (partition key)
  • embedding (float vector)

I can successfully query and filter by item_id.
However, when I attempt to filter by partition_id (partition key) with an existing value, the result set is empty.

This behavior is observed using both the Python SDK and the Attu UI.

The issue cannot be reproduced when the same data is inserted using the Python SDK instead of via bulk import.
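For comparison, a minimal sketch of the SDK insert path that works (assuming the `test_data` collection and `COLLECTION_SCHEMA` from the repro script below, a standalone Milvus at `localhost:19530`, and an illustrative row count):

```python
# Minimal sketch of the SDK insert path that does not exhibit the problem.
# Assumes the "test_data" collection already exists with COLLECTION_SCHEMA
# from the repro script below; host and row count are illustrative.
import uuid

import numpy as np
from pymilvus import MilvusClient

client = MilvusClient(uri="http://localhost:19530", db_name="default")

rows = [
    {
        "item_id": uuid.uuid4().hex,
        "partition_id": "partition-id-1",
        "embedding": np.random.rand(768).tolist(),
    }
    for _ in range(100)
]
client.insert(collection_name="test_data", data=rows)

# Filtering by the partition key returns the inserted rows as expected.
res = client.query(
    collection_name="test_data",
    filter="partition_id == 'partition-id-1'",
    output_fields=["item_id", "partition_id"],
)
print(len(res))
client.close()
```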

Expected Behavior

When importing data using the bulk import feature and filtering by the partition key with an existing value, the result set should contain all entries for the queried partition key.

Steps To Reproduce

The following script can be used to
1. create the test data for a bulk import,
2. import the data into a new collection, and
3. query the collection using an existing partition key, which returns an empty result.

Steps:
1. Create the test data by running the script as is (output is written to a new folder `data`).
2. Upload the bulk data file to the Milvus MinIO instance (see the upload sketch after this list).
3. Comment out `step_1` and uncomment `step_2` in the script, fill in MILVUS_HOST, ITEM_ID (from the write_bulk_data output) and PATH_TO_BULK_DATA_FILE, and run the script to create and populate the collection.
4. Comment out `step_2` and uncomment `step_3` to perform the test query.
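For step 2, a minimal upload sketch using the `minio` Python package is shown below. The endpoint, credentials, bucket name and object path are assumptions and must match the MinIO configuration of your Milvus deployment (`milvus.yaml`):

```python
# Hypothetical MinIO upload sketch; endpoint, credentials, bucket and object
# path are assumptions and must be adjusted to your deployment.
from minio import Minio

minio_client = Minio(
    "localhost:9000",          # MinIO endpoint of the Milvus deployment
    access_key="minioadmin",   # default standalone credentials
    secret_key="minioadmin",
    secure=False,
)

# Upload the parquet file produced by write_bulk_data() into the Milvus bucket.
minio_client.fput_object(
    bucket_name="a-bucket",    # default bucket name in milvus.yaml
    object_name="bulk_data/1.parquet",
    file_path="./data/<bulk-writer-uuid>/1.parquet",
)
```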

```python
import time
import uuid

import numpy as np
from pymilvus import CollectionSchema, MilvusClient, FieldSchema, DataType, BulkInsertState, connections
from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType
from pymilvus.milvus_client import IndexParams
from pymilvus.orm import utility

COLLECTION_SCHEMA = CollectionSchema(
    fields=[
        FieldSchema(name="item_id", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
        FieldSchema(name="partition_id", dtype=DataType.VARCHAR, max_length=100, is_partition_key=True),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
    ]
)


def write_bulk_data(output_path: str):
    writer = LocalBulkWriter(
        schema=COLLECTION_SCHEMA,
        local_path=output_path,
        segment_size=4 * 1024 * 1024,
        file_type=BulkFileType.PARQUET,
    )

    for i in range(100):
        _id = uuid.uuid4().hex
        writer.append_row(
            {
                "item_id": _id,
                "partition_id": "partition-id-1",
                "embedding": np.random.rand(768),
            }
        )

    def callback(location):
        print("Commit completed", location)

    writer.commit(call_back=callback)


def create_collection(client: MilvusClient):
    index_params = IndexParams()
    index_params.add_index(
        field_name="embedding",
        index_type="HNSW",
        index_name="embedding_index",
        metric_type="IP",
        params={"M": 32, "efConstruction": 160},
    )
    client.create_collection(
        collection_name="test_data",
        schema=COLLECTION_SCHEMA,
        index_params=index_params,
        num_partitions=10,
        consistency_level="Strong",
    )


def import_data(address, files):
    try:
        connections.connect(
            address=address,
            db_name="default",
            alias="default",
        )

        task_id = utility.do_bulk_insert(
            collection_name="test_data",
            files=files,
        )
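        # Poll the bulk insert task until it reaches a terminal state.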
        while True:
            insert_state = utility.get_bulk_insert_state(task_id)
            print(f"Insert state: {insert_state}")
            if insert_state.state in [
                BulkInsertState.ImportFailed,
                BulkInsertState.ImportCompleted,
                BulkInsertState.ImportPersisted,
            ]:
                break
            time.sleep(1)

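        # Wait until the vector index has no pending rows before querying.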
        while True:
            index_status = utility.index_building_progress(collection_name="test_data", index_name="embedding_index")
            print(
                f"Indexing indexed rows: {index_status.get('indexed_rows', 0)}, pending_index_rows: {index_status.get('pending_index_rows', 0)}"
            )
            if index_status.get("pending_index_rows", 0) <= 0:
                break
            time.sleep(1)
    finally:
        connections.disconnect(alias="default")


def query(client: MilvusClient):
    print("Query [filter by partition_id]:")
    res = client.query(collection_name="test_data", filter="partition_id=='partition-id-1'", consistency_level="Strong")
    print("Result: ", res)


def step_1(output_path: str):
    write_bulk_data(output_path=output_path)


def step_2(milvus_host, files_to_import):
    client = MilvusClient(uri=f"http://{milvus_host}", db_name="default")

    try:
        print("Creating collection...")
        create_collection(client)

        print("Importing data...")
        import_data(milvus_host, files_to_import)
    finally:
        client.close()


def step_3(milvus_host):
    client = MilvusClient(uri=f"http://{milvus_host}", db_name="default")

    try:
        print("Querying data...")
        query(client)
    finally:
        client.close()


if __name__ == "__main__":
    # =========================
    # Step 1: create bulk data
    # =========================
    step_1(output_path="./data")

    # upload data to Milvus Minio

    # ==========================================
    # Step 2: uncomment to import data to Milvus
    # ==========================================
    # MILVUS_HOST = "<MILVUS_HOST>"
    # FILES_TO_IMPORT = ["<PATH TO BULK DATA FILE>"]
    #
    # step_2(
    #     milvus_host=MILVUS_HOST,
    #     files_to_import=FILES_TO_IMPORT,
    # )

    # ===============================
    # Step 3: uncomment to query data
    # ===============================
    # step_3(
    #     milvus_host=MILVUS_HOST,
    # )
```

Milvus Log

No response

Anything else?

No response

@cstub cstub added kind/bug Issues or changes related a bug needs-triage Indicates an issue or PR lacks a `triage/foo` label and requires one. labels May 21, 2024
@yanliang567
Contributor

/assign @zhuwenxing
Is this a known issue? Please help to reproduce it in-house.

@yanliang567 yanliang567 added triage/needs-information Indicates an issue needs more information in order to work on it. and removed needs-triage Indicates an issue or PR lacks a `triage/foo` label and requires one. labels May 22, 2024
@xiaofan-luan
Contributor

/assign @bigsheeper
please also take a look at this issue

@zhuwenxing
Contributor

Yes, this issue can be reproduced.
I tried the operators >=, == and like:

```
res, _ = self.collection_wrap.query(expr='string_scalar like "0%"', output_fields=[df.string_field])
res
data: ["{'string_scalar': '0_ztK6q5mT', 'uid': 449929140861090687}"] ..., extra_info: {'cost': 0}

res, _ = self.collection_wrap.query(expr=f"{df.string_field} == '0_ztK6q5mT'", output_fields=[df.string_field])
res
data: [] , extra_info: {'cost': 0}

res, _ = self.collection_wrap.query(expr=f"{df.string_field} >= '0'", output_fields=[df.string_field])
res
data: ["{'string_scalar': '0_ztK6q5mT', 'uid': 449929140861090687}", "{'string_scalar': '1_oyct8uIH', 'uid': 449929140861090688}", "{'string_scalar': '2_t9jkRc2w', 'uid': 449929140861090689}", "{'string_scalar': '3_pWq9oVIG', 'uid': 449929140861090690}", "{'string_scalar': '4_Lrb3jzBI', 'uid': 449929140861090691}", "{'string_scalar': '5_oyvSJTIM', 'uid': 449929140861090692}", "{'string_scalar': '6_cgUPjK8c', 'uid': 449929140861090693}", "{'string_scalar': '7_PJnrDVwC', 'uid': 449929140861090694}", "{'string_scalar': '8_cl0k23gb', 'uid': 449929140861090695}", "{'string_scalar': '9_1agks2vt', 'uid': 449929140861090696}"] ..., extra_info: {'cost': 0}
```

Only the == operator returns no results.

@yanliang567 yanliang567 added triage/accepted Indicates an issue or PR is ready to be actively worked on. and removed triage/needs-information Indicates an issue needs more information in order to work on it. labels May 22, 2024
@bigsheeper
Contributor

working on it

sre-ci-robot pushed a commit that referenced this issue May 23, 2024
Before executing the import, partition IDs should be reordered according
to partition names. Otherwise, the data might be hashed to the wrong
partition during import. This PR corrects this error.

issue: #33237

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
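For context, a simplified, hypothetical Python illustration of the mechanism described in the commit message above; this is not the actual Milvus code, and the partition IDs and hash function are made up:

```python
# Hypothetical sketch: why the ordering of partition IDs matters for a
# partition-key collection. Not the actual Milvus implementation.

def route(partition_key: str, partition_ids: list) -> int:
    """Pick the target partition ID by hashing the partition key value."""
    return partition_ids[hash(partition_key) % len(partition_ids)]

# Partition IDs ordered by partition name, as used when the query prunes
# partitions by the partition-key filter.
ordered_ids = [101, 102, 103, 104]
# The same IDs in a different (wrong) order, as used during the bulk import
# before the fix.
misordered_ids = [103, 101, 104, 102]

key = "partition-id-1"
print(route(key, misordered_ids))  # partition the import wrote the rows to
print(route(key, ordered_ids))     # partition the query later looks in
# If the two differ, the query scans a partition that holds no matching rows,
# so filtering by the partition key returns an empty result set.
```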
sre-ci-robot pushed a commit that referenced this issue May 23, 2024
…#33277)

Before executing the import, partition IDs should be reordered according
to partition names. Otherwise, the data might be hashed to the wrong
partition during import. This PR corrects this error.

issue: #33237

pr: #33274

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
@bigsheeper
Contributor

/assign @zhuwenxing
/unassign
please help to verify

sre-ci-robot pushed a commit that referenced this issue May 23, 2024
see #33237

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
@yanliang567 yanliang567 removed their assignment May 23, 2024
@zhuwenxing
Contributor

/assign @cstub

@cstub
Hi, this issue has been resolved in 2.4-20240524-8990b8b0-amd64. Please review and confirm it.
We will also release a new version shortly.

@zhuwenxing zhuwenxing removed their assignment May 24, 2024
@cstub
Author

cstub commented May 24, 2024

@zhuwenxing
I can confirm that filtering by partition key is functioning correctly with the specified version, and the issue can no longer be reproduced.

Thank you for the quick resolution!
