From 11ab41a7fb2ca626ac6ac547c1d26b2d3fbcfd07 Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 25 Jul 2024 14:17:20 -0400 Subject: [PATCH 1/5] working with no filter provided --- .../vector/couchbase/CouchbaseDataSource.java | 92 ++++++++++++++----- .../datasource/impl/CouchbaseWriterTest.java | 6 +- .../src/main/python/pyproject.toml | 1 + 3 files changed, 74 insertions(+), 25 deletions(-) diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseDataSource.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseDataSource.java index 8a00d994b..df7f61429 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseDataSource.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseDataSource.java @@ -111,15 +111,30 @@ public List> fetchData(String query, List params) { Map filter = (Map) queryMap.get("filter"); String filterField = filter.keySet().iterator().next(); String filterValue = (String) filter.get(filterField); - - // Perform the vector search on the filtered documents - SearchRequest vectorSearchRequest = - SearchRequest.create(SearchQuery.match(filterValue).field(filterField)) - .vectorSearch( - VectorSearch.create( - VectorQuery.create("vector", vector) - .numCandidates(topK))); - + SearchRequest vectorSearchRequest; + // if the values in the filter are empty then remove them from the map + for (Map.Entry entry : filter.entrySet()) { + if (entry.getValue() == null || entry.getValue().toString().isEmpty()) { + filter.remove(entry.getKey()); + } + } + // print the filter map + log.info("Filter: {}", filter); + // if filter is empty, then search all documents in the collection + if (!queryMap.containsKey("filter")) { + vectorSearchRequest = + SearchRequest.create( + VectorSearch.create( + VectorQuery.create("vector", vector) + .numCandidates(topK))); + } else { + vectorSearchRequest = + SearchRequest.create(SearchQuery.match(filterValue).field(filterField)) + .vectorSearch( + VectorSearch.create( + VectorQuery.create("vector", vector) + .numCandidates(topK))); + } SearchResult vectorSearchResult = cluster.search( bucketName + "." + scopeName + "." + vectorIndexName, @@ -161,10 +176,53 @@ public List> fetchData(String query, List params) { // remove the embeddings array from the // output content.removeKey("vector"); - // ensure filter field is = to the query - // filter value - if (content.getString(filterField) - .equals(filterValue)) { + // Ensure all filter fields match their + // corresponding filter values + if (filter != null && !filter.isEmpty()) { + // Ensure all filter fields match their + // corresponding filter values + boolean filtersMatch = true; + + for (Map.Entry entry : + filter.entrySet()) { + String field = entry.getKey(); + String value = + (String) entry.getValue(); + // Ensure the filter field exists in + // the document and isn't "" + + if (content.containsKey(field) + && !content.getString(field) + .isEmpty() + && !content.getString(field) + .equals(value)) { + filtersMatch = false; + log.info( + "Document {} has {} {} instead of {}", + documentId, + field, + content.getString( + field), + value); + break; + } + } + + if (filtersMatch) { + result.put("id", hit.id()); + // Calculate and add cosine + // similarity + double cosineSimilarity = + computeCosineSimilarity( + vector, embeddings); + result.put( + "similarity", + cosineSimilarity); + result.putAll(content.toMap()); + } + } else { + // If there are no filters, process the + // result directly result.put("id", hit.id()); // Calculate and add cosine similarity double cosineSimilarity = @@ -174,14 +232,6 @@ public List> fetchData(String query, List params) { "similarity", cosineSimilarity); result.putAll(content.toMap()); } - - } else { - log.info( - "Document {} has {} {} instead of {}", - documentId, - filterField, - content.getString(filterField), - filterValue); } } } catch (DocumentNotFoundException e) { diff --git a/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java index 06d167bc9..3276d0189 100644 --- a/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java +++ b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java @@ -46,7 +46,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.couchbase.BucketDefinition; @@ -57,7 +56,7 @@ @Slf4j @Testcontainers -@Disabled +// @Disabled class CouchbaseWriterTest { BucketDefinition bucketDefinition = new BucketDefinition("bucket-name"); @@ -304,12 +303,11 @@ void testCouchbaseWrite() throws Exception { "vector": ?, "topK": 5, "bucket-name": "testbucket", - "vecPlanId": "12345", "scope-name": "_default", "collection-name": "_default", "index-name": "semantic", "filter": - {"vecPlanId": "12345"} + {"vecPlanId": ""} } """; List params = List.of(vector); diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/pyproject.toml b/langstream-runtime/langstream-runtime-impl/src/main/python/pyproject.toml index 756fda519..8e7a1b54c 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/pyproject.toml +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/pyproject.toml @@ -77,6 +77,7 @@ elasticsearch = "^8.14.0" pinecone-client = {extras = ["grpc"], version = "^4.1.1"} "discord.py" = "^2.3.2" transformers = "^4.42.1" +couchbase = "^4.3.0" # workaround for https://github.com/python-poetry/poetry-plugin-export/issues/183 urllib3 = "<2" From 098163e40729831b0076877cf75990851245298e Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 25 Jul 2024 14:44:32 -0400 Subject: [PATCH 2/5] removes null filters --- .../vector/couchbase/CouchbaseDataSource.java | 21 ++++++++++++------- .../datasource/impl/CouchbaseWriterTest.java | 4 ++-- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseDataSource.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseDataSource.java index df7f61429..44baecad1 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseDataSource.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseDataSource.java @@ -109,25 +109,28 @@ public List> fetchData(String query, List params) { String collectionName = (String) queryMap.remove("collection-name"); String vectorIndexName = (String) queryMap.remove("index-name"); Map filter = (Map) queryMap.get("filter"); - String filterField = filter.keySet().iterator().next(); - String filterValue = (String) filter.get(filterField); SearchRequest vectorSearchRequest; // if the values in the filter are empty then remove them from the map - for (Map.Entry entry : filter.entrySet()) { - if (entry.getValue() == null || entry.getValue().toString().isEmpty()) { - filter.remove(entry.getKey()); - } + if (filter != null) { + filter.entrySet() + .removeIf( + entry -> + entry.getValue() == null + || entry.getValue().toString().isEmpty()); } // print the filter map log.info("Filter: {}", filter); - // if filter is empty, then search all documents in the collection - if (!queryMap.containsKey("filter")) { + // if filter is empty or null, then do a vector search + if (filter == null || filter.isEmpty()) { vectorSearchRequest = SearchRequest.create( VectorSearch.create( VectorQuery.create("vector", vector) .numCandidates(topK))); } else { + // if filter is present do a hybrid search + String filterField = filter.keySet().iterator().next(); + String filterValue = (String) filter.get(filterField); vectorSearchRequest = SearchRequest.create(SearchQuery.match(filterValue).field(filterField)) .vectorSearch( @@ -181,6 +184,7 @@ public List> fetchData(String query, List params) { if (filter != null && !filter.isEmpty()) { // Ensure all filter fields match their // corresponding filter values + boolean filtersMatch = true; for (Map.Entry entry : @@ -210,6 +214,7 @@ public List> fetchData(String query, List params) { if (filtersMatch) { result.put("id", hit.id()); + // Calculate and add cosine // similarity double cosineSimilarity = diff --git a/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java index 3276d0189..f1806e66e 100644 --- a/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java +++ b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java @@ -307,8 +307,8 @@ void testCouchbaseWrite() throws Exception { "collection-name": "_default", "index-name": "semantic", "filter": - {"vecPlanId": ""} - } + {"vecPlanId": "12345"} + } """; List params = List.of(vector); List> results = implementation.fetchData(query, params); From 992080a928ebee9aa76f9063669ddd60632a1e97 Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 25 Jul 2024 16:13:02 -0400 Subject: [PATCH 3/5] multiple filters working --- .../vector/couchbase/CouchbaseDataSource.java | 51 +++++++++++++------ .../datasource/impl/CouchbaseWriterTest.java | 2 +- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseDataSource.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseDataSource.java index 44baecad1..dd614169c 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseDataSource.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseDataSource.java @@ -32,6 +32,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -111,6 +112,7 @@ public List> fetchData(String query, List params) { Map filter = (Map) queryMap.get("filter"); SearchRequest vectorSearchRequest; // if the values in the filter are empty then remove them from the map + log.info("Filter: {}", filter); if (filter != null) { filter.entrySet() .removeIf( @@ -118,25 +120,39 @@ public List> fetchData(String query, List params) { entry.getValue() == null || entry.getValue().toString().isEmpty()); } - // print the filter map - log.info("Filter: {}", filter); - // if filter is empty or null, then do a vector search - if (filter == null || filter.isEmpty()) { - vectorSearchRequest = - SearchRequest.create( - VectorSearch.create( - VectorQuery.create("vector", vector) - .numCandidates(topK))); - } else { - // if filter is present do a hybrid search - String filterField = filter.keySet().iterator().next(); - String filterValue = (String) filter.get(filterField); + // if (filter == null || filter.isEmpty()) { + // queryMap.remove("filter"); + // } + log.info("filter after removing empty values: {}", filter); + if (queryMap.containsKey("filter") && filter != null && !filter.isEmpty()) { + List filterQueries = new ArrayList<>(); + + for (Map.Entry entry : filter.entrySet()) { + String filterField = entry.getKey(); + String filterValue = entry.getValue().toString(); + filterQueries.add(SearchQuery.match(filterValue).field(filterField)); + } + + // Combine all filter queries into a conjunctive query + SearchQuery searchQuery = + SearchQuery.conjuncts(filterQueries.toArray(new SearchQuery[0])); + // print search query + log.info("Search query: {}", searchQuery); + + // Perform the vector search on the filtered documents vectorSearchRequest = - SearchRequest.create(SearchQuery.match(filterValue).field(filterField)) + SearchRequest.create(searchQuery) .vectorSearch( VectorSearch.create( VectorQuery.create("vector", vector) .numCandidates(topK))); + } else { + // Perform the vector search without any filter + vectorSearchRequest = + SearchRequest.create( + VectorSearch.create( + VectorQuery.create("vector", vector) + .numCandidates(topK))); } SearchResult vectorSearchResult = cluster.search( @@ -186,15 +202,18 @@ public List> fetchData(String query, List params) { // corresponding filter values boolean filtersMatch = true; - for (Map.Entry entry : filter.entrySet()) { String field = entry.getKey(); String value = (String) entry.getValue(); + log.info("Filter field: {}", field); + log.info("Filter value: {}", value); + log.info( + "(filter) content value {}", + content.getString(field)); // Ensure the filter field exists in // the document and isn't "" - if (content.containsKey(field) && !content.getString(field) .isEmpty() diff --git a/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java index f1806e66e..afad0a472 100644 --- a/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java +++ b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java @@ -307,7 +307,7 @@ void testCouchbaseWrite() throws Exception { "collection-name": "_default", "index-name": "semantic", "filter": - {"vecPlanId": "12345"} + { "vecPlanId": "12345"} } """; List params = List.of(vector); From 92b5a184d52bebf8b8dcd13824ad7f030f290af2 Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 25 Jul 2024 16:14:54 -0400 Subject: [PATCH 4/5] added back disabled flag --- .../agents/vector/datasource/impl/CouchbaseWriterTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java index afad0a472..7c41fd077 100644 --- a/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java +++ b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/CouchbaseWriterTest.java @@ -46,6 +46,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.couchbase.BucketDefinition; @@ -56,7 +57,7 @@ @Slf4j @Testcontainers -// @Disabled +@Disabled class CouchbaseWriterTest { BucketDefinition bucketDefinition = new BucketDefinition("bucket-name"); From 8267ba83a4fb78f9fe2962d35b36adfe5da2e90a Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 25 Jul 2024 16:37:23 -0400 Subject: [PATCH 5/5] will no longer upsert null values --- .../ai/langstream/agents/vector/couchbase/CouchbaseWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseWriter.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseWriter.java index 3719f9654..11347e555 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseWriter.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/couchbase/CouchbaseWriter.java @@ -150,7 +150,7 @@ public CompletableFuture upsert(Record record, Map context metadataFunctions.forEach( (key, evaluator) -> { Object value = evaluator.evaluate(mutableRecord); - content.put(key, value); + if (value != null) content.put(key, value); }); // Perform the upsert