From 67412341b5c7aff03111e3c8e1b587c92e8b026e Mon Sep 17 00:00:00 2001 From: yhmo Date: Fri, 7 Jun 2024 12:31:01 +0800 Subject: [PATCH] Cache collection schema for insert/upsert Signed-off-by: yhmo --- .../client/AbstractMilvusGrpcClient.java | 116 +++++++++++------- .../java/io/milvus/client/MilvusClient.java | 6 +- .../service/collection/CollectionService.java | 5 +- .../v2/service/vector/VectorService.java | 79 ++++++++++-- .../milvus/client/MilvusClientDockerTest.java | 69 +++++++++++ .../v2/client/MilvusClientV2DockerTest.java | 52 +++++++- 6 files changed, 267 insertions(+), 60 deletions(-) diff --git a/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java b/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java index cdb07a592..66614c7bd 100644 --- a/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java +++ b/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java @@ -54,6 +54,7 @@ import javax.annotation.Nonnull; import java.nio.charset.StandardCharsets; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -62,12 +63,64 @@ public abstract class AbstractMilvusGrpcClient implements MilvusClient { protected static final Logger logger = LoggerFactory.getLogger(AbstractMilvusGrpcClient.class); protected LogLevel logLevel = LogLevel.Info; + private ConcurrentHashMap cacheCollectionInfo = new ConcurrentHashMap<>(); + protected abstract MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub(); protected abstract MilvusServiceGrpc.MilvusServiceFutureStub futureStub(); protected abstract boolean clientIsReady(); + /** + * This method is for insert/upsert requests to reduce the rpc call of describeCollection() + * Always try to get the collection info from cache. + * If the cache doesn't have the collection info, call describeCollection() and cache it. + * If insert/upsert get server error, remove the cached collection info. + */ + private DescribeCollectionResponse getCollectionInfo(String databaseName, String collectionName) { + String key = combineCacheKey(databaseName, collectionName); + DescribeCollectionResponse info = cacheCollectionInfo.get(key); + if (info == null) { + String msg = String.format("Fail to describe collection '%s'", collectionName); + DescribeCollectionRequest.Builder builder = DescribeCollectionRequest.newBuilder() + .setCollectionName(collectionName); + if (StringUtils.isNotEmpty(databaseName)) { + builder.setDbName(databaseName); + msg = String.format("Fail to describe collection '%s' in database '%s'", + collectionName, databaseName); + } + DescribeCollectionRequest describeCollectionRequest = builder.build(); + DescribeCollectionResponse response = blockingStub().describeCollection(describeCollectionRequest); + handleResponse(msg, response.getStatus()); + info = response; + cacheCollectionInfo.put(key, info); + } + + return info; + } + + private String combineCacheKey(String databaseName, String collectionName) { + if (collectionName == null || StringUtils.isBlank(collectionName)) { + throw new ParamException("Collection name is empty, not able to get collection info."); + } + String key = collectionName; + if (StringUtils.isNotEmpty(databaseName)) { + key = String.format("%s|%s", databaseName, collectionName); + } + return key; + } + + /** + * insert/upsert return an error, but is not a RateLimit error, + * clean the cache so that the next insert will call describeCollection() to get the latest info. + */ + private void cleanCacheIfFailed(Status status, String databaseName, String collectionName) { + if ((status.getCode() != 0 && status.getCode() != 8) || + (!status.getErrorCode().equals(ErrorCode.Success) && status.getErrorCode() != ErrorCode.RateLimit)) { + cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName)); + } + } + private void waitForLoadingCollection(String databaseName, String collectionName, List partitionNames, long waitingInterval, long timeout) throws IllegalResponseException { long tsBegin = System.currentTimeMillis(); @@ -581,6 +634,7 @@ public R dropCollection(@NonNull DropCollectionParam requestParam) { Status response = blockingStub().dropCollection(dropCollectionRequest); handleResponse(title, response); + cacheCollectionInfo.remove(combineCacheKey(requestParam.getDatabaseName(), requestParam.getCollectionName())); return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG)); } catch (StatusRuntimeException e) { logError("{} RPC failed! Exception:{}", title, e); @@ -1509,17 +1563,12 @@ public R insert(@NonNull InsertParam requestParam) { String title = String.format("InsertRequest collectionName:%s", requestParam.getCollectionName()); try { - DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder() - .withDatabaseName(requestParam.getDatabaseName()) - .withCollectionName(requestParam.getCollectionName()); - R descResp = describeCollection(builder.build()); - if (descResp.getStatus() != R.Status.Success.getCode()) { - return R.failed(descResp.getException()); - } - - DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData()); + DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(), + requestParam.getCollectionName()); + DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp); ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper); MutationResult response = blockingStub().insert(builderWraper.buildInsertRequest()); + cleanCacheIfFailed(response.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName()); handleResponse(title, response.getStatus()); return R.success(response); } catch (StatusRuntimeException e) { @@ -1542,15 +1591,9 @@ public ListenableFuture> insertAsync(InsertParam requestParam) logDebug(requestParam.toString()); String title = String.format("InsertAsyncRequest collectionName:%s", requestParam.getCollectionName()); - DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder() - .withDatabaseName(requestParam.getDatabaseName()) - .withCollectionName(requestParam.getCollectionName()); - R descResp = describeCollection(builder.build()); - if (descResp.getStatus() != R.Status.Success.getCode()) { - return Futures.immediateFuture(R.failed(descResp.getException())); - } - - DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData()); + DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(), + requestParam.getCollectionName()); + DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp); ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper); ListenableFuture response = futureStub().insert(builderWraper.buildInsertRequest()); @@ -1559,6 +1602,7 @@ public ListenableFuture> insertAsync(InsertParam requestParam) new FutureCallback() { @Override public void onSuccess(MutationResult result) { + cleanCacheIfFailed(result.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName()); if (result.getStatus().getErrorCode() == ErrorCode.Success) { logDebug("{} successfully!", title); } else { @@ -1596,17 +1640,12 @@ public R upsert(UpsertParam requestParam) { String title = String.format("UpsertRequest collectionName:%s", requestParam.getCollectionName()); try { - DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder() - .withDatabaseName(requestParam.getDatabaseName()) - .withCollectionName(requestParam.getCollectionName()); - R descResp = describeCollection(builder.build()); - if (descResp.getStatus() != R.Status.Success.getCode()) { - return R.failed(descResp.getException()); - } - - DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData()); + DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(), + requestParam.getCollectionName()); + DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp); ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper); MutationResult response = blockingStub().upsert(builderWraper.buildUpsertRequest()); + cleanCacheIfFailed(response.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName()); handleResponse(title, response.getStatus()); return R.success(response); } catch (StatusRuntimeException e) { @@ -1628,15 +1667,9 @@ public ListenableFuture> upsertAsync(UpsertParam requestParam) logDebug(requestParam.toString()); String title = String.format("UpsertAsyncRequest collectionName:%s", requestParam.getCollectionName()); - DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder() - .withDatabaseName(requestParam.getDatabaseName()) - .withCollectionName(requestParam.getCollectionName()); - R descResp = describeCollection(builder.build()); - if (descResp.getStatus() != R.Status.Success.getCode()) { - return Futures.immediateFuture(R.failed(descResp.getException())); - } - - DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData()); + DescribeCollectionResponse descResp = getCollectionInfo(requestParam.getDatabaseName(), + requestParam.getCollectionName()); + DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp); ParamUtils.InsertBuilderWrapper builderWraper = new ParamUtils.InsertBuilderWrapper(requestParam, wrapper); ListenableFuture response = futureStub().upsert(builderWraper.buildUpsertRequest()); @@ -1645,6 +1678,7 @@ public ListenableFuture> upsertAsync(UpsertParam requestParam) new FutureCallback() { @Override public void onSuccess(MutationResult result) { + cleanCacheIfFailed(result.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName()); if (result.getStatus().getErrorCode() == ErrorCode.Success) { logDebug("{} successfully!", title); } else { @@ -3088,14 +3122,8 @@ public R delete(DeleteIdsParam requestParam) { String title = String.format("DeleteIdsRequest collectionName:%s", requestParam.getCollectionName()); try { - DescribeCollectionParam.Builder builder = DescribeCollectionParam.newBuilder() - .withCollectionName(requestParam.getCollectionName()); - R descResp = describeCollection(builder.build()); - if (descResp.getStatus() != R.Status.Success.getCode()) { - logError("Failed to describe collection: {}", requestParam.getCollectionName()); - return R.failed(descResp.getException()); - } - DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp.getData()); + DescribeCollectionResponse descResp = getCollectionInfo("", requestParam.getCollectionName()); + DescCollResponseWrapper wrapper = new DescCollResponseWrapper(descResp); String expr = VectorUtils.convertPksExpr(requestParam.getPrimaryIds(), wrapper); DeleteParam deleteParam = DeleteParam.newBuilder() diff --git a/src/main/java/io/milvus/client/MilvusClient.java b/src/main/java/io/milvus/client/MilvusClient.java index ff9209efa..9b0e98ae9 100644 --- a/src/main/java/io/milvus/client/MilvusClient.java +++ b/src/main/java/io/milvus/client/MilvusClient.java @@ -138,14 +138,16 @@ default void close() { R listDatabases(); /** - * Alter database with key value pair + * Alter database with key value pair. (Available from Milvus v2.4.4) + * * @param requestParam {@link AlterDatabaseParam} * @return {status:result code, data:RpcStatus{msg: result message}} */ R alterDatabase(AlterDatabaseParam requestParam); /** - * show detail of database base, such as replica number and resource groups + * Show detail of database base, such as replica number and resource groups. (Available from Milvus v2.4.4) + * * @param requestParam {@link DescribeDatabaseParam} * @return {status:result code, data:DescribeDatabaseResponse{replica_number,resource_groups}} */ diff --git a/src/main/java/io/milvus/v2/service/collection/CollectionService.java b/src/main/java/io/milvus/v2/service/collection/CollectionService.java index 7ff0eca53..c769305c5 100644 --- a/src/main/java/io/milvus/v2/service/collection/CollectionService.java +++ b/src/main/java/io/milvus/v2/service/collection/CollectionService.java @@ -183,6 +183,10 @@ public DescribeCollectionResp describeCollection(MilvusServiceGrpc.MilvusService .build(); DescribeCollectionResponse response = milvusServiceBlockingStub.describeCollection(describeCollectionRequest); rpcUtils.handleResponse(title, response.getStatus()); + return convertDescCollectionResp(response); + } + + public static DescribeCollectionResp convertDescCollectionResp(DescribeCollectionResponse response) { DescribeCollectionResp describeCollectionResp = DescribeCollectionResp.builder() .collectionName(response.getCollectionName()) .description(response.getSchema().getDescription()) @@ -195,7 +199,6 @@ public DescribeCollectionResp describeCollection(MilvusServiceGrpc.MilvusService .primaryFieldName(response.getSchema().getFieldsList().stream().filter(FieldSchema::getIsPrimaryKey).map(FieldSchema::getName).collect(java.util.stream.Collectors.toList()).get(0)) .createTime(response.getCreatedTimestamp()) .build(); - return describeCollectionResp; } diff --git a/src/main/java/io/milvus/v2/service/vector/VectorService.java b/src/main/java/io/milvus/v2/service/vector/VectorService.java index 2179692b0..9aa4f135a 100644 --- a/src/main/java/io/milvus/v2/service/vector/VectorService.java +++ b/src/main/java/io/milvus/v2/service/vector/VectorService.java @@ -19,6 +19,7 @@ package io.milvus.v2.service.vector; +import io.milvus.exception.ParamException; import io.milvus.grpc.*; import io.milvus.response.DescCollResponseWrapper; import io.milvus.v2.exception.ErrorCode; @@ -30,38 +31,91 @@ import io.milvus.v2.service.index.IndexService; import io.milvus.v2.service.vector.request.*; import io.milvus.v2.service.vector.response.*; +import io.milvus.v2.utils.RpcUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentHashMap; public class VectorService extends BaseService { Logger logger = LoggerFactory.getLogger(VectorService.class); public CollectionService collectionService = new CollectionService(); public IndexService indexService = new IndexService(); + private ConcurrentHashMap cacheCollectionInfo = new ConcurrentHashMap<>(); + + /** + * This method is for insert/upsert requests to reduce the rpc call of describeCollection() + * Always try to get the collection info from cache. + * If the cache doesn't have the collection info, call describeCollection() and cache it. + * If insert/upsert get server error, remove the cached collection info. + */ + private DescribeCollectionResponse getCollectionInfo(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, + String databaseName, String collectionName) { + String key = combineCacheKey(databaseName, collectionName); + DescribeCollectionResponse info = cacheCollectionInfo.get(key); + if (info == null) { + String msg = String.format("Fail to describe collection '%s'", collectionName); + DescribeCollectionRequest.Builder builder = DescribeCollectionRequest.newBuilder() + .setCollectionName(collectionName); + if (StringUtils.isNotEmpty(databaseName)) { + builder.setDbName(databaseName); + msg = String.format("Fail to describe collection '%s' in database '%s'", + collectionName, databaseName); + } + DescribeCollectionRequest describeCollectionRequest = builder.build(); + DescribeCollectionResponse response = blockingStub.describeCollection(describeCollectionRequest); + new RpcUtils().handleResponse(msg, response.getStatus()); + info = response; + cacheCollectionInfo.put(key, info); + } + + return info; + } + + private String combineCacheKey(String databaseName, String collectionName) { + if (collectionName == null || StringUtils.isBlank(collectionName)) { + throw new ParamException("Collection name is empty, not able to get collection info."); + } + String key = collectionName; + if (StringUtils.isNotEmpty(databaseName)) { + key = String.format("%s|%s", databaseName, collectionName); + } + return key; + } + + /** + * insert/upsert return an error, but is not a RateLimit error, + * clean the cache so that the next insert will call describeCollection() to get the latest info. + */ + private void cleanCacheIfFailed(Status status, String databaseName, String collectionName) { + if ((status.getCode() != 0 && status.getCode() != 8) || + (!status.getErrorCode().equals(io.milvus.grpc.ErrorCode.Success) && + status.getErrorCode() != io.milvus.grpc.ErrorCode.RateLimit)) { + cacheCollectionInfo.remove(combineCacheKey(databaseName, collectionName)); + } + } public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, InsertReq request) { String title = String.format("InsertRequest collectionName:%s", request.getCollectionName()); - DescribeCollectionRequest describeCollectionRequest = DescribeCollectionRequest.newBuilder() - .setCollectionName(request.getCollectionName()).build(); - DescribeCollectionResponse descResp = blockingStub.describeCollection(describeCollectionRequest); - + // TODO: set the database name + DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName()); MutationResult response = blockingStub.insert(dataUtils.convertGrpcInsertRequest(request, new DescCollResponseWrapper(descResp))); + cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName()); rpcUtils.handleResponse(title, response.getStatus()); return InsertResp.builder() .InsertCnt(response.getInsertCnt()) .build(); } - public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub milvusServiceBlockingStub, UpsertReq request) { + public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, UpsertReq request) { String title = String.format("UpsertRequest collectionName:%s", request.getCollectionName()); - DescribeCollectionRequest describeCollectionRequest = DescribeCollectionRequest.newBuilder() - .setCollectionName(request.getCollectionName()).build(); - DescribeCollectionResponse descResp = milvusServiceBlockingStub.describeCollection(describeCollectionRequest); - - MutationResult response = milvusServiceBlockingStub.upsert(dataUtils.convertGrpcUpsertRequest(request, new DescCollResponseWrapper(descResp))); + // TODO: set the database name + DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName()); + MutationResult response = blockingStub.upsert(dataUtils.convertGrpcUpsertRequest(request, new DescCollResponseWrapper(descResp))); + cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName()); rpcUtils.handleResponse(title, response.getStatus()); return UpsertResp.builder() .upsertCnt(response.getInsertCnt()) @@ -129,7 +183,8 @@ public DeleteResp delete(MilvusServiceGrpc.MilvusServiceBlockingStub milvusServi throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "filter and ids can't be set at the same time"); } - DescribeCollectionResp respR = collectionService.describeCollection(milvusServiceBlockingStub, DescribeCollectionReq.builder().collectionName(request.getCollectionName()).build()); + DescribeCollectionResponse descResp = getCollectionInfo(milvusServiceBlockingStub, "", request.getCollectionName()); + DescribeCollectionResp respR = CollectionService.convertDescCollectionResp(descResp); if (request.getFilter() == null) { request.setFilter(vectorUtils.getExprById(respR.getPrimaryFieldName(), request.getIds())); } diff --git a/src/test/java/io/milvus/client/MilvusClientDockerTest.java b/src/test/java/io/milvus/client/MilvusClientDockerTest.java index c61b81333..86b8a934e 100644 --- a/src/test/java/io/milvus/client/MilvusClientDockerTest.java +++ b/src/test/java/io/milvus/client/MilvusClientDockerTest.java @@ -2852,4 +2852,73 @@ void testDatabase() { R dropResponse = client.dropDatabase(dropDatabaseParam); Assertions.assertEquals(R.Status.Success.getCode(), dropResponse.getStatus().intValue()); } + + @Test + void testCacheCollectionSchema() { + String randomCollectionName = generator.generate(10); + + // collection schema + List fieldsSchema = new ArrayList<>(); + fieldsSchema.add(FieldType.newBuilder() + .withPrimaryKey(true) + .withAutoID(true) + .withDataType(DataType.Int64) + .withName("id") + .build()); + + fieldsSchema.add(FieldType.newBuilder() + .withDataType(DataType.FloatVector) + .withName("vector") + .withDimension(dimension) + .build()); + + // create collection + R createR = client.createCollection(CreateCollectionParam.newBuilder() + .withCollectionName(randomCollectionName) + .withFieldTypes(fieldsSchema) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue()); + + // insert + JsonObject row = new JsonObject(); + row.add("vector", GSON_INSTANCE.toJsonTree(generateFloatVectors(1).get(0))); + R insertR = client.insert(InsertParam.newBuilder() + .withCollectionName(randomCollectionName) + .withRows(Collections.singletonList(row)) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); + + // drop collection + client.dropCollection(DropCollectionParam.newBuilder() + .withCollectionName(randomCollectionName) + .build()); + + // create a new collection with the same name, different schema + fieldsSchema.add(FieldType.newBuilder() + .withDataType(DataType.VarChar) + .withName("title") + .withMaxLength(100) + .build()); + + createR = client.createCollection(CreateCollectionParam.newBuilder() + .withCollectionName(randomCollectionName) + .withFieldTypes(fieldsSchema) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), createR.getStatus().intValue()); + + // insert wrong data + insertR = client.insert(InsertParam.newBuilder() + .withCollectionName(randomCollectionName) + .withRows(Collections.singletonList(row)) + .build()); + Assertions.assertNotEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); + + // insert correct data + row.addProperty("title", "hello world"); + insertR = client.insert(InsertParam.newBuilder() + .withCollectionName(randomCollectionName) + .withRows(Collections.singletonList(row)) + .build()); + Assertions.assertEquals(R.Status.Success.getCode(), insertR.getStatus().intValue()); + } } diff --git a/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java b/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java index db581614c..3f29a77a2 100644 --- a/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java +++ b/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java @@ -64,7 +64,7 @@ class MilvusClientV2DockerTest { private static final Random RANDOM = new Random(); @Container - private static final MilvusContainer milvus = new MilvusContainer("milvusdb/milvus:v2.4.1"); + private static final MilvusContainer milvus = new MilvusContainer("milvusdb/milvus:2.4-20240605-443197bd-amd64"); @BeforeAll public static void setUp() { @@ -1059,4 +1059,54 @@ void testIndex() { Assertions.assertTrue(extraParams.containsKey("nlist")); Assertions.assertEquals("64", extraParams.get("nlist")); } + + @Test + void testCacheCollectionSchema() { + String randomCollectionName = generator.generate(10); + + client.createCollection(CreateCollectionReq.builder() + .collectionName(randomCollectionName) + .autoID(true) + .dimension(dimension) + .build()); + + // insert + JsonObject row = new JsonObject(); + row.add("vector", GSON_INSTANCE.toJsonTree(generateFloatVectors(1).get(0))); + InsertResp insertResp = client.insert(InsertReq.builder() + .collectionName(randomCollectionName) + .data(Collections.singletonList(row)) + .build()); + Assertions.assertEquals(1L, insertResp.getInsertCnt()); + + // drop collection + client.dropCollection(DropCollectionReq.builder() + .collectionName(randomCollectionName) + .build()); + + // create a new collection with the same name, different schema + client.createCollection(CreateCollectionReq.builder() + .collectionName(randomCollectionName) + .autoID(true) + .dimension(100) + .build()); + + // insert wrong data + Assertions.assertThrows(MilvusClientException.class, ()->client.insert(InsertReq.builder() + .collectionName(randomCollectionName) + .data(Collections.singletonList(row)) + .build())); + + // insert correct data + List vector = new ArrayList<>(); + for (int i = 0; i < 100; ++i) { + vector.add(RANDOM.nextFloat()); + } + row.add("vector", GSON_INSTANCE.toJsonTree(vector)); + insertResp = client.insert(InsertReq.builder() + .collectionName(randomCollectionName) + .data(Collections.singletonList(row)) + .build()); + Assertions.assertEquals(1L, insertResp.getInsertCnt()); + } }