Skip to content

Commit

Permalink
[SPARK-46899][CORE][FOLLOWUP] Enable /workers/kill if `spark.decommission.enabled=true`

Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

This PR aims to re-enable `/workers/kill` API if `spark.decommission.enabled=true` as a follow-up of
- apache#44926

### Why are the changes needed?

To address this review comment in order to prevent a regression.
- apache#44926 (review)

### Does this PR introduce _any_ user-facing change?

No, this restores the previously existing feature.

### How was this patch tested?

Manual review.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#45010 from dongjoon-hyun/SPARK-46899-2.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Kent Yao <yao@apache.org>
  • Loading branch information
dongjoon-hyun authored and yaooqinn committed Feb 4, 2024
1 parent 7ca355c commit ed6fe4f
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class MasterWebUI(

val masterEndpointRef = master.self
val killEnabled = master.conf.get(UI_KILL_ENABLED)
val decommissionDisabled = !master.conf.get(DECOMMISSION_ENABLED)
val decommissionEnabled = master.conf.get(DECOMMISSION_ENABLED)
val decommissionAllowMode = master.conf.get(MASTER_UI_DECOMMISSION_ALLOW_MODE)

initialize()
Expand All @@ -61,11 +61,13 @@ class MasterWebUI(
"/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
attachHandler(createRedirectHandler(
"/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
}
if (decommissionEnabled) {
attachHandler(createServletHandler("/workers/kill", new HttpServlet {
override def doPost(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
val hostnames: Seq[String] = Option(req.getParameterValues("host"))
.getOrElse(Array[String]()).toImmutableArraySeq
if (decommissionDisabled || !isDecommissioningRequestAllowed(req)) {
if (!isDecommissioningRequestAllowed(req)) {
resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
} else {
val removedWorkers = masterEndpointRef.askSync[Integer](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import org.mockito.Mockito.{mock, when}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.master._
import org.apache.spark.deploy.master.ui.MasterWebUISuite._
import org.apache.spark.internal.config.DECOMMISSION_ENABLED
import org.apache.spark.internal.config.UI.UI_KILL_ENABLED
import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
import org.apache.spark.util.Utils

class ReadOnlyMasterWebUISuite extends SparkFunSuite {

val conf = new SparkConf().set(UI_KILL_ENABLED, false)
val conf = new SparkConf()
.set(UI_KILL_ENABLED, false)
.set(DECOMMISSION_ENABLED, false)
val securityMgr = new SecurityManager(conf)
val rpcEnv = mock(classOf[RpcEnv])
val master = mock(classOf[Master])
Expand Down

0 comments on commit ed6fe4f

Please sign in to comment.