Skip to content

Commit

Permalink
Basic tests for K8sOperatorFlinkJobLocator
Browse files Browse the repository at this point in the history
  • Loading branch information
sap1ens committed Jul 7, 2023
1 parent cb64efa commit 673852f
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.sap1ens.heimdall.kubernetes;

import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import jakarta.inject.Singleton;
import java.util.List;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;

@Singleton
public class FlinkDeploymentClient {
private final KubernetesClient ks8Client = new KubernetesClientBuilder().build();

private final MixedOperation<
FlinkDeployment, KubernetesResourceList<FlinkDeployment>, Resource<FlinkDeployment>>
flinkDeploymentK8Client = ks8Client.resources(FlinkDeployment.class);

public List<FlinkDeployment> find(String namespace, ListOptions listOptions) {
return flinkDeploymentK8Client.inNamespace(namespace).list(listOptions).getItems();
}

public List<FlinkDeployment> find(String namespace) {
return find(namespace, new ListOptions());
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package com.sap1ens.heimdall.service;

import com.sap1ens.heimdall.AppConfig;
import com.sap1ens.heimdall.kubernetes.FlinkDeploymentClient;
import com.sap1ens.heimdall.model.FlinkJob;
import com.sap1ens.heimdall.model.FlinkJobResources;
import com.sap1ens.heimdall.model.FlinkJobType;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.quarkus.arc.lookup.LookupIfProperty;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
Expand All @@ -30,20 +25,12 @@ public class K8sOperatorFlinkJobLocator implements FlinkJobLocator {

@Inject AppConfig appConfig;

private KubernetesClient ks8Client = new KubernetesClientBuilder().build();

private MixedOperation<
FlinkDeployment, KubernetesResourceList<FlinkDeployment>, Resource<FlinkDeployment>>
flinkDeploymentK8Client = ks8Client.resources(FlinkDeployment.class);
@Inject FlinkDeploymentClient flinkDeploymentClient;

@Override
public List<FlinkJob> findAll() {
var listOptions = new ListOptions();
var flinkDeployments =
flinkDeploymentK8Client
.inNamespace(appConfig.joblocator().k8sOperator().namespaceToWatch())
.list(listOptions)
.getItems();
var namespace = appConfig.joblocator().k8sOperator().namespaceToWatch();
var flinkDeployments = flinkDeploymentClient.find(namespace);
return flinkDeployments.stream().map(this::toFlinkJob).collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package com.sap1ens.heimdall.service;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.sap1ens.heimdall.AppConfig;
import com.sap1ens.heimdall.kubernetes.FlinkDeploymentClient;
import com.sap1ens.heimdall.model.FlinkJobType;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.quarkus.test.Mock;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.mockito.InjectMock;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.Resource;
import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

@QuarkusTest
public class K8sOperatorFlinkJobLocatorTest {

@InjectMock(convertScopes = true)
FlinkDeploymentClient flinkDeploymentClient;

@InjectMock(returnsDeepMocks = true)
AppConfig appConfig;

@Inject K8sOperatorFlinkJobLocator flinkJobLocator;

@Test
public void testFindAllEmpty() {
Mockito.when(appConfig.joblocator().k8sOperator().namespaceToWatch()).thenReturn("empty-one");
Mockito.when(flinkDeploymentClient.find("empty-one")).thenReturn(Collections.emptyList());

assertTrue(flinkJobLocator.findAll().isEmpty());
}

@Test
public void testFindAll() {
var flinkDeployments = generateFlinkDeployments(2);
Mockito.when(appConfig.joblocator().k8sOperator().namespaceToWatch()).thenReturn("default");
Mockito.when(flinkDeploymentClient.find("default")).thenReturn(flinkDeployments);

var flinkJobs = flinkJobLocator.findAll();
assertEquals(2, flinkJobs.size());
var flinkJob = flinkJobs.get(0);
assertEquals("test-flink-deployment", flinkJob.name());
assertEquals("RUNNING", flinkJob.status());
assertEquals(FlinkJobType.APPLICATION, flinkJob.type());
assertEquals("test-image", flinkJob.shortImage());
assertEquals("1.15", flinkJob.flinkVersion());
assertEquals(4, flinkJob.parallelism());
assertEquals(4, flinkJob.resources().get("tm").replicas());
assertEquals("1.0", flinkJob.resources().get("tm").cpu());
assertEquals("2048m", flinkJob.resources().get("tm").mem());
assertEquals("test-flink-deployment", flinkJob.metadata().get("flink-app"));
}

private List<FlinkDeployment> generateFlinkDeployments(int num) {
var flinkDeployments = new ArrayList<FlinkDeployment>();
for (int i = 0; i < num; i++) {
flinkDeployments.add(generateFlinkDeployment());
}
return flinkDeployments;
}

private FlinkDeployment generateFlinkDeployment() {
var flinkDeployment = new FlinkDeployment();
flinkDeployment.setMetadata(
new ObjectMetaBuilder()
.withUid(UUID.randomUUID().toString())
.withName("test-flink-deployment")
.withNamespace("default")
.withLabels(Map.of("flink-app", "test-flink-deployment"))
.build());
var jobSpec = new JobSpec();
jobSpec.setParallelism(4);
var spec = new FlinkDeploymentSpec();
spec.setImage("test-image");
spec.setFlinkVersion(FlinkVersion.v1_15);
spec.setJob(jobSpec);
var resource = new Resource();
resource.setMemory("2048m");
resource.setCpu(1.0);
var jm = new JobManagerSpec();
jm.setResource(resource);
jm.setReplicas(1);
spec.setJobManager(jm);
var tm = new TaskManagerSpec();
tm.setResource(resource);
tm.setReplicas(4);
spec.setTaskManager(tm);
flinkDeployment.setSpec(spec);
var status = new FlinkDeploymentStatus();
var jobStatus = new JobStatus();
jobStatus.setState("RUNNING");
status.setJobStatus(jobStatus);
flinkDeployment.setStatus(status);
return flinkDeployment;
}

@ApplicationScoped
@Mock
public static class MockedAppConfig implements AppConfig {
@Override
public Joblocator joblocator() {
return null;
}

@Override
public Map<String, String> patterns() {
return null;
}

@Override
public Map<String, String> endpointPathPatterns() {
return null;
}
}
}

0 comments on commit 673852f

Please sign in to comment.