Merge branch 'master' into KYUUBI-5579
AngersZhuuuu committed Nov 6, 2023
2 parents 651f3f6 + fdacd23 commit 96f2006
Showing 21 changed files with 330 additions and 133 deletions.
@@ -22,7 +22,7 @@ import scala.annotation.tailrec
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Expression, NamedExpression, UnaryExpression}
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, Project, Sort, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Generate, LogicalPlan, Project, Sort, SubqueryAlias, View, Window}

/**
* Infer the columns for Rebalance and Sort to improve the compression ratio.
@@ -96,6 +96,12 @@ object InferRebalanceAndSortOrders {
case f: Filter => candidateKeys(f.child, output)
case s: SubqueryAlias => candidateKeys(s.child, output)
case v: View => candidateKeys(v.child, output)
case g: Generate => candidateKeys(g.child, AttributeSet(g.requiredChildOutput))
case w: Window =>
val aliasMap = getAliasMap(w.windowExpressions)
Some((
w.partitionSpec.map(p => aliasMap.getOrElse(p.canonicalized, p)),
w.orderSpec.map(_.child).map(o => aliasMap.getOrElse(o.canonicalized, o))))

case _ => None
}
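
A minimal usage sketch of the new Window branch (not taken from the patch; it mirrors the df7 test case added below and assumes a SparkSession with the Kyuubi extension enabled): the inferred rebalance keys come from the window's PARTITION BY expressions and the sort keys from its ORDER BY expressions.

// Sketch only: table and column names come from the test suite below.
spark.conf.set(KyuubiSQLConf.INFER_REBALANCE_AND_SORT_ORDERS.key, "true")
spark.sql(
  """INSERT INTO TABLE t2 PARTITION(p='a')
    |SELECT input1.col1, input2.col2,
    |       RANK() OVER (PARTITION BY input2.col2 ORDER BY input1.col1) AS rank
    |FROM input1 JOIN input2 ON input1.col1 = input2.col1
    |""".stripMargin)
// Expected effect: a rebalance on input2.col2 and a sort on input1.col1 are added
// before the write, improving the compression ratio of the output files.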
@@ -199,9 +199,10 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
}

withView("v") {
withTable("t", "input1", "input2") {
withTable("t", "t2", "input1", "input2") {
withSQLConf(KyuubiSQLConf.INFER_REBALANCE_AND_SORT_ORDERS.key -> "true") {
sql(s"CREATE TABLE t (c1 int, c2 long) USING PARQUET PARTITIONED BY (p string)")
sql(s"CREATE TABLE t2 (c1 int, c2 long, c3 long) USING PARQUET PARTITIONED BY (p string)")
sql(s"CREATE TABLE input1 USING PARQUET AS SELECT * FROM VALUES(1,2),(1,3)")
sql(s"CREATE TABLE input2 USING PARQUET AS SELECT * FROM VALUES(1,3),(1,3)")
sql(s"CREATE VIEW v as SELECT col1, count(*) as col2 FROM input1 GROUP BY col1")
@@ -264,6 +265,30 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
|SELECT * FROM v
|""".stripMargin)
checkShuffleAndSort(df5.queryExecution.analyzed, 1, 1)

// generate
val df6 = sql(
s"""
|INSERT INTO TABLE t2 PARTITION(p='a')
|SELECT /*+ broadcast(input2) */ input1.col1, input2.col1, cast(cc.action1 as bigint)
|FROM input1
|JOIN input2
|ON input1.col1 = input2.col1
| lateral view explode(ARRAY(input1.col1, input1.col2)) cc as action1
|""".stripMargin)
checkShuffleAndSort(df6.queryExecution.analyzed, 1, 1)

// window
val df7 = sql(
s"""
|INSERT INTO TABLE t2 PARTITION(p='a')
|SELECT /*+ broadcast(input2) */ input1.col1, input2.col2,
| RANK() OVER (PARTITION BY input2.col2 ORDER BY input1.col1) AS rank
|FROM input1
|JOIN input2
|ON input1.col1 = input2.col1
|""".stripMargin)
checkShuffleAndSort(df7.queryExecution.analyzed, 1, 1)
}
}
}
@@ -29,5 +29,6 @@ org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedIdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedTableTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.StringTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.SubqueryAliasTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.TableIdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.TableTableExtractor
@@ -1100,6 +1100,11 @@
"uriDescs" : [ {
"fieldName" : "storage",
"fieldExtractor" : "CatalogStorageFormatURIExtractor",
"actionTypeDesc" : {
"fieldName" : "overwrite",
"fieldExtractor" : "OverwriteOrInsertActionTypeExtractor",
"actionType" : null
},
"isInput" : false
} ]
}, {
@@ -1347,6 +1352,11 @@
"uriDescs" : [ {
"fieldName" : "options",
"fieldExtractor" : "OptionsUriExtractor",
"actionTypeDesc" : {
"fieldName" : "mode",
"fieldExtractor" : "SaveModeActionTypeExtractor",
"actionType" : null
},
"isInput" : false
} ]
}, {
@@ -1381,6 +1391,11 @@
"uriDescs" : [ {
"fieldName" : "storage",
"fieldExtractor" : "CatalogStorageFormatURIExtractor",
"actionTypeDesc" : {
"fieldName" : "overwrite",
"fieldExtractor" : "OverwriteOrInsertActionTypeExtractor",
"actionType" : null
},
"isInput" : false
} ]
}, {
@@ -1654,6 +1669,7 @@
"uriDescs" : [ {
"fieldName" : "path",
"fieldExtractor" : "StringURIExtractor",
"actionTypeDesc" : null,
"isInput" : false
} ]
}, {
@@ -1679,6 +1695,7 @@
"uriDescs" : [ {
"fieldName" : "path",
"fieldExtractor" : "StringURIExtractor",
"actionTypeDesc" : null,
"isInput" : true
} ]
}, {
@@ -1980,4 +1997,68 @@
"opType" : "QUERY",
"queryDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.delta.commands.DeleteCommand",
"tableDescs" : [ {
"fieldName" : "catalogTable",
"fieldExtractor" : "CatalogTableOptionTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
}, {
"fieldName" : "target",
"fieldExtractor" : "SubqueryAliasTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.delta.commands.UpdateCommand",
"tableDescs" : [ {
"fieldName" : "catalogTable",
"fieldExtractor" : "CatalogTableOptionTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
}, {
"fieldName" : "target",
"fieldExtractor" : "SubqueryAliasTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ ],
"uriDescs" : [ ]
} ]
@@ -88,14 +88,16 @@ object PrivilegeObject {
) // TODO: Support catalog for function
}

def apply(uri: Uri): PrivilegeObject = {
def apply(
uri: Uri,
actionType: PrivilegeObjectActionType): PrivilegeObject = {
val privilegeObjectType = Option(new URI(uri.path).getScheme) match {
case Some("file") => LOCAL_URI
case _ => DFS_URL
}
new PrivilegeObject(
privilegeObjectType,
PrivilegeObjectActionType.OTHER,
actionType,
uri.path,
null,
Nil,
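
An illustrative call of the new two-argument overload (not in the diff; the path is hypothetical and Uri is assumed to be the serde case class wrapping a path string):

// Hypothetical example: an HDFS output location written with overwrite semantics.
val po = PrivilegeObject(
  Uri("hdfs://nn:8020/warehouse/output_dir"),
  PrivilegeObjectActionType.INSERT_OVERWRITE)
// po.privilegeObjectType is DFS_URL (the scheme is not "file"), and po.actionType is now
// INSERT_OVERWRITE rather than the previously hard-coded OTHER, which lets the Ranger
// access-type mapping treat the write as UPDATE (see the suite changes later in this diff).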
@@ -201,10 +201,11 @@ object PrivilegesBuilder {
spec.uriDescs.foreach { ud =>
try {
val uris = ud.extract(plan)
val actionType = ud.actionTypeDesc.map(_.extract(plan)).getOrElse(OTHER)
if (ud.isInput) {
inputObjs ++= uris.map(PrivilegeObject(_))
inputObjs ++= uris.map(PrivilegeObject(_, actionType))
} else {
outputObjs ++= uris.map(PrivilegeObject(_))
outputObjs ++= uris.map(PrivilegeObject(_, actionType))
}
} catch {
case e: Exception =>
@@ -317,6 +317,7 @@ case class ScanDesc(
case class UriDesc(
fieldName: String,
fieldExtractor: String,
actionTypeDesc: Option[ActionTypeDesc] = None,
isInput: Boolean = false) extends Descriptor {
override def extract(v: AnyRef): Seq[Uri] = {
val uriVal = invokeAs[AnyRef](v, fieldName)
@@ -234,6 +234,20 @@ class ResolvedIdentifierTableExtractor extends TableExtractor {
}
}

/**
* org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
*/
class SubqueryAliasTableExtractor extends TableExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
v1.asInstanceOf[SubqueryAlias] match {
case SubqueryAlias(_, SubqueryAlias(identifier, _)) =>
lookupExtractor[StringTableExtractor].apply(spark, identifier.toString())
case SubqueryAlias(identifier, _) =>
lookupExtractor[StringTableExtractor].apply(spark, identifier.toString())
}
}
}
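
A rough sketch of the plan shapes this extractor targets (illustrative; the table name is hypothetical and LocalRelation merely stands in for the real child plan):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, SubqueryAlias}

// Delta's DeleteCommand/UpdateCommand keep their target as a SubqueryAlias,
// possibly nested when the table is aliased in SQL.
val spark = SparkSession.active
val target = SubqueryAlias("default.target_tbl", LocalRelation())
val table = new SubqueryAliasTableExtractor().apply(spark, target)
// table should identify default.target_tbl; for SubqueryAlias(alias, SubqueryAlias(id, _))
// the inner identifier is used, per the first case above.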

/**
* org.apache.spark.sql.connector.catalog.Table
*/
@@ -1469,13 +1469,13 @@ class HiveCatalogPrivilegeBuilderSuite extends PrivilegesBuilderSuite {

assert(out.size == 1)
val po1 = out.head
assert(po1.actionType === PrivilegeObjectActionType.OTHER)
assert(po1.actionType === PrivilegeObjectActionType.INSERT_OVERWRITE)
assert(po1.privilegeObjectType === PrivilegeObjectType.DFS_URL)
assert(po1.dbname === directory.path)
assert(po1.objectName === null)
assert(po1.columns === Seq.empty)
val accessType1 = ranger.AccessType(po1, operationType, isInput = true)
assert(accessType1 == AccessType.SELECT)
assert(accessType1 == AccessType.UPDATE)
}

test("InsertIntoDataSourceCommand") {
@@ -1601,13 +1601,13 @@ class HiveCatalogPrivilegeBuilderSuite extends PrivilegesBuilderSuite {

assert(out.size == 1)
val po1 = out.head
assert(po1.actionType === PrivilegeObjectActionType.OTHER)
assert(po1.actionType === PrivilegeObjectActionType.INSERT_OVERWRITE)
assert(po1.privilegeObjectType === PrivilegeObjectType.DFS_URL)
assert(po1.dbname === directory.path)
assert(po1.objectName === null)
assert(po1.columns === Seq.empty)
val accessType1 = ranger.AccessType(po1, operationType, isInput = true)
assert(accessType1 == AccessType.SELECT)
assert(accessType1 == AccessType.UPDATE)
}

test("InsertIntoHiveDirCommand") {
@@ -1634,13 +1634,13 @@ class HiveCatalogPrivilegeBuilderSuite extends PrivilegesBuilderSuite {

assert(out.size == 1)
val po1 = out.head
assert(po1.actionType === PrivilegeObjectActionType.OTHER)
assert(po1.actionType === PrivilegeObjectActionType.INSERT_OVERWRITE)
assert(po1.privilegeObjectType === PrivilegeObjectType.DFS_URL)
assert(po1.dbname === directory.path)
assert(po1.objectName === null)
assert(po1.columns === Seq.empty)
val accessType1 = ranger.AccessType(po1, operationType, isInput = true)
assert(accessType1 == AccessType.SELECT)
assert(accessType1 == AccessType.UPDATE)
}

test("InsertIntoHiveTableCommand") {
Expand Down
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.plugin.spark.authz.gen

import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
import org.apache.kyuubi.plugin.spark.authz.serde._

object DeltaCommands extends CommandSpecs[TableCommandSpec] {

val DeleteCommand = {
val cmd = "org.apache.spark.sql.delta.commands.DeleteCommand"
val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
val tableDesc = TableDesc(
"catalogTable",
classOf[CatalogTableOptionTableExtractor],
actionTypeDesc = Some(actionTypeDesc))
val targetDesc = TableDesc(
"target",
classOf[SubqueryAliasTableExtractor],
actionTypeDesc = Some(actionTypeDesc))
TableCommandSpec(cmd, Seq(tableDesc, targetDesc))
}

val UpdateCommand = {
val cmd = "org.apache.spark.sql.delta.commands.UpdateCommand"
DeleteCommand.copy(classname = cmd)
}

override def specs: Seq[TableCommandSpec] = Seq(
DeleteCommand,
UpdateCommand)
}
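
For context, a hedged sketch of what these specs mean at runtime (not part of the diff; the table name and session setup are assumptions): with the Ranger authz extension enabled, Delta DELETE and UPDATE statements are now checked as output objects carrying the UPDATE action type.

// Hypothetical Delta table default.target_tbl; both statements resolve the target via
// SubqueryAliasTableExtractor and require UPDATE access on the table.
spark.sql("DELETE FROM default.target_tbl WHERE id = 1")
spark.sql("UPDATE default.target_tbl SET name = 'x' WHERE id = 2")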
@@ -44,7 +44,7 @@ class JsonSpecFileGenerator extends AnyFunSuite {
// scalastyle:on
test("check spec json files") {
writeCommandSpecJson("database", Seq(DatabaseCommands))
writeCommandSpecJson("table", Seq(TableCommands, IcebergCommands, HudiCommands))
writeCommandSpecJson("table", Seq(TableCommands, IcebergCommands, HudiCommands, DeltaCommands))
writeCommandSpecJson("function", Seq(FunctionCommands))
writeCommandSpecJson("scan", Seq(Scans))
}
@@ -552,14 +552,22 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
val InsertIntoDataSourceDir = {
val cmd = "org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand"
val queryDesc = queryQueryDesc
val uriDesc = UriDesc("storage", classOf[CatalogStorageFormatURIExtractor])
val actionTypeDesc = overwriteActionTypeDesc
val uriDesc = UriDesc(
"storage",
classOf[CatalogStorageFormatURIExtractor],
actionTypeDesc = Some(actionTypeDesc))
TableCommandSpec(cmd, Nil, queryDescs = Seq(queryDesc), uriDescs = Seq(uriDesc))
}

val SaveIntoDataSourceCommand = {
val cmd = "org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand"
val queryDesc = queryQueryDesc
val uriDesc = UriDesc("options", classOf[OptionsUriExtractor])
val actionTypeDesc = ActionTypeDesc("mode", classOf[SaveModeActionTypeExtractor])
val uriDesc = UriDesc(
"options",
classOf[OptionsUriExtractor],
actionTypeDesc = Some(actionTypeDesc))
TableCommandSpec(cmd, Nil, queryDescs = Seq(queryDesc), uriDescs = Seq(uriDesc))
}
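
The body of SaveModeActionTypeExtractor is not part of this diff; a hedged sketch of the mapping it presumably performs (the exact cases are assumptions, shown only to explain why a mode-driven action type is attached to the URI):

import org.apache.spark.sql.SaveMode
import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._

// Assumed mapping, for illustration only.
def saveModeToActionType(mode: SaveMode): PrivilegeObjectActionType = mode match {
  case SaveMode.Append => INSERT
  case SaveMode.Overwrite => INSERT_OVERWRITE
  case _ => OTHER
}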
