
Commit

clean up code
Ken Takagiwa authored and Ken Takagiwa committed Jul 20, 2014
1 parent bd20e17 commit 84a021f
Showing 8 changed files with 89 additions and 128 deletions.
41 changes: 21 additions & 20 deletions python/pyspark/streaming/context.py
@@ -22,15 +22,15 @@
from pyspark.storagelevel import *
from pyspark.rdd import RDD
from pyspark.context import SparkContext
from pyspark.streaming.dstream import DStream

from py4j.java_collections import ListConverter

from pyspark.streaming.dstream import DStream

class StreamingContext(object):
"""
Main entry point for Spark Streaming functionality. A StreamingContext represents the
connection to a Spark cluster, and can be used to create L{RDD}s and
connection to a Spark cluster, and can be used to create L{DStream}s and
broadcast variables on that cluster.
"""

@@ -71,34 +71,35 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
def _initialize_context(self, jspark_context, jduration):
return self._jvm.JavaStreamingContext(jspark_context, jduration)

def actorStream(self, props, name, storageLevel, supervisorStrategy):
raise NotImplementedError

def addStreamingListener(self, streamingListener):
raise NotImplementedError
def start(self):
"""
Start the execution of the streams.
"""
self._jssc.start()

def awaitTermination(self, timeout=None):
"""
Wait for the execution to stop.
"""
if timeout:
self._jssc.awaitTermination(timeout)
else:
self._jssc.awaitTermination()

# Start with the simple case: storageLevel is not passed for now.
def socketTextStream(self, hostname, port):
"""
Create an input stream from a TCP source hostname:port. Data is received using
a TCP socket and the received bytes are interpreted as UTF8-encoded, '\n'-delimited
lines.
"""
return DStream(self._jssc.socketTextStream(hostname, port), self, UTF8Deserializer())

def start(self):
self._jssc.start()

def stop(self, stopSparkContext=True):
raise NotImplementedError

def textFileStream(self, directory):
"""
Create an input stream that monitors a Hadoop-compatible file system
for new files and reads them as text files. Files must be written to the
monitored directory by "moving" them from another location within the same
file system. File names starting with . are ignored.
"""
return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())

def transform(self, seq):
raise NotImplementedError

def union(self, seq):
raise NotImplementedError

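Before moving on to dstream.py, a minimal usage sketch of the StreamingContext API above may help. This is an illustration only: the master URL, app name, port, the Seconds helper, and the duration keyword are assumptions, since the full constructor signature is collapsed in the hunk above.

    from pyspark.streaming.context import StreamingContext
    from pyspark.streaming.duration import Seconds   # assumed helper; only Duration is visible in this diff

    # Hypothetical settings; the "duration" keyword is an assumption because the
    # full __init__ signature is collapsed above.
    ssc = StreamingContext(master="local[2]", appName="NetworkTextStream",
                           duration=Seconds(1))

    lines = ssc.socketTextStream("localhost", 9999)   # UTF-8, '\n'-delimited text
    lines.pyprint()

    ssc.start()
    ssc.awaitTermination()
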
85 changes: 45 additions & 40 deletions python/pyspark/streaming/dstream.py
@@ -2,8 +2,6 @@
from itertools import chain, ifilter, imap
import operator

import logging

from pyspark.serializers import NoOpSerializer,\
BatchedSerializer, CloudPickleSerializer, pack_long
from pyspark.rdd import _JavaStackTrace
@@ -25,64 +23,86 @@ def count(self):
"""
# TODO: check the count implementation; this is different from what pyspark does
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))

def _sum(self):
"""
"""
return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

def print_(self):
"""
Since print is a reserved name in Python, we cannot define a print method here.
This function prints the serialized data in the RDDs of this DStream, because Scala and Java cannot
deserialize pickled Python objects. Please use DStream.pyprint() instead to print results.
Call DStream.print().
"""
# print is a reserved name in Python, so we cannot use it as the method name.
# Hack to call the print function of the underlying JVM DStream.
getattr(self._jdstream, "print")()

def pyprint(self):
"""
Print the first ten elements of each RDD generated in this DStream. This is an output
operator, so this DStream will be registered as an output stream and materialized there.
"""
self._jdstream.pyprint()

def filter(self, f):
"""
Return a new DStream containing only the elements that satisfy the predicate.
"""
def func(iterator): return ifilter(f, iterator)
return self.mapPartitions(func)
return self._mapPartitions(func)

def flatMap(self, f, preservesPartitioning=False):
"""
Return a new DStream by first applying a function to all elements of this
DStream, and then flattening the results.
"""
def func(s, iterator): return chain.from_iterable(imap(f, iterator))
return self.mapPartitionsWithIndex(func, preservesPartitioning)
return self._mapPartitionsWithIndex(func, preservesPartitioning)

def map(self, f, preservesPartitioning=False):
def map(self, f):
"""
Return a new DStream by applying a function to each element of this DStream.
"""
def func(iterator): return imap(f, iterator)
return self.mapPartitions(func)
#return PipelinedDStream(self, func, preservesPartitioning)
return self._mapPartitions(func)

def mapPartitions(self, f):
def _mapPartitions(self, f):
"""
Return a new DStream by applying a function to each partition of this DStream.
"""
def func(s, iterator): return f(iterator)
return self.mapPartitionsWithIndex(func)
return self._mapPartitionsWithIndex(func)

def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):
"""
Return a new DStream by applying a function to each partition of this DStream,
while tracking the index of the original partition.
"""
return PipelinedDStream(self, f, preservesPartitioning)

def reduce(self, func, numPartitions=None):

def reduceByKey(self, func, numPartitions=None):
"""
Merge the values for each key using an associative reduce function.
This will also perform the merging locally on each mapper before
sending results to the reducer, similarly to a "combiner" in MapReduce.
Output will be hash-partitioned with C{numPartitions} partitions, or
the default parallelism level if C{numPartitions} is not specified.
"""
return self.combineByKey(lambda x:x, func, func, numPartitions)

def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
numPartitions = None):
"""
Generic function to combine the elements for each key using a custom
set of aggregation functions.
"""
if numPartitions is None:
numPartitions = self._defaultReducePartitions()
@@ -148,42 +168,27 @@ def add_shuffle_key(split, iterator):
dstream._partitionFunc = partitionFunc
return dstream

def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
"""
"""
return PipelinedDStream(self, f, preservesPartitioning)

def _defaultReducePartitions(self):
"""
Returns the default number of partitions to use during reduce tasks (e.g., groupBy).
If spark.default.parallelism is set, then we'll use the value from SparkContext
defaultParallelism, otherwise we'll use the number of partitions in this RDD.
This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce
the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will
be inherent.
"""
# hard code to avoid the error
if self.ctx._conf.contains("spark.default.parallelism"):
return self.ctx.defaultParallelism
else:
return self.getNumPartitions()

return self._jdstream.partitions().size()

def _defaultReducePartitions(self):
def getNumPartitions(self):
"""
Return the number of partitions in RDD
"""
# hard code to avoid the error
if self.ctx._conf.contains("spark.default.parallelism"):
return self.ctx.defaultParallelism
else:
return self.getNumPartitions()

def getNumPartitions(self):
"""
Returns the number of partitions in RDD
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> rdd.getNumPartitions()
2
"""
return self._jdstream.partitions().size()
# TODO: remove the hard-coded value. RDD has getNumPartitions, but DStream does not.
return 2


class PipelinedDStream(DStream):
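Putting the DStream operations above together, here is a hedged word-count sketch against this prototype API, reusing the hypothetical ssc from the earlier sketch; the socket address is again illustrative.

    # Hypothetical word count using the methods defined in this file.
    lines = ssc.socketTextStream("localhost", 9999)            # from context.py above
    words = lines.flatMap(lambda line: line.split(" "))
    pairs = words.map(lambda word: (word, 1))
    counts = pairs.reduceByKey(lambda a, b: a + b)
    counts.pyprint()   # output operator: prints the first ten elements of each batch

    ssc.start()
    ssc.awaitTermination()
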
1 change: 1 addition & 0 deletions python/pyspark/streaming/duration.py
@@ -17,6 +17,7 @@

from pyspark.streaming import utils


class Duration(object):
"""
Duration for Spark Streaming application. Used to set duration
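The Duration class above sets the batch interval passed to StreamingContext. A tiny sketch, assuming a milliseconds-based constructor (the rest of the file is collapsed here, so the signature is an assumption):

    from pyspark.streaming.duration import Duration

    # Assumed constructor: Duration(milliseconds). Verify against the full duration.py.
    batch_interval = Duration(1000)        # 1 second
    window_length = Duration(60 * 1000)    # 1 minute
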
9 changes: 8 additions & 1 deletion python/pyspark/streaming/pyprint.py
@@ -21,16 +21,22 @@

from pyspark.serializers import PickleSerializer


def collect(binary_file_path):
"""
Read a pickled file written by Spark Streaming.
"""
dse = PickleSerializer()
with open(binary_file_path, 'rb') as tempFile:
for item in dse.load_stream(tempFile):
yield item


def main():
try:
binary_file_path = sys.argv[1]
except:
print "Missed FilePath in argement"
print "Missed FilePath in argements"

if not binary_file_path:
return
@@ -43,5 +49,6 @@ def main():
print "..."
break


if __name__ == "__main__":
exit(main())
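To see the round trip that collect() performs, here is a small sketch that writes a few objects in the framing collect() expects and reads them back; the temporary file and sample data are made up for illustration.

    import tempfile

    from pyspark.serializers import PickleSerializer
    from pyspark.streaming.pyprint import collect

    # Write a few pickled objects with the same serializer collect() uses to read them.
    ser = PickleSerializer()
    with tempfile.NamedTemporaryFile(delete=False) as f:
        ser.dump_stream([("spark", 1), ("streaming", 2)], f)
        path = f.name

    for item in collect(path):
        print item   # Python 2 print statement, matching the code above
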
@@ -59,7 +59,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* operator, so this PythonDStream will be registered as an output stream and materialized there.
* This function is for the Python API.
*/

//TODO move this function to PythonDStream
def pyprint() = dstream.pyprint()

/**
@@ -71,7 +71,9 @@ DStream[Array[Byte]](prev.ssc){
case Some(rdd)=>Some(rdd)
val pairwiseRDD = new PairwiseRDD(rdd)
/*
* This is equivalent to the following Python code
* Since Python operations are executed by Scala after StreamingContext.start,
* what PairwiseDStream does is equivalent to the following Python code in PySpark.
*
* with _JavaStackTrace(self.context) as st:
* pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
* partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
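For intuition only, the keyed shuffle that the comment above refers to boils down to bucketing (key, value) pairs by a hash of the key. A pure-Python illustration of that idea follows; it is not the actual PairwiseRDD/PythonPartitioner code.

    # Bucket (key, value) pairs into numPartitions lists by hashing the key,
    # which is the idea PairwiseRDD/PythonPartitioner implement on the JVM side.
    def bucket_by_key(pairs, numPartitions):
        buckets = [[] for _ in range(numPartitions)]
        for key, value in pairs:
            buckets[hash(key) % numPartitions].append((key, value))
        return buckets

    print bucket_by_key([("a", 1), ("b", 2), ("a", 3)], 2)
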

This file was deleted.

