Commit

add comments
giwa committed Sep 20, 2014
1 parent 16aa64f commit e54f986
Showing 4 changed files with 59 additions and 17 deletions.
5 changes: 2 additions & 3 deletions python/pyspark/java_gateway.py
@@ -108,15 +108,14 @@ def run(self):
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.streaming.*")
java_import(gateway.jvm, "org.apache.spark.streaming.*") # do we need this?
java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*")
java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*") # do we need this?
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
java_import(gateway.jvm, "scala.Tuple2")

return gateway
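
For readers less familiar with py4j: java_import only affects name resolution on the Python side. A minimal standalone sketch, not part of this commit, assuming a py4j GatewayServer is already running on the JVM side (which PySpark's launch_gateway arranges for the Spark JVM):

    from py4j.java_gateway import JavaGateway, java_import

    # Connect to an already-running GatewayServer on the JVM side.
    gateway = JavaGateway()

    # After java_import, classes from the package can be referenced on
    # gateway.jvm without their full package prefix -- that is all the
    # imports above do for the Spark and streaming packages.
    java_import(gateway.jvm, "java.util.*")
    names = gateway.jvm.ArrayList()
    names.add("streaming")
    print(names)  # [streaming], backed by a java.util.ArrayList in the JVM

One way to answer the two "do we need this?" questions above is to drop those imports and check whether any gateway.jvm lookups in the streaming code stop resolving.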
13 changes: 7 additions & 6 deletions python/pyspark/streaming/context.py
@@ -64,7 +64,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
pyFiles=pyFiles, environment=environment, batchSize=batchSize,
serializer=serializer, conf=conf, gateway=gateway)

# Start py4j callback server
# Start py4j callback server.
# The callback server is needed only by Spark Streaming; therefore the callback server
# is started in StreamingContext.
SparkContext._gateway.restart_callback_server()
self._clean_up_trigger()
self._jvm = self._sc._jvm
@@ -78,6 +80,8 @@ def _clean_up_trigger(self):
"""Kill py4j callback server properly using signal lib"""

def clean_up_handler(*args):
# Make sure the callback server is stopped.
# How to terminate the callback server properly still needs improvement.
SparkContext._gateway._shutdown_callback_server()
SparkContext._gateway.shutdown()
sys.exit(0)
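
As a standalone sketch of the cleanup pattern this hunk wires up (the method names follow the handler above; the exact signal set is an assumption, and the comment itself notes the teardown still needs improvement):

    import signal
    import sys

    def install_callback_server_cleanup(gateway):
        """Stop the py4j callback server before the Python process exits."""
        def clean_up_handler(*args):
            gateway._shutdown_callback_server()  # stop the callback server first
            gateway.shutdown()                   # then tear down the gateway itself
            sys.exit(0)

        # Register the handler for the signals a driver is normally killed with.
        for sig in (signal.SIGTERM, signal.SIGINT, signal.SIGHUP):
            signal.signal(sig, clean_up_handler)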
@@ -100,7 +104,7 @@ def awaitTermination(self, timeout=None):
else:
self._jssc.awaitTermination(timeout)

# start from simple one. storageLevel is not passed for now.
# TODO: add storageLevel
def socketTextStream(self, hostname, port):
"""
Create an input from TCP source hostname:port. Data is received using
@@ -134,7 +138,7 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
def _testInputStream(self, test_inputs, numSlices=None):
"""
This function is only for testing.
This implementation is inpired by QueStream implementation.
This implementation is inspired by the QueueStream implementation.
Give a list of RDDs to generate a DStream which contains those RDDs.
"""
test_rdds = list()
@@ -144,9 +148,6 @@ def _testInputStream(self, test_inputs, numSlices=None):
test_rdds.append(test_rdd._jrdd)
test_rdd_deserializers.append(test_rdd._jrdd_deserializer)

# if len(set(test_rdd_deserializers)) > 1:
# raise IOError("Deserializer should be one type to run test case. "
# "See the SparkContext.parallelize to understand how to decide deserializer")
jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()

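To see how these pieces fit together, a hedged end-to-end sketch of driving the streaming context. The constructor keywords follow the __init__ shown above; start() and the flatMap/map/reduceByKey transformations are assumed to exist as in the Scala API and are not part of this diff:

    from pyspark.streaming.context import StreamingContext

    ssc = StreamingContext(master="local[2]", appName="NetworkWordCount")

    # socketTextStream is the TCP source defined above (storageLevel is still a TODO).
    lines = ssc.socketTextStream("localhost", 9999)
    counts = (lines.flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    counts.saveAsTextFiles("counts")      # one output directory per batch

    ssc.start()                           # assumed; not shown in this diff
    ssc.awaitTermination(timeout=60)      # forwards the timeout to the JVM, per the hunk above
    ssc.stop(stopSparkContext=True, stopGraceFully=False)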
24 changes: 24 additions & 0 deletions python/pyspark/streaming/dstream.py
@@ -344,6 +344,17 @@ def checkpoint(self, interval):
return self

def groupByKey(self, numPartitions=None):
"""
Return a new DStream in which the values for each key in this
DStream are grouped into a single sequence.
Hash-partitions each generated RDD into numPartitions partitions.
Note: If you are grouping in order to perform an aggregation (such as a
sum or average) over each key, using reduceByKey will provide much
better performance.
"""
def createCombiner(x):
return [x]
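
A hedged illustration of the semantics the new docstring describes, written against a plain RDD (assuming sc is a live SparkContext); the DStream version applies the same combine logic to every batch:

    pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])

    # groupByKey ships every value to the reducer and yields full sequences.
    print(sorted(pairs.groupByKey().mapValues(list).collect()))
    # [('a', [1, 2]), ('b', [1])]

    # reduceByKey combines map-side first, so it is usually much cheaper
    # when the goal is an aggregate such as a sum.
    print(sorted(pairs.reduceByKey(lambda a, b: a + b).collect()))
    # [('a', 3), ('b', 1)]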

@@ -359,6 +370,10 @@ def mergeCombiners(a, b):
numPartitions).mapValues(lambda x: ResultIterable(x))

def countByValue(self):
"""
Return a new DStream which contains the count of each unique value in this
DStream as (value, count) pairs.
"""
def countPartition(iterator):
counts = defaultdict(int)
for obj in iterator:
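
In plain Python, this is what the per-partition counting, the mergeMaps reduce, and the final flatMap compute for one batch (a hedged illustration, not the DStream code path itself):

    from collections import defaultdict

    def count_by_value(batch):
        counts = defaultdict(int)
        for obj in batch:            # countPartition, collapsed to one partition
            counts[obj] += 1
        return list(counts.items())  # flatMap(lambda x: x.items()) equivalent

    print(count_by_value(["a", "b", "a", "c"]))
    # [('a', 2), ('b', 1), ('c', 1)]  (pair order may vary)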
@@ -373,6 +388,9 @@ def mergeMaps(m1, m2):
return self.mapPartitions(countPartition).reduce(mergeMaps).flatMap(lambda x: x.items())

def saveAsTextFiles(self, prefix, suffix=None):
"""
Save each RDD in this DStream as a text file, using string representations of the elements.
"""

def saveAsTextFile(rdd, time):
path = rddToFileName(prefix, suffix, time)
@@ -381,6 +399,11 @@ def saveAsTextFile(rdd, time):
return self.foreachRDD(saveAsTextFile)

def saveAsPickledFiles(self, prefix, suffix=None):
"""
Save each RDD in this DStream as a SequenceFile of serialized objects. The
serializer used is L{pyspark.serializers.PickleSerializer}; the default
batch size is 10.
"""

def saveAsTextFile(rdd, time):
path = rddToFileName(prefix, suffix, time)
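
Both save methods derive their output location from rddToFileName(prefix, suffix, time). Assuming it follows the Scala streaming convention of one directory per batch time, the naming can be sketched as:

    def rdd_to_file_name(prefix, suffix, time_ms):
        # Hedged reimplementation of the assumed naming scheme: the batch time
        # (in milliseconds) is appended to the prefix, then the optional suffix.
        if suffix is None:
            return "%s-%d" % (prefix, time_ms)
        return "%s-%d.%s" % (prefix, time_ms, suffix)

    print(rdd_to_file_name("hdfs:///out/counts", "txt", 1411200000000))
    # hdfs:///out/counts-1411200000000.txt

So counts.saveAsTextFiles("hdfs:///out/counts", "txt") would write one such directory per batch interval, and saveAsPickledFiles does the same with pickled records.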
@@ -410,6 +433,7 @@ def saveAsTextFile(rdd, time):
# TODO: implement leftOuterJoin
# TODO: implement rightOuterJoin


class PipelinedDStream(DStream):
def __init__(self, prev, func, preservesPartitioning=False):
if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
34 changes: 26 additions & 8 deletions python/pyspark/streaming_tests.py
@@ -18,12 +18,11 @@
"""
Unit tests for PySpark; additional tests are implemented as doctests in
individual modules.
Other option is separate this test case with other tests.
This makes sense becuase streaming tests takes long time due to waiting time
for stoping callback server.
This file will merged to tests.py. But for now, this file is separated due
to focusing to streaming test case
This file will be merged into tests.py after all functions are ready.
But for now, this file is kept separate to focus on the streaming test cases.
The callback server sometimes seems unstable, which causes errors in the test cases.
"""
from itertools import chain
@@ -43,10 +42,10 @@ def setUp(self):

def tearDown(self):
# Do not call pyspark.streaming.context.StreamingContext.stop directly because
# we do not wait to shutdowncall back server and py4j client
# we do not wait for the callback server and py4j client to shut down
self.ssc._jssc.stop()
self.ssc._sc.stop()
# Why does it long time to terminaete StremaingContext and SparkContext?
# Why does it take a long time to terminate StreamingContext and SparkContext?
# Should we change the sleep time if this depends on machine spec?
time.sleep(10)

@@ -68,7 +67,7 @@ class TestBasicOperationsSuite(PySparkStreamingTestCase):
I am wondering whether these tests are enough or not.
All test inputs should be a list of lists; this represents the stream.
Every batch interval, the first object in the list is chosen to make a DStream.
Please see the BasicTestSuits in Scala or QueStream which is close to this implementation.
Please see the BasicTestSuite in Scala, which is close to this implementation.
"""
def setUp(self):
PySparkStreamingTestCase.setUp(self)
@@ -358,5 +357,24 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):

return self.result

class TestSaveAsFilesSuite(PySparkStreamingTestCase):
def setUp(self):
PySparkStreamingTestCase.setUp(self)
self.timeout = 10 # seconds
self.numInputPartitions = 2
self.result = list()

def tearDown(self):
PySparkStreamingTestCase.tearDown(self)

@classmethod
def tearDownClass(cls):
PySparkStreamingTestCase.tearDownClass()






if __name__ == "__main__":
unittest.main()
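
For reference, a hedged sketch of how a case in TestBasicOperationsSuite drives _run_stream (and, through it, _testInputStream). The method body is illustrative and not verbatim from the commit, though the _run_stream signature matches the one shown above:

    def test_reduceByKey(self):
        # One inner list per batch interval, as the class docstring describes.
        test_input = [[("a", 1), ("a", 2), ("b", 1)]] * 3

        def test_func(dstream):
            return dstream.reduceByKey(lambda x, y: x + y)

        expected_output = [[("a", 3), ("b", 1)]] * 3
        output = self._run_stream(test_input, test_func, expected_output)
        # Results may arrive unordered within a batch, so sort before comparing.
        self.assertEqual([sorted(batch) for batch in expected_output],
                         [sorted(batch) for batch in output])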
