Skip to content

Commit

Permalink
Merge branch 'feature/self-join-unification-improved' into version4
Browse files Browse the repository at this point in the history
  • Loading branch information
bcogrel committed Nov 12, 2019
2 parents fbec8c9 + daeecde commit ed1a630
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 46 deletions.
Expand Up @@ -189,7 +189,7 @@ public Optional<ImmutableSubstitution<NonFunctionalTerm>> computeMGUS2(Immutable
* Computes one Most General Unifier (MGU) of (two) substitutions.
*/
public Optional<ImmutableSubstitution<ImmutableTerm>> computeMGUS(ImmutableSubstitution<? extends ImmutableTerm> substitution1,
ImmutableSubstitution<? extends ImmutableTerm> substitution2) {
ImmutableSubstitution<? extends ImmutableTerm> substitution2) {

ImmutableList.Builder<ImmutableTerm> firstArgListBuilder = ImmutableList.builder();
ImmutableList.Builder<ImmutableTerm> secondArgListBuilder = ImmutableList.builder();
Expand All @@ -214,13 +214,8 @@ public Optional<ImmutableSubstitution<VariableOrGroundTerm>> computeAtomMGUS(
ImmutableSubstitution<VariableOrGroundTerm> substitution1,
ImmutableSubstitution<VariableOrGroundTerm> substitution2) {
Optional<ImmutableSubstitution<ImmutableTerm>> optionalMGUS = computeMGUS(substitution1, substitution2);
if (optionalMGUS.isPresent()) {
return Optional.of(substitutionTools.convertIntoVariableOrGroundTermSubstitution(
optionalMGUS.get()));
}
else {
return Optional.empty();
}
return optionalMGUS
.map(substitutionTools::convertIntoVariableOrGroundTermSubstitution);
}


Expand Down
Expand Up @@ -5,7 +5,6 @@
import com.google.common.collect.ImmutableMap;
import it.unibz.inf.ontop.model.term.Constant;
import it.unibz.inf.ontop.model.term.ImmutableTerm;
import it.unibz.inf.ontop.model.term.NonConstantTerm;
import it.unibz.inf.ontop.model.term.Variable;
import org.junit.Test;

Expand Down
Expand Up @@ -3,22 +3,28 @@
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import it.unibz.inf.ontop.injection.IntermediateQueryFactory;
import it.unibz.inf.ontop.iq.IntermediateQuery;
import it.unibz.inf.ontop.iq.exception.EmptyQueryException;
import it.unibz.inf.ontop.iq.exception.InvalidQueryOptimizationProposalException;
import it.unibz.inf.ontop.iq.impl.QueryTreeComponent;
import it.unibz.inf.ontop.iq.node.DataNode;
import it.unibz.inf.ontop.iq.node.EmptyNode;
import it.unibz.inf.ontop.iq.node.ExtensionalDataNode;
import it.unibz.inf.ontop.iq.node.InnerJoinNode;
import it.unibz.inf.ontop.iq.proposal.InnerJoinOptimizationProposal;
import it.unibz.inf.ontop.iq.proposal.NodeCentricOptimizationResults;
import it.unibz.inf.ontop.iq.proposal.impl.NodeCentricOptimizationResultsImpl;
import it.unibz.inf.ontop.model.atom.RelationPredicate;
import it.unibz.inf.ontop.model.term.ImmutableExpression;
import it.unibz.inf.ontop.model.term.TermFactory;
import it.unibz.inf.ontop.model.term.Variable;
import it.unibz.inf.ontop.model.term.VariableOrGroundTerm;
import it.unibz.inf.ontop.substitution.ImmutableSubstitution;
import it.unibz.inf.ontop.substitution.SubstitutionFactory;
import it.unibz.inf.ontop.substitution.impl.ImmutableUnificationTools;
import it.unibz.inf.ontop.utils.ImmutableCollectors;

import java.util.Optional;

Expand All @@ -37,12 +43,14 @@ public abstract class RedundantSelfJoinExecutor extends SelfJoinLikeExecutor imp
*/
private static final int MAX_ITERATIONS = 100;
private final IntermediateQueryFactory iqFactory;
private final TermFactory termFactory;

protected RedundantSelfJoinExecutor(IntermediateQueryFactory iqFactory,
SubstitutionFactory substitutionFactory,
ImmutableUnificationTools unificationTools, TermFactory termFactory) {
super(substitutionFactory, unificationTools, termFactory);
this.iqFactory = iqFactory;
this.termFactory = termFactory;
}


Expand Down Expand Up @@ -137,7 +145,38 @@ private Optional<ConcreteProposal> propose(InnerJoinNode joinNode, ImmutableMult
predicateProposal.ifPresent(proposalListBuilder::add);
}

return createConcreteProposal(proposalListBuilder.build(), priorityVariables);
return createConcreteProposal(proposalListBuilder.build(), initialDataNodeMap, priorityVariables);
}

protected Optional<ConcreteProposal> createConcreteProposal(
ImmutableList<PredicateLevelProposal> predicateProposals,
ImmutableMultimap<RelationPredicate, ExtensionalDataNode> initialDataNodeMap, ImmutableList<Variable> priorityVariables) {



Optional<ImmutableSubstitution<VariableOrGroundTerm>> optionalMergedSubstitution;
try {
optionalMergedSubstitution = mergeSubstitutions(extractSubstitutions(predicateProposals), initialDataNodeMap, priorityVariables);
} catch (AtomUnificationException e) {
return Optional.empty();
}

ImmutableSet<DataNode> removedDataNodes =predicateProposals.stream()
.flatMap(p -> p.getRemovedDataNodes().stream())
.collect(ImmutableCollectors.toSet());

if (removedDataNodes.isEmpty()
&& (! optionalMergedSubstitution.isPresent()))
return Optional.empty();

Optional<ImmutableExpression> isNotConjunction = termFactory.getConjunction(predicateProposals.stream()
.map(PredicateLevelProposal::getIsNotNullConjunction)
.filter(Optional::isPresent)
.map(Optional::get)
.flatMap(ImmutableExpression::flattenAND)
.distinct());

return Optional.of(new ConcreteProposal(optionalMergedSubstitution, removedDataNodes, isNotConjunction));
}

protected abstract Optional<PredicateLevelProposal> proposePerPredicate(InnerJoinNode joinNode, ImmutableCollection<ExtensionalDataNode> initialNodes,
Expand Down
@@ -1,6 +1,7 @@
package it.unibz.inf.ontop.iq.executor.join;

import com.google.common.collect.*;
import it.unibz.inf.ontop.exception.MinorOntopInternalBugException;
import it.unibz.inf.ontop.iq.exception.EmptyQueryException;
import it.unibz.inf.ontop.iq.node.*;
import it.unibz.inf.ontop.model.atom.DataAtom;
Expand All @@ -20,6 +21,8 @@
import it.unibz.inf.ontop.model.term.VariableOrGroundTerm;
import it.unibz.inf.ontop.substitution.ImmutableSubstitution;
import it.unibz.inf.ontop.utils.ImmutableCollectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.stream.Stream;
Expand All @@ -28,6 +31,8 @@

public class SelfJoinLikeExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(SelfJoinLikeExecutor.class);

/**
* TODO: explain
*
Expand Down Expand Up @@ -219,6 +224,7 @@ protected PredicateLevelProposal proposeForGroupingMap(
})
.filter(s -> !s.isEmpty())
.collect(ImmutableCollectors.toSet());

/*
* All the nodes that have been at least once dominated (--> could thus be removed).
*
Expand Down Expand Up @@ -249,34 +255,6 @@ protected PredicateLevelProposal proposeForGroupingMap(
}
}

protected Optional<ConcreteProposal> createConcreteProposal(
ImmutableList<PredicateLevelProposal> predicateProposals,
ImmutableList<Variable> priorityVariables) {
Optional<ImmutableSubstitution<VariableOrGroundTerm>> optionalMergedSubstitution;
try {
optionalMergedSubstitution = mergeSubstitutions(extractSubstitutions(predicateProposals), priorityVariables);
} catch (AtomUnificationException e) {
return Optional.empty();
}

ImmutableSet<DataNode> removedDataNodes =predicateProposals.stream()
.flatMap(p -> p.getRemovedDataNodes().stream())
.collect(ImmutableCollectors.toSet());

if (removedDataNodes.isEmpty()
&& (! optionalMergedSubstitution.isPresent()))
return Optional.empty();

Optional<ImmutableExpression> isNotConjunction = termFactory.getConjunction(predicateProposals.stream()
.map(PredicateLevelProposal::getIsNotNullConjunction)
.filter(Optional::isPresent)
.map(Optional::get)
.flatMap(ImmutableExpression::flattenAND)
.distinct());

return Optional.of(new ConcreteProposal(optionalMergedSubstitution, removedDataNodes, isNotConjunction));
}


protected ImmutableSubstitution<VariableOrGroundTerm> unifyRedundantNodes(
Collection<ExtensionalDataNode> redundantNodes) throws AtomUnificationException {
Expand All @@ -291,31 +269,109 @@ protected ImmutableSubstitution<VariableOrGroundTerm> unifyRedundantNodes(
return accumulatedSubstitution;
}

ImmutableMap<Variable, Collection<ExtensionalDataNode>> occurrenceVariableMap = redundantNodes.stream()
.flatMap(n -> n.getVariables().stream()
.map(v -> Maps.immutableEntry(v, n)))
.collect(ImmutableCollectors.toMultimap()).asMap();

// Variables used in more than one data node
ImmutableSet<Variable> sharedVariables = occurrenceVariableMap.entrySet().stream()
.filter(e -> ImmutableSet.copyOf(e.getValue()).size() > 1)
.map(Map.Entry::getKey)
.collect(ImmutableCollectors.toSet());
ImmutableSet<Variable> nonSharedVariables = Sets.difference(occurrenceVariableMap.keySet(), sharedVariables)
.immutableCopy();

Iterator<ExtensionalDataNode> nodeIterator = redundantNodes.iterator();

/*
* For performance purposes, we can detach some fragments from the substitution to be "unified" with the following atom.
*/
ImmutableList.Builder<ImmutableSubstitution<VariableOrGroundTerm>> nonSharedSubstitutionListBuilder = ImmutableList.builder();

// Non-final
DataAtom accumulatedAtom = nodeIterator.next().getProjectionAtom();

while (nodeIterator.hasNext()) {
DataAtom newAtom = nodeIterator.next().getProjectionAtom();

/*
* Before the following unification, we detach a fragment about non-shared variables from the accumulated substitution
*
* Particularly useful when dealing with tables with a large number of columns (e.g. views after collapsing some JSON objects)
*
*/
ImmutableSubstitution<VariableOrGroundTerm> nonSharedSubstitution = accumulatedSubstitution.reduceDomainToIntersectionWith(nonSharedVariables);
if (!nonSharedSubstitution.isEmpty())
nonSharedSubstitutionListBuilder.add(nonSharedSubstitution);

ImmutableSubstitution<VariableOrGroundTerm> substitutionToUnify = nonSharedSubstitution.isEmpty()
? accumulatedSubstitution
: accumulatedSubstitution.reduceDomainToIntersectionWith(sharedVariables);

// May throw an exception
accumulatedSubstitution = updateSubstitution(accumulatedSubstitution, accumulatedAtom, newAtom);
accumulatedSubstitution = updateSubstitution(substitutionToUnify, accumulatedAtom, newAtom);

accumulatedAtom = accumulatedSubstitution.applyToDataAtom(accumulatedAtom);
}
return accumulatedSubstitution;

return Stream.concat(
nonSharedSubstitutionListBuilder.build().stream(),
Stream.of(accumulatedSubstitution))
.reduce((v1, v2) -> v2.composeWith2(v1))
.orElseThrow(() -> new MinorOntopInternalBugException("At least one substitution was expected"));
}

protected Optional<ImmutableSubstitution<VariableOrGroundTerm>> mergeSubstitutions(
ImmutableList<ImmutableSubstitution<VariableOrGroundTerm>> substitutions, ImmutableList<Variable> priorityVariables)
ImmutableList<ImmutableSubstitution<VariableOrGroundTerm>> substitutions,
ImmutableMultimap<RelationPredicate, ExtensionalDataNode> initialDataNodeMap,
ImmutableList<Variable> priorityVariables)
throws AtomUnificationException {

ImmutableMap<Variable, Collection<RelationPredicate>> occurrenceVariableMap = initialDataNodeMap.asMap().entrySet().stream()
.flatMap(e -> e.getValue().stream()
.flatMap(n -> n.getVariables().stream())
.map(v -> Maps.immutableEntry(v, e.getKey())))
.collect(ImmutableCollectors.toMultimap()).asMap();

// Variables appearing for for more one relation predicate
ImmutableSet<Variable> sharedVariables = occurrenceVariableMap.entrySet().stream()
.filter(e -> ImmutableSet.copyOf(e.getValue()).size() > 1)
.map(Map.Entry::getKey)
.collect(ImmutableCollectors.toSet());
ImmutableSet<Variable> nonSharedVariables = Sets.difference(occurrenceVariableMap.keySet(), sharedVariables)
.immutableCopy();

/*
* For performance purposes, we can detach some fragments from the substitution to be "unified" with the following atom.
*/
ImmutableList.Builder<ImmutableSubstitution<VariableOrGroundTerm>> nonSharedSubstitutionListBuilder = ImmutableList.builder();

// Non-final
Optional<ImmutableSubstitution<VariableOrGroundTerm>> optionalAccumulatedSubstitution = Optional.empty();

for (ImmutableSubstitution<VariableOrGroundTerm> substitution : substitutions) {
if (!substitution.isEmpty()) {
if (optionalAccumulatedSubstitution.isPresent()) {

ImmutableSubstitution<VariableOrGroundTerm> accumulatedSubstitution = optionalAccumulatedSubstitution.get();

/*
* Before the following unification, we detach a fragment about non-shared variables from the accumulated substitution
*
* Particularly useful when dealing with tables with a large number of columns (e.g. views after collapsing some JSON objects)
*
*/
ImmutableSubstitution<VariableOrGroundTerm> nonSharedSubstitution = accumulatedSubstitution.reduceDomainToIntersectionWith(nonSharedVariables);
if (!nonSharedSubstitution.isEmpty())
nonSharedSubstitutionListBuilder.add(nonSharedSubstitution);

ImmutableSubstitution<VariableOrGroundTerm> substitutionToUnify = nonSharedSubstitution.isEmpty()
? accumulatedSubstitution
: accumulatedSubstitution.reduceDomainToIntersectionWith(sharedVariables);

Optional<ImmutableSubstitution<VariableOrGroundTerm>> optionalMGUS = unificationTools.computeAtomMGUS(
optionalAccumulatedSubstitution.get(), substitution);
substitutionToUnify, substitution);
if (optionalMGUS.isPresent()) {
optionalAccumulatedSubstitution = optionalMGUS;
}
Expand All @@ -331,6 +387,11 @@ protected Optional<ImmutableSubstitution<VariableOrGroundTerm>> mergeSubstitutio
}

return optionalAccumulatedSubstitution
.map(s -> Stream.concat(
nonSharedSubstitutionListBuilder.build().stream(),
Stream.of(s))
.reduce((v1, v2) -> v2.composeWith2(v1))
.orElseThrow(() -> new MinorOntopInternalBugException("At least one substitution was expected")))
.map(s -> s.orientate(priorityVariables));
}

Expand Down
Expand Up @@ -58,9 +58,12 @@ public IQ optimize(IQ query, ExecutorRegistry executorRegistry) {

LOGGER.debug("After projection shrinking: \n" + intermediateQuery.toString());


long beginningJoinLike = System.currentTimeMillis();
intermediateQuery = joinLikeOptimizer.optimize(intermediateQuery);
LOGGER.debug("New query after fixed point join optimization: \n" + intermediateQuery.toString());
LOGGER.debug(String.format(
"New query after fixed point join optimization (%d ms): \n%s",
System.currentTimeMillis() - beginningJoinLike,
intermediateQuery.toString()));

intermediateQuery = flattenUnionOptimizer.optimize(intermediateQuery);
LOGGER.debug("New query after flattening Unions: \n" + intermediateQuery.toString());
Expand Down
Expand Up @@ -85,6 +85,8 @@ private QuestQueryProcessor(@Assisted OBDASpecification obdaSpecification,
public IQ reformulateIntoNativeQuery(InputQuery inputQuery)
throws OntopReformulationException {

long beginning = System.currentTimeMillis();

IQ cachedQuery = queryCache.get(inputQuery);
if (cachedQuery != null)
return cachedQuery;
Expand All @@ -103,8 +105,10 @@ public IQ reformulateIntoNativeQuery(InputQuery inputQuery)
log.debug("Start the unfolding...");

IQ unfoldedIQ = queryUnfolder.optimize(rewrittenIQ);
if (unfoldedIQ.getTree().isDeclaredAsEmpty())
return unfoldedIQ;
if (unfoldedIQ.getTree().isDeclaredAsEmpty()) {
log.debug(String.format("Reformulation time: %d ms", System.currentTimeMillis() - beginning));
return unfoldedIQ;
}
log.debug("Unfolded query: \n" + unfoldedIQ.toString());

IQ optimizedQuery = generalOptimizer.optimize(unfoldedIQ, executorRegistry);
Expand All @@ -113,6 +117,7 @@ public IQ reformulateIntoNativeQuery(InputQuery inputQuery)

IQ executableQuery = generateExecutableQuery(plannedQuery);
queryCache.put(inputQuery, executableQuery);
log.debug(String.format("Reformulation time: %d ms", System.currentTimeMillis() - beginning));
return executableQuery;

}
Expand Down

0 comments on commit ed1a630

Please sign in to comment.