remove queue and add streaming python channel

chaokunyang committed Dec 1, 2019
1 parent 37232fe commit 8e7296ff016e671af47d8e4dcbbee38c2452efdf
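At a glance, the commit swaps the queue-flavored transfer API (QueueID, register_queue_consumer / register_queue_producer, pull / produce) for a channel-flavored one (ChannelID, DataReader / DataWriter, read / write). A rough before/after sketch, restricted to names that actually appear in the hunks below, with all surrounding plumbing elided:

    # Before: queue API (removed by this commit)
    consumer = queue_link.register_queue_consumer(qids, input_actors)
    item = consumer.pull(100)
    producer = queue_link.register_queue_producer(qids, to_actors)
    producer.produce(qid, msg_data)

    # After: channel API (added by this commit)
    reader = channel.DataReader(channel_ids, input_actors, conf)
    item = reader.read(100)
    writer = channel.DataWriter(channel_ids, to_actors, conf)
    writer.write(qid, msg_data)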
@@ -3,4 +3,4 @@
 # cython: embedsignature = True
 # cython: language_level = 3

-include "includes/native_transfer.pxi"
+include "includes/native_channel.pxi"
@@ -8,8 +8,11 @@
 import sys
 import time

+import ray
 from ray.streaming.operator import PStrategy
-from ray.streaming.runtime.queue.queue_interface import QueueID
+from ray.streaming.runtime.channel import ChannelID
+from ray.streaming.config import Config
+import ray.streaming.runtime.channel as channel

 logger = logging.getLogger(__name__)
 logging.basicConfig(level=logging.INFO)
@@ -49,7 +52,7 @@ def __init__(self, src_operator_id, src_instance_index,
         self.dst_operator_id = dst_operator_id
         self.dst_instance_index = dst_instance_index
         self.str_qid = str_qid
-        self.qid = QueueID(str_qid)
+        self.qid = ChannelID(str_qid)

     def __repr__(self):
         return "(src({},{}),dst({},{}), qid({}))".format(
@@ -78,32 +81,35 @@ class DataInput(object):
         closed (True) or not (False).
     """

-    def __init__(self, env, queue_link, channels):
+    def __init__(self, env, channels):
         assert len(channels) > 0
         self.env = env
-        self.queue_link = queue_link
-        self.consumer = None  # created in `init` method
+        self.reader = None  # created in `init` method
         self.input_channels = channels
         self.channel_index = 0
         self.max_index = len(channels)
         # Tracks the channels that have been closed. qid: close status
         self.closed = {}

     def init(self):
-        qids = [channel.str_qid for channel in self.input_channels]
+        channels = [c.str_qid for c in self.input_channels]
         input_actors = []
-        for channel in self.input_channels:
-            actor = self.env.execution_graph.get_actor(channel.src_operator_id, channel.src_instance_index)
+        for c in self.input_channels:
+            actor = self.env.execution_graph.get_actor(c.src_operator_id, c.src_instance_index)
             input_actors.append(actor)
         logger.info("DataInput input_actors %s", input_actors)
-        self.consumer = self.queue_link.register_queue_consumer(qids, input_actors)
+        conf = {
+            Config.TASK_JOB_ID: ray.runtime_context._get_runtime_context().current_driver_id,
+            Config.QUEUE_TYPE: self.env.config.queue_type
+        }
+        self.reader = channel.DataReader(channels, input_actors, conf)

     def pull(self):
         # pull from queue
-        queue_item = self.consumer.pull(100)
+        queue_item = self.reader.read(100)
         while queue_item is None:
             time.sleep(0.001)
-            queue_item = self.consumer.pull(100)
+            queue_item = self.reader.read(100)
         msg_data = queue_item.body()
         if msg_data == _CLOSE_FLAG:
             self.closed[queue_item.queue_id] = True
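For orientation, the new init/pull pair boils down to a small polling loop over channel.DataReader. A minimal sketch, assuming channel_ids and input_actors are built as in the hunk above and that the config keys carry over unchanged:

    import time
    import ray
    import ray.streaming.runtime.channel as channel
    from ray.streaming.config import Config

    conf = {
        Config.TASK_JOB_ID: ray.runtime_context._get_runtime_context().current_driver_id,
        Config.QUEUE_TYPE: Config.MEMORY_QUEUE,  # assumption; NATIVE_QUEUE also appears in this commit
    }
    reader = channel.DataReader(channel_ids, input_actors, conf)

    item = reader.read(100)      # returns None if nothing arrives within 100 ms
    while item is None:
        time.sleep(0.001)        # back off briefly, then poll again
        item = reader.read(100)
    payload = item.body()        # raw bytes as written by the upstream DataWriter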
@@ -140,11 +146,10 @@ class DataOutput(object):
         least one shuffle_key_channel.
     """

-    def __init__(self, env, queue_link, channels, partitioning_schemes):
+    def __init__(self, env, channels, partitioning_schemes):
         assert len(channels) > 0
         self.env = env
-        self.queue_link = queue_link
-        self.producer = None  # created in `init` method
+        self.writer = None  # created in `init` method
         self.channels = channels
         self.key_selector = None
         self.round_robin_indexes = [0]
@@ -203,15 +208,19 @@ def __init__(self, env, queue_link, channels, partitioning_schemes):
         assert not (self.shuffle_exists and self.shuffle_key_exists)

     def init(self):
-        """init DataOutput which creates QueueProducer"""
-        qids = [channel.str_qid for channel in self.channels]
+        """init DataOutput which creates DataWriter"""
+        channel_ids = [c.str_qid for c in self.channels]
         to_actors = []
-        for channel in self.channels:
-            actor = self.env.execution_graph.get_actor(channel.dst_operator_id, channel.dst_instance_index)
+        for c in self.channels:
+            actor = self.env.execution_graph.get_actor(c.dst_operator_id, c.dst_instance_index)
             to_actors.append(actor)
         logger.info("DataOutput output_actors %s", to_actors)

-        self.producer = self.queue_link.register_queue_producer(qids, to_actors)
+        conf = {
+            Config.TASK_JOB_ID: ray.runtime_context._get_runtime_context().current_driver_id,
+            Config.QUEUE_TYPE: self.env.config.queue_type
+        }
+        self.writer = channel.DataWriter(channel_ids, to_actors, conf)

     def close(self):
         """Close the channel (True) by propagating _CLOSE_FLAG
@@ -220,7 +229,7 @@ def close(self):
         to the sink to signal the end of data in the stream.
         """
         for channel in self.channels:
-            self.producer.produce(channel.qid, _CLOSE_FLAG)
+            self.writer.write(channel.qid, _CLOSE_FLAG)
         # stopping StreamingWriter here may cause the close flag not to reach the peer actor

     # Pushes the record to the output
@@ -269,7 +278,7 @@ def push(self, record):
         msg_data = pickle.dumps(record)
         for channel in target_channels:
             # send data to queue
-            self.producer.produce(channel.qid, msg_data)
+            self.writer.write(channel.qid, msg_data)

     # Pushes a list of records to the output
     # Each individual output queue flushes batches to plasma periodically
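The write side is symmetric: records are pickled and handed to channel.DataWriter, and close() pushes _CLOSE_FLAG so downstream readers can drain and exit. A hedged mirror-image sketch, with writer, conf, and the channel lists assumed set up as in the hunks above:

    import pickle

    writer = channel.DataWriter(channel_ids, to_actors, conf)
    msg_data = pickle.dumps(record)
    for ch in target_channels:
        writer.write(ch.qid, msg_data)   # one serialized record per write call

    # on shutdown, mirror DataOutput.close():
    for ch in channels:
        writer.write(ch.qid, _CLOSE_FLAG)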
@@ -32,19 +32,16 @@ def filter_fn(word):

 args = parser.parse_args()

-ray.init(local_mode=False)
+ray.init(local_mode=True)

 # A Ray streaming environment with the default configuration
-env = Environment(config=Conf(queue_type=Config.NATIVE_QUEUE))
+env = Environment(config=Conf(queue_type=Config.MEMORY_QUEUE))

 # Stream represents the output of the filter and
 # can be forked into other dataflows
 stream = env.read_text_file(args.input_file) \
-    .shuffle() \
     .flat_map(splitter) \
-    .set_parallelism(2) \
     .filter(filter_fn) \
-    .set_parallelism(2) \
     .inspect(lambda x: print("result", x))  # Prints the contents of the
                                             # stream to stdout
 start = time.time()
@@ -61,7 +61,9 @@ cdef extern from "status.h" namespace "ray::streaming" nogil:
 cdef extern from "runtime_context.h" namespace "ray::streaming" nogil:
     cdef cppclass CRuntimeContext "ray::streaming::RuntimeContext":
         CRuntimeContext()
-        void SetConfig(const uint8_t *data, uint32_t size);
+        void SetConfig(const uint8_t *data, uint32_t size)
+        inline void MarkMockTest()
+        inline c_bool IsMockTest()

 cdef extern from "message/message.h" namespace "ray::streaming" nogil:
     cdef cppclass CStreamingMessageType "ray::streaming::StreamingMessageType":
@@ -1,6 +1,6 @@
 from libc.stdint cimport *
 from libcpp cimport bool as c_bool
-from libcpp.memory cimport shared_ptr, make_shared, dynamic_pointer_cast
+from libcpp.memory cimport shared_ptr, make_shared
 from libcpp.string cimport string as c_string
 from libcpp.vector cimport vector as c_vector
 from libcpp.list cimport list as c_list
@@ -41,10 +41,10 @@ from ray.streaming.includes.libstreaming cimport (

 import logging
 from ray.function_manager import FunctionDescriptor
-from ray.streaming.runtime.queue.queue_interface import QueueInitException, QueueInterruptException
+import ray.streaming.runtime.channel as channel


-queue_logger = logging.getLogger(__name__)
+channel_logger = logging.getLogger(__name__)


 cdef class ReaderClient:
@@ -143,14 +143,12 @@ cdef class DataWriter:
         raise Exception("use create() to create DataWriter")

     @staticmethod
-    def create(self,
-               list py_output_channels,
-               list output_actor_ids: list[ActorID],
-               uint64_t queue_size,
-               list py_msg_ids,
-               bytes config_bytes):
-        if self.producer:
-            return self.producer
+    def create(list py_output_channels,
+               list output_actor_ids: list[ActorID],
+               uint64_t queue_size,
+               list py_msg_ids,
+               bytes config_bytes,
+               c_bool is_mock):
         cdef:
             c_vector[CObjectID] channel_ids = bytes_list_to_qid_vec(py_output_channels)
             c_vector[CActorID] actor_ids
@@ -163,9 +161,11 @@ cdef class DataWriter:
             msg_ids.push_back(<uint64_t>py_msg_id)

         cdef shared_ptr[CRuntimeContext] ctx = make_shared[CRuntimeContext]()
+        if is_mock:
+            ctx.get().MarkMockTest()
         if config_bytes:
             config_data = config_bytes
-            queue_logger.info("load config, config bytes size: %s", config_data.nbytes)
+            channel_logger.info("load config, config bytes size: %s", config_data.nbytes)
             ctx.get().SetConfig(<uint8_t *>(&config_data[0]), config_data.nbytes)
         c_writer = new CDataWriter(ctx)
         cdef:
@@ -175,23 +175,23 @@ cdef class DataWriter:
             queue_size_vec.push_back(queue_size)
         cdef CStreamingStatus status = c_writer.Init(channel_ids, actor_ids, msg_ids, queue_size_vec)
         if remain_id_vec.size() != 0:
-            queue_logger.warning("failed queue amounts => %s", remain_id_vec.size())
+            channel_logger.warning("failed queue amounts => %s", remain_id_vec.size())
         if <uint32_t>status != <uint32_t> libstreaming.StatusOK:
             msg = "initialize writer failed, status={}".format(<uint32_t>status)
-            queue_logger.error(msg)
+            channel_logger.error(msg)
             del c_writer
-            raise QueueInitException(msg, qid_vector_to_list(remain_id_vec))
+            raise channel.ChannelInitException(msg, qid_vector_to_list(remain_id_vec))

         c_writer.Run()
-        queue_logger.info("create native producer succeed")
-        cdef DataWriter writer = DataWriter.__new__()
+        channel_logger.info("create native writer succeed")
+        cdef DataWriter writer = DataWriter.__new__(DataWriter)
         writer.writer = c_writer
         return writer

     def __dealloc__(self):
         if self.writer != NULL:
             del self.writer
-            queue_logger.info("deleted StreamingWriter")
+            channel_logger.info("deleted DataWriter")
             self.writer = NULL

     def write(self, ObjectID qid, const unsigned char[:] value):
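With the stray self parameter and the cached-producer shortcut gone, DataWriter.create is now a genuine static factory, and it gains a trailing is_mock flag that flips the runtime context into mock-test mode before Init. A sketch of a call site under the new signature; the argument values are placeholders, only the parameter list comes from the hunk:

    writer = DataWriter.create(
        py_output_channels,   # channel ids as bytes
        output_actor_ids,     # list[ActorID] of the downstream peers
        10 ** 8,              # queue_size per channel (placeholder value)
        py_msg_ids,           # starting message id per channel
        config_bytes,         # serialized config; falsy skips SetConfig
        False,                # is_mock: True calls MarkMockTest() on the context
    )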
@@ -207,7 +207,7 @@ cdef class DataWriter:

     def stop(self):
         self.writer.Stop()
-        queue_logger.info("stopped DataWriter")
+        channel_logger.info("stopped DataWriter")


 cdef class DataReader:
@@ -220,16 +220,14 @@ cdef class DataReader:
         raise Exception("use create() to create DataReader")

     @staticmethod
-    def create(self,
-               list py_input_queues,
-               list input_actor_ids: list[ActorID],
-               list py_seq_ids,
-               list py_msg_ids,
-               int64_t timer_interval,
-               c_bool is_recreate,
-               bytes config_bytes):
-        if self.consumer:
-            return self.consumer
+    def create(list py_input_queues,
+               list input_actor_ids: list[ActorID],
+               list py_seq_ids,
+               list py_msg_ids,
+               int64_t timer_interval,
+               c_bool is_recreate,
+               bytes config_bytes,
+               c_bool is_mock):
         cdef:
             c_vector[CObjectID] queue_id_vec = bytes_list_to_qid_vec(py_input_queues)
             c_vector[CActorID] actor_ids
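DataReader.create gets the same treatment: the self parameter and cached-consumer early return are dropped, and is_mock is appended to the signature. A matching call sketch, again with placeholder values:

    reader = DataReader.create(
        py_input_queues,      # channel ids as bytes
        input_actor_ids,      # list[ActorID] of the upstream peers
        py_seq_ids,           # starting sequence id per channel
        py_msg_ids,           # starting message id per channel
        1000,                 # timer_interval (placeholder)
        False,                # is_recreate
        config_bytes,
        False,                # is_mock
    )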
@@ -246,22 +244,24 @@ cdef class DataReader:
         cdef shared_ptr[CRuntimeContext] ctx = make_shared[CRuntimeContext]()
         if config_bytes:
             config_data = config_bytes
-            queue_logger.info("load config, config bytes size: %s", config_data.nbytes)
+            channel_logger.info("load config, config bytes size: %s", config_data.nbytes)
             ctx.get().SetConfig(<uint8_t *>(&(config_data[0])), config_data.nbytes)
+        if is_mock:
+            ctx.get().MarkMockTest()
         c_reader = new CDataReader(ctx)
         c_reader.Init(queue_id_vec, actor_ids, seq_ids, msg_ids, timer_interval)
-        queue_logger.info("create native consumer succeed")
-        cdef DataReader reader = DataReader.__new__()
+        channel_logger.info("create native reader succeed")
+        cdef DataReader reader = DataReader.__new__(DataReader)
         reader.reader = c_reader
         return reader

     def __dealloc__(self):
         if self.reader != NULL:
             del self.reader
-            queue_logger.info("deleted DataReader")
+            channel_logger.info("deleted DataReader")
             self.reader = NULL

-    def pull(self, uint32_t timeout_millis):
+    def read(self, uint32_t timeout_millis):
         cdef:
             shared_ptr[CDataBundle] bundle
             CStreamingStatus status
@@ -270,11 +270,11 @@ cdef class DataReader:
         cdef uint32_t bundle_type = <uint32_t>(bundle.get().meta.get().GetBundleType())
         if <uint32_t> status != <uint32_t> libstreaming.StatusOK:
             if <uint32_t> status == <uint32_t> libstreaming.StatusInterrupted:
-                raise QueueInterruptException("consumer interrupted")
+                raise channel.ChannelInterruptException("reader interrupted")
             elif <uint32_t> status == <uint32_t> libstreaming.StatusInitQueueFailed:
-                raise Exception("init queue failed")
+                raise Exception("init channel failed")
             elif <uint32_t> status == <uint32_t> libstreaming.StatusWaitQueueTimeOut:
-                raise Exception("wait queue object timeout")
+                raise Exception("wait channel object timeout")
         cdef:
             uint32_t msg_nums
             CObjectID queue_id
@@ -303,7 +303,7 @@ cdef class DataReader:

     def stop(self):
         self.reader.Stop()
-        queue_logger.info("stopped DataReader")
+        channel_logger.info("stopped DataReader")


 cdef c_vector[CObjectID] bytes_list_to_qid_vec(list py_queue_ids) except *:
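Finally, callers that used to catch QueueInitException and QueueInterruptException must switch to the channel equivalents raised above. A hedged sketch of a shutdown-tolerant read loop:

    try:
        item = reader.read(100)
    except channel.ChannelInterruptException:
        # stop() was called concurrently; treat as end-of-stream
        item = None
    except channel.ChannelInitException as e:
        # initialization failed; the exception carries the failed channel ids
        raise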
