Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

control-service: Add GraphQL read from DB #2837

Merged
merged 6 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,28 @@ private static DataJobContacts getContactsFromJob(DataJob job) {
return contacts;
}

public static JobDeploymentStatus toJobDeploymentStatus(
ActualDataJobDeployment deploymentStatus) {
JobDeploymentStatus jobDeploymentStatus = new JobDeploymentStatus();

jobDeploymentStatus.setDataJobName(deploymentStatus.getDataJobName());
jobDeploymentStatus.setPythonVersion(deploymentStatus.getPythonVersion());
jobDeploymentStatus.setGitCommitSha(deploymentStatus.getGitCommitSha());
jobDeploymentStatus.setEnabled(deploymentStatus.getEnabled());
jobDeploymentStatus.setLastDeployedBy(deploymentStatus.getLastDeployedBy());
jobDeploymentStatus.setLastDeployedDate(
deploymentStatus.getLastDeployedDate() == null
? null
: deploymentStatus.getLastDeployedDate().toString());
jobDeploymentStatus.setResources(getResourcesFromDeployment(deploymentStatus));
// The ActualDataJobDeployment does not have a mode attribute, which is required by the
// JobDeploymentStatus,
// so we need to set something in order to avoid errors.
jobDeploymentStatus.setMode("release");

return jobDeploymentStatus;
}

