diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/jdbc/JdbcWriter.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/jdbc/JdbcWriter.java index 0756b17f1..b7c6c2dad 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/jdbc/JdbcWriter.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/jdbc/JdbcWriter.java @@ -24,8 +24,6 @@ import ai.langstream.api.database.VectorDatabaseWriterProvider; import ai.langstream.api.runner.code.Record; import ai.langstream.api.util.ConfigurationUtils; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; import java.sql.Connection; import java.sql.PreparedStatement; import java.util.ArrayList; @@ -38,9 +36,6 @@ @Slf4j public class JdbcWriter implements VectorDatabaseWriterProvider { - private static final ObjectMapper MAPPER = - new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - @Override public boolean supports(Map dataSourceConfig) { return "jdbc".equals(dataSourceConfig.get("service")); diff --git a/langstream-api/src/main/java/ai/langstream/api/doc/AgentConfigurationModel.java b/langstream-api/src/main/java/ai/langstream/api/doc/AgentConfigurationModel.java index 56471f7df..4631a1b02 100644 --- a/langstream-api/src/main/java/ai/langstream/api/doc/AgentConfigurationModel.java +++ b/langstream-api/src/main/java/ai/langstream/api/doc/AgentConfigurationModel.java @@ -25,6 +25,7 @@ @AllArgsConstructor public class AgentConfigurationModel { + private String type; private String name; private String description; private Map properties; diff --git a/langstream-core/src/main/java/ai/langstream/impl/uti/ClassConfigValidator.java b/langstream-core/src/main/java/ai/langstream/impl/uti/ClassConfigValidator.java index c69a61089..b6daaeb1f 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/uti/ClassConfigValidator.java +++ b/langstream-core/src/main/java/ai/langstream/impl/uti/ClassConfigValidator.java @@ -150,15 +150,8 @@ public static void validateAgentModelFromClass( Class modelClazz, Map asMap, boolean allowUnknownProperties) { - final EntityRef ref = - () -> - "agent configuration (agent: '%s', type: '%s')" - .formatted( - agentConfiguration.getName() == null - ? agentConfiguration.getId() - : agentConfiguration.getName(), - agentConfiguration.getType()); - validateModelFromClass(ref, modelClazz, asMap, allowUnknownProperties); + validateModelFromClass( + new AgentEntityRef(agentConfiguration), modelClazz, asMap, allowUnknownProperties); } @AllArgsConstructor @@ -199,6 +192,22 @@ public String ref() { } } + @AllArgsConstructor + public static class AgentEntityRef implements EntityRef { + + private final AgentConfiguration agentConfiguration; + + @Override + public String ref() { + return "agent configuration (agent: '%s', type: '%s')" + .formatted( + agentConfiguration.getName() == null + ? agentConfiguration.getId() + : agentConfiguration.getName(), + agentConfiguration.getType()); + } + } + @SneakyThrows public static void validateAssetModelFromClass( AssetDefinition asset, diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProvider.java index 0d0e23415..219d30e22 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProvider.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProvider.java @@ -16,6 +16,7 @@ package ai.langstream.runtime.impl.k8s.agents; import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.AgentConfigurationModel; import ai.langstream.api.doc.ConfigProperty; import ai.langstream.api.model.AgentConfiguration; import ai.langstream.api.model.Application; @@ -28,18 +29,58 @@ import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.impl.agents.AbstractComposableAgentProvider; import ai.langstream.impl.agents.ai.steps.QueryConfiguration; +import ai.langstream.impl.uti.ClassConfigValidator; import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime; +import ai.langstream.runtime.impl.k8s.agents.vectors.CassandraVectorDatabaseWriterConfig; +import ai.langstream.runtime.impl.k8s.agents.vectors.JDBCVectorDatabaseWriterConfig; +import ai.langstream.runtime.impl.k8s.agents.vectors.MilvusVectorDatabaseWriterConfig; +import ai.langstream.runtime.impl.k8s.agents.vectors.OpenSearchVectorDatabaseWriterConfig; +import ai.langstream.runtime.impl.k8s.agents.vectors.PineconeVectorDatabaseWriterConfig; +import ai.langstream.runtime.impl.k8s.agents.vectors.SolrVectorDatabaseWriterConfig; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import lombok.Data; +import lombok.Getter; +import lombok.Setter; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @Slf4j public class QueryVectorDBAgentProvider extends AbstractComposableAgentProvider { + protected static final ObjectMapper MAPPER = new ObjectMapper(); + + @Getter + @Setter + public abstract static class VectorDatabaseWriterConfig { + @ConfigProperty( + description = + """ + The defined datasource ID to use to store the vectors. + """, + required = true) + String datasource; + + public abstract Class getAgentConfigModelClass(); + + public abstract boolean isAgentConfigModelAllowUnknownProperties(); + } + protected static final String QUERY_VECTOR_DB = "query-vector-db"; protected static final String VECTOR_DB_SINK = "vector-db-sink"; + protected static final Map + SUPPORTED_VECTOR_DB_SINK_DATASOURCES = + Map.of( + "cassandra", CassandraVectorDatabaseWriterConfig.CASSANDRA, + "astra", CassandraVectorDatabaseWriterConfig.ASTRA, + "jdbc", JDBCVectorDatabaseWriterConfig.INSTANCE, + "pinecone", PineconeVectorDatabaseWriterConfig.INSTANCE, + "opensearch", OpenSearchVectorDatabaseWriterConfig.INSTANCE, + "solr", SolrVectorDatabaseWriterConfig.INSTANCE, + "milvus", MilvusVectorDatabaseWriterConfig.INSTANCE); public QueryVectorDBAgentProvider() { super( @@ -76,30 +117,79 @@ protected Map computeAgentConfiguration( // get the datasource configuration and inject it into the agent configuration String resourceId = (String) originalConfiguration.remove("datasource"); if (resourceId == null) { - throw new IllegalStateException( - "datasource is required but this exception should have been raised before ?"); + throw new IllegalArgumentException( + ClassConfigValidator.formatErrString( + new ClassConfigValidator.AgentEntityRef(agentConfiguration), + "datasource", + "is required")); } generateDataSourceConfiguration( resourceId, executionPlan.getApplication(), originalConfiguration, clusterRuntime, - pluginsRegistry); + pluginsRegistry, + agentConfiguration); return originalConfiguration; } + private boolean isAgentConfigModelAllowUnknownProperties(String type, String service) { + switch (type) { + case QUERY_VECTOR_DB: + return false; + case VECTOR_DB_SINK: + { + final VectorDatabaseWriterConfig vectorDatabaseSinkConfig = + SUPPORTED_VECTOR_DB_SINK_DATASOURCES.get(service); + if (vectorDatabaseSinkConfig == null) { + throw new IllegalArgumentException( + "Unsupported vector database service: " + + service + + ". Supported services are: " + + SUPPORTED_VECTOR_DB_SINK_DATASOURCES.keySet()); + } + return vectorDatabaseSinkConfig.isAgentConfigModelAllowUnknownProperties(); + } + default: + throw new IllegalStateException(); + } + } + + private Class getAgentConfigModelClass(String type, String service) { + switch (type) { + case QUERY_VECTOR_DB: + return QueryVectorDBConfig.class; + case VECTOR_DB_SINK: + { + final VectorDatabaseWriterConfig vectorDatabaseSinkConfig = + SUPPORTED_VECTOR_DB_SINK_DATASOURCES.get(service); + if (vectorDatabaseSinkConfig == null) { + throw new IllegalArgumentException( + "Unsupported vector database service: " + + service + + ". Supported services are: " + + SUPPORTED_VECTOR_DB_SINK_DATASOURCES.keySet()); + } + return vectorDatabaseSinkConfig.getAgentConfigModelClass(); + } + default: + throw new IllegalStateException(); + } + } + private void generateDataSourceConfiguration( String resourceId, Application applicationInstance, Map configuration, ComputeClusterRuntime computeClusterRuntime, - PluginsRegistry pluginsRegistry) { + PluginsRegistry pluginsRegistry, + AgentConfiguration agentConfiguration) { Resource resource = applicationInstance.getResources().get(resourceId); log.info("Generating datasource configuration for {}", resourceId); if (resource != null) { - Map resourceImplementation = + Map resourceConfiguration = computeClusterRuntime.getResourceImplementation(resource, pluginsRegistry); if (!resource.type().equals("datasource") && !resource.type().equals("vector-database")) { @@ -108,57 +198,60 @@ private void generateDataSourceConfiguration( + resourceId + "' is not type=datasource or type=vector-database"); } - if (configuration.containsKey("datasource")) { - throw new IllegalArgumentException("Only one datasource is supported"); + configuration.put("datasource", resourceConfiguration); + final String type = agentConfiguration.getType(); + final String service = (String) resourceConfiguration.get("service"); + final Class modelClass = getAgentConfigModelClass(type, service); + if (modelClass != null) { + ClassConfigValidator.validateAgentModelFromClass( + agentConfiguration, + modelClass, + agentConfiguration.getConfiguration(), + isAgentConfigModelAllowUnknownProperties(type, service)); } - configuration.put("datasource", resourceImplementation); } else { throw new IllegalArgumentException("Resource '" + resourceId + "' not found"); } } - @Override - protected Class getAgentConfigModelClass(String type) { - return switch (type) { - case QUERY_VECTOR_DB -> QueryVectorDBConfig.class; - case VECTOR_DB_SINK -> VectorDBSinkConfig.class; - default -> throw new IllegalStateException(type); - }; - } - - @Override - protected boolean isAgentConfigModelAllowUnknownProperties(String type) { - return switch (type) { - case QUERY_VECTOR_DB -> false; - case VECTOR_DB_SINK -> true; - default -> throw new IllegalStateException(type); - }; - } - @AgentConfig( name = "Query a vector database", description = """ - Query a vector database using Vector Search capabilities. - """) + Query a vector database using Vector Search capabilities. + """) @Data public static class QueryVectorDBConfig extends QueryConfiguration {} - @AgentConfig( - name = "Vector database sink", - description = - """ - Store vectors in a vector database. - Configuration properties depends on the vector database implementation, specified by the "datasource" property. - """) - @Data - public static class VectorDBSinkConfig { - @ConfigProperty( - description = - """ - The defined datasource ID to use to store the vectors. - """, - required = true) - private String datasource; + @Override + public Map generateSupportedTypesDocumentation() { + Map result = new LinkedHashMap<>(); + result.put( + QUERY_VECTOR_DB, + ClassConfigValidator.generateAgentModelFromClass(QueryVectorDBConfig.class)); + + for (Map.Entry datasource : + SUPPORTED_VECTOR_DB_SINK_DATASOURCES.entrySet()) { + final String service = datasource.getKey(); + AgentConfigurationModel value = + ClassConfigValidator.generateAgentModelFromClass( + datasource.getValue().getAgentConfigModelClass()); + value = deepCopy(value); + value.getProperties() + .get("datasource") + .setDescription( + "Resource id. The target resource must be type: 'datasource' or 'vector-database' and " + + "service: '" + + service + + "'."); + value.setType(VECTOR_DB_SINK); + result.put(VECTOR_DB_SINK + "_" + service, value); + } + return result; + } + + @SneakyThrows + private static AgentConfigurationModel deepCopy(AgentConfigurationModel instance) { + return MAPPER.readValue(MAPPER.writeValueAsBytes(instance), AgentConfigurationModel.class); } } diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/CassandraVectorDatabaseWriterConfig.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/CassandraVectorDatabaseWriterConfig.java new file mode 100644 index 000000000..dc1f2871d --- /dev/null +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/CassandraVectorDatabaseWriterConfig.java @@ -0,0 +1,83 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.runtime.impl.k8s.agents.vectors; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.runtime.impl.k8s.agents.QueryVectorDBAgentProvider; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +@Data +public abstract class CassandraVectorDatabaseWriterConfig + extends QueryVectorDBAgentProvider.VectorDatabaseWriterConfig { + + @AgentConfig( + name = "Cassandra", + description = + """ + Writes data to Apache Cassandra. + All the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html + """) + public static class ApacheCassandraVectorDatabaseWriterConfig + extends CassandraVectorDatabaseWriterConfig { + @Override + public Class getAgentConfigModelClass() { + return ApacheCassandraVectorDatabaseWriterConfig.class; + } + } + + @AgentConfig( + name = "Astra", + description = + """ + Writes data to DataStax Astra service. + All the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html + """) + public static class AstraVectorDatabaseWriterConfig + extends CassandraVectorDatabaseWriterConfig { + + @Override + public Class getAgentConfigModelClass() { + return AstraVectorDatabaseWriterConfig.class; + } + } + + public static final ApacheCassandraVectorDatabaseWriterConfig CASSANDRA = + new ApacheCassandraVectorDatabaseWriterConfig(); + public static final AstraVectorDatabaseWriterConfig ASTRA = + new AstraVectorDatabaseWriterConfig(); + + @Override + public boolean isAgentConfigModelAllowUnknownProperties() { + return true; + } + + @ConfigProperty( + description = "The name of the table to write to. The table must already exist.", + required = true) + @JsonProperty("table-name") + String table; + + @ConfigProperty(description = "The keyspace of the table to write to.") + String keyspace; + + @ConfigProperty( + description = + "Comma separated list of mapping between the table column and the record field. e.g. my_colum_id=key, my_column_name=value.name.", + required = true) + String mapping; +} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/JDBCVectorDatabaseWriterConfig.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/JDBCVectorDatabaseWriterConfig.java new file mode 100644 index 000000000..9af5f0b55 --- /dev/null +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/JDBCVectorDatabaseWriterConfig.java @@ -0,0 +1,71 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.runtime.impl.k8s.agents.vectors; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.runtime.impl.k8s.agents.QueryVectorDBAgentProvider; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import lombok.Data; + +@Data +@AgentConfig(name = "JDBC", description = """ + Writes data to any JDBC compatible database. +""") +public class JDBCVectorDatabaseWriterConfig + extends QueryVectorDBAgentProvider.VectorDatabaseWriterConfig { + + public static final JDBCVectorDatabaseWriterConfig INSTANCE = + new JDBCVectorDatabaseWriterConfig(); + + @Override + public Class getAgentConfigModelClass() { + return JDBCVectorDatabaseWriterConfig.class; + } + + @Override + public boolean isAgentConfigModelAllowUnknownProperties() { + return false; + } + + @Data + public static class TableField { + + @ConfigProperty( + description = "Is this field part of the primary key?", + defaultValue = "false") + @JsonProperty("primary-key") + boolean primaryKey; + + @ConfigProperty(description = "Field name", required = true) + String name; + + @ConfigProperty( + description = "JSTL Expression for computing the field value.", + required = true) + String expression; + } + + @ConfigProperty( + description = "The name of the table to write to. The table must already exist.", + required = true) + @JsonProperty("table-name") + String table; + + @ConfigProperty(description = "Fields of the table to write to.", required = true) + List fields; +} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/MilvusVectorDatabaseWriterConfig.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/MilvusVectorDatabaseWriterConfig.java new file mode 100644 index 000000000..130784893 --- /dev/null +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/MilvusVectorDatabaseWriterConfig.java @@ -0,0 +1,67 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.runtime.impl.k8s.agents.vectors; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.runtime.impl.k8s.agents.QueryVectorDBAgentProvider; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import lombok.Data; + +@Data +@AgentConfig(name = "Milvus", description = """ + Writes data to Milvus/Zillis service. +""") +public class MilvusVectorDatabaseWriterConfig + extends QueryVectorDBAgentProvider.VectorDatabaseWriterConfig { + + public static final MilvusVectorDatabaseWriterConfig INSTANCE = + new MilvusVectorDatabaseWriterConfig(); + + @Override + public Class getAgentConfigModelClass() { + return MilvusVectorDatabaseWriterConfig.class; + } + + @Override + public boolean isAgentConfigModelAllowUnknownProperties() { + return false; + } + + @Data + public static class MilvusField { + + @ConfigProperty(description = "Field name", required = true) + String name; + + @ConfigProperty( + description = "JSTL Expression for computing the field value.", + required = true) + String expression; + } + + @ConfigProperty(description = "Fields definition.", required = true) + List fields; + + @ConfigProperty(description = "Collection name") + @JsonProperty("collection-name") + String collectionName; + + @ConfigProperty(description = "Collection name") + @JsonProperty("database-name") + String databaseName; +} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/OpenSearchVectorDatabaseWriterConfig.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/OpenSearchVectorDatabaseWriterConfig.java new file mode 100644 index 000000000..303f53b42 --- /dev/null +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/OpenSearchVectorDatabaseWriterConfig.java @@ -0,0 +1,139 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.runtime.impl.k8s.agents.vectors; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.runtime.impl.k8s.agents.QueryVectorDBAgentProvider; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import lombok.Data; + +@Data +@AgentConfig( + name = "OpenSearch", + description = "Writes data to OpenSearch or AWS OpenSearch serverless.") +public class OpenSearchVectorDatabaseWriterConfig + extends QueryVectorDBAgentProvider.VectorDatabaseWriterConfig { + + public static final OpenSearchVectorDatabaseWriterConfig INSTANCE = + new OpenSearchVectorDatabaseWriterConfig(); + + @Override + public Class getAgentConfigModelClass() { + return OpenSearchVectorDatabaseWriterConfig.class; + } + + @Override + public boolean isAgentConfigModelAllowUnknownProperties() { + return false; + } + + @Data + public static class IndexField { + + @ConfigProperty(description = "Field name", required = true) + String name; + + @ConfigProperty( + description = "JSTL Expression for computing the field value.", + required = true) + String expression; + } + + @Data + public static class BulkParameters { + @ConfigProperty( + description = + """ + The pipeline ID for preprocessing documents. + Refer to the OpenSearch documentation for more details. + """) + String pipeline; + + @ConfigProperty( + description = + """ + Whether to refresh the affected shards after performing the indexing operations. Default is false. true makes the changes show up in search results immediately, but hurts cluster performance. wait_for waits for a refresh. Requests take longer to return, but cluster performance doesn’t suffer. + Note that AWS OpenSearch supports only false. + Refer to the OpenSearch documentation for more details. + """) + String refresh; + + @ConfigProperty( + description = + """ + Set to true to require that all actions target an index alias rather than an index. + Refer to the OpenSearch documentation for more details. + """) + @JsonProperty("require_alias") + Boolean requireAlias; + + @ConfigProperty( + description = + """ + Routes the request to the specified shard. + Refer to the OpenSearch documentation for more details. + """) + String routing; + + @ConfigProperty( + description = + """ + How long to wait for the request to return. + Refer to the OpenSearch documentation for more details. + """) + String timeout; + + @ConfigProperty( + description = + """ + Specifies the number of active shards that must be available before OpenSearch processes the bulk request. Default is 1 (only the primary shard). Set to all or a positive integer. Values greater than 1 require replicas. For example, if you specify a value of 3, the index must have two replicas distributed across two additional nodes for the request to succeed. + Refer to the OpenSearch documentation for more details. + """) + @JsonProperty("wait_for_active_shards") + String waitForActiveShards; + } + + @ConfigProperty( + description = "The name of the index to write to. The index must already exist.", + required = true) + @JsonProperty("index-name") + String indexName; + + @ConfigProperty(description = "Index fields definition.", required = true) + List fields; + + @ConfigProperty( + description = + "JSTL Expression to compute the index _id field. Leave it empty to let OpenSearch auto-generate the _id field.") + String id; + + @ConfigProperty(description = "OpenSearch bulk URL parameters.") + @JsonProperty("bulk-parameters") + BulkParameters bulkParameters; + + @ConfigProperty(description = "Flush interval in milliseconds", defaultValue = "1000") + @JsonProperty("flush-interval") + int flushInterval; + + @ConfigProperty( + description = + "Batch size for bulk operations. Hitting the batch size will trigger a flush.", + defaultValue = "10") + @JsonProperty("batch-size") + int batchSize; +} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/PineconeVectorDatabaseWriterConfig.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/PineconeVectorDatabaseWriterConfig.java new file mode 100644 index 000000000..ab3c334f1 --- /dev/null +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/PineconeVectorDatabaseWriterConfig.java @@ -0,0 +1,62 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.runtime.impl.k8s.agents.vectors; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.runtime.impl.k8s.agents.QueryVectorDBAgentProvider; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; +import lombok.Data; + +@Data +@AgentConfig(name = "Pinecone", description = """ + Writes data to Pinecone service. +""") +public class PineconeVectorDatabaseWriterConfig + extends QueryVectorDBAgentProvider.VectorDatabaseWriterConfig { + + public static final PineconeVectorDatabaseWriterConfig INSTANCE = + new PineconeVectorDatabaseWriterConfig(); + + @Override + public Class getAgentConfigModelClass() { + return PineconeVectorDatabaseWriterConfig.class; + } + + @Override + public boolean isAgentConfigModelAllowUnknownProperties() { + return false; + } + + @ConfigProperty(description = "JSTL Expression to compute the id.") + @JsonProperty("vector.id") + String id; + + @ConfigProperty(description = "JSTL Expression to compute the vector.") + @JsonProperty("vector.vector") + String vector; + + @ConfigProperty(description = "JSTL Expression to compute the namespace.") + @JsonProperty("vector.namespace") + String namespace; + + @ConfigProperty( + description = + "Metadata to append. The key is the metadata name and the value the JSTL Expression to compute the actual value.") + @JsonProperty("vector.metadata") + Map metadata; +} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/SolrVectorDatabaseWriterConfig.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/SolrVectorDatabaseWriterConfig.java new file mode 100644 index 000000000..e7cc2c8d4 --- /dev/null +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/vectors/SolrVectorDatabaseWriterConfig.java @@ -0,0 +1,67 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.runtime.impl.k8s.agents.vectors; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.runtime.impl.k8s.agents.QueryVectorDBAgentProvider; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import lombok.Data; + +@Data +@AgentConfig( + name = "Apache Solr", + description = + """ + Writes data to Apache Solr service. + The collection-name is configured at datasource level. +""") +public class SolrVectorDatabaseWriterConfig + extends QueryVectorDBAgentProvider.VectorDatabaseWriterConfig { + + public static final SolrVectorDatabaseWriterConfig INSTANCE = + new SolrVectorDatabaseWriterConfig(); + + @Override + public Class getAgentConfigModelClass() { + return SolrVectorDatabaseWriterConfig.class; + } + + @Override + public boolean isAgentConfigModelAllowUnknownProperties() { + return false; + } + + @Data + public static class SolrField { + + @ConfigProperty(description = "Field name", required = true) + String name; + + @ConfigProperty( + description = "JSTL Expression for computing the field value.", + required = true) + String expression; + } + + @ConfigProperty(description = "Fields definition.", required = true) + List fields; + + @ConfigProperty(description = "Commit within option", defaultValue = "1000") + @JsonProperty("commit-within") + int commitWithin; +} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProviderTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProviderTest.java index fa50ad38f..ebb3f83e4 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProviderTest.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProviderTest.java @@ -101,6 +101,8 @@ public void testWriteDb() { configuration: datasource: "cassandra" unknown-field: "..." + table-name: "my-table" + mapping: "..." """, null); } @@ -175,14 +177,311 @@ public void testDocumentation() { } } }, - "vector-db-sink" : { - "name" : "Vector database sink", - "description" : "Store vectors in a vector database.\\nConfiguration properties depends on the vector database implementation, specified by the \\"datasource\\" property.", + "vector-db-sink_astra" : { + "type" : "vector-db-sink", + "name" : "Astra", + "description" : "Writes data to DataStax Astra service.\\nAll the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html", "properties" : { "datasource" : { - "description" : "The defined datasource ID to use to store the vectors.", + "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'astra'.", "required" : true, "type" : "string" + }, + "keyspace" : { + "description" : "The keyspace of the table to write to.", + "required" : false, + "type" : "string" + }, + "mapping" : { + "description" : "Comma separated list of mapping between the table column and the record field. e.g. my_colum_id=key, my_column_name=value.name.", + "required" : true, + "type" : "string" + }, + "table-name" : { + "description" : "The name of the table to write to. The table must already exist.", + "required" : true, + "type" : "string" + } + } + }, + "vector-db-sink_cassandra" : { + "type" : "vector-db-sink", + "name" : "Cassandra", + "description" : "Writes data to Apache Cassandra.\\nAll the options from DataStax Kafka Sink are supported: https://docs.datastax.com/en/kafka/doc/kafka/kafkaConfigTasksTOC.html", + "properties" : { + "datasource" : { + "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'cassandra'.", + "required" : true, + "type" : "string" + }, + "keyspace" : { + "description" : "The keyspace of the table to write to.", + "required" : false, + "type" : "string" + }, + "mapping" : { + "description" : "Comma separated list of mapping between the table column and the record field. e.g. my_colum_id=key, my_column_name=value.name.", + "required" : true, + "type" : "string" + }, + "table-name" : { + "description" : "The name of the table to write to. The table must already exist.", + "required" : true, + "type" : "string" + } + } + }, + "vector-db-sink_jdbc" : { + "type" : "vector-db-sink", + "name" : "JDBC", + "description" : "Writes data to any JDBC compatible database.", + "properties" : { + "datasource" : { + "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'jdbc'.", + "required" : true, + "type" : "string" + }, + "fields" : { + "description" : "Fields of the table to write to.", + "required" : true, + "type" : "array", + "items" : { + "description" : "Fields of the table to write to.", + "required" : true, + "type" : "object", + "properties" : { + "expression" : { + "description" : "JSTL Expression for computing the field value.", + "required" : true, + "type" : "string" + }, + "name" : { + "description" : "Field name", + "required" : true, + "type" : "string" + }, + "primary-key" : { + "description" : "Is this field part of the primary key?", + "required" : false, + "type" : "boolean", + "defaultValue" : "false" + } + } + } + }, + "table-name" : { + "description" : "The name of the table to write to. The table must already exist.", + "required" : true, + "type" : "string" + } + } + }, + "vector-db-sink_milvus" : { + "type" : "vector-db-sink", + "name" : "Milvus", + "description" : "Writes data to Milvus/Zillis service.", + "properties" : { + "collection-name" : { + "description" : "Collection name", + "required" : false, + "type" : "string" + }, + "database-name" : { + "description" : "Collection name", + "required" : false, + "type" : "string" + }, + "datasource" : { + "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'milvus'.", + "required" : true, + "type" : "string" + }, + "fields" : { + "description" : "Fields definition.", + "required" : true, + "type" : "array", + "items" : { + "description" : "Fields definition.", + "required" : true, + "type" : "object", + "properties" : { + "expression" : { + "description" : "JSTL Expression for computing the field value.", + "required" : true, + "type" : "string" + }, + "name" : { + "description" : "Field name", + "required" : true, + "type" : "string" + } + } + } + } + } + }, + "vector-db-sink_opensearch" : { + "type" : "vector-db-sink", + "name" : "OpenSearch", + "description" : "Writes data to OpenSearch or AWS OpenSearch serverless.", + "properties" : { + "batch-size" : { + "description" : "Batch size for bulk operations. Hitting the batch size will trigger a flush.", + "required" : false, + "type" : "integer", + "defaultValue" : "10" + }, + "bulk-parameters" : { + "description" : "OpenSearch bulk URL parameters.", + "required" : false, + "type" : "object", + "properties" : { + "pipeline" : { + "description" : "The pipeline ID for preprocessing documents.\\nRefer to the OpenSearch documentation for more details.", + "required" : false, + "type" : "string" + }, + "refresh" : { + "description" : "Whether to refresh the affected shards after performing the indexing operations. Default is false. true makes the changes show up in search results immediately, but hurts cluster performance. wait_for waits for a refresh. Requests take longer to return, but cluster performance doesn’t suffer.\\nNote that AWS OpenSearch supports only false.\\nRefer to the OpenSearch documentation for more details.", + "required" : false, + "type" : "string" + }, + "require_alias" : { + "description" : "Set to true to require that all actions target an index alias rather than an index.\\nRefer to the OpenSearch documentation for more details.", + "required" : false, + "type" : "boolean" + }, + "routing" : { + "description" : "Routes the request to the specified shard.\\nRefer to the OpenSearch documentation for more details.", + "required" : false, + "type" : "string" + }, + "timeout" : { + "description" : "How long to wait for the request to return.\\nRefer to the OpenSearch documentation for more details.", + "required" : false, + "type" : "string" + }, + "wait_for_active_shards" : { + "description" : "Specifies the number of active shards that must be available before OpenSearch processes the bulk request. Default is 1 (only the primary shard). Set to all or a positive integer. Values greater than 1 require replicas. For example, if you specify a value of 3, the index must have two replicas distributed across two additional nodes for the request to succeed.\\nRefer to the OpenSearch documentation for more details.", + "required" : false, + "type" : "string" + } + } + }, + "datasource" : { + "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'opensearch'.", + "required" : true, + "type" : "string" + }, + "fields" : { + "description" : "Index fields definition.", + "required" : true, + "type" : "array", + "items" : { + "description" : "Index fields definition.", + "required" : true, + "type" : "object", + "properties" : { + "expression" : { + "description" : "JSTL Expression for computing the field value.", + "required" : true, + "type" : "string" + }, + "name" : { + "description" : "Field name", + "required" : true, + "type" : "string" + } + } + } + }, + "flush-interval" : { + "description" : "Flush interval in milliseconds", + "required" : false, + "type" : "integer", + "defaultValue" : "1000" + }, + "id" : { + "description" : "JSTL Expression to compute the index _id field. Leave it empty to let OpenSearch auto-generate the _id field.", + "required" : false, + "type" : "string" + }, + "index-name" : { + "description" : "The name of the index to write to. The index must already exist.", + "required" : true, + "type" : "string" + } + } + }, + "vector-db-sink_pinecone" : { + "type" : "vector-db-sink", + "name" : "Pinecone", + "description" : "Writes data to Pinecone service.", + "properties" : { + "datasource" : { + "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'pinecone'.", + "required" : true, + "type" : "string" + }, + "vector.id" : { + "description" : "JSTL Expression to compute the id.", + "required" : false, + "type" : "string" + }, + "vector.metadata" : { + "description" : "Metadata to append. The key is the metadata name and the value the JSTL Expression to compute the actual value.", + "required" : false, + "type" : "object" + }, + "vector.namespace" : { + "description" : "JSTL Expression to compute the namespace.", + "required" : false, + "type" : "string" + }, + "vector.vector" : { + "description" : "JSTL Expression to compute the vector.", + "required" : false, + "type" : "string" + } + } + }, + "vector-db-sink_solr" : { + "type" : "vector-db-sink", + "name" : "Apache Solr", + "description" : "Writes data to Apache Solr service.\\n The collection-name is configured at datasource level.", + "properties" : { + "commit-within" : { + "description" : "Commit within option", + "required" : false, + "type" : "integer", + "defaultValue" : "1000" + }, + "datasource" : { + "description" : "Resource id. The target resource must be type: 'datasource' or 'vector-database' and service: 'solr'.", + "required" : true, + "type" : "string" + }, + "fields" : { + "description" : "Fields definition.", + "required" : true, + "type" : "array", + "items" : { + "description" : "Fields definition.", + "required" : true, + "type" : "object", + "properties" : { + "expression" : { + "description" : "JSTL Expression for computing the field value.", + "required" : true, + "type" : "string" + }, + "name" : { + "description" : "Field name", + "required" : true, + "type" : "string" + } + } + } } } }