Skip to content

Commit

Permalink
[CARBONDATA-649] fix for update with rand function
Browse files Browse the repository at this point in the history
This closes apache#1296
  • Loading branch information
ashwini-krishnakumar authored and xubo245 committed Sep 17, 2017
1 parent 21bdfda commit fb56f86
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 61 deletions.
Expand Up @@ -448,6 +448,36 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS default.carbon1")
}

test("update table in carbondata with rand() ") {

sql("""CREATE TABLE iud.rand(imei string,age int,task bigint,num double,level decimal(10,3),name string)STORED BY 'org.apache.carbondata.format' """)
sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/update01.csv' INTO TABLE iud.rand OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='imei,age,task,num,level,name')""").collect

sql("select substring(name,1,2 ) , name ,getTupleId() as tupleId , rand() from iud.rand").show(100)

sql("select name , substring(name,1,2 ) ,getTupleId() as tupleId , num , rand() from iud.rand").show(100)

sql("Update rand set (num) = (rand())").show()

sql("Update rand set (num) = (rand(9))").show()

sql("Update rand set (name) = ('Lily')").show()

sql("select name , num from iud.rand").show(100)

sql("select imei , age , name , num from iud.rand").show(100)

sql("select rand() , getTupleId() as tupleId from iud.rand").show(100)

sql("select * from iud.rand").show(100)

sql("select imei , rand() , num from iud.rand").show(100)

sql("select name , rand() from iud.rand").show(100)

sql("DROP TABLE IF EXISTS iud.rand")
}

override def afterAll {
sql("use default")
sql("drop database if exists iud cascade")
Expand Down
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.types.{DataType, StringType}

/**
* Custom expression to override the deterministic property
*
*/
case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{
override def nullable: Boolean = true

override def eval(input: InternalRow): Any = null

override protected def genCode(ctx: CodeGenContext,
ev: GeneratedExpressionCode): String = ev.code
override def deterministic: Boolean = true

override def dataType: DataType = StringType

override def children: Seq[Expression] = Seq()
}
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand}
import org.apache.spark.sql.hive.execution.command._
import org.apache.spark.sql.optimizer.CarbonDecoderRelation
import org.apache.spark.sql.optimizer.{CarbonDecoderRelation}
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType

Expand Down Expand Up @@ -63,15 +63,15 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
def apply(plan: LogicalPlan): Seq[SparkPlan] = {
plan match {
case PhysicalOperation(projectList, predicates, l: LogicalRelation)
if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
if (isStarQuery(plan)) {
carbonRawScanForStarQuery(projectList, predicates, l)(sqlContext) :: Nil
} else {
carbonRawScan(projectList, predicates, l)(sqlContext) :: Nil
}
case InsertIntoCarbonTable(relation: CarbonDatasourceRelation,
_, child: LogicalPlan, overwrite, _) =>
ExecutedCommand(LoadTableByInsert(relation, child, overwrite)) :: Nil
_, child: LogicalPlan, overwrite, _) =>
ExecutedCommand(LoadTableByInsert(relation, child, overwrite)) :: Nil
case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
CarbonDictionaryDecoder(relations,
profile,
Expand All @@ -85,21 +85,27 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
/**
* Create carbon scan
*/
private def carbonRawScan(projectList: Seq[NamedExpression],
predicates: Seq[Expression],
logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
private def carbonRawScan(projectListRaw: Seq[NamedExpression],
predicates: Seq[Expression],
logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {

val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation]
val tableName: String =
relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
// Check out any expressions are there in project list. if they are present then we need to
// decode them as well.

val projectList = projectListRaw.map {p =>
p.transform {
case CustomDeterministicExpression(exp) => exp
}
}.asInstanceOf[Seq[NamedExpression]]
val newProjectList = projectList.map { element =>
element match {
case a@Alias(s: ScalaUDF, name)
if (name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
name.equalsIgnoreCase(
CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) =>
name.equalsIgnoreCase(
CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)) =>
AttributeReference(name, StringType, true)().withExprId(a.exprId)
case other => other
}
Expand Down Expand Up @@ -154,8 +160,8 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
* Create carbon scan for star query
*/
private def carbonRawScanForStarQuery(projectList: Seq[NamedExpression],
predicates: Seq[Expression],
logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
predicates: Seq[Expression],
logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation]
val tableName: String =
relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
Expand Down Expand Up @@ -194,10 +200,10 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
}

def getCarbonDecoder(logicalRelation: LogicalRelation,
sc: SQLContext,
tableName: String,
projectExprsNeedToDecode: Seq[Attribute],
scan: CarbonScan): CarbonDictionaryDecoder = {
sc: SQLContext,
tableName: String,
projectExprsNeedToDecode: Seq[Attribute],
scan: CarbonScan): CarbonDictionaryDecoder = {
val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation])
val attrs = projectExprsNeedToDecode.map { attr =>
Expand Down Expand Up @@ -227,7 +233,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
relation: CarbonDatasourceRelation,
allAttrsNotDecode: util.Set[Attribute]): AttributeReference = {
if (relation.carbonRelation.metaData.dictionaryMap.get(attr.name).getOrElse(false) &&
!allAttrsNotDecode.asScala.exists(p => p.name.equals(attr.name))) {
!allAttrsNotDecode.asScala.exists(p => p.name.equals(attr.name))) {
AttributeReference(attr.name,
IntegerType,
attr.nullable,
Expand All @@ -240,7 +246,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
private def isStarQuery(plan: LogicalPlan) = {
plan match {
case LogicalFilter(condition, l: LogicalRelation)
if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
true
case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] => true
case _ => false
Expand All @@ -252,15 +258,15 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case DropTable(tableName, ifNotExists)
if CarbonEnv.get.carbonMetastore
.isTablePathExists(toTableIdentifier(tableName.toLowerCase))(sqlContext) =>
.isTablePathExists(toTableIdentifier(tableName.toLowerCase))(sqlContext) =>
val identifier = toTableIdentifier(tableName.toLowerCase)
ExecutedCommand(DropTableCommand(ifNotExists, identifier.database, identifier.table)) :: Nil
case ShowLoadsCommand(databaseName, table, limit) =>
ExecutedCommand(ShowLoads(databaseName, table, limit, plan.output)) :: Nil
case LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
options, isOverwriteExist, inputSqlString, dataFrame, _) =>
val isCarbonTable = CarbonEnv.get.carbonMetastore
.tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
.tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
if (isCarbonTable || options.nonEmpty) {
ExecutedCommand(LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
options, isOverwriteExist, inputSqlString, dataFrame)) :: Nil
Expand All @@ -269,15 +275,15 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
}
case alterTable@AlterTableCompaction(altertablemodel) =>
val isCarbonTable = CarbonEnv.get.carbonMetastore
.tableExists(TableIdentifier(altertablemodel.tableName,
altertablemodel.dbName))(sqlContext)
.tableExists(TableIdentifier(altertablemodel.tableName,
altertablemodel.dbName))(sqlContext)
if (isCarbonTable) {
if (altertablemodel.compactionType.equalsIgnoreCase("minor") ||
altertablemodel.compactionType.equalsIgnoreCase("major")) {
ExecutedCommand(alterTable) :: Nil
} else {
throw new MalformedCarbonCommandException(
"Unsupported alter operation on carbon table")
"Unsupported alter operation on carbon table")
}
} else {
ExecutedCommand(HiveNativeCommand(altertablemodel.alterSql)) :: Nil
Expand Down Expand Up @@ -305,7 +311,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
}
case DescribeFormattedCommand(sql, tblIdentifier) =>
val isTable = CarbonEnv.get.carbonMetastore
.tableExists(tblIdentifier)(sqlContext)
.tableExists(tblIdentifier)(sqlContext)
if (isTable) {
val describe =
LogicalDescribeCommand(UnresolvedRelation(tblIdentifier, None), isExtended = false)
Expand Down
Expand Up @@ -59,7 +59,7 @@ object CarbonOptimizer {
}
}

// get the carbon relation from plan.
// get the carbon relation from plan.
def collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = {
plan collect {
case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
Expand All @@ -73,7 +73,7 @@ object CarbonOptimizer {
* decoder plan.
*/
class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
extends Rule[LogicalPlan] with PredicateHelper {
extends Rule[LogicalPlan] with PredicateHelper {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
def apply(logicalPlan: LogicalPlan): LogicalPlan = {
if (relations.nonEmpty && !isOptimized(logicalPlan)) {
Expand Down Expand Up @@ -101,7 +101,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
val newPlan = updatePlan transform {
case Project(pList, child) if (!isTransformed) =>
val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
.splitAt(pList.size - cols.size)
.splitAt(pList.size - cols.size)
val diff = cols.diff(dest.map(_.name))
if (diff.size > 0) {
sys.error(s"Unknown column(s) ${diff.mkString(",")} in table ${table.tableName}")
Expand Down Expand Up @@ -284,7 +284,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])

case union: Union
if !(union.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
union.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
union.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
val leftLocalAliasMap = CarbonAliasDecoderRelation()
Expand Down Expand Up @@ -369,7 +369,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
}
} else {
CarbonFilters
.selectFilters(splitConjunctivePredicates(filter.condition), attrsOnConds, aliasMap)
.selectFilters(splitConjunctivePredicates(filter.condition), attrsOnConds, aliasMap)
}

var child = filter.child
Expand All @@ -391,7 +391,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])

case j: Join
if !(j.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
j.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
j.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
val attrsOnJoin = new util.HashSet[Attribute]
j.condition match {
case Some(expression) =>
Expand Down Expand Up @@ -706,7 +706,38 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
if profile.isInstanceOf[IncludeProfile] && profile.isEmpty =>
child
}
finalPlan
val updateDtrFn = finalPlan transform {
case p@Project(projectList: Seq[NamedExpression], cd) =>
if (cd.isInstanceOf[Filter] || cd.isInstanceOf[LogicalRelation]) {
p.transformAllExpressions {
case a@Alias(exp, _)
if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifiers,
a.explicitMetadata)
case exp: NamedExpression
if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
CustomDeterministicExpression(exp)
}
} else {
p
}
case f@Filter(condition: Expression, cd) =>
if (cd.isInstanceOf[Project] || cd.isInstanceOf[LogicalRelation]) {
f.transformAllExpressions {
case a@Alias(exp, _)
if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
Alias(CustomDeterministicExpression(exp), a.name)(a.exprId, a.qualifiers,
a.explicitMetadata)
case exp: NamedExpression
if !exp.deterministic && !exp.isInstanceOf[CustomDeterministicExpression] =>
CustomDeterministicExpression(exp)
}
} else {
f
}
}

updateDtrFn
}

private def collectInformationOnAttributes(plan: LogicalPlan,
Expand Down Expand Up @@ -812,18 +843,20 @@ case class CarbonDecoderRelation(
def contains(attr: Attribute): Boolean = {
val exists =
attributeMap.exists(entry => entry._1.name.equalsIgnoreCase(attr.name) &&
entry._1.exprId.equals(attr.exprId)) ||
extraAttrs.exists(entry => entry.name.equalsIgnoreCase(attr.name) &&
entry.exprId.equals(attr.exprId))
entry._1.exprId.equals(attr.exprId)) ||
extraAttrs.exists(entry => entry.name.equalsIgnoreCase(attr.name) &&
entry.exprId.equals(attr.exprId))
exists
}

def fillAttributeMap(attrMap: java.util.HashMap[AttributeReferenceWrapper,
CarbonDecoderRelation]): Unit = {
CarbonDecoderRelation]): Unit = {
attributeMap.foreach { attr =>
attrMap.put(AttributeReferenceWrapper(attr._1), this)
}
}

lazy val dictionaryMap = carbonRelation.carbonRelation.metaData.dictionaryMap
}


@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.{DataType, StringType}

/**
* Custom expression to override the deterministic property .
*/
case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{
override def nullable: Boolean = true

override def eval(input: InternalRow): Any = null

override def dataType: DataType = StringType

override def children: Seq[Expression] = Seq()

override def deterministic: Boolean = true

def childexp : Expression = nonDt

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
}

0 comments on commit fb56f86

Please sign in to comment.