diff --git a/streaming/BUILD.bazel b/streaming/BUILD.bazel index ce7e90fe..e8181c45 100644 --- a/streaming/BUILD.bazel +++ b/streaming/BUILD.bazel @@ -468,6 +468,7 @@ pyx_library( name = "_streaming", srcs = glob([ "python/raystreaming/_streaming.pyx", + "python/raystreaming/__init__.py", "python/raystreaming/__init__.pxd", "python/raystreaming/includes/*.pxd", "python/raystreaming/includes/*.pxi", diff --git a/streaming/python/raystreaming/__init__.py b/streaming/python/raystreaming/__init__.py index 2eb090c6..ec05c61f 100644 --- a/streaming/python/raystreaming/__init__.py +++ b/streaming/python/raystreaming/__init__.py @@ -1,6 +1,6 @@ # flake8: noqa # Ray should be imported before streaming import ray -from ray.streaming.context import StreamingContext +from raystreaming.context import StreamingContext __all__ = ['StreamingContext'] diff --git a/streaming/python/raystreaming/collector.py b/streaming/python/raystreaming/collector.py index 1760900f..e1b60480 100644 --- a/streaming/python/raystreaming/collector.py +++ b/streaming/python/raystreaming/collector.py @@ -4,11 +4,11 @@ from ray import Language from ray.actor import ActorHandle -from ray.streaming import function -from ray.streaming import message -from ray.streaming import partition -from ray.streaming.runtime import serialization -from ray.streaming.runtime.transfer import ChannelID, DataWriter +from raystreaming import function +from raystreaming import message +from raystreaming import partition +from raystreaming.runtime import serialization +from raystreaming.runtime.transfer import ChannelID, DataWriter logger = logging.getLogger(__name__) diff --git a/streaming/python/raystreaming/context.py b/streaming/python/raystreaming/context.py index 074f8252..aabaeed7 100644 --- a/streaming/python/raystreaming/context.py +++ b/streaming/python/raystreaming/context.py @@ -1,17 +1,17 @@ from abc import ABC, abstractmethod -from ray.streaming.datastream import StreamSource -from ray.streaming.function import LocalFileSourceFunction -from ray.streaming.function import CollectionSourceFunction -from ray.streaming.function import SourceFunction -from ray.streaming.runtime.gateway_client import GatewayClient +from raystreaming.datastream import StreamSource +from raystreaming.function import LocalFileSourceFunction +from raystreaming.function import CollectionSourceFunction +from raystreaming.function import SourceFunction +from raystreaming.runtime.gateway_client import GatewayClient class StreamingContext: """ Main entry point for ray streaming functionality. A StreamingContext is also a wrapper of java - `io.ray.streaming.api.context.StreamingContext` + `raystreaming.api.context.StreamingContext` """ class Builder: diff --git a/streaming/python/raystreaming/datastream.py b/streaming/python/raystreaming/datastream.py index 91193be2..e7f1e718 100644 --- a/streaming/python/raystreaming/datastream.py +++ b/streaming/python/raystreaming/datastream.py @@ -1,7 +1,7 @@ from abc import ABC, abstractmethod -from ray.streaming import function -from ray.streaming import partition +from raystreaming import function +from raystreaming import partition class Stream(ABC): @@ -114,7 +114,7 @@ class DataStream(Stream): """ Represents a stream of data which applies a transformation executed by python. It's also a wrapper of java - `io.ray.streaming.python.stream.PythonDataStream` + `io.raystreaming.python.stream.PythonDataStream` """ def __init__(self, input_stream, j_stream, streaming_context=None): @@ -127,7 +127,7 @@ def get_language(self): def map(self, func): """ Applies a Map transformation on a :class:`DataStream`. - The transformation calls a :class:`ray.streaming.function.MapFunction` + The transformation calls a :class:`raystreaming.function.MapFunction` for each element of the DataStream. Args: @@ -149,7 +149,7 @@ def map(self, func): def flat_map(self, func): """ Applies a FlatMap transformation on a :class:`DataStream`. The - transformation calls a :class:`ray.streaming.function.FlatMapFunction` + transformation calls a :class:`raystreaming.function.FlatMapFunction` for each element of the DataStream. Each FlatMapFunction call can return any number of elements including none. @@ -173,7 +173,7 @@ def flat_map(self, func): def filter(self, func): """ Applies a Filter transformation on a :class:`DataStream`. The - transformation calls a :class:`ray.streaming.function.FilterFunction` + transformation calls a :class:`raystreaming.function.FilterFunction` for each element of the DataStream. DataStream and retains only those element for which the function returns True. @@ -310,7 +310,7 @@ class JavaDataStream(Stream): """ Represents a stream of data which applies a transformation executed by java. It's also a wrapper of java - `io.ray.streaming.api.stream.DataStream` + `io.raystreaming.api.stream.DataStream` """ def __init__(self, input_stream, j_stream, streaming_context=None): @@ -321,46 +321,46 @@ def get_language(self): return function.Language.JAVA def map(self, java_func_class): - """See io.ray.streaming.api.stream.DataStream.map""" + """See io.raystreaming.api.stream.DataStream.map""" return JavaDataStream(self, self._unary_call("map", java_func_class)) def flat_map(self, java_func_class): - """See io.ray.streaming.api.stream.DataStream.flatMap""" + """See io.raystreaming.api.stream.DataStream.flatMap""" return JavaDataStream(self, self._unary_call("flatMap", java_func_class)) def filter(self, java_func_class): - """See io.ray.streaming.api.stream.DataStream.filter""" + """See io.raystreaming.api.stream.DataStream.filter""" return JavaDataStream(self, self._unary_call("filter", java_func_class)) def union(self, *streams): - """See io.ray.streaming.api.stream.DataStream.union""" + """See io.raystreaming.api.stream.DataStream.union""" assert len(streams) >= 1, "Need at least one stream to union with" j_streams = [s._j_stream for s in streams] j_stream = self._gateway_client().union(self._j_stream, *j_streams) return JavaUnionStream(self, j_stream) def key_by(self, java_func_class): - """See io.ray.streaming.api.stream.DataStream.keyBy""" + """See io.raystreaming.api.stream.DataStream.keyBy""" self._check_partition_call() return JavaKeyDataStream(self, self._unary_call("keyBy", java_func_class)) def broadcast(self, java_func_class): - """See io.ray.streaming.api.stream.DataStream.broadcast""" + """See io.raystreaming.api.stream.DataStream.broadcast""" self._check_partition_call() return JavaDataStream(self, self._unary_call("broadcast", java_func_class)) def partition_by(self, java_func_class): - """See io.ray.streaming.api.stream.DataStream.partitionBy""" + """See io.raystreaming.api.stream.DataStream.partitionBy""" self._check_partition_call() return JavaDataStream(self, self._unary_call("partitionBy", java_func_class)) def sink(self, java_func_class): - """See io.ray.streaming.api.stream.DataStream.sink""" + """See io.raystreaming.api.stream.DataStream.sink""" return JavaStreamSink(self, self._unary_call("sink", java_func_class)) def as_python_stream(self): @@ -393,7 +393,7 @@ def _unary_call(self, func_name, java_func_class): class KeyDataStream(DataStream): """Represents a DataStream returned by a key-by operation. - Wrapper of java io.ray.streaming.python.stream.PythonKeyDataStream + Wrapper of java io.raystreaming.python.stream.PythonKeyDataStream """ def __init__(self, input_stream, j_stream): @@ -403,7 +403,7 @@ def reduce(self, func): """ Applies a reduce transformation on the grouped data stream grouped on by the given key function. - The :class:`ray.streaming.function.ReduceFunction` will receive input + The :class:`raystreaming.function.ReduceFunction` will receive input values based on the key value. Only input values with the same key will go to the same reducer. @@ -439,14 +439,14 @@ def as_java_stream(self): class JavaKeyDataStream(JavaDataStream): """ Represents a DataStream returned by a key-by operation in java. - Wrapper of io.ray.streaming.api.stream.KeyDataStream + Wrapper of io.raystreaming.api.stream.KeyDataStream """ def __init__(self, input_stream, j_stream): super().__init__(input_stream, j_stream) def reduce(self, java_func_class): - """See io.ray.streaming.api.stream.KeyDataStream.reduce""" + """See io.raystreaming.api.stream.KeyDataStream.reduce""" return JavaDataStream(self, super()._unary_call("reduce", java_func_class)) @@ -464,7 +464,7 @@ def as_python_stream(self): class UnionStream(DataStream): """Represents a union stream. - Wrapper of java io.ray.streaming.python.stream.PythonUnionStream + Wrapper of java io.raystreaming.python.stream.PythonUnionStream """ def __init__(self, input_stream, j_stream): @@ -476,7 +476,7 @@ def get_language(self): class JavaUnionStream(JavaDataStream): """Represents a java union stream. - Wrapper of java io.ray.streaming.api.stream.UnionStream + Wrapper of java io.raystreaming.api.stream.UnionStream """ def __init__(self, input_stream, j_stream): @@ -488,7 +488,7 @@ def get_language(self): class StreamSource(DataStream): """Represents a source of the DataStream. - Wrapper of java io.ray.streaming.python.stream.PythonStreamSource + Wrapper of java io.raystreaming.python.stream.PythonStreamSource """ def __init__(self, j_stream, streaming_context, source_func): @@ -514,7 +514,7 @@ def build_source(streaming_context, func): class JavaStreamSource(JavaDataStream): """Represents a source of the java DataStream. - Wrapper of java io.ray.streaming.api.stream.DataStreamSource + Wrapper of java io.raystreaming.api.stream.DataStreamSource """ def __init__(self, j_stream, streaming_context): @@ -535,14 +535,14 @@ def build_source(streaming_context, java_source_func_class): j_func = streaming_context._gateway_client() \ .new_instance(java_source_func_class) j_stream = streaming_context._gateway_client() \ - .call_function("io.ray.streaming.api.stream.DataStreamSource" + .call_function("io.raystreaming.api.stream.DataStreamSource" "fromSource", streaming_context._j_ctx, j_func) return JavaStreamSource(j_stream, streaming_context) class StreamSink(Stream): """Represents a sink of the DataStream. - Wrapper of java io.ray.streaming.python.stream.PythonStreamSink + Wrapper of java io.raystreaming.python.stream.PythonStreamSink """ def __init__(self, input_stream, j_stream, func): @@ -554,7 +554,7 @@ def get_language(self): class JavaStreamSink(Stream): """Represents a sink of the java DataStream. - Wrapper of java io.ray.streaming.api.stream.StreamSink + Wrapper of java io.raystreaming.api.stream.StreamSink """ def __init__(self, input_stream, j_stream): diff --git a/streaming/python/raystreaming/function.py b/streaming/python/raystreaming/function.py index e8a26feb..41c3c3af 100644 --- a/streaming/python/raystreaming/function.py +++ b/streaming/python/raystreaming/function.py @@ -5,7 +5,7 @@ from abc import ABC, abstractmethod from ray import cloudpickle -from ray.streaming.runtime import gateway_client +from raystreaming.runtime import gateway_client class Language(enum.Enum): @@ -307,7 +307,7 @@ def load_function(descriptor_func_bytes: bytes): Deserialize `descriptor_func_bytes` to get function info, then get or load streaming function. Note that this function must be kept in sync with - `io.ray.streaming.runtime.python.GraphPbBuilder.serializeFunction` + `io.raystreaming.runtime.python.GraphPbBuilder.serializeFunction` Args: descriptor_func_bytes: serialized function info diff --git a/streaming/python/raystreaming/includes/common.pxd b/streaming/python/raystreaming/includes/common.pxd deleted file mode 100644 index 7e95bdc9..00000000 --- a/streaming/python/raystreaming/includes/common.pxd +++ /dev/null @@ -1,317 +0,0 @@ -from libcpp cimport bool as c_bool -from libcpp.memory cimport shared_ptr, unique_ptr -from libcpp.string cimport string as c_string - -from libc.stdint cimport uint8_t, int32_t, uint64_t, int64_t, uint32_t -from libcpp.unordered_map cimport unordered_map -from libcpp.vector cimport vector as c_vector -from libcpp.pair cimport pair as c_pair - -from raystreaming.includes.unique_ids cimport ( - CActorID, - CJobID, - CWorkerID, - CObjectID, - CTaskID, - CPlacementGroupID, - CNodeID, -) -from raystreaming.includes.function_descriptor cimport ( - CFunctionDescriptor, -) - - -cdef extern from * namespace "polyfill" nogil: - """ - namespace polyfill { - - template - inline typename std::remove_reference::type&& move(T& t) { - return std::move(t); - } - - template - inline typename std::remove_reference::type&& move(T&& t) { - return std::move(t); - } - - } // namespace polyfill - """ - cdef T move[T](T) - - -cdef extern from "ray/common/status.h" namespace "ray" nogil: - cdef cppclass StatusCode: - pass - - cdef cppclass CRayStatus "ray::Status": - RayStatus() - RayStatus(StatusCode code, const c_string &msg) - RayStatus(const CRayStatus &s) - - @staticmethod - CRayStatus OK() - - @staticmethod - CRayStatus OutOfMemory(const c_string &msg) - - @staticmethod - CRayStatus KeyError(const c_string &msg) - - @staticmethod - CRayStatus Invalid(const c_string &msg) - - @staticmethod - CRayStatus IOError(const c_string &msg) - - @staticmethod - CRayStatus TypeError(const c_string &msg) - - @staticmethod - CRayStatus UnknownError(const c_string &msg) - - @staticmethod - CRayStatus NotImplemented(const c_string &msg) - - @staticmethod - CRayStatus ObjectStoreFull(const c_string &msg) - - @staticmethod - CRayStatus RedisError(const c_string &msg) - - @staticmethod - CRayStatus TimedOut(const c_string &msg) - - @staticmethod - CRayStatus Interrupted(const c_string &msg) - - @staticmethod - CRayStatus IntentionalSystemExit() - - @staticmethod - CRayStatus UnexpectedSystemExit() - - @staticmethod - CRayStatus CreationTaskError() - - @staticmethod - CRayStatus NotFound() - - c_bool ok() - c_bool IsOutOfMemory() - c_bool IsKeyError() - c_bool IsInvalid() - c_bool IsIOError() - c_bool IsTypeError() - c_bool IsUnknownError() - c_bool IsNotImplemented() - c_bool IsObjectStoreFull() - c_bool IsRedisError() - c_bool IsTimedOut() - c_bool IsInterrupted() - c_bool ShouldExitWorker() - c_bool IsNotFound() - - c_string ToString() - c_string CodeAsString() - StatusCode code() - c_string message() - - # We can later add more of the common status factory methods as needed - cdef CRayStatus RayStatus_OK "Status::OK"() - cdef CRayStatus RayStatus_Invalid "Status::Invalid"() - cdef CRayStatus RayStatus_NotImplemented "Status::NotImplemented"() - - -cdef extern from "ray/common/status.h" namespace "ray::StatusCode" nogil: - cdef StatusCode StatusCode_OK "OK" - cdef StatusCode StatusCode_OutOfMemory "OutOfMemory" - cdef StatusCode StatusCode_KeyError "KeyError" - cdef StatusCode StatusCode_TypeError "TypeError" - cdef StatusCode StatusCode_Invalid "Invalid" - cdef StatusCode StatusCode_IOError "IOError" - cdef StatusCode StatusCode_UnknownError "UnknownError" - cdef StatusCode StatusCode_NotImplemented "NotImplemented" - cdef StatusCode StatusCode_RedisError "RedisError" - - -cdef extern from "ray/common/id.h" namespace "ray" nogil: - const CTaskID GenerateTaskId(const CJobID &job_id, - const CTaskID &parent_task_id, - int parent_task_counter) - - -cdef extern from "src/ray/protobuf/common.pb.h" nogil: - cdef cppclass CLanguage "Language": - pass - cdef cppclass CWorkerType "ray::core::WorkerType": - pass - cdef cppclass CTaskType "ray::TaskType": - pass - cdef cppclass CPlacementStrategy "ray::core::PlacementStrategy": - pass - cdef cppclass CDefaultSchedulingStrategy "ray::rpc::DefaultSchedulingStrategy": # noqa: E501 - CDefaultSchedulingStrategy() - cdef cppclass CSpreadSchedulingStrategy "ray::rpc::SpreadSchedulingStrategy": # noqa: E501 - CSpreadSchedulingStrategy() - cdef cppclass CPlacementGroupSchedulingStrategy "ray::rpc::PlacementGroupSchedulingStrategy": # noqa: E501 - CPlacementGroupSchedulingStrategy() - void set_placement_group_id(const c_string& placement_group_id) - void set_placement_group_bundle_index(int64_t placement_group_bundle_index) # noqa: E501 - void set_placement_group_capture_child_tasks(c_bool placement_group_capture_child_tasks) # noqa: E501 - cdef cppclass CSchedulingStrategy "ray::rpc::SchedulingStrategy": - CSchedulingStrategy() - void clear_scheduling_strategy() - CSpreadSchedulingStrategy* mutable_spread_scheduling_strategy() - CDefaultSchedulingStrategy* mutable_default_scheduling_strategy() - CPlacementGroupSchedulingStrategy* mutable_placement_group_scheduling_strategy() # noqa: E501 - cdef cppclass CAddress "ray::rpc::Address": - CAddress() - const c_string &SerializeAsString() const - void ParseFromString(const c_string &serialized) - void CopyFrom(const CAddress& address) - const c_string &worker_id() - cdef cppclass CObjectReference "ray::rpc::ObjectReference": - CObjectReference() - CAddress owner_address() const - const c_string &object_id() const - const c_string &call_site() const - -# This is a workaround for C++ enum class since Cython has no corresponding -# representation. -cdef extern from "src/ray/protobuf/common.pb.h" nogil: - cdef CLanguage LANGUAGE_PYTHON "Language::PYTHON" - cdef CLanguage LANGUAGE_CPP "Language::CPP" - cdef CLanguage LANGUAGE_JAVA "Language::JAVA" - -cdef extern from "src/ray/protobuf/common.pb.h" nogil: - cdef CWorkerType WORKER_TYPE_WORKER "ray::core::WorkerType::WORKER" - cdef CWorkerType WORKER_TYPE_DRIVER "ray::core::WorkerType::DRIVER" - cdef CWorkerType WORKER_TYPE_SPILL_WORKER "ray::core::WorkerType::SPILL_WORKER" # noqa: E501 - cdef CWorkerType WORKER_TYPE_RESTORE_WORKER "ray::core::WorkerType::RESTORE_WORKER" # noqa: E501 - cdef CWorkerType WORKER_TYPE_UTIL_WORKER "ray::core::WorkerType::UTIL_WORKER" # noqa: E501 - -cdef extern from "src/ray/protobuf/common.pb.h" nogil: - cdef CTaskType TASK_TYPE_NORMAL_TASK "ray::TaskType::NORMAL_TASK" - cdef CTaskType TASK_TYPE_ACTOR_CREATION_TASK "ray::TaskType::ACTOR_CREATION_TASK" # noqa: E501 - cdef CTaskType TASK_TYPE_ACTOR_TASK "ray::TaskType::ACTOR_TASK" - -cdef extern from "src/ray/protobuf/common.pb.h" nogil: - cdef CPlacementStrategy PLACEMENT_STRATEGY_PACK \ - "ray::core::PlacementStrategy::PACK" - cdef CPlacementStrategy PLACEMENT_STRATEGY_SPREAD \ - "ray::core::PlacementStrategy::SPREAD" - cdef CPlacementStrategy PLACEMENT_STRATEGY_STRICT_PACK \ - "ray::core::PlacementStrategy::STRICT_PACK" - cdef CPlacementStrategy PLACEMENT_STRATEGY_STRICT_SPREAD \ - "ray::core::PlacementStrategy::STRICT_SPREAD" - -cdef extern from "ray/common/buffer.h" namespace "ray" nogil: - cdef cppclass CBuffer "ray::Buffer": - uint8_t *Data() const - size_t Size() const - - cdef cppclass LocalMemoryBuffer(CBuffer): - LocalMemoryBuffer(uint8_t *data, size_t size, c_bool copy_data) - LocalMemoryBuffer(size_t size) - -cdef extern from "ray/common/ray_object.h" nogil: - cdef cppclass CRayObject "ray::RayObject": - CRayObject(const shared_ptr[CBuffer] &data, - const shared_ptr[CBuffer] &metadata, - const c_vector[CObjectReference] &nested_refs) - c_bool HasData() const - c_bool HasMetadata() const - const size_t DataSize() const - const shared_ptr[CBuffer] &GetData() - const shared_ptr[CBuffer] &GetMetadata() const - c_bool IsInPlasmaError() const - -cdef extern from "ray/core_worker/common.h" nogil: - cdef cppclass CRayFunction "ray::core::RayFunction": - CRayFunction() - CRayFunction(CLanguage language, - const CFunctionDescriptor &function_descriptor) - CLanguage GetLanguage() - const CFunctionDescriptor GetFunctionDescriptor() - - cdef cppclass CTaskArg "ray::TaskArg": - pass - - cdef cppclass CTaskArgByReference "ray::TaskArgByReference": - CTaskArgByReference(const CObjectID &object_id, - const CAddress &owner_address, - const c_string &call_site) - - cdef cppclass CTaskArgByValue "ray::TaskArgByValue": - CTaskArgByValue(const shared_ptr[CRayObject] &data) - - cdef cppclass CTaskOptions "ray::core::TaskOptions": - CTaskOptions() - CTaskOptions(c_string name, int num_returns, - unordered_map[c_string, double] &resources, - c_string concurrency_group_name) - CTaskOptions(c_string name, int num_returns, - unordered_map[c_string, double] &resources, - c_string concurrency_group_name, - c_string serialized_runtime_env) - - cdef cppclass CActorCreationOptions "ray::core::ActorCreationOptions": - CActorCreationOptions() - CActorCreationOptions( - int64_t max_restarts, - int64_t max_task_retries, - int32_t max_concurrency, - const unordered_map[c_string, double] &resources, - const unordered_map[c_string, double] &placement_resources, - const c_vector[c_string] &dynamic_worker_options, - c_bool is_detached, c_string &name, c_string &ray_namespace, - c_bool is_asyncio, - const CSchedulingStrategy &scheduling_strategy, - c_string serialized_runtime_env, - const c_vector[CConcurrencyGroup] &concurrency_groups, - c_bool execute_out_of_order, - int32_t max_pending_calls) - - cdef cppclass CPlacementGroupCreationOptions \ - "ray::core::PlacementGroupCreationOptions": - CPlacementGroupCreationOptions() - CPlacementGroupCreationOptions( - const c_string &name, - CPlacementStrategy strategy, - const c_vector[unordered_map[c_string, double]] &bundles, - c_bool is_detached - ) - - cdef cppclass CObjectLocation "ray::core::ObjectLocation": - const CNodeID &GetPrimaryNodeID() const - const uint64_t GetObjectSize() const - const c_vector[CNodeID] &GetNodeIDs() const - c_bool IsSpilled() const - const c_string &GetSpilledURL() const - const CNodeID &GetSpilledNodeID() const - -cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: - cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions": - CGcsClientOptions(const c_string &ip, int port, - const c_string &password, - c_bool enable_sync_conn, - c_bool enable_async_conn, - c_bool enable_subscribe_conn) - CGcsClientOptions(const c_string &gcs_address) - -cdef extern from "src/ray/protobuf/gcs.pb.h" nogil: - cdef cppclass CJobConfig "ray::rpc::JobConfig": - const c_string &SerializeAsString() - -cdef extern from "ray/common/task/task_spec.h" nogil: - cdef cppclass CConcurrencyGroup "ray::ConcurrencyGroup": - CConcurrencyGroup( - const c_string &name, - uint32_t max_concurrency, - const c_vector[CFunctionDescriptor] &c_fds) - CConcurrencyGroup() - c_string GetName() const - uint32_t GetMaxConcurrency() const - c_vector[CFunctionDescriptor] GetFunctionDescriptors() const - diff --git a/streaming/python/raystreaming/includes/common.pxi b/streaming/python/raystreaming/includes/common.pxi deleted file mode 100644 index 0e247b4f..00000000 --- a/streaming/python/raystreaming/includes/common.pxi +++ /dev/null @@ -1,45 +0,0 @@ -from libcpp cimport bool as c_bool -from libcpp.string cimport string as c_string -from libcpp.vector cimport vector as c_vector - -from raystreaming.includes.common cimport ( - CObjectLocation, - CGcsClientOptions, -) - - -cdef class GcsClientOptions: - """Cython wrapper class of C++ `ray::gcs::GcsClientOptions`.""" - cdef: - unique_ptr[CGcsClientOptions] inner - - @classmethod - def from_redis_address( - cls, redis_address, - redis_password, - c_bool enable_sync_conn=True, - c_bool enable_async_conn=True, - c_bool enable_subscribe_conn=True): - if not redis_password: - redis_password = "" - redis_ip, redis_port = redis_address.split(":") - self = GcsClientOptions() - self.inner.reset( - new CGcsClientOptions(redis_ip.encode("ascii"), - int(redis_port), - redis_password.encode("ascii"), - enable_sync_conn, - enable_async_conn, - enable_subscribe_conn)) - return self - - @classmethod - def from_gcs_address(cls, gcs_address): - self = GcsClientOptions() - self.inner.reset( - new CGcsClientOptions(gcs_address.encode("ascii"))) - return self - - cdef CGcsClientOptions* native(self): - return (self.inner.get()) - diff --git a/streaming/python/raystreaming/includes/function_descriptor.pxd b/streaming/python/raystreaming/includes/function_descriptor.pxd deleted file mode 100644 index 7710c5d8..00000000 --- a/streaming/python/raystreaming/includes/function_descriptor.pxd +++ /dev/null @@ -1,69 +0,0 @@ -from libc.stdint cimport uint8_t, uint64_t -from libcpp cimport bool as c_bool -from libcpp.memory cimport unique_ptr, shared_ptr -from libcpp.string cimport string as c_string -from libcpp.unordered_map cimport unordered_map -from libcpp.vector cimport vector as c_vector - -from raystreaming.includes.common cimport ( - CLanguage, -) -from raystreaming.includes.unique_ids cimport ( - CActorID, - CJobID, - CObjectID, - CTaskID, -) - -cdef extern from "src/ray/protobuf/common.pb.h" nogil: - cdef cppclass CFunctionDescriptorType \ - "ray::FunctionDescriptorType": - pass - - cdef CFunctionDescriptorType EmptyFunctionDescriptorType \ - "ray::FunctionDescriptorType::FUNCTION_DESCRIPTOR_NOT_SET" - cdef CFunctionDescriptorType JavaFunctionDescriptorType \ - "ray::FunctionDescriptorType::kJavaFunctionDescriptor" - cdef CFunctionDescriptorType PythonFunctionDescriptorType \ - "ray::FunctionDescriptorType::kPythonFunctionDescriptor" - - -cdef extern from "ray/common/function_descriptor.h" nogil: - cdef cppclass CFunctionDescriptorInterface \ - "ray::CFunctionDescriptorInterface": - CFunctionDescriptorType Type() - c_string ToString() - c_string Serialize() - - ctypedef shared_ptr[CFunctionDescriptorInterface] CFunctionDescriptor \ - "ray::FunctionDescriptor" - - cdef cppclass CFunctionDescriptorBuilder "ray::FunctionDescriptorBuilder": - @staticmethod - CFunctionDescriptor Empty() - - @staticmethod - CFunctionDescriptor BuildJava(const c_string &class_name, - const c_string &function_name, - const c_string &signature) - - @staticmethod - CFunctionDescriptor BuildPython(const c_string &module_name, - const c_string &class_name, - const c_string &function_name, - const c_string &function_source_hash) - - @staticmethod - CFunctionDescriptor Deserialize(const c_string &serialized_binary) - - cdef cppclass CJavaFunctionDescriptor "ray::JavaFunctionDescriptor": - c_string ClassName() - c_string FunctionName() - c_string Signature() - - cdef cppclass CPythonFunctionDescriptor "ray::PythonFunctionDescriptor": - c_string ModuleName() - c_string ClassName() - c_string FunctionName() - c_string FunctionHash() - diff --git a/streaming/python/raystreaming/includes/function_descriptor.pxi b/streaming/python/raystreaming/includes/function_descriptor.pxi deleted file mode 100644 index 042e112b..00000000 --- a/streaming/python/raystreaming/includes/function_descriptor.pxi +++ /dev/null @@ -1,320 +0,0 @@ -from raystreaming.includes.function_descriptor cimport ( - CFunctionDescriptor, - CFunctionDescriptorBuilder, - CPythonFunctionDescriptor, - CJavaFunctionDescriptor, - EmptyFunctionDescriptorType, - JavaFunctionDescriptorType, - PythonFunctionDescriptorType, -) - -import hashlib -import cython -import inspect -import uuid -import ray.ray_constants as ray_constants - - -ctypedef object (*FunctionDescriptor_from_cpp)(const CFunctionDescriptor &) -cdef unordered_map[int, FunctionDescriptor_from_cpp] \ - FunctionDescriptor_constructor_map -cdef CFunctionDescriptorToPython(CFunctionDescriptor function_descriptor): - cdef int function_descriptor_type = function_descriptor.get().Type() - it = FunctionDescriptor_constructor_map.find(function_descriptor_type) - if it == FunctionDescriptor_constructor_map.end(): - raise Exception("Can't construct FunctionDescriptor from type {}" - .format(function_descriptor_type)) - else: - constructor = dereference(it).second - return constructor(function_descriptor) - - -@cython.auto_pickle(False) -cdef class FunctionDescriptor: - def __cinit__(self, *args, **kwargs): - if type(self) == FunctionDescriptor: - raise Exception("type {} is abstract".format(type(self).__name__)) - - def __hash__(self): - return hash(self.descriptor.get().ToString()) - - def __eq__(self, other): - return (type(self) == type(other) and - self.descriptor.get().ToString() == - (other).descriptor.get().ToString()) - - def __repr__(self): - return self.descriptor.get().ToString() - - def to_dict(self): - d = {"type": type(self).__name__} - for k, v in vars(type(self)).items(): - if inspect.isgetsetdescriptor(v): - d[k] = v.__get__(self) - return d - - -FunctionDescriptor_constructor_map[EmptyFunctionDescriptorType] = \ - EmptyFunctionDescriptor.from_cpp - - -@cython.auto_pickle(False) -cdef class EmptyFunctionDescriptor(FunctionDescriptor): - def __cinit__(self): - self.descriptor = CFunctionDescriptorBuilder.Empty() - - def __reduce__(self): - return EmptyFunctionDescriptor, () - - @staticmethod - cdef from_cpp(const CFunctionDescriptor &c_function_descriptor): - return EmptyFunctionDescriptor() - - -FunctionDescriptor_constructor_map[JavaFunctionDescriptorType] = \ - JavaFunctionDescriptor.from_cpp - - -@cython.auto_pickle(False) -cdef class JavaFunctionDescriptor(FunctionDescriptor): - cdef: - CJavaFunctionDescriptor *typed_descriptor - - def __cinit__(self, - class_name, - function_name, - signature): - self.descriptor = CFunctionDescriptorBuilder.BuildJava( - class_name, function_name, signature) - self.typed_descriptor = ( - self.descriptor.get()) - - def __reduce__(self): - return JavaFunctionDescriptor, (self.typed_descriptor.ClassName(), - self.typed_descriptor.FunctionName(), - self.typed_descriptor.Signature()) - - @staticmethod - cdef from_cpp(const CFunctionDescriptor &c_function_descriptor): - cdef CJavaFunctionDescriptor *typed_descriptor = \ - (c_function_descriptor.get()) - return JavaFunctionDescriptor(typed_descriptor.ClassName(), - typed_descriptor.FunctionName(), - typed_descriptor.Signature()) - - @property - def class_name(self): - """Get the class name of current function descriptor. - - Returns: - The class name of the function descriptor. It could be - empty if the function is not a class method. - """ - return self.typed_descriptor.ClassName() - - @property - def function_name(self): - """Get the function name of current function descriptor. - - Returns: - The function name of the function descriptor. - """ - return self.typed_descriptor.FunctionName() - - @property - def signature(self): - """Get the signature of current function descriptor. - - Returns: - The signature of the function descriptor. - """ - return self.typed_descriptor.Signature() - - -FunctionDescriptor_constructor_map[PythonFunctionDescriptorType] = \ - PythonFunctionDescriptor.from_cpp - - -@cython.auto_pickle(False) -cdef class PythonFunctionDescriptor(FunctionDescriptor): - cdef: - CPythonFunctionDescriptor *typed_descriptor - object _function_id - - def __cinit__(self, - module_name, - function_name, - class_name="", - function_source_hash=""): - self.descriptor = CFunctionDescriptorBuilder.BuildPython( - module_name, class_name, function_name, function_source_hash) - self.typed_descriptor = ( - self.descriptor.get()) - - def __reduce__(self): - return PythonFunctionDescriptor, (self.typed_descriptor.ModuleName(), - self.typed_descriptor.FunctionName(), - self.typed_descriptor.ClassName(), - self.typed_descriptor.FunctionHash()) - - @staticmethod - cdef from_cpp(const CFunctionDescriptor &c_function_descriptor): - cdef CPythonFunctionDescriptor *typed_descriptor = \ - (c_function_descriptor.get()) - return PythonFunctionDescriptor(typed_descriptor.ModuleName(), - typed_descriptor.FunctionName(), - typed_descriptor.ClassName(), - typed_descriptor.FunctionHash()) - - @classmethod - def from_function(cls, function, function_uuid): - """Create a FunctionDescriptor from a function instance. - - This function is used to create the function descriptor from - a python function. If a function is a class function, it should - not be used by this function. - - Args: - cls: Current class which is required argument for classmethod. - function: the python function used to create the function - descriptor. - function_uuid: Used to uniquely identify a function. - Ideally we can use the pickled function bytes - but cloudpickle isn't stable in some cases - for the same function. - - Returns: - The FunctionDescriptor instance created according to the function. - """ - module_name = cls._get_module_name(function) - function_name = function.__qualname__ - class_name = "" - - return cls(module_name, function_name, class_name, function_uuid.hex) - - @classmethod - def from_class(cls, target_class): - """Create a FunctionDescriptor from a class. - - Args: - cls: Current class which is required argument for classmethod. - target_class: the python class used to create the function - descriptor. - - Returns: - The FunctionDescriptor instance created according to the class. - """ - module_name = cls._get_module_name(target_class) - class_name = target_class.__qualname__ - # Use a random uuid as function hash to solve actor name conflict. - return cls(module_name, "__init__", class_name, uuid.uuid4().hex) - - @property - def module_name(self): - """Get the module name of current function descriptor. - - Returns: - The module name of the function descriptor. - """ - return self.typed_descriptor.ModuleName() - - @property - def class_name(self): - """Get the class name of current function descriptor. - - Returns: - The class name of the function descriptor. It could be - empty if the function is not a class method. - """ - return self.typed_descriptor.ClassName() - - @property - def function_name(self): - """Get the function name of current function descriptor. - - Returns: - The function name of the function descriptor. - """ - return self.typed_descriptor.FunctionName() - - @property - def function_hash(self): - """Get the hash string of the function source code. - - Returns: - The hex of function hash if the source code is available. - Otherwise, it will be an empty string. - """ - return self.typed_descriptor.FunctionHash() - - @property - def function_id(self): - """Get the function id calculated from this descriptor. - - Returns: - The value of ray.ObjectRef that represents the function id. - """ - if not self._function_id: - self._function_id = self._get_function_id() - return self._function_id - - @property - def repr(self): - """Get the module_name.Optional[class_name].function_name - of the descriptor. - - Returns: - The value of module_name.Optional[class_name].function_name - """ - if self.is_actor_method(): - return ".".join( - [self.module_name, self.class_name, self.function_name]) - else: - return ".".join( - [self.module_name, self.function_name]) - - def _get_function_id(self): - """Calculate the function id of current function descriptor. - - This function id is calculated from all the fields of function - descriptor. - - Returns: - ray.ObjectRef to represent the function descriptor. - """ - function_id_hash = hashlib.shake_128() - # Include the function module and name in the hash. - function_id_hash.update(self.typed_descriptor.ModuleName()) - function_id_hash.update(self.typed_descriptor.FunctionName()) - function_id_hash.update(self.typed_descriptor.ClassName()) - function_id_hash.update(self.typed_descriptor.FunctionHash()) - # Compute the function ID. - function_id = function_id_hash.digest(ray_constants.ID_SIZE) - return ray.FunctionID(function_id) - - @staticmethod - def _get_module_name(object): - """Get the module name from object. If the module is __main__, - get the module name from file. - - Returns: - Module name of object. - """ - module_name = object.__module__ - if module_name == "__main__": - try: - file_path = inspect.getfile(object) - n = inspect.getmodulename(file_path) - if n: - module_name = n - except TypeError: - pass - return module_name - - def is_actor_method(self): - """Wether this function descriptor is an actor method. - - Returns: - True if it's an actor method, False if it's a normal function. - """ - return not self.typed_descriptor.ClassName().empty() diff --git a/streaming/python/raystreaming/includes/libstreaming.pxd b/streaming/python/raystreaming/includes/libstreaming.pxd index 2c8521bc..40bb599f 100644 --- a/streaming/python/raystreaming/includes/libstreaming.pxd +++ b/streaming/python/raystreaming/includes/libstreaming.pxd @@ -8,7 +8,6 @@ from libc.stdint cimport * from libcpp cimport bool as c_bool from libcpp.memory cimport shared_ptr from libcpp.vector cimport vector as c_vector -from libcpp.string cimport string as c_string from libcpp.list cimport list as c_list from cpython cimport PyObject cimport cpython @@ -22,68 +21,20 @@ cdef inline object PyObject_to_object(PyObject* o): cpython.Py_DECREF(result) return result -from raystreaming.includes.common cimport ( +from ray.includes.common cimport ( CLanguage, CRayObject, CRayStatus, - CRayFunction, - CBuffer, + CRayFunction ) -from raystreaming.includes.unique_ids cimport ( +from ray.includes.unique_ids cimport ( CActorID, CJobID, CTaskID, CObjectID, ) -from raystreaming.includes.function_descriptor cimport ( - CFunctionDescriptor, -) - -from raystreaming.includes.function_descriptor import ( - JavaFunctionDescriptor -) - - -cdef class FunctionDescriptor: - cdef: - CFunctionDescriptor descriptor - -cdef class Buffer: - cdef: - shared_ptr[CBuffer] buffer - Py_ssize_t shape - Py_ssize_t strides - - @staticmethod - cdef make(const shared_ptr[CBuffer]& buffer) - -cdef class BaseID: - # To avoid the error of "Python int too large to convert to C ssize_t", - # here `cdef size_t` is required. - cdef size_t hash(self) - - -cdef class ActorID(BaseID): - cdef CActorID data - - cdef CActorID native(self) - - cdef size_t hash(self) - -cdef class ObjectRef(BaseID): - cdef: - CObjectID data - c_string owner_addr - # Flag indicating whether or not this object ref was added to the set - # of active IDs in the core worker so we know whether we should clean - # it up. - c_bool in_core_worker - c_string call_site_data - - cdef CObjectID native(self) - cdef extern from "common/status.h" namespace "ray::streaming" nogil: cdef cppclass CStreamingStatus "ray::streaming::StreamingStatus": pass @@ -247,4 +198,4 @@ cdef extern from "data_writer.h" namespace "ray::streaming" nogil: cdef extern from "ray/common/buffer.h" nogil: cdef cppclass CLocalMemoryBuffer "ray::LocalMemoryBuffer": uint8_t *Data() const - size_t Size() const + size_t Size() const \ No newline at end of file diff --git a/streaming/python/raystreaming/includes/transfer.pxi b/streaming/python/raystreaming/includes/transfer.pxi index ed080481..341cf850 100644 --- a/streaming/python/raystreaming/includes/transfer.pxi +++ b/streaming/python/raystreaming/includes/transfer.pxi @@ -9,27 +9,24 @@ from libcpp.list cimport list as c_list from libcpp.unordered_map cimport unordered_map as c_unordered_map from cython.operator cimport dereference, postincrement -from raystreaming.includes.common cimport ( +from ray.includes.common cimport ( CRayFunction, LANGUAGE_PYTHON, LANGUAGE_JAVA, CBuffer ) -from raystreaming.includes.unique_ids cimport ( +from ray.includes.unique_ids cimport ( CActorID, CObjectID ) - -from raystreaming.includes.libstreaming cimport ( +from ray._raylet cimport ( Buffer, ActorID, ObjectRef, FunctionDescriptor, ) -from raystreaming.includes.function_descriptor import JavaFunctionDescriptor - cimport raystreaming.includes.libstreaming as libstreaming from raystreaming.includes.libstreaming cimport ( CStreamingStatus, @@ -48,6 +45,7 @@ from raystreaming.includes.libstreaming cimport ( CStreamingBarrierHeader, kBarrierHeaderSize, ) +from ray._raylet import JavaFunctionDescriptor import logging @@ -183,7 +181,7 @@ cdef class DataWriter: msg = "initialize writer failed, status={}".format(status) channel_logger.error(msg) del c_writer - import ray.streaming.runtime.transfer as transfer + import raystreaming.runtime.transfer as transfer raise transfer.ChannelInitException(msg, qid_vector_to_list(remain_id_vec)) c_writer.Run() @@ -300,10 +298,10 @@ cdef class DataReader: if status != libstreaming.StatusOK: if status == libstreaming.StatusInterrupted: # avoid cyclic import - import ray.streaming.runtime.transfer as transfer + import raystreaming.runtime.transfer as transfer raise transfer.ChannelInterruptException("reader interrupted") elif status == libstreaming.StatusInitQueueFailed: - import ray.streaming.runtime.transfer as transfer + import raystreaming.runtime.transfer as transfer raise transfer.ChannelInitException("init channel failed") elif status == libstreaming.StatusGetBundleTimeOut: return [] @@ -323,7 +321,7 @@ cdef class DataReader: cdef uint32_t bundle_type = (bundle.get().meta.get().GetBundleType()) # avoid cyclic import - from ray.streaming.runtime.transfer import DataMessage + from raystreaming.runtime.transfer import DataMessage if bundle_type == libstreaming.BundleTypeBundle: msg_nums = bundle.get().meta.get().GetMessageListSize() CStreamingMessageBundle.GetMessageListFromRawData( @@ -367,7 +365,7 @@ cdef class DataReader: :barrier.get().PayloadSize() - kBarrierHeaderSize] barrier_type = barrier_header.barrier_type py_queue_id = queue_id.Binary() - from ray.streaming.runtime.transfer import CheckpointBarrier + from raystreaming.runtime.transfer import CheckpointBarrier return [CheckpointBarrier( barrier_data, timestamp, msg_id, py_queue_id, py_offset_map, barrier_id, barrier_type)] @@ -396,4 +394,4 @@ cdef c_vector[c_string] qid_vector_to_list(c_vector[CObjectID] queue_id_vec): queues = [] for obj_id in queue_id_vec: queues.append(obj_id.Binary()) - return queues + return queues \ No newline at end of file diff --git a/streaming/python/raystreaming/includes/unique_ids.pxd b/streaming/python/raystreaming/includes/unique_ids.pxd deleted file mode 100644 index 65f7252d..00000000 --- a/streaming/python/raystreaming/includes/unique_ids.pxd +++ /dev/null @@ -1,176 +0,0 @@ -from libcpp cimport bool as c_bool -from libcpp.string cimport string as c_string -from libc.stdint cimport uint8_t, uint32_t, int64_t - -cdef extern from "ray/common/id.h" namespace "ray" nogil: - cdef cppclass CBaseID[T]: - @staticmethod - T FromBinary(const c_string &binary) - - @staticmethod - const T Nil() - - @staticmethod - size_t Size() - - size_t Hash() const - c_bool IsNil() const - c_bool operator==(const CBaseID &rhs) const - c_bool operator!=(const CBaseID &rhs) const - const uint8_t *data() const - - c_string Binary() const - c_string Hex() const - - cdef cppclass CUniqueID "ray::UniqueID"(CBaseID): - CUniqueID() - - @staticmethod - size_t Size() - - @staticmethod - CUniqueID FromRandom() - - @staticmethod - CUniqueID FromBinary(const c_string &binary) - - @staticmethod - const CUniqueID Nil() - - @staticmethod - size_t Size() - - cdef cppclass CActorClassID "ray::ActorClassID"(CUniqueID): - - @staticmethod - CActorClassID FromBinary(const c_string &binary) - - cdef cppclass CActorID "ray::ActorID"(CBaseID[CActorID]): - - @staticmethod - CActorID FromBinary(const c_string &binary) - - @staticmethod - const CActorID Nil() - - @staticmethod - size_t Size() - - @staticmethod - CActorID Of(CJobID job_id, CTaskID parent_task_id, - int64_t parent_task_counter) - - CJobID JobId() - - cdef cppclass CNodeID "ray::NodeID"(CUniqueID): - - @staticmethod - CNodeID FromBinary(const c_string &binary) - - @staticmethod - CNodeID FromHex(const c_string &hex_str) - - cdef cppclass CConfigID "ray::ConfigID"(CUniqueID): - - @staticmethod - CConfigID FromBinary(const c_string &binary) - - cdef cppclass CFunctionID "ray::FunctionID"(CUniqueID): - - @staticmethod - CFunctionID FromBinary(const c_string &binary) - - cdef cppclass CJobID "ray::JobID"(CBaseID[CJobID]): - - @staticmethod - CJobID FromBinary(const c_string &binary) - - @staticmethod - const CJobID Nil() - - @staticmethod - size_t Size() - - @staticmethod - CJobID FromInt(uint32_t value) - - uint32_t ToInt() - - cdef cppclass CTaskID "ray::TaskID"(CBaseID[CTaskID]): - - @staticmethod - CTaskID FromBinary(const c_string &binary) - - @staticmethod - const CTaskID Nil() - - @staticmethod - size_t Size() - - @staticmethod - CTaskID ForDriverTask(const CJobID &job_id) - - @staticmethod - CTaskID FromRandom(const CJobID &job_id) - - @staticmethod - CTaskID ForActorCreationTask(CActorID actor_id) - - @staticmethod - CTaskID ForActorTask(CJobID job_id, CTaskID parent_task_id, - int64_t parent_task_counter, CActorID actor_id) - - @staticmethod - CTaskID ForNormalTask(CJobID job_id, CTaskID parent_task_id, - int64_t parent_task_counter) - - CActorID ActorId() const - - CJobID JobId() const - - cdef cppclass CObjectID" ray::ObjectID"(CBaseID[CObjectID]): - - @staticmethod - int64_t MaxObjectIndex() - - @staticmethod - CObjectID FromBinary(const c_string &binary) - - @staticmethod - CObjectID FromRandom() - - @staticmethod - const CObjectID Nil() - - @staticmethod - CObjectID FromIndex(const CTaskID &task_id, int64_t index) - - @staticmethod - size_t Size() - - c_bool is_put() - - int64_t ObjectIndex() const - - CTaskID TaskId() const - - cdef cppclass CWorkerID "ray::WorkerID"(CUniqueID): - - @staticmethod - CWorkerID FromBinary(const c_string &binary) - - cdef cppclass CPlacementGroupID "ray::PlacementGroupID" \ - (CBaseID[CPlacementGroupID]): - - @staticmethod - CPlacementGroupID FromBinary(const c_string &binary) - - @staticmethod - const CActorID Nil() - - @staticmethod - size_t Size() - - @staticmethod - CPlacementGroupID FromRandom() - diff --git a/streaming/python/raystreaming/includes/unique_ids.pxi b/streaming/python/raystreaming/includes/unique_ids.pxi deleted file mode 100644 index d8466941..00000000 --- a/streaming/python/raystreaming/includes/unique_ids.pxi +++ /dev/null @@ -1,465 +0,0 @@ -"""This is a module for unique IDs in Ray. -We define different types for different IDs for type safety. - -See https://github.com/ray-project/ray/issues/3721. -""" - -# WARNING: Any additional ID types defined in this file must be added to the -# _ID_TYPES list at the bottom of this file. - -from concurrent.futures import Future -import logging -import os - -from raystreaming.includes.unique_ids cimport ( - CActorClassID, - CActorID, - CNodeID, - CConfigID, - CJobID, - CFunctionID, - CObjectID, - CTaskID, - CUniqueID, - CWorkerID, - CPlacementGroupID -) - -import ray -from ray._private.utils import decode - -logger = logging.getLogger(__name__) - - -def check_id(b, size=kUniqueIDSize): - if not isinstance(b, bytes): - raise TypeError("Unsupported type: " + str(type(b))) - if len(b) != size: - raise ValueError("ID string needs to have length " + - str(size) + ", got " + str(len(b))) - - -cdef extern from "ray/common/constants.h" nogil: - cdef int64_t kUniqueIDSize - - -cdef class BaseID: - - cdef size_t hash(self): - pass - - def binary(self): - pass - - def size(self): - pass - - def hex(self): - pass - - def is_nil(self): - pass - - def __hash__(self): - return self.hash() - - def __eq__(self, other): - return type(self) == type(other) and self.binary() == other.binary() - - def __ne__(self, other): - return type(self) != type(other) or self.binary() != other.binary() - - def __bytes__(self): - return self.binary() - - def __hex__(self): - return self.hex() - - def __repr__(self): - return self.__class__.__name__ + "(" + self.hex() + ")" - - def __str__(self): - return self.__repr__() - - def __reduce__(self): - return type(self), (self.binary(),) - - def redis_shard_hash(self): - # NOTE: The hash function used here must match the one in - # GetRedisContext in src/ray/gcs/tables.h. Changes to the - # hash function should only be made through std::hash in - # src/common/common.h. - # Do not use __hash__ that returns signed uint64_t, which - # is different from std::hash in c++ code. - return self.hash() - - -cdef class UniqueID(BaseID): - cdef CUniqueID data - - def __init__(self, id): - check_id(id) - self.data = CUniqueID.FromBinary(id) - - @classmethod - def from_binary(cls, id_bytes): - if not isinstance(id_bytes, bytes): - raise TypeError("Expect bytes, got " + str(type(id_bytes))) - return cls(id_bytes) - - @classmethod - def nil(cls): - return cls(CUniqueID.Nil().Binary()) - - @classmethod - def from_random(cls): - return cls(CUniqueID.FromRandom().Binary()) - - def size(self): - return CUniqueID.Size() - - def binary(self): - return self.data.Binary() - - def hex(self): - return decode(self.data.Hex()) - - def is_nil(self): - return self.data.IsNil() - - cdef size_t hash(self): - return self.data.Hash() - - -cdef class TaskID(BaseID): - cdef CTaskID data - - def __init__(self, id): - check_id(id, CTaskID.Size()) - self.data = CTaskID.FromBinary(id) - - cdef CTaskID native(self): - return self.data - - def size(self): - return CTaskID.Size() - - def binary(self): - return self.data.Binary() - - def hex(self): - return decode(self.data.Hex()) - - def is_nil(self): - return self.data.IsNil() - - def actor_id(self): - return ActorID(self.data.ActorId().Binary()) - - def job_id(self): - return JobID(self.data.JobId().Binary()) - - cdef size_t hash(self): - return self.data.Hash() - - @classmethod - def nil(cls): - return cls(CTaskID.Nil().Binary()) - - @classmethod - def size(cls): - return CTaskID.Size() - - @classmethod - def for_fake_task(cls, job_id): - return cls(CTaskID.FromRandom( - CJobID.FromBinary(job_id.binary())).Binary()) - - @classmethod - def for_driver_task(cls, job_id): - return cls(CTaskID.ForDriverTask( - CJobID.FromBinary(job_id.binary())).Binary()) - - @classmethod - def for_actor_creation_task(cls, actor_id): - assert isinstance(actor_id, ActorID) - return cls(CTaskID.ForActorCreationTask( - CActorID.FromBinary(actor_id.binary())).Binary()) - - @classmethod - def for_actor_task(cls, job_id, parent_task_id, - parent_task_counter, actor_id): - assert isinstance(job_id, JobID) - assert isinstance(parent_task_id, TaskID) - assert isinstance(actor_id, ActorID) - return cls(CTaskID.ForActorTask( - CJobID.FromBinary(job_id.binary()), - CTaskID.FromBinary(parent_task_id.binary()), - parent_task_counter, - CActorID.FromBinary(actor_id.binary())).Binary()) - - @classmethod - def for_normal_task(cls, job_id, parent_task_id, parent_task_counter): - assert isinstance(job_id, JobID) - assert isinstance(parent_task_id, TaskID) - return cls(CTaskID.ForNormalTask( - CJobID.FromBinary(job_id.binary()), - CTaskID.FromBinary(parent_task_id.binary()), - parent_task_counter).Binary()) - -cdef class NodeID(UniqueID): - - def __init__(self, id): - check_id(id) - self.data = CNodeID.FromBinary(id) - - @classmethod - def from_hex(cls, hex_id): - binary_id = CNodeID.FromHex(hex_id).Binary() - return cls(binary_id) - - cdef CNodeID native(self): - return self.data - - -cdef class JobID(BaseID): - cdef CJobID data - - def __init__(self, id): - check_id(id, CJobID.Size()) - self.data = CJobID.FromBinary(id) - - cdef CJobID native(self): - return self.data - - @classmethod - def from_int(cls, value): - assert value < 2**32, "Maximum JobID integer is 2**32 - 1." - return cls(CJobID.FromInt(value).Binary()) - - @classmethod - def nil(cls): - return cls(CJobID.Nil().Binary()) - - @classmethod - def size(cls): - return CJobID.Size() - - def int(self): - return self.data.ToInt() - - def binary(self): - return self.data.Binary() - - def hex(self): - return decode(self.data.Hex()) - - def size(self): - return CJobID.Size() - - def is_nil(self): - return self.data.IsNil() - - cdef size_t hash(self): - return self.data.Hash() - -cdef class WorkerID(UniqueID): - - def __init__(self, id): - check_id(id) - self.data = CWorkerID.FromBinary(id) - - cdef CWorkerID native(self): - return self.data - -cdef class ActorID(BaseID): - - def __init__(self, id): - check_id(id, CActorID.Size()) - self.data = CActorID.FromBinary(id) - - @classmethod - def of(cls, job_id, parent_task_id, parent_task_counter): - assert isinstance(job_id, JobID) - assert isinstance(parent_task_id, TaskID) - return cls(CActorID.Of(CJobID.FromBinary(job_id.binary()), - CTaskID.FromBinary(parent_task_id.binary()), - parent_task_counter).Binary()) - - @classmethod - def nil(cls): - return cls(CActorID.Nil().Binary()) - - @classmethod - def from_random(cls): - return cls(os.urandom(CActorID.Size())) - - @classmethod - def size(cls): - return CActorID.Size() - - def size(self): - return CActorID.Size() - - @property - def job_id(self): - return JobID(self.data.JobId().Binary()) - - def binary(self): - return self.data.Binary() - - def hex(self): - return decode(self.data.Hex()) - - def is_nil(self): - return self.data.IsNil() - - cdef size_t hash(self): - return self.data.Hash() - - cdef CActorID native(self): - return self.data - - -cdef class ClientActorRef(ActorID): - - def __init__(self, id: Union[bytes, concurrent.futures.Future]): - self._mutex = threading.Lock() - if isinstance(id, bytes): - self._set_id(id) - elif isinstance(id, Future): - self._id_future = id - else: - raise TypeError("Unexpected type for id {}".format(id)) - - def __dealloc__(self): - if client is None or client.ray is None: - # The client package or client.ray object might be set - # to None when the script exits. Should be safe to skip - # call_release in this case, since the client should have already - # disconnected at this point. - return - if client.ray.is_connected(): - try: - self._wait_for_id() - # cython would suppress this exception as well, but it tries to - # print out the exception which may crash. Log a simpler message - # instead. - except Exception: - logger.info( - "Exception from actor creation is ignored in destructor. " - "To receive this exception in application code, call " - "a method on the actor reference before its destructor " - "is run.") - if not self.data.IsNil(): - client.ray.call_release(self.id) - - def binary(self): - self._wait_for_id() - return self.data.Binary() - - def hex(self): - self._wait_for_id() - return decode(self.data.Hex()) - - def is_nil(self): - self._wait_for_id() - return self.data.IsNil() - - cdef size_t hash(self): - self._wait_for_id() - return self.data.Hash() - - cdef CActorID native(self): - self._wait_for_id() - return self.data - - @property - def id(self): - return self.binary() - - cdef _set_id(self, id): - check_id(id, CActorID.Size()) - self.data = CActorID.FromBinary(id) - client.ray.call_retain(id) - - cdef _wait_for_id(self, timeout=None): - if self._id_future: - with self._mutex: - if self._id_future: - self._set_id(self._id_future.result(timeout=timeout)) - self._id_future = None - - -cdef class FunctionID(UniqueID): - - def __init__(self, id): - check_id(id) - self.data = CFunctionID.FromBinary(id) - - cdef CFunctionID native(self): - return self.data - - -cdef class ActorClassID(UniqueID): - - def __init__(self, id): - check_id(id) - self.data = CActorClassID.FromBinary(id) - - cdef CActorClassID native(self): - return self.data - -# This type alias is for backward compatibility. -ObjectID = ObjectRef - -cdef class PlacementGroupID(BaseID): - cdef CPlacementGroupID data - - def __init__(self, id): - check_id(id, CPlacementGroupID.Size()) - self.data = CPlacementGroupID.FromBinary(id) - - cdef CPlacementGroupID native(self): - return self.data - - @classmethod - def from_random(cls): - return cls(CPlacementGroupID.FromRandom().Binary()) - - @classmethod - def nil(cls): - return cls(CPlacementGroupID.Nil().Binary()) - - @classmethod - def size(cls): - return CPlacementGroupID.Size() - - def binary(self): - return self.data.Binary() - - def hex(self): - return decode(self.data.Hex()) - - def size(self): - return CPlacementGroupID.Size() - - def is_nil(self): - return self.data.IsNil() - - cdef size_t hash(self): - return self.data.Hash() - -_ID_TYPES = [ - ActorClassID, - ActorID, - NodeID, - JobID, - WorkerID, - FunctionID, - ObjectID, - TaskID, - UniqueID, - PlacementGroupID, -] - diff --git a/streaming/python/raystreaming/operator.py b/streaming/python/raystreaming/operator.py index 9163519d..254a0301 100644 --- a/streaming/python/raystreaming/operator.py +++ b/streaming/python/raystreaming/operator.py @@ -3,12 +3,12 @@ import logging from abc import ABC, abstractmethod -from ray.streaming import function -from ray.streaming import message -from ray.streaming.collector import Collector -from ray.streaming.collector import CollectionCollector -from ray.streaming.function import SourceFunction -from ray.streaming.runtime import gateway_client +from raystreaming import function +from raystreaming import message +from raystreaming.collector import Collector +from raystreaming.collector import CollectionCollector +from raystreaming.function import SourceFunction +from raystreaming.runtime import gateway_client logger = logging.getLogger(__name__) diff --git a/streaming/python/raystreaming/partition.py b/streaming/python/raystreaming/partition.py index fb30ba7c..94c89df0 100644 --- a/streaming/python/raystreaming/partition.py +++ b/streaming/python/raystreaming/partition.py @@ -3,7 +3,7 @@ from abc import ABC, abstractmethod from ray import cloudpickle -from ray.streaming.runtime import gateway_client +from raystreaming.runtime import gateway_client class Partition(ABC): diff --git a/streaming/python/raystreaming/runtime/context_backend.py b/streaming/python/raystreaming/runtime/context_backend.py index 65e811cf..b72a1cba 100644 --- a/streaming/python/raystreaming/runtime/context_backend.py +++ b/streaming/python/raystreaming/runtime/context_backend.py @@ -3,7 +3,7 @@ from abc import ABC, abstractmethod from os import path -from ray.streaming.config import ConfigHelper, Config +from raystreaming.config import ConfigHelper, Config logger = logging.getLogger(__name__) diff --git a/streaming/python/raystreaming/runtime/graph.py b/streaming/python/raystreaming/runtime/graph.py index acfd5b83..1e1eff54 100644 --- a/streaming/python/raystreaming/runtime/graph.py +++ b/streaming/python/raystreaming/runtime/graph.py @@ -2,13 +2,13 @@ import logging import ray -import ray.streaming.generated.remote_call_pb2 as remote_call_pb -import ray.streaming.operator as operator -import ray.streaming.partition as partition +import raystreaming.generated.remote_call_pb2 as remote_call_pb +import raystreaming.operator as operator +import raystreaming.partition as partition from ray._raylet import ActorID from ray.actor import ActorHandle -from ray.streaming.config import Config -from ray.streaming.generated.streaming_pb2 import Language +from raystreaming.config import Config +from raystreaming.generated.streaming_pb2 import Language logger = logging.getLogger(__name__) diff --git a/streaming/python/raystreaming/runtime/processor.py b/streaming/python/raystreaming/runtime/processor.py index 1083713e..ad85204e 100644 --- a/streaming/python/raystreaming/runtime/processor.py +++ b/streaming/python/raystreaming/runtime/processor.py @@ -1,9 +1,9 @@ import logging from abc import ABC, abstractmethod -import ray.streaming.context as context -from ray.streaming import message -from ray.streaming.operator import OperatorType +import raystreaming.context as context +from raystreaming import message +from raystreaming.operator import OperatorType logger = logging.getLogger(__name__) @@ -58,7 +58,7 @@ def load_checkpoint(self, checkpoint_obj): class SourceProcessor(StreamingProcessor): - """Processor for :class:`ray.streaming.operator.SourceOperator` """ + """Processor for :class:`raystreaming.operator.SourceOperator` """ def __init__(self, operator): super().__init__(operator) diff --git a/streaming/python/raystreaming/runtime/remote_call.py b/streaming/python/raystreaming/runtime/remote_call.py index 4f5f082e..6e1233a7 100644 --- a/streaming/python/raystreaming/runtime/remote_call.py +++ b/streaming/python/raystreaming/runtime/remote_call.py @@ -5,8 +5,8 @@ from enum import Enum from ray.actor import ActorHandle -from ray.streaming.generated import remote_call_pb2 -from ray.streaming.runtime.command\ +from raystreaming.generated import remote_call_pb2 +from raystreaming.runtime.command\ import WorkerCommitReport, WorkerRollbackRequest logger = logging.getLogger(__name__) diff --git a/streaming/python/raystreaming/runtime/serialization.py b/streaming/python/raystreaming/runtime/serialization.py index 600e1084..c57af6f6 100644 --- a/streaming/python/raystreaming/runtime/serialization.py +++ b/streaming/python/raystreaming/runtime/serialization.py @@ -1,7 +1,7 @@ from abc import ABC, abstractmethod import pickle import msgpack -from ray.streaming import message +from raystreaming import message RECORD_TYPE_ID = 0 KEY_RECORD_TYPE_ID = 1 diff --git a/streaming/python/raystreaming/runtime/task.py b/streaming/python/raystreaming/runtime/task.py index 54ec3cf3..336c7b36 100644 --- a/streaming/python/raystreaming/runtime/task.py +++ b/streaming/python/raystreaming/runtime/task.py @@ -6,25 +6,25 @@ from abc import ABC, abstractmethod from typing import Optional -from ray.streaming.collector import OutputCollector -from ray.streaming.config import Config -from ray.streaming.context import RuntimeContextImpl -from ray.streaming.generated import remote_call_pb2 -from ray.streaming.runtime import serialization -from ray.streaming.runtime.command import WorkerCommitReport -from ray.streaming.runtime.failover import Barrier, OpCheckpointInfo -from ray.streaming.runtime.remote_call import RemoteCallMst -from ray.streaming.runtime.serialization import \ +from raystreaming.collector import OutputCollector +from raystreaming.config import Config +from raystreaming.context import RuntimeContextImpl +from raystreaming.generated import remote_call_pb2 +from raystreaming.runtime import serialization +from raystreaming.runtime.command import WorkerCommitReport +from raystreaming.runtime.failover import Barrier, OpCheckpointInfo +from raystreaming.runtime.remote_call import RemoteCallMst +from raystreaming.runtime.serialization import \ PythonSerializer, CrossLangSerializer -from ray.streaming.runtime.transfer import CheckpointBarrier -from ray.streaming.runtime.transfer import DataMessage -from ray.streaming.runtime.transfer import ChannelID, DataWriter, DataReader -from ray.streaming.runtime.transfer import ChannelRecoverInfo -from ray.streaming.runtime.transfer import ChannelInterruptException +from raystreaming.runtime.transfer import CheckpointBarrier +from raystreaming.runtime.transfer import DataMessage +from raystreaming.runtime.transfer import ChannelID, DataWriter, DataReader +from raystreaming.runtime.transfer import ChannelRecoverInfo +from raystreaming.runtime.transfer import ChannelInterruptException if typing.TYPE_CHECKING: - from ray.streaming.runtime.worker import JobWorker - from ray.streaming.runtime.processor import Processor, SourceProcessor + from raystreaming.runtime.worker import JobWorker + from raystreaming.runtime.processor import Processor, SourceProcessor logger = logging.getLogger(__name__) diff --git a/streaming/python/raystreaming/runtime/transfer.py b/streaming/python/raystreaming/runtime/transfer.py index 4cb482de..a7bf3ec6 100644 --- a/streaming/python/raystreaming/runtime/transfer.py +++ b/streaming/python/raystreaming/runtime/transfer.py @@ -6,10 +6,10 @@ from abc import ABC, abstractmethod import ray -import ray.streaming._streaming as _streaming -import ray.streaming.generated.streaming_pb2 as streaming_pb +import raystreaming._streaming as _streaming +import raystreaming.generated.streaming_pb2 as streaming_pb from ray.actor import ActorHandle -from ray.streaming.config import Config +from raystreaming.config import Config from ray._raylet import JavaFunctionDescriptor from ray._raylet import PythonFunctionDescriptor from ray._raylet import Language diff --git a/streaming/python/raystreaming/runtime/worker.py b/streaming/python/raystreaming/runtime/worker.py index d6d8eb02..8997da46 100644 --- a/streaming/python/raystreaming/runtime/worker.py +++ b/streaming/python/raystreaming/runtime/worker.py @@ -6,18 +6,18 @@ from typing import Optional import ray -import ray.streaming.runtime.processor as processor +import raystreaming.runtime.processor as processor from ray.actor import ActorHandle -from ray.streaming.generated import remote_call_pb2 -from ray.streaming.runtime.command import WorkerRollbackRequest -from ray.streaming.runtime.failover import Barrier -from ray.streaming.runtime.graph import ExecutionVertexContext, ExecutionVertex -from ray.streaming.runtime.remote_call import CallResult, RemoteCallMst -from ray.streaming.runtime.context_backend import ContextBackendFactory -from ray.streaming.runtime.task import SourceStreamTask, OneInputStreamTask -from ray.streaming.runtime.transfer import channel_bytes_to_str -from ray.streaming.config import Config -import ray.streaming._streaming as _streaming +from raystreaming.generated import remote_call_pb2 +from raystreaming.runtime.command import WorkerRollbackRequest +from raystreaming.runtime.failover import Barrier +from raystreaming.runtime.graph import ExecutionVertexContext, ExecutionVertex +from raystreaming.runtime.remote_call import CallResult, RemoteCallMst +from raystreaming.runtime.context_backend import ContextBackendFactory +from raystreaming.runtime.task import SourceStreamTask, OneInputStreamTask +from raystreaming.runtime.transfer import channel_bytes_to_str +from raystreaming.config import Config +import raystreaming._streaming as _streaming logger = logging.getLogger(__name__) diff --git a/streaming/python/raystreaming/tests/test_word_count.py b/streaming/python/raystreaming/tests/test_word_count.py index e38fc5ca..1f2d58e1 100644 --- a/streaming/python/raystreaming/tests/test_word_count.py +++ b/streaming/python/raystreaming/tests/test_word_count.py @@ -1,7 +1,7 @@ import os import sys import ray -from ray.streaming import StreamingContext +from raystreaming import StreamingContext from ray._private.test_utils import wait_for_condition