From 0e0245f2356bba84a4c7df90fcbcd7dc51448ede Mon Sep 17 00:00:00 2001
From: Nicholas Chammas
Date: Mon, 4 Aug 2014 20:08:17 -0400
Subject: [PATCH] [SPARK-2627] undo erroneous whitespace fixes

---
 python/pyspark/context.py |  4 ++--
 python/pyspark/tests.py   | 45 +++++++++++++++++++++------------------------
 python/pyspark/worker.py  |  3 +--
 3 files changed, 24 insertions(+), 28 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 840925d3e31e9..4001ecab5ea00 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -59,8 +59,8 @@ class SparkContext(object):
     _writeToFile = None
     _next_accum_id = 0
     _active_spark_context = None
-    _lock = Lock()  # zip and egg files that need to be added to PYTHONPATH
-    _python_includes = None
+    _lock = Lock()
+    _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
     _default_batch_size_for_serialized_input = 10

     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 199f66392d1ac..da580f3ecb3bb 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -539,8 +539,7 @@ def test_sequencefiles(self):

         ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
         self.sc.parallelize(ed).saveAsSequenceFile(basepath + "/sfdouble/")
-        doubles = sorted(
-            self.sc.sequenceFile(basepath + "/sfdouble/").collect())
+        doubles = sorted(self.sc.sequenceFile(basepath + "/sfdouble/").collect())
         self.assertEqual(doubles, ed)

         ebs = [(1, bytearray(b'\x00\x07spam\x08')), (2, bytearray(b'\x00\x07spam\x08'))]
@@ -725,25 +724,25 @@ def test_unbatched_save_and_read(self):

         self.sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile(
             basepath + "/unbatched/")
-        unbatched_sequence = sorted(self.sc.sequenceFile(basepath + "/unbatched/",
-                                                         batchSize=1).collect())
+        unbatched_sequence = sorted(self.sc.sequenceFile(
+            basepath + "/unbatched/",
+            batchSize=1).collect())
         self.assertEqual(unbatched_sequence, ei)

-        unbatched_hadoopFile = sorted(
-            self.sc.hadoopFile(basepath + "/unbatched/",
-                               "org.apache.hadoop.mapred.SequenceFileInputFormat",
-                               "org.apache.hadoop.io.IntWritable",
-                               "org.apache.hadoop.io.Text",
-                               batchSize=1).collect())
+        unbatched_hadoopFile = sorted(self.sc.hadoopFile(
+            basepath + "/unbatched/",
+            "org.apache.hadoop.mapred.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.Text",
+            batchSize=1).collect())
         self.assertEqual(unbatched_hadoopFile, ei)

-        unbatched_newAPIHadoopFile = sorted(
-            self.sc.newAPIHadoopFile(
-                basepath + "/unbatched/",
-                "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
-                "org.apache.hadoop.io.IntWritable",
-                "org.apache.hadoop.io.Text",
-                batchSize=1).collect())
+        unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(
+            basepath + "/unbatched/",
+            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.Text",
+            batchSize=1).collect())
         self.assertEqual(unbatched_newAPIHadoopFile, ei)

         oldconf = {"mapred.input.dir": basepath + "/unbatched/"}
@@ -949,9 +948,8 @@ def test_module_dependency(self):
             |def myfunc(x):
             |    return x + 1
             """)
-        proc = subprocess.Popen(
-            [self.sparkSubmit, "--py-files", zip, script],
-            stdout=subprocess.PIPE)
+        proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, script],
+                                stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
         self.assertIn("[2, 3, 4]", out)
@@ -969,10 +967,9 @@ def test_module_dependency_on_cluster(self):
             |def myfunc(x):
             |    return x + 1
             """)
-        proc = subprocess.Popen(
-            [self.sparkSubmit, "--py-files", zip, "--master",
-             "local-cluster[1,1,512]", script],
-            stdout=subprocess.PIPE)
+        proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, "--master",
+                                 "local-cluster[1,1,512]", script],
+                                stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
         self.assertIn("[2, 3, 4]", out)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 5c06aad09329f..2770f63059853 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -57,8 +57,7 @@ def main(infile, outfile):
         SparkFiles._is_running_on_worker = True

         # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
-        # *.py files that were added will be copied here
-        sys.path.append(spark_files_dir)
+        sys.path.append(spark_files_dir)  # *.py files that were added will be copied here
         num_python_includes = read_int(infile)
         for _ in range(num_python_includes):
             filename = utf8_deserializer.loads(infile)