[SPARK-2627] undo unnecessary line breaks
“Undo” unnecessary line breaks introduced by accident when I called
autopep8 on the whole Python directory without setting the max line
length to 100. (autopep8 defaults to 79.)
nchammas committed Aug 4, 2014
1 parent 44e3e56 commit 91b7584
Showing 23 changed files with 143 additions and 254 deletions.
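For context, autopep8 wraps lines at 79 characters unless told otherwise, while Spark's Python code allows lines up to 100 characters, which is how the unwanted breaks crept in. A minimal sketch of the intended invocation via autopep8's Python API follows; the file path is illustrative and the option name assumes a recent autopep8 release:

    # Sketch: reformat one PySpark module in place with Spark's 100-character
    # limit instead of autopep8's default of 79. Assumes autopep8 is installed.
    import autopep8

    path = "python/pyspark/context.py"  # illustrative path, not from this commit
    with open(path) as f:
        source = f.read()

    # max_line_length keeps autopep8 from introducing the breaks this commit undoes.
    fixed = autopep8.fix_code(source, options={"max_line_length": 100})

    with open(path, "w") as f:
        f.write(fixed)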
12 changes: 4 additions & 8 deletions python/pyspark/accumulators.py
@@ -97,8 +97,7 @@
pickleSer = PickleSerializer()

# Holds accumulators registered on the current machine, keyed by ID. This is then used to send
# the local accumulator updates back to the driver program at the end of a
# task.
# the local accumulator updates back to the driver program at the end of a task.
_accumulatorRegistry = {}


@@ -141,16 +140,14 @@ def __reduce__(self):
def value(self):
"""Get the accumulator's value; only usable in driver program"""
if self._deserialized:
raise Exception(
"Accumulator.value cannot be accessed inside tasks")
raise Exception("Accumulator.value cannot be accessed inside tasks")
return self._value

@value.setter
def value(self, value):
"""Sets the accumulator's value; only usable in driver program"""
if self._deserialized:
raise Exception(
"Accumulator.value cannot be accessed inside tasks")
raise Exception("Accumulator.value cannot be accessed inside tasks")
self._value = value

def add(self, term):
@@ -225,8 +222,7 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
def handle(self):
from pyspark.accumulators import _accumulatorRegistry
while not self.server.server_shutdown:
# Poll every 1 second for new data -- don't block in case of
# shutdown.
# Poll every 1 second for new data -- don't block in case of shutdown.
r, _, _ = select.select([self.rfile], [], [], 1)
if self.rfile in r:
num_updates = read_int(self.rfile)
3 changes: 1 addition & 2 deletions python/pyspark/conf.py
@@ -125,8 +125,7 @@ def setSparkHome(self, value):
def setExecutorEnv(self, key=None, value=None, pairs=None):
"""Set an environment variable to be passed to executors."""
if (key is not None and pairs is not None) or (key is None and pairs is None):
raise Exception(
"Either pass one key-value pair or a list of pairs")
raise Exception("Either pass one key-value pair or a list of pairs")
elif key is not None:
self._jconf.setExecutorEnv(key, value)
elif pairs is not None:
104 changes: 39 additions & 65 deletions python/pyspark/context.py
@@ -59,8 +59,7 @@ class SparkContext(object):
_writeToFile = None
_next_accum_id = 0
_active_spark_context = None
_lock = Lock()
# zip and egg files that need to be added to PYTHONPATH
_lock = Lock() # zip and egg files that need to be added to PYTHONPATH
_python_includes = None
_default_batch_size_for_serialized_input = 10

@@ -101,15 +100,13 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
self._callsite = rdd._extract_concise_traceback()
else:
tempNamedTuple = namedtuple("Callsite", "function file linenum")
self._callsite = tempNamedTuple(
function=None, file=None, linenum=None)
self._callsite = tempNamedTuple(function=None, file=None, linenum=None)
SparkContext._ensure_initialized(self, gateway=gateway)
try:
self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
conf)
except:
# If an error occurs, clean up in order to allow future
# SparkContext creation:
# If an error occurs, clean up in order to allow future SparkContext creation:
self.stop()
raise

@@ -142,8 +139,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
if not self._conf.contains("spark.master"):
raise Exception("A master URL must be set in your configuration")
if not self._conf.contains("spark.app.name"):
raise Exception(
"An application name must be set in your configuration")
raise Exception("An application name must be set in your configuration")

# Read back our properties from the conf in case we loaded some of them from
# the classpath or an external config file
@@ -184,8 +180,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
self.addPyFile(path)

# Deploy code dependencies set by spark-submit; these will already have been added
# with SparkContext.addFile, so we just need to add them to the
# PYTHONPATH
# with SparkContext.addFile, so we just need to add them to the PYTHONPATH
for path in self._conf.get("spark.submit.pyFiles", "").split(","):
if path != "":
(dirname, filename) = os.path.split(path)
@@ -195,11 +190,9 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
sys.path.append(dirname)

# Create a temporary directory inside spark.local.dir:
local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(
self._jsc.sc().conf())
local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
self._temp_dir = \
self._jvm.org.apache.spark.util.Utils.createTempDir(
local_dir).getAbsolutePath()
self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()

def _initialize_context(self, jconf):
"""
@@ -292,8 +285,7 @@ def parallelize(self, c, numSlices=None):
# because it sends O(n) Py4J commands. As an alternative, serialized
# objects are written to a file and loaded through textFile().
tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
# Make sure we distribute data evenly if it's smaller than
# self.batchSize
# Make sure we distribute data evenly if it's smaller than self.batchSize
if "__len__" not in dir(c):
c = list(c) # Make it a list so we can compute its length
batchSize = min(len(c) // numSlices, self._batchSize)
@@ -412,10 +404,8 @@ def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
Java object. (default sc._default_batch_size_for_serialized_input)
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
batchSize = max(
1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (
batchSize > 1) else PickleSerializer()
batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
keyConverter, valueConverter, minSplits, batchSize)
return RDD(jrdd, self, ser)
@@ -445,13 +435,11 @@ def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConv
Java object. (default sc._default_batch_size_for_serialized_input)
"""
jconf = self._dictToJavaMap(conf)
batchSize = max(
1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (
batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopFile(
self._jsc, path, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter, jconf, batchSize)
batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
return RDD(jrdd, self, ser)

def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -476,13 +464,11 @@ def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=N
Java object. (default sc._default_batch_size_for_serialized_input)
"""
jconf = self._dictToJavaMap(conf)
batchSize = max(
1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (
batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(
self._jsc, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter, jconf, batchSize)
batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
return RDD(jrdd, self, ser)

def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -510,13 +496,11 @@ def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=
Java object. (default sc._default_batch_size_for_serialized_input)
"""
jconf = self._dictToJavaMap(conf)
batchSize = max(
1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (
batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopFile(
self._jsc, path, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter, jconf, batchSize)
batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
return RDD(jrdd, self, ser)

def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
@@ -541,12 +525,11 @@ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
Java object. (default sc._default_batch_size_for_serialized_input)
"""
jconf = self._dictToJavaMap(conf)
batchSize = max(
1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (
batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
keyConverter, valueConverter, jconf, batchSize)
batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter,
jconf, batchSize)
return RDD(jrdd, self, ser)

def _checkpointFile(self, name, input_deserializer):
@@ -577,8 +560,7 @@ def union(self, rdds):
first = rdds[0]._jrdd
rest = [x._jrdd for x in rdds[1:]]
rest = ListConverter().convert(rest, self._gateway._gateway_client)
return RDD(self._jsc.union(first, rest), self,
rdds[0]._jrdd_deserializer)
return RDD(self._jsc.union(first, rest), self, rdds[0]._jrdd_deserializer)

def broadcast(self, value):
"""
@@ -590,8 +572,7 @@ def broadcast(self, value):
pickleSer = PickleSerializer()
pickled = pickleSer.dumps(value)
jbroadcast = self._jsc.broadcast(bytearray(pickled))
return Broadcast(jbroadcast.id(), value, jbroadcast,
self._pickled_broadcast_vars)
return Broadcast(jbroadcast.id(), value, jbroadcast, self._pickled_broadcast_vars)

def accumulator(self, value, accum_param=None):
"""
Expand All @@ -609,8 +590,7 @@ def accumulator(self, value, accum_param=None):
elif isinstance(value, complex):
accum_param = accumulators.COMPLEX_ACCUMULATOR_PARAM
else:
raise Exception(
"No default accumulator param for type %s" % type(value))
raise Exception("No default accumulator param for type %s" % type(value))
SparkContext._next_accum_id += 1
return Accumulator(SparkContext._next_accum_id - 1, value, accum_param)

@@ -655,14 +635,12 @@ def addPyFile(self, path):
HTTP, HTTPS or FTP URI.
"""
self.addFile(path)
# dirname may be directory or HDFS/S3 prefix
(dirname, filename) = os.path.split(path)
(dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix

if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
self._python_includes.append(filename)
# for tests in local mode
sys.path.append(
os.path.join(SparkFiles.getRootDirectory(), filename))
sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename))

def setCheckpointDir(self, dirName):
"""
@@ -676,8 +654,7 @@ def _getJavaStorageLevel(self, storageLevel):
Returns a Java StorageLevel based on a pyspark.StorageLevel.
"""
if not isinstance(storageLevel, StorageLevel):
raise Exception(
"storageLevel must be of type pyspark.StorageLevel")
raise Exception("storageLevel must be of type pyspark.StorageLevel")

newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
return newStorageLevel(storageLevel.useDisk,
@@ -780,15 +757,13 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
"""
if partitions is None:
partitions = range(rdd._jrdd.partitions().size())
javaPartitions = ListConverter().convert(
partitions, self._gateway._gateway_client)
javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client)

# Implementation note: This is implemented as a mapPartitions followed
# by runJob() in order to avoid having to pass a Python lambda into
# SparkContext#runJob.
mappedRDD = rdd.mapPartitions(partitionFunc)
it = self._jvm.PythonRDD.runJob(
self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
return list(mappedRDD._collect_iterator_through_file(it))


@@ -800,8 +775,7 @@ def _test():
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
globs['tempdir'] = tempfile.mkdtemp()
atexit.register(lambda: shutil.rmtree(globs['tempdir']))
(failure_count, test_count) = doctest.testmod(
globs=globs, optionflags=doctest.ELLIPSIS)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)
12 changes: 4 additions & 8 deletions python/pyspark/daemon.py
@@ -30,8 +30,7 @@


def compute_real_exit_code(exit_code):
# SystemExit's code can be integer or string, but os._exit only accepts
# integers
# SystemExit's code can be integer or string, but os._exit only accepts integers
if isinstance(exit_code, numbers.Integral):
return exit_code
else:
@@ -44,8 +43,7 @@ def worker(sock):
"""
# Redirect stdout to stderr
os.dup2(2, 1)
# The sys.stdout object is different from file descriptor 1
sys.stdout = sys.stderr
sys.stdout = sys.stderr # The sys.stdout object is different from file descriptor 1

signal.signal(SIGHUP, SIG_DFL)
signal.signal(SIGCHLD, SIG_DFL)
@@ -64,8 +62,7 @@ def waitSocketClose(sock):

# Read the socket using fdopen instead of socket.makefile() because the latter
# seems to be very slow; note that we need to dup() the file descriptor because
# otherwise writes also cause a seek that makes us miss data on the read
# side.
# otherwise writes also cause a seek that makes us miss data on the read side.
infile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
exit_code = 0
@@ -110,8 +107,7 @@ def handle_sigchld(*args):
try:
pid, status = os.waitpid(0, os.WNOHANG)
if status != 0:
msg = "worker %s crashed abruptly with exit status %s" % (
pid, status)
msg = "worker %s crashed abruptly with exit status %s" % (pid, status)
print >> sys.stderr, msg
except EnvironmentError as err:
if err.errno not in (ECHILD, EINTR):
6 changes: 2 additions & 4 deletions python/pyspark/java_gateway.py
@@ -39,14 +39,12 @@ def launch_gateway():
submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS")
submit_args = submit_args if submit_args is not None else ""
submit_args = shlex.split(submit_args)
command = [
os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args
command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args
if not on_windows:
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_func():
signal.signal(signal.SIGINT, signal.SIG_IGN)
proc = Popen(
command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
else:
# preexec_fn not supported on Windows
proc = Popen(command, stdout=PIPE, stdin=PIPE)
Diff truncated: the remaining 18 changed files are not shown here.
