Skip to content

Commit

Permalink
[Python]replace ray.streaming by raystreaming (#20)
Browse files Browse the repository at this point in the history
* replace ray.streaming by raystreaming

* streaming cython building with ray lib

* convert transfer modudle name to raystreaming

* delete inner setup.py

* Update streaming/python/raystreaming/operator.py

Co-authored-by: Qing Wang <kingchin1218@126.com>

* Update streaming/python/raystreaming/partition.py

Co-authored-by: Qing Wang <kingchin1218@126.com>

Co-authored-by: ashione <lingxuan.zlx@antgroup.com>
Co-authored-by: Qing Wang <kingchin1218@126.com>
  • Loading branch information
3 people committed Feb 18, 2022
1 parent e5dbf03 commit 84a19c8
Show file tree
Hide file tree
Showing 25 changed files with 106 additions and 1,548 deletions.
1 change: 1 addition & 0 deletions streaming/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ pyx_library(
name = "_streaming",
srcs = glob([
"python/raystreaming/_streaming.pyx",
"python/raystreaming/__init__.py",
"python/raystreaming/__init__.pxd",
"python/raystreaming/includes/*.pxd",
"python/raystreaming/includes/*.pxi",
Expand Down
2 changes: 1 addition & 1 deletion streaming/python/raystreaming/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# flake8: noqa
# Ray should be imported before streaming
import ray
from ray.streaming.context import StreamingContext
from raystreaming.context import StreamingContext

__all__ = ['StreamingContext']
10 changes: 5 additions & 5 deletions streaming/python/raystreaming/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

from ray import Language
from ray.actor import ActorHandle
from ray.streaming import function
from ray.streaming import message
from ray.streaming import partition
from ray.streaming.runtime import serialization
from ray.streaming.runtime.transfer import ChannelID, DataWriter
from raystreaming import function
from raystreaming import message
from raystreaming import partition
from raystreaming.runtime import serialization
from raystreaming.runtime.transfer import ChannelID, DataWriter

logger = logging.getLogger(__name__)

Expand Down
12 changes: 6 additions & 6 deletions streaming/python/raystreaming/context.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from abc import ABC, abstractmethod

from ray.streaming.datastream import StreamSource
from ray.streaming.function import LocalFileSourceFunction
from ray.streaming.function import CollectionSourceFunction
from ray.streaming.function import SourceFunction
from ray.streaming.runtime.gateway_client import GatewayClient
from raystreaming.datastream import StreamSource
from raystreaming.function import LocalFileSourceFunction
from raystreaming.function import CollectionSourceFunction
from raystreaming.function import SourceFunction
from raystreaming.runtime.gateway_client import GatewayClient


class StreamingContext:
"""
Main entry point for ray streaming functionality.
A StreamingContext is also a wrapper of java
`io.ray.streaming.api.context.StreamingContext`
`raystreaming.api.context.StreamingContext`
"""

class Builder:
Expand Down
52 changes: 26 additions & 26 deletions streaming/python/raystreaming/datastream.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod

from ray.streaming import function
from ray.streaming import partition
from raystreaming import function
from raystreaming import partition


class Stream(ABC):
Expand Down Expand Up @@ -114,7 +114,7 @@ class DataStream(Stream):
"""
Represents a stream of data which applies a transformation executed by
python. It's also a wrapper of java
`io.ray.streaming.python.stream.PythonDataStream`
`io.raystreaming.python.stream.PythonDataStream`
"""

def __init__(self, input_stream, j_stream, streaming_context=None):
Expand All @@ -127,7 +127,7 @@ def get_language(self):
def map(self, func):
"""
Applies a Map transformation on a :class:`DataStream`.
The transformation calls a :class:`ray.streaming.function.MapFunction`
The transformation calls a :class:`raystreaming.function.MapFunction`
for each element of the DataStream.
Args:
Expand All @@ -149,7 +149,7 @@ def map(self, func):
def flat_map(self, func):
"""
Applies a FlatMap transformation on a :class:`DataStream`. The
transformation calls a :class:`ray.streaming.function.FlatMapFunction`
transformation calls a :class:`raystreaming.function.FlatMapFunction`
for each element of the DataStream.
Each FlatMapFunction call can return any number of elements including
none.
Expand All @@ -173,7 +173,7 @@ def flat_map(self, func):
def filter(self, func):
"""
Applies a Filter transformation on a :class:`DataStream`. The
transformation calls a :class:`ray.streaming.function.FilterFunction`
transformation calls a :class:`raystreaming.function.FilterFunction`
for each element of the DataStream.
DataStream and retains only those element for which the function
returns True.
Expand Down Expand Up @@ -310,7 +310,7 @@ class JavaDataStream(Stream):
"""
Represents a stream of data which applies a transformation executed by
java. It's also a wrapper of java
`io.ray.streaming.api.stream.DataStream`
`io.raystreaming.api.stream.DataStream`
"""

def __init__(self, input_stream, j_stream, streaming_context=None):
Expand All @@ -321,46 +321,46 @@ def get_language(self):
return function.Language.JAVA

def map(self, java_func_class):
"""See io.ray.streaming.api.stream.DataStream.map"""
"""See io.raystreaming.api.stream.DataStream.map"""
return JavaDataStream(self, self._unary_call("map", java_func_class))

def flat_map(self, java_func_class):
"""See io.ray.streaming.api.stream.DataStream.flatMap"""
"""See io.raystreaming.api.stream.DataStream.flatMap"""
return JavaDataStream(self, self._unary_call("flatMap",
java_func_class))

def filter(self, java_func_class):
"""See io.ray.streaming.api.stream.DataStream.filter"""
"""See io.raystreaming.api.stream.DataStream.filter"""
return JavaDataStream(self, self._unary_call("filter",
java_func_class))

def union(self, *streams):
"""See io.ray.streaming.api.stream.DataStream.union"""
"""See io.raystreaming.api.stream.DataStream.union"""
assert len(streams) >= 1, "Need at least one stream to union with"
j_streams = [s._j_stream for s in streams]
j_stream = self._gateway_client().union(self._j_stream, *j_streams)
return JavaUnionStream(self, j_stream)

def key_by(self, java_func_class):
"""See io.ray.streaming.api.stream.DataStream.keyBy"""
"""See io.raystreaming.api.stream.DataStream.keyBy"""
self._check_partition_call()
return JavaKeyDataStream(self,
self._unary_call("keyBy", java_func_class))

def broadcast(self, java_func_class):
"""See io.ray.streaming.api.stream.DataStream.broadcast"""
"""See io.raystreaming.api.stream.DataStream.broadcast"""
self._check_partition_call()
return JavaDataStream(self,
self._unary_call("broadcast", java_func_class))

def partition_by(self, java_func_class):
"""See io.ray.streaming.api.stream.DataStream.partitionBy"""
"""See io.raystreaming.api.stream.DataStream.partitionBy"""
self._check_partition_call()
return JavaDataStream(self,
self._unary_call("partitionBy", java_func_class))

def sink(self, java_func_class):
"""See io.ray.streaming.api.stream.DataStream.sink"""
"""See io.raystreaming.api.stream.DataStream.sink"""
return JavaStreamSink(self, self._unary_call("sink", java_func_class))

def as_python_stream(self):
Expand Down Expand Up @@ -393,7 +393,7 @@ def _unary_call(self, func_name, java_func_class):

class KeyDataStream(DataStream):
"""Represents a DataStream returned by a key-by operation.
Wrapper of java io.ray.streaming.python.stream.PythonKeyDataStream
Wrapper of java io.raystreaming.python.stream.PythonKeyDataStream
"""

def __init__(self, input_stream, j_stream):
Expand All @@ -403,7 +403,7 @@ def reduce(self, func):
"""
Applies a reduce transformation on the grouped data stream grouped on
by the given key function.
The :class:`ray.streaming.function.ReduceFunction` will receive input
The :class:`raystreaming.function.ReduceFunction` will receive input
values based on the key value. Only input values with the same key will
go to the same reducer.
Expand Down Expand Up @@ -439,14 +439,14 @@ def as_java_stream(self):
class JavaKeyDataStream(JavaDataStream):
"""
Represents a DataStream returned by a key-by operation in java.
Wrapper of io.ray.streaming.api.stream.KeyDataStream
Wrapper of io.raystreaming.api.stream.KeyDataStream
"""

def __init__(self, input_stream, j_stream):
super().__init__(input_stream, j_stream)

def reduce(self, java_func_class):
"""See io.ray.streaming.api.stream.KeyDataStream.reduce"""
"""See io.raystreaming.api.stream.KeyDataStream.reduce"""
return JavaDataStream(self,
super()._unary_call("reduce", java_func_class))

Expand All @@ -464,7 +464,7 @@ def as_python_stream(self):

class UnionStream(DataStream):
"""Represents a union stream.
Wrapper of java io.ray.streaming.python.stream.PythonUnionStream
Wrapper of java io.raystreaming.python.stream.PythonUnionStream
"""

def __init__(self, input_stream, j_stream):
Expand All @@ -476,7 +476,7 @@ def get_language(self):

class JavaUnionStream(JavaDataStream):
"""Represents a java union stream.
Wrapper of java io.ray.streaming.api.stream.UnionStream
Wrapper of java io.raystreaming.api.stream.UnionStream
"""

def __init__(self, input_stream, j_stream):
Expand All @@ -488,7 +488,7 @@ def get_language(self):

class StreamSource(DataStream):
"""Represents a source of the DataStream.
Wrapper of java io.ray.streaming.python.stream.PythonStreamSource
Wrapper of java io.raystreaming.python.stream.PythonStreamSource
"""

def __init__(self, j_stream, streaming_context, source_func):
Expand All @@ -514,7 +514,7 @@ def build_source(streaming_context, func):

class JavaStreamSource(JavaDataStream):
"""Represents a source of the java DataStream.
Wrapper of java io.ray.streaming.api.stream.DataStreamSource
Wrapper of java io.raystreaming.api.stream.DataStreamSource
"""

def __init__(self, j_stream, streaming_context):
Expand All @@ -535,14 +535,14 @@ def build_source(streaming_context, java_source_func_class):
j_func = streaming_context._gateway_client() \
.new_instance(java_source_func_class)
j_stream = streaming_context._gateway_client() \
.call_function("io.ray.streaming.api.stream.DataStreamSource"
.call_function("io.raystreaming.api.stream.DataStreamSource"
"fromSource", streaming_context._j_ctx, j_func)
return JavaStreamSource(j_stream, streaming_context)


class StreamSink(Stream):
"""Represents a sink of the DataStream.
Wrapper of java io.ray.streaming.python.stream.PythonStreamSink
Wrapper of java io.raystreaming.python.stream.PythonStreamSink
"""

def __init__(self, input_stream, j_stream, func):
Expand All @@ -554,7 +554,7 @@ def get_language(self):

class JavaStreamSink(Stream):
"""Represents a sink of the java DataStream.
Wrapper of java io.ray.streaming.api.stream.StreamSink
Wrapper of java io.raystreaming.api.stream.StreamSink
"""

def __init__(self, input_stream, j_stream):
Expand Down
4 changes: 2 additions & 2 deletions streaming/python/raystreaming/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from abc import ABC, abstractmethod

from ray import cloudpickle
from ray.streaming.runtime import gateway_client
from raystreaming.runtime import gateway_client


class Language(enum.Enum):
Expand Down Expand Up @@ -307,7 +307,7 @@ def load_function(descriptor_func_bytes: bytes):
Deserialize `descriptor_func_bytes` to get function info, then
get or load streaming function.
Note that this function must be kept in sync with
`io.ray.streaming.runtime.python.GraphPbBuilder.serializeFunction`
`io.raystreaming.runtime.python.GraphPbBuilder.serializeFunction`
Args:
descriptor_func_bytes: serialized function info
Expand Down
Loading

0 comments on commit 84a19c8

Please sign in to comment.