Skip to content

Commit

Permalink
Merge branch 'master' into KYUUBI-5579
Browse files Browse the repository at this point in the history
  • Loading branch information
AngersZhuuuu committed Nov 3, 2023
2 parents a71f3a6 + 51dd31c commit c37d655
Show file tree
Hide file tree
Showing 27 changed files with 189 additions and 260 deletions.
1 change: 1 addition & 0 deletions docs/client/jdbc/kyuubi_jdbc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ It's straightforward to use principal and keytab for Kerberos authentication, ju
- kyuubiClientPrincipal: Kerberos ``principal`` for client authentication
- kyuubiClientKeytab: path of Kerberos ``keytab`` file for client authentication
- kyuubiClientTicketCache: path of Kerberos ``ticketCache`` file for client authentication, available since 1.8.0.
- kyuubiServerPrincipal: Kerberos ``principal`` configured by `kyuubi.kinit.principal` at the server side. ``kyuubiServerPrincipal`` is available
as an alias of ``principal`` since 1.7.0, use ``principal`` for previous versions.

Expand Down
2 changes: 0 additions & 2 deletions docs/tools/kyuubi-admin.rst
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ Usage: ``bin/kyuubi-admin list engine [options]``
- The subdomain for the share level of an engine. If not specified, it will read the configuration item kyuubi.engine.share.level.subdomain from kyuubi-defaults.conf.
* - --hs2ProxyUser
- The proxy user to impersonate. When specified, it will list engines for the hs2ProxyUser.
* - -a --all
- All the engine.

.. _list_server:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@

