Skip to content

Commit

Permalink
Merge pull request #447 from ryankwagner/multiEngineIssue
Browse files Browse the repository at this point in the history
Add RowGrouping to fix Row Merging on MultiEngine Queries
  • Loading branch information
pranavbhole committed May 9, 2019
2 parents 712ae25 + 6a3d7d7 commit b7c4298
Show file tree
Hide file tree
Showing 27 changed files with 541 additions and 117 deletions.
21 changes: 17 additions & 4 deletions core/src/main/scala/com/yahoo/maha/core/query/QueryContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import scala.collection.SortedSet
/**
 * Common contract shared by every query context in this file
 * (DimQueryContext, FactQueryContext, CombinedQueryContext,
 * DimFactOuterGroupByQueryQueryContext).
 */
sealed trait QueryContext {
// The RequestModel the context was constructed with.
def requestModel: RequestModel
// Optional index alias. The partial row-list factories in QueryPipeline call `.get` on it;
// CombinedQueryContext and DimFactOuterGroupByQueryQueryContext always report None.
def indexAliasOption: Option[String]
// Keys handed to RowGrouping when QueryPipeline builds a FactDrivenPartialRowList.
// CombinedQueryContext and DimFactOuterGroupByQueryQueryContext always report an empty list.
def factGroupByKeys: List[String]
// Name of the table the context is primarily about, e.g. DimFactOuterGroupByQueryQueryContext
// returns its fact's name.
def primaryTableName: String
}

Expand Down Expand Up @@ -59,17 +60,20 @@ case object DimFactOuterGroupByQuery extends QueryType

