Skip to content

Commit

Permalink
[SPARK-3167] Handle special driver configs in Windows
Browse files Browse the repository at this point in the history
This is an effort to bring the Windows scripts up to speed after recent splashing changes in apache#1845.

Author: Andrew Or <andrewor14@gmail.com>

Closes apache#2129 from andrewor14/windows-config and squashes the following commits:

881a8f0 [Andrew Or] Add reference to Windows taskkill
92e6047 [Andrew Or] Update a few comments (minor)
22b1acd [Andrew Or] Fix style again (minor)
afcffea [Andrew Or] Fix style (minor)
72004c2 [Andrew Or] Actually respect --driver-java-options
803218b [Andrew Or] Actually respect SPARK_*_CLASSPATH
eeb34a0 [Andrew Or] Update outdated comment (minor)
35caecc [Andrew Or] In Windows, actually kill Java processes on exit
f97daa2 [Andrew Or] Fix Windows spark shell stdin issue
83ebe60 [Andrew Or] Parse special driver configs in Windows (broken)
  • Loading branch information
andrewor14 authored and conviva-zz committed Sep 4, 2014
1 parent 1702d5f commit be742c7
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 26 deletions.
3 changes: 2 additions & 1 deletion bin/compute-classpath.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ rem Load environment variables from conf\spark-env.cmd, if it exists
if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"

rem Build up classpath
set CLASSPATH=%FWDIR%conf
set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%;%FWDIR%conf

