Replace ports file with gateway socket #238

Merged
merged 85 commits into from Oct 4, 2016

Projects

None yet

3 participants

@javierluraschi
Member
javierluraschi commented Sep 28, 2016 edited

This PR lays the ground work to provide more advanced connection models and removes the need to use an intermediate ports file to ease configuration across platforms.

current connection architecture

sparklyr - backend improvements - crop

  • spark-submit: When executing spark_connect, sparklyr runs a shell command “spark-submit” which submits the sparklyr backend as a jar to Apache Spark..
  • ports-file: A binary file is written by the sparklyr backend to communicate back to sparklyr the sockets in the backend.
  • socket: Two sockets are provided, one is a monitor to destroy the backend once it closes, the other one requests java calls and serializes results back.

improved connection architecture

This PR proposes using the monitor socket as a gateway socket that replaces the ports file and provides routing and reconnection to existing backends as follows:

sparklyr - new backend - crop

At this point, it is not intended to add or change functionality. Therefore, commands are expected to work without any additional change.

The only new configuration parameter worth mentioning is the default port the gateway service will attempt to use. In some very particular systems, if this port is already in use, please follow the following configuration pattern:

config <- spark_config()

# default configuration settings for gateway socket
config[["sparklyr.gateway.port"]] <- "8880"
config[["sparklyr.gateway.address"]] <- "localhost"

sc <- spark_connect(master = "yarn-cluster", config = config)
javierluraschi added some commits Sep 22, 2016
@javierluraschi javierluraschi work in progress for multi-client connections 0168d58
@javierluraschi javierluraschi add support for connecting to gateway in r 7b135db
@javierluraschi javierluraschi get gateway socket to work end-to-end with single instances 1905874
@javierluraschi javierluraschi allow service port to be specified by client 9bca2cd
@javierluraschi javierluraschi prepare for merge with cran changes 03c6e8e
@javierluraschi javierluraschi Merge branch 'master' into feature/service-socket b729704
@javierluraschi javierluraschi regenerate jars 5392d27
@javierluraschi javierluraschi fix classpath paths 92c4e55
@javierluraschi javierluraschi write monitor port to allow gateway routing at some point 1d9ee9a
@javierluraschi javierluraschi add support to start sparklyr backend as a long running service c8af9a8
@javierluraschi javierluraschi add support to reconnect to existing service a60f845
@javierluraschi javierluraschi reuse sparkcontext while reconnecting 5f64c98
@javierluraschi javierluraschi reuse hive context on reconnection 2ed790e
@javierluraschi javierluraschi refactor shell abort code 7c766ed
@javierluraschi javierluraschi Merge branch 'master' into feature/service-socket 86704eb
@javierluraschi javierluraschi additional refactoring for abort_shell 85b603f
@javierluraschi javierluraschi add support for backend session ids to validate correct connections e59125f
@javierluraschi javierluraschi improve error messages and logging when session already in use 3e87b0b
@javierluraschi javierluraschi add support for gateway commands d1b8dec
@javierluraschi javierluraschi Merge branch 'master' into feature/service-socket 645ac70
@javierluraschi javierluraschi add support to route requests to right spark backend instance
934a7ba
@javierluraschi javierluraschi add support to configure gateway address
e278877
@javierluraschi javierluraschi changed the title from [WIP] Replace ports file with gateway socket to WIP: Replace ports file with gateway socket Sep 28, 2016
@jjallaire
Member

We might consider adding a service parameter directly to spark_connect in addition to being able to use the config file.

.classpath
@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
@kevinushey
kevinushey Sep 28, 2016 Contributor

Do we want to commit this to the sparklyr repository?

@javierluraschi
javierluraschi Sep 28, 2016 Member

@kevinushey what's your take on this? I definitely had to use Eclipse to develop this set of changes so I'm planning to keep the Eclipse project in my working directory; however, in the short term, you are probably the only other person making changes to the Scala code. So, have you been using Eclipse already for your Scala changes? If not, would having an Eclipse project be helpful to you? If the answer is no, then I don't think it's worth checking this in, otherwise, I'll parameterize the paths to make you can use this as well.

@kevinushey
kevinushey Sep 28, 2016 Contributor

Let's bring it in!

@javierluraschi
javierluraschi Sep 30, 2016 Member

