Skip to content

Commit

Permalink
add optional argument 'mode' for rdd.pipe
Browse files Browse the repository at this point in the history
permissive - do not check returncode
strict - only allow returncode 0
grep - allow returncode 0 or 1
  • Loading branch information
megatron-me-uk committed Jun 30, 2015
1 parent a0c0161 commit 34fcdc3
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,13 +687,25 @@ def groupBy(self, f, numPartitions=None):
return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

@ignore_unicode_prefix
def pipe(self, command, env={}):
def pipe(self, command, env={}, mode='permissive'):
"""
Return an RDD created by piping elements to a forked external process.
>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
[u'1', u'2', u'', u'3']
"""
if mode == 'permissive':
# Permissive mode: the exit status x is ignored, so no return code is ever
# reported as a failure.
def fail_condition(x):
return False
elif mode == 'strict':
def fail_condition(x):
    """Return True when exit status ``x`` counts as a failure in strict mode.

    Strict mode accepts only exit status 0; every other status is a failure.
    """
    # The return-code check raises when this predicate is true, so a
    # successful exit (0) must yield False. The previous `x == 0` was
    # inverted: it raised on success and let every failure through.
    return x != 0
elif mode == 'grep':
def fail_condition(x):
    """Return True when exit status ``x`` counts as a failure in grep mode.

    Grep mode accepts exit statuses 0 and 1; any other status is a failure.
    """
    # The return-code check raises when this predicate is true, so the
    # accepted codes (0 and 1) must yield False. The previous
    # `x == 0 or x == 1` was inverted and raised on exactly the allowed codes.
    return x not in (0, 1)
else:
raise ValueError("mode must be one of 'permissive', 'strict' or 'grep'.")

def func(iterator):
pipe = Popen(
shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
Expand All @@ -707,7 +719,7 @@ def pipe_objs(out):

def check_return_code():
pipe.wait()
if pipe.returncode:
if fail_condition(pipe.returncode):
raise Exception("Pipe function `%s' exited "
"with error code %d" % (command, pipe.returncode))
else:
Expand Down

0 comments on commit 34fcdc3

Please sign in to comment.