diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..aa3ecb0 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +.idea +.vscode +.github diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml deleted file mode 100644 index 0885752..0000000 --- a/.github/workflows/gradle.yml +++ /dev/null @@ -1,67 +0,0 @@ -# This workflow uses actions that are not certified by GitHub. -# They are provided by a third-party and are governed by -# separate terms of service, privacy policy, and support -# documentation. -# This workflow will build a Java project with Gradle and cache/restore any dependencies to improve the workflow execution time -# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-java-with-gradle - -name: Java CI with Gradle - -on: - push: - branches: [ "master" ] - pull_request: - branches: [ "master" ] - -jobs: - build: - - runs-on: ubuntu-latest - permissions: - contents: read - - steps: - - uses: actions/checkout@v4 - - name: Set up JDK 17 - uses: actions/setup-java@v4 - with: - java-version: '17' - distribution: 'temurin' - - # Configure Gradle for optimal use in GitHub Actions, including caching of downloaded dependencies. - # See: https://github.com/gradle/actions/blob/main/setup-gradle/README.md - - name: Setup Gradle - uses: gradle/actions/setup-gradle@af1da67850ed9a4cedd57bfd976089dd991e2582 # v4.0.0 - - - name: Build with Gradle Wrapper - run: ./gradlew build - - # NOTE: The Gradle Wrapper is the default and recommended way to run Gradle (https://docs.gradle.org/current/userguide/gradle_wrapper.html). - # If your project does not have the Gradle Wrapper configured, you can use the following configuration to run Gradle with a specified version. - # - # - name: Setup Gradle - # uses: gradle/actions/setup-gradle@af1da67850ed9a4cedd57bfd976089dd991e2582 # v4.0.0 - # with: - # gradle-version: '8.9' - # - # - name: Build with Gradle 8.9 - # run: gradle build - - dependency-submission: - - runs-on: ubuntu-latest - permissions: - contents: write - - steps: - - uses: actions/checkout@v4 - - name: Set up JDK 17 - uses: actions/setup-java@v4 - with: - java-version: '17' - distribution: 'temurin' - - # Generates and submits a dependency graph, enabling Dependabot Alerts for all project dependencies. - # See: https://github.com/gradle/actions/blob/main/dependency-submission/README.md - - name: Generate and submit dependency graph - uses: gradle/actions/dependency-submission@af1da67850ed9a4cedd57bfd976089dd991e2582 # v4.0.0 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..e764f22 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,47 @@ +name: test + +on: + push: + branches: [ "master" ] + pull_request: + branches: [ "master" ] + +jobs: + unit-tests: + runs-on: ubuntu-latest + permissions: + contents: read + + steps: + - uses: actions/checkout@v4 + - name: Set up JDK 17 + uses: actions/setup-java@v4 + with: + java-version: '17' + distribution: 'temurin' + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v4 + - name: Run Unit Tests + run: ./gradlew :lib:test --parallel + + integration-tests: + runs-on: ubuntu-latest + permissions: + contents: read + + steps: + - uses: actions/checkout@v4 + - name: Set up JDK 17 + uses: actions/setup-java@v4 + with: + java-version: '17' + distribution: 'temurin' + - name: Setup Minikube + uses: medyagh/setup-minikube@latest + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v4 + - name: Run Integration Tests + run: | + eval $(minikube docker-env) + ./gradlew buildDockerImages --parallel + ./gradlew :test:test diff --git a/.gitignore b/.gitignore index 1b6985c..584ab5d 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,6 @@ # Ignore Gradle build output directory build + +.idea +.vscode diff --git a/README.md b/README.md index c418e38..7bf1bde 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ -# kuberesolver +# kuberesolver-java Kuberesolver for Java diff --git a/integration/app/Dockerfile b/integration/app/Dockerfile new file mode 100644 index 0000000..960dbb1 --- /dev/null +++ b/integration/app/Dockerfile @@ -0,0 +1,30 @@ +FROM gradle:8.5.0-jdk21 AS build + +WORKDIR /kuberesolver/ +COPY ../../ ./ + +WORKDIR /kuberesolver/integration/app +RUN gradle :app:shadowJar + +FROM eclipse-temurin:21-jre AS client + +WORKDIR /app +COPY --from=build /kuberesolver/integration/app/build/libs/app-all.jar app.jar +ENTRYPOINT ["java", \ + "--add-opens", "java.base/jdk.internal.misc=ALL-UNNAMED", \ + "-cp", "app.jar", \ + "io.github.lothar1998.kuberesolver.integration.app.Client", \ + "--target", "kubernetes:///server", \ + "--port", "50052" \ +] + +FROM eclipse-temurin:21-jre AS server + +WORKDIR /app +COPY --from=build /kuberesolver/integration/app/build/libs/app-all.jar app.jar +ENTRYPOINT ["java", \ + "--add-opens", "java.base/jdk.internal.misc=ALL-UNNAMED", \ + "-cp", "app.jar", \ + "io.github.lothar1998.kuberesolver.integration.app.Server", \ + "--port", "50051" \ +] diff --git a/integration/app/build.gradle b/integration/app/build.gradle new file mode 100644 index 0000000..bd4d6bc --- /dev/null +++ b/integration/app/build.gradle @@ -0,0 +1,92 @@ +plugins { + id 'application' + id 'com.google.protobuf' version '0.9.5' + id 'com.gradleup.shadow' version '8.3.6' +} + +repositories { + mavenCentral() + mavenLocal() +} + +application { + mainClass = '' +} + +dependencies { + testImplementation libs.junit.jupiter + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' + + compileOnly 'org.projectlombok:lombok:1.18.38' + annotationProcessor 'org.projectlombok:lombok:1.18.38' + + implementation project(":lib") + + if (JavaVersion.current().isJava9Compatible()) { + // Workaround for @javax.annotation.Generated + // see: https://github.com/grpc/grpc-java/issues/3633 + implementation 'javax.annotation:javax.annotation-api:1.3.1' + } + + implementation 'commons-cli:commons-cli:1.9.0' + implementation 'ch.qos.logback:logback-classic:1.5.18' + implementation 'com.google.protobuf:protobuf-java:4.30.2' + implementation 'io.grpc:grpc-protobuf:1.72.0' + implementation 'io.grpc:grpc-stub:1.72.0' + implementation 'io.grpc:grpc-netty-shaded:1.72.0' + implementation 'io.projectreactor.netty:reactor-netty-http:1.2.5' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.18.3' + + protobuf files("proto/") +} + +java { + toolchain { + languageVersion = JavaLanguageVersion.of(17) + } +} + +protobuf { + protoc { + artifact = 'com.google.protobuf:protoc:4.30.2' + } + plugins { + grpc { + artifact = 'io.grpc:protoc-gen-grpc-java:1.72.0' + } + } + generateProtoTasks { + ofSourceSet('main').configureEach { + plugins { + grpc {} + } + } + } +} + +shadowJar { + mergeServiceFiles() +} + +def dockerfilePath = file("Dockerfile").absolutePath +def dockerContext = rootProject.projectDir.absolutePath + +tasks.register('buildClientDockerImage', Exec) { + group = 'docker' + description = 'Builds Docker image for client target in integration app' + workingDir = dockerContext + commandLine 'sh', '-c', "docker build -f ${dockerfilePath} -t client --target client ${dockerContext}" +} + +tasks.register('buildServerDockerImage', Exec) { + group = 'docker' + description = 'Builds Docker image for server target in integration app' + workingDir = dockerContext + commandLine 'sh', '-c', "docker build -f ${dockerfilePath} -t server --target server ${dockerContext}" +} + +tasks.register('buildDockerImages') { + group = 'docker' + description = 'Builds both client and server Docker images' + dependsOn 'buildClientDockerImage', 'buildServerDockerImage' +} diff --git a/integration/app/proto/ip-service.proto b/integration/app/proto/ip-service.proto new file mode 100644 index 0000000..97de403 --- /dev/null +++ b/integration/app/proto/ip-service.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; +option java_package = "io.github.lothar1998.kuberesolver.integration.app.ip"; + +service IPService { + rpc WhatIsYourIP(IPRequest) returns (IPResponse); +} + +message IPRequest { + +} + +message IPResponse { + repeated string ip_addresses = 1; +} diff --git a/integration/app/src/main/java/io/github/lothar1998/kuberesolver/integration/app/Client.java b/integration/app/src/main/java/io/github/lothar1998/kuberesolver/integration/app/Client.java new file mode 100644 index 0000000..1bc951e --- /dev/null +++ b/integration/app/src/main/java/io/github/lothar1998/kuberesolver/integration/app/Client.java @@ -0,0 +1,136 @@ +package io.github.lothar1998.kuberesolver.integration.app; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.github.lothar1998.kuberesolver.KubernetesNameResolverProvider; +import io.github.lothar1998.kuberesolver.integration.app.ip.IPServiceGrpc; +import io.github.lothar1998.kuberesolver.integration.app.ip.IpService; +import io.grpc.ManagedChannelBuilder; +import io.grpc.NameResolverRegistry; +import io.netty.handler.codec.http.HttpResponseStatus; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import reactor.core.publisher.Mono; +import reactor.netty.DisposableChannel; +import reactor.netty.http.server.HttpServer; + +@Slf4j +public class Client { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + static { + NameResolverRegistry.getDefaultRegistry().register(new KubernetesNameResolverProvider()); + } + + public static void main(String[] args) throws ParseException, JsonProcessingException { + var config = parseArgs(args); + log.info("starting client with configuration: target={}, port={}", config.target(), config.port()); + + Map stats = new ConcurrentHashMap<>(); + + TypeReference> typeRef = new TypeReference<>() {}; + Map serviceConfig = OBJECT_MAPPER.readValue(""" + { + "loadBalancingConfig": [ { + "round_robin": {} + } ] + } + """, typeRef); + + var channel = ManagedChannelBuilder + .forTarget(config.target()) + .defaultServiceConfig(serviceConfig) + .usePlaintext() + .build(); + var server = runHTTPServer(config.port(), stats); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + channel.shutdownNow(); + server.disposeNow(); + log.info("client closed"); + })); + + log.info("starting discovering IP address"); + var stub = IPServiceGrpc.newBlockingStub(channel); + + try { + while (true) { + var response = stub.whatIsYourIP(IpService.IPRequest.newBuilder().build()); + log.info("discovered addresses {}", response.getIpAddressesList()); + response.getIpAddressesList().forEach(ip -> stats.merge(ip, 1, Integer::sum)); + + try { + Thread.sleep(TimeUnit.MILLISECONDS.toMillis(100)); + } catch (InterruptedException ignored) { + } + } + } catch (Exception e) { + log.error("encountered error when discovering the IPs", e); + } + } + + private static DisposableChannel runHTTPServer(int port, Map stats) { + log.info("starting HTTP server on port {}", port); + return HttpServer.create() + .port(port) + .route(routes -> { + routes.get("/ip", (req, res) -> { + try { + var json = OBJECT_MAPPER.writeValueAsString(stats); + log.info("handling request {}", json); + return res.status(HttpResponseStatus.OK).sendString(Mono.just(json)); + } catch (JsonProcessingException e) { + return res.status(HttpResponseStatus.INTERNAL_SERVER_ERROR); + } + }); + routes.delete("/ip", (req, res) -> { + stats.clear(); + return res.status(HttpResponseStatus.OK); + }); + }) + .bindNow() + .onDispose(() -> log.info("HTTP server closed")); + } + + + private static Config parseArgs(String[] args) throws ParseException { + var options = new Options(); + var targetOption = Option.builder() + .longOpt("target") + .hasArg() + .desc("gRPC target") + .required() + .build(); + options.addOption(targetOption); + + var portOption = Option.builder() + .longOpt("port") + .hasArg() + .desc("HTTP server port") + .required() + .build(); + options.addOption(portOption); + + var parser = new DefaultParser(); + + try { + var cmd = parser.parse(options, args); + return new Config(cmd.getOptionValue("target"), Integer.parseInt(cmd.getOptionValue("port"))); + } catch (ParseException e) { + log.error("cannot parse cli flags"); + new HelpFormatter().printHelp("kuberesolver-test-client", options); + throw e; + } + } + + private record Config(String target, int port) { + } +} diff --git a/integration/app/src/main/java/io/github/lothar1998/kuberesolver/integration/app/Server.java b/integration/app/src/main/java/io/github/lothar1998/kuberesolver/integration/app/Server.java new file mode 100644 index 0000000..969e919 --- /dev/null +++ b/integration/app/src/main/java/io/github/lothar1998/kuberesolver/integration/app/Server.java @@ -0,0 +1,81 @@ +package io.github.lothar1998.kuberesolver.integration.app; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import io.github.lothar1998.kuberesolver.integration.app.ip.IPServiceGrpc; +import io.github.lothar1998.kuberesolver.integration.app.ip.IpService; +import io.grpc.Grpc; +import io.grpc.InsecureServerCredentials; +import io.grpc.stub.StreamObserver; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; + +@Slf4j +public class Server { + public static void main(String[] args) throws ParseException, InterruptedException, IOException { + var config = parseConfig(args, System.getenv()); + log.info("starting server with configuration: port={}, ip_addr={}", config.port(), config.ipAddr()); + + var server = Grpc.newServerBuilderForPort(config.port(), InsecureServerCredentials.create()) + .addService(new IPService(config.ipAddr())) + .build(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + server.shutdown(); + try { + server.awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + server.shutdownNow(); + })); + + server.start(); + server.awaitTermination(); + } + + private static Config parseConfig(String[] args, Map envVariables) throws ParseException { + var options = new Options(); + var portOption = Option.builder() + .longOpt("port") + .hasArg() + .desc("gRPC port") + .required() + .build(); + + portOption.setRequired(true); + options.addOption(portOption); + + var parser = new DefaultParser(); + + try { + var cmd = parser.parse(options, args); + return new Config(envVariables.get("IP_ADDR"), Integer.parseInt(cmd.getOptionValue("port"))); + } catch (ParseException e) { + log.error("cannot parse cli flags"); + new HelpFormatter().printHelp("kuberesolver-test-server", options); + throw e; + } + } + + private record Config(String ipAddr, int port) { + } + + @RequiredArgsConstructor + private static class IPService extends IPServiceGrpc.IPServiceImplBase { + private final String ipAddr; + + @Override + public void whatIsYourIP(IpService.IPRequest request, StreamObserver responseObserver) { + log.info("handling request for IP addresses"); + responseObserver.onNext(IpService.IPResponse.newBuilder().addIpAddresses(ipAddr).build()); + responseObserver.onCompleted(); + } + } +} diff --git a/integration/app/src/main/resources/logback.xml b/integration/app/src/main/resources/logback.xml new file mode 100644 index 0000000..2f22e0f --- /dev/null +++ b/integration/app/src/main/resources/logback.xml @@ -0,0 +1,15 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n + + + + + + + + + + + diff --git a/integration/test/build.gradle b/integration/test/build.gradle new file mode 100644 index 0000000..6b0dceb --- /dev/null +++ b/integration/test/build.gradle @@ -0,0 +1,38 @@ +plugins { + id 'java-library' +} + +repositories { + mavenCentral() +} + +dependencies { + testImplementation libs.junit.jupiter + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' + + testImplementation 'ch.qos.logback:logback-classic:1.5.18' + testImplementation 'com.fasterxml.jackson.core:jackson-databind:2.18.3' + testImplementation 'io.fabric8:kubernetes-client:7.2.0' + testImplementation 'org.awaitility:awaitility:4.3.0' + + testCompileOnly 'org.projectlombok:lombok:1.18.38' + testAnnotationProcessor 'org.projectlombok:lombok:1.18.38' + +} + +java { + toolchain { + languageVersion = JavaLanguageVersion.of(17) + } +} + +test { + testLogging { + events "passed", "skipped", "failed", "standardOut", "standardError" + showStandardStreams = true + } +} + +tasks.named('test') { + useJUnitPlatform() +} diff --git a/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/IPsDiscoverer.java b/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/IPsDiscoverer.java new file mode 100644 index 0000000..4b0f6ff --- /dev/null +++ b/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/IPsDiscoverer.java @@ -0,0 +1,90 @@ +package io.github.lothar1998.kuberesolver.integration.test; + +import static org.awaitility.Awaitility.await; + +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.AllArgsConstructor; + +@AllArgsConstructor +public class IPsDiscoverer { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final HttpClient client = HttpClient.newHttpClient(); + + private final URI uri; + + public IPsDiscoverer(InetAddress address, int port) { + String host = address.getHostAddress(); + + if (address instanceof Inet6Address) { + host = "[" + host + "]"; + } + + this.uri = URI.create(String.format("http://%s:%d/ip", host, port)); + } + + public Set getDiscoveredIPs() { + return await().until(this::fetchDiscoveredIPs, Optional::isPresent).get(); + } + + public void reset() { + await().until(this::clearDiscoveredIPs); + } + + private Optional> fetchDiscoveredIPs() { + var request = HttpRequest.newBuilder() + .uri(this.uri) + .GET() + .build(); + + try { + var response = client.send(request, HttpResponse.BodyHandlers.ofInputStream()); + + if (response.statusCode() != 200) { + return Optional.empty(); + } + + TypeReference> typeRef = new TypeReference<>() { + }; + + var ips = OBJECT_MAPPER.readValue(response.body(), typeRef) + .entrySet() + .stream() + .filter(this::hasBeenCalledAtLeastOnce) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + return Optional.of(ips); + } catch (Exception e) { + return Optional.empty(); + } + } + + private boolean clearDiscoveredIPs() { + var request = HttpRequest.newBuilder() + .uri(this.uri) + .DELETE() + .build(); + + try { + var response = client.send(request, HttpResponse.BodyHandlers.discarding()); + return response.statusCode() == 200; + } catch (Exception e) { + return false; + } + } + + private boolean hasBeenCalledAtLeastOnce(Map.Entry ipStat) { + return ipStat.getValue() > 0; + } +} diff --git a/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/KuberesolverTest.java b/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/KuberesolverTest.java new file mode 100644 index 0000000..05b92b4 --- /dev/null +++ b/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/KuberesolverTest.java @@ -0,0 +1,85 @@ +package io.github.lothar1998.kuberesolver.integration.test; + +import static org.awaitility.Awaitility.await; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import io.fabric8.kubernetes.client.ConfigBuilder; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.LocalPortForward; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +@Slf4j +public class KuberesolverTest { + + private static KubernetesManager manager; + private LocalPortForward portForward; + private IPsDiscoverer discoverer; + + @BeforeAll + static void setupAll() { + var config = new ConfigBuilder().build(); + log.info("Building Kubernetes client with context: {}", config.getCurrentContext().getName()); + var client = new KubernetesClientBuilder().build(); + manager = new KubernetesManager(client); + } + + @BeforeEach + void setup() { + log.info("Setting up environment"); + manager.startEnvironment(); + portForward = manager.portForwardToClient(); + discoverer = new IPsDiscoverer(portForward.getLocalAddress(), portForward.getLocalPort()); + } + + @AfterEach + void teardown() throws IOException { + log.info("Tearing down environment"); + manager.stopEnvironment(); + portForward.close(); + } + + @DisplayName("should continuously resolve all addresses of deployment behind a service") + @ParameterizedTest(name = "replicas changes = {0}") + @MethodSource(value = "testCases") + void continuouslyResolveAllAddressesTest(List replicasHistory) throws IOException, InterruptedException { + for (Integer replicas : replicasHistory) { + log.info("Scaling server to {} replicas", replicas); + final var serverIPs = manager.scaleServer(replicas); + log.info("Replicas have the following IPs: {}", String.join(", ", serverIPs)); + + discoverer.reset(); + + await() + .atMost(1, TimeUnit.MINUTES) + .until( + () -> { + var ips = discoverer.getDiscoveredIPs(); + if (!ips.isEmpty()) { + log.info("Discovered the following IPs: {}", String.join(", ", ips)); + } + return ips; + }, + discoveredIPs -> discoveredIPs.equals(serverIPs) + ); + } + } + + private static Stream testCases() { + return Stream.of( + List.of(1, 2, 3), + List.of(3, 2, 1), + List.of(1, 3, 2), + List.of(3, 1, 2) + ).map(Arguments::of); + } +} diff --git a/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/KubernetesManager.java b/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/KubernetesManager.java new file mode 100644 index 0000000..d2ee2ce --- /dev/null +++ b/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/KubernetesManager.java @@ -0,0 +1,96 @@ +package io.github.lothar1998.kuberesolver.integration.test; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.discovery.v1.Endpoint; +import io.fabric8.kubernetes.api.model.discovery.v1.EndpointSlice; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.LocalPortForward; +import io.fabric8.kubernetes.client.dsl.NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable; + +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.awaitility.Awaitility.await; + +public class KubernetesManager { + private static final String SERVER_DEPLOYMENT_NAME = "server"; + private static final String CLIENT_DEPLOYMENT_NAME = "client"; + + private static final int CLIENT_HTTP_SERVER_PORT = 50052; + + private final KubernetesClient client; + private final NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable allResources; + + public KubernetesManager(KubernetesClient client) { + this.client = client; + var manifests = KubernetesManager.class.getClassLoader().getResourceAsStream("k8s-manifests.yaml"); + this.allResources = client.load(manifests); + } + + public void startEnvironment() { + allResources.serverSideApply(); + awaitReadyDeployment(CLIENT_DEPLOYMENT_NAME); + awaitReadyDeployment(SERVER_DEPLOYMENT_NAME); + } + + public void stopEnvironment() { + allResources.delete(); + } + + public Set scaleServer(int replicasCount) { + client.apps().deployments() + .withName(SERVER_DEPLOYMENT_NAME) + .scale(replicasCount); + + return awaitScaledReadyDeployment(SERVER_DEPLOYMENT_NAME, replicasCount) + .stream() + .flatMap(e -> e.getAddresses().stream()) + .collect(Collectors.toSet()); + } + + public void awaitReadyDeployment(String deploymentName) { + awaitScaledReadyDeployment(deploymentName, 1); + } + + public List awaitScaledReadyDeployment(String deploymentName, int replicasCount) { + return await() + .atMost(1, TimeUnit.MINUTES) + .until( + () -> client.discovery().v1().endpointSlices() + .withLabel("app", deploymentName) + .list() + .getItems() + .stream() + .findAny() + .map(EndpointSlice::getEndpoints) + .stream() + .flatMap(Collection::stream) + .filter(e -> e.getConditions().getReady()) + .toList(), + readyEndpoints -> (long) readyEndpoints.size() == replicasCount + ); + } + + public LocalPortForward portForwardToClient() { + Pod pod = await() + .atMost(1, TimeUnit.MINUTES) + .until(() -> client.pods() + .withLabel("app", CLIENT_DEPLOYMENT_NAME) + .list() + .getItems() + .stream() + .filter(p -> "Running".equals(p.getStatus().getPhase())) + .findFirst() + .orElse(null), + Objects::nonNull); + + return client.pods() + .withName(pod.getMetadata().getName()) + .portForward(CLIENT_HTTP_SERVER_PORT); + } +} diff --git a/integration/test/src/test/resources/k8s-manifests.yaml b/integration/test/src/test/resources/k8s-manifests.yaml new file mode 100644 index 0000000..ceaaddf --- /dev/null +++ b/integration/test/src/test/resources/k8s-manifests.yaml @@ -0,0 +1,108 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: server + namespace: default +spec: + replicas: 1 + selector: + matchLabels: + app: server + template: + metadata: + labels: + app: server + spec: + containers: + - name: server + image: server:latest + imagePullPolicy: Never + env: + - name: IP_ADDR + valueFrom: + fieldRef: + fieldPath: status.podIP + ports: + - containerPort: 50051 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: client + namespace: default +spec: + replicas: 1 + selector: + matchLabels: + app: client + template: + metadata: + labels: + app: client + spec: + containers: + - name: client + image: client:latest + imagePullPolicy: Never + ports: + - containerPort: 50052 + serviceAccountName: client +--- +apiVersion: v1 +kind: Service +metadata: + name: server + namespace: default + labels: + app: server +spec: + clusterIP: None + selector: + app: server + ports: + - port: 50051 + targetPort: 50051 +--- +apiVersion: v1 +kind: Service +metadata: + name: client + namespace: default + labels: + app: client +spec: + selector: + app: client + ports: + - port: 50052 + targetPort: 50052 +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: client + namespace: default +rules: + - apiGroups: ["discovery.k8s.io"] + resources: ["endpointslices"] + verbs: ["get", "watch"] +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: client + namespace: default +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: client + namespace: default +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: client +subjects: + - kind: ServiceAccount + name: client + namespace: default diff --git a/integration/test/src/test/resources/logback.xml b/integration/test/src/test/resources/logback.xml new file mode 100644 index 0000000..452ec3f --- /dev/null +++ b/integration/test/src/test/resources/logback.xml @@ -0,0 +1,15 @@ + + + + %d{HH:mm:ss.SSS} - %msg%n%ex + + + + + + + + + + + diff --git a/lib/build.gradle b/lib/build.gradle index 5b2100c..46871a4 100644 --- a/lib/build.gradle +++ b/lib/build.gradle @@ -1,34 +1,22 @@ -/* - * This file was generated by the Gradle 'init' task. - * - * This generated file contains a sample Java library project to get you started. - * For more details on building Java & JVM projects, please refer to https://docs.gradle.org/8.13/userguide/building_java_projects.html in the Gradle documentation. - */ - plugins { - // Apply the java-library plugin for API and implementation separation. id 'java-library' } repositories { - // Use Maven Central for resolving dependencies. mavenCentral() } dependencies { - // Use JUnit Jupiter for testing. testImplementation libs.junit.jupiter - testRuntimeOnly 'org.junit.platform:junit-platform-launcher' + testImplementation 'org.wiremock:wiremock:3.12.1' + testImplementation 'org.mockito:mockito-core:5.17.0' + testImplementation 'ch.qos.logback:logback-classic:1.5.18' - // This dependency is exported to consumers, that is to say found on their compile classpath. - api libs.commons.math3 - - // This dependency is used internally, and not exposed to consumers on their own compile classpath. - implementation libs.guava + compileOnly 'io.grpc:grpc-stub:1.72.0' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.18.3' } -// Apply a specific Java toolchain to ease working on different environments. java { toolchain { languageVersion = JavaLanguageVersion.of(17) @@ -36,6 +24,5 @@ java { } tasks.named('test') { - // Use JUnit Platform for unit tests. useJUnitPlatform() } diff --git a/lib/src/main/java/io/github/lothar1998/kuberesolver/KubernetesNameResolver.java b/lib/src/main/java/io/github/lothar1998/kuberesolver/KubernetesNameResolver.java new file mode 100644 index 0000000..b442477 --- /dev/null +++ b/lib/src/main/java/io/github/lothar1998/kuberesolver/KubernetesNameResolver.java @@ -0,0 +1,164 @@ +package io.github.lothar1998.kuberesolver; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.logging.Level; +import java.util.logging.Logger; + +import io.github.lothar1998.kuberesolver.kubernetes.EndpointSliceWatcher; +import io.github.lothar1998.kuberesolver.kubernetes.InClusterEndpointSliceWatcher; +import io.github.lothar1998.kuberesolver.kubernetes.model.EndpointPort; +import io.github.lothar1998.kuberesolver.kubernetes.model.EndpointSlice; +import io.github.lothar1998.kuberesolver.kubernetes.model.Event; +import io.github.lothar1998.kuberesolver.kubernetes.model.EventType; + +import io.grpc.Attributes; +import io.grpc.EquivalentAddressGroup; +import io.grpc.NameResolver; +import io.grpc.Status; + +public final class KubernetesNameResolver extends NameResolver { + + private static final Logger LOGGER = Logger.getLogger(KubernetesNameResolver.class.getName()); + + private static final Set SUPPORTED_KUBERNETES_EVENTS = Set.of(EventType.ADDED, EventType.MODIFIED, + EventType.DELETED); + + private final Executor executor; + private final ResolverTarget params; + + private final EndpointSliceWatcher watcher; + private final Semaphore semaphore = new Semaphore(1); + + private boolean defaultExecutorUsed = false; + private Listener listener; + + public KubernetesNameResolver(ResolverTarget params) throws IOException { + this(Executors.newSingleThreadExecutor(), params); + this.defaultExecutorUsed = true; + } + + public KubernetesNameResolver(Executor executor, ResolverTarget params) + throws IOException { + this.executor = executor; + this.params = params; + if (params.namespace() != null) { + this.watcher = new InClusterEndpointSliceWatcher(params.namespace()); + } else { + this.watcher = new InClusterEndpointSliceWatcher(); + } + } + + @Override + public void start(Listener listener) { + this.listener = listener; + resolve(); + } + + @Override + public void refresh() { + if (semaphore.tryAcquire()) { + resolve(); + } + } + + private void resolve() { + executor.execute(this::watch); + } + + private void watch() { + watcher.watch(params.service(), new EndpointSliceWatcher.Subscriber() { + @Override + public void onEvent(Event event) { + // watch event occurred + if (!SUPPORTED_KUBERNETES_EVENTS.contains(event.type())) { + LOGGER.log(Level.FINER, "Unsupported Kubernetes event type {0}", + new Object[] { event.type().toString() }); + return; + } + + if (event.type().equals(EventType.DELETED)) { + LOGGER.log(Level.FINE, "EndpointSlice {0} was deleted", + new Object[] { event.endpointSlice().metadata().name() }); + return; + } + + if (event.endpointSlice() == null) { + LOGGER.log(Level.FINE, "No EndpointSlice found in watch event"); + return; + } + + LOGGER.log(Level.FINER, "Resolving addresses for service {0}", new Object[] { params.service() }); + buildAddresses(event.endpointSlice()).ifPresentOrElse(a -> listener.onAddresses(a, Attributes.EMPTY), + () -> LOGGER.log(Level.FINE, "No usable addresses found for Kubernetes service {0}", + new Object[] { params.service() })); + } + + @Override + public void onError(Throwable throwable) { + // watch encountered an error + LOGGER.log(Level.FINE, "Encountered an error when watching EndpointSlice", throwable); + listener.onError(Status.fromThrowable(throwable)); + semaphore.release(); + } + + @Override + public void onCompleted() { + // watch was finished and it should be performed again after some backoff + LOGGER.log(Level.FINER, "Watch stream of EndpointSlice was finished by server"); + listener.onError(Status.UNAVAILABLE); + semaphore.release(); + } + }); + } + + @Override + public void shutdown() { + if (defaultExecutorUsed && executor instanceof ExecutorService executor) { + executor.shutdownNow(); + } + } + + @Override + public String getServiceAuthority() { + return ""; + } + + private Optional> buildAddresses(EndpointSlice endpointSlice) { + return findPort(endpointSlice.ports()) + .map(port -> endpointSlice.endpoints().stream() + .filter(endpoint -> endpoint.conditions().isReady()) + .map(endpoint -> buildAddressGroup(endpoint.addresses(), port)) + .toList()); + } + + private Optional findPort(List ports) { + if (params.port() == null) { + return ports.stream().map(EndpointPort::port).findFirst(); + } + + try { + return Optional.of(Integer.parseInt(params.port())); + } catch (NumberFormatException e) { + return ports.stream() + .filter(port -> port.name().equals(params.port())) + .map(EndpointPort::port) + .findFirst(); + } + } + + private EquivalentAddressGroup buildAddressGroup(List addresses, int port) { + var socketAddresses = addresses.stream() + .map(address -> (SocketAddress) new InetSocketAddress(address, port)) + .toList(); + return new EquivalentAddressGroup(socketAddresses, Attributes.EMPTY); + } +} diff --git a/lib/src/main/java/io/github/lothar1998/kuberesolver/KubernetesNameResolverProvider.java b/lib/src/main/java/io/github/lothar1998/kuberesolver/KubernetesNameResolverProvider.java new file mode 100644 index 0000000..1881bf8 --- /dev/null +++ b/lib/src/main/java/io/github/lothar1998/kuberesolver/KubernetesNameResolverProvider.java @@ -0,0 +1,56 @@ +package io.github.lothar1998.kuberesolver; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.Executor; + +import io.grpc.NameResolver; +import io.grpc.NameResolver.Args; +import io.grpc.NameResolverProvider; + +public class KubernetesNameResolverProvider extends NameResolverProvider { + + private String scheme = "kubernetes"; + + public KubernetesNameResolverProvider(String schema) { + this.scheme = schema; + } + + public KubernetesNameResolverProvider() { + } + + @Override + protected boolean isAvailable() { + return true; + } + + @Override + protected int priority() { + return 5; + } + + @Override + public NameResolver newNameResolver(URI targetUri, Args args) { + if (targetUri.getScheme().equals(this.scheme)) { + var params = ResolverTarget.parse(targetUri); + return buildResolver(args.getOffloadExecutor(), params); + } + return null; + } + + private NameResolver buildResolver(Executor executor, ResolverTarget params) { + try { + if (executor != null) { + return new KubernetesNameResolver(executor, params); + } + return new KubernetesNameResolver(params); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public String getDefaultScheme() { + return this.scheme; + } +} diff --git a/lib/src/main/java/io/github/lothar1998/kuberesolver/ResolverTarget.java b/lib/src/main/java/io/github/lothar1998/kuberesolver/ResolverTarget.java new file mode 100644 index 0000000..9d7a8b2 --- /dev/null +++ b/lib/src/main/java/io/github/lothar1998/kuberesolver/ResolverTarget.java @@ -0,0 +1,57 @@ +package io.github.lothar1998.kuberesolver; + +import java.net.URI; +import javax.annotation.Nullable; +import javax.annotation.Nonnull; + +public record ResolverTarget(@Nullable String namespace, + @Nonnull String service, + @Nullable String port) { + + public static ResolverTarget parse(URI uri) throws IllegalArgumentException { + ResolverTarget params; + if (uri.getAuthority() == null || uri.getAuthority().isEmpty()) { + // kubernetes:///service.namespace:port + params = parse(trimLeadingSlash(uri.getPath())); + } else if (uri.getPort() == -1 && (uri.getPath() != null && !uri.getPath().isEmpty())) { + // kubernetes://namespace/service:port + params = parse(trimLeadingSlash(uri.getPath())); + params = new ResolverTarget(uri.getAuthority(), params.service, params.port); + } else { + // kubernetes://service.namespace:port + params = parse(uri.getAuthority()); + } + + if (params.service.isEmpty()) { + throw new IllegalArgumentException(String.format("cannot parse service name from URI '%s'", uri)); + } + return params; + } + + private static ResolverTarget parse(String s) { + String service = s; + String namespace = null; + String port = null; + + int colonIndex = s.lastIndexOf(':'); + if (colonIndex != -1) { + service = s.substring(0, colonIndex); + port = s.substring(colonIndex + 1); + } + + var parts = service.split("\\.", 3); + if (parts.length >= 2) { + service = parts[0]; + namespace = parts[1]; + } + + return new ResolverTarget(namespace, service, port); + } + + private static String trimLeadingSlash(String s) { + if (s.startsWith("/")) { + return s.substring(1); + } + return s; + } +} diff --git a/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/EndpointSliceWatcher.java b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/EndpointSliceWatcher.java new file mode 100644 index 0000000..4f604e1 --- /dev/null +++ b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/EndpointSliceWatcher.java @@ -0,0 +1,80 @@ +package io.github.lothar1998.kuberesolver.kubernetes; + +import java.io.InputStreamReader; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.Scanner; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.github.lothar1998.kuberesolver.kubernetes.model.Event; + +public abstract sealed class EndpointSliceWatcher permits InsecureEndpointSliceWatcher, SecureEndpointSliceWatcher { + + private static final String KUBERNETES_WATCH_ENDPOINT_SLICES_URL_PATTERN = "%s/apis/discovery.k8s.io/v1/watch/namespaces/%s/endpointslices?labelSelector=kubernetes.io/service-name=%s"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE, true); + + private final String host; + private final String namespace; + + public EndpointSliceWatcher(String host, String namespace) { + this.host = host; + this.namespace = namespace; + } + + public void watch(String serviceName, Subscriber subscriber) throws UnexpectedStatusCodeException { + try { + var request = getRequest(serviceName); + var response = getClient().send(request, HttpResponse.BodyHandlers.ofInputStream()); + + if (response.statusCode() != 200) { + throw new UnexpectedStatusCodeException( + String.format("Got HTTP %s status code in response from kube-apiserver", + response.statusCode())); + } + + var responseBody = response.body(); + + try (Scanner scanner = new Scanner(new InputStreamReader(responseBody))) { + while (scanner.hasNextLine()) { + var line = scanner.nextLine(); + var event = OBJECT_MAPPER.readValue(line, Event.class); + subscriber.onEvent(event); + } + } + subscriber.onCompleted(); + } catch (Exception e) { + subscriber.onError(e); + } + } + + protected abstract HttpRequest getRequest(String serviceName) throws Exception; + + protected abstract HttpClient getClient() throws Exception; + + protected URI getURI(String serviceName) throws URISyntaxException, MalformedURLException { + var url = new URL(String.format(KUBERNETES_WATCH_ENDPOINT_SLICES_URL_PATTERN, host, namespace, serviceName)); + return url.toURI(); + } + + public interface Subscriber { + void onEvent(Event event); + + void onError(Throwable throwable); + + void onCompleted(); + } + + public static class UnexpectedStatusCodeException extends RuntimeException { + public UnexpectedStatusCodeException(String message) { + super(message); + } + } +} diff --git a/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/InClusterEndpointSliceWatcher.java b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/InClusterEndpointSliceWatcher.java new file mode 100644 index 0000000..9e34637 --- /dev/null +++ b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/InClusterEndpointSliceWatcher.java @@ -0,0 +1,61 @@ +package io.github.lothar1998.kuberesolver.kubernetes; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Optional; + +public final class InClusterEndpointSliceWatcher extends SecureEndpointSliceWatcher { + + private static final String KUBERNETES_SERVICE_HOST = "KUBERNETES_SERVICE_HOST"; + private static final String KUBERNETES_SERVICE_PORT = "KUBERNETES_SERVICE_PORT"; + + private static final String KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"; + private static final String KUBERNETES_SERVICE_ACCOUNT_CA_CERT_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"; + private static final String KUBERNETES_NAMESPACE_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"; + + public InClusterEndpointSliceWatcher() throws IOException { + this(getNamespace()); + } + + public InClusterEndpointSliceWatcher(String namespace) { + super(getHost(), namespace, getAuthConfigProvider()); + } + + private static String getHost() { + var address = Optional.of(System.getenv(KUBERNETES_SERVICE_HOST)).orElseThrow( + () -> new RuntimeException(String.format("%s env variable not set", KUBERNETES_SERVICE_HOST))); + + var port = Optional.of(System.getenv(KUBERNETES_SERVICE_PORT)).orElseThrow( + () -> new RuntimeException(String.format("%s env variable not set", KUBERNETES_SERVICE_PORT))); + + return String.format("https://%s:%s", address, port); + } + + private static String getNamespace() throws IOException { + try { + return Files.readString(Paths.get(KUBERNETES_NAMESPACE_PATH), StandardCharsets.UTF_8); + } catch (FileNotFoundException e) { + return "default"; + } + } + + private static AuthConfigProvider getAuthConfigProvider() { + return new AuthConfigProvider() { + + @Override + public InputStream getCaCert() throws Exception { + return new FileInputStream(KUBERNETES_SERVICE_ACCOUNT_CA_CERT_PATH); + } + + @Override + public InputStream getToken() throws Exception { + return new FileInputStream(KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH); + } + }; + } +} diff --git a/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/InsecureEndpointSliceWatcher.java b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/InsecureEndpointSliceWatcher.java new file mode 100644 index 0000000..9e4400d --- /dev/null +++ b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/InsecureEndpointSliceWatcher.java @@ -0,0 +1,27 @@ +package io.github.lothar1998.kuberesolver.kubernetes; + +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpClient.Version; + +public final class InsecureEndpointSliceWatcher extends EndpointSliceWatcher { + + public InsecureEndpointSliceWatcher(String host, String namespace) { + super(host, namespace); + } + + @Override + protected HttpClient getClient() { + return HttpClient.newBuilder() + .version(Version.HTTP_1_1) + .build(); + } + + @Override + protected HttpRequest getRequest(String serviceName) throws Exception { + return HttpRequest.newBuilder(getURI(serviceName)) + .setHeader("Accept", "application/json") + .build(); + } + +} diff --git a/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/SecureEndpointSliceWatcher.java b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/SecureEndpointSliceWatcher.java new file mode 100644 index 0000000..3bf4fcd --- /dev/null +++ b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/SecureEndpointSliceWatcher.java @@ -0,0 +1,69 @@ +package io.github.lothar1998.kuberesolver.kubernetes; + +import java.io.InputStream; +import java.net.http.HttpClient; +import java.net.http.HttpClient.Version; +import java.nio.charset.StandardCharsets; +import java.net.http.HttpRequest; +import java.security.KeyStore; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; + +public sealed class SecureEndpointSliceWatcher extends EndpointSliceWatcher permits InClusterEndpointSliceWatcher { + + private final AuthConfigProvider authConfig; + + public SecureEndpointSliceWatcher(String host, String namespace, AuthConfigProvider authConfig) { + super(host, namespace); + this.authConfig = authConfig; + } + + @Override + protected HttpClient getClient() throws Exception { + var cf = CertificateFactory.getInstance("X.509"); + + X509Certificate x509Cert; + try (var inputStream = authConfig.getCaCert()) { + x509Cert = (X509Certificate) cf.generateCertificate(inputStream); + } + + var keyStore = KeyStore.getInstance(KeyStore.getDefaultType()); + keyStore.load(null); + keyStore.setCertificateEntry("caCert", x509Cert); + + var tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(keyStore); + + var sslContext = SSLContext.getInstance("TLS"); + sslContext.init(null, tmf.getTrustManagers(), null); + + return HttpClient.newBuilder() + .version(Version.HTTP_1_1) + .sslContext(sslContext) + .build(); + } + + @Override + protected HttpRequest getRequest(String serviceName) throws Exception { + return HttpRequest.newBuilder(getURI(serviceName)) + .GET() + .setHeader("Authorization", String.format("Bearer %s", getToken())) + .setHeader("Accept", "application/json") + .build(); + } + + private String getToken() throws Exception { + try (var token = authConfig.getToken()) { + var bytes = token.readAllBytes(); + return new String(bytes, StandardCharsets.UTF_8); + } + } + + public interface AuthConfigProvider { + InputStream getCaCert() throws Exception; + + InputStream getToken() throws Exception; + } +} diff --git a/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/Conditions.java b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/Conditions.java new file mode 100644 index 0000000..588a8ae --- /dev/null +++ b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/Conditions.java @@ -0,0 +1,6 @@ +package io.github.lothar1998.kuberesolver.kubernetes.model; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public record Conditions(@JsonProperty("ready") Boolean isReady) { +} diff --git a/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/Endpoint.java b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/Endpoint.java new file mode 100644 index 0000000..33122ec --- /dev/null +++ b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/Endpoint.java @@ -0,0 +1,12 @@ +package io.github.lothar1998.kuberesolver.kubernetes.model; + +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSetter; +import com.fasterxml.jackson.annotation.Nulls; + +public record Endpoint(@JsonProperty("addresses") @JsonSetter(nulls = Nulls.AS_EMPTY) List addresses, + @JsonProperty("conditions") Conditions conditions) { + +} diff --git a/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/EndpointPort.java b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/EndpointPort.java new file mode 100644 index 0000000..0093121 --- /dev/null +++ b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/EndpointPort.java @@ -0,0 +1,6 @@ +package io.github.lothar1998.kuberesolver.kubernetes.model; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public record EndpointPort(@JsonProperty("name") String name, @JsonProperty("port") int port) { +}; diff --git a/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/EndpointSlice.java b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/EndpointSlice.java new file mode 100644 index 0000000..205fe7f --- /dev/null +++ b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/EndpointSlice.java @@ -0,0 +1,13 @@ +package io.github.lothar1998.kuberesolver.kubernetes.model; + +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSetter; +import com.fasterxml.jackson.annotation.Nulls; + +public record EndpointSlice( + @JsonProperty("metadata") Metadata metadata, + @JsonProperty("endpoints") @JsonSetter(nulls = Nulls.AS_EMPTY) List endpoints, + @JsonProperty("ports") @JsonSetter(nulls = Nulls.AS_EMPTY) List ports) { +} diff --git a/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/Event.java b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/Event.java new file mode 100644 index 0000000..58970ca --- /dev/null +++ b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/Event.java @@ -0,0 +1,6 @@ +package io.github.lothar1998.kuberesolver.kubernetes.model; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public record Event(@JsonProperty("type") EventType type, @JsonProperty("object") EndpointSlice endpointSlice) { +} diff --git a/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/EventType.java b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/EventType.java new file mode 100644 index 0000000..2e44138 --- /dev/null +++ b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/EventType.java @@ -0,0 +1,19 @@ +package io.github.lothar1998.kuberesolver.kubernetes.model; + +import com.fasterxml.jackson.annotation.JsonEnumDefaultValue; +import com.fasterxml.jackson.annotation.JsonProperty; + +public enum EventType { + @JsonProperty("ADDED") + ADDED, + @JsonProperty("MODIFIED") + MODIFIED, + @JsonProperty("DELETED") + DELETED, + @JsonProperty("ERROR") + ERROR, + @JsonProperty("BOOKMARK") + BOOKMARK, + @JsonEnumDefaultValue + UNKNOWN +} diff --git a/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/Metadata.java b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/Metadata.java new file mode 100644 index 0000000..1a14a16 --- /dev/null +++ b/lib/src/main/java/io/github/lothar1998/kuberesolver/kubernetes/model/Metadata.java @@ -0,0 +1,6 @@ +package io.github.lothar1998.kuberesolver.kubernetes.model; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public record Metadata(@JsonProperty("name") String name) { +} diff --git a/lib/src/main/java/org/example/Library.java b/lib/src/main/java/org/example/Library.java deleted file mode 100644 index b98461b..0000000 --- a/lib/src/main/java/org/example/Library.java +++ /dev/null @@ -1,10 +0,0 @@ -/* - * This source file was generated by the Gradle 'init' task - */ -package org.example; - -public class Library { - public boolean someLibraryMethod() { - return true; - } -} diff --git a/lib/src/test/java/io/github/lothar1998/kuberesolver/ResolverTargetTest.java b/lib/src/test/java/io/github/lothar1998/kuberesolver/ResolverTargetTest.java new file mode 100644 index 0000000..5f4b195 --- /dev/null +++ b/lib/src/test/java/io/github/lothar1998/kuberesolver/ResolverTargetTest.java @@ -0,0 +1,57 @@ +package io.github.lothar1998.kuberesolver; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +public class ResolverTargetTest { + + @ParameterizedTest + @MethodSource("testCases") + void parseURIParamsTest(String inputURL, ResolverTarget expectedParams) throws URISyntaxException { + var uri = new URI(inputURL); + + var params = ResolverTarget.parse(uri); + + assertEquals(expectedParams, params); + } + + @Test + void parseInvalidURIParamsTest() throws URISyntaxException { + var uri = new URI(""); + + assertThrows(IllegalArgumentException.class, () -> ResolverTarget.parse(uri)); + } + + private static Stream testCases() { + return Stream.of( + Arguments.of("kubernetes:///service-name:8080", + new ResolverTarget(null, "service-name", "8080")), + Arguments.of("kubernetes:///service-name:portname", + new ResolverTarget(null, "service-name", "portname")), + Arguments.of("kubernetes:///service-name.namespace:8080", + new ResolverTarget("namespace", "service-name", "8080")), + Arguments.of("kubernetes:///service-name.namespace.svc.cluster_name", + new ResolverTarget("namespace", "service-name", null)), + Arguments.of("kubernetes:///service-name.namespace.svc.cluster_name:8080", + new ResolverTarget("namespace", "service-name", "8080")), + Arguments.of("kubernetes://namespace/service-name:8080", + new ResolverTarget("namespace", "service-name", "8080")), + Arguments.of("kubernetes://service-name:8080/", + new ResolverTarget(null, "service-name", "8080")), + Arguments.of("kubernetes://service-name.namespace:8080/", + new ResolverTarget("namespace", "service-name", "8080")), + Arguments.of("kubernetes://service-name.namespace.svc.cluster_name", + new ResolverTarget("namespace", "service-name", null)), + Arguments.of("kubernetes://service-name.namespace.svc.cluster_name:8080", + new ResolverTarget("namespace", "service-name", "8080"))); + } +} diff --git a/lib/src/test/java/io/github/lothar1998/kuberesolver/kubernetes/InsecureEndpointSliceWatcherTest.java b/lib/src/test/java/io/github/lothar1998/kuberesolver/kubernetes/InsecureEndpointSliceWatcherTest.java new file mode 100644 index 0000000..b46c88e --- /dev/null +++ b/lib/src/test/java/io/github/lothar1998/kuberesolver/kubernetes/InsecureEndpointSliceWatcherTest.java @@ -0,0 +1,155 @@ +package io.github.lothar1998.kuberesolver.kubernetes; + +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.ok; +import static com.github.tomakehurst.wiremock.client.WireMock.status; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathTemplate; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.github.lothar1998.kuberesolver.kubernetes.EndpointSliceWatcher.UnexpectedStatusCodeException; +import io.github.lothar1998.kuberesolver.kubernetes.model.Conditions; +import io.github.lothar1998.kuberesolver.kubernetes.model.Endpoint; +import io.github.lothar1998.kuberesolver.kubernetes.model.EndpointPort; +import io.github.lothar1998.kuberesolver.kubernetes.model.EndpointSlice; +import io.github.lothar1998.kuberesolver.kubernetes.model.Event; +import io.github.lothar1998.kuberesolver.kubernetes.model.EventType; +import io.github.lothar1998.kuberesolver.kubernetes.model.Metadata; +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; +import com.github.tomakehurst.wiremock.junit5.WireMockTest; + +@WireMockTest +class InsecureEndpointSliceWatcherTest { + + private static final String PATH_TEMPLATE = "/apis/discovery.k8s.io/v1/watch/namespaces/{namespace}/endpointslices"; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @DisplayName("should watch on events from HTTP server until the server ends the stream") + @Test + void watchAllStreamedEventsTest(WireMockRuntimeInfo wmRuntimeInfo) { + var event1 = new Event( + EventType.ADDED, + new EndpointSlice( + new Metadata("my-service-endpoint-slice"), + List.of(new Endpoint(List.of("10.0.0.1", "10.0.1.1"), new Conditions(true))), + List.of(new EndpointPort(null, 8080)))); + + var event2 = new Event( + EventType.MODIFIED, + new EndpointSlice( + new Metadata("my-service-endpoint-slice"), + List.of( + new Endpoint(List.of("10.0.0.1", "10.0.1.1"), new Conditions(true)), + new Endpoint(List.of("10.0.0.2", "10.0.1.2"), new Conditions(false))), + List.of(new EndpointPort("port-name", 8080)))); + + var event3 = new Event( + EventType.DELETED, + new EndpointSlice( + new Metadata("my-service-endpoint-slice"), + List.of( + new Endpoint(List.of("10.0.0.1", "10.0.1.1"), new Conditions(true)), + new Endpoint(List.of("10.0.0.2", "10.0.1.2"), new Conditions(false))), + List.of(new EndpointPort("port-name", 8080)))); + + var chunkedBody = Stream.of(event1, event2, event3) + .map(event -> { + try { + return OBJECT_MAPPER.writeValueAsString(event); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.joining("\n")); + + stubFor( + get(urlPathTemplate(PATH_TEMPLATE)) + .withPathParam("namespace", equalTo("my-namespace")) + .withQueryParam("labelSelector", equalTo("kubernetes.io/service-name=my-service")) + .withHeader("Accept", equalTo("application/json")) + .willReturn(ok(chunkedBody) + .withHeader("Content-Type", "application/json") + .withChunkedDribbleDelay(3, 1))); + + var watcher = new InsecureEndpointSliceWatcher(wmRuntimeInfo.getHttpBaseUrl(), "my-namespace"); + var subscriber = mock(InsecureEndpointSliceWatcher.Subscriber.class); + watcher.watch("my-service", subscriber); + + var inOrder = inOrder(subscriber); + inOrder.verify(subscriber).onEvent(event1); + inOrder.verify(subscriber).onEvent(event2); + inOrder.verify(subscriber).onEvent(event3); + inOrder.verify(subscriber).onCompleted(); + } + + @DisplayName("should fail watching events due to invalid url") + @Test + void watchFailsDueToInvalidURL() { + var watcher = new InsecureEndpointSliceWatcher("invalid_path", "default"); + var subscriber = mock(InsecureEndpointSliceWatcher.Subscriber.class); + watcher.watch("service", subscriber); + + var captor = ArgumentCaptor.forClass(Throwable.class); + verify(subscriber).onError(captor.capture()); + verify(subscriber, never()).onEvent(any()); + verify(subscriber, never()).onCompleted(); + + assertInstanceOf(MalformedURLException.class, captor.getValue()); + } + + @DisplayName("should fail watching events due to invalid uri") + @Test + void watchFailsDueToInvalidURI() { + var watcher = new InsecureEndpointSliceWatcher("http://invalid path", "default"); + var subscriber = mock(InsecureEndpointSliceWatcher.Subscriber.class); + watcher.watch("service", subscriber); + + var captor = ArgumentCaptor.forClass(Throwable.class); + verify(subscriber).onError(captor.capture()); + verify(subscriber, never()).onEvent(any()); + verify(subscriber, never()).onCompleted(); + + assertInstanceOf(URISyntaxException.class, captor.getValue()); + } + + @DisplayName("should fail watching events due to non ok HTTP status code") + @Test + void watchFailsDueToNotOkHTTPStatusCode(WireMockRuntimeInfo wmRuntimeInfo) { + stubFor( + get(urlPathTemplate(PATH_TEMPLATE)) + .withPathParam("namespace", equalTo("my-namespace")) + .withQueryParam("labelSelector", equalTo("kubernetes.io/service-name=my-service")) + .withHeader("Accept", equalTo("application/json")) + .willReturn(status(500))); + + var watcher = new InsecureEndpointSliceWatcher(wmRuntimeInfo.getHttpBaseUrl(), "my-namespace"); + var subscriber = mock(InsecureEndpointSliceWatcher.Subscriber.class); + watcher.watch("my-service", subscriber); + + var captor = ArgumentCaptor.forClass(Throwable.class); + verify(subscriber).onError(captor.capture()); + verify(subscriber, never()).onEvent(any()); + verify(subscriber, never()).onCompleted(); + + assertInstanceOf(UnexpectedStatusCodeException.class, captor.getValue()); + } +} diff --git a/lib/src/test/java/io/github/lothar1998/kuberesolver/kubernetes/SecureEndpointSliceWatcherTest.java b/lib/src/test/java/io/github/lothar1998/kuberesolver/kubernetes/SecureEndpointSliceWatcherTest.java new file mode 100644 index 0000000..8c89682 --- /dev/null +++ b/lib/src/test/java/io/github/lothar1998/kuberesolver/kubernetes/SecureEndpointSliceWatcherTest.java @@ -0,0 +1,189 @@ +package io.github.lothar1998.kuberesolver.kubernetes; + +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.ok; +import static com.github.tomakehurst.wiremock.client.WireMock.status; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathTemplate; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.io.ByteArrayInputStream; +import java.io.FileInputStream; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockito.ArgumentCaptor; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.github.lothar1998.kuberesolver.kubernetes.EndpointSliceWatcher.UnexpectedStatusCodeException; +import io.github.lothar1998.kuberesolver.kubernetes.SecureEndpointSliceWatcher.AuthConfigProvider; +import io.github.lothar1998.kuberesolver.kubernetes.model.Conditions; +import io.github.lothar1998.kuberesolver.kubernetes.model.Endpoint; +import io.github.lothar1998.kuberesolver.kubernetes.model.EndpointPort; +import io.github.lothar1998.kuberesolver.kubernetes.model.EndpointSlice; +import io.github.lothar1998.kuberesolver.kubernetes.model.Event; +import io.github.lothar1998.kuberesolver.kubernetes.model.EventType; +import io.github.lothar1998.kuberesolver.kubernetes.model.Metadata; +import com.github.tomakehurst.wiremock.junit5.WireMockExtension; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; + +public class SecureEndpointSliceWatcherTest { + + private static final String PATH_TEMPLATE = "/apis/discovery.k8s.io/v1/watch/namespaces/{namespace}/endpointslices"; + private static final String TOKEN = "token1234"; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @RegisterExtension + private static final WireMockExtension WIREMOCK = WireMockExtension.newInstance() + .options( + wireMockConfig() + .dynamicHttpsPort() + .httpDisabled(true) + .keystorePath(getResourcePath("wiremock.jks")) + .keystorePassword("password")) + .build(); + + private static final AuthConfigProvider AUTH_CONFIG_PROVIDER = new AuthConfigProvider() { + @Override + public InputStream getCaCert() throws Exception { + return new FileInputStream(getResourcePath("wiremock.crt")); + } + + @Override + public InputStream getToken() { + return new ByteArrayInputStream(TOKEN.getBytes()); + } + }; + + @DisplayName("should watch on events from HTTPS server until the server ends the stream") + @Test + void watchAllStreamedEventsTest() { + var event1 = new Event( + EventType.ADDED, + new EndpointSlice( + new Metadata("my-service-endpoint-slice"), + List.of(new Endpoint(List.of("10.0.0.1", "10.0.1.1"), new Conditions(true))), + List.of(new EndpointPort(null, 8080)))); + + var event2 = new Event( + EventType.MODIFIED, + new EndpointSlice( + new Metadata("my-service-endpoint-slice"), + List.of( + new Endpoint(List.of("10.0.0.1", "10.0.1.1"), new Conditions(true)), + new Endpoint(List.of("10.0.0.2", "10.0.1.2"), new Conditions(false))), + List.of(new EndpointPort("port-name", 8080)))); + + var event3 = new Event( + EventType.DELETED, + new EndpointSlice( + new Metadata("my-service-endpoint-slice"), + List.of( + new Endpoint(List.of("10.0.0.1", "10.0.1.1"), new Conditions(true)), + new Endpoint(List.of("10.0.0.2", "10.0.1.2"), new Conditions(false))), + List.of(new EndpointPort("port-name", 8080)))); + + var chunkedBody = Stream.of(event1, event2, event3) + .map(event -> { + try { + return OBJECT_MAPPER.writeValueAsString(event); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.joining("\n")); + + WIREMOCK.stubFor( + get(urlPathTemplate(PATH_TEMPLATE)) + .withPathParam("namespace", equalTo("my-namespace")) + .withQueryParam("labelSelector", equalTo("kubernetes.io/service-name=my-service")) + .withHeader("Accept", equalTo("application/json")) + .withHeader("Authorization", equalTo(String.format("Bearer %s", TOKEN))) + .willReturn(ok(chunkedBody) + .withHeader("Content-Type", "application/json") + .withChunkedDribbleDelay(3, 1))); + + var watcher = new SecureEndpointSliceWatcher(WIREMOCK.getRuntimeInfo().getHttpsBaseUrl(), "my-namespace", + AUTH_CONFIG_PROVIDER); + var subscriber = mock(InsecureEndpointSliceWatcher.Subscriber.class); + watcher.watch("my-service", subscriber); + + var inOrder = inOrder(subscriber); + inOrder.verify(subscriber).onEvent(event1); + inOrder.verify(subscriber).onEvent(event2); + inOrder.verify(subscriber).onEvent(event3); + inOrder.verify(subscriber).onCompleted(); + } + + @DisplayName("should fail watching events due to invalid url") + @Test + void watchFailsDueToInvalidURL() { + var watcher = new SecureEndpointSliceWatcher("invalid_path", "default", AUTH_CONFIG_PROVIDER); + var subscriber = mock(SecureEndpointSliceWatcher.Subscriber.class); + watcher.watch("service", subscriber); + + var captor = ArgumentCaptor.forClass(Throwable.class); + verify(subscriber).onError(captor.capture()); + verify(subscriber, never()).onEvent(any()); + verify(subscriber, never()).onCompleted(); + + assertInstanceOf(MalformedURLException.class, captor.getValue()); + } + + @DisplayName("should fail watching events due to invalid uri") + @Test + void watchFailsDueToInvalidURI() { + var watcher = new SecureEndpointSliceWatcher("http://invalid path", "default", AUTH_CONFIG_PROVIDER); + var subscriber = mock(SecureEndpointSliceWatcher.Subscriber.class); + watcher.watch("service", subscriber); + + var captor = ArgumentCaptor.forClass(Throwable.class); + verify(subscriber).onError(captor.capture()); + verify(subscriber, never()).onEvent(any()); + verify(subscriber, never()).onCompleted(); + + assertInstanceOf(URISyntaxException.class, captor.getValue()); + } + + @DisplayName("should fail watching events due to non ok HTTP status code") + @Test + void watchFailsDueToNotOkHTTPStatusCode() { + WIREMOCK.stubFor( + get(urlPathTemplate(PATH_TEMPLATE)) + .withPathParam("namespace", equalTo("my-namespace")) + .withQueryParam("labelSelector", equalTo("kubernetes.io/service-name=my-service")) + .withHeader("Accept", equalTo("application/json")) + .withHeader("Authorization", equalTo(String.format("Bearer %s", TOKEN))) + .willReturn(status(500))); + + var watcher = new SecureEndpointSliceWatcher(WIREMOCK.getRuntimeInfo().getHttpsBaseUrl(), "my-namespace", + AUTH_CONFIG_PROVIDER); + var subscriber = mock(SecureEndpointSliceWatcher.Subscriber.class); + watcher.watch("my-service", subscriber); + + var captor = ArgumentCaptor.forClass(Throwable.class); + verify(subscriber).onError(captor.capture()); + verify(subscriber, never()).onEvent(any()); + verify(subscriber, never()).onCompleted(); + + assertInstanceOf(UnexpectedStatusCodeException.class, captor.getValue()); + } + + private static String getResourcePath(String filename) { + return SecureEndpointSliceWatcherTest.class.getClassLoader().getResource(filename).getPath(); + } +} diff --git a/lib/src/test/java/org/example/LibraryTest.java b/lib/src/test/java/org/example/LibraryTest.java deleted file mode 100644 index ef34950..0000000 --- a/lib/src/test/java/org/example/LibraryTest.java +++ /dev/null @@ -1,14 +0,0 @@ -/* - * This source file was generated by the Gradle 'init' task - */ -package org.example; - -import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.*; - -class LibraryTest { - @Test void someLibraryMethodReturnsTrue() { - Library classUnderTest = new Library(); - assertTrue(classUnderTest.someLibraryMethod(), "someLibraryMethod should return 'true'"); - } -} diff --git a/lib/src/test/resources/wiremock.crt b/lib/src/test/resources/wiremock.crt new file mode 100644 index 0000000..dcc2ce7 --- /dev/null +++ b/lib/src/test/resources/wiremock.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDczCCAlsCFEBR2WcljJtB5sEK1mNohs0ZX9XnMA0GCSqGSIb3DQEBCwUAMH0x +CzAJBgNVBAYTAlBMMRYwFAYDVQQIDA1MZXNzZXIgUG9sYW5kMQ8wDQYDVQQHDAZL +cmFrb3cxEjAQBgNVBAoMCU15TG9jYWxDQTEZMBcGA1UECwwQUm9vdCBDZXJ0aWZp +Y2F0ZTEWMBQGA1UEAwwNTXlMb2NhbFJvb3RDQTAgFw0yNTA0MjEyMTIwMDhaGA80 +NzYzMDMxOTIxMjAwOFowbTELMAkGA1UEBhMCUEwxFjAUBgNVBAgMDUxlc3NlciBQ +b2xhbmQxDzANBgNVBAcMBkNyYWNvdzEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0 +cyBQdHkgTHRkMRIwEAYDVQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUA +A4IBDwAwggEKAoIBAQDew0KDCtPMth+rKNY29mXxRXpQ5vOJ7NJQIuzHwDIp4Xrv +oYiEhHVCWeTknGjB7XXkum2Cuin3OUEcuWbD80W8uRNRb2lag0e5me7oNy6RI0nc +67niGpTJUZkbCwCPe+GgGQChJ9qmnZGTMWn2AF2OXL4o8X8ViHq2TdqYGgM/NBBy +aPvm3LkRWBmg/dRrYfZy1S6qTIGWV2dM2ulWgkWIQP8FGbUn+ZXsqlbRKjAFkbUC +yMQ7vVHcO/Xl7cfCH/oN6Qa+bNJfBtuvXJmlc9SioxBMThJX2LoXug+T89HJ5CuR +Ee6G9+e/WWh0rxCqXr26cG5Ms0LrND0bfJnqC2rtAgMBAAEwDQYJKoZIhvcNAQEL +BQADggEBAJRC9kG4uEkqH6Id2IL/1elX4jIVxAxOUMBxk2KtCz2dUTo6bwxESaHa +nW2ycktyAsDo/e/7kJl6JteOIYNIfc+Kt6w6Fwp7tE1lWGF+S8E4d9n9brjX8T6c +yKbvlDfjLSBF9f3iIkyranYVj45/VVU7GH51+FGGYVXg+fWFrwJwAcwHpiSnOGib +arrMh2Jhd2CQashR5y2dLWSHv+tHdeMFdpdGmu5uXcsklbAVPbX7x4BzASTN0x9Q +tCTfsNklYnH7FNMs3MhHtHQv8rs3pXGaiXTMN9R9o27UG9kWMT6rbJyqPFrMGNJQ +HWGBnEBwSC73AzhNoW/TGJxjqI7vNAw= +-----END CERTIFICATE----- diff --git a/lib/src/test/resources/wiremock.jks b/lib/src/test/resources/wiremock.jks new file mode 100644 index 0000000..4ee4383 Binary files /dev/null and b/lib/src/test/resources/wiremock.jks differ diff --git a/settings.gradle b/settings.gradle index 2c3eac9..41e2840 100644 --- a/settings.gradle +++ b/settings.gradle @@ -11,4 +11,6 @@ plugins { } rootProject.name = 'kuberesolver' -include('lib') +include(':lib', ':app', ':test') +project(':app').projectDir = file('integration/app') +project(':test').projectDir = file('integration/test')