Spent some time here, unfortunately, eclipse is not great at parameterizing .classpaths. They recommend maven building the .classpath file, etc. Removing for now.

.classpath
+ <classpathentry kind="lib" path="inst/java/sparklyr-2.0-2.11.jar"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+ <classpathentry kind="con" path="org.scala-ide.sdt.launching.SCALA_CONTAINER"/>
+ <classpathentry kind="lib" path="/Users/javierluraschi/Library/Caches/spark/spark-1.6.2-bin-hadoop2.6/lib/spark-assembly-1.6.2-hadoop2.6.0.jar"/>
@kevinushey
kevinushey Sep 28, 2016 Contributor

If we want this in the repo, is it possible to declare this as a relative path?

@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
@kevinushey
kevinushey Sep 28, 2016 Contributor

Same here -- should we have this in the repo / add to .Rbuildignore?

@javierluraschi
javierluraschi Sep 30, 2016 Member

I still think is helpful, we would still have to add classpaths manually to the project though.

R/spark_shell.R
+ }
+
+ tryCatch({
+ readInt <- function(con, n = 1) {
@kevinushey
kevinushey Sep 28, 2016 Contributor

We already have readInt defined in sparklyr; do we need to redefine it here?

@javierluraschi
javierluraschi Sep 30, 2016 Member

Thanks. Removed.

R/spark_shell.R
- paste(readLines(output_file), collapse = "\n"),
- if (file.exists(error_file)) paste(readLines(error_file), collapse = "\n") else "",
- sep = ""))
+ writeInt <- function(con, value) {
@kevinushey
kevinushey Sep 28, 2016 Contributor

This should already be defined as well.

+ var ss: ServerSocket = null
+ var available = false
+
+ Try {
@kevinushey
kevinushey Sep 28, 2016 Contributor

Does Try just swallow exceptions? Should we be catching / logging on failure?

@javierluraschi
javierluraschi Sep 30, 2016 Member

Yes, Try swallow exceptions, that was my intention since we are trying to discover is a port is open.

+ }
+
+ if (ss != null) {
+ Try {
@kevinushey
kevinushey Sep 28, 2016 Contributor

Similar logging here?

@javierluraschi
javierluraschi Sep 30, 2016 Member

Same, by design swallowing exceptions.

java/backend.scala
+ gatewayServerSocket.setSoTimeout(0)
+
+ while(true) {
+ bind()
@kevinushey
kevinushey Sep 28, 2016 Contributor

I'm probably not understanding exactly what's going on here, but it looks like we're repeatedly calling bind() within an infinite loop, and that call to bind unconditionally launches a number of threads. Can you elaborate a bit on what's going on here?

@kevinushey
kevinushey Sep 30, 2016 Contributor

Awesome, thanks! Much clearer now :)

javierluraschi added some commits Sep 28, 2016
@javierluraschi javierluraschi fix typo in wait_connect_gateway
0748a17
@javierluraschi javierluraschi read gateway address config as character 6b14689
@javierluraschi javierluraschi pass sessionid to gateway while querying ports
9eab545
javierluraschi added some commits Sep 28, 2016
@javierluraschi javierluraschi Merge branch 'master' into feature/service-socket d4e2220
@javierluraschi javierluraschi support service parameter directly in spark_connect
16e1df1
@javierluraschi javierluraschi fix regression for reusing connection in different r session
831a91a
@javierluraschi javierluraschi bump version
0c1ae1b
@javierluraschi javierluraschi use hash to generate sparklyr backend session id
dcb107b
@javierluraschi javierluraschi include master in service session id hash ccd39a5
@javierluraschi javierluraschi regenerate docs b01fd77
@javierluraschi javierluraschi exclude eclipse references from r build 26695c1
@javierluraschi javierluraschi bump version a23f3c0
@javierluraschi javierluraschi fix to abort_shell while connecting to gateway service 70cb384
@javierluraschi javierluraschi add entries to news file 7261de0
@javierluraschi javierluraschi regenerate jars 7c01f3c
@javierluraschi javierluraschi add support for filter in spark_log and logging improvements e60e9a3
@javierluraschi javierluraschi add support for terminate in spark_disconnect 8f742c3
@javierluraschi javierluraschi regenerate documentation adb2e91
@javierluraschi javierluraschi fix service connection from config setting 9ef4369
@javierluraschi javierluraschi support terminate in spark_disconnect_all 53e40df
@javierluraschi javierluraschi refactor spark gateway code into its own file 0a87c5a
@javierluraschi javierluraschi support connecting from same rsession multiple times through gateway 8dc7604
@javierluraschi javierluraschi support for gateway connection mode to news and jars update 78b4697
@javierluraschi javierluraschi implement gateway port redirection follow up 6d457f4
@javierluraschi javierluraschi only start backend when gateway will process the given session d85699e
@javierluraschi javierluraschi rebuild jars and bump version 39c0e68
@javierluraschi javierluraschi add support for basic spark_gateway_connection operations 6a46bbd
@javierluraschi javierluraschi cache sparkcontext for backend instance c3a9ab1
@javierluraschi javierluraschi map sparklyr protocol to gateway connection mode f977262
@javierluraschi javierluraschi reuse sparkcontext backend when connecting to service b0cd641
@javierluraschi javierluraschi bump version and rebuild jars a0eae2c
@javierluraschi javierluraschi pass terminate parameter down from spark_disconnect_all 140bd06
@javierluraschi javierluraschi describe in detail functionality of backend.scala 3ce27cb
@javierluraschi javierluraschi ignore .project eclipse file ccffe53
@javierluraschi javierluraschi rebuild documentation 0428981
@javierluraschi javierluraschi add support for backend to bing to any ip address 23df9a5
@javierluraschi javierluraschi bump version and rebuild jars 1bbdac4
@javierluraschi javierluraschi add support for remote gateway connections with sparklyr.service.remote d334af2
@javierluraschi javierluraschi use gateway address while connecting to sparklyr backend ebe7926
@javierluraschi javierluraschi remove .classpath to avoid referencing external dirs 6801df9
@javierluraschi javierluraschi remove duplicate documentation entry 641b6d0
@javierluraschi javierluraschi disable new gateway scenarios to scope change ef4f6b6
@javierluraschi javierluraschi Merge branch 'master' into feature/service-socket
0b61aef
@javierluraschi javierluraschi rebuild jars and documentation
5afea6d
@javierluraschi javierluraschi remove terminate parameter from spark_disconnect
cf27646
@javierluraschi javierluraschi add a timeout compatiable with windows sockets
b4993ec
@javierluraschi javierluraschi move wait time for port registration from backend to client to respoe…
…ct config
a50ed99
@javierluraschi javierluraschi terminate the server before exiting the backend process to allow for …
…cleanup of resources
7c78278
@javierluraschi javierluraschi avoid terminating backend when port mapping is not available
0679c4d
@javierluraschi javierluraschi use stopBackend termination mode by default to match existing behavior
d537c8e
@javierluraschi javierluraschi improve logging while connecting 17fcb86
@kevinushey
Contributor

FWIW you can also explicitly call spark_log.spark_shell_connection if that's preferred (munging the class of the object seems a bit awkward).

Member

Changed!

javierluraschi added some commits Oct 4, 2016
@javierluraschi javierluraschi fix typo and more logging improvements 6b2f034
@javierluraschi javierluraschi improve backend while starting multiple-sessions 2f539bd
@javierluraschi javierluraschi adjust abckend timeout to be windows compatible 77c3e16
@javierluraschi javierluraschi additional logging for backend connections 8e0e04f
@javierluraschi javierluraschi Merge branch 'master' into feature/service-socket
176cd6d
@javierluraschi javierluraschi regenerate jars 74a23eb
@javierluraschi javierluraschi bump version
d1689b1
@javierluraschi javierluraschi improve abort_shell to include traceback and error file c8757a1
@javierluraschi javierluraschi remove amazon aws from default 2ff7bce
@javierluraschi javierluraschi attempt to remove deprecated use of shell command in windows
4a107b2
@javierluraschi javierluraschi shquote using cmd2 in windows
546dfaa
@javierluraschi javierluraschi remove outputfile logging in windows since file is blocked
f09dfc2
@javierluraschi javierluraschi changed the title from WIP: Replace ports file with gateway socket to Replace ports file with gateway socket Oct 4, 2016
@javierluraschi javierluraschi merged commit b0d1296 into master Oct 4, 2016

2 checks passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details
continuous-integration/travis-ci/push The Travis CI build passed
Details
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment