From c6730a9b8369a21438a0efb2de08c08a9a63d41a Mon Sep 17 00:00:00 2001 From: Zhichang Yu Date: Tue, 30 Apr 2024 20:34:12 +0800 Subject: [PATCH 1/2] Let thrift client reconnect on insert failure Updated requirements.txt --- python/benchmark/clients/base_client.py | 17 ++++++++++-- python/benchmark/configs/infinity_enwiki.json | 2 +- python/benchmark/requirements.txt | 4 +++ python/infinity/remote_thrift/client.py | 27 +++++++++++++------ python/pyproject.toml | 20 +++++++------- python/requirements.txt | 18 ++++++------- 6 files changed, 58 insertions(+), 30 deletions(-) create mode 100644 python/benchmark/requirements.txt diff --git a/python/benchmark/clients/base_client.py b/python/benchmark/clients/base_client.py index 8099ca54c5..e568959c2c 100644 --- a/python/benchmark/clients/base_client.py +++ b/python/benchmark/clients/base_client.py @@ -8,6 +8,7 @@ from urllib.parse import urlparse import time + class BaseClient: """ Base class for all clients(Qdrant, ES, infinity). @@ -25,6 +26,13 @@ def __init__(self, """ pass + @abstractmethod + def upload(self): + """ + Upload data and build indexes (parameters are parsed by __init__). + """ + pass + @abstractmethod def search(self) -> list[list[Any]]: """ @@ -32,7 +40,7 @@ def search(self) -> list[list[Any]]: The function returns id list. """ pass - + def download_data(self, url, target_path): """ Download dataset and extract it into path. @@ -59,6 +67,11 @@ def run_experiment(self, args): """ run experiment and save results. """ + if args.import_data: + start_time = time.time() + self.upload() + finish_time = time.time() + print(f"upload finish, cost time = {finish_time - start_time}") if args.query: results = self.search() - self.check_and_save_results(results) \ No newline at end of file + self.check_and_save_results(results) diff --git a/python/benchmark/configs/infinity_enwiki.json b/python/benchmark/configs/infinity_enwiki.json index 235a84c0a4..f69ebb25cf 100644 --- a/python/benchmark/configs/infinity_enwiki.json +++ b/python/benchmark/configs/infinity_enwiki.json @@ -9,7 +9,7 @@ "query_link": "to_be_set", "mode": "fulltext", "topK": 10, - "use_import": true, + "use_import": false, "schema": { "doctitle": {"type": "varchar", "default":""}, "docdate": {"type": "varchar", "default":""}, diff --git a/python/benchmark/requirements.txt b/python/benchmark/requirements.txt new file mode 100644 index 0000000000..7c1abfcadc --- /dev/null +++ b/python/benchmark/requirements.txt @@ -0,0 +1,4 @@ +elasticsearch~=8.13.0 +h5py~=3.11.0 +qdrant_client~=1.9.0 + diff --git a/python/infinity/remote_thrift/client.py b/python/infinity/remote_thrift/client.py index c883eca40c..2696bf85ac 100644 --- a/python/infinity/remote_thrift/client.py +++ b/python/infinity/remote_thrift/client.py @@ -27,9 +27,13 @@ class ThriftInfinityClient: def __init__(self, uri: URI): self.uri = uri + self.transport = None self.reconnect() def reconnect(self): + if self.transport is not None: + self.transport.close() + self.transport = None # self.transport = TTransport.TFramedTransport(TSocket.TSocket(self.uri.ip, self.uri.port)) # async self.transport = TTransport.TBufferedTransport( TSocket.TSocket(self.uri.ip, self.uri.port)) # sync @@ -126,7 +130,8 @@ def list_indexes(self, db_name: str, table_name: str): def insert(self, db_name: str, table_name: str, column_names: list[str], fields: list[Field]): retry = 0 - while retry <= 10: + inner_ex = None + while retry <= 2: try: res = self.client.Insert(InsertRequest(session_id=self.session_id, db_name=db_name, @@ -135,12 +140,14 @@ def 
insert(self, db_name: str, table_name: str, column_names: list[str], fields:
                                                        fields=fields))
             return res
         except TTransportException as ex:
-            if ex.type == ex.END_OF_FILE:
-                self.reconnect()
-                retry += 1
-            else:
-                break
-        return CommonResponse(ErrorCode.TOO_MANY_CONNECTIONS, "retry insert failed")
+            self.reconnect()
+            inner_ex = ex
+            retry += 1
+        except Exception as ex:
+            # Reconnecting cannot cure a non-transport error; stop retrying.
+            inner_ex = ex
+            break
+        return CommonResponse(ErrorCode.TOO_MANY_CONNECTIONS, "insert failed with exception: " + str(inner_ex))
 
     # Can be used in compact mode
     # def insert(self, db_name: str, table_name: str, column_names: list[str], fields: list[Field]):
@@ -198,7 +205,11 @@ def update(self, db_name: str, table_name: str, where_expr, update_expr_array):
                                                  update_expr_array=update_expr_array))
 
     def disconnect(self):
-        res = self.client.Disconnect(CommonRequest(session_id=self.session_id))
+        res = None
+        try:
+            res = self.client.Disconnect(CommonRequest(session_id=self.session_id))
+        except Exception:
+            pass
         self.transport.close()
         return res
 
diff --git a/python/pyproject.toml b/python/pyproject.toml
index d1706a5f71..8267f3638e 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -2,16 +2,16 @@
 name = "infinity_sdk"
 version = "0.2.0.dev1"
 dependencies = [
-    "sqlglot==11.7.1",
-    "pydantic",
-    "thrift",
-    "setuptools",
-    "pytest",
-    "pandas",
-    "numpy",
-    "pyarrow",
-    "openpyxl",
-    "polars"
+    "sqlglot~=11.7.1",
+    "pydantic~=2.7.1",
+    "thrift~=0.20.0",
+    "setuptools~=69.5.1",
+    "pytest~=8.2.0",
+    "pandas~=2.2.2",
+    "numpy~=1.26.4",
+    "pyarrow~=16.0.0",
+    "polars~=0.20.23",
+    "openpyxl~=3.1.2"
 ]
 description = "infinity"
 readme = "README.md"
diff --git a/python/requirements.txt b/python/requirements.txt
index 46c4cc25d3..fc151a38c6 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -1,10 +1,10 @@
-sqlglot==11.7.1
-pydantic~=1.10.12
+sqlglot~=11.7.1
+pydantic~=2.7.1
 thrift~=0.20.0
-setuptools~=68.0.0
-pytest~=7.4.0
-pandas~=2.1.1
-openpyxl
-numpy~=1.26.0
-polars~=0.19.0
-pyarrow
+setuptools~=69.5.1
+pytest~=8.2.0
+pandas~=2.2.2
+numpy~=1.26.4
+pyarrow~=16.0.0
+polars~=0.20.23
+openpyxl~=3.1.2

From 0d6c71f1a15249c8b8f40a7420e2c99880972038 Mon Sep 17 00:00:00 2001
From: Zhichang Yu
Date: Thu, 2 May 2024 11:09:23 +0800
Subject: [PATCH 2/2] Replace print with logging

---
 python/benchmark/clients/base_client.py          |  8 +++-----
 python/benchmark/clients/elasticsearch_client.py | 14 +++++++-------
 python/benchmark/clients/infinity_client.py      | 13 ++++++-------
 python/benchmark/clients/qdrant_client.py        |  7 ++++---
 python/benchmark/run.py                          |  9 +++++----
 5 files changed, 25 insertions(+), 26 deletions(-)

diff --git a/python/benchmark/clients/base_client.py b/python/benchmark/clients/base_client.py
index e568959c2c..83c747885a 100644
--- a/python/benchmark/clients/base_client.py
+++ b/python/benchmark/clients/base_client.py
@@ -1,12 +1,10 @@
 import argparse
 from abc import abstractmethod
-from typing import Any, List, Optional, Dict, Union
-from enum import Enum
+from typing import Any
 import subprocess
-import sys
 import os
-from urllib.parse import urlparse
 import time
+import logging
 
 
 class BaseClient:
@@ -71,7 +69,7 @@ def run_experiment(self, args):
             start_time = time.time()
             self.upload()
             finish_time = time.time()
-            print(f"upload finish, cost time = {finish_time - start_time}")
+            logging.info(f"upload finished, cost time = {finish_time - start_time}")
         if args.query:
             results = self.search()
             self.check_and_save_results(results)
diff --git 
a/python/benchmark/clients/elasticsearch_client.py b/python/benchmark/clients/elasticsearch_client.py index 2d978233b2..0d7cb5a432 100644 --- a/python/benchmark/clients/elasticsearch_client.py +++ b/python/benchmark/clients/elasticsearch_client.py @@ -3,12 +3,12 @@ from elasticsearch import Elasticsearch, helpers import json import time -from typing import List, Optional +from typing import List import os import h5py import uuid import numpy as np -import csv +import logging from .base_client import BaseClient @@ -74,7 +74,7 @@ def upload(self): for i, line in enumerate(data_file): row = line.strip().split('\t') if len(row) != len(headers): - print(f"row = {i}, row_len = {len(row)}, not equal headers len, skip") + logging.info(f"row = {i}, row_len = {len(row)}, not equal headers len, skip") continue row_dict = {header: value for header, value in zip(headers, row)} current_batch.append({"_index": self.collection_name, "_id": uuid.UUID(int=i).hex, "_source": row_dict}) @@ -133,7 +133,7 @@ def search(self) -> list[list[Any]]: The function returns id list. """ query_path = os.path.join(self.path_prefix, self.data["query_path"]) - print(query_path) + logging.info(query_path) results = [] _, ext = os.path.splitext(query_path) if ext == '.json' or ext == '.jsonl': @@ -184,7 +184,7 @@ def search(self) -> list[list[Any]]: latency = (end - start) * 1000 result = [(uuid.UUID(hex=hit['_id']).int, hit['_score']) for hit in result['hits']['hits']] result.append(latency) - print(f"{line[:-1]}, {latency}") + logging.info(f"{line[:-1]}, {latency}") results.append(result) else: raise TypeError("Unsupported file type") @@ -214,7 +214,7 @@ def check_and_save_results(self, results: List[List[Any]]): precisions.append(precision) latencies.append(result[-1]) - print( + logging.info( f'''mean_time: {np.mean(latencies)}, mean_precisions: {np.mean(precisions)}, std_time: {np.std(latencies)}, min_time: {np.min(latencies)}, \n max_time: {np.max(latencies)}, p95_time: {np.percentile(latencies, 95)}, @@ -223,7 +223,7 @@ def check_and_save_results(self, results: List[List[Any]]): latencies = [] for result in results: latencies.append(result[-1]) - print( + logging.info( f'''mean_time: {np.mean(latencies)}, std_time: {np.std(latencies)}, max_time: {np.max(latencies)}, min_time: {np.min(latencies)}, p95_time: {np.percentile(latencies, 95)}, p99_time: {np.percentile(latencies, 99)}''') diff --git a/python/benchmark/clients/infinity_client.py b/python/benchmark/clients/infinity_client.py index 1c202448b9..e7e8e056e8 100644 --- a/python/benchmark/clients/infinity_client.py +++ b/python/benchmark/clients/infinity_client.py @@ -5,13 +5,12 @@ import time import numpy as np from typing import Any, List +import logging import infinity import infinity.index as index from infinity import NetworkAddress from .base_client import BaseClient -import infinity.remote_thrift.infinity_thrift_rpc.ttypes as ttypes -import csv class InfinityClient(BaseClient): def __init__(self, @@ -93,9 +92,9 @@ def upload(self): for i, line in enumerate(data_file): row = line.strip().split('\t') if (i % 100000 == 0): - print(f"row {i}") + logging.info(f"row {i}") if len(row) != len(headers): - print(f"row = {i}, row_len = {len(row)}, not equal headers len, skip") + logging.info(f"row = {i}, row_len = {len(row)}, not equal headers len, skip") continue row_dict = {header: value for header, value in zip(headers, row)} current_batch.append(row_dict) @@ -166,7 +165,7 @@ def search(self) -> list[list[Any]]: latency = (time.time() - start) * 1000 result = 
[(row_id[0], score) for row_id, score in zip(res['ROW_ID'], res['SCORE'])] result.append(latency) - print(f"{query}, {latency}") + logging.info(f"{query}, {latency}") results.append(result) else: raise TypeError("Unsupported file type") @@ -197,7 +196,7 @@ def check_and_save_results(self, results: List[List[Any]]): precisions.append(precision) latencies.append(result[-1]) - print( + logging.info( f'''mean_time: {np.mean(latencies)}, mean_precisions: {np.mean(precisions)}, std_time: {np.std(latencies)}, min_time: {np.min(latencies)}, max_time: {np.max(latencies)}, p95_time: {np.percentile(latencies, 95)}, @@ -206,7 +205,7 @@ def check_and_save_results(self, results: List[List[Any]]): latencies = [] for result in results: latencies.append(result[-1]) - print( + logging.info( f'''mean_time: {np.mean(latencies)}, std_time: {np.std(latencies)}, max_time: {np.max(latencies)}, min_time: {np.min(latencies)}, p95_time: {np.percentile(latencies, 95)}, p99_time: {np.percentile(latencies, 99)}''') diff --git a/python/benchmark/clients/qdrant_client.py b/python/benchmark/clients/qdrant_client.py index 43c5273271..db78665849 100644 --- a/python/benchmark/clients/qdrant_client.py +++ b/python/benchmark/clients/qdrant_client.py @@ -6,7 +6,8 @@ import time import json import h5py -from typing import Any, List, Optional +from typing import Any +import logging from .base_client import BaseClient @@ -137,7 +138,7 @@ def search(self) -> list[list[Any]]: with_payload=False ) end = time.time() - print(f"latency of search: {(end - start)*1000:.2f} milliseconds") + logging.info(f"latency of search: {(end - start)*1000:.2f} milliseconds") results.append(result) elif ext == '.hdf5' and self.data['mode'] == 'vector': with h5py.File(query_path, 'r') as f: @@ -150,7 +151,7 @@ def search(self) -> list[list[Any]]: ) results.append(result) end = time.time() - print(f"latency of KNN search: {(end - start)*1000/len(f['test']):.2f} milliseconds") + logging.info(f"latency of KNN search: {(end - start)*1000/len(f['test']):.2f} milliseconds") else: raise TypeError("Unsupported file type") return results diff --git a/python/benchmark/run.py b/python/benchmark/run.py index a96360fa8d..7f13f665d7 100644 --- a/python/benchmark/run.py +++ b/python/benchmark/run.py @@ -1,10 +1,10 @@ import argparse import os +import logging from clients.elasticsearch_client import ElasticsearchClient from clients.infinity_client import InfinityClient from clients.qdrant_client import QdrantClient -from generate_query_json import generate_query_json from generate_query_json import generate_query_txt ENGINES = ['infinity', 'qdrant', 'elasticsearch'] @@ -53,6 +53,7 @@ def get_client(engine: str, config: str, options: argparse.Namespace): raise ValueError(f"Unknown engine: {engine}") if __name__ == '__main__': + logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(levelname)-8s %(message)s') args = parse_args() config_paths = generate_config_paths(args) @@ -65,9 +66,9 @@ def get_client(engine: str, config: str, options: argparse.Namespace): for path, engine in config_paths: if not os.path.exists(path): - print(f"qdrant does not support full text search") + logging.info("qdrant does not support full text search") continue - print("Running", engine, "with", path) + logging.info("Running {} with {}".format(engine, path)) client = get_client(engine, path, args) client.run_experiment(args) - print("Finished", engine, "with", path) + logging.info("Finished {} with {}".format(engine, path))
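
A minimal, runnable sketch of the two behaviours these patches combine: the
reconnect-and-retry loop that patch 1 wraps around the thrift Insert call, and
the logging setup that patch 2 installs in run.py. FlakyClient, TransportError,
and rpc_insert are hypothetical stand-ins for ThriftInfinityClient,
TTransportException, and client.Insert; only the control flow mirrors the
patched insert().

    import logging
    import time


    class TransportError(Exception):
        """Stand-in for thrift.transport.TTransport.TTransportException."""


    class FlakyClient:
        """Stand-in for ThriftInfinityClient; its connection drops twice."""

        def __init__(self):
            self.calls = 0
            self.reconnect()

        def reconnect(self):
            logging.info("(re)connecting transport")

        def rpc_insert(self, rows):
            self.calls += 1
            if self.calls <= 2:
                raise TransportError("connection reset by server")
            return f"inserted {len(rows)} rows"


    def insert_with_retry(client, rows, max_retries=2):
        """Reconnect and retry on transport errors; fail fast on anything else."""
        retry = 0
        inner_ex = None
        while retry <= max_retries:
            try:
                return client.rpc_insert(rows)
            except TransportError as ex:
                # A dropped connection may be cured by reconnecting.
                client.reconnect()
                inner_ex = ex
                retry += 1
            except Exception as ex:
                # Reconnecting cannot cure a non-transport error; stop retrying.
                inner_ex = ex
                break
        return f"insert failed with exception: {inner_ex}"


    if __name__ == "__main__":
        # Same logging configuration that run.py sets up in patch 2.
        logging.basicConfig(level=logging.INFO,
                            format="%(asctime)-15s %(levelname)-8s %(message)s")
        start_time = time.time()
        logging.info(insert_with_retry(FlakyClient(), [{"doctitle": "a"}]))
        logging.info(f"upload finished, cost time = {time.time() - start_time}")

Bounding the retry count and reconnecting only on transport errors keeps a dead
server from turning one failed insert into a busy loop, while any other error
surfaces immediately with the original exception attached to the response.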