[SPARK-2627] undo erroneous whitespace fixes
nchammas committed Aug 5, 2014
1 parent bf30942 commit 0e0245f
Showing 3 changed files with 24 additions and 28 deletions.
python/pyspark/context.py (4 changes: 2 additions & 2 deletions)
@@ -59,8 +59,8 @@ class SparkContext(object):
     _writeToFile = None
     _next_accum_id = 0
     _active_spark_context = None
-    _lock = Lock()  # zip and egg files that need to be added to PYTHONPATH
-    _python_includes = None
+    _lock = Lock()
+    _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
     _default_batch_size_for_serialized_input = 10
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
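
The comment moved back above describes `_python_includes` (the .zip/.egg dependency names that later end up on the workers' PYTHONPATH), not `_lock`. A minimal, hypothetical sketch of that relationship, using stand-in names (`MiniContext`, `add_py_file`, `files_dir`) rather than the real SparkContext internals:

    import os
    import sys
    from threading import Lock

    class MiniContext(object):
        _lock = Lock()
        _python_includes = None  # zip and egg files that need to be added to PYTHONPATH

        def add_py_file(self, path, files_dir):
            # Hold the lock while updating shared, class-level bookkeeping.
            with MiniContext._lock:
                if MiniContext._python_includes is None:
                    MiniContext._python_includes = []
                filename = os.path.basename(path)
                MiniContext._python_includes.append(filename)
                # Make the dependency importable in this process as well.
                sys.path.append(os.path.join(files_dir, filename))
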
python/pyspark/tests.py (45 changes: 21 additions & 24 deletions)
@@ -539,8 +539,7 @@ def test_sequencefiles(self):
 
         ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
         self.sc.parallelize(ed).saveAsSequenceFile(basepath + "/sfdouble/")
-        doubles = sorted(
-            self.sc.sequenceFile(basepath + "/sfdouble/").collect())
+        doubles = sorted(self.sc.sequenceFile(basepath + "/sfdouble/").collect())
         self.assertEqual(doubles, ed)
 
         ebs = [(1, bytearray(b'\x00\x07spam\x08')), (2, bytearray(b'\x00\x07spam\x08'))]
@@ -725,25 +724,25 @@ def test_unbatched_save_and_read(self):
         self.sc.parallelize(ei, numSlices=len(ei)).saveAsSequenceFile(
             basepath + "/unbatched/")
 
-        unbatched_sequence = sorted(self.sc.sequenceFile(basepath + "/unbatched/",
-                                                         batchSize=1).collect())
+        unbatched_sequence = sorted(self.sc.sequenceFile(
+            basepath + "/unbatched/",
+            batchSize=1).collect())
         self.assertEqual(unbatched_sequence, ei)
 
-        unbatched_hadoopFile = sorted(
-            self.sc.hadoopFile(basepath + "/unbatched/",
-                               "org.apache.hadoop.mapred.SequenceFileInputFormat",
-                               "org.apache.hadoop.io.IntWritable",
-                               "org.apache.hadoop.io.Text",
-                               batchSize=1).collect())
+        unbatched_hadoopFile = sorted(self.sc.hadoopFile(
+            basepath + "/unbatched/",
+            "org.apache.hadoop.mapred.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.Text",
+            batchSize=1).collect())
         self.assertEqual(unbatched_hadoopFile, ei)
 
-        unbatched_newAPIHadoopFile = sorted(
-            self.sc.newAPIHadoopFile(
-                basepath + "/unbatched/",
-                "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
-                "org.apache.hadoop.io.IntWritable",
-                "org.apache.hadoop.io.Text",
-                batchSize=1).collect())
+        unbatched_newAPIHadoopFile = sorted(self.sc.newAPIHadoopFile(
+            basepath + "/unbatched/",
+            "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
+            "org.apache.hadoop.io.IntWritable",
+            "org.apache.hadoop.io.Text",
+            batchSize=1).collect())
         self.assertEqual(unbatched_newAPIHadoopFile, ei)
 
         oldconf = {"mapred.input.dir": basepath + "/unbatched/"}
@@ -949,9 +948,8 @@ def test_module_dependency(self):
             |def myfunc(x):
             |    return x + 1
             """)
-        proc = subprocess.Popen(
-            [self.sparkSubmit, "--py-files", zip, script],
-            stdout=subprocess.PIPE)
+        proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, script],
+                                stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
         self.assertIn("[2, 3, 4]", out)
@@ -969,10 +967,9 @@ def test_module_dependency_on_cluster(self):
             |def myfunc(x):
             |    return x + 1
             """)
-        proc = subprocess.Popen(
-            [self.sparkSubmit, "--py-files", zip, "--master",
-             "local-cluster[1,1,512]", script],
-            stdout=subprocess.PIPE)
+        proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, "--master",
+                                "local-cluster[1,1,512]", script],
+                                stdout=subprocess.PIPE)
         out, err = proc.communicate()
         self.assertEqual(0, proc.returncode)
         self.assertIn("[2, 3, 4]", out)
python/pyspark/worker.py (3 changes: 1 addition & 2 deletions)
@@ -57,8 +57,7 @@ def main(infile, outfile):
     SparkFiles._is_running_on_worker = True
 
     # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
-    # *.py files that were added will be copied here
-    sys.path.append(spark_files_dir)
+    sys.path.append(spark_files_dir)  # *.py files that were added will be copied here
     num_python_includes = read_int(infile)
     for _ in range(num_python_includes):
         filename = utf8_deserializer.loads(infile)
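
The include loop above is cut off by the hunk boundary. A hedged, self-contained sketch of what the worker-side PYTHONPATH construction amounts to, with `add_python_includes` as a stand-in helper name and the already-deserialized file names passed in as a plain list:

    import os
    import sys

    def add_python_includes(spark_files_dir, include_names):
        # *.py files shipped to the worker are copied into spark_files_dir,
        # so the directory itself must be importable.
        sys.path.append(spark_files_dir)
        # Each shipped .zip/.egg is added by its full path so its contents
        # can be imported directly.
        for filename in include_names:
            sys.path.append(os.path.join(spark_files_dir, filename))
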