case class DimQueryContext private[query](dims: SortedSet[DimensionBundle],
requestModel: RequestModel,
indexAliasOption: Option[String],
indexAliasOption: Option[String],
factGroupByKeys: List[String],
queryAttributes: QueryAttributes= QueryAttributes.empty) extends DimensionQueryContext
case class FactQueryContext private[query](factBestCandidate: FactBestCandidate,
requestModel: RequestModel,
indexAliasOption: Option[String],
indexAliasOption: Option[String],
factGroupByKeys: List[String],
queryAttributes: QueryAttributes) extends FactualQueryContext
case class CombinedQueryContext private[query](dims: SortedSet[DimensionBundle],
factBestCandidate: FactBestCandidate,
requestModel: RequestModel,
queryAttributes: QueryAttributes) extends DimensionQueryContext with FactualQueryContext {
val indexAliasOption = None
val factGroupByKeys = List.empty
override def primaryTableName: String = {
if(requestModel.isDimDriven) {
dims.last.dim.name
Expand All @@ -84,6 +88,7 @@ case class DimFactOuterGroupByQueryQueryContext(dims: SortedSet[DimensionBundle]
requestModel: RequestModel,
queryAttributes: QueryAttributes) extends DimensionQueryContext with FactualQueryContext {
override def indexAliasOption: Option[String] = None
override def factGroupByKeys: List[String] = List.empty
override def primaryTableName: String = factBestCandidate.fact.name

lazy val shouldQualifyFactsInPreOuter: Boolean = {
Expand Down Expand Up @@ -143,6 +148,7 @@ class QueryContextBuilder(queryType: QueryType, requestModel: RequestModel) {
// Mutable builder state: build() passes these values to the concrete QueryContext constructors.
var dims : SortedSet[DimensionBundle] = SortedSet.empty
var factBestCandidate : Option[FactBestCandidate] = None
var indexAliasOption : Option[String] = None
// addFactGroupByKeys refuses to overwrite this once it holds a non-empty list.
var factGroupByKeys : List[String] = List.empty
var queryAttributes : QueryAttributes = QueryAttributes.empty

def addDimTable(dimension: DimensionBundle) = {
Expand Down Expand Up @@ -170,6 +176,13 @@ class QueryContextBuilder(queryType: QueryType, requestModel: RequestModel) {
this
}

/**
 * Records the fact group-by keys on this builder and returns the builder for chaining.
 *
 * Fails with an IllegalArgumentException (via `require`) when the builder's query type is
 * DimFactQuery, or when a non-empty key list has already been recorded.
 *
 * @param factGroupByKeyList keys to store in `factGroupByKeys`
 * @return this builder
 */
def addFactGroupByKeys(factGroupByKeyList: List[String]) = {
  // Checked first so a DimFactQuery builder always reports this message, as before.
  require(
    queryType != DimFactQuery,
    "dim fact query doesn't use Group By keys"
  )
  // Only an empty (i.e. unset) key list may be replaced.
  require(
    factGroupByKeys.isEmpty,
    s"fact group by is already defined : factGroupByKeys=${factGroupByKeys.toString}, cannot set to ${factGroupByKeyList.toString}"
  )
  factGroupByKeys = factGroupByKeyList
  this
}

def setQueryAttributes(queryAttributes: QueryAttributes) = {
this.queryAttributes = queryAttributes
this
Expand All @@ -179,10 +192,10 @@ class QueryContextBuilder(queryType: QueryType, requestModel: RequestModel) {
queryType match {
case DimOnlyQuery =>
require(dims.nonEmpty, "dim only query should not have dimension empty")
DimQueryContext(dims, requestModel, indexAliasOption, queryAttributes)
DimQueryContext(dims, requestModel, indexAliasOption, factGroupByKeys, queryAttributes)
case FactOnlyQuery =>
require(factBestCandidate.isDefined, "fact only query should have fact defined")
FactQueryContext(factBestCandidate.get, requestModel, indexAliasOption, queryAttributes)
FactQueryContext(factBestCandidate.get, requestModel, indexAliasOption, factGroupByKeys, queryAttributes)
case DimFactQuery =>
require(factBestCandidate.isDefined, "dim fact query should have fact defined")
CombinedQueryContext(dims, factBestCandidate.get, requestModel, queryAttributes)
Expand Down
32 changes: 21 additions & 11 deletions core/src/main/scala/com/yahoo/maha/core/query/QueryPipeline.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,16 @@ object QueryPipeline extends Logging {
val asyncDisqualifyingSet: Set[Engine] = Set(DruidEngine)

val completeRowList: Query => RowList = (q) => new CompleteRowList(q)
val dimDrivenPartialRowList: Query => RowList = (q) => new DimDrivenPartialRowList(q.queryContext.indexAliasOption.get, q)
val dimDrivenFactOrderedPartialRowList: Query => RowList = (q) => new DimDrivenFactOrderedPartialRowList(q.queryContext.indexAliasOption.get, q)
val factDrivenPartialRowList: Query => RowList = (q) => new FactDrivenPartialRowList(q.queryContext.indexAliasOption.get, q)
/**
 * Builds a DimDrivenPartialRowList for a query, using the query context's index alias both as
 * the RowGrouping index alias and as its only grouping key.
 * `Option.get` throws NoSuchElementException when the context carries no index alias.
 */
val dimDrivenPartialRowList: Query => RowList = { query =>
  val indexAlias = query.queryContext.indexAliasOption.get
  new DimDrivenPartialRowList(RowGrouping(indexAlias, List(indexAlias)), query)
}
/**
 * Builds a DimDrivenFactOrderedPartialRowList for a query, using the query context's index
 * alias both as the RowGrouping index alias and as its only grouping key.
 * `Option.get` throws NoSuchElementException when the context carries no index alias.
 */
val dimDrivenFactOrderedPartialRowList: Query => RowList = { query =>
  val indexAlias = query.queryContext.indexAliasOption.get
  new DimDrivenFactOrderedPartialRowList(RowGrouping(indexAlias, List(indexAlias)), query)
}
/**
 * Builds a FactDrivenPartialRowList for a query. The RowGrouping takes its index alias from
 * the query context's `indexAliasOption` and its grouping keys from `factGroupByKeys`.
 * `Option.get` throws NoSuchElementException when the context carries no index alias.
 */
val factDrivenPartialRowList: Query => RowList = { query =>
  val context = query.queryContext
  // Arguments are evaluated left to right: the alias is resolved before the keys are read.
  new FactDrivenPartialRowList(RowGrouping(context.indexAliasOption.get, context.factGroupByKeys), query)
}

def unionViewPartialRowList(queryList: List[Query]): Query => RowList = {
require(queryList.forall(q => q.queryContext.isInstanceOf[FactualQueryContext] && q.queryContext.asInstanceOf[FactualQueryContext].factBestCandidate.fact.isInstanceOf[ViewBaseTable])
Expand Down Expand Up @@ -86,7 +93,7 @@ object QueryPipeline extends Logging {
case any =>
throw new UnsupportedOperationException(s"Cannot create UnionViewRowList with fact of type : ${any.getClass.getSimpleName}")
}
case DimQueryContext(_, _, _, _) =>
case DimQueryContext(_, _, _, _, _) =>
throw new IllegalArgumentException(s"Requested UnionViewRowList in DimQueryContext")
case _ =>
throw new IllegalArgumentException(s"Requested UnionViewRowList in Unhandled/UnknownQueryContext")
Expand Down Expand Up @@ -675,24 +682,27 @@ class DefaultQueryPipelineFactory(implicit val queryGeneratorRegistry: QueryGene
/**
 * Generates the dimension-only query of a multi-engine pipeline.
 *
 * A DimOnlyQuery context is assembled from the arguments, then the generator registered for
 * the engine of the first dimension bundle is asked to generate the query.
 *
 * @param bestDimCandidates dimension bundles to query; the first bundle's engine selects the generator
 * @param requestModel      request model placed in the context and passed to the generator lookup
 * @param indexAlias        index alias stored on the context
 * @param factGroupByKeys   group-by keys stored on the context
 * @param queryAttributes   attributes stored on the context
 * @param queryGenVersion   query generator version to look up
 * @return the query produced by the selected generator
 */
private[this] def getMultiEngineDimQuery(bestDimCandidates: SortedSet[DimensionBundle],
                                         requestModel: RequestModel,
                                         indexAlias: String,
                                         factGroupByKeys: List[String],
                                         queryAttributes: QueryAttributes,
                                         queryGenVersion: Version): Query = {
  val contextBuilder = QueryContext
    .newQueryContext(DimOnlyQuery, requestModel)
    .addDimTable(bestDimCandidates)
    .addIndexAlias(indexAlias)
    .addFactGroupByKeys(factGroupByKeys)
    .setQueryAttributes(queryAttributes)

  // Resolved after the builder is populated, matching the original order of evaluation.
  val dimEngine = bestDimCandidates.head.dim.engine
  require(
    queryGeneratorRegistry.isEngineRegistered(dimEngine, Option(queryGenVersion)),
    s"Failed to find query generator for engine : ${dimEngine}"
  )

  val generator = queryGeneratorRegistry
    .getValidGeneratorForVersion(dimEngine, queryGenVersion, Option(requestModel))
    .get
  // The context is built only once a generator has been found.
  generator.generate(contextBuilder.build())
}

private[this] def getFactQuery(bestFactCandidate: FactBestCandidate, requestModel: => RequestModel, indexAlias: String, queryGenVersion: Version): Query = {
private[this] def getFactQuery(bestFactCandidate: FactBestCandidate, requestModel: => RequestModel, indexAlias: String, factGroupByKeys: List[String], queryGenVersion: Version): Query = {
val factOnlyContextBuilder = QueryContext
.newQueryContext(FactOnlyQuery, requestModel)
.addFactBestCandidate(bestFactCandidate)
.addIndexAlias(indexAlias)
.addFactGroupByKeys(factGroupByKeys)
require(queryGeneratorRegistry.isEngineRegistered(bestFactCandidate.fact.engine, Some(queryGenVersion))
, s"Failed to find query generator for engine : ${bestFactCandidate.fact.engine}")
queryGeneratorRegistry.getValidGeneratorForVersion(bestFactCandidate.fact.engine, queryGenVersion, Option(requestModel)).get.generate(factOnlyContextBuilder.build())
Expand Down Expand Up @@ -851,14 +861,14 @@ OuterGroupBy operation has to be applied only in the following cases
if (!requestModel.hasFactSortBy && requestModel.forceDimDriven) {
//oracle + druid
requestDebug("dimQueryThenFactQuery")
val dimQuery = getMultiEngineDimQuery(bestDimCandidates, requestModel, indexAlias, queryAttributes, queryGenVersion)
val dimQuery = getMultiEngineDimQuery(bestDimCandidates, requestModel, indexAlias, List(indexAlias), queryAttributes, queryGenVersion)
val subsequentQuery: (IndexedRowList, QueryAttributes) => Query = {
case (irl, subqueryAttributes) =>
val field = irl.indexAlias
val field = irl.rowGrouping.indexAlias
val values = irl.keys.toList.map(_.toString)
val filter = InFilter(field, values)
val injectedFactBestCandidate = factOnlyInjectFilter(factBestCandidateOption.get, filter)
val query = getFactQuery(injectedFactBestCandidate, requestModel, indexAlias, queryGenVersion)
val query = getFactQuery(injectedFactBestCandidate, requestModel, indexAlias, List(indexAlias), queryGenVersion)
irl.addSubQuery(query)
query
}
Expand All @@ -879,10 +889,10 @@ OuterGroupBy operation has to be applied only in the following cases
//since druid + oracle doesn't support row count
requestDebug("factQueryThenDimQuery")
val noRowCountRequestModel = requestModel.copy(includeRowCount = false)
val factQuery = getFactQuery(factBestCandidateOption.get, noRowCountRequestModel, indexAlias, queryGenVersion)
val factQuery = getFactQuery(factBestCandidateOption.get, noRowCountRequestModel, indexAlias, List(indexAlias), queryGenVersion)
val subsequentQuery: (IndexedRowList, QueryAttributes) => Query = {
case (irl, subqueryqueryAttributes) =>
val field = irl.indexAlias
val field = irl.rowGrouping.indexAlias
val values = irl.keys.toList.map(_.toString)
val valuesSize = values.size
if (values.nonEmpty && (valuesSize >= noRowCountRequestModel.maxRows || noRowCountRequestModel.startIndex <= 0)) {
Expand All @@ -896,7 +906,7 @@ OuterGroupBy operation has to be applied only in the following cases
}
queryAttributesBuilder.build
}
getMultiEngineDimQuery(bestDimCandidates, noRowCountRequestModel, indexAlias, injectedAttributes, queryGenVersion)
getMultiEngineDimQuery(bestDimCandidates, noRowCountRequestModel, indexAlias, List(indexAlias), injectedAttributes, queryGenVersion)
} else {
QueryChain.logger.info("No data returned from druid, should run fallback query if there is one")
NoopQuery
Expand Down
Loading

0 comments on commit b7c4298

Please sign in to comment.