org.apache.kyuubi.plugin.spark.authz.serde.CatalogStorageFormatURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HadoopFsRelationFileIndexURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.OptionsUriExtractor
org.apache.kyuubi.plugin.spark.authz.serde.StringURIExtractor
Original file line number Diff line number Diff line change
Expand Up @@ -1344,7 +1344,11 @@
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ],
"uriDescs" : [ ]
"uriDescs" : [ {
"fieldName" : "options",
"fieldExtractor" : "OptionsUriExtractor",
"isInput" : false
} ]
}, {
"classname" : "org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand",
"tableDescs" : [ {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ class CatalogStorageFormatURIExtractor extends URIExtractor {
}
}

class OptionsUriExtractor extends URIExtractor {
override def apply(v1: AnyRef): Seq[Uri] = {
v1.asInstanceOf[Map[String, String]].get("path").map(Uri).toSeq
}
}

class HadoopFsRelationFileIndexURIExtractor extends URIExtractor {
override def apply(v1: AnyRef): Seq[Uri] = {
v1.asInstanceOf[HadoopFsRelation].location.rootPaths.map(_.toString).map(Uri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
val SaveIntoDataSourceCommand = {
val cmd = "org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand"
val queryDesc = queryQueryDesc
TableCommandSpec(cmd, Nil, queryDescs = Seq(queryDesc))
val uriDesc = UriDesc("options", classOf[OptionsUriExtractor])
TableCommandSpec(cmd, Nil, queryDescs = Seq(queryDesc), uriDescs = Seq(uriDesc))
}

val InsertIntoHadoopFsRelationCommand = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,19 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
}
}


test("SaveIntoDataSourceCommand") {
withTempDir { path =>
withSingleCallEnabled {
val df = sql("SELECT 1 as id, 'Tony' as name")
interceptContains[AccessControlException](doAs(
someone,
df.write.format("console").save(path.toString)))(
s"does not have [select] privilege on [[$path, $path/]]")
}
}
}

test("HadoopFsRelation") {
val db1 = defaultDb
val table1 = "table1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.flink.table.gateway.service.context.DefaultContext
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_INITIALIZE_SQL
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY}
import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine}
import org.apache.kyuubi.service.Serverable
Expand Down Expand Up @@ -102,9 +103,7 @@ object FlinkSQLEngine extends Logging {
startEngine(engineContext)
info("Flink engine started")

if ("yarn-application".equalsIgnoreCase(executionTarget)) {
bootstrapFlinkApplicationExecutor()
}
bootstrap(executionTarget)

// blocking main thread
countDownLatch.await()
Expand All @@ -129,15 +128,22 @@ object FlinkSQLEngine extends Logging {
}
}

private def bootstrapFlinkApplicationExecutor() = {
// trigger an execution to initiate EmbeddedExecutor with the default flink conf
private def bootstrap(executionTarget: String) = {
val flinkConf = new Configuration()
flinkConf.set(PipelineOptions.NAME, "kyuubi-bootstrap-sql")
debug(s"Running bootstrap Flink SQL in application mode with flink conf: $flinkConf.")
val tableEnv = TableEnvironment.create(flinkConf)
val res = tableEnv.executeSql("select 'kyuubi'")
res.await()
info("Bootstrap Flink SQL finished.")

if ("yarn-application".equalsIgnoreCase(executionTarget)) {
// trigger an execution to initiate EmbeddedExecutor with the default flink conf
flinkConf.set(PipelineOptions.NAME, "kyuubi-bootstrap-sql")
debug(s"Running bootstrap Flink SQL in application mode with flink conf: $flinkConf.")
tableEnv.executeSql("select 'kyuubi'").await()
}

kyuubiConf.get(ENGINE_INITIALIZE_SQL).foreach { stmt =>
tableEnv.executeSql(stmt).await()
}

info("Bootstrap SQL finished.")
}

private def setDeploymentConf(executionTarget: String, flinkConf: Configuration): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.flink.table.gateway.service.session.{Session => FSession}
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.udf.KDFRegistry
Expand Down Expand Up @@ -64,6 +65,15 @@ class FlinkSessionImpl(
override def open(): Unit = {
val executor = fSession.createExecutor(Configuration.fromMap(fSession.getSessionConfig))

sessionManager.getConf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sql =>
try {
executor.executeStatement(OperationHandle.create, sql)
} catch {
case NonFatal(e) =>
throw KyuubiSQLException(s"execute ${ENGINE_SESSION_INITIALIZE_SQL.key} $sql ", e)
}
}

val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) =>
Array(USE_CATALOG, USE_DATABASE).contains(k)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources

private var zkServer: EmbeddedZookeeper = _

protected val conf: KyuubiConf = FlinkSQLEngine.kyuubiConf
protected val conf: KyuubiConf = new KyuubiConf(false)

protected def engineRefId: String

Expand All @@ -60,7 +60,6 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources
}
}
withKyuubiConf.foreach { case (k, v) =>
System.setProperty(k, v)
conf.set(k, v)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.flink.operation

import java.util.UUID

import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineLocal}
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
import org.apache.kyuubi.operation.{HiveJDBCTestHelper, NoneMode}

class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
with WithDiscoveryFlinkSQLEngine with WithFlinkSQLEngineLocal {

protected def jdbcUrl: String = getFlinkEngineServiceUrl

protected val ENGINE_INITIALIZE_SQL_VALUE: String =
"show databases;"

protected val ENGINE_SESSION_INITIALIZE_SQL_VALUE: String =
"""create catalog cat_b with ('type'='generic_in_memory');
|create table blackhole(i int) with ('connector'='blackhole');
|create table datagen(i int) with (
|'connector'='datagen',
|'fields.i.kind'='sequence',
|'fields.i.start'='1',
|'fields.i.end'='10')""".stripMargin

override def withKyuubiConf: Map[String, String] = {
Map(
"flink.execution.target" -> "remote",
"flink.high-availability.cluster-id" -> "flink-mini-cluster",
"flink.app.name" -> "kyuubi_connection_flink_kandy",
HA_NAMESPACE.key -> namespace,
HA_ENGINE_REF_ID.key -> engineRefId,
ENGINE_TYPE.key -> "FLINK_SQL",
ENGINE_SHARE_LEVEL.key -> shareLevel,
OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name,
ENGINE_INITIALIZE_SQL.key -> ENGINE_INITIALIZE_SQL_VALUE,
ENGINE_SESSION_INITIALIZE_SQL.key -> ENGINE_SESSION_INITIALIZE_SQL_VALUE,
KYUUBI_SESSION_USER_KEY -> "kandy")
}

override protected def engineRefId: String = UUID.randomUUID().toString

def namespace: String = "/kyuubi/flink-local-engine-test"

def shareLevel: String = ShareLevel.USER.toString

def engineType: String = "flink"

test("execute statement - kyuubi engine initialize") {
withJdbcStatement() { statement =>
var resultSet = statement.executeQuery("show catalogs")
val expectedCatalogs = Set("default_catalog", "cat_b")
var actualCatalogs = Set[String]()
while (resultSet.next()) {
actualCatalogs += resultSet.getString(1)
}
assert(expectedCatalogs.subsetOf(actualCatalogs))

resultSet = statement.executeQuery("show databases")
assert(resultSet.next())
assert(resultSet.getString(1) === "default_database")
assert(!resultSet.next())

val expectedTables = Set("blackhole", "datagen")
resultSet = statement.executeQuery("show tables")
while (resultSet.next()) {
assert(expectedTables.contains(resultSet.getString(1)))
}
assert(!resultSet.next())

var dropResult = statement.executeQuery("drop catalog cat_b")
assert(dropResult.next())
assert(dropResult.getString(1) === "OK")

dropResult = statement.executeQuery("drop table blackhole")
assert(dropResult.next())
assert(dropResult.getString(1) === "OK")

dropResult = statement.executeQuery("drop table datagen")
assert(dropResult.next())
assert(dropResult.getString(1) === "OK")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ object KyuubiAuthenticationFactory extends Logging {
}

def getValidPasswordAuthMethod(authTypes: Set[AuthType]): AuthMethod = {
if (authTypes.contains(NONE)) AuthMethods.NONE
if (authTypes == Set(NOSASL)) AuthMethods.NONE
else if (authTypes.contains(NONE)) AuthMethods.NONE
else if (authTypes.contains(LDAP)) AuthMethods.LDAP
else if (authTypes.contains(JDBC)) AuthMethods.JDBC
else if (authTypes.contains(CUSTOM)) AuthMethods.CUSTOM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ class AdminControlCliArguments(args: Seq[String], env: Map[String, String] = sys
| type ${cliConfig.engineOpts.engineType}
| sharelevel ${cliConfig.engineOpts.engineShareLevel}
| sharesubdomain ${cliConfig.engineOpts.engineSubdomain}
| all ${cliConfig.engineOpts.all}
""".stripMargin
case ControlObject.SERVER =>
s"""Parsed arguments:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ class AdminListEngineCommand(cliConfig: CliConfig)
normalizedCliConfig.engineOpts.engineType,
normalizedCliConfig.engineOpts.engineShareLevel,
normalizedCliConfig.engineOpts.engineSubdomain,
normalizedCliConfig.commonOpts.hs2ProxyUser,
normalizedCliConfig.engineOpts.all).asScala
normalizedCliConfig.commonOpts.hs2ProxyUser).asScala
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object AdminCommandLine extends CommonCommandLine {
.text("\tDelete resources.")
.action((_, c) => c.copy(action = ControlAction.DELETE))
.children(
deleteEngineCmd(builder).text("\tDelete the specified engine node for user.")))
engineCmd(builder).text("\tDelete the specified engine node for user.")))

}

Expand All @@ -64,7 +64,7 @@ object AdminCommandLine extends CommonCommandLine {
.text("\tList information about resources.")
.action((_, c) => c.copy(action = ControlAction.LIST))
.children(
listEngineCmd(builder).text("\tList the engine nodes"),
engineCmd(builder).text("\tList all the engine nodes for a user"),
serverCmd(builder).text("\tList all the server nodes")))

}
Expand All @@ -80,7 +80,7 @@ object AdminCommandLine extends CommonCommandLine {
refreshConfigCmd(builder).text("\tRefresh the config with specified type.")))
}

