From 91b7584a5c43700bd559ed29868af4fba97f26ea Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Mon, 4 Aug 2014 19:25:11 -0400 Subject: [PATCH] [SPARK-2627] undo unnecessary line breaks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit “Undo” unnecessary line breaks introduced by accident when I called autopep8 on the whole Python directory without setting the max line length to 100. (autopep8 defaults to 79.) --- python/pyspark/accumulators.py | 12 +-- python/pyspark/conf.py | 3 +- python/pyspark/context.py | 104 ++++++++++--------------- python/pyspark/daemon.py | 12 +-- python/pyspark/java_gateway.py | 6 +- python/pyspark/mllib/_common.py | 30 +++---- python/pyspark/mllib/classification.py | 6 +- python/pyspark/mllib/clustering.py | 3 +- python/pyspark/mllib/linalg.py | 15 ++-- python/pyspark/mllib/random.py | 12 +-- python/pyspark/mllib/recommendation.py | 3 +- python/pyspark/mllib/regression.py | 6 +- python/pyspark/mllib/stat.py | 9 +-- python/pyspark/mllib/tests.py | 39 ++++------ python/pyspark/mllib/tree.py | 3 +- python/pyspark/mllib/util.py | 9 +-- python/pyspark/rdd.py | 16 ++-- python/pyspark/rddsampler.py | 6 +- python/pyspark/shell.py | 3 +- python/pyspark/sql.py | 6 +- python/pyspark/statcounter.py | 12 +-- python/pyspark/tests.py | 79 +++++++------------ python/pyspark/worker.py | 3 +- 23 files changed, 143 insertions(+), 254 deletions(-) diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index f85e2501f9b03..f133cf6f7befc 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -97,8 +97,7 @@ pickleSer = PickleSerializer() # Holds accumulators registered on the current machine, keyed by ID. This is then used to send -# the local accumulator updates back to the driver program at the end of a -# task. +# the local accumulator updates back to the driver program at the end of a task. _accumulatorRegistry = {} @@ -141,16 +140,14 @@ def __reduce__(self): def value(self): """Get the accumulator's value; only usable in driver program""" if self._deserialized: - raise Exception( - "Accumulator.value cannot be accessed inside tasks") + raise Exception("Accumulator.value cannot be accessed inside tasks") return self._value @value.setter def value(self, value): """Sets the accumulator's value; only usable in driver program""" if self._deserialized: - raise Exception( - "Accumulator.value cannot be accessed inside tasks") + raise Exception("Accumulator.value cannot be accessed inside tasks") self._value = value def add(self, term): @@ -225,8 +222,7 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler): def handle(self): from pyspark.accumulators import _accumulatorRegistry while not self.server.server_shutdown: - # Poll every 1 second for new data -- don't block in case of - # shutdown. + # Poll every 1 second for new data -- don't block in case of shutdown. 
r, _, _ = select.select([self.rfile], [], [], 1) if self.rfile in r: num_updates = read_int(self.rfile) diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py index ebfc0ad003b3b..fb716f6753a45 100644 --- a/python/pyspark/conf.py +++ b/python/pyspark/conf.py @@ -125,8 +125,7 @@ def setSparkHome(self, value): def setExecutorEnv(self, key=None, value=None, pairs=None): """Set an environment variable to be passed to executors.""" if (key is not None and pairs is not None) or (key is None and pairs is None): - raise Exception( - "Either pass one key-value pair or a list of pairs") + raise Exception("Either pass one key-value pair or a list of pairs") elif key is not None: self._jconf.setExecutorEnv(key, value) elif pairs is not None: diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 3ecd9a58f5b74..840925d3e31e9 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -59,8 +59,7 @@ class SparkContext(object): _writeToFile = None _next_accum_id = 0 _active_spark_context = None - _lock = Lock() - # zip and egg files that need to be added to PYTHONPATH + _lock = Lock() # zip and egg files that need to be added to PYTHONPATH _python_includes = None _default_batch_size_for_serialized_input = 10 @@ -101,15 +100,13 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, self._callsite = rdd._extract_concise_traceback() else: tempNamedTuple = namedtuple("Callsite", "function file linenum") - self._callsite = tempNamedTuple( - function=None, file=None, linenum=None) + self._callsite = tempNamedTuple(function=None, file=None, linenum=None) SparkContext._ensure_initialized(self, gateway=gateway) try: self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf) except: - # If an error occurs, clean up in order to allow future - # SparkContext creation: + # If an error occurs, clean up in order to allow future SparkContext creation: self.stop() raise @@ -142,8 +139,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, if not self._conf.contains("spark.master"): raise Exception("A master URL must be set in your configuration") if not self._conf.contains("spark.app.name"): - raise Exception( - "An application name must be set in your configuration") + raise Exception("An application name must be set in your configuration") # Read back our properties from the conf in case we loaded some of them from # the classpath or an external config file @@ -184,8 +180,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, self.addPyFile(path) # Deploy code dependencies set by spark-submit; these will already have been added - # with SparkContext.addFile, so we just need to add them to the - # PYTHONPATH + # with SparkContext.addFile, so we just need to add them to the PYTHONPATH for path in self._conf.get("spark.submit.pyFiles", "").split(","): if path != "": (dirname, filename) = os.path.split(path) @@ -195,11 +190,9 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, sys.path.append(dirname) # Create a temporary directory inside spark.local.dir: - local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir( - self._jsc.sc().conf()) + local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf()) self._temp_dir = \ - self._jvm.org.apache.spark.util.Utils.createTempDir( - local_dir).getAbsolutePath() + self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath() def 
_initialize_context(self, jconf): """ @@ -292,8 +285,7 @@ def parallelize(self, c, numSlices=None): # because it sends O(n) Py4J commands. As an alternative, serialized # objects are written to a file and loaded through textFile(). tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir) - # Make sure we distribute data evenly if it's smaller than - # self.batchSize + # Make sure we distribute data evenly if it's smaller than self.batchSize if "__len__" not in dir(c): c = list(c) # Make it a list so we can compute its length batchSize = min(len(c) // numSlices, self._batchSize) @@ -412,10 +404,8 @@ def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None, Java object. (default sc._default_batch_size_for_serialized_input) """ minSplits = minSplits or min(self.defaultParallelism, 2) - batchSize = max( - 1, batchSize or self._default_batch_size_for_serialized_input) - ser = BatchedSerializer(PickleSerializer()) if ( - batchSize > 1) else PickleSerializer() + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass, keyConverter, valueConverter, minSplits, batchSize) return RDD(jrdd, self, ser) @@ -445,13 +435,11 @@ def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConv Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) - batchSize = max( - 1, batchSize or self._default_batch_size_for_serialized_input) - ser = BatchedSerializer(PickleSerializer()) if ( - batchSize > 1) else PickleSerializer() - jrdd = self._jvm.PythonRDD.newAPIHadoopFile( - self._jsc, path, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf, batchSize) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() + jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, + jconf, batchSize) return RDD(jrdd, self, ser) def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, @@ -476,13 +464,11 @@ def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=N Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) - batchSize = max( - 1, batchSize or self._default_batch_size_for_serialized_input) - ser = BatchedSerializer(PickleSerializer()) if ( - batchSize > 1) else PickleSerializer() - jrdd = self._jvm.PythonRDD.newAPIHadoopRDD( - self._jsc, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf, batchSize) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() + jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, + jconf, batchSize) return RDD(jrdd, self, ser) def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, @@ -510,13 +496,11 @@ def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter= Java object. 
(default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) - batchSize = max( - 1, batchSize or self._default_batch_size_for_serialized_input) - ser = BatchedSerializer(PickleSerializer()) if ( - batchSize > 1) else PickleSerializer() - jrdd = self._jvm.PythonRDD.hadoopFile( - self._jsc, path, inputFormatClass, keyClass, - valueClass, keyConverter, valueConverter, jconf, batchSize) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() + jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, + jconf, batchSize) return RDD(jrdd, self, ser) def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, @@ -541,12 +525,11 @@ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, Java object. (default sc._default_batch_size_for_serialized_input) """ jconf = self._dictToJavaMap(conf) - batchSize = max( - 1, batchSize or self._default_batch_size_for_serialized_input) - ser = BatchedSerializer(PickleSerializer()) if ( - batchSize > 1) else PickleSerializer() - jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass, - keyConverter, valueConverter, jconf, batchSize) + batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input) + ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer() + jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, + jconf, batchSize) return RDD(jrdd, self, ser) def _checkpointFile(self, name, input_deserializer): @@ -577,8 +560,7 @@ def union(self, rdds): first = rdds[0]._jrdd rest = [x._jrdd for x in rdds[1:]] rest = ListConverter().convert(rest, self._gateway._gateway_client) - return RDD(self._jsc.union(first, rest), self, - rdds[0]._jrdd_deserializer) + return RDD(self._jsc.union(first, rest), self, rdds[0]._jrdd_deserializer) def broadcast(self, value): """ @@ -590,8 +572,7 @@ def broadcast(self, value): pickleSer = PickleSerializer() pickled = pickleSer.dumps(value) jbroadcast = self._jsc.broadcast(bytearray(pickled)) - return Broadcast(jbroadcast.id(), value, jbroadcast, - self._pickled_broadcast_vars) + return Broadcast(jbroadcast.id(), value, jbroadcast, self._pickled_broadcast_vars) def accumulator(self, value, accum_param=None): """ @@ -609,8 +590,7 @@ def accumulator(self, value, accum_param=None): elif isinstance(value, complex): accum_param = accumulators.COMPLEX_ACCUMULATOR_PARAM else: - raise Exception( - "No default accumulator param for type %s" % type(value)) + raise Exception("No default accumulator param for type %s" % type(value)) SparkContext._next_accum_id += 1 return Accumulator(SparkContext._next_accum_id - 1, value, accum_param) @@ -655,14 +635,12 @@ def addPyFile(self, path): HTTP, HTTPS or FTP URI. 
""" self.addFile(path) - # dirname may be directory or HDFS/S3 prefix - (dirname, filename) = os.path.split(path) + (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'): self._python_includes.append(filename) # for tests in local mode - sys.path.append( - os.path.join(SparkFiles.getRootDirectory(), filename)) + sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) def setCheckpointDir(self, dirName): """ @@ -676,8 +654,7 @@ def _getJavaStorageLevel(self, storageLevel): Returns a Java StorageLevel based on a pyspark.StorageLevel. """ if not isinstance(storageLevel, StorageLevel): - raise Exception( - "storageLevel must be of type pyspark.StorageLevel") + raise Exception("storageLevel must be of type pyspark.StorageLevel") newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel return newStorageLevel(storageLevel.useDisk, @@ -780,15 +757,13 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False): """ if partitions is None: partitions = range(rdd._jrdd.partitions().size()) - javaPartitions = ListConverter().convert( - partitions, self._gateway._gateway_client) + javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client) # Implementation note: This is implemented as a mapPartitions followed # by runJob() in order to avoid having to pass a Python lambda into # SparkContext#runJob. mappedRDD = rdd.mapPartitions(partitionFunc) - it = self._jvm.PythonRDD.runJob( - self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) + it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) return list(mappedRDD._collect_iterator_through_file(it)) @@ -800,8 +775,7 @@ def _test(): globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) globs['tempdir'] = tempfile.mkdtemp() atexit.register(lambda: shutil.rmtree(globs['tempdir'])) - (failure_count, test_count) = doctest.testmod( - globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index b1b7f14da959b..e73538baf0b93 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -30,8 +30,7 @@ def compute_real_exit_code(exit_code): - # SystemExit's code can be integer or string, but os._exit only accepts - # integers + # SystemExit's code can be integer or string, but os._exit only accepts integers if isinstance(exit_code, numbers.Integral): return exit_code else: @@ -44,8 +43,7 @@ def worker(sock): """ # Redirect stdout to stderr os.dup2(2, 1) - # The sys.stdout object is different from file descriptor 1 - sys.stdout = sys.stderr + sys.stdout = sys.stderr # The sys.stdout object is different from file descriptor 1 signal.signal(SIGHUP, SIG_DFL) signal.signal(SIGCHLD, SIG_DFL) @@ -64,8 +62,7 @@ def waitSocketClose(sock): # Read the socket using fdopen instead of socket.makefile() because the latter # seems to be very slow; note that we need to dup() the file descriptor because - # otherwise writes also cause a seek that makes us miss data on the read - # side. + # otherwise writes also cause a seek that makes us miss data on the read side. 
infile = os.fdopen(os.dup(sock.fileno()), "a+", 65536) outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536) exit_code = 0 @@ -110,8 +107,7 @@ def handle_sigchld(*args): try: pid, status = os.waitpid(0, os.WNOHANG) if status != 0: - msg = "worker %s crashed abruptly with exit status %s" % ( - pid, status) + msg = "worker %s crashed abruptly with exit status %s" % (pid, status) print >> sys.stderr, msg except EnvironmentError as err: if err.errno not in (ECHILD, EINTR): diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 2be2282f0c80b..37386ab0d7d49 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -39,14 +39,12 @@ def launch_gateway(): submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS") submit_args = submit_args if submit_args is not None else "" submit_args = shlex.split(submit_args) - command = [ - os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args + command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args if not on_windows: # Don't send ctrl-c / SIGINT to the Java gateway: def preexec_func(): signal.signal(signal.SIGINT, signal.SIG_IGN) - proc = Popen( - command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func) + proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func) else: # preexec_fn not supported on Windows proc = Popen(command, stdout=PIPE, stdin=PIPE) diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index 1503c567f13ea..db341da85f865 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -35,8 +35,7 @@ # Check whether we have SciPy. MLlib works without it too, but if we have it, some methods, -# such as _dot and _serialize_double_vector, start to support scipy.sparse -# matrices. +# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices. _have_scipy = False _scipy_issparse = None @@ -165,8 +164,7 @@ def _serialize_sparse_vector(v): header[1] = nonzeros _copyto(v.indices, buffer=ba, offset=9, shape=[nonzeros], dtype=int32) values_offset = 9 + 4 * nonzeros - _copyto(v.values, buffer=ba, offset=values_offset, - shape=[nonzeros], dtype=float64) + _copyto(v.values, buffer=ba, offset=values_offset, shape=[nonzeros], dtype=float64) return ba @@ -190,11 +188,9 @@ def _deserialize_double(ba, offset=0): True """ if type(ba) != bytearray: - raise TypeError( - "_deserialize_double called on a %s; wanted bytearray" % type(ba)) + raise TypeError("_deserialize_double called on a %s; wanted bytearray" % type(ba)) if len(ba) - offset != 8: - raise TypeError( - "_deserialize_double called on a %d-byte array; wanted 8 bytes." % nb) + raise TypeError("_deserialize_double called on a %d-byte array; wanted 8 bytes." 
% nb) return struct.unpack("d", ba[offset:])[0] @@ -250,8 +246,7 @@ def _deserialize_sparse_vector(ba, offset=0): raise TypeError("_deserialize_sparse_vector called on bytearray " "with wrong length") indices = _deserialize_numpy_array([nonzeros], ba, offset + 9, dtype=int32) - values = _deserialize_numpy_array( - [nonzeros], ba, offset + 9 + 4 * nonzeros, dtype=float64) + values = _deserialize_numpy_array([nonzeros], ba, offset + 9 + 4 * nonzeros, dtype=float64) return SparseVector(int(size), indices, values) @@ -330,8 +325,7 @@ def _deserialize_labeled_point(ba, offset=0): if type(ba) != bytearray: raise TypeError("Expecting a bytearray but got %s" % type(ba)) if ba[offset] != LABELED_POINT_MAGIC: - raise TypeError("Expecting magic number %d but got %d" % - (LABELED_POINT_MAGIC, ba[0])) + raise TypeError("Expecting magic number %d but got %d" % (LABELED_POINT_MAGIC, ba[0])) label = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=float64)[0] features = _deserialize_double_vector(ba, offset + 9) return LabeledPoint(label, features) @@ -345,8 +339,7 @@ def _copyto(array, buffer, offset, shape, dtype): TODO: In the future this could use numpy.copyto on NumPy 1.7+, but we should benchmark that to see whether it provides a benefit. """ - temp_array = ndarray( - shape=shape, buffer=buffer, offset=offset, dtype=dtype, order='C') + temp_array = ndarray(shape=shape, buffer=buffer, offset=offset, dtype=dtype, order='C') temp_array[...] = array @@ -403,8 +396,7 @@ def _linear_predictor_typecheck(x, coeffs): elif isinstance(x, RDD): raise RuntimeError("Bulk predict not yet supported.") else: - raise TypeError( - "Argument of type " + type(x).__name__ + " unsupported") + raise TypeError("Argument of type " + type(x).__name__ + " unsupported") # If we weren't given initial weights, take a zero vector of the appropriate @@ -500,8 +492,7 @@ def _convert_vector(vec): assert vec.shape[1] == 1, "Expected column vector" csc = vec.tocsc() return SparseVector(vec.shape[0], csc.indices, csc.data) - raise TypeError( - "Expected NumPy array, SparseVector, or scipy.sparse matrix") + raise TypeError("Expected NumPy array, SparseVector, or scipy.sparse matrix") def _squared_distance(v1, v2): @@ -554,8 +545,7 @@ def _test(): import doctest globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod( - globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 2e31665105c62..a85abbcd02c79 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -190,8 +190,7 @@ def train(cls, data, lambda_=1.0): """ sc = data.context dataBytes = _get_unmangled_labeled_point_rdd(data) - ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes( - dataBytes._jrdd, lambda_) + ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_) return NaiveBayesModel( _deserialize_double_vector(ans[0]), _deserialize_double_vector(ans[1]), @@ -202,8 +201,7 @@ def _test(): import doctest globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod( - globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) 
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 7e974d7186bf7..a0630d1d5c58b 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -99,8 +99,7 @@ def _test(): import doctest globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod( - globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 41b1f2976893b..9a239abfbbeb1 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -60,8 +60,7 @@ def __init__(self, size, *args): self.indices = array([p[0] for p in pairs], dtype=int32) self.values = array([p[1] for p in pairs], dtype=float64) else: - assert len(args[0]) == len( - args[1]), "index and value arrays not same length" + assert len(args[0]) == len(args[1]), "index and value arrays not same length" self.indices = array(args[0], dtype=int32) self.values = array(args[1], dtype=float64) for i in xrange(len(self.indices) - 1): @@ -90,12 +89,10 @@ def dot(self, other): result += self.values[i] * other[self.indices[i]] return result elif other.ndim == 2: - results = [self.dot(other[:, i]) - for i in xrange(other.shape[1])] + results = [self.dot(other[:, i]) for i in xrange(other.shape[1])] return array(results) else: - raise Exception( - "Cannot call dot with %d-dimensional array" % other.ndim) + raise Exception("Cannot call dot with %d-dimensional array" % other.ndim) else: result = 0.0 i, j = 0, 0 @@ -171,8 +168,7 @@ def __str__(self): def __repr__(self): inds = self.indices vals = self.values - entries = ", ".join( - ["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))]) + entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))]) return "SparseVector({0}, {{{1}}})".format(self.size, entries) def __eq__(self, other): @@ -262,8 +258,7 @@ def _test(): if __name__ == "__main__": # remove current path from list of search paths to avoid importing mllib.random - # for C{import random}, which is done in an external dependency of pyspark - # during doctests. + # for C{import random}, which is done in an external dependency of pyspark during doctests. 
import sys sys.path.pop(0) _test() diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py index 1b51da913c4b3..eb496688b6eef 100644 --- a/python/pyspark/mllib/random.py +++ b/python/pyspark/mllib/random.py @@ -54,8 +54,7 @@ def uniformRDD(sc, size, numPartitions=None, seed=None): >>> parts == sc.defaultParallelism True """ - jrdd = sc._jvm.PythonMLLibAPI().uniformRDD( - sc._jsc, size, numPartitions, seed) + jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed) uniform = RDD(jrdd, sc, NoOpSerializer()) return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes))) @@ -79,8 +78,7 @@ def normalRDD(sc, size, numPartitions=None, seed=None): >>> abs(stats.stdev() - 1.0) < 0.1 True """ - jrdd = sc._jvm.PythonMLLibAPI().normalRDD( - sc._jsc, size, numPartitions, seed) + jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed) normal = RDD(jrdd, sc, NoOpSerializer()) return normal.map(lambda bytes: _deserialize_double(bytearray(bytes))) @@ -101,8 +99,7 @@ def poissonRDD(sc, mean, size, numPartitions=None, seed=None): >>> abs(stats.stdev() - sqrt(mean)) < 0.5 True """ - jrdd = sc._jvm.PythonMLLibAPI().poissonRDD( - sc._jsc, mean, size, numPartitions, seed) + jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed) poisson = RDD(jrdd, sc, NoOpSerializer()) return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes))) @@ -177,8 +174,7 @@ def _test(): # The small batch size here ensures that we see multiple batches, # even in these small test examples: globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod( - globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index c398b008fda54..e863fc249ec36 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -81,8 +81,7 @@ def _test(): import doctest globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod( - globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index c23ec7e44ca6d..d8792cf44872f 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -44,8 +44,7 @@ def __init__(self, label, features): elif type(features) == list: self.features = array(features) else: - raise TypeError( - "Expected NumPy array, list, SparseVector, or scipy.sparse matrix") + raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix") def __str__(self): return "(" + ",".join((str(self.label), Vectors.stringify(self.features))) + ")" @@ -248,8 +247,7 @@ def _test(): import doctest globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod( - globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py index dbf2a03bb3a73..982906b9d09f0 
100644 --- a/python/pyspark/mllib/stat.py +++ b/python/pyspark/mllib/stat.py @@ -80,15 +80,13 @@ def corr(x, y=None, method=None): try: Xser = _get_unmangled_double_vector_rdd(x) except TypeError: - raise TypeError( - "corr called on a single RDD not consisted of Vectors.") + raise TypeError("corr called on a single RDD not consisted of Vectors.") resultMat = sc._jvm.PythonMLLibAPI().corr(Xser._jrdd, method) return _deserialize_double_matrix(resultMat) else: xSer = _get_unmangled_rdd(x, _serialize_double) ySer = _get_unmangled_rdd(y, _serialize_double) - result = sc._jvm.PythonMLLibAPI().corr( - xSer._jrdd, ySer._jrdd, method) + result = sc._jvm.PythonMLLibAPI().corr(xSer._jrdd, ySer._jrdd, method) return result @@ -97,8 +95,7 @@ def _test(): from pyspark import SparkContext globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod( - globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index a427f7c820200..6f3ec8ac94bac 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -47,12 +47,9 @@ def test_serialize(self): self.assertTrue(sv is _convert_vector(sv)) self.assertTrue(dv is _convert_vector(dv)) self.assertTrue(array_equal(dv, _convert_vector(lst))) - self.assertEquals( - sv, _deserialize_double_vector(_serialize_double_vector(sv))) - self.assertTrue( - array_equal(dv, _deserialize_double_vector(_serialize_double_vector(dv)))) - self.assertTrue( - array_equal(dv, _deserialize_double_vector(_serialize_double_vector(lst)))) + self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(sv))) + self.assertTrue(array_equal(dv, _deserialize_double_vector(_serialize_double_vector(dv)))) + self.assertTrue(array_equal(dv, _deserialize_double_vector(_serialize_double_vector(lst)))) def test_dot(self): sv = SparseVector(4, {1: 1, 3: 2}) @@ -65,11 +62,9 @@ def test_dot(self): self.assertEquals(10.0, _dot(sv, dv)) self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(sv, mat))) self.assertEquals(30.0, _dot(dv, dv)) - self.assertTrue( - array_equal(array([10., 20., 30., 40.]), _dot(dv, mat))) + self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(dv, mat))) self.assertEquals(30.0, _dot(lst, dv)) - self.assertTrue( - array_equal(array([10., 20., 30., 40.]), _dot(lst, mat))) + self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(lst, mat))) def test_squared_distance(self): sv = SparseVector(4, {1: 1, 3: 2}) @@ -101,8 +96,7 @@ def test_clustering(self): [1.1, 0], [1.2, 0], ] - clusters = KMeans.train( - self.sc.parallelize(data), 2, initializationMode="k-means||") + clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||") self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1])) self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3])) @@ -178,8 +172,7 @@ def test_regression(self): categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories dt_model = \ - DecisionTree.trainRegressor( - rdd, categoricalFeaturesInfo=categoricalFeaturesInfo) + DecisionTree.trainRegressor(rdd, categoricalFeaturesInfo=categoricalFeaturesInfo) self.assertTrue(dt_model.predict(features[0]) <= 0) self.assertTrue(dt_model.predict(features[1]) > 0) self.assertTrue(dt_model.predict(features[2]) <= 0) @@ -205,14 +198,10 @@ def 
test_serialize(self): self.assertEquals(sv, _convert_vector(lil.tocoo())) self.assertEquals(sv, _convert_vector(lil.tocsr())) self.assertEquals(sv, _convert_vector(lil.todok())) - self.assertEquals( - sv, _deserialize_double_vector(_serialize_double_vector(lil))) - self.assertEquals( - sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc()))) - self.assertEquals( - sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr()))) - self.assertEquals( - sv, _deserialize_double_vector(_serialize_double_vector(lil.todok()))) + self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil))) + self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc()))) + self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr()))) + self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.todok()))) def test_dot(self): from scipy.sparse import lil_matrix @@ -256,8 +245,7 @@ def test_clustering(self): self.scipy_matrix(3, {2: 1.0}), self.scipy_matrix(3, {2: 1.1}) ] - clusters = KMeans.train( - self.sc.parallelize(data), 2, initializationMode="k-means||") + clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||") self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1])) self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3])) @@ -331,8 +319,7 @@ def test_regression(self): self.assertTrue(rr_model.predict(features[3]) > 0) categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories - dt_model = DecisionTree.trainRegressor( - rdd, categoricalFeaturesInfo=categoricalFeaturesInfo) + dt_model = DecisionTree.trainRegressor(rdd, categoricalFeaturesInfo=categoricalFeaturesInfo) self.assertTrue(dt_model.predict(features[0]) <= 0) self.assertTrue(dt_model.predict(features[1]) > 0) self.assertTrue(dt_model.predict(features[2]) <= 0) diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py index 5713fa7be0297..2518001ea0b93 100644 --- a/python/pyspark/mllib/tree.py +++ b/python/pyspark/mllib/tree.py @@ -218,8 +218,7 @@ def _test(): import doctest globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod( - globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index 75754286cd622..4962d05491c03 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -127,8 +127,7 @@ def loadLibSVMFile(sc, path, numFeatures=-1, minPartitions=None): parsed = lines.map(lambda l: MLUtils._parse_libsvm_line(l)) if numFeatures <= 0: parsed.cache() - numFeatures = parsed.map( - lambda x: -1 if x[1].size == 0 else x[1][-1]).reduce(max) + 1 + numFeatures = parsed.map(lambda x: -1 if x[1].size == 0 else x[1][-1]).reduce(max) + 1 return parsed.map(lambda x: LabeledPoint(x[0], Vectors.sparse(numFeatures, x[1], x[2]))) @staticmethod @@ -183,8 +182,7 @@ def loadLabeledPoints(sc, path, minPartitions=None): (0.0,[1.01,2.02,3.03]) """ minPartitions = minPartitions or min(sc.defaultParallelism, 2) - jSerialized = sc._jvm.PythonMLLibAPI().loadLabeledPoints( - sc._jsc, path, minPartitions) + jSerialized = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions) serialized = RDD(jSerialized, sc, NoOpSerializer()) return serialized.map(lambda bytes: 
_deserialize_labeled_point(bytearray(bytes))) @@ -196,8 +194,7 @@ def _test(): # The small batch size here ensures that we see multiple batches, # even in these small test examples: globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod( - globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 374535587cfde..30b834d2085cd 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1078,9 +1078,10 @@ def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueCl jconf = self.ctx._dictToJavaMap(conf) pickledRDD = self._toPickleSerialization() batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) - self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile( - pickledRDD._jrdd, batched, path, - outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf) + self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path, + outputFormatClass, + keyClass, valueClass, + keyConverter, valueConverter, jconf) def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None): """ @@ -1125,10 +1126,11 @@ def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=No jconf = self.ctx._dictToJavaMap(conf) pickledRDD = self._toPickleSerialization() batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) - self.ctx._jvm.PythonRDD.saveAsHadoopFile( - pickledRDD._jrdd, batched, path, - outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, - jconf, compressionCodecClass) + self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path, + outputFormatClass, + keyClass, valueClass, + keyConverter, valueConverter, + jconf, compressionCodecClass) def saveAsSequenceFile(self, path, compressionCodecClass=None): """ diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 35940191c70a3..55e247da0e4dc 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -31,8 +31,7 @@ def __init__(self, withReplacement, seed=None): "Falling back to default random generator for sampling.") self._use_numpy = False - self._seed = seed if seed is not None else random.randint( - 0, sys.maxint) + self._seed = seed if seed is not None else random.randint(0, sys.maxint) self._withReplacement = withReplacement self._random = None self._split = None @@ -87,8 +86,7 @@ def getPoissonSample(self, split, mean): def shuffle(self, vals): if self._random is None: - # this should only ever called on the master so - self.initRandomGenerator(0) + self.initRandomGenerator(0) # this should only ever called on the master so # the split does not matter if self._use_numpy: diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index be22020fb827b..e1e7cd954189f 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -39,8 +39,7 @@ if os.environ.get("ADD_FILES") is not None else None) if os.environ.get("SPARK_EXECUTOR_URI"): - SparkContext.setSystemProperty( - "spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"]) + SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"]) sc = SparkContext(appName="PySparkShell", pyFiles=add_files) diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 059cf1633a86f..293af6183e9cf 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py 
@@ -983,8 +983,7 @@ def registerFunction(self, name, f, returnType=StringType()): includes = ListConverter().convert(self._sc._python_includes, self._sc._gateway._gateway_client) self._ssql_ctx.registerPython(name, - bytearray( - CloudPickleSerializer().dumps(command)), + bytearray(CloudPickleSerializer().dumps(command)), env, includes, self._sc.pythonExec, @@ -1530,8 +1529,7 @@ def registerTempTable(self, name): self._jschema_rdd.registerTempTable(name) def registerAsTable(self, name): - warnings.warn( - "Use registerTempTable instead of registerAsTable.", DeprecationWarning) + warnings.warn("Use registerTempTable instead of registerAsTable.", DeprecationWarning) self.registerTempTable(name) def insertInto(self, tableName, overwrite=False): diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py index 8b59d62bb40c3..1e597d64e03fe 100644 --- a/python/pyspark/statcounter.py +++ b/python/pyspark/statcounter.py @@ -51,15 +51,13 @@ def merge(self, value): return self - # Merge another StatCounter into this one, adding up the internal - # statistics. + # Merge another StatCounter into this one, adding up the internal statistics. def mergeStats(self, other): if not isinstance(other, StatCounter): raise Exception("Can only merge Statcounters!") if other is self: # reference equality holds - # Avoid overwriting fields in a weird order - self.merge(copy.deepcopy(other)) + self.merge(copy.deepcopy(other)) # Avoid overwriting fields in a weird order else: if self.n == 0: self.mu = other.mu @@ -75,14 +73,12 @@ def mergeStats(self, other): elif self.n * 10 < other.n: self.mu = other.mu - (delta * self.n) / (self.n + other.n) else: - self.mu = ( - self.mu * self.n + other.mu * other.n) / (self.n + other.n) + self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n) self.maxValue = maximum(self.maxValue, other.maxValue) self.minValue = minimum(self.minValue, other.minValue) - self.m2 += other.m2 + \ - (delta * delta * self.n * other.n) / (self.n + other.n) + self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n) self.n += other.n return self diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index ec2553ae46649..199f66392d1ac 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -227,12 +227,10 @@ def test_add_egg_file_locally(self): def func(): from userlib import UserClass self.assertRaises(ImportError, func) - path = os.path.join( - SPARK_HOME, "python/test_support/userlib-0.1-py2.7.egg") + path = os.path.join(SPARK_HOME, "python/test_support/userlib-0.1-py2.7.egg") self.sc.addPyFile(path) from userlib import UserClass - self.assertEqual( - "Hello World from inside a package!", UserClass().hello()) + self.assertEqual("Hello World from inside a package!", UserClass().hello()) class TestRDDFunctions(PySparkTestCase): @@ -240,8 +238,7 @@ class TestRDDFunctions(PySparkTestCase): def test_failed_sparkcontext_creation(self): # Regression test for SPARK-1550 self.sc.stop() - self.assertRaises( - Exception, lambda: SparkContext("an-invalid-master-name")) + self.assertRaises(Exception, lambda: SparkContext("an-invalid-master-name")) self.sc = SparkContext("local") def test_save_as_textfile_with_unicode(self): @@ -338,8 +335,7 @@ def setUp(self): PySparkTestCase.setUp(self) self.tempdir = tempfile.NamedTemporaryFile(delete=False) os.unlink(self.tempdir.name) - self.sc._jvm.WriteInputFormatTestDataGenerator.generateData( - self.tempdir.name, self.sc._jsc) + 
self.sc._jvm.WriteInputFormatTestDataGenerator.generateData(self.tempdir.name, self.sc._jsc) def tearDown(self): PySparkTestCase.tearDown(self) @@ -350,15 +346,13 @@ def test_sequencefiles(self): ints = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfint/", "org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.Text").collect()) - ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), - (2, u'bb'), (2, u'bb'), (3, u'cc')] + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] self.assertEqual(ints, ei) doubles = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfdouble/", "org.apache.hadoop.io.DoubleWritable", "org.apache.hadoop.io.Text").collect()) - ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), - (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')] + ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')] self.assertEqual(doubles, ed) bytes = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbytes/", @@ -386,8 +380,7 @@ def test_sequencefiles(self): bools = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbool/", "org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.BooleanWritable").collect()) - eb = [(1, False), (1, True), (2, False), - (2, False), (2, True), (3, True)] + eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)] self.assertEqual(bools, eb) nulls = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfnull/", @@ -447,8 +440,7 @@ def test_oldhadoop(self): "org.apache.hadoop.mapred.SequenceFileInputFormat", "org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.Text").collect()) - ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), - (2, u'bb'), (2, u'bb'), (3, u'cc')] + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] self.assertEqual(ints, ei) hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt") @@ -467,8 +459,7 @@ def test_newhadoop(self): "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", "org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.Text").collect()) - ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), - (2, u'bb'), (2, u'bb'), (3, u'cc')] + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] self.assertEqual(ints, ei) hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt") @@ -541,21 +532,18 @@ def tearDown(self): def test_sequencefiles(self): basepath = self.tempdir.name - ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), - (2, u'bb'), (2, u'bb'), (3, u'cc')] + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] self.sc.parallelize(ei).saveAsSequenceFile(basepath + "/sfint/") ints = sorted(self.sc.sequenceFile(basepath + "/sfint/").collect()) self.assertEqual(ints, ei) - ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), - (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')] + ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')] self.sc.parallelize(ed).saveAsSequenceFile(basepath + "/sfdouble/") doubles = sorted( self.sc.sequenceFile(basepath + "/sfdouble/").collect()) self.assertEqual(doubles, ed) - ebs = [(1, bytearray(b'\x00\x07spam\x08')), - (2, bytearray(b'\x00\x07spam\x08'))] + ebs = [(1, bytearray(b'\x00\x07spam\x08')), (2, bytearray(b'\x00\x07spam\x08'))] self.sc.parallelize(ebs).saveAsSequenceFile(basepath + "/sfbytes/") bytes = sorted(self.sc.sequenceFile(basepath + "/sfbytes/").collect()) self.assertEqual(bytes, ebs) @@ -567,8 +555,7 @@ def test_sequencefiles(self): text = sorted(self.sc.sequenceFile(basepath + "/sftext/").collect()) 
self.assertEqual(text, et) - eb = [(1, False), (1, True), (2, False), - (2, False), (2, True), (3, True)] + eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)] self.sc.parallelize(eb).saveAsSequenceFile(basepath + "/sfbool/") bools = sorted(self.sc.sequenceFile(basepath + "/sfbool/").collect()) self.assertEqual(bools, eb) @@ -686,8 +673,7 @@ def test_converters(self): "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat", keyConverter="org.apache.spark.api.python.TestOutputKeyConverter", valueConverter="org.apache.spark.api.python.TestOutputValueConverter") - converted = sorted( - self.sc.sequenceFile(basepath + "/converters/").collect()) + converted = sorted(self.sc.sequenceFile(basepath + "/converters/").collect()) expected = [(u'1', 3.0), (u'2', 1.0), (u'3', 2.0)] @@ -700,21 +686,19 @@ def test_reserialization(self): data = zip(x, y) rdd = self.sc.parallelize(x).zip(self.sc.parallelize(y)) rdd.saveAsSequenceFile(basepath + "/reserialize/sequence") - result1 = sorted( - self.sc.sequenceFile(basepath + "/reserialize/sequence").collect()) + result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect()) self.assertEqual(result1, data) - rdd.saveAsHadoopFile(basepath + "/reserialize/hadoop", - "org.apache.hadoop.mapred.SequenceFileOutputFormat") - result2 = sorted( - self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect()) + rdd.saveAsHadoopFile( + basepath + "/reserialize/hadoop", + "org.apache.hadoop.mapred.SequenceFileOutputFormat") + result2 = sorted(self.sc.sequenceFile(basepath + "/reserialize/hadoop").collect()) self.assertEqual(result2, data) rdd.saveAsNewAPIHadoopFile( basepath + "/reserialize/newhadoop", "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat") - result3 = sorted( - self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect()) + result3 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newhadoop").collect()) self.assertEqual(result3, data) conf4 = { @@ -723,8 +707,7 @@ def test_reserialization(self): "mapred.output.value.class": "org.apache.hadoop.io.IntWritable", "mapred.output.dir": basepath + "/reserialize/dataset"} rdd.saveAsHadoopDataset(conf4) - result4 = sorted( - self.sc.sequenceFile(basepath + "/reserialize/dataset").collect()) + result4 = sorted(self.sc.sequenceFile(basepath + "/reserialize/dataset").collect()) self.assertEqual(result4, data) conf5 = {"mapreduce.outputformat.class": @@ -733,14 +716,12 @@ def test_reserialization(self): "mapred.output.value.class": "org.apache.hadoop.io.IntWritable", "mapred.output.dir": basepath + "/reserialize/newdataset"} rdd.saveAsNewAPIHadoopDataset(conf5) - result5 = sorted( - self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect()) + result5 = sorted(self.sc.sequenceFile(basepath + "/reserialize/newdataset").collect()) self.assertEqual(result5, data) def test_unbatched_save_and_read(self): basepath = self.tempdir.name - ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), - (2, u'bb'), (2, u'bb'), (3, u'cc')] + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] self.sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile( basepath + "/unbatched/") @@ -897,8 +878,7 @@ class TestSparkSubmit(unittest.TestCase): def setUp(self): self.programDir = tempfile.mkdtemp() - self.sparkSubmit = os.path.join( - os.environ.get("SPARK_HOME"), "bin", "spark-submit") + self.sparkSubmit = os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit") def tearDown(self): shutil.rmtree(self.programDir) @@ -935,8 +915,7 
@@ def test_single_script(self): |sc = SparkContext() |print sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect() """) - proc = subprocess.Popen( - [self.sparkSubmit, script], stdout=subprocess.PIPE) + proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) self.assertIn("[2, 4, 6]", out) @@ -952,8 +931,7 @@ def test_script_with_local_functions(self): |sc = SparkContext() |print sc.parallelize([1, 2, 3]).map(foo).collect() """) - proc = subprocess.Popen( - [self.sparkSubmit, script], stdout=subprocess.PIPE) + proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) self.assertIn("[3, 6, 9]", out) @@ -971,8 +949,9 @@ def test_module_dependency(self): |def myfunc(x): | return x + 1 """) - proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, script], - stdout=subprocess.PIPE) + proc = subprocess.Popen( + [self.sparkSubmit, "--py-files", zip, script], + stdout=subprocess.PIPE) out, err = proc.communicate() self.assertEqual(0, proc.returncode) self.assertIn("[2, 3, 4]", out) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 14a63256b551f..5c06aad09329f 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -56,8 +56,7 @@ def main(infile, outfile): SparkFiles._root_directory = spark_files_dir SparkFiles._is_running_on_worker = True - # fetch names of includes (*.zip and *.egg files) and construct - # PYTHONPATH + # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH # *.py files that were added will be copied here sys.path.append(spark_files_dir) num_python_includes = read_int(infile)
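
Note for future cleanups (not part of the patch itself): the churn undone above could have been avoided by telling autopep8 about Spark's 100-character limit up front. A minimal sketch of that invocation, assuming autopep8 is installed and run from the repository root; the target path below is purely illustrative:

    # Rewrite a PySpark module in place, wrapping at 100 characters instead of
    # autopep8's 79-character default.
    import autopep8

    path = "python/pyspark/context.py"  # illustrative target, not prescriptive

    with open(path) as f:
        source = f.read()

    # The options dict mirrors the CLI flags, e.g. --max-line-length=100.
    fixed = autopep8.fix_code(source, options={"max_line_length": 100})

    with open(path, "w") as f:
        f.write(fixed)

    # Rough CLI equivalent for the whole Python directory:
    #   autopep8 --in-place --recursive --max-line-length=100 python/pyspark/

Running it this way keeps autopep8's whitespace fixes while leaving lines under 100 characters untouched, so a follow-up "undo line breaks" pass like this one would not be needed.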