Skip to content

Commit

Permalink
Support DataTransformerRuns in the platform (#724)
Browse files Browse the repository at this point in the history
  • Loading branch information
DementevNikita committed Jun 10, 2022
1 parent 03e5403 commit d9e3c91
Show file tree
Hide file tree
Showing 44 changed files with 500 additions and 356 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.opendatadiscovery.oddplatform.controller;

import lombok.RequiredArgsConstructor;
import org.opendatadiscovery.oddplatform.api.contract.api.DataEntityRunApi;
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityRunList;
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityRunStatus;
import org.opendatadiscovery.oddplatform.service.DataEntityRunService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@RestController
@RequiredArgsConstructor
public class DataEntityRunController implements DataEntityRunApi {
private final DataEntityRunService dataEntityRunService;

@Override
public Mono<ResponseEntity<DataEntityRunList>> getRuns(final Long dataEntityId,
final Integer page,
final Integer size,
final DataEntityRunStatus status,
final ServerWebExchange exchange) {
return dataEntityRunService
.getDataEntityRuns(dataEntityId, status, page, size)
.map(ResponseEntity::ok);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import lombok.RequiredArgsConstructor;
import org.opendatadiscovery.oddplatform.api.contract.api.DataQualityApi;
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityList;
import org.opendatadiscovery.oddplatform.api.contract.model.DataQualityTestRunList;
import org.opendatadiscovery.oddplatform.api.contract.model.DataQualityTestRunStatus;
import org.opendatadiscovery.oddplatform.api.contract.model.DataSetTestReport;
import org.opendatadiscovery.oddplatform.service.DataQualityService;
import org.springframework.http.ResponseEntity;
Expand Down Expand Up @@ -36,15 +34,4 @@ public Mono<ResponseEntity<DataSetTestReport>> getDatasetTestReport(final Long d
.getDatasetTestReport(dataEntityId)
.map(ResponseEntity::ok);
}

@Override
public Mono<ResponseEntity<DataQualityTestRunList>> getRuns(final Long dataEntityId,
final Integer page,
final Integer size,
final DataQualityTestRunStatus status,
final ServerWebExchange exchange) {
return dataQualityService
.getDataQualityTestRuns(dataEntityId, status, page, size)
.map(ResponseEntity::ok);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityGroupFormData;
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityList;
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityRef;
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityRun;
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityType;
import org.opendatadiscovery.oddplatform.api.contract.model.DataQualityTestExpectation;
import org.opendatadiscovery.oddplatform.api.contract.model.DataQualityTestRun;
import org.opendatadiscovery.oddplatform.api.contract.model.DataSetStats;
import org.opendatadiscovery.oddplatform.dto.DataEntityClassDto;
import org.opendatadiscovery.oddplatform.dto.DataEntityDetailsDto;
Expand All @@ -43,7 +43,7 @@ public class DataEntityMapperImpl implements DataEntityMapper {
private final TagMapper tagMapper;
private final MetadataFieldValueMapper metadataFieldValueMapper;
private final DatasetVersionMapper datasetVersionMapper;
private final DataQualityMapper dataQualityMapper;
private final DataEntityRunMapper dataEntityRunMapper;
private final TermMapper termMapper;

@Override
Expand Down Expand Up @@ -252,7 +252,7 @@ public DataEntityDetails mapDtoDetails(final DataEntityDetailsDto dto) {
.map(this::mapReference)
.collect(Collectors.toList()))
.linkedUrlList(dto.getDataQualityTestDetailsDto().linkedUrlList())
.latestRun(dataQualityMapper.mapDataQualityTestRun(
.latestRun(dataEntityRunMapper.mapDataEntityRun(
dto.getDataEntity().getId(),
dto.getDataQualityTestDetailsDto().latestTaskRun())
)
Expand Down Expand Up @@ -292,8 +292,8 @@ public DataEntityDetails mapDtoDetails(final DataEntityDetailsDto dto) {
public DataEntity mapDataQualityTest(final DataEntityDetailsDto dto) {
final DataQualityTestDetailsDto dqDto = dto.getDataQualityTestDetailsDto();

final DataQualityTestRun latestRun = dqDto.latestTaskRun() != null
? dataQualityMapper.mapDataQualityTestRun(dto.getDataEntity().getId(), dqDto.latestTaskRun())
final DataEntityRun latestRun = dqDto.latestTaskRun() != null
? dataEntityRunMapper.mapDataEntityRun(dto.getDataEntity().getId(), dqDto.latestTaskRun())
: null;

return mapPojo(dto)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.opendatadiscovery.oddplatform.mapper;

import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityRun;
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityRunList;
import org.opendatadiscovery.oddplatform.api.contract.model.PageInfo;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DataEntityTaskRunPojo;
import org.opendatadiscovery.oddplatform.utils.Page;

@Mapper(config = MapperConfig.class, uses = OffsetDateTimeMapper.class)
public interface DataEntityRunMapper {
@Mapping(source = "run", target = ".")
DataEntityRun mapDataEntityRun(final Long dataEntityId, final DataEntityTaskRunPojo run);

default DataEntityRunList mapDataEntityRuns(final Long dataQualityTestId,
final Page<DataEntityTaskRunPojo> page) {
return new DataEntityRunList()
.pageInfo(new PageInfo()
.total(page.getTotal())
.hasNext(page.isHasNext()))
.items(page.getData().stream()
.map(r -> mapDataEntityRun(dataQualityTestId, r))
.toList());
}
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,12 @@
package org.opendatadiscovery.oddplatform.mapper;

import java.util.stream.Collectors;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.opendatadiscovery.oddplatform.api.contract.model.DataQualityTestRun;
import org.opendatadiscovery.oddplatform.api.contract.model.DataQualityTestRunList;
import org.opendatadiscovery.oddplatform.api.contract.model.DataSetTestReport;
import org.opendatadiscovery.oddplatform.api.contract.model.PageInfo;
import org.opendatadiscovery.oddplatform.dto.DatasetTestReportDto;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DataEntityTaskRunPojo;
import org.opendatadiscovery.oddplatform.utils.Page;

@Mapper(config = MapperConfig.class, uses = OffsetDateTimeMapper.class)
public interface DataQualityMapper {
@Mapping(source = "run", target = ".")
DataQualityTestRun mapDataQualityTestRun(final Long dataQualityTestId, final DataEntityTaskRunPojo run);

@Mapping(target = "score", expression = "java( (int)(100 * report.getSuccessTotal() / report.getTotal()) )")
DataSetTestReport mapDatasetTestReport(final DatasetTestReportDto report);

default DataQualityTestRunList mapDataQualityTestRuns(final Long dataQualityTestId,
final Page<DataEntityTaskRunPojo> page) {
return new DataQualityTestRunList()
.pageInfo(new PageInfo()
.total(page.getTotal())
.hasNext(page.isHasNext()))
.items(page.getData().stream()
.map(r -> mapDataQualityTestRun(dataQualityTestId, r))
.collect(Collectors.toList()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.opendatadiscovery.oddplatform.repository.reactive;

import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityRunStatus;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DataEntityTaskRunPojo;
import org.opendatadiscovery.oddplatform.utils.Page;
import reactor.core.publisher.Mono;

public interface ReactiveDataEntityRunRepository {
Mono<Page<DataEntityTaskRunPojo>> getDataEntityRuns(final long dataQualityTestId,
final DataEntityRunStatus status,
final int page,
final int size);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.opendatadiscovery.oddplatform.repository.reactive;

import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.jooq.Condition;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.Select;
import org.jooq.SelectConditionStep;
import org.jooq.SortOrder;
import org.jooq.impl.DSL;
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityRunStatus;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DataEntityTaskRunPojo;
import org.opendatadiscovery.oddplatform.repository.util.JooqQueryHelper;
import org.opendatadiscovery.oddplatform.repository.util.JooqReactiveOperations;
import org.opendatadiscovery.oddplatform.utils.Page;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;

import static org.opendatadiscovery.oddplatform.model.Tables.DATA_ENTITY;
import static org.opendatadiscovery.oddplatform.model.Tables.DATA_ENTITY_TASK_RUN;

@Repository
@RequiredArgsConstructor
public class ReactiveDataEntityRunRepositoryImpl implements ReactiveDataEntityRunRepository {
private final JooqReactiveOperations jooqReactiveOperations;
private final JooqQueryHelper jooqQueryHelper;

@Override
public Mono<Page<DataEntityTaskRunPojo>> getDataEntityRuns(final long dataQualityTestId,
final DataEntityRunStatus status,
final int page,
final int size) {
final List<Condition> conditions = new ArrayList<>();
conditions.add(DATA_ENTITY.ID.eq(dataQualityTestId));
conditions.add(DATA_ENTITY.HOLLOW.isFalse());
if (status != null) {
conditions.add(DATA_ENTITY_TASK_RUN.STATUS.eq(status.name()));
}

final SelectConditionStep<Record> baseQuery = DSL
.select(DATA_ENTITY_TASK_RUN.fields())
.from(DATA_ENTITY_TASK_RUN)
.join(DATA_ENTITY).on(DATA_ENTITY.ODDRN.eq(DATA_ENTITY_TASK_RUN.DATA_ENTITY_ODDRN))
.where(conditions);

final Select<? extends Record> query = jooqQueryHelper.paginate(
baseQuery,
DATA_ENTITY_TASK_RUN.END_TIME,
SortOrder.DESC,
(page - 1) * size,
size
);

return jooqReactiveOperations.flux(query)
.collectList()
.flatMap(list -> jooqQueryHelper.pageifyResult(
list,
r -> r.into(DataEntityTaskRunPojo.class),
fetchCount(baseQuery)
));
}

private Mono<Long> fetchCount(final Select<Record> query) {
return jooqReactiveOperations.mono(DSL.selectCount().from(query))
.map(Record1::value1)
.map(Long::valueOf);
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
package org.opendatadiscovery.oddplatform.repository.reactive;

import org.opendatadiscovery.oddplatform.api.contract.model.DataQualityTestRunStatus;
import org.opendatadiscovery.oddplatform.dto.DatasetTestReportDto;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DataEntityTaskRunPojo;
import org.opendatadiscovery.oddplatform.utils.Page;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public interface ReactiveDataQualityRepository {
Flux<String> getDataQualityTestOddrnsForDataset(final long datasetId);

Mono<Page<DataEntityTaskRunPojo>> getDataQualityTestRuns(final long dataQualityTestId,
final DataQualityTestRunStatus status,
final int page,
final int size);

Mono<DatasetTestReportDto> getDatasetTestReport(final long datasetId);
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
package org.opendatadiscovery.oddplatform.repository.reactive;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.jooq.Condition;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.Record2;
import org.jooq.Select;
import org.jooq.SelectConditionStep;
import org.jooq.SelectHavingStep;
import org.jooq.SortOrder;
import org.jooq.impl.DSL;
import org.opendatadiscovery.oddplatform.api.contract.model.DataQualityTestRunStatus;
import org.opendatadiscovery.oddplatform.dto.DatasetTestReportDto;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DataEntityTaskRunPojo;
import org.opendatadiscovery.oddplatform.repository.util.JooqQueryHelper;
import org.opendatadiscovery.oddplatform.repository.util.JooqReactiveOperations;
import org.opendatadiscovery.oddplatform.utils.Page;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -52,41 +43,6 @@ public Flux<String> getDataQualityTestOddrnsForDataset(final long datasetId) {
return jooqReactiveOperations.flux(query).map(Record1::value1);
}

@Override
public Mono<Page<DataEntityTaskRunPojo>> getDataQualityTestRuns(final long dataQualityTestId,
final DataQualityTestRunStatus status,
final int page,
final int size) {
final List<Condition> conditions = new ArrayList<>();
conditions.add(DATA_ENTITY.ID.eq(dataQualityTestId));
conditions.add(DATA_ENTITY.HOLLOW.isFalse());
if (status != null) {
conditions.add(DATA_ENTITY_TASK_RUN.STATUS.eq(status.name()));
}

final SelectConditionStep<Record> baseQuery = DSL
.select(DATA_ENTITY_TASK_RUN.fields())
.from(DATA_ENTITY_TASK_RUN)
.join(DATA_ENTITY).on(DATA_ENTITY.ODDRN.eq(DATA_ENTITY_TASK_RUN.DATA_ENTITY_ODDRN))
.where(conditions);

final Select<? extends Record> query = jooqQueryHelper.paginate(
baseQuery,
DATA_ENTITY_TASK_RUN.END_TIME,
SortOrder.DESC,
(page - 1) * size,
size
);

return jooqReactiveOperations.flux(query)
.collectList()
.flatMap(list -> jooqQueryHelper.pageifyResult(
list,
r -> r.into(DataEntityTaskRunPojo.class),
fetchCount(baseQuery)
));
}

@Override
public Mono<DatasetTestReportDto> getDatasetTestReport(final long datasetId) {
final SelectConditionStep<Record1<LocalDateTime>> maxEndTimeSubquery = DSL
Expand All @@ -113,12 +69,6 @@ public Mono<DatasetTestReportDto> getDatasetTestReport(final long datasetId) {
.map(this::mapReport);
}

private Mono<Long> fetchCount(final Select<Record> query) {
return jooqReactiveOperations.mono(DSL.selectCount().from(query))
.map(Record1::value1)
.map(Long::valueOf);
}

private DatasetTestReportDto mapReport(final Map<String, Long> report) {
return DatasetTestReportDto.builder()
.successTotal(report.getOrDefault(SUCCESS.getValue(), 0L))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.opendatadiscovery.oddplatform.service;

import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityRunList;
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityRunStatus;
import reactor.core.publisher.Mono;

public interface DataEntityRunService {
Mono<DataEntityRunList> getDataEntityRuns(final long dataEntityId,
final DataEntityRunStatus status,
final int page,
final int size);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@

package org.opendatadiscovery.oddplatform.service;

import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.ArrayUtils;
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityRunList;
import org.opendatadiscovery.oddplatform.api.contract.model.DataEntityRunStatus;
import org.opendatadiscovery.oddplatform.exception.NotFoundException;
import org.opendatadiscovery.oddplatform.mapper.DataEntityRunMapper;
import org.opendatadiscovery.oddplatform.model.tables.pojos.DataEntityPojo;
import org.opendatadiscovery.oddplatform.repository.reactive.ReactiveDataEntityRepository;
import org.opendatadiscovery.oddplatform.repository.reactive.ReactiveDataEntityRunRepository;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

import static org.opendatadiscovery.oddplatform.dto.DataEntityClassDto.DATA_QUALITY_TEST;
import static org.opendatadiscovery.oddplatform.dto.DataEntityClassDto.DATA_TRANSFORMER;

@Service
@RequiredArgsConstructor
public class DataEntityRunServiceImpl implements DataEntityRunService {
private final ReactiveDataEntityRunRepository dataEntityRunRepository;
private final ReactiveDataEntityRepository dataEntityRepository;
private final DataEntityRunMapper dataEntityRunMapper;

@Override
public Mono<DataEntityRunList> getDataEntityRuns(final long dataEntityId,
final DataEntityRunStatus status,
final int page,
final int size) {
return dataEntityRepository.get(dataEntityId)
.switchIfEmpty(Mono.error(
new NotFoundException("Data entity with id %d not found".formatted(dataEntityId))))
.filter(this::checkIfDeClassSupposedToHaveRuns)
.switchIfEmpty(Mono.error(new IllegalStateException(
"Data entity with id %d is not supposed to have runs due to its class".formatted(dataEntityId))))
.flatMap(de -> dataEntityRunRepository.getDataEntityRuns(de.getId(), status, page, size))
.map(pageInfo -> dataEntityRunMapper.mapDataEntityRuns(dataEntityId, pageInfo));
}

private boolean checkIfDeClassSupposedToHaveRuns(final DataEntityPojo dataEntity) {
return ArrayUtils.contains(dataEntity.getEntityClassIds(), DATA_TRANSFORMER.getId())
|| ArrayUtils.contains(dataEntity.getEntityClassIds(), DATA_QUALITY_TEST.getId());
}
}
Loading

0 comments on commit d9e3c91

Please sign in to comment.