From bfb9f9f12fa04910ebc5f53e6a5538d8341139b8 Mon Sep 17 00:00:00 2001
From: Nicholas Chammas
Date: Mon, 4 Aug 2014 17:20:14 -0400
Subject: [PATCH] [SPARK-2627] keep up with the PEP 8 fixes

---
 python/pyspark/daemon.py      |  3 +--
 python/pyspark/serializers.py | 11 ++++++-----
 python/pyspark/tests.py       |  9 ++++++---
 3 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 03771de91afe6..b1b7f14da959b 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -138,8 +138,7 @@ def handle_sigchld(*args):
                 try:
                     os.kill(worker_pid, signal.SIGKILL)
                 except OSError:
-                    pass # process already died
-
+                    pass  # process already died
 
             if listen_sock in ready_fds:
                 sock, addr = listen_sock.accept()
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index d1628d87644ba..5b6fa2c609537 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -293,6 +293,7 @@ def _hack_namedtuple(cls):
     """ Make class generated by namedtuple picklable """
     name = cls.__name__
     fields = cls._fields
+
     def __reduce__(self):
         return (_restore, (name, fields, tuple(self)))
     cls.__reduce__ = __reduce__
@@ -301,11 +302,11 @@ def __reduce__(self):
 
 def _hijack_namedtuple():
     """ Hack namedtuple() to make it picklable """
-    global _old_namedtuple # or it will put in closure
+    global _old_namedtuple  # or it will put in closure
 
     def _copy_func(f):
         return types.FunctionType(f.func_code, f.func_globals, f.func_name,
-                f.func_defaults, f.func_closure)
+                                  f.func_defaults, f.func_closure)
 
     _old_namedtuple = _copy_func(collections.namedtuple)
 
@@ -323,9 +324,9 @@ def namedtuple(name, fields, verbose=False, rename=False):
     # so only hack those in __main__ module
     for n, o in sys.modules["__main__"].__dict__.iteritems():
         if (type(o) is type and o.__base__ is tuple
-            and hasattr(o, "_fields")
-            and "__reduce__" not in o.__dict__):
-            _hack_namedtuple(o) # hack inplace
+                and hasattr(o, "_fields")
+                and "__reduce__" not in o.__dict__):
+            _hack_namedtuple(o)  # hack inplace
 
 
 _hijack_namedtuple()
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 69eeaaa830b84..ec2553ae46649 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -840,12 +840,15 @@ def test_termination_sigterm(self):
 
 
 class TestWorker(PySparkTestCase):
+
     def test_cancel_task(self):
         temp = tempfile.NamedTemporaryFile(delete=True)
         temp.close()
         path = temp.name
+
         def sleep(x):
-            import os, time
+            import os
+            import time
             with open(path, 'w') as f:
                 f.write("%d %d" % (os.getppid(), os.getpid()))
             time.sleep(100)
@@ -875,7 +878,7 @@ def run():
                 os.kill(worker_pid, 0)
                 time.sleep(0.1)
             except OSError:
-                break # worker was killed
+                break  # worker was killed
             else:
                 self.fail("worker has not been killed after 5 seconds")
 
@@ -885,7 +888,7 @@ def run():
             self.fail("daemon had been killed")
 
     def test_fd_leak(self):
-        N = 1100 # fd limit is 1024 by default
+        N = 1100  # fd limit is 1024 by default
         rdd = self.sc.parallelize(range(N), N)
         self.assertEquals(N, rdd.count())
 
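
Note, not part of the patch: the serializers.py hunks above touch PySpark's namedtuple pickling hack. The sketch below is a minimal, self-contained illustration of that technique for context; the body of _restore is a plausible reconstruction, and the Point class with the pickle round-trip at the end is an illustrative assumption, not code from this change.

import collections
import pickle


def _restore(name, fields, values):
    # Regenerate the namedtuple class from its name and fields, then rebuild
    # the instance (assumed behavior; the real _restore lives in serializers.py).
    cls = collections.namedtuple(name, fields)
    return cls(*values)


def _hack_namedtuple(cls):
    # Attach a __reduce__ so instances pickle as (name, fields, values) rather
    # than relying on the class being importable by name on the unpickling side.
    name = cls.__name__
    fields = cls._fields

    def __reduce__(self):
        return (_restore, (name, fields, tuple(self)))

    cls.__reduce__ = __reduce__
    return cls


# Illustrative usage (hypothetical class, not from the patch):
Point = _hack_namedtuple(collections.namedtuple("Point", ["x", "y"]))
print(pickle.loads(pickle.dumps(Point(1, 2))))  # Point(x=1, y=2)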