Commit

initial commit for pySparkStreaming

giwa committed Sep 20, 2014
1 parent 78d4220 commit 454981d
Showing 16 changed files with 1,043 additions and 5 deletions.
4 changes: 4 additions & 0 deletions core/pom.xml
@@ -21,7 +21,11 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<<<<<<< HEAD
<version>1.2.0-SNAPSHOT</version>
=======
<version>1.0.0</version>
>>>>>>> initial commit for pySparkStreaming
<relativePath>../pom.xml</relativePath>
</parent>

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -288,7 +288,7 @@ private class PythonException(msg: String, cause: Exception) extends RuntimeExce
* Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
* This is used by PySpark's shuffle operations.
*/
private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
private[spark] class PairwiseRDD(prev: RDD[Array[Byte]]) extends
RDD[(Long, Array[Byte])](prev) {
override def getPartitions = prev.partitions
override def compute(split: Partition, context: TaskContext) =
core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -57,6 +57,7 @@ object PythonRunner {
val builder = new ProcessBuilder(Seq(pythonExec, "-u", formattedPythonFile) ++ otherArgs)
val env = builder.environment()
env.put("PYTHONPATH", pythonPath)
env.put("PYSPARK_PYTHON", pythonExec)
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
val process = builder.start()
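The added PYSPARK_PYTHON entry propagates the interpreter chosen here into the environment of the launched process. As a minimal sketch (illustrative only, not part of this commit), Python-side code can read it back like this:

import os

# Fall back to plain "python" when the launcher did not set the variable.
pyspark_python = os.environ.get("PYSPARK_PYTHON", "python")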
22 changes: 22 additions & 0 deletions examples/src/main/python/streaming/wordcount.py
@@ -0,0 +1,22 @@
import sys
from operator import add

from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import *

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: wordcount <directory>"
        exit(-1)
    ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))

    lines = ssc.textFileStream(sys.argv[1])
    fm_lines = lines.flatMap(lambda x: x.split(" "))
    filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
    mapped_lines = fm_lines.map(lambda x: (x, 1))

    fm_lines.pyprint()
    filtered_lines.pyprint()
    mapped_lines.pyprint()
    ssc.start()
    ssc.awaitTermination()
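textFileStream only picks up files that appear in the monitored directory after the context has started, so something has to drop new files in while the example runs. Below is a minimal sketch of such a feeder (the directory path, file names, and helper are hypothetical, not part of this commit); each batch is written to a temporary file and then moved into place so the stream never reads half-written data:

import os
import shutil
import tempfile
import time

def feed_directory(target_dir, batches, interval=1.0):
    # Write each batch to a temporary file first, then move the complete file
    # into the monitored directory so the stream never sees partial data.
    if not os.path.isdir(target_dir):
        os.makedirs(target_dir)
    for i, text in enumerate(batches):
        fd, tmp_path = tempfile.mkstemp()
        with os.fdopen(fd, "w") as f:
            f.write(text + "\n")
        shutil.move(tmp_path, os.path.join(target_dir, "batch-%d.txt" % i))
        time.sleep(interval)

feed_directory("/tmp/streaming-input", ["hello Spark", "hello streaming Spark"])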
3 changes: 3 additions & 0 deletions python/pyspark/java_gateway.py
@@ -108,6 +108,9 @@ def run(self):
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.streaming.*")
java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
1 change: 1 addition & 0 deletions python/pyspark/streaming/__init__.py
@@ -0,0 +1 @@
__author__ = 'ktakagiw'
133 changes: 133 additions & 0 deletions python/pyspark/streaming/context.py
@@ -0,0 +1,133 @@
__author__ = 'ktakagiw'


#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import shutil
import sys
from threading import Lock
from tempfile import NamedTemporaryFile

from pyspark import accumulators
from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast
from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark.rdd import RDD
from pyspark.context import SparkContext

from py4j.java_collections import ListConverter

from pyspark.streaming.dstream import DStream

class StreamingContext(object):
    """
    Main entry point for Spark Streaming functionality. A StreamingContext
    represents the connection to a Spark cluster and can be used to create
    L{DStream}s from various input sources.
    """

    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
                 gateway=None, duration=None):
        """
        Create a new StreamingContext. At least the master, app name, and duration
        should be set, either through the named parameters here or through C{conf}.
        @param master: Cluster URL to connect to
               (e.g. mesos://host:port, spark://host:port, local[4]).
        @param appName: A name for your job, to display on the cluster web UI.
        @param sparkHome: Location where Spark is installed on cluster nodes.
        @param pyFiles: Collection of .zip or .py files to send to the cluster
               and add to PYTHONPATH. These can be paths on the local file
               system or HDFS, HTTP, HTTPS, or FTP URLs.
        @param environment: A dictionary of environment variables to set on
               worker nodes.
        @param batchSize: The number of Python objects represented as a single
               Java object. Set to 1 to disable batching or to -1 to use an
               unlimited batch size.
        @param serializer: The serializer for RDDs.
        @param conf: A L{SparkConf} object setting Spark properties.
        @param gateway: Use an existing gateway and JVM, otherwise a new JVM
               will be instantiated.
        @param duration: A L{Duration} object giving the batch interval of the
               streaming computation.
        """
        # Create the Python SparkContext that this StreamingContext wraps
        self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
                                pyFiles=pyFiles, environment=environment, batchSize=batchSize,
                                serializer=serializer, conf=conf, gateway=gateway)
        self._jvm = self._sc._jvm
        self._jssc = self._initialize_context(self._sc._jsc, duration._jduration)

    # Initialize the JavaStreamingContext in a separate method to allow
    # subclass-specific initialization.
    def _initialize_context(self, jspark_context, jduration):
        return self._jvm.JavaStreamingContext(jspark_context, jduration)

    def actorStream(self, props, name, storageLevel, supervisorStrategy):
        raise NotImplementedError

    def addStreamingListener(self, streamingListener):
        raise NotImplementedError

    def awaitTermination(self, timeout=None):
        if timeout:
            self._jssc.awaitTermination(timeout)
        else:
            self._jssc.awaitTermination()

    def checkpoint(self, directory):
        raise NotImplementedError

    def fileStream(self, directory, filter=None, newFilesOnly=None):
        raise NotImplementedError

    def networkStream(self, receiver):
        raise NotImplementedError

    def queueStream(self, queue, oneAtATime=True, defaultRDD=None):
        raise NotImplementedError

    def rawSocketStream(self, hostname, port, storageLevel):
        raise NotImplementedError

    def remember(self, duration):
        raise NotImplementedError

    def socketStream(self, hostname, port, converter, storageLevel):
        raise NotImplementedError

    def start(self):
        self._jssc.start()

    def stop(self, stopSparkContext=True):
        raise NotImplementedError

    def textFileStream(self, directory):
        return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())

    def transform(self, seq):
        raise NotImplementedError

    def union(self, seq):
        raise NotImplementedError
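Taken together, the pieces implemented so far (the constructor, textFileStream, start, and awaitTermination) already support a minimal pipeline. A hedged usage sketch, with an illustrative app name and input directory:

from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import Seconds

ssc = StreamingContext(appName="StreamingSketch", duration=Seconds(2))
lines = ssc.textFileStream("/tmp/streaming-input")  # DStream of UTF-8 decoded lines
words = lines.flatMap(lambda line: line.split(" "))
words.pyprint()  # print a sample of each batch to stdout
ssc.start()
ssc.awaitTermination()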
