Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let thrift client reconnect on insert failure #1156

Merged
merged 2 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions python/benchmark/clients/base_client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import argparse
from abc import abstractmethod
from typing import Any, List, Optional, Dict, Union
from enum import Enum
from typing import Any
import subprocess
import sys
import os
from urllib.parse import urlparse
import time
import logging


class BaseClient:
"""
Expand All @@ -25,14 +24,21 @@ def __init__(self,
"""
pass

@abstractmethod
def upload(self):
"""
Upload data and build indexes (parameters are parsed by __init__).
"""
pass

@abstractmethod
def search(self) -> list[list[Any]]:
"""
Execute the corresponding query tasks (vector search, full-text search, hybrid search) based on the parsed parameters.
The function returns a list of ids.
"""
pass

def download_data(self, url, target_path):
"""
Download dataset and extract it into path.
Expand All @@ -59,6 +65,11 @@ def run_experiment(self, args):
"""
run experiment and save results.
"""
if args.import_data:
start_time = time.time()
self.upload()
finish_time = time.time()
logging.info(f"upload finish, cost time = {finish_time - start_time}")
if args.query:
results = self.search()
self.check_and_save_results(results)
self.check_and_save_results(results)
14 changes: 7 additions & 7 deletions python/benchmark/clients/elasticsearch_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
from elasticsearch import Elasticsearch, helpers
import json
import time
from typing import List, Optional
from typing import List
import os
import h5py
import uuid
import numpy as np
import csv
import logging

from .base_client import BaseClient

Expand Down Expand Up @@ -74,7 +74,7 @@ def upload(self):
for i, line in enumerate(data_file):
row = line.strip().split('\t')
if len(row) != len(headers):
print(f"row = {i}, row_len = {len(row)}, not equal headers len, skip")
logging.info(f"row = {i}, row_len = {len(row)}, not equal headers len, skip")
continue
row_dict = {header: value for header, value in zip(headers, row)}
current_batch.append({"_index": self.collection_name, "_id": uuid.UUID(int=i).hex, "_source": row_dict})
Expand Down Expand Up @@ -133,7 +133,7 @@ def search(self) -> list[list[Any]]:
The function returns a list of ids.
"""
query_path = os.path.join(self.path_prefix, self.data["query_path"])
print(query_path)
logging.info(query_path)
results = []
_, ext = os.path.splitext(query_path)
if ext == '.json' or ext == '.jsonl':
Expand Down Expand Up @@ -184,7 +184,7 @@ def search(self) -> list[list[Any]]:
latency = (end - start) * 1000
result = [(uuid.UUID(hex=hit['_id']).int, hit['_score']) for hit in result['hits']['hits']]
result.append(latency)
print(f"{line[:-1]}, {latency}")
logging.info(f"{line[:-1]}, {latency}")
results.append(result)
else:
raise TypeError("Unsupported file type")
Expand Down Expand Up @@ -214,7 +214,7 @@ def check_and_save_results(self, results: List[List[Any]]):
precisions.append(precision)
latencies.append(result[-1])

