
Commit ddc0d7d
more efficient saveAsTextFile() (#67)
* more efficient saveAsTextFile()
* make test more stable
* convert a debug message to info
svenkreiss committed Jun 18, 2017
1 parent 31cdfa0 commit ddc0d7d
Showing 5 changed files with 37 additions and 29 deletions.
docs/sphinx/read_write.rst (2 changes: 1 addition & 1 deletion)

```diff
@@ -84,7 +84,7 @@ CSV
     def csv_row(data):
        s = io.StringIO()
        csv.writer(s).writerow(data)
-       return s.getvalue()
+       return s.getvalue()[:-1]
 
    (
        rdd
```
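The new [:-1] drops the trailing line terminator that csv.writer appends; saveAsTextFile() writes its own newline after every record, so without the slice each row would carry a stray terminator. A minimal sketch of the documented pattern (the sample data and the 'rows.csv' path are illustrative, not from the commit):

```python
import csv
import io

import pysparkling


def csv_row(data):
    s = io.StringIO()
    csv.writer(s).writerow(data)
    return s.getvalue()[:-1]  # strip the terminator csv.writer added


sc = pysparkling.Context()
(
    sc.parallelize([(1, 'a'), (2, 'b')], 1)  # one partition: single output file
    .map(csv_row)
    .saveAsTextFile('rows.csv')
)
```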
pysparkling/context.py (17 changes: 8 additions & 9 deletions)

```diff
@@ -37,18 +37,17 @@ def _run_task(task_context, rdd, func, partition):
     """
     task_context.attempt_number += 1
 
-    log.debug('Running stage {} for partition {} of {}.'
-              ''.format(task_context.stage_id,
-                        task_context.partition_id,
-                        rdd.name()))
+    log.info('Running stage {} for partition {} of {}.'
+             ''.format(task_context.stage_id,
+                       task_context.partition_id,
+                       rdd.name()))
 
     try:
         return func(task_context, rdd.compute(partition, task_context))
     except Exception:
-        log.warn('Attempt {} failed for partition {} of {}.'
+        log.warn('Attempt {} failed for partition {} of {}: {}'
                  ''.format(task_context.attempt_number, partition.index,
-                           rdd.name()))
-        traceback.print_exc()
+                           rdd.name(), traceback.format_exc()))
 
     if task_context.attempt_number == task_context.max_retries:
         log.error('Partition {} of {} failed.'
@@ -338,8 +337,8 @@ def prepare(partition):
                 cm_serialized,
             )
 
-        for d in self._pool.map(runJob_map,
-                                (prepare(p) for p in partitions)):
+        prepared_partitions = (prepare(p) for p in partitions)
+        for d in self._pool.map(runJob_map, prepared_partitions):
             t_start = time.clock()
             map_result, cache_result, s = self._data_deserializer(d)
             self._stats['driver_deserialize_data'] += (time.clock() -
```
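The warning now carries the traceback inline via traceback.format_exc() instead of printing it separately with traceback.print_exc(), so a single log record tells the whole story. A minimal sketch of that pattern outside pysparkling (run_with_retries and its names are illustrative, not the project's API):

```python
import logging
import traceback

log = logging.getLogger(__name__)


def run_with_retries(func, max_retries=3):
    """Illustrative stand-in for _run_task's retry loop."""
    for attempt in range(1, max_retries + 1):
        try:
            return func()
        except Exception:
            # format_exc() returns the traceback as a string, so it lands in
            # the same log record as the warning rather than going straight
            # to stderr as print_exc() would
            log.warning('Attempt %d failed: %s', attempt, traceback.format_exc())
    raise RuntimeError('all {} attempts failed'.format(max_retries))
```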
pysparkling/rdd.py (37 changes: 20 additions & 17 deletions)

```diff
@@ -1302,31 +1302,34 @@ def saveAsTextFile(self, path, compressionCodecClass=None):
         """
         if fileio.TextFile(path).exists():
             raise FileAlreadyExistsException(
-                'Output {0} already exists.'.format(path)
-            )
+                'Output {0} already exists.'.format(path))
+
+        def to_stringio(data):
+            stringio = io.StringIO()
+            for line in data:
+                stringio.write('{}\n'.format(line))
+            stringio.seek(0)
+            return stringio
+
+        # fast single-file write for single partition RDDs
+        if self.getNumPartitions() == 1:
+            fileio.TextFile(path).dump(to_stringio(self.collect()))
+            return self
 
         codec_suffix = ''
         if path.endswith(tuple('.' + ending
                                for endings, _ in fileio.codec.FILE_ENDINGS
                                for ending in endings)):
             codec_suffix = path[path.rfind('.'):]
 
-        if self.getNumPartitions() == 1:
-            fileio.TextFile(
-                path
-            ).dump(io.StringIO(''.join([
-                '{}\n'.format(xx) for xx in self.toLocalIterator()
-            ])))
-            return self
-
         self.context.runJob(
-            self,
-            lambda tc, x: fileio.TextFile(
-                os.path.join(path, 'part-{0:05d}{1}'.format(tc.partitionId(),
-                                                            codec_suffix))
-            ).dump(io.StringIO(''.join([
-                '{}\n'.format(xx) for xx in x
-            ]))),
+            self.mapPartitions(to_stringio),
+            lambda tc, stringio:
+                fileio.TextFile(os.path.join(path,
+                                             'part-{0:05d}{1}'.format(
+                                                 tc.partitionId(),
+                                                 codec_suffix))
+                                ).dump(stringio),
             resultHandler=list,
         )
         fileio.TextFile(os.path.join(path, '_SUCCESS')).dump()
```
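With this change each partition is converted into a ready-to-dump StringIO by to_stringio inside mapPartitions, so the write lambda only dumps a prepared buffer, and a single-partition RDD short-circuits to one plain file at the target path. A usage sketch (data and the 'out.txt' path are illustrative):

```python
import pysparkling

sc = pysparkling.Context()
rdd = sc.parallelize(['line 1', 'line 2', 'line 3'], 2)

# two partitions: writes part-00000 and part-00001 plus a _SUCCESS marker
# under 'out.txt'; a one-partition RDD would take the fast path and write
# a single plain file at 'out.txt' itself
rdd.saveAsTextFile('out.txt')
```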
tests/test_context.py (6 changes: 6 additions & 0 deletions)

```diff
@@ -1,5 +1,6 @@
 from __future__ import print_function
 
+import logging
 import pysparkling
 import unittest
 
@@ -51,3 +52,8 @@ def test_union(self):
 
     def test_version(self):
         self.assertTrue(isinstance(pysparkling.Context().version, str))
+
+
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.DEBUG)
+    Context().test_retry()
```
tests/test_multiprocessing.py (4 changes: 2 additions & 2 deletions)

```diff
@@ -106,12 +106,12 @@ def test_zipWithIndex(self):
 
     def test_cache(self):
         r = self.sc.parallelize(range(3), 3)
-        r = r.map(lambda _: time.sleep(0.1)).cache()
+        r = r.map(lambda _: time.sleep(0.5)).cache()
         r.collect()
 
         start = time.time()
         r.collect()
-        self.assertLess(time.time() - start, 0.1)
+        self.assertLess(time.time() - start, 0.5)
 
 
 class ProcessPoolIdlePerformance(unittest.TestCase):
```
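The sleep and the assertion bound both move from 0.1 s to 0.5 s, widening the timing margin (presumably the "make test more stable" part of this commit): the second collect() must be served from the cache, so it should finish well under one sleep interval. A standalone sketch of the pattern being tested (a plain serial Context here for brevity; the test itself runs against a multiprocessing pool):

```python
import time

import pysparkling

sc = pysparkling.Context()
r = sc.parallelize(range(3), 3).map(lambda _: time.sleep(0.5)).cache()
r.collect()  # first pass computes (and sleeps in) every partition

start = time.time()
r.collect()  # second pass hits the cache, no sleeps re-run
assert time.time() - start < 0.5
```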
