[SPARK-2470] PEP8 fixes to context.py
nchammas committed Jul 20, 2014
1 parent f4e0039 commit ca2d28b
Showing 1 changed file with 25 additions and 20 deletions.
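The diff below is a style-only cleanup; no behavior is intended to change. As a quick reference, the main PEP8 idioms it applies are illustrated in the sketch that follows. The sketch is hypothetical: lookup, table, key and default are illustrative names, not code from context.py.

# Hypothetical before/after sketch of the PEP8 idioms applied in this commit.

# Before: spaces around keyword defaults, equality comparison with None,
# "not key in table", and a single space before the inline comment.
def lookup(table, key, default = None): # find key
    if table == None or not key in table:
        return default
    return table[key]

# After: no spaces around keyword defaults, "is None", "not in",
# and two spaces before inline comments.
def lookup(table, key, default=None):  # find key
    if table is None or key not in table:
        return default
    return table[key]

The remaining edits are whitespace and wrapping: continuation lines are re-indented, over-long lines (mostly docstrings and the multi-SparkContext error message) are rewrapped, and a second blank line is added before the top-level _test() function.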
python/pyspark/context.py (25 additions, 20 deletions)
@@ -29,7 +29,7 @@
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
PairDeserializer
PairDeserializer
from pyspark.storagelevel import StorageLevel
from pyspark import rdd
from pyspark.rdd import RDD
@@ -50,12 +50,11 @@ class SparkContext(object):
_next_accum_id = 0
_active_spark_context = None
_lock = Lock()
-_python_includes = None # zip and egg files that need to be added to PYTHONPATH
-
+_python_includes = None  # zip and egg files that need to be added to PYTHONPATH

def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
gateway=None):
environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
gateway=None):
"""
Create a new SparkContext. At least the master and app name should be set,
either through the named parameters here or through C{conf}.
@@ -138,8 +137,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
self._accumulatorServer = accumulators._start_update_server()
(host, port) = self._accumulatorServer.server_address
self._javaAccumulator = self._jsc.accumulator(
self._jvm.java.util.ArrayList(),
self._jvm.PythonAccumulatorParam(host, port))
self._jvm.java.util.ArrayList(),
self._jvm.PythonAccumulatorParam(host, port))

self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')

@@ -165,7 +164,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
(dirname, filename) = os.path.split(path)
self._python_includes.append(filename)
sys.path.append(path)
-if not dirname in sys.path:
+if dirname not in sys.path:
sys.path.append(dirname)

# Create a temporary directory inside spark.local.dir:
@@ -192,15 +191,19 @@ def _ensure_initialized(cls, instance=None, gateway=None):
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile

if instance:
-if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
+if (SparkContext._active_spark_context and
+        SparkContext._active_spark_context != instance):
currentMaster = SparkContext._active_spark_context.master
currentAppName = SparkContext._active_spark_context.appName
callsite = SparkContext._active_spark_context._callsite

# Raise error if there is already a running Spark context
raise ValueError("Cannot run multiple SparkContexts at once; existing SparkContext(app=%s, master=%s)" \
" created by %s at %s:%s " \
% (currentAppName, currentMaster, callsite.function, callsite.file, callsite.linenum))
raise ValueError(
"Cannot run multiple SparkContexts at once; "
"existing SparkContext(app=%s, master=%s)"
" created by %s at %s:%s "
% (currentAppName, currentMaster,
callsite.function, callsite.file, callsite.linenum))
else:
SparkContext._active_spark_context = instance

@@ -290,7 +293,7 @@ def textFile(self, name, minPartitions=None):
Read a text file from HDFS, a local file system (available on all
nodes), or any Hadoop-supported file system URI, and return it as an
RDD of Strings.
>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
... testFile.write("Hello world!")
@@ -584,11 +587,12 @@ def addPyFile(self, path):
HTTP, HTTPS or FTP URI.
"""
self.addFile(path)
-(dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix
+(dirname, filename) = os.path.split(path)  # dirname may be directory or HDFS/S3 prefix

if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
self._python_includes.append(filename)
-sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode
+# for tests in local mode
+sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename))

def setCheckpointDir(self, dirName):
"""
@@ -649,9 +653,9 @@ def setJobGroup(self, groupId, description, interruptOnCancel=False):
Cancelled
If interruptOnCancel is set to true for the job group, then job cancellation will result
-in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
-that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
-where HDFS may respond to Thread.interrupt() by marking nodes as dead.
+in Thread.interrupt() being called on the job's executor threads. This is useful to help
+ensure that the tasks are actually stopped in a timely manner, but is off by default due
+to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
"""
self._jsc.setJobGroup(groupId, description, interruptOnCancel)

@@ -688,7 +692,7 @@ def cancelAllJobs(self):
"""
self._jsc.sc().cancelAllJobs()

-def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
+def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
"""
Executes the given partitionFunc on the specified set of partitions,
returning the result as an array of elements.
@@ -703,7 +707,7 @@ def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]
"""
-if partitions == None:
+if partitions is None:
partitions = range(rdd._jrdd.partitions().size())
javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client)

@@ -714,6 +718,7 @@ def runJob(self, rdd, partitionFunc, partitions = None, allowLocal = False):
it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
return list(mappedRDD._collect_iterator_through_file(it))


def _test():
import atexit
import doctest
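One change above goes slightly beyond formatting in spirit: runJob's None check switches from "partitions == None" to "partitions is None", following PEP8's rule that comparisons to singletons use identity. The short sketch below, built around a hypothetical class unrelated to Spark, shows why the equality form can give a misleading answer:

class AlwaysEqual(object):
    """Hypothetical type whose __eq__ claims equality with anything."""
    def __eq__(self, other):
        return True

x = AlwaysEqual()
print(x == None)   # True: __eq__ is consulted, so the check is fooled
print(x is None)   # False: identity test; only the real None object passes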