private def deleteEngineCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
private def engineCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
import builder._
cmd("engine").action((_, c) => c.copy(resource = ControlObject.ENGINE))
.children(
Expand All @@ -95,24 +95,6 @@ object AdminCommandLine extends CommonCommandLine {
.text("The engine share level this engine belong to."))
}

private def listEngineCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
import builder._
cmd("engine").action((_, c) => c.copy(resource = ControlObject.ENGINE))
.children(
opt[String]("engine-type").abbr("et")
.action((v, c) => c.copy(engineOpts = c.engineOpts.copy(engineType = v)))
.text("The engine type this engine belong to."),
opt[String]("engine-subdomain").abbr("es")
.action((v, c) => c.copy(engineOpts = c.engineOpts.copy(engineSubdomain = v)))
.text("The engine subdomain this engine belong to."),
opt[String]("engine-share-level").abbr("esl")
.action((v, c) => c.copy(engineOpts = c.engineOpts.copy(engineShareLevel = v)))
.text("The engine share level this engine belong to."),
opt[String]("all").abbr("a")
.action((v, c) => c.copy(engineOpts = c.engineOpts.copy(all = v)))
.text("All the engine."))
}

private def serverCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
import builder._
cmd("server").action((_, c) => c.copy(resource = ControlObject.SERVER))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ case class EngineOpts(
user: String = null,
engineType: String = null,
engineSubdomain: String = null,
engineShareLevel: String = null,
all: String = null)
engineShareLevel: String = null)

case class AdminConfigOpts(configType: String = null)
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,13 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi
|Command: list [engine|server]
| List information about resources.
|Command: list engine [options]
| List the engine nodes
| List all the engine nodes for a user
| -et, --engine-type <value>
| The engine type this engine belong to.
| -es, --engine-subdomain <value>
| The engine subdomain this engine belong to.
| -esl, --engine-share-level <value>
| The engine share level this engine belong to.
| -a, --all <value> All the engine.
|Command: list server
| List all the server nodes
|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class JdbcConnectionParams {
public static final String AUTH_KYUUBI_SERVER_PRINCIPAL = "kyuubiServerPrincipal";
public static final String AUTH_KYUUBI_CLIENT_PRINCIPAL = "kyuubiClientPrincipal";
public static final String AUTH_KYUUBI_CLIENT_KEYTAB = "kyuubiClientKeytab";
public static final String AUTH_KYUUBI_CLIENT_TICKET_CACHE = "kyuubiClientTicketCache";
public static final String AUTH_PASSWD = "password";
public static final String AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType";
public static final String AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,8 @@ private Subject createSubject() {
AccessControlContext context = AccessController.getContext();
return Subject.getSubject(context);
} else if (isTgtCacheAuthMode()) {
return KerberosAuthenticationManager.getTgtCacheAuthentication().getSubject();
String ticketCache = sessConfMap.get(AUTH_KYUUBI_CLIENT_TICKET_CACHE);
return KerberosAuthenticationManager.getTgtCacheAuthentication(ticketCache).getSubject();
} else {
// This should never happen
throw new IllegalArgumentException("Unsupported auth mode");
Expand Down
Loading

0 comments on commit c37d655

Please sign in to comment.