Feat/untangling imports #161

Closed
wants to merge 11 commits into from
1 change: 1 addition & 0 deletions .gitignore
@@ -85,3 +85,4 @@ target/
/pysparkling/tests/20news-19997.tar.gz

/scripts_private/
/.scannerwork/
4 changes: 2 additions & 2 deletions pysparkling/__init__.py
@@ -8,8 +8,8 @@
 from .cache_manager import CacheManager, TimedCacheManager
 from .context import Context
 from .rdd import RDD
-from .sql.types import Row
-from .stat_counter import StatCounter
+from .sql import Row
+from .statcounter import StatCounter
 from .storagelevel import StorageLevel

 __all__ = ['RDD', 'Context', 'Broadcast', 'StatCounter', 'CacheManager', 'Row',
121 changes: 121 additions & 0 deletions pysparkling/_auto_injector.py
@@ -0,0 +1,121 @@
import importlib
from importlib.abc import Loader, MetaPathFinder
import sys
import time
import warnings


class Pyspark2Pysparkling(MetaPathFinder, Loader):
    """Meta path finder/loader that redirects ``pyspark.*`` imports to the
    corresponding ``pysparkling.*`` modules."""

@classmethod
def find_spec(cls, name, path, target=None):
if name.startswith('pysparkling'):
return None # Nope. We're not providing this..

if not name.startswith('pyspark'):
return None # Not handling these either...

        new_name = 'pysparkling' + name[7:]  # strip the 'pyspark' prefix (7 chars)

for finder in sys.meta_path[1:]:
module_spec = finder.find_spec(fullname=new_name, path=path, target=target)
if module_spec:
break
else:
            return None  # No other finder could provide the module.

module_spec.loader = cls(module_spec.loader) # Inject my own loader here.

return module_spec

def __init__(self, old_loader: Loader):
self.old_loader = old_loader

def create_module(self, spec):
# Delegate to the old loader
return self.old_loader.create_module(spec)

def exec_module(self, module):
# Delegate to the old loader
return_value = self.old_loader.exec_module(module)

# Cache it for future use
new_name = module.__name__
        old_name = 'pyspark' + new_name[11:]  # strip the 'pysparkling' prefix (11 chars)
sys.modules[old_name] = module

return return_value

def module_repr(self, module):
# Delegate to the old loader
return self.old_loader.module_repr(module)

@classmethod
def is_enabled(cls):
return cls in sys.meta_path

@classmethod
def setup(cls):
sys.meta_path.insert(0, cls)

# Find any already loaded 'pyspark' modules:
modules_to_set = [
(pyspark_name, 'pysparkling' + pyspark_name[7:])
for pyspark_name in sys.modules
if pyspark_name.startswith('pyspark') and not pyspark_name.startswith('pysparkling')
]

if modules_to_set:
warnings.warn(
"pyspark was already loaded."
" Please setup pysparkling first to ensure no nasty side-effects take place."
)

for pyspark_name, new_name in modules_to_set:
try:
# Pysparkling was already loaded?
pysparkling_module = sys.modules[new_name]
except KeyError:
# Load it
pysparkling_module = importlib.import_module(new_name)

# And override it in the pyspark one
sys.modules[pyspark_name] = pysparkling_module


def _test():
    # Comment out the next line to run against plain pyspark instead of the injector.
Pyspark2Pysparkling.setup()

start = time.time()

# pylint: disable=import-outside-toplevel
from pyspark.sql import SparkSession

spark = (
SparkSession.builder
.master("local")
.appName("SparkByExamples.com")
.getOrCreate()
)
dataList = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]
rdd = spark.sparkContext.parallelize(dataList)
print(rdd.collect())

data = [
('James', '', 'Smith', '1991-04-01', 'M', 3000),
('Michael', 'Rose', '', '2000-05-19', 'M', 4000),
('Robert', '', 'Williams', '1978-09-05', 'M', 4000),
('Maria', 'Anne', 'Jones', '1967-12-01', 'F', 4000),
('Jen', 'Mary', 'Brown', '1980-02-17', 'F', -1),
]

columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.show()

print(f"Finished this run in : {time.time() - start:.3f}s (Injector was on: {Pyspark2Pysparkling.is_enabled()})")


if __name__ == '__main__':
_test()
185 changes: 185 additions & 0 deletions pysparkling/conf.py
@@ -0,0 +1,185 @@
__all__ = ['SparkConf']


class SparkConf:

"""
Configuration for a Spark application. Used to set various Spark
parameters as key-value pairs.

Most of the time, you would create a SparkConf object with
``SparkConf()``, which will load values from `spark.*` Java system
properties as well. In this case, any parameters you set directly on
the :class:`SparkConf` object take priority over system properties.

    For unit tests, you can also call ``SparkConf(False)`` to skip
loading external settings and get the same configuration no matter
what the system properties are.

All setter methods in this class support chaining. For example,
you can write ``conf.setMaster("local").setAppName("My app")``.

Parameters
----------
loadDefaults : bool
whether to load values from Java system properties (True by default)

Notes
-----
Once a SparkConf object is passed to Spark, it is cloned
and can no longer be modified by the user.

Examples
--------
>>> from pysparkling.conf import SparkConf
>>> from pysparkling.context import SparkContext
>>> conf = SparkConf()
>>> conf.setMaster("local").setAppName("My app") # doctest: +ELLIPSIS
<pysparkling.conf.SparkConf object at ...>
>>> conf.get("spark.master")
'local'
>>> conf.get("spark.app.name")
'My app'
>>> sc = SparkContext(conf=conf)
>>> sc.master
'local'
>>> sc.appName
'My app'
>>> sc.sparkHome is None
True

>>> conf = SparkConf(loadDefaults=False)
>>> conf.setSparkHome("/path") # doctest: +ELLIPSIS
<pysparkling.conf.SparkConf object at ...>
>>> conf.get("spark.home")
'/path'
>>> conf.setExecutorEnv("VAR1", "value1") # doctest: +ELLIPSIS
<pysparkling.conf.SparkConf object at ...>
>>> conf.setExecutorEnv(pairs = [("VAR3", "value3"), ("VAR4", "value4")]) # doctest: +ELLIPSIS
<pysparkling.conf.SparkConf object at ...>
>>> conf.get("spark.executorEnv.VAR1")
'value1'
>>> print(conf.toDebugString())
spark.home=/path
spark.executorEnv.VAR1=value1
spark.executorEnv.VAR3=value3
spark.executorEnv.VAR4=value4
>>> for p in sorted(conf.getAll(), key=lambda p: p[0]):
... print(p)
('spark.executorEnv.VAR1', 'value1')
('spark.executorEnv.VAR3', 'value3')
('spark.executorEnv.VAR4', 'value4')
('spark.home', '/path')
>>> print(conf.toDebugString())
spark.home=/path
spark.executorEnv.VAR1=value1
spark.executorEnv.VAR3=value3
spark.executorEnv.VAR4=value4
"""

def __init__(self, loadDefaults=True, _jvm=None, _jconf=None):
"""
Create a new Spark configuration.
"""
if _jconf:
self._jconf = _jconf
else:
self._jconf = None
self._conf = {}

def set(self, key, value):
"""Set a configuration property."""
# Try to set self._jconf first if JVM is created, set self._conf if JVM is not created yet.
if self._jconf is not None:
self._jconf.set(key, str(value))
else:
self._conf[key] = str(value)
return self

def setIfMissing(self, key, value):
"""Set a configuration property, if not already set."""
if self.get(key) is None:
self.set(key, value)
return self

def setMaster(self, value):
"""Set master URL to connect to."""
self.set("spark.master", value)
return self

def setAppName(self, value):
"""Set application name."""
self.set("spark.app.name", value)
return self

def setSparkHome(self, value):
"""Set path where Spark is installed on worker nodes."""
self.set("spark.home", value)
return self

def setExecutorEnv(self, key=None, value=None, pairs=None):
"""Set an environment variable to be passed to executors."""
if (key is not None and pairs is not None) or (key is None and pairs is None):
raise Exception("Either pass one key-value pair or a list of pairs")

if key is not None:
self.set("spark.executorEnv." + key, value)
elif pairs is not None:
for (k, v) in pairs:
self.set("spark.executorEnv." + k, v)

return self

def setAll(self, pairs):
"""
Set multiple parameters, passed as a list of key-value pairs.

Parameters
----------
pairs : iterable of tuples
list of key-value pairs to set
"""
for (k, v) in pairs:
self.set(k, v)
return self

def get(self, key, defaultValue=None):
"""Get the configured value for some key, or return a default otherwise."""
if defaultValue is None: # Py4J doesn't call the right get() if we pass None
if self._jconf is not None:
if not self._jconf.contains(key):
return None
return self._jconf.get(key)

if key not in self._conf:
return None
return self._conf[key]

if self._jconf is not None:
return self._jconf.get(key, defaultValue)

return self._conf.get(key, defaultValue)

def getAll(self):
"""Get all values as a list of key-value pairs."""
if self._jconf is not None:
return [(elem._1(), elem._2()) for elem in self._jconf.getAll()]

return self._conf.items()

def contains(self, key):
"""Does this configuration contain a given key?"""
if self._jconf is not None:
return self._jconf.contains(key)

return key in self._conf

def toDebugString(self):
"""
Returns a printable version of the configuration, as a list of
key=value pairs, one per line.
"""
if self._jconf is not None:
return self._jconf.toDebugString()

return '\n'.join('%s=%s' % (k, v) for k, v in self._conf.items())
29 changes: 29 additions & 0 deletions pysparkling/context.py
@@ -11,6 +11,7 @@
from .__version__ import __version__ as PYSPARKLING_VERSION
from .broadcast import Broadcast
from .cache_manager import CacheManager
from .conf import SparkConf
from .exceptions import ContextIsLockedException
from .fileio import File, TextFile
from .partition import Partition
@@ -19,6 +20,8 @@

log = logging.getLogger(__name__)

__all__ = ['SparkContext']
Collaborator

It is a rather big change to the pysparkling API which we may want to discuss. It may not be aligned with the previous vision, where the API was intentionally kept different from pyspark's (namely, here it is not a SparkContext that is created but a pysparkling Context).

Contributor Author

Ok, this needs to be discussed then. For me, I want pysparkling to be a drop-in replacement for pyspark (during tests). As such, it would need to be 100% aligned with the pyspark API.

@svenkreiss, @tools4origins: what is your view on this?

I also refer to the auto-injector: https://github.com/svenkreiss/pysparkling/pull/161/files#diff-b6988b98630fdeab52d583b68acee7ba2d41304b72a201045c0c141c7fd502dc
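
To make the drop-in idea concrete, here is a minimal usage sketch of that injector. Only `Pyspark2Pysparkling.setup()` and `is_enabled()` come from this PR; putting the call in a `conftest.py` is just an illustration, not part of the change:

```python
# conftest.py (illustration only) -- enable the injector before any test
# module gets the chance to `import pyspark`.
from pysparkling._auto_injector import Pyspark2Pysparkling

if not Pyspark2Pysparkling.is_enabled():
    Pyspark2Pysparkling.setup()

# From here on, `from pyspark.sql import SparkSession` resolves to the
# pysparkling implementation, so pyspark-based tests run unchanged.
```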



def unit_fn(arg):
"""Used as dummy serializer and deserializer."""
@@ -608,3 +611,29 @@ def __call__(self, data):
length = struct.unpack(self.length_fmt, prefix)[0]
package, data = data[:length], data[length:]
yield package


class SparkContext:
    """Accepts the same initialization parameters as pyspark's SparkContext,
    but redirects everything to a pysparkling Context."""
Collaborator

Why not subclass Context instead?

Contributor Author

Because I actually wanted to make SparkContext a singleton, and as far as I can see, Context is not a singleton.

Plus, I went a long way toward copying the pyspark API into SparkContext, but finally ended up with the current implementation.

Long story short: inheriting and checking for single use in `__new__` should be better :-). I updated the code as such.
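
For reference, a minimal sketch of that approach (a Context subclass that enforces single use in `__new__`); this is illustrative only and not necessarily the exact code in this PR:

```python
from pysparkling.context import Context


class SparkContext(Context):
    """Illustrative sketch: pyspark-style facade as a Context subclass."""

    _active = None

    def __new__(cls, *args, **kwargs):
        # Mirror pyspark's rule that only one SparkContext may be active.
        if cls._active is not None:
            raise ValueError("Only one SparkContext can be active at a time.")
        cls._active = super().__new__(cls)
        return cls._active

    def __init__(self, master=None, appName=None, conf=None, serializer=None, **kwargs):
        super().__init__(serializer=serializer)
        self.master = master or (conf.get('spark.master') if conf else None)
        self.appName = appName or (conf.get('spark.app.name') if conf else None)
```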


_spark_active_context = None

def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=0, serializer=None, conf=None,
gateway=None, jsc=None, profiler_cls=None):
if SparkContext._spark_active_context is not None:
            raise ValueError("Only one SparkContext can be active at a time.")

SparkContext._spark_active_context = Context(
serializer=serializer,
)

conf = conf or SparkConf()

self.master = master or conf.get('spark.master', None)
self.appName = appName or conf.get('spark.app.name', None)
self.sparkHome = sparkHome or conf.get('spark.home', None)

def __getattr__(self, item):
"""Redirect just everything to Context()."""
return getattr(self._spark_active_context, item)