Skip to content

Commit

Permalink
Add interval packing for concepts (#130)
Browse files Browse the repository at this point in the history
Co-authored-by: Torben Meyer <torben.meyer@bakdata.com>
  • Loading branch information
jnsrnhld and torbsto committed Nov 2, 2023
1 parent 2707adc commit ac1a397
Show file tree
Hide file tree
Showing 121 changed files with 1,676 additions and 1,180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.Optional;
import java.util.Set;

import com.bakdata.conquery.sql.conversion.cqelement.concept.CteStep;
import com.bakdata.conquery.sql.conversion.cqelement.concept.ConceptCteStep;

/**
* A converter converts an input into a result object if the input matches the conversion class.
Expand All @@ -24,10 +24,10 @@ default <I> Optional<R> tryConvert(I input, X context) {
/**
* All steps this {@link Converter} requires.
*
* @return PREPROCESSING, AGGREGATION_SELECT and FINAL {@link CteStep} as defaults. Override if more steps are required.
* @return PREPROCESSING, AGGREGATION_SELECT and FINAL {@link ConceptCteStep} as defaults. Override if more steps are required.
*/
default Set<CteStep> requiredSteps() {
return CteStep.MANDATORY_STEPS;
default Set<ConceptCteStep> requiredSteps() {
return ConceptCteStep.MANDATORY_STEPS;
}

Class<? extends C> getConversionClass();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,61 +1,47 @@
package com.bakdata.conquery.sql.conversion.cqelement.concept;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import com.bakdata.conquery.sql.conversion.model.ColumnDateRange;
import com.bakdata.conquery.sql.conversion.model.ConceptSelects;
import com.bakdata.conquery.sql.conversion.model.filter.FilterCondition;
import com.bakdata.conquery.sql.conversion.model.QueryStep;
import com.bakdata.conquery.sql.conversion.model.select.SqlSelect;
import com.bakdata.conquery.sql.conversion.model.Selects;
import com.bakdata.conquery.sql.conversion.model.filter.FilterCondition;
import com.bakdata.conquery.sql.conversion.model.select.ExistsSqlSelect;
import com.bakdata.conquery.sql.conversion.model.select.ExtractingSqlSelect;
import com.bakdata.conquery.sql.conversion.model.select.SqlSelect;
import org.jooq.Condition;

class AggregationFilterCte extends ConceptCte {

@Override
public QueryStep.QueryStepBuilder convertStep(CteContext cteContext) {

String aggregationFilterPredecessorCte = cteContext.getConceptTables().getPredecessorTableName(CteStep.AGGREGATION_FILTER);

final Optional<ColumnDateRange> validityDate;
if (cteContext.isExcludedFromDateAggregation()) {
validityDate = Optional.empty();
}
else {
validityDate = cteContext.getValidityDateRange().map(_validityDate -> _validityDate.qualify(aggregationFilterPredecessorCte));
}

ConceptSelects aggregationFilterSelect = new ConceptSelects(
cteContext.getPrimaryColumn(),
validityDate,
getAggregationFilterSelects(cteContext, aggregationFilterPredecessorCte)
public QueryStep.QueryStepBuilder convertStep(ConceptCteContext conceptCteContext) {

Selects aggregationFilterSelects = Selects.qualified(
conceptCteContext.getConceptTables().getPredecessorTableName(ConceptCteStep.AGGREGATION_FILTER),
conceptCteContext.getPrimaryColumn(),
getForAggregationFilterSelects(conceptCteContext)
);
List<Condition> aggregationFilterConditions = cteContext.getFilters().stream()
.flatMap(conceptFilter -> conceptFilter.getFilters().getGroup().stream())
.map(FilterCondition::filterCondition)
.toList();

List<Condition> aggregationFilterConditions = conceptCteContext.getFilters().stream()
.flatMap(conceptFilter -> conceptFilter.getFilters().getGroup().stream())
.map(FilterCondition::filterCondition)
.toList();

return QueryStep.builder()
.selects(aggregationFilterSelect)
.selects(aggregationFilterSelects)
.conditions(aggregationFilterConditions);
}

private List<SqlSelect> getAggregationFilterSelects(CteContext cteContext, String aggregationFilterPredecessorCte) {
return cteContext.getSelects().stream()
.flatMap(sqlSelects -> sqlSelects.getForFinalStep().stream())
// TODO: EXISTS edge case is only in a concepts final select statement and has no predecessor selects
.filter(conquerySelect -> !(conquerySelect instanceof ExistsSqlSelect))
.map(conquerySelect -> ExtractingSqlSelect.fromConquerySelect(conquerySelect, aggregationFilterPredecessorCte))
.distinct()
.collect(Collectors.toList());
private List<SqlSelect> getForAggregationFilterSelects(ConceptCteContext conceptCteContext) {
return conceptCteContext.getSelects().stream()
.flatMap(sqlSelects -> sqlSelects.getForFinalStep().stream())
// TODO: EXISTS edge case is only in a concept's final select statement and has no predecessor selects
.filter(conquerySelect -> !(conquerySelect instanceof ExistsSqlSelect))
.distinct()
.toList();
}

@Override
public CteStep cteStep() {
return CteStep.AGGREGATION_FILTER;
public ConceptCteStep cteStep() {
return ConceptCteStep.AGGREGATION_FILTER;
}

}
Original file line number Diff line number Diff line change
@@ -1,51 +1,31 @@
package com.bakdata.conquery.sql.conversion.cqelement.concept;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import com.bakdata.conquery.sql.conversion.dialect.SqlFunctionProvider;
import com.bakdata.conquery.sql.conversion.model.ColumnDateRange;
import com.bakdata.conquery.sql.conversion.model.ConceptSelects;
import com.bakdata.conquery.sql.conversion.model.QueryStep;
import com.bakdata.conquery.sql.conversion.model.Selects;
import com.bakdata.conquery.sql.conversion.model.select.SqlSelect;

class AggregationSelectCte extends ConceptCte {

@Override
public QueryStep.QueryStepBuilder convertStep(CteContext cteContext) {

// all selects that are required in the aggregation filter step
String previousCteName = cteContext.getPrevious().getCteName();
List<SqlSelect> aggregationFilterSelects = cteContext.allConceptSelects()
.flatMap(sqlSelects -> sqlSelects.getForAggregationSelectStep().stream())
.distinct()
.collect(Collectors.toList());

SqlFunctionProvider functionProvider = cteContext.getContext().getSqlDialect().getFunction();
Optional<ColumnDateRange> aggregatedValidityDate = cteContext.getValidityDateRange()
.map(validityDate -> validityDate.qualify(previousCteName))
.map(functionProvider::aggregated)
.map(validityDate -> validityDate.asValidityDateRange(cteContext.getConceptLabel()));

ConceptSelects aggregationSelectSelects = new ConceptSelects(
cteContext.getPrimaryColumn(),
aggregatedValidityDate,
aggregationFilterSelects
);
public QueryStep.QueryStepBuilder convertStep(ConceptCteContext conceptCteContext) {

List<SqlSelect> requiredInAggregationFilterStep = conceptCteContext.allConceptSelects()
.flatMap(sqlSelects -> sqlSelects.getForAggregationSelectStep().stream())
.distinct()
.toList();

Selects aggregationSelectSelects = new Selects(conceptCteContext.getPrimaryColumn(), requiredInAggregationFilterStep);

return QueryStep.builder()
// pid normally
// first value for all existing selects
// date aggregation for date range
// new select for all aggregation selects and filter
.selects(aggregationSelectSelects)
.isGroupBy(true);
.groupBy(List.of(conceptCteContext.getPrimaryColumn()));
}

@Override
public CteStep cteStep() {
return CteStep.AGGREGATION_SELECT;
public ConceptCteStep cteStep() {
return ConceptCteStep.AGGREGATION_SELECT;
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.bakdata.conquery.sql.conversion.cqelement.concept;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
Expand All @@ -20,14 +20,14 @@
import com.bakdata.conquery.sql.conversion.cqelement.concept.select.SelectConversions;
import com.bakdata.conquery.sql.conversion.dialect.SqlFunctionProvider;
import com.bakdata.conquery.sql.conversion.model.ColumnDateRange;
import com.bakdata.conquery.sql.conversion.model.QueryStep;
import com.bakdata.conquery.sql.conversion.model.filter.ConceptFilter;
import com.bakdata.conquery.sql.conversion.model.filter.ConditionUtil;
import com.bakdata.conquery.sql.conversion.model.filter.FilterType;
import com.bakdata.conquery.sql.conversion.model.filter.Filters;
import com.bakdata.conquery.sql.conversion.model.QueryStep;
import com.bakdata.conquery.sql.conversion.model.select.FieldWrapper;
import com.bakdata.conquery.sql.conversion.model.select.SqlSelect;
import com.bakdata.conquery.sql.conversion.model.select.SqlSelects;
import com.bakdata.conquery.sql.conversion.model.filter.ConditionUtil;
import com.bakdata.conquery.sql.conversion.model.select.FieldSqlSelect;
import org.jooq.Condition;
import org.jooq.impl.DSL;

Expand Down Expand Up @@ -63,44 +63,58 @@ public ConversionContext convert(CQConcept node, ConversionContext context) {
}

CQTable table = node.getTables().get(0);
String tableName = table.getConnector().getTable().getName();
String conceptLabel = createConceptLabel(node, context);
Optional<ColumnDateRange> validityDateSelect = convertValidityDate(context.getSqlDialect().getFunctionProvider(), table, tableName, conceptLabel);

Set<ConceptCteStep> requiredSteps = getRequiredSteps(table, context.dateRestrictionActive(), validityDateSelect);
ConceptTables conceptTables = new ConceptTables(conceptLabel, requiredSteps, tableName);

ConceptTables conceptTables = new ConceptTables(conceptLabel, getRequiredSteps(table), table.getConnector().getTable().getName());
Optional<ColumnDateRange> validityDateSelect = convertValidityDate(context.getSqlDialect().getFunction(), table, conceptLabel, conceptTables);
List<ConceptFilter> conceptFilters = convertConceptFilters(context, table, conceptTables, validityDateSelect);
List<SqlSelects> conceptSelects = getConceptSelects(node, context, table, conceptLabel, conceptTables, validityDateSelect);

CteContext cteContext = CteContext.builder()
.context(context)
.filters(conceptFilters)
.selects(conceptSelects)
.primaryColumn(DSL.field(DSL.name(context.getConfig().getPrimaryColumn())))
.validityDateRange(validityDateSelect)
.isExcludedFromDateAggregation(node.isExcludeFromTimeAggregation())
.conceptTables(conceptTables)
.conceptLabel(conceptLabel)
.build();
ConceptCteContext conceptCteContext = ConceptCteContext.builder()
.conversionContext(context)
.filters(conceptFilters)
.selects(conceptSelects)
.primaryColumn(DSL.field(DSL.name(context.getConfig().getPrimaryColumn())))
.validityDate(validityDateSelect)
.isExcludedFromDateAggregation(node.isExcludeFromTimeAggregation())
.conceptTables(conceptTables)
.conceptLabel(conceptLabel)
.build();

Optional<QueryStep> lastQueryStep = Optional.empty();
for (ConceptCte queryStep : this.conceptCTEs) {
Optional<QueryStep> convertedStep = queryStep.convert(cteContext, lastQueryStep);
Optional<QueryStep> convertedStep = queryStep.convert(conceptCteContext, lastQueryStep);
if (convertedStep.isEmpty()) {
continue;
}
lastQueryStep = convertedStep;
cteContext = cteContext.withPrevious(lastQueryStep.get());
conceptCteContext = conceptCteContext.withPrevious(lastQueryStep.get());
}

return context.withQueryStep(lastQueryStep.orElseThrow(() -> new RuntimeException("No conversion for concept possible. Required steps: %s".formatted(requiredSteps()))));
}

private Set<CteStep> getRequiredSteps(CQTable table) {
if (table.getFilters().isEmpty()) {
return CteStep.MANDATORY_STEPS;
/**
* Determines if event/aggregation filter steps are required.
*
* <p>
* {@link ConceptCteStep#MANDATORY_STEPS} are always part of any concept conversion.
*/
private Set<ConceptCteStep> getRequiredSteps(CQTable table, boolean dateRestrictionRequired, Optional<ColumnDateRange> validityDateSelect) {
Set<ConceptCteStep> requiredSteps = new HashSet<>(ConceptCteStep.MANDATORY_STEPS);

if (dateRestrictionApplicable(dateRestrictionRequired, validityDateSelect)) {
requiredSteps.add(ConceptCteStep.EVENT_FILTER);
}
return table.getFilters().stream()
.flatMap(filterValue -> this.filterValueConversions.requiredSteps(filterValue).stream())
.collect(Collectors.toSet());

table.getFilters().stream()
.flatMap(filterValue -> this.filterValueConversions.requiredSteps(filterValue).stream())
.forEach(requiredSteps::add);

return requiredSteps;
}

/**
Expand Down Expand Up @@ -145,35 +159,40 @@ private static String createConceptLabel(CQConcept node, ConversionContext conte
private static Optional<ColumnDateRange> convertValidityDate(
SqlFunctionProvider functionProvider,
CQTable table,
String conceptLabel,
ConceptTables conceptTables
String tableName,
String conceptLabel
) {
if (Objects.isNull(table.findValidityDate())) {
return Optional.empty();
}
return Optional.of(functionProvider.daterange(table.findValidityDate(), conceptTables.getPredecessorTableName(CteStep.PREPROCESSING), conceptLabel));
ColumnDateRange validityDate = functionProvider.daterange(table.findValidityDate(), tableName, conceptLabel);
return Optional.of(validityDate);
}

private static Optional<ConceptFilter> getDateRestriction(ConversionContext context, Optional<ColumnDateRange> validityDate) {

if (!context.dateRestrictionActive() || validityDate.isEmpty()) {
if (!dateRestrictionApplicable(context.dateRestrictionActive(), validityDate)) {
return Optional.empty();
}

ColumnDateRange dateRestriction = context.getSqlDialect().getFunction()
ColumnDateRange dateRestriction = context.getSqlDialect().getFunctionProvider()
.daterange(context.getDateRestrictionRange())
.asDateRestrictionRange();

List<SqlSelect> dateRestrictionSelects = dateRestriction.toFields().stream()
.map(FieldSqlSelect::new)
.map(FieldWrapper::new)
.collect(Collectors.toList());

Condition dateRestrictionCondition = context.getSqlDialect().getFunction().dateRestriction(dateRestriction, validityDate.get());
Condition dateRestrictionCondition = context.getSqlDialect().getFunctionProvider().dateRestriction(dateRestriction, validityDate.get());

return Optional.of(new ConceptFilter(
SqlSelects.builder().forPreprocessingStep(dateRestrictionSelects).build(),
Filters.builder().event(Collections.singletonList(ConditionUtil.wrap(dateRestrictionCondition, FilterType.EVENT))).build()
Filters.builder().event(List.of(ConditionUtil.wrap(dateRestrictionCondition, FilterType.EVENT))).build()
));
}

/**
 * Tells whether a date restriction can be applied.
 *
 * @param dateRestrictionRequired whether a date restriction is requested
 * @param validityDateSelect the validity date the restriction would be applied to, if any
 * @return {@code true} only if a date restriction is requested and a validity date is present
 */
private static boolean dateRestrictionApplicable(boolean dateRestrictionRequired, Optional<ColumnDateRange> validityDateSelect) {
	// without a requested restriction there is nothing to apply
	if (!dateRestrictionRequired) {
		return false;
	}
	return validityDateSelect.isPresent();
}

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.bakdata.conquery.sql.conversion.cqelement.concept;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

import com.bakdata.conquery.sql.conversion.model.QueryStep;

abstract class ConceptCte {

protected Optional<QueryStep> convert(CteContext context, Optional<QueryStep> previous) {

protected Optional<QueryStep> convert(ConceptCteContext context, Optional<QueryStep> previous) {

if (!isRequired(context.getConceptTables())) {
return Optional.empty();
Expand All @@ -17,24 +17,24 @@ protected Optional<QueryStep> convert(CteContext context, Optional<QueryStep> pr
String cteName = context.getConceptTables().cteName(cteStep());
QueryStep.QueryStepBuilder queryStepBuilder = this.convertStep(context).cteName(cteName);

if (previous.isPresent()) {
queryStepBuilder.predecessors(List.of(previous.get()))
.fromTable(QueryStep.toTableLike(previous.get().getCteName()));
// only preprocessing has no previously converted step
if (previous.isEmpty()) {
queryStepBuilder.predecessors(List.of());
}
else {
// only PREPROCESSING step has no predecessor
queryStepBuilder.predecessors(Collections.emptyList())
.fromTable(QueryStep.toTableLike(context.getConceptTables().getPredecessorTableName(CteStep.PREPROCESSING)));
// if interval packing takes place, fromTable and predecessors of the final concept step are already set
else if (queryStepBuilder.build().getFromTable() == null && queryStepBuilder.build().getPredecessors().isEmpty()) {
queryStepBuilder.fromTable(QueryStep.toTableLike(previous.get().getCteName()))
.predecessors(List.of(previous.get()));
}
return Optional.of(queryStepBuilder.build());
}

/**
* @return The {@link CteStep} this instance belongs to.
* @return The {@link ConceptCteStep} this instance belongs to.
*/
protected abstract CteStep cteStep();
protected abstract ConceptCteStep cteStep();

protected abstract QueryStep.QueryStepBuilder convertStep(CteContext cteContext);
protected abstract QueryStep.QueryStepBuilder convertStep(ConceptCteContext conceptCteContext);

private boolean isRequired(ConceptTables conceptTables) {
return conceptTables.isRequiredStep(cteStep());
Expand Down

0 comments on commit ac1a397

Please sign in to comment.