From a0e412d9b1226131110278b8cad4ad0cd3621a07 Mon Sep 17 00:00:00 2001 From: Bragolgirith <6455473+Bragolgirith@users.noreply.github.com> Date: Sun, 10 Mar 2024 11:09:25 +0100 Subject: [PATCH] Fix RedisVectorStore not closing Jedis pipelines --- .../ai/vectorstore/RedisVectorStore.java | 60 ++++++++++--------- 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/vector-stores/spring-ai-redis/src/main/java/org/springframework/ai/vectorstore/RedisVectorStore.java b/vector-stores/spring-ai-redis/src/main/java/org/springframework/ai/vectorstore/RedisVectorStore.java index b0c18f9a1c1..ce267ebeb69 100644 --- a/vector-stores/spring-ai-redis/src/main/java/org/springframework/ai/vectorstore/RedisVectorStore.java +++ b/vector-stores/spring-ai-redis/src/main/java/org/springframework/ai/vectorstore/RedisVectorStore.java @@ -303,25 +303,26 @@ public JedisPooled getJedis() { @Override public void add(List documents) { - Pipeline pipeline = this.jedis.pipelined(); - for (Document document : documents) { - var embedding = this.embeddingClient.embed(document); - document.setEmbedding(embedding); - - var fields = new HashMap(); - fields.put(this.config.embeddingFieldName, embedding); - fields.put(this.config.contentFieldName, document.getContent()); - fields.putAll(document.getMetadata()); - pipeline.jsonSetWithEscape(key(document.getId()), JSON_SET_PATH, fields); - } - List responses = pipeline.syncAndReturnAll(); - Optional errResponse = responses.stream().filter(Predicate.not(RESPONSE_OK)).findAny(); - if (errResponse.isPresent()) { - String message = MessageFormat.format("Could not add document: {0}", errResponse.get()); - if (logger.isErrorEnabled()) { - logger.error(message); + try (Pipeline pipeline = this.jedis.pipelined()) { + for (Document document : documents) { + var embedding = this.embeddingClient.embed(document); + document.setEmbedding(embedding); + + var fields = new HashMap(); + fields.put(this.config.embeddingFieldName, embedding); + fields.put(this.config.contentFieldName, document.getContent()); + fields.putAll(document.getMetadata()); + pipeline.jsonSetWithEscape(key(document.getId()), JSON_SET_PATH, fields); + } + List responses = pipeline.syncAndReturnAll(); + Optional errResponse = responses.stream().filter(Predicate.not(RESPONSE_OK)).findAny(); + if (errResponse.isPresent()) { + String message = MessageFormat.format("Could not add document: {0}", errResponse.get()); + if (logger.isErrorEnabled()) { + logger.error(message); + } + throw new RuntimeException(message); } - throw new RuntimeException(message); } } @@ -331,19 +332,20 @@ private String key(String id) { @Override public Optional delete(List idList) { - Pipeline pipeline = this.jedis.pipelined(); - for (String id : idList) { - pipeline.jsonDel(key(id)); - } - List responses = pipeline.syncAndReturnAll(); - Optional errResponse = responses.stream().filter(Predicate.not(RESPONSE_DEL_OK)).findAny(); - if (errResponse.isPresent()) { - if (logger.isErrorEnabled()) { - logger.error("Could not delete document: {}", errResponse.get()); + try (Pipeline pipeline = this.jedis.pipelined()) { + for (String id : idList) { + pipeline.jsonDel(key(id)); + } + List responses = pipeline.syncAndReturnAll(); + Optional errResponse = responses.stream().filter(Predicate.not(RESPONSE_DEL_OK)).findAny(); + if (errResponse.isPresent()) { + if (logger.isErrorEnabled()) { + logger.error("Could not delete document: {}", errResponse.get()); + } + return Optional.of(false); } - return Optional.of(false); + return Optional.of(true); } - return Optional.of(true); } @Override