From fb56f86f353a27d70ec8fcfc220aaa154f7032aa Mon Sep 17 00:00:00 2001 From: ashwini-krishnakumar Date: Thu, 7 Sep 2017 07:36:32 +0000 Subject: [PATCH] [CARBONDATA-649] fix for update with rand function This closes #1296 --- .../iud/UpdateCarbonTableTestCase.scala | 30 ++++++++++ .../sql/CustomDeterministicExpression.scala | 41 ++++++++++++++ .../spark/sql/hive/CarbonStrategies.scala | 52 ++++++++++-------- .../spark/sql/optimizer/CarbonOptimizer.scala | 55 +++++++++++++++---- .../sql/CustomDeterministicExpression.scala | 42 ++++++++++++++ .../execution/CarbonLateDecodeStrategy.scala | 49 +++++++++-------- .../sql/optimizer/CarbonLateDecodeRule.scala | 43 +++++++++++++-- 7 files changed, 251 insertions(+), 61 deletions(-) create mode 100644 integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala create mode 100644 integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala index 623416b1a50..4186fa217cc 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala @@ -448,6 +448,36 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS default.carbon1") } + test("update table in carbondata with rand() ") { + + sql("""CREATE TABLE iud.rand(imei string,age int,task bigint,num double,level decimal(10,3),name string)STORED BY 'org.apache.carbondata.format' """) + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/update01.csv' INTO TABLE iud.rand OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='imei,age,task,num,level,name')""").collect + + sql("select substring(name,1,2 ) , name ,getTupleId() as tupleId , rand() from iud.rand").show(100) + + sql("select name , substring(name,1,2 ) ,getTupleId() as tupleId , num , rand() from iud.rand").show(100) + + sql("Update rand set (num) = (rand())").show() + + sql("Update rand set (num) = (rand(9))").show() + + sql("Update rand set (name) = ('Lily')").show() + + sql("select name , num from iud.rand").show(100) + + sql("select imei , age , name , num from iud.rand").show(100) + + sql("select rand() , getTupleId() as tupleId from iud.rand").show(100) + + sql("select * from iud.rand").show(100) + + sql("select imei , rand() , num from iud.rand").show(100) + + sql("select name , rand() from iud.rand").show(100) + + sql("DROP TABLE IF EXISTS iud.rand") + } + override def afterAll { sql("use default") sql("drop database if exists iud cascade") diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala new file mode 100644 index 00000000000..d745be20707 --- /dev/null +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode} +import org.apache.spark.sql.types.{DataType, StringType} + +/** + * Custom expression to override the deterministic property + * + */ +case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{ + override def nullable: Boolean = true + + override def eval(input: InternalRow): Any = null + + override protected def genCode(ctx: CodeGenContext, + ev: GeneratedExpressionCode): String = ev.code + override def deterministic: Boolean = true + + override def dataType: DataType = StringType + + override def children: Seq[Expression] = Seq() +} diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala index 13ff2a9c8b0..204225b7769 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation} import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand} import org.apache.spark.sql.hive.execution.command._ -import org.apache.spark.sql.optimizer.CarbonDecoderRelation +import org.apache.spark.sql.optimizer.{CarbonDecoderRelation} import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StringType @@ -63,15 +63,15 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = { plan match { case PhysicalOperation(projectList, predicates, l: LogicalRelation) - if l.relation.isInstanceOf[CarbonDatasourceRelation] => + if l.relation.isInstanceOf[CarbonDatasourceRelation] => if (isStarQuery(plan)) { carbonRawScanForStarQuery(projectList, predicates, l)(sqlContext) :: Nil } else { carbonRawScan(projectList, predicates, l)(sqlContext) :: Nil } case InsertIntoCarbonTable(relation: CarbonDatasourceRelation, - _, child: LogicalPlan, overwrite, _) => - ExecutedCommand(LoadTableByInsert(relation, child, overwrite)) :: Nil + _, child: LogicalPlan, overwrite, _) => + ExecutedCommand(LoadTableByInsert(relation, child, overwrite)) :: Nil case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) => CarbonDictionaryDecoder(relations, profile, @@ -85,21 +85,27 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { /** * Create carbon scan */ - private def carbonRawScan(projectList: Seq[NamedExpression], - predicates: Seq[Expression], - logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = { + 
private def carbonRawScan(projectListRaw: Seq[NamedExpression], + predicates: Seq[Expression], + logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = { val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation] val tableName: String = relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase // Check out any expressions are there in project list. if they are present then we need to // decode them as well. + + val projectList = projectListRaw.map {p => + p.transform { + case CustomDeterministicExpression(exp) => exp + } + }.asInstanceOf[Seq[NamedExpression]] val newProjectList = projectList.map { element => element match { case a@Alias(s: ScalaUDF, name) if (name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) || - name.equalsIgnoreCase( - CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) => + name.equalsIgnoreCase( + CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) => AttributeReference(name, StringType, true)().withExprId(a.exprId) case other => other } @@ -154,8 +160,8 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { * Create carbon scan for star query */ private def carbonRawScanForStarQuery(projectList: Seq[NamedExpression], - predicates: Seq[Expression], - logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = { + predicates: Seq[Expression], + logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = { val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation] val tableName: String = relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase @@ -194,10 +200,10 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { } def getCarbonDecoder(logicalRelation: LogicalRelation, - sc: SQLContext, - tableName: String, - projectExprsNeedToDecode: Seq[Attribute], - scan: CarbonScan): CarbonDictionaryDecoder = { + sc: SQLContext, + tableName: String, + projectExprsNeedToDecode: Seq[Attribute], + scan: CarbonScan): CarbonDictionaryDecoder = { val relation = CarbonDecoderRelation(logicalRelation.attributeMap, logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation]) val attrs = projectExprsNeedToDecode.map { attr => @@ -227,7 +233,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { relation: CarbonDatasourceRelation, allAttrsNotDecode: util.Set[Attribute]): AttributeReference = { if (relation.carbonRelation.metaData.dictionaryMap.get(attr.name).getOrElse(false) && - !allAttrsNotDecode.asScala.exists(p => p.name.equals(attr.name))) { + !allAttrsNotDecode.asScala.exists(p => p.name.equals(attr.name))) { AttributeReference(attr.name, IntegerType, attr.nullable, @@ -240,7 +246,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { private def isStarQuery(plan: LogicalPlan) = { plan match { case LogicalFilter(condition, l: LogicalRelation) - if l.relation.isInstanceOf[CarbonDatasourceRelation] => + if l.relation.isInstanceOf[CarbonDatasourceRelation] => true case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] => true case _ => false @@ -252,7 +258,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case DropTable(tableName, ifNotExists) if CarbonEnv.get.carbonMetastore - .isTablePathExists(toTableIdentifier(tableName.toLowerCase))(sqlContext) => + .isTablePathExists(toTableIdentifier(tableName.toLowerCase))(sqlContext) => val identifier = 
toTableIdentifier(tableName.toLowerCase) ExecutedCommand(DropTableCommand(ifNotExists, identifier.database, identifier.table)) :: Nil case ShowLoadsCommand(databaseName, table, limit) => @@ -260,7 +266,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { case LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath, options, isOverwriteExist, inputSqlString, dataFrame, _) => val isCarbonTable = CarbonEnv.get.carbonMetastore - .tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext) + .tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext) if (isCarbonTable || options.nonEmpty) { ExecutedCommand(LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath, options, isOverwriteExist, inputSqlString, dataFrame)) :: Nil @@ -269,15 +275,15 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { } case alterTable@AlterTableCompaction(altertablemodel) => val isCarbonTable = CarbonEnv.get.carbonMetastore - .tableExists(TableIdentifier(altertablemodel.tableName, - altertablemodel.dbName))(sqlContext) + .tableExists(TableIdentifier(altertablemodel.tableName, + altertablemodel.dbName))(sqlContext) if (isCarbonTable) { if (altertablemodel.compactionType.equalsIgnoreCase("minor") || altertablemodel.compactionType.equalsIgnoreCase("major")) { ExecutedCommand(alterTable) :: Nil } else { throw new MalformedCarbonCommandException( - "Unsupported alter operation on carbon table") + "Unsupported alter operation on carbon table") } } else { ExecutedCommand(HiveNativeCommand(altertablemodel.alterSql)) :: Nil @@ -305,7 +311,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { } case DescribeFormattedCommand(sql, tblIdentifier) => val isTable = CarbonEnv.get.carbonMetastore - .tableExists(tblIdentifier)(sqlContext) + .tableExists(tblIdentifier)(sqlContext) if (isTable) { val describe = LogicalDescribeCommand(UnresolvedRelation(tblIdentifier, None), isExtended = false) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala index 02ac5f88246..914203f769f 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala @@ -59,7 +59,7 @@ object CarbonOptimizer { } } -// get the carbon relation from plan. + // get the carbon relation from plan. def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = { plan collect { case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] => @@ -73,7 +73,7 @@ object CarbonOptimizer { * decoder plan. 
*/ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation]) - extends Rule[LogicalPlan] with PredicateHelper { + extends Rule[LogicalPlan] with PredicateHelper { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) def apply(logicalPlan: LogicalPlan): LogicalPlan = { if (relations.nonEmpty && !isOptimized(logicalPlan)) { @@ -101,7 +101,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation]) val newPlan = updatePlan transform { case Project(pList, child) if (!isTransformed) => val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList - .splitAt(pList.size - cols.size) + .splitAt(pList.size - cols.size) val diff = cols.diff(dest.map(_.name)) if (diff.size > 0) { sys.error(s"Unknown column(s) ${diff.mkString(",")} in table ${table.tableName}") @@ -284,7 +284,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation]) case union: Union if !(union.left.isInstanceOf[CarbonDictionaryTempDecoder] || - union.right.isInstanceOf[CarbonDictionaryTempDecoder]) => + union.right.isInstanceOf[CarbonDictionaryTempDecoder]) => val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper] val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper] val leftLocalAliasMap = CarbonAliasDecoderRelation() @@ -369,7 +369,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation]) } } else { CarbonFilters - .selectFilters(splitConjunctivePredicates(filter.condition), attrsOnConds, aliasMap) + .selectFilters(splitConjunctivePredicates(filter.condition), attrsOnConds, aliasMap) } var child = filter.child @@ -391,7 +391,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation]) case j: Join if !(j.left.isInstanceOf[CarbonDictionaryTempDecoder] || - j.right.isInstanceOf[CarbonDictionaryTempDecoder]) => + j.right.isInstanceOf[CarbonDictionaryTempDecoder]) => val attrsOnJoin = new util.HashSet[Attribute] j.condition match { case Some(expression) => @@ -706,7 +706,38 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation]) if profile.isInstanceOf[IncludeProfile] && profile.isEmpty => child } - finalPlan + val updateDtrFn = finalPlan transform { + case p@Project(projectList: Seq[NamedExpression], cd) => + if (cd.isInstanceOf[Filter] || cd.isInstanceOf[LogicalRelation]) { + p.transformAllExpressions { + case a@Alias(exp, _) + if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] => + Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifiers, + a.explicitMetadata) + case exp: NamedExpression + if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] => + CustomDeterministicExpression(exp) + } + } else { + p + } + case f@Filter(condition: Expression, cd) => + if (cd.isInstanceOf[Project] || cd.isInstanceOf[LogicalRelation]) { + f.transformAllExpressions { + case a@Alias(exp, _) + if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] => + Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifiers, + a.explicitMetadata) + case exp: NamedExpression + if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] => + CustomDeterministicExpression(exp) + } + } else { + f + } + } + + updateDtrFn } private def collectInformationOnAttributes(plan: LogicalPlan, @@ -812,14 +843,14 @@ case class CarbonDecoderRelation( def contains(attr: Attribute): Boolean = { val exists = attributeMap.exists(entry => entry._1.name.equalsIgnoreCase(attr.name) && - entry._1.exprId.equals(attr.exprId)) || - extraAttrs.exists(entry => 
entry.name.equalsIgnoreCase(attr.name) && - entry.exprId.equals(attr.exprId)) + entry._1.exprId.equals(attr.exprId)) || + extraAttrs.exists(entry => entry.name.equalsIgnoreCase(attr.name) && + entry.exprId.equals(attr.exprId)) exists } def fillAttributeMap(attrMap: java.util.HashMap[AttributeReferenceWrapper, - CarbonDecoderRelation]): Unit = { + CarbonDecoderRelation]): Unit = { attributeMap.foreach { attr => attrMap.put(AttributeReferenceWrapper(attr._1), this) } @@ -827,3 +858,5 @@ case class CarbonDecoderRelation( lazy val dictionaryMap = carbonRelation.carbonRelation.metaData.dictionaryMap } + + diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala new file mode 100644 index 00000000000..63127466aa0 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.types.{DataType, StringType} + +/** + * Custom expression to override the deterministic property . 
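+ * Wrapping a non-deterministic child (e.g. rand()) makes Catalyst treat the whole
+ * expression as deterministic, so Carbon's plan rewrite can carry it through the
+ * decoder rules; CarbonLateDecodeStrategy unwraps the child again before the
+ * physical scan is built, so the value is still computed per row.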
+ */ +case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{ + override def nullable: Boolean = true + + override def eval(input: InternalRow): Any = null + + override def dataType: DataType = StringType + + override def children: Seq[Expression] = Seq() + + override def deterministic: Boolean = true + + def childexp : Expression = nonDt + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("") +} diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala index eac0a288470..bc090678334 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.optimizer.CarbonDecoderRelation +import org.apache.spark.sql.optimizer.{CarbonDecoderRelation} import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.{AtomicType, IntegerType, StringType} @@ -59,7 +59,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { filters, (a, f, needDecoder) => toCatalystRDD(l, a, relation.buildScan( a.map(_.name).toArray, f), needDecoder)) :: - Nil + Nil case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) => if ((profile.isInstanceOf[IncludeProfile] && profile.isEmpty) || !CarbonDictionaryDecoder. @@ -139,10 +139,15 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { protected def pruneFilterProjectRaw( relation: LogicalRelation, - projects: Seq[NamedExpression], + rawProjects: Seq[NamedExpression], filterPredicates: Seq[Expression], scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter], ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = { + val projects = rawProjects.map {p => + p.transform { + case CustomDeterministicExpression(exp) => exp + } + }.asInstanceOf[Seq[NamedExpression]] val projectSet = AttributeSet(projects.flatMap(_.references)) val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) @@ -162,7 +167,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains) val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references)) AttributeSet(handledPredicates.flatMap(_.references)) -- - (projectSet ++ unhandledSet).map(relation.attributeMap) + (projectSet ++ unhandledSet).map(relation.attributeMap) } // Combines all Catalyst filter `Expression`s that are either not convertible to data source @@ -213,12 +218,12 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { // when the columns of this projection are enough to evaluate all filter conditions, // just do a scan followed by a filter, with no extra project. val requestedColumns = projects - // Safe due to if above. - .asInstanceOf[Seq[Attribute]] - // Match original case of attributes. - .map(relation.attributeMap) - // Don't request columns that are only referenced by pushed filters. 
- .filterNot(handledSet.contains) + // Safe due to if above. + .asInstanceOf[Seq[Attribute]] + // Match original case of attributes. + .map(relation.attributeMap) + // Don't request columns that are only referenced by pushed filters. + .filterNot(handledSet.contains) val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder) val updateProject = projects.map { expr => @@ -227,7 +232,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { val dict = map.get(attr.name) if (dict.isDefined && dict.get) { attr = AttributeReference(attr.name, IntegerType, attr.nullable, attr.metadata)(attr - .exprId, attr.qualifier) + .exprId, attr.qualifier) } } attr @@ -245,17 +250,17 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { var newProjectList: Seq[Attribute] = Seq.empty val updatedProjects = projects.map { - case a@Alias(s: ScalaUDF, name) - if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) || - name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) => - val reference = AttributeReference(name, StringType, true)().withExprId(a.exprId) - newProjectList :+= reference - reference - case other => other + case a@Alias(s: ScalaUDF, name) + if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) || + name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) => + val reference = AttributeReference(name, StringType, true)().withExprId(a.exprId) + newProjectList :+= reference + reference + case other => other } // Don't request columns that are only referenced by pushed filters. val requestedColumns = - (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq ++ newProjectList + (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq ++ newProjectList val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder) val scan = getDataSourceScan(relation, updateRequestedColumns.asInstanceOf[Seq[Attribute]], @@ -454,9 +459,9 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { case c@EqualTo(Literal(v, t), Cast(a: Attribute, _)) => CastExpressionOptimization.checkIfCastCanBeRemove(c) case Not(EqualTo(a: Attribute, Literal(v, t))) => - Some(sources.Not(sources.EqualTo(a.name, v))) + Some(sources.Not(sources.EqualTo(a.name, v))) case Not(EqualTo(Literal(v, t), a: Attribute)) => - Some(sources.Not(sources.EqualTo(a.name, v))) + Some(sources.Not(sources.EqualTo(a.name, v))) case c@Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => CastExpressionOptimization.checkIfCastCanBeRemove(c) case c@Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => @@ -534,6 +539,6 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { val supportCodegen = sqlContext.conf.wholeStageEnabled && sqlContext.conf.wholeStageMaxNumFields >= cols.size supportCodegen && vectorizedReader.toBoolean && - cols.forall(_.dataType.isInstanceOf[AtomicType]) + cols.forall(_.dataType.isInstanceOf[AtomicType]) } } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala index 0dca0d4b9fa..c6dd90598aa 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala @@ -51,7 +51,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { plan 
collect { case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => CarbonDecoderRelation(l.attributeMap, - l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]) + l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]) } } @@ -94,7 +94,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { val newCols = cols.map { case a@Alias(s: ScalaUDF, name) if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) || - name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) => + name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) => udfExists = true projectionToBeAdded :+= a AttributeReference(name, StringType, nullable = true)().withExprId(a.exprId) @@ -311,7 +311,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { ) if (hasCarbonRelation(child) && condAttrs.size() > 0 && - !child.isInstanceOf[CarbonDictionaryCatalystDecoder]) { + !child.isInstanceOf[CarbonDictionaryCatalystDecoder]) { CarbonDictionaryTempDecoder(condAttrs, new util.HashSet[AttributeReferenceWrapper](), child, false, Some(localAliasMap)) @@ -389,7 +389,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { Filter(filter.condition, child) } - case j: Join + case j: Join if !(j.left.isInstanceOf[CarbonDictionaryTempDecoder] || j.right.isInstanceOf[CarbonDictionaryTempDecoder]) => val attrsOnJoin = new util.HashSet[Attribute] @@ -720,7 +720,39 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { cd } } - finalPlan + + val updateDtrFn = finalPlan transform { + case p@Project(projectList: Seq[NamedExpression], cd) => + if (cd.isInstanceOf[Filter] || cd.isInstanceOf[LogicalRelation]) { + p.transformAllExpressions { + case a@Alias(exp, _) + if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] => + Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier, + a.explicitMetadata, a.isGenerated) + case exp: NamedExpression + if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] => + CustomDeterministicExpression(exp) + } + } else { + p + } + case f@Filter(condition: Expression, cd) => + if (cd.isInstanceOf[Project] || cd.isInstanceOf[LogicalRelation]) { + f.transformAllExpressions { + case a@Alias(exp, _) + if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] => + Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifier, + a.explicitMetadata, a.isGenerated) + case exp: NamedExpression + if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] => + CustomDeterministicExpression(exp) + } + } else { + f + } + } + + updateDtrFn } private def collectInformationOnAttributes(plan: LogicalPlan, @@ -841,3 +873,4 @@ case class CarbonDecoderRelation( lazy val dictionaryMap = carbonRelation.carbonRelation.metaData.dictionaryMap } +
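
Note for reviewers: the mechanism is the same in both integration modules. Below is a minimal, self-contained sketch of the wrap/unwrap handshake this patch introduces. It is illustrative only, not the committed code: it assumes Spark 2.x Catalyst APIs, elides the Carbon decoder plumbing, shows only the Alias branch of the rewrite, and the object name RandUpdateFixSketch and its two helper methods are invented for the example.

import org.apache.spark.sql.CustomDeterministicExpression
import org.apache.spark.sql.catalyst.expressions.{Alias, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

object RandUpdateFixSketch {

  // Optimizer side (CarbonLateDecodeRule / ResolveCarbonFunctions): wrap
  // non-deterministic expressions such as rand() so the Carbon plan rewrite
  // treats them as deterministic.
  def wrapNonDeterministic(plan: LogicalPlan): LogicalPlan = plan transform {
    case p @ Project(_, _: Filter) =>
      p.transformAllExpressions {
        case a @ Alias(exp, name)
            if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
          // Keep the original exprId so references elsewhere in the plan stay valid.
          Alias(CustomDeterministicExpression(exp), name)(a.exprId)
      }
  }

  // Strategy side (CarbonLateDecodeStrategy / CarbonStrategies): restore the
  // original expression before the scan is planned, so rand() is still
  // evaluated once per row at execution time.
  def unwrap(projections: Seq[NamedExpression]): Seq[NamedExpression] =
    projections.map { p =>
      p.transform { case CustomDeterministicExpression(exp) => exp }
    }.asInstanceOf[Seq[NamedExpression]]
}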
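Seen from SQL, this is what lets the new test pass: UPDATE iud.rand SET (num) = (rand()) needs the implicit getTupleId() projection and the rand() expression in the same scan-side project list, and Catalyst rules such as project collapsing and predicate pushdown consult Expression.deterministic before moving expressions around. Wrapping appears to be the least invasive way past those checks during the Carbon rewrite: the wrapper is transparent, its StringType data type and null eval look like placeholders (its codegen is a stub, and it is expected to be unwrapped before anything evaluates it), and because it is removed before execution, every updated row still receives an independent random value.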