if exist "%FWDIR%RELEASE" (
for %%d in ("%FWDIR%lib\spark-assembly*.jar") do (
set ASSEMBLY_JAR=%%d
Expand Down
46 changes: 39 additions & 7 deletions bin/spark-class2.cmd
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem

rem Any changes to this file must be reflected in SparkSubmitDriverBootstrapper.scala!

setlocal enabledelayedexpansion

set SCALA_VERSION=2.10
Expand All @@ -38,7 +40,7 @@ if not "x%1"=="x" goto arg_given

if not "x%SPARK_MEM%"=="x" (
echo Warning: SPARK_MEM is deprecated, please use a more specific config option
echo e.g., spark.executor.memory or SPARK_DRIVER_MEMORY.
echo e.g., spark.executor.memory or spark.driver.memory.
)

rem Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
Expand Down Expand Up @@ -67,10 +69,18 @@ rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%

rem All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS.
) else if "%1"=="org.apache.spark.repl.Main" (
set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_REPL_OPTS%
rem Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
rem SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY.
rem The repl also uses SPARK_REPL_OPTS.
) else if "%1"=="org.apache.spark.deploy.SparkSubmit" (
set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_SUBMIT_OPTS% %SPARK_REPL_OPTS%
if not "x%SPARK_SUBMIT_LIBRARY_PATH%"=="x" (
set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_SUBMIT_LIBRARY_PATH%
) else if not "x%SPARK_LIBRARY_PATH%"=="x" (
set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_LIBRARY_PATH%
)
if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
if not "x%SPARK_SUBMIT_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_SUBMIT_DRIVER_MEMORY%
) else (
set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
Expand All @@ -80,9 +90,9 @@ rem Set JAVA_OPTS to be able to load native libraries and to set heap size
for /f "tokens=3" %%i in ('java -version 2^>^&1 ^| find "version"') do set jversion=%%i
for /f "tokens=1 delims=_" %%i in ("%jversion:~1,-1%") do set jversion=%%i
if "%jversion%" geq "1.8.0" (
set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
set JAVA_OPTS=%OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
) else (
set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
)
rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!

Expand Down Expand Up @@ -115,5 +125,27 @@ rem Figure out where java is.
set RUNNER=java
if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java

"%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
rem In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself.
rem Here we must parse the properties file for relevant "spark.driver.*" configs before launching
rem the driver JVM itself. Instead of handling this complexity here, we launch a separate JVM
rem to prepare the launch environment of this driver JVM.

rem In this case, leave out the main class (org.apache.spark.deploy.SparkSubmit) and use our own.
rem Leaving out the first argument is surprisingly difficult to do in Windows. Note that this must
rem be done here because the Windows "shift" command does not work in a conditional block.
set BOOTSTRAP_ARGS=
shift
:start_parse
if "%~1" == "" goto end_parse
set BOOTSTRAP_ARGS=%BOOTSTRAP_ARGS% %~1
shift
goto start_parse
:end_parse

if not [%SPARK_SUBMIT_BOOTSTRAP_DRIVER%] == [] (
set SPARK_CLASS=1
"%RUNNER%" org.apache.spark.deploy.SparkSubmitDriverBootstrapper %BOOTSTRAP_ARGS%
) else (
"%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
)
:exit
2 changes: 1 addition & 1 deletion bin/spark-submit
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# limitations under the License.
#

# NOTE: Any changes in this file must be reflected in SparkClassLauncher.scala!
# NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!

export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
ORIG_ARGS=("$@")
Expand Down
34 changes: 23 additions & 11 deletions bin/spark-submit.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,28 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem

rem NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!

set SPARK_HOME=%~dp0..
set ORIG_ARGS=%*

rem Clear the values of all variables used
set DEPLOY_MODE=
set DRIVER_MEMORY=
rem Reset the values of all variables used
set SPARK_SUBMIT_DEPLOY_MODE=client
set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_HOME%\conf\spark-defaults.conf
set SPARK_SUBMIT_DRIVER_MEMORY=
set SPARK_SUBMIT_LIBRARY_PATH=
set SPARK_SUBMIT_CLASSPATH=
set SPARK_SUBMIT_OPTS=
set SPARK_DRIVER_MEMORY=
set SPARK_SUBMIT_BOOTSTRAP_DRIVER=

:loop
if [%1] == [] goto continue
if [%1] == [--deploy-mode] (
set DEPLOY_MODE=%2
set SPARK_SUBMIT_DEPLOY_MODE=%2
) else if [%1] == [--properties-file] (
set SPARK_SUBMIT_PROPERTIES_FILE=%2
) else if [%1] == [--driver-memory] (
set DRIVER_MEMORY=%2
set SPARK_SUBMIT_DRIVER_MEMORY=%2
) else if [%1] == [--driver-library-path] (
set SPARK_SUBMIT_LIBRARY_PATH=%2
) else if [%1] == [--driver-class-path] (
Expand All @@ -45,12 +50,19 @@ if [%1] == [] goto continue
goto loop
:continue

if [%DEPLOY_MODE%] == [] (
set DEPLOY_MODE=client
)
rem For client mode, the driver will be launched in the same JVM that launches
rem SparkSubmit, so we may need to read the properties file for any extra class
rem paths, library paths, java options and memory early on. Otherwise, it will
rem be too late by the time the driver JVM has started.

if not [%DRIVER_MEMORY%] == [] if [%DEPLOY_MODE%] == [client] (
set SPARK_DRIVER_MEMORY=%DRIVER_MEMORY%
if [%SPARK_SUBMIT_DEPLOY_MODE%] == [client] (
if exist %SPARK_SUBMIT_PROPERTIES_FILE% (
rem Parse the properties file only if the special configs exist
for /f %%i in ('findstr /r /c:"^[\t ]*spark.driver.memory" /c:"^[\t ]*spark.driver.extra" ^
%SPARK_SUBMIT_PROPERTIES_FILE%') do (
set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
)
)
)

cmd /V /E /C %SPARK_HOME%\bin\spark-class.cmd org.apache.spark.deploy.SparkSubmit %ORIG_ARGS%
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,24 @@ private[spark] object SparkSubmitDriverBootstrapper {
val process = builder.start()

// Redirect stdin, stdout, and stderr to/from the child JVM
val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
stdinThread.start()
stdoutThread.start()
stderrThread.start()

// Terminate on broken pipe, which signals that the parent process has exited. This is
// important for the PySpark shell, where Spark submit itself is a python subprocess.
stdinThread.join()
process.destroy()
// In Windows, the subprocess reads directly from our stdin, so we should avoid spawning
// a thread that contends with the subprocess in reading from System.in.
if (Utils.isWindows) {
// For the PySpark shell, the termination of this process is handled in java_gateway.py
process.waitFor()
} else {
// Terminate on broken pipe, which signals that the parent process has exited. This is
// important for the PySpark shell, where Spark submit itself is a python subprocess.
val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
stdinThread.start()
stdinThread.join()
process.destroy()
}
}

}
17 changes: 17 additions & 0 deletions python/pyspark/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.
#

import atexit
import os
import sys
import signal
Expand Down Expand Up @@ -69,6 +70,22 @@ def preexec_func():
error_msg += "--------------------------------------------------------------\n"
raise Exception(error_msg)

# In Windows, ensure the Java child processes do not linger after Python has exited.
# In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
# the parent process' stdin sends an EOF). In Windows, however, this is not possible
# because java.lang.Process reads directly from the parent process' stdin, contending
# with any opportunity to read an EOF from the parent. Note that this is only best
# effort and will not take effect if the python process is violently terminated.
if on_windows:
# In Windows, the child process here is "spark-submit.cmd", not the JVM itself
# (because the UNIX "exec" command is not available). This means we cannot simply
# call proc.kill(), which kills only the "spark-submit.cmd" process but not the
# JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
# child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
def killChild():
Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
atexit.register(killChild)

# Create a thread to echo output from the GatewayServer, which is required
# for Java log output to show up:
class EchoOutputThread(Thread):
Expand Down

0 comments on commit be742c7

Please sign in to comment.