Skip to content

Commit

Permalink
[gateway] Load the Topic Runtime from nar files in order to not pollu…
Browse files Browse the repository at this point in the history
…te the classpath (LangStream#414)
  • Loading branch information
eolivelli committed Sep 14, 2023
1 parent a7583f7 commit 33cfb1a
Show file tree
Hide file tree
Showing 17 changed files with 244 additions and 56 deletions.
29 changes: 29 additions & 0 deletions dev/deploy-gateway-minikube.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Clean up Minikube
# minikube image load --overwrite doens't work sometimes

set -x
rm -f /tmp/langstream-api-gateway.tar
docker save langstream/langstream-api-gateway:latest-dev > /tmp/langstream-api-gateway.tar

eval $(minikube docker-env)
docker image rm -f langstream/langstream-api-gateway:latest-dev
docker load < /tmp/langstream-api-gateway.tar
rm /tmp/langstream-api-gateway.tar

65 changes: 52 additions & 13 deletions langstream-api-gateway/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>langstream-pulsar</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
Expand Down Expand Up @@ -115,18 +123,6 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>ai.langstream</groupId>
<artifactId>langstream-kafka-runtime</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>ai.langstream</groupId>
<artifactId>langstream-pulsar-runtime</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>ai.langstream</groupId>
<artifactId>langstream-google-api-gateway-auth</artifactId>
Expand Down Expand Up @@ -193,6 +189,43 @@
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<!-- main implementation for the TopicConnectionsRuntime-->
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>langstream-kafka-runtime</artifactId>
<version>${project.version}</version>
<type>nar</type>
<classifier>nar</classifier>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/agents</outputDirectory>
</artifactItem>

<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>langstream-pulsar-runtime</artifactId>
<version>${project.version}</version>
<type>nar</type>
<classifier>nar</classifier>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/agents</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
Expand Down Expand Up @@ -249,7 +282,13 @@
</labels>
</container>
<extraDirectories>
<paths>src/main/docker/jib</paths>
<paths>
<path>src/main/docker/jib</path>
<path>
<from>${pom.build.directory}/agents</from>
<into>/app/agents</into>
</path>
</paths>
<permissions>
<permission>
<file>/entrypoint.sh</file>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import ai.langstream.apigateway.config.GatewayTestAuthenticationProperties;
import ai.langstream.apigateway.config.StorageProperties;
import ai.langstream.apigateway.runner.CodeConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
Expand All @@ -25,7 +26,11 @@
import org.springframework.core.env.Environment;

@SpringBootApplication
@EnableConfigurationProperties({StorageProperties.class, GatewayTestAuthenticationProperties.class})
@EnableConfigurationProperties({
StorageProperties.class,
GatewayTestAuthenticationProperties.class,
CodeConfiguration.class
})
public class LangStreamApiGateway {

static {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.langstream.apigateway.runner;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "application.gateways.code")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CodeConfiguration {
private String path;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,58 @@
package ai.langstream.apigateway.runner;

import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry;
import ai.langstream.impl.nar.NarFileHandler;
import jakarta.annotation.PreDestroy;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class TopicConnectionsRuntimeProviderBean {

private final NarFileHandler narFileHandler;

private final TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry;

public TopicConnectionsRuntimeProviderBean(CodeConfiguration agentsConfiguration)
throws Exception {

log.info("Agents configuration: {}", agentsConfiguration);
if (agentsConfiguration.getPath() != null) {
Path directory = Paths.get(agentsConfiguration.getPath());

if (Files.isDirectory(directory)) {
log.info("Agents directory: {}", directory);
this.narFileHandler =
new NarFileHandler(
directory, List.of(), NarFileHandler.class.getClassLoader());
this.narFileHandler.scan();
} else {
log.info("Agents directory: {} does not exist", directory);
this.narFileHandler = null;
}
} else {
this.narFileHandler = null;
}

this.topicConnectionsRuntimeRegistry = new TopicConnectionsRuntimeRegistry();
if (narFileHandler != null) {
topicConnectionsRuntimeRegistry.setPackageLoader(narFileHandler);
}
}

public TopicConnectionsRuntimeRegistry getTopicConnectionsRuntimeRegistry() {
return new TopicConnectionsRuntimeRegistry();
return topicConnectionsRuntimeRegistry;
}

@PreDestroy
public void shutdown() {
if (narFileHandler != null) {
narFileHandler.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ spring.jackson.serialization.indent-output=true
spring.jackson.serialization.order-map-entries-by-keys=true

application.storage.apps.type=kubernetes
application.storage.apps.configuration.namespaceprefix=langstream-
application.storage.apps.configuration.namespaceprefix=langstream-

application.gateways.code.path=/app/agents
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import ai.langstream.apigateway.websocket.api.ProduceRequest;
import ai.langstream.apigateway.websocket.api.ProduceResponse;
import ai.langstream.impl.deploy.ApplicationDeployer;
import ai.langstream.impl.nar.NarFileHandler;
import ai.langstream.impl.parser.ModelBuilder;
import ai.langstream.kafka.extensions.KafkaContainerExtension;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -54,6 +55,7 @@
import jakarta.websocket.DeploymentException;
import jakarta.websocket.Session;
import java.net.URI;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -65,6 +67,7 @@
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -88,8 +91,16 @@
"spring.main.allow-bean-definition-overriding=true",
})
@WireMockTest
@Slf4j
class ProduceConsumeHandlerTest {

public static final Path agentsDirectory;

static {
agentsDirectory = Path.of(System.getProperty("user.dir"), "target", "agents");
log.info("Agents directory is {}", agentsDirectory);
}

protected static final ObjectMapper MAPPER = new ObjectMapper();

@RegisterExtension
Expand Down Expand Up @@ -265,37 +276,43 @@ public void onError(Throwable throwable) {
}
}

private void prepareTopicsForTest(String... topic) {
private void prepareTopicsForTest(String... topic) throws Exception {
topics = List.of(topic);
TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry =
new TopicConnectionsRuntimeRegistry();
final ApplicationDeployer deployer =
ApplicationDeployer.builder()
.pluginsRegistry(new PluginsRegistry())
.registry(new ClusterRuntimeRegistry())
.topicConnectionsRuntimeRegistry(topicConnectionsRuntimeRegistry)
.build();
final StreamingCluster streamingCluster =
new StreamingCluster(
"kafka",
Map.of(
"admin",
Map.of(
"bootstrap.servers",
kafkaContainer.getBootstrapServers(),
"default.api.timeout.ms",
5000)));
topicConnectionsRuntimeRegistry
.getTopicConnectionsRuntime(streamingCluster)
.asTopicConnectionsRuntime()
.deploy(
deployer.createImplementation(
"app", store.get("t", "app", false).getInstance()));
try (NarFileHandler narFileHandler =
new NarFileHandler(
agentsDirectory, List.of(), NarFileHandler.class.getClassLoader()); ) {
narFileHandler.scan();
TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry =
new TopicConnectionsRuntimeRegistry();
topicConnectionsRuntimeRegistry.setPackageLoader(narFileHandler);
final ApplicationDeployer deployer =
ApplicationDeployer.builder()
.pluginsRegistry(new PluginsRegistry())
.registry(new ClusterRuntimeRegistry())
.topicConnectionsRuntimeRegistry(topicConnectionsRuntimeRegistry)
.build();
final StreamingCluster streamingCluster =
new StreamingCluster(
"kafka",
Map.of(
"admin",
Map.of(
"bootstrap.servers",
kafkaContainer.getBootstrapServers(),
"default.api.timeout.ms",
5000)));
topicConnectionsRuntimeRegistry
.getTopicConnectionsRuntime(streamingCluster)
.asTopicConnectionsRuntime()
.deploy(
deployer.createImplementation(
"app", store.get("t", "app", false).getInstance()));
}
}

@ParameterizedTest
@ValueSource(strings = {"consume", "produce"})
void testParametersRequired(String type) {
void testParametersRequired(String type) throws Exception {
final String topic = genTopic();
prepareTopicsForTest(topic);

Expand Down Expand Up @@ -452,7 +469,7 @@ void testFilterOutMessagesByFixedValue() throws Exception {
}

@Test
void testAuthentication() {
void testAuthentication() throws Exception {
final String topic = genTopic();
prepareTopicsForTest(topic);

Expand Down Expand Up @@ -557,7 +574,7 @@ void testAuthentication() {
}

@Test
void testTestCredentials() {
void testTestCredentials() throws Exception {
wireMock.register(
WireMock.get("/auth/tenant1")
.withHeader("Authorization", WireMock.equalTo("Bearer test-user-password"))
Expand Down Expand Up @@ -831,7 +848,7 @@ void testFilterOutMessagesByParamValue() throws Exception {
}

@Test
void testProduce() {
void testProduce() throws Exception {
final String topic = genTopic();
prepareTopicsForTest(topic);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
application.gateways.code.path=target/agents
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.langstream.runtime.agent.nar;
package ai.langstream.impl.nar;

import ai.langstream.api.codestorage.GenericZipFileArchiveFile;
import ai.langstream.api.codestorage.LocalZipFileArchiveFile;
Expand Down
Loading

0 comments on commit 33cfb1a

Please sign in to comment.