[KYUUBI apache#5579][AUTHZ] Support LogicalRelation that doesn't have a CatalogTable but has a HadoopFsRelation
AngersZhuuuu committed Oct 31, 2023
1 parent 591250c commit 33282e2
Showing 7 changed files with 78 additions and 10 deletions.
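For context: a query that reads files by path, rather than through a catalog table, is planned as a LogicalRelation that wraps a HadoopFsRelation but carries no CatalogTable, so the existing table-based scan checks have nothing to authorize. This commit reports the relation's file-index root paths as URI privilege objects instead. A minimal sketch of how such a plan arises, assuming an active SparkSession `spark` and a hypothetical path:

// Two ways to produce a LogicalRelation(HadoopFsRelation, _, None, _) plan,
// i.e. a file scan with no CatalogTable attached (path is hypothetical).
val byApi = spark.read.parquet("/tmp/table_directory")
val bySql = spark.sql("SELECT * FROM parquet.`/tmp/table_directory`")
// With this change, Authz derives a URI resource from the relation's
// FileIndex root paths for such plans instead of skipping them.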
@@ -15,4 +15,5 @@
# limitations under the License.
#

org.apache.kyuubi.plugin.spark.authz.serde.HadoopFsRelationFileIndexURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.StringURIExtractor
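The service file above registers the new HadoopFsRelationFileIndexURIExtractor next to the existing StringURIExtractor so the serde layer can resolve it by name. A minimal sketch of that lookup, under the assumption (for illustration only) that extractors are discovered through Java's ServiceLoader and keyed by simple class name:

import java.util.ServiceLoader
import scala.collection.JavaConverters._

// Hypothetical loader: collect every registered URIExtractor and index it by
// simple class name, which is how the JSON spec below refers to extractors.
def loadUriExtractors(): Map[String, URIExtractor] =
  ServiceLoader.load(classOf[URIExtractor]).asScala
    .map(e => e.getClass.getSimpleName -> e)
    .toMap

// loadUriExtractors()("HadoopFsRelationFileIndexURIExtractor") would then
// resolve the extractor referenced by LogicalRelation's "relation" uriDesc.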
@@ -5,31 +5,39 @@
"fieldExtractor" : "CatalogTableTableExtractor",
"catalogDesc" : null
} ],
"functionDescs" : [ ]
"functionDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.catalyst.catalog.HiveTableRelation",
"scanDescs" : [ {
"fieldName" : "tableMeta",
"fieldExtractor" : "CatalogTableTableExtractor",
"catalogDesc" : null
} ],
"functionDescs" : [ ]
"functionDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.execution.datasources.LogicalRelation",
"scanDescs" : [ {
"fieldName" : "catalogTable",
"fieldExtractor" : "CatalogTableOptionTableExtractor",
"catalogDesc" : null
} ],
"functionDescs" : [ ]
"functionDescs" : [ ],
"uriDescs" : [ {
"fieldName" : "relation",
"fieldExtractor" : "HadoopFsRelationFileIndexURIExtractor",
"isInput" : false
} ]
}, {
"classname" : "org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation",
"scanDescs" : [ {
"fieldName" : null,
"fieldExtractor" : "DataSourceV2RelationTableExtractor",
"catalogDesc" : null
} ],
"functionDescs" : [ ]
"functionDescs" : [ ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hive.HiveGenericUDF",
"scanDescs" : [ ],
@@ -43,7 +51,8 @@
"skipTypes" : [ "TEMP", "SYSTEM" ]
},
"isInput" : true
} ]
} ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hive.HiveGenericUDTF",
"scanDescs" : [ ],
@@ -57,7 +66,8 @@
"skipTypes" : [ "TEMP", "SYSTEM" ]
},
"isInput" : true
} ]
} ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hive.HiveSimpleUDF",
"scanDescs" : [ ],
@@ -71,7 +81,8 @@
"skipTypes" : [ "TEMP", "SYSTEM" ]
},
"isInput" : true
} ]
} ],
"uriDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hive.HiveUDAFFunction",
"scanDescs" : [ ],
@@ -85,5 +96,6 @@
"skipTypes" : [ "TEMP", "SYSTEM" ]
},
"isInput" : true
} ]
} ],
"uriDescs" : [ ]
} ]
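Each scan spec entry now carries a uriDescs array alongside scanDescs and functionDescs; only LogicalRelation populates it, pointing the relation field at the new extractor. A rough sketch of the shape those entries deserialize into, assuming the fields mirror the JSON keys (the real descriptor is built from classOf[...] in the Scans spec further down and serializes to the simple class name shown here):

// Illustrative only: the descriptor implied by the "uriDescs" JSON above.
case class UriDesc(
    fieldName: String,       // plan field to read, e.g. "relation"
    fieldExtractor: String,  // simple class name of the URIExtractor
    isInput: Boolean = false)

val logicalRelationUriDesc =
  UriDesc("relation", "HadoopFsRelationFileIndexURIExtractor", isInput = false)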
@@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.slf4j.LoggerFactory

import org.apache.kyuubi.plugin.spark.authz.OperationType.OperationType
@@ -112,6 +113,10 @@ object PrivilegesBuilder {
val cols = conditionList ++ aggCols
buildQuery(a.child, privilegeObjects, projectionList, cols, spark)

case logicalRelation @ LogicalRelation(_: HadoopFsRelation, _, None, _) =>
getScanSpec(logicalRelation).uris(logicalRelation)
.foreach(privilegeObjects += PrivilegeObject(_))

case scan if isKnownScan(scan) && scan.resolved =>
getScanSpec(scan).tables(scan, spark).foreach(mergeProjection(_, scan))

@@ -101,7 +101,8 @@ case class TableCommandSpec(
case class ScanSpec(
classname: String,
scanDescs: Seq[ScanDesc],
functionDescs: Seq[FunctionDesc] = Seq.empty) extends CommandSpec {
functionDescs: Seq[FunctionDesc] = Seq.empty,
uriDescs: Seq[UriDesc] = Seq.empty) extends CommandSpec {
override def opType: String = OperationType.QUERY.toString
def tables: (LogicalPlan, SparkSession) => Seq[Table] = (plan, spark) => {
scanDescs.flatMap { td =>
@@ -115,6 +116,18 @@
}
}

def uris: LogicalPlan => Seq[Uri] = plan => {
uriDescs.flatMap { ud =>
try {
ud.extract(plan)
} catch {
case e: Exception =>
LOG.debug(ud.error(plan, e))
None
}
}
}

def functions: (Expression) => Seq[Function] = (expr) => {
functionDescs.flatMap { fd =>
try {
@@ -17,6 +17,8 @@

package org.apache.kyuubi.plugin.spark.authz.serde

import org.apache.spark.sql.execution.datasources.HadoopFsRelation

trait URIExtractor extends (AnyRef => Option[Uri]) with Extractor

object URIExtractor {
@@ -33,3 +35,9 @@ class StringURIExtractor extends URIExtractor {
Some(Uri(v1.asInstanceOf[String]))
}
}

class HadoopFsRelationFileIndexURIExtractor extends URIExtractor {
override def apply(v1: AnyRef): Option[Uri] = {
Some(Uri(v1.asInstanceOf[HadoopFsRelation].location.rootPaths.map(_.toString).mkString(",")))
}
}
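Combined with the UriDesc("relation", ...) registration in the Scans spec below, the serde layer reads the relation field off the LogicalRelation and hands it to this extractor, which comma-joins the FileIndex root paths into a single Uri. A hedged end-to-end sketch (the helper and the example paths are hypothetical):

import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

// Hypothetical glue mirroring what UriDesc("relation", classOf[...]) wires up.
def uriOf(plan: LogicalRelation): Option[Uri] = plan.relation match {
  case fs: HadoopFsRelation => new HadoopFsRelationFileIndexURIExtractor().apply(fs)
  case _                    => None
}
// For rootPaths Seq("file:/w/t/p=1", "file:/w/t/p=2") this yields
// Some(Uri("file:/w/t/p=1,file:/w/t/p=2")).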
@@ -37,7 +37,8 @@ object Scans extends CommandSpecs[ScanSpec] {
ScanDesc(
"catalogTable",
classOf[CatalogTableOptionTableExtractor])
ScanSpec(r, Seq(tableDesc))
val uriDesc = UriDesc("relation", classOf[HadoopFsRelationFileIndexURIExtractor])
ScanSpec(r, Seq(tableDesc), uriDescs = Seq(uriDesc))
}

val DataSourceV2Relation = {
@@ -17,6 +17,7 @@

package org.apache.kyuubi.plugin.spark.authz.ranger

import scala.reflect.io.File
import scala.util.Try

import org.apache.hadoop.security.UserGroupInformation
@@ -1032,4 +1033,31 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
}
}

test("HadoopFsRelation") {
val db1 = defaultDb
val table1 = "table1"
val tableDirectory = getClass.getResource("/").getPath + "table_directory"
val directory = File(tableDirectory).createDirectory()
withSingleCallEnabled {
withCleanTmpResources(Seq((s"$db1.$table1", "table"))) {
doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1 (id int, scope int)"))
doAs(
admin,
sql(
s"""
|INSERT OVERWRITE DIRECTORY '${directory.path}'
|USING parquet
|SELECT * FROM $db1.$table1""".stripMargin))

interceptContains[AccessControlException](doAs(
someone,
sql(
s"""
|SELECT * FROM parquet.`${directory.path}`""".stripMargin).explain(true)))(
s"does not have [select] privilege on " +
s"[[file:${directory.path}, file:${directory.path}/]]")
}
}
}
}
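The test above drives the new check through path-based SQL. The DataFrame API builds the same catalog-less LogicalRelation, so a companion assertion along the following lines would be expected to fail the same way; this is only a hedged sketch, not part of the commit, and assumes the suite's spark session and fixtures:

// Hypothetical companion check reusing the fixtures from the test above.
interceptContains[AccessControlException](doAs(
  someone,
  spark.read.parquet(directory.path).explain(true)))(
  "does not have [select] privilege")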
