Skip to content

Commit

Permalink
control-service: Add GraphQL read from DB (#2837)
Browse files Browse the repository at this point in the history
Currently, when using the GraphQL API, the data job deployment data is
read from the kubernetes
cronjobs. This works fine in general, but it also means that the
kubernetes resources become the
single source of truth, which is not ideal.
    
As part of VEP-2272, the deployment information is moved to the DB,
which improves speed and
reliability. This means that we need a way to read data from the DB,
instead of the kubernetes.
   
This change adds support for reading deployment data from the database
when using the GraphQL
API. The option to read from the k8s remains.
    
Testing Done: Added test.

---------

Signed-off-by: Andon Andonov <andonova@vmware.com>
Co-authored-by: github-actions <>
  • Loading branch information
doks5 committed Oct 27, 2023
1 parent 85bffc0 commit b4220f6
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,28 @@ private static DataJobContacts getContactsFromJob(DataJob job) {
return contacts;
}

/**
 * Converts a persisted {@link ActualDataJobDeployment} entity into the API-facing
 * {@link JobDeploymentStatus} model.
 *
 * @param deploymentStatus the deployment entity read from the database; must not be null
 * @return a populated JobDeploymentStatus mirroring the entity's fields
 */
public static JobDeploymentStatus toJobDeploymentStatus(
    ActualDataJobDeployment deploymentStatus) {
  JobDeploymentStatus status = new JobDeploymentStatus();

  status.setDataJobName(deploymentStatus.getDataJobName());
  status.setPythonVersion(deploymentStatus.getPythonVersion());
  status.setGitCommitSha(deploymentStatus.getGitCommitSha());
  status.setEnabled(deploymentStatus.getEnabled());
  status.setLastDeployedBy(deploymentStatus.getLastDeployedBy());

  var lastDeployedDate = deploymentStatus.getLastDeployedDate();
  status.setLastDeployedDate(lastDeployedDate == null ? null : lastDeployedDate.toString());

  status.setResources(getResourcesFromDeployment(deploymentStatus));
  // The ActualDataJobDeployment does not have a mode attribute, which is required by the
  // JobDeploymentStatus,
  // so we need to set something in order to avoid errors.
  status.setMode("release");

  return status;
}

private static DataJobResources getResourcesFromDeployment(ActualDataJobDeployment deployment) {
DataJobResources resources = new DataJobResources();
var deploymentResources = deployment.getResources();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@

package com.vmware.taurus.service.graphql;

import com.vmware.taurus.datajobs.DeploymentModelConverter;
import com.vmware.taurus.datajobs.ToApiModelConverter;
import com.vmware.taurus.service.deploy.DeploymentServiceV2;
import com.vmware.taurus.service.repository.JobsRepository;
import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig;
import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig.ReadFrom;
import com.vmware.taurus.service.deploy.DeploymentService;
import com.vmware.taurus.service.graphql.model.Criteria;
import com.vmware.taurus.service.graphql.model.DataJobPage;
Expand All @@ -26,13 +30,7 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -50,6 +48,8 @@ public class GraphQLDataFetchers {
private final JobsRepository jobsRepository;
private final DeploymentService deploymentService;
private final ExecutionDataFetcher executionDataFetcher;
private final DataJobDeploymentPropertiesConfig dataJobDeploymentPropertiesConfig;
private final DeploymentServiceV2 deploymentServiceV2;

public DataFetcher<Object> findAllAndBuildDataJobPage() {
return dataFetchingEnvironment -> {
Expand Down Expand Up @@ -206,9 +206,15 @@ private Predicate<V2DataJob> computeSearch(

private List<V2DataJob> populateDeployments(
List<V2DataJob> allDataJob, Map<String, DataJob> dataJobs) {
Map<String, JobDeploymentStatus> deploymentStatuses =
deploymentService.readDeployments().stream()
.collect(Collectors.toMap(JobDeploymentStatus::getDataJobName, cronJob -> cronJob));
Map<String, JobDeploymentStatus> deploymentStatuses;

if (dataJobDeploymentPropertiesConfig.getReadDataSource().equals(ReadFrom.DB)) {
deploymentStatuses = readJobDeploymentsFromDb();
} else if (dataJobDeploymentPropertiesConfig.getReadDataSource().equals(ReadFrom.K8S)) {
deploymentStatuses = readJobDeploymentsFromK8s();
} else {
deploymentStatuses = Collections.emptyMap();
}

allDataJob.forEach(
dataJob -> {
Expand All @@ -227,6 +233,19 @@ private List<V2DataJob> populateDeployments(
return allDataJob;
}

/**
 * Reads the deployment statuses exposed by the Kubernetes cron jobs.
 *
 * @return deployment statuses keyed by data job name
 */
private Map<String, JobDeploymentStatus> readJobDeploymentsFromK8s() {
  var k8sDeployments = deploymentService.readDeployments();
  return k8sDeployments.stream()
      .collect(Collectors.toMap(JobDeploymentStatus::getDataJobName, deployment -> deployment));
}

/**
 * Reads the deployment statuses persisted in the database (see VEP-2272) and converts
 * each entity to the API model.
 *
 * @return deployment statuses keyed by data job name
 */
private Map<String, JobDeploymentStatus> readJobDeploymentsFromDb() {
  Map<String, JobDeploymentStatus> statuses = new HashMap<>();
  deploymentServiceV2
      .findAllActualDataJobDeployments()
      .forEach(
          (jobName, deployment) ->
              statuses.put(jobName, DeploymentModelConverter.toJobDeploymentStatus(deployment)));
  return statuses;
}

private static DataJobPage buildResponse(int pageSize, int count, List pageList) {

return DataJobPage.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
package com.vmware.taurus.service.graphql;

import com.vmware.taurus.controlplane.model.data.DataJobExecution;
import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig;
import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig.ReadFrom;
import com.vmware.taurus.service.deploy.DeploymentServiceV2;
import com.vmware.taurus.service.model.*;
import com.vmware.taurus.service.repository.JobsRepository;
import com.vmware.taurus.service.deploy.DeploymentService;
import com.vmware.taurus.service.graphql.model.Filter;
Expand All @@ -22,11 +26,7 @@
import com.vmware.taurus.service.graphql.strategy.datajob.JobFieldStrategyByScheduleCron;
import com.vmware.taurus.service.graphql.strategy.datajob.JobFieldStrategyBySourceUrl;
import com.vmware.taurus.service.graphql.strategy.datajob.JobFieldStrategyByTeam;
import com.vmware.taurus.service.model.DataJob;
import com.vmware.taurus.service.graphql.model.DataJobPage;
import com.vmware.taurus.service.model.ExecutionStatus;
import com.vmware.taurus.service.model.JobConfig;
import com.vmware.taurus.service.model.JobDeploymentStatus;
import graphql.GraphQLException;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
Expand All @@ -47,6 +47,7 @@
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
Expand All @@ -68,6 +69,10 @@ class GraphQLDataFetchersTest {

@Mock private DataFetchingFieldSelectionSet dataFetchingFieldSelectionSet;

@Mock private DeploymentServiceV2 deploymentServiceV2;

@Mock private DataJobDeploymentPropertiesConfig dataJobDeploymentPropertiesConfig;

private DataFetcher<Object> findDataJobs;

@BeforeEach
Expand All @@ -76,12 +81,18 @@ public void before() {
new JobFieldStrategyFactory(collectSupportedFieldStrategies());
GraphQLDataFetchers graphQLDataFetchers =
new GraphQLDataFetchers(
strategyFactory, jobsRepository, deploymentService, executionDataFetcher);
strategyFactory,
jobsRepository,
deploymentService,
executionDataFetcher,
dataJobDeploymentPropertiesConfig,
deploymentServiceV2);
findDataJobs = graphQLDataFetchers.findAllAndBuildDataJobPage();
}

@Test
void testDataFetcherOfJobs_whenGettingFullList_shouldReturnAllDataJobs() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
when(dataFetchingEnvironment.getArgument("pageSize")).thenReturn(10);
when(jobsRepository.findAll()).thenReturn(mockListOfDataJobs());
Expand All @@ -99,6 +110,7 @@ void testDataFetcherOfJobs_whenGettingFullList_shouldReturnAllDataJobs() throws

@Test
void testDataFetcherOfJobs_whenGettingPagedResult_shouldReturnPagedJobs() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(2);
when(dataFetchingEnvironment.getArgument("pageSize")).thenReturn(2);
when(jobsRepository.findAll()).thenReturn(mockListOfDataJobs());
Expand Down Expand Up @@ -136,6 +148,7 @@ void testDataFetcherOfJobs_whenSupportedFieldProvidedWithSorting_shouldReturnJob

@Test
void testDataFetcherOfJobs_whenSearchingSpecificJob_shouldReturnSearchedJob() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
when(dataFetchingEnvironment.getArgument("pageSize")).thenReturn(10);
when(dataFetchingEnvironment.getArgument("search")).thenReturn("sample-job-2");
Expand All @@ -156,6 +169,7 @@ void testDataFetcherOfJobs_whenSearchingSpecificJob_shouldReturnSearchedJob() th

@Test
void testDataFetcherOfJobs_whenSearchingByPattern_shouldReturnMatchingJobs() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
when(dataFetchingEnvironment.getArgument("pageSize")).thenReturn(10);
when(dataFetchingEnvironment.getArgument("search")).thenReturn("sample-job-2");
Expand Down Expand Up @@ -213,6 +227,7 @@ void testDataFetcherOfJobs_whenValidPageNumberIsProvided_shouldNotThrowException

@Test
void testPopulateDeployments() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution());
when(deploymentService.readDeployments()).thenReturn(mockListOfDeployments());
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
Expand All @@ -238,8 +253,39 @@ void testPopulateDeployments() throws Exception {
assertThat(job2.getDeployments().get(0).getLastExecutionDuration()).isNull();
}

@Test
void testPopulateDeployments_readFromDB() throws Exception {
// Exercise the DB read path: the properties config selects ReadFrom.DB, so the
// fetcher must call deploymentServiceV2 instead of the k8s-backed deploymentService.
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.DB);
when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution());
when(deploymentServiceV2.findAllActualDataJobDeployments())
.thenReturn(mockMapOfActualJobDeployments());
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
when(dataFetchingEnvironment.getArgument("pageSize")).thenReturn(100);
// Request the deployments field so populateDeployments() is actually invoked.
when(dataFetchingEnvironment.getSelectionSet()).thenReturn(dataFetchingFieldSelectionSet);
when(dataFetchingFieldSelectionSet.contains(JobFieldStrategyBy.DEPLOYMENT.getPath()))
.thenReturn(true);

DataJobPage dataJobPage = (DataJobPage) findDataJobs.get(dataFetchingEnvironment);

// All five mocked jobs should be returned, each enriched with exactly one
// deployment converted from the DB-backed ActualDataJobDeployment fixtures.
assertThat(dataJobPage.getContent()).hasSize(5);
// NOTE(review): index-based access assumes the ordering produced by
// mockListOfDataJobsWithLastExecution() (not fully visible here) — job at index 1
// is expected to have no last-execution data.
var job2 = (V2DataJob) dataJobPage.getContent().get(1);
assertThat(job2.getDeployments()).hasSize(1);
assertThat(job2.getDeployments().get(0).getLastExecutionStatus()).isNull();
assertThat(job2.getDeployments().get(0).getLastExecutionTime()).isNull();
assertThat(job2.getDeployments().get(0).getLastExecutionDuration()).isNull();
// Python version comes from the DB fixture, proving the DB path was used.
assertThat(job2.getDeployments().get(0).getJobPythonVersion()).isEqualTo("3.8-secure");
var job4 = (V2DataJob) dataJobPage.getContent().get(3);
assertThat(job4.getDeployments()).hasSize(1);
assertThat(job4.getDeployments().get(0).getLastExecutionStatus())
.isEqualTo(DataJobExecution.StatusEnum.SUCCEEDED);
assertThat(job4.getDeployments().get(0).getLastExecutionTime()).isNull();
assertThat(job4.getDeployments().get(0).getLastExecutionDuration()).isEqualTo(0);
assertThat(job4.getDeployments().get(0).getJobPythonVersion()).isEqualTo("3.9-secure");
}

@Test
void testFilterByLastExecutionStatus() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution());
when(deploymentService.readDeployments()).thenReturn(mockListOfDeployments());
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
Expand All @@ -263,6 +309,7 @@ void testFilterByLastExecutionStatus() throws Exception {

@Test
void testSortingByLastExecutionStatus() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution());
when(deploymentService.readDeployments()).thenReturn(mockListOfDeployments());
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
Expand Down Expand Up @@ -296,6 +343,7 @@ void testSortingByLastExecutionStatus() throws Exception {

@Test
void testSortingByLastExecutionTime() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution());
when(deploymentService.readDeployments()).thenReturn(mockListOfDeployments());
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
Expand Down Expand Up @@ -327,6 +375,7 @@ void testSortingByLastExecutionTime() throws Exception {

@Test
void testSortingByLastExecutionDuration() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution());
when(deploymentService.readDeployments()).thenReturn(mockListOfDeployments());
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
Expand Down Expand Up @@ -376,6 +425,15 @@ private List<DataJob> mockListOfDataJobs() {
return dataJobs;
}

/**
 * Builds a fixture of five actual (DB-backed) job deployments keyed by job name,
 * with alternating enabled flags and a mix of Python versions.
 */
private Map<String, ActualDataJobDeployment> mockMapOfActualJobDeployments() {
  Map<String, ActualDataJobDeployment> deployments = new LinkedHashMap<>();
  deployments.put("sample-job-1", mockSampleActualJobDeployment("sample-job-1", true, "3.8-secure"));
  deployments.put("sample-job-2", mockSampleActualJobDeployment("sample-job-2", false, "3.8-secure"));
  deployments.put("sample-job-3", mockSampleActualJobDeployment("sample-job-3", true, "3.9-secure"));
  deployments.put("sample-job-4", mockSampleActualJobDeployment("sample-job-4", false, "3.9-secure"));
  deployments.put("sample-job-5", mockSampleActualJobDeployment("sample-job-5", true, "3.9-secure"));
  return deployments;
}

private List<DataJob> mockListOfDataJobsWithLastExecution() {
List<DataJob> dataJobs = new ArrayList<>();

Expand Down Expand Up @@ -444,6 +502,25 @@ private JobDeploymentStatus mockSampleDeployment(String jobName, boolean enabled
return status;
}

/**
 * Creates an {@link ActualDataJobDeployment} fixture with the given job name, enabled
 * flag and Python version, plus fixed resource limits and a fixed deployment timestamp
 * so assertions stay deterministic.
 */
private ActualDataJobDeployment mockSampleActualJobDeployment(
    String jobName, boolean enabled, String pythonVersion) {
  DataJobDeploymentResources resources = new DataJobDeploymentResources();
  resources.setCpuRequestCores(1f);
  resources.setCpuLimitCores(1f);
  resources.setMemoryRequestMi(100);
  resources.setMemoryLimitMi(100);

  ActualDataJobDeployment deployment = new ActualDataJobDeployment();
  deployment.setDataJobName(jobName);
  deployment.setEnabled(enabled);
  deployment.setPythonVersion(pythonVersion);
  // Fixed point in time; only the presence/formatting of the date matters to callers.
  deployment.setLastDeployedDate(
      OffsetDateTime.of(2023, 10, 25, 16, 30, 42, 42, ZoneOffset.UTC));
  deployment.setResources(resources);

  return deployment;
}

static ArrayList<LinkedHashMap<String, String>> constructFilter(Filter... filters) {
ArrayList<LinkedHashMap<String, String>> rawFilters = new ArrayList<>();
Arrays.stream(filters)
Expand Down

0 comments on commit b4220f6

Please sign in to comment.