SPARK-1114: Allow PySpark to use existing JVM and Gateway
Patch to allow PySpark to use an existing JVM and Gateway. Changes the PySpark implementation of SparkConf to accept an existing SparkConf JVM handle, and changes the PySpark SparkContext to allow subclass-specific context initialization.

Author: Ahir Reddy <ahirreddy@gmail.com>

Closes #622 from ahirreddy/pyspark-existing-jvm and squashes the following commits:

a86f457 [Ahir Reddy] Patch to allow PySpark to use existing JVM and Gateway. Changes to PySpark implementation of SparkConf to take existing SparkConf JVM handle. Change to PySpark SparkContext to allow subclass specific context initialization.
ahirreddy authored and mateiz committed Feb 21, 2014
1 parent 3fede48 commit 59b1379
Showing 2 changed files with 22 additions and 10 deletions.
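To make the intent of the change concrete, here is a hedged sketch (not part of the commit) of the use case it enables: attaching PySpark to a Py4J gateway, and therefore a JVM, started by a host application instead of letting PySpark launch its own. The port is an illustrative assumption, and the JVM is assumed to already be running a Py4J GatewayServer with the Spark classes on its classpath.

from py4j.java_gateway import JavaGateway, GatewayClient, java_import
from pyspark import SparkContext

# Connect to a JVM that is already running a Py4J GatewayServer (port assumed).
gateway = JavaGateway(GatewayClient(port=25333))

# Mirror the package imports that pyspark's own launch_gateway() performs,
# so names such as SparkConf, JavaSparkContext and PythonRDD resolve on gateway.jvm.
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "scala.Tuple2")

# Reuse the existing gateway/JVM instead of spawning a new one.
sc = SparkContext(master="local", appName="existing-jvm-demo", gateway=gateway)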
15 changes: 10 additions & 5 deletions python/pyspark/conf.py
@@ -75,19 +75,24 @@ class SparkConf(object):
     and can no longer be modified by the user.
     """
 
-    def __init__(self, loadDefaults=True, _jvm=None):
+    def __init__(self, loadDefaults=True, _jvm=None, _jconf=None):
         """
         Create a new Spark configuration.
 
         @param loadDefaults: whether to load values from Java system
                properties (True by default)
         @param _jvm: internal parameter used to pass a handle to the
                Java VM; does not need to be set by users
+        @param _jconf: Optionally pass in an existing SparkConf handle
+               to use its parameters
         """
-        from pyspark.context import SparkContext
-        SparkContext._ensure_initialized()
-        _jvm = _jvm or SparkContext._jvm
-        self._jconf = _jvm.SparkConf(loadDefaults)
+        if _jconf:
+            self._jconf = _jconf
+        else:
+            from pyspark.context import SparkContext
+            SparkContext._ensure_initialized()
+            _jvm = _jvm or SparkContext._jvm
+            self._jconf = _jvm.SparkConf(loadDefaults)
 
     def set(self, key, value):
         """Set a configuration property."""
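As an aside, a minimal sketch (assuming a `gateway` obtained as in the earlier example, whose JVM already holds a configured org.apache.spark.SparkConf) of how the new `_jconf` parameter lets Python wrap that existing JVM-side object instead of constructing a fresh one:

from pyspark.conf import SparkConf

# Build or look up a SparkConf on the JVM side of the existing gateway.
jconf = gateway.jvm.org.apache.spark.SparkConf(True)
jconf.setMaster("local").setAppName("wrapped-conf")

# Wrap the JVM-side handle rather than creating a new SparkConf in the JVM.
conf = SparkConf(_jconf=jconf)
print(conf.get("spark.app.name"))  # -> wrapped-conf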
17 changes: 12 additions & 5 deletions python/pyspark/context.py
@@ -51,7 +51,8 @@ class SparkContext(object):
 
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
-        environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None):
+        environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+        gateway=None):
         """
         Create a new SparkContext. At least the master and app name should be set,
         either through the named parameters here or through C{conf}.
@@ -70,6 +71,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                unlimited batch size.
         @param serializer: The serializer for RDDs.
         @param conf: A L{SparkConf} object setting Spark properties.
+        @param gateway: Use an existing gateway and JVM, otherwise a new JVM
+               will be instantiated.
 
 
         >>> from pyspark.context import SparkContext
@@ -80,7 +83,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
             ...
         ValueError:...
         """
-        SparkContext._ensure_initialized(self)
+        SparkContext._ensure_initialized(self, gateway=gateway)
 
         self.environment = environment or {}
         self._conf = conf or SparkConf(_jvm=self._jvm)
@@ -120,7 +123,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                 self.environment[varName] = v
 
         # Create the Java SparkContext through Py4J
-        self._jsc = self._jvm.JavaSparkContext(self._conf._jconf)
+        self._jsc = self._initialize_context(self._conf._jconf)
 
         # Create a single Accumulator in Java that we'll send all our updates through;
         # they will be passed back to us through a TCP server
@@ -152,11 +155,15 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         self._temp_dir = \
             self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
 
+    # Initialize SparkContext in function to allow subclass specific initialization
+    def _initialize_context(self, jconf):
+        return self._jvm.JavaSparkContext(jconf)
+
     @classmethod
-    def _ensure_initialized(cls, instance=None):
+    def _ensure_initialized(cls, instance=None, gateway=None):
         with SparkContext._lock:
             if not SparkContext._gateway:
-                SparkContext._gateway = launch_gateway()
+                SparkContext._gateway = gateway or launch_gateway()
                 SparkContext._jvm = SparkContext._gateway.jvm
                 SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
 
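Finally, to illustrate the subclassing hook, a hedged sketch of how a downstream wrapper might override `_initialize_context`; the `CustomJavaSparkContext` class below is hypothetical and would need to exist on the gateway's classpath:

from pyspark.context import SparkContext

class CustomSparkContext(SparkContext):
    """SparkContext variant that builds its JVM-side context differently."""

    def _initialize_context(self, jconf):
        # Called by SparkContext.__init__ in place of JavaSparkContext(jconf);
        # CustomJavaSparkContext is a placeholder for a user-provided Java class.
        return self._jvm.CustomJavaSparkContext(jconf)

sc = CustomSparkContext(master="local", appName="subclass-demo")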