Skip to content

Commit

Permalink
Merge pull request #498 from aureamunoz/fix-knative-svc-url
Browse files Browse the repository at this point in the history
Remove schema for host and use 0 as default port in knative
  • Loading branch information
aureamunoz authored Feb 20, 2023
2 parents 6897801 + 50b1fe4 commit 5838ca2
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 25 deletions.
1 change: 0 additions & 1 deletion service-discovery/knative/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>knative-client</artifactId>
<!-- <scope>test</scope>-->
</dependency>
<dependency>
<groupId>io.vertx</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.smallrye.stork.servicediscovery.knative;

import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -147,9 +148,21 @@ private List<ServiceInstance> toStorkServiceInstances(List<Service> knServices,
: Collections.emptyMap());

Metadata<KnativeMetadataKey> knativeMetadata = Metadata.of(KnativeMetadataKey.class);
String host = knService.getStatus().getUrl();
try {
URI uri = new URI(knService.getStatus().getUrl());
if (uri != null && uri.getScheme() != null) {
host = uri.getHost();
if (host == null) { // invalid URI
throw new IllegalArgumentException("Invalid URL used: '" + uri + "'");
}
}
} catch (Exception e) {
LOGGER.error(e.getMessage() + " for service: " + application);
}

serviceInstances
.add(new DefaultServiceInstance(ServiceInstanceIds.next(), knService.getStatus().getUrl(), 8080, secure,
.add(new DefaultServiceInstance(ServiceInstanceIds.next(), host, -1, secure,
labels,
knativeMetadata
.with(KnativeMetadataKey.META_KNATIVE_SERVICE_ID, knService.getFullResourceName())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package io.smallrye.stork.servicediscovery.knative;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import io.fabric8.kubernetes.client.Config;
import io.smallrye.stork.Stork;
import io.smallrye.stork.api.ServiceInstance;
import io.smallrye.stork.test.StorkTestUtils;
import io.smallrye.stork.test.TestConfigProvider;

@Disabled
public class KnativeServiceDiscoveryRealClusterTest {

String k8sMasterUrl;
String namespace;

@BeforeEach
void setUp() {
TestConfigProvider.clear();
System.setProperty(Config.KUBERNETES_TRUST_CERT_SYSTEM_PROPERTY, "true");
}

@Test
void shouldDiscoverHeroesKnative() {
String svc = "hero-service";

TestConfigProvider.addServiceConfig(svc, null, "knative",
null, Map.of("knative-host", "https://api.sandbox-m2.ll9k.p1.openshiftapps.com:6443", "knative-namespace",
"amunozhe-dev", "application", "rest-heroes"));

Stork stork = StorkTestUtils.getNewStorkInstance();

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

io.smallrye.stork.api.Service service = stork.getService(svc);
service.getServiceDiscovery().getServiceInstances()
.onFailure().invoke(th -> fail("Failed to get service instances from the cluster", th))
.subscribe().with(instances::set);

await().atMost(Duration.ofSeconds(40))
.until(() -> instances.get() != null);

assertThat(instances.get()).hasSize(1);
ServiceInstance svcInstance = instances.get().get(0);

// This code needs the smallrye-mutiny-vertx-web-client dependency
// WebClient client = WebClient.create(Vertx.vertx(), new WebClientOptions()
// .setDefaultHost(svcInstance.getHost()).setDefaultPort(svcInstance.getPort()).setSsl(false).setTrustAll(false));
//
// await().untilAsserted(() -> Assertions.assertEquals(200,
// client.get("/api/heroes/random").sendAndAwait().statusCode()));

}

// @Test
void shouldDiscoverNamespacedKnativeServicesWithApp() {
String svc = "my-service";

TestConfigProvider.addServiceConfig("my-service", null, "knative",
null, Map.of("knative-namespace", "default", "application", "helloworld-go"));

Stork stork = StorkTestUtils.getNewStorkInstance();

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

io.smallrye.stork.api.Service service = stork.getService(svc);
service.getServiceDiscovery().getServiceInstances()
.onFailure().invoke(th -> fail("Failed to get service instances from the cluster", th))
.subscribe().with(instances::set);

await().atMost(Duration.ofSeconds(5))
.until(() -> instances.get() != null);

assertThat(instances.get()).hasSize(1);
ServiceInstance svcInstance = instances.get().get(0);

// WebClient client = WebClient.create(Vertx.vertx(), new WebClientOptions()
// .setDefaultHost(svcInstance.getHost()).setSsl(false).setTrustAll(false));
//
// await().untilAsserted(() -> Assertions.assertEquals(200,
// client.get("").sendAndAwait().statusCode()));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void shouldDiscoverNamespacedKnativeServices() {

String knSvcName = "my-knservice";

registerKnativeServices(knSvcName, "http://hello.test.127.0.0.1.sslip.io", null);
registerKnativeServices(knSvcName, "hello.test.127.0.0.1.sslip.io", null);

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Expand All @@ -70,8 +70,8 @@ void shouldDiscoverNamespacedKnativeServices() {
.until(() -> instances.get() != null);

assertThat(instances.get()).hasSize(1);
assertThat(instances.get().get(0).getHost()).isEqualTo("http://hello.test.127.0.0.1.sslip.io");
assertThat(instances.get().get(0).getPort()).isEqualTo(8080);
assertThat(instances.get().get(0).getHost()).isEqualTo("hello.test.127.0.0.1.sslip.io");
assertThat(instances.get().get(0).getPort()).isEqualTo(-1);
Map<String, String> labels = instances.get().get(0).getLabels();
assertThat(labels).contains(entry("serving.knative.dev/creator", "kubernetes-admin"),
entry("serving.knative.dev/lastModifier", "kubernetes-admin"));
Expand Down Expand Up @@ -109,8 +109,8 @@ void shouldDiscoverKnativeServicesInAllNs() {

String knativeService = "my-knservice";

registerKnativeServices(knativeService, "http://hello.ns1.127.0.0.1.sslip.io", "ns1");
registerKnativeServices(knativeService, "http://hello.ns2.127.0.0.1.sslip.io", "ns2");
registerKnativeServices(knativeService, "hello.ns1.127.0.0.1.sslip.io", "ns1");
registerKnativeServices(knativeService, "hello.ns2.127.0.0.1.sslip.io", "ns2");

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Expand All @@ -124,7 +124,7 @@ void shouldDiscoverKnativeServicesInAllNs() {

assertThat(instances.get()).hasSize(2);
assertThat(instances.get().stream().map(ServiceInstance::getHost))
.containsExactlyInAnyOrder("http://hello.ns1.127.0.0.1.sslip.io", "http://hello.ns2.127.0.0.1.sslip.io");
.containsExactlyInAnyOrder("hello.ns1.127.0.0.1.sslip.io", "hello.ns2.127.0.0.1.sslip.io");
}

@Test
Expand All @@ -135,7 +135,7 @@ void shouldGetServiceFromK8sDefaultNamespaceUsingProgrammaticAPI() {

String knativeService = "my-knservice";

registerKnativeServices(knativeService, "http://hello.test.127.0.0.1.sslip.io", null);
registerKnativeServices(knativeService, "hello.test.127.0.0.1.sslip.io", null);

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Expand All @@ -148,8 +148,8 @@ void shouldGetServiceFromK8sDefaultNamespaceUsingProgrammaticAPI() {
.until(() -> instances.get() != null);

assertThat(instances.get()).hasSize(1);
assertThat(instances.get().get(0).getHost()).isEqualTo("http://hello.test.127.0.0.1.sslip.io");
assertThat(instances.get().get(0).getPort()).isEqualTo(8080);
assertThat(instances.get().get(0).getHost()).isEqualTo("hello.test.127.0.0.1.sslip.io");
assertThat(instances.get().get(0).getPort()).isEqualTo(-1);
Map<String, String> labels = instances.get().get(0).getLabels();
assertThat(labels).contains(entry("serving.knative.dev/creator", "kubernetes-admin"),
entry("serving.knative.dev/lastModifier", "kubernetes-admin"));
Expand All @@ -165,7 +165,7 @@ void shouldHandleSecureAttribute() {

String knSvcName = "my-knservice";

registerKnativeServices(knSvcName, "http://hello.test.127.0.0.1.sslip.io", null);
registerKnativeServices(knSvcName, "hello.test.127.0.0.1.sslip.io", null);

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Expand All @@ -178,8 +178,8 @@ void shouldHandleSecureAttribute() {
.until(() -> instances.get() != null);

assertThat(instances.get()).hasSize(1);
assertThat(instances.get().get(0).getHost()).isEqualTo("http://hello.test.127.0.0.1.sslip.io");
assertThat(instances.get().get(0).getPort()).isEqualTo(8080);
assertThat(instances.get().get(0).getHost()).isEqualTo("hello.test.127.0.0.1.sslip.io");
assertThat(instances.get().get(0).getPort()).isEqualTo(-1);
Map<String, String> labels = instances.get().get(0).getLabels();
assertThat(labels).contains(entry("serving.knative.dev/creator", "kubernetes-admin"),
entry("serving.knative.dev/lastModifier", "kubernetes-admin"));
Expand All @@ -204,7 +204,7 @@ void shouldFetchInstancesFromTheClusterWhenCacheIsInvalidated() {

String knSvcName = "my-knservice";

registerKnativeServices(knSvcName, "http://hello.test.127.0.0.1.sslip.io", null);
registerKnativeServices(knSvcName, "hello.test.127.0.0.1.sslip.io", null);

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Expand All @@ -217,8 +217,8 @@ void shouldFetchInstancesFromTheClusterWhenCacheIsInvalidated() {
.until(() -> instances.get() != null);

assertThat(instances.get()).hasSize(1);
assertThat(instances.get().get(0).getHost()).isEqualTo("http://hello.test.127.0.0.1.sslip.io");
assertThat(instances.get().get(0).getPort()).isEqualTo(8080);
assertThat(instances.get().get(0).getHost()).isEqualTo("hello.test.127.0.0.1.sslip.io");
assertThat(instances.get().get(0).getPort()).isEqualTo(-1);
Map<String, String> labels = instances.get().get(0).getLabels();
assertThat(labels).contains(entry("serving.knative.dev/creator", "kubernetes-admin"),
entry("serving.knative.dev/lastModifier", "kubernetes-admin"));
Expand Down Expand Up @@ -252,7 +252,7 @@ void shouldFetchInstancesFromTheCache() throws InterruptedException {
server.expect().get().withPath("/apis/serving.knative.dev/v1/namespaces/test/services/my-knservice")
.andReply(200, r -> {
serverHit.incrementAndGet();
return buildKnService(knSvcName, "http://hello.test.127.0.0.1.sslip.io", "test");
return buildKnService(knSvcName, "hello.test.127.0.0.1.sslip.io", "test");
}).always();

TestConfigProvider.addServiceConfig("my-knservice", null, "knative",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.smallrye.stork.test.StorkTestUtils;
import io.smallrye.stork.test.TestConfigProvider;

@Disabled
public class KubernetesServiceDiscoveryRealClusterIT {

@BeforeEach
Expand All @@ -34,7 +35,6 @@ void setUp() {
}

@Test
@Disabled
void shouldGetServiceFromK8sDefaultNamespace() {

TestConfigProvider.addServiceConfig("rest-service", null, "kubernetes",
Expand All @@ -60,7 +60,6 @@ void shouldGetServiceFromK8sDefaultNamespace() {
}

@Test
@Disabled
void shouldGetServicesForDefaultNamespaceOnNonSpecified() {
String serviceName = "pod1";

Expand All @@ -82,7 +81,6 @@ void shouldGetServicesForDefaultNamespaceOnNonSpecified() {
}

@Test
@Disabled("doesn't work yet")
void shouldRegisterServiceInstancesInDefaultNamespace() throws InterruptedException {
TestConfigProvider.addServiceConfig("svc", null, "kubernetes",
null, Map.of("k8s-host", "https://127.0.0.1:41711/", "k8s-namespace", "stork"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,44 @@ void shouldGetServiceFromK8sDefaultNamespace() {
assertThat(instances.get()).allSatisfy(si -> assertThat(si.isSecure()).isFalse());
}

@Test
void shouldGetServiceFromK8sWithApplicationNameConfig() {
TestConfigProvider.addServiceConfig("svc", null, "kubernetes",
null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "application", "greetingApp"));
Stork stork = StorkTestUtils.getNewStorkInstance();

String serviceName = "svc";
String[] ips = { "10.96.96.231", "10.96.96.232", "10.96.96.233" };

registerKubernetesResources("greetingApp", defaultNamespace, ips);

AtomicReference<List<ServiceInstance>> instances = new AtomicReference<>();

Service service = stork.getService(serviceName);
service.getServiceDiscovery().getServiceInstances()
.onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th))
.subscribe().with(instances::set);

await().atMost(Duration.ofSeconds(5))
.until(() -> instances.get() != null);

assertThat(instances.get()).hasSize(3);
assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080);
assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231",
"10.96.96.232", "10.96.96.233");
for (ServiceInstance serviceInstance : instances.get()) {
Map<String, String> labels = serviceInstance.getLabels();
assertThat(labels).contains(entry("app.kubernetes.io/name", "greetingApp"),
entry("app.kubernetes.io/version", "1.0"),
entry("ui", "ui-" + ipAsSuffix(serviceInstance.getHost())));
}
instances.get().stream().map(ServiceInstance::getMetadata).forEach(metadata -> {
Metadata<KubernetesMetadataKey> k8sMetadata = (Metadata<KubernetesMetadataKey>) metadata;
assertThat(k8sMetadata.getMetadata()).containsKey(META_K8S_SERVICE_ID);
});
assertThat(instances.get()).allSatisfy(si -> assertThat(si.isSecure()).isFalse());
}

@Test
void shouldGetServiceFromK8sDefaultNamespaceUsingProgrammaticAPI() {
Stork stork = StorkTestUtils.getNewStorkInstance();
Expand Down Expand Up @@ -472,23 +510,23 @@ private Map<String, Long> mapHostnameToIds(List<ServiceInstance> serviceInstance
return result;
}

private Endpoints buildAndRegisterKubernetesService(String serviceName, String namespace, boolean register,
private Endpoints buildAndRegisterKubernetesService(String applicationName, String namespace, boolean register,
String... ipAdresses) {

Map<String, String> serviceLabels = new HashMap<>();
serviceLabels.put("app.kubernetes.io/name", serviceName);
serviceLabels.put("app.kubernetes.io/name", applicationName);
serviceLabels.put("app.kubernetes.io/version", "1.0");

List<EndpointAddress> endpointAddresses = Arrays.stream(ipAdresses)
.map(ipAddress -> {
ObjectReference targetRef = new ObjectReference(null, null, "Pod",
serviceName + "-" + ipAsSuffix(ipAddress), namespace, null, UUID.randomUUID().toString());
applicationName + "-" + ipAsSuffix(ipAddress), namespace, null, UUID.randomUUID().toString());
EndpointAddress endpointAddress = new EndpointAddressBuilder().withIp(ipAddress).withTargetRef(targetRef)
.build();
return endpointAddress;
}).collect(Collectors.toList());
Endpoints endpoint = new EndpointsBuilder()
.withNewMetadata().withName(serviceName).withLabels(serviceLabels).endMetadata()
.withNewMetadata().withName(applicationName).withLabels(serviceLabels).endMetadata()
.addToSubsets(new EndpointSubsetBuilder().withAddresses(endpointAddresses)
.addToPorts(new EndpointPortBuilder().withPort(8080).withProtocol("TCP").build())
.build())
Expand Down

0 comments on commit 5838ca2

Please sign in to comment.