print(
logging.info(
f'''mean_time: {np.mean(latencies)}, mean_precisions: {np.mean(precisions)},
std_time: {np.std(latencies)}, min_time: {np.min(latencies)}, \n
max_time: {np.max(latencies)}, p95_time: {np.percentile(latencies, 95)},
Expand All @@ -223,7 +223,7 @@ def check_and_save_results(self, results: List[List[Any]]):
latencies = []
for result in results:
latencies.append(result[-1])
print(
logging.info(
f'''mean_time: {np.mean(latencies)}, std_time: {np.std(latencies)},
max_time: {np.max(latencies)}, min_time: {np.min(latencies)},
p95_time: {np.percentile(latencies, 95)}, p99_time: {np.percentile(latencies, 99)}''')
13 changes: 6 additions & 7 deletions python/benchmark/clients/infinity_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
import time
import numpy as np
from typing import Any, List
import logging

import infinity
import infinity.index as index
from infinity import NetworkAddress
from .base_client import BaseClient
import infinity.remote_thrift.infinity_thrift_rpc.ttypes as ttypes
import csv

class InfinityClient(BaseClient):
def __init__(self,
Expand Down Expand Up @@ -93,9 +92,9 @@ def upload(self):
for i, line in enumerate(data_file):
row = line.strip().split('\t')
if (i % 100000 == 0):
print(f"row {i}")
logging.info(f"row {i}")
if len(row) != len(headers):
print(f"row = {i}, row_len = {len(row)}, not equal headers len, skip")
logging.info(f"row = {i}, row_len = {len(row)}, not equal headers len, skip")
continue
row_dict = {header: value for header, value in zip(headers, row)}
current_batch.append(row_dict)
Expand Down Expand Up @@ -166,7 +165,7 @@ def search(self) -> list[list[Any]]:
latency = (time.time() - start) * 1000
result = [(row_id[0], score) for row_id, score in zip(res['ROW_ID'], res['SCORE'])]
result.append(latency)
print(f"{query}, {latency}")
logging.info(f"{query}, {latency}")
results.append(result)
else:
raise TypeError("Unsupported file type")
Expand Down Expand Up @@ -197,7 +196,7 @@ def check_and_save_results(self, results: List[List[Any]]):
precisions.append(precision)
latencies.append(result[-1])

print(
logging.info(
f'''mean_time: {np.mean(latencies)}, mean_precisions: {np.mean(precisions)},
std_time: {np.std(latencies)}, min_time: {np.min(latencies)},
max_time: {np.max(latencies)}, p95_time: {np.percentile(latencies, 95)},
Expand All @@ -206,7 +205,7 @@ def check_and_save_results(self, results: List[List[Any]]):
latencies = []
for result in results:
latencies.append(result[-1])
print(
logging.info(
f'''mean_time: {np.mean(latencies)}, std_time: {np.std(latencies)},
max_time: {np.max(latencies)}, min_time: {np.min(latencies)},
p95_time: {np.percentile(latencies, 95)}, p99_time: {np.percentile(latencies, 99)}''')
7 changes: 4 additions & 3 deletions python/benchmark/clients/qdrant_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import time
import json
import h5py
from typing import Any, List, Optional
from typing import Any
import logging

from .base_client import BaseClient

Expand Down Expand Up @@ -137,7 +138,7 @@ def search(self) -> list[list[Any]]:
with_payload=False
)
end = time.time()
print(f"latency of search: {(end - start)*1000:.2f} milliseconds")
logging.info(f"latency of search: {(end - start)*1000:.2f} milliseconds")
results.append(result)
elif ext == '.hdf5' and self.data['mode'] == 'vector':
with h5py.File(query_path, 'r') as f:
Expand All @@ -150,7 +151,7 @@ def search(self) -> list[list[Any]]:
)
results.append(result)
end = time.time()
print(f"latency of KNN search: {(end - start)*1000/len(f['test']):.2f} milliseconds")
logging.info(f"latency of KNN search: {(end - start)*1000/len(f['test']):.2f} milliseconds")
else:
raise TypeError("Unsupported file type")
return results
2 changes: 1 addition & 1 deletion python/benchmark/configs/infinity_enwiki.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"query_link": "to_be_set",
"mode": "fulltext",
"topK": 10,
"use_import": true,
"use_import": false,
"schema": {
"doctitle": {"type": "varchar", "default":""},
"docdate": {"type": "varchar", "default":""},
Expand Down
4 changes: 4 additions & 0 deletions python/benchmark/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
elasticsearch~=8.13.0
h5py~=3.11.0
qdrant_client~=1.9.0

9 changes: 5 additions & 4 deletions python/benchmark/run.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import argparse
import os
import logging

from clients.elasticsearch_client import ElasticsearchClient
from clients.infinity_client import InfinityClient
from clients.qdrant_client import QdrantClient
from generate_query_json import generate_query_json
from generate_query_json import generate_query_txt

ENGINES = ['infinity', 'qdrant', 'elasticsearch']
Expand Down Expand Up @@ -53,6 +53,7 @@ def get_client(engine: str, config: str, options: argparse.Namespace):
raise ValueError(f"Unknown engine: {engine}")

if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)-15s %(levelname)-8s %(message)s')
args = parse_args()
config_paths = generate_config_paths(args)

Expand All @@ -65,9 +66,9 @@ def get_client(engine: str, config: str, options: argparse.Namespace):

for path, engine in config_paths:
if not os.path.exists(path):
print(f"qdrant does not support full text search")
logging.info("qdrant does not support full text search")
continue
print("Running", engine, "with", path)
logging.info("Running {} with {}".format(engine, path))
client = get_client(engine, path, args)
client.run_experiment(args)
print("Finished", engine, "with", path)
logging.info("Finished {} with {}".format(engine, path))
27 changes: 19 additions & 8 deletions python/infinity/remote_thrift/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@
class ThriftInfinityClient:
def __init__(self, uri: URI):
self.uri = uri
self.transport = None
self.reconnect()

def reconnect(self):
if self.transport is not None:
self.transport.close()
self.transport = None
# self.transport = TTransport.TFramedTransport(TSocket.TSocket(self.uri.ip, self.uri.port)) # async
self.transport = TTransport.TBufferedTransport(
TSocket.TSocket(self.uri.ip, self.uri.port)) # sync
Expand Down Expand Up @@ -126,7 +130,8 @@ def list_indexes(self, db_name: str, table_name: str):

def insert(self, db_name: str, table_name: str, column_names: list[str], fields: list[Field]):
retry = 0
while retry <= 10:
inner_ex = None
while retry <= 2:
try:
res = self.client.Insert(InsertRequest(session_id=self.session_id,
db_name=db_name,
Expand All @@ -135,12 +140,14 @@ def insert(self, db_name: str, table_name: str, column_names: list[str], fields:
fields=fields))
return res
except TTransportException as ex:
if ex.type == ex.END_OF_FILE:
self.reconnect()
retry += 1
else:
break
return CommonResponse(ErrorCode.TOO_MANY_CONNECTIONS, "retry insert failed")
#import traceback
#traceback.print_exc()
self.reconnect()
inner_ex = ex
retry += 1
except Exception as ex:
inner_ex = ex
return CommonResponse(ErrorCode.TOO_MANY_CONNECTIONS, "insert failed with exception: " + str(inner_ex))

# Can be used in compact mode
# def insert(self, db_name: str, table_name: str, column_names: list[str], fields: list[Field]):
Expand Down Expand Up @@ -198,7 +205,11 @@ def update(self, db_name: str, table_name: str, where_expr, update_expr_array):
update_expr_array=update_expr_array))

def disconnect(self):
res = self.client.Disconnect(CommonRequest(session_id=self.session_id))
res = None
try:
res = self.client.Disconnect(CommonRequest(session_id=self.session_id))
except Exception:
pass
self.transport.close()
return res

Expand Down
20 changes: 10 additions & 10 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
name = "infinity_sdk"
version = "0.2.0.dev1"
dependencies = [
"sqlglot==11.7.1",
"pydantic",
"thrift",
"setuptools",
"pytest",
"pandas",
"numpy",
"pyarrow",
"openpyxl",
"polars"
"sqlglot~=11.7.1",
"pydantic~=2.7.1",
"thrift~=0.20.0",
"setuptools~=69.5.1",
"pytest~=8.2.0",
"pandas~=2.2.2",
"numpy~=1.26.4",
"pyarrow~=16.0.0",
"polars~=0.20.23",
"openpyxl~=3.1.2"
]
description = "infinity"
readme = "README.md"
Expand Down
18 changes: 9 additions & 9 deletions python/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
sqlglot==11.7.1
pydantic~=1.10.12
sqlglot~=11.7.1
pydantic~=2.7.1
thrift~=0.20.0
setuptools~=68.0.0
pytest~=7.4.0
pandas~=2.1.1
openpyxl
numpy~=1.26.0
polars~=0.19.0
pyarrow
setuptools~=69.5.1
pytest~=8.2.0
pandas~=2.2.2
numpy~=1.26.4
pyarrow~=16.0.0
polars~=0.20.23
openpyxl~=3.1.2
Loading