private static DataJobResources getResourcesFromDeployment(ActualDataJobDeployment deployment) {
DataJobResources resources = new DataJobResources();
var deploymentResources = deployment.getResources();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@

package com.vmware.taurus.service.graphql;

import com.vmware.taurus.datajobs.DeploymentModelConverter;
import com.vmware.taurus.datajobs.ToApiModelConverter;
import com.vmware.taurus.service.deploy.DeploymentServiceV2;
import com.vmware.taurus.service.repository.JobsRepository;
import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig;
import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig.ReadFrom;
import com.vmware.taurus.service.deploy.DeploymentService;
import com.vmware.taurus.service.graphql.model.Criteria;
import com.vmware.taurus.service.graphql.model.DataJobPage;
Expand All @@ -26,13 +30,7 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -50,6 +48,8 @@ public class GraphQLDataFetchers {
private final JobsRepository jobsRepository;
private final DeploymentService deploymentService;
private final ExecutionDataFetcher executionDataFetcher;
private final DataJobDeploymentPropertiesConfig dataJobDeploymentPropertiesConfig;
private final DeploymentServiceV2 deploymentServiceV2;

public DataFetcher<Object> findAllAndBuildDataJobPage() {
return dataFetchingEnvironment -> {
Expand Down Expand Up @@ -206,9 +206,15 @@ private Predicate<V2DataJob> computeSearch(

private List<V2DataJob> populateDeployments(
List<V2DataJob> allDataJob, Map<String, DataJob> dataJobs) {
Map<String, JobDeploymentStatus> deploymentStatuses =
deploymentService.readDeployments().stream()
.collect(Collectors.toMap(JobDeploymentStatus::getDataJobName, cronJob -> cronJob));
Map<String, JobDeploymentStatus> deploymentStatuses;

if (dataJobDeploymentPropertiesConfig.getReadDataSource().equals(ReadFrom.DB)) {
deploymentStatuses = readJobDeploymentsFromDb();
} else if (dataJobDeploymentPropertiesConfig.getReadDataSource().equals(ReadFrom.K8S)) {
deploymentStatuses = readJobDeploymentsFromK8s();
} else {
deploymentStatuses = Collections.emptyMap();
}

allDataJob.forEach(
dataJob -> {
Expand All @@ -227,6 +233,19 @@ private List<V2DataJob> populateDeployments(
return allDataJob;
}

private Map<String, JobDeploymentStatus> readJobDeploymentsFromK8s() {
return deploymentService.readDeployments().stream()
.collect(Collectors.toMap(JobDeploymentStatus::getDataJobName, cronJob -> cronJob));
}

private Map<String, JobDeploymentStatus> readJobDeploymentsFromDb() {
return deploymentServiceV2.findAllActualDataJobDeployments().entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> DeploymentModelConverter.toJobDeploymentStatus(entry.getValue())));
}

private static DataJobPage buildResponse(int pageSize, int count, List pageList) {

return DataJobPage.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
package com.vmware.taurus.service.graphql;

import com.vmware.taurus.controlplane.model.data.DataJobExecution;
import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig;
import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig.ReadFrom;
import com.vmware.taurus.service.deploy.DeploymentServiceV2;
import com.vmware.taurus.service.model.*;
import com.vmware.taurus.service.repository.JobsRepository;
import com.vmware.taurus.service.deploy.DeploymentService;
import com.vmware.taurus.service.graphql.model.Filter;
Expand All @@ -22,11 +26,7 @@
import com.vmware.taurus.service.graphql.strategy.datajob.JobFieldStrategyByScheduleCron;
import com.vmware.taurus.service.graphql.strategy.datajob.JobFieldStrategyBySourceUrl;
import com.vmware.taurus.service.graphql.strategy.datajob.JobFieldStrategyByTeam;
import com.vmware.taurus.service.model.DataJob;
import com.vmware.taurus.service.graphql.model.DataJobPage;
import com.vmware.taurus.service.model.ExecutionStatus;
import com.vmware.taurus.service.model.JobConfig;
import com.vmware.taurus.service.model.JobDeploymentStatus;
import graphql.GraphQLException;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
Expand All @@ -47,6 +47,7 @@
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
Expand All @@ -68,6 +69,10 @@ class GraphQLDataFetchersTest {

@Mock private DataFetchingFieldSelectionSet dataFetchingFieldSelectionSet;

@Mock private DeploymentServiceV2 deploymentServiceV2;

@Mock private DataJobDeploymentPropertiesConfig dataJobDeploymentPropertiesConfig;

private DataFetcher<Object> findDataJobs;

@BeforeEach
Expand All @@ -76,12 +81,18 @@ public void before() {
new JobFieldStrategyFactory(collectSupportedFieldStrategies());
GraphQLDataFetchers graphQLDataFetchers =
new GraphQLDataFetchers(
strategyFactory, jobsRepository, deploymentService, executionDataFetcher);
strategyFactory,
jobsRepository,
deploymentService,
executionDataFetcher,
dataJobDeploymentPropertiesConfig,
deploymentServiceV2);
findDataJobs = graphQLDataFetchers.findAllAndBuildDataJobPage();
}

@Test
void testDataFetcherOfJobs_whenGettingFullList_shouldReturnAllDataJobs() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
when(dataFetchingEnvironment.getArgument("pageSize")).thenReturn(10);
when(jobsRepository.findAll()).thenReturn(mockListOfDataJobs());
Expand All @@ -99,6 +110,7 @@ void testDataFetcherOfJobs_whenGettingFullList_shouldReturnAllDataJobs() throws

@Test
void testDataFetcherOfJobs_whenGettingPagedResult_shouldReturnPagedJobs() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(2);
when(dataFetchingEnvironment.getArgument("pageSize")).thenReturn(2);
when(jobsRepository.findAll()).thenReturn(mockListOfDataJobs());
Expand Down Expand Up @@ -136,6 +148,7 @@ void testDataFetcherOfJobs_whenSupportedFieldProvidedWithSorting_shouldReturnJob

@Test
void testDataFetcherOfJobs_whenSearchingSpecificJob_shouldReturnSearchedJob() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
when(dataFetchingEnvironment.getArgument("pageSize")).thenReturn(10);
when(dataFetchingEnvironment.getArgument("search")).thenReturn("sample-job-2");
Expand All @@ -156,6 +169,7 @@ void testDataFetcherOfJobs_whenSearchingSpecificJob_shouldReturnSearchedJob() th

@Test
void testDataFetcherOfJobs_whenSearchingByPattern_shouldReturnMatchingJobs() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
when(dataFetchingEnvironment.getArgument("pageSize")).thenReturn(10);
when(dataFetchingEnvironment.getArgument("search")).thenReturn("sample-job-2");
Expand Down Expand Up @@ -213,6 +227,7 @@ void testDataFetcherOfJobs_whenValidPageNumberIsProvided_shouldNotThrowException

@Test
void testPopulateDeployments() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution());
when(deploymentService.readDeployments()).thenReturn(mockListOfDeployments());
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
Expand All @@ -238,8 +253,39 @@ void testPopulateDeployments() throws Exception {
assertThat(job2.getDeployments().get(0).getLastExecutionDuration()).isNull();
}

@Test
void testPopulateDeployments_readFromDB() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.DB);
when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution());
when(deploymentServiceV2.findAllActualDataJobDeployments())
.thenReturn(mockMapOfActualJobDeployments());
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
when(dataFetchingEnvironment.getArgument("pageSize")).thenReturn(100);
when(dataFetchingEnvironment.getSelectionSet()).thenReturn(dataFetchingFieldSelectionSet);
when(dataFetchingFieldSelectionSet.contains(JobFieldStrategyBy.DEPLOYMENT.getPath()))
.thenReturn(true);

DataJobPage dataJobPage = (DataJobPage) findDataJobs.get(dataFetchingEnvironment);

assertThat(dataJobPage.getContent()).hasSize(5);
var job2 = (V2DataJob) dataJobPage.getContent().get(1);
assertThat(job2.getDeployments()).hasSize(1);
assertThat(job2.getDeployments().get(0).getLastExecutionStatus()).isNull();
assertThat(job2.getDeployments().get(0).getLastExecutionTime()).isNull();
assertThat(job2.getDeployments().get(0).getLastExecutionDuration()).isNull();
assertThat(job2.getDeployments().get(0).getJobPythonVersion()).isEqualTo("3.8-secure");
var job4 = (V2DataJob) dataJobPage.getContent().get(3);
assertThat(job4.getDeployments()).hasSize(1);
assertThat(job4.getDeployments().get(0).getLastExecutionStatus())
.isEqualTo(DataJobExecution.StatusEnum.SUCCEEDED);
assertThat(job4.getDeployments().get(0).getLastExecutionTime()).isNull();
assertThat(job4.getDeployments().get(0).getLastExecutionDuration()).isEqualTo(0);
assertThat(job4.getDeployments().get(0).getJobPythonVersion()).isEqualTo("3.9-secure");
}

@Test
void testFilterByLastExecutionStatus() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution());
when(deploymentService.readDeployments()).thenReturn(mockListOfDeployments());
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
Expand All @@ -263,6 +309,7 @@ void testFilterByLastExecutionStatus() throws Exception {

@Test
void testSortingByLastExecutionStatus() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution());
when(deploymentService.readDeployments()).thenReturn(mockListOfDeployments());
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
Expand Down Expand Up @@ -296,6 +343,7 @@ void testSortingByLastExecutionStatus() throws Exception {

@Test
void testSortingByLastExecutionTime() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution());
when(deploymentService.readDeployments()).thenReturn(mockListOfDeployments());
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
Expand Down Expand Up @@ -327,6 +375,7 @@ void testSortingByLastExecutionTime() throws Exception {

@Test
void testSortingByLastExecutionDuration() throws Exception {
when(dataJobDeploymentPropertiesConfig.getReadDataSource()).thenReturn(ReadFrom.K8S);
when(jobsRepository.findAll()).thenReturn(mockListOfDataJobsWithLastExecution());
when(deploymentService.readDeployments()).thenReturn(mockListOfDeployments());
when(dataFetchingEnvironment.getArgument("pageNumber")).thenReturn(1);
Expand Down Expand Up @@ -376,6 +425,15 @@ private List<DataJob> mockListOfDataJobs() {
return dataJobs;
}

private Map<String, ActualDataJobDeployment> mockMapOfActualJobDeployments() {
return Map.of(
"sample-job-1", mockSampleActualJobDeployment("sample-job-1", true, "3.8-secure"),
"sample-job-2", mockSampleActualJobDeployment("sample-job-2", false, "3.8-secure"),
"sample-job-3", mockSampleActualJobDeployment("sample-job-3", true, "3.9-secure"),
"sample-job-4", mockSampleActualJobDeployment("sample-job-4", false, "3.9-secure"),
"sample-job-5", mockSampleActualJobDeployment("sample-job-5", true, "3.9-secure"));
}

private List<DataJob> mockListOfDataJobsWithLastExecution() {
List<DataJob> dataJobs = new ArrayList<>();

Expand Down Expand Up @@ -444,6 +502,25 @@ private JobDeploymentStatus mockSampleDeployment(String jobName, boolean enabled
return status;
}

private ActualDataJobDeployment mockSampleActualJobDeployment(
String jobName, boolean enabled, String pythonVersion) {
ActualDataJobDeployment actualJobDeployment = new ActualDataJobDeployment();
actualJobDeployment.setDataJobName(jobName);
actualJobDeployment.setEnabled(enabled);
actualJobDeployment.setPythonVersion(pythonVersion);
actualJobDeployment.setLastDeployedDate(
OffsetDateTime.of(2023, 10, 25, 16, 30, 42, 42, ZoneOffset.UTC));

DataJobDeploymentResources resources = new DataJobDeploymentResources();
resources.setCpuLimitCores(1f);
resources.setCpuRequestCores(1f);
resources.setMemoryLimitMi(100);
resources.setMemoryRequestMi(100);

actualJobDeployment.setResources(resources);
return actualJobDeployment;
}

static ArrayList<LinkedHashMap<String, String>> constructFilter(Filter... filters) {
ArrayList<LinkedHashMap<String, String>> rawFilters = new ArrayList<>();
Arrays.stream(filters)
Expand Down
Loading