Skip to content

Commit

Permalink
remove queue_id in interface
Browse files Browse the repository at this point in the history
  • Loading branch information
悟言 committed Nov 26, 2019
1 parent 8b8a7fb commit e03622c
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 77 deletions.
24 changes: 11 additions & 13 deletions streaming/src/channel.cc
Expand Up @@ -17,7 +17,7 @@ StreamingQueueProducer::StreamingQueueProducer(std::shared_ptr<Config> &transfer
STREAMING_LOG(INFO) << "Producer Init";

queue_writer_ =
std::make_shared<StreamingQueueWriter>(p_channel_info.actor_id);
std::make_shared<StreamingQueueWriter>(p_channel_info.channel_id, p_channel_info.actor_id);
}

StreamingQueueProducer::~StreamingQueueProducer() {
Expand Down Expand Up @@ -55,17 +55,16 @@ StreamingStatus StreamingQueueProducer::CreateTransferChannel() {
}

StreamingStatus StreamingQueueProducer::CreateQueue() {
auto &channel_id = channel_info.channel_id;
queue_writer_->CreateQueue(channel_id, channel_info.queue_size, channel_info.actor_id);
queue_writer_->CreateQueue(channel_info.queue_size, channel_info.actor_id);

STREAMING_LOG(INFO) << "q id => " << channel_id << ", queue size => "
STREAMING_LOG(INFO) << "q id => " << channel_info.channel_id << ", queue size => "
<< channel_info.queue_size;

return StreamingStatus::OK;
}

StreamingStatus StreamingQueueProducer::DestroyTransferChannel() {
RAY_IGNORE_EXPR(queue_writer_->DeleteQueue(channel_info.channel_id));
RAY_IGNORE_EXPR(queue_writer_->DeleteQueue());
return StreamingStatus::OK;
}

Expand All @@ -76,16 +75,15 @@ StreamingStatus StreamingQueueProducer::ClearTransferCheckpoint(

StreamingStatus StreamingQueueProducer::NotifyChannelConsumed(uint64_t channel_offset) {
Status st =
queue_writer_->SetQueueEvictionLimit(channel_info.channel_id, channel_offset);
queue_writer_->SetQueueEvictionLimit(channel_offset);
STREAMING_CHECK(st.code() == StatusCode::OK)
<< " exception in clear barrier in writerwith client returned => " << st.message();
return StreamingStatus::OK;
}

StreamingStatus StreamingQueueProducer::ProduceItemToChannel(uint8_t *data,
uint32_t data_size) {
Status status = queue_writer_->PushQueueItem(channel_info.channel_id,
channel_info.current_seq_id + 1, data,
Status status = queue_writer_->PushQueueItem(channel_info.current_seq_id + 1, data,
data_size, current_time_ms());

if (status.code() != StatusCode::OK) {
Expand All @@ -110,7 +108,7 @@ StreamingQueueConsumer::StreamingQueueConsumer(std::shared_ptr<Config> &transfer
STREAMING_LOG(INFO) << "Consumer Init";

queue_reader_ =
std::make_shared<StreamingQueueReader>(c_channel_info.actor_id);
std::make_shared<StreamingQueueReader>(c_channel_info.channel_id, c_channel_info.actor_id);
}

StreamingQueueConsumer::~StreamingQueueConsumer() {
Expand All @@ -121,15 +119,15 @@ StreamingStatus StreamingQueueConsumer::CreateTransferChannel() {
// subscribe next seq id from checkpoint id
// pull remote queue to local store if scheduler connection is set
bool success = queue_reader_->GetQueue(
channel_info.channel_id, channel_info.current_seq_id + 1, channel_info.actor_id);
channel_info.current_seq_id + 1, channel_info.actor_id);
if (!success) {
return StreamingStatus::InitQueueFailed;
}
return StreamingStatus::OK;
}

StreamingStatus StreamingQueueConsumer::DestroyTransferChannel() {
RAY_IGNORE_EXPR(queue_reader_->DeleteQueue(channel_info.channel_id));
RAY_IGNORE_EXPR(queue_reader_->DeleteQueue());
return StreamingStatus::OK;
}

Expand All @@ -142,13 +140,13 @@ StreamingStatus StreamingQueueConsumer::ConsumeItemFromChannel(uint64_t &offset_
uint8_t *&data,
uint32_t &data_size,
uint32_t timeout) {
auto st = queue_reader_->GetQueueItem(channel_info.channel_id, data, data_size,
auto st = queue_reader_->GetQueueItem(data, data_size,
offset_id, timeout);
return StreamingStatus::OK;
}

StreamingStatus StreamingQueueConsumer::NotifyChannelConsumed(uint64_t offset_id) {
queue_reader_->NotifyConsumedItem(channel_info.channel_id, offset_id);
queue_reader_->NotifyConsumedItem(offset_id);
return StreamingStatus::OK;
}

Expand Down
88 changes: 37 additions & 51 deletions streaming/src/queue/queue_interface.cc
Expand Up @@ -6,103 +6,90 @@ namespace ray {
namespace streaming {

/// code below is interface implementation of streaming queue
StreamingQueueWriter::StreamingQueueWriter(const ActorID& actor_id)
: actor_id_(actor_id) {
StreamingQueueWriter::StreamingQueueWriter(const ObjectID &queue_id, const ActorID& actor_id)
: queue_id_(queue_id), actor_id_(actor_id) {
upstream_service_ = ray::streaming::UpstreamService::GetService(nullptr, actor_id_);
}

void StreamingQueueWriter::CreateQueue(const ObjectID &queue_id, int64_t max_size,
ActorID &peer_actor_id) {
STREAMING_LOG(INFO) << "CreateQueue qid: " << queue_id << " data_size: " << max_size;
if (upstream_service_->UpstreamQueueExists(queue_id)) {
void StreamingQueueWriter::CreateQueue(int64_t max_size, ActorID &peer_actor_id) {
STREAMING_LOG(INFO) << "CreateQueue qid: " << queue_id_ << " data_size: " << max_size;
if (upstream_service_->UpstreamQueueExists(queue_id_)) {
RAY_LOG(INFO) << "StreamingQueueWriter::CreateQueue duplicate!!!";
return;
}

upstream_service_->AddPeerActor(queue_id, peer_actor_id);
auto queue = upstream_service_->CreateUpstreamQueue(queue_id, actor_id_, peer_actor_id, max_size);
STREAMING_CHECK(queue != nullptr);
upstream_service_->AddPeerActor(queue_id_, peer_actor_id);
queue_ = upstream_service_->CreateUpstreamQueue(queue_id_, actor_id_, peer_actor_id, max_size);
STREAMING_CHECK(queue_ != nullptr);

std::vector<ObjectID> queue_ids, failed_queues;
queue_ids.push_back(queue_id);
queue_ids.push_back(queue_id_);
upstream_service_->WaitQueues(queue_ids, 10*1000, failed_queues, DOWNSTREAM);
}

Status StreamingQueueWriter::SetQueueEvictionLimit(const ObjectID &queue_id,
uint64_t eviction_limit) {
std::shared_ptr<WriterQueue> queue = upstream_service_->GetUpQueue(queue_id);
STREAMING_CHECK(queue != nullptr);

queue->SetQueueEvictionLimit(eviction_limit);
Status StreamingQueueWriter::SetQueueEvictionLimit(uint64_t eviction_limit) {
queue_->SetQueueEvictionLimit(eviction_limit);
return Status::OK();
}

void StreamingQueueWriter::GetMinConsumedSeqID(const ObjectID &queue_id,
uint64_t &min_consumed_id) {
std::shared_ptr<WriterQueue> queue = upstream_service_->GetUpQueue(queue_id);
STREAMING_CHECK(queue != nullptr);

min_consumed_id = queue->GetMinConsumedSeqID();
void StreamingQueueWriter::GetMinConsumedSeqID(uint64_t &min_consumed_id) {
min_consumed_id = queue_->GetMinConsumedSeqID();
}

Status StreamingQueueWriter::PushQueueItem(const ObjectID &queue_id, uint64_t seq_id,
Status StreamingQueueWriter::PushQueueItem(uint64_t seq_id,
uint8_t *data, uint32_t data_size,
uint64_t timestamp) {
STREAMING_LOG(INFO) << "QueueWriter::PushQueueItem:"
<< " qid: " << queue_id << " seq_id: " << seq_id
<< " qid: " << queue_id_ << " seq_id: " << seq_id
<< " data_size: " << data_size;
std::shared_ptr<WriterQueue> queue = upstream_service_->GetUpQueue(queue_id);
STREAMING_CHECK(queue != nullptr);

Status status = queue->Push(seq_id, data, data_size, timestamp, false);
Status status = queue_->Push(seq_id, data, data_size, timestamp, false);
if (status.IsOutOfMemory()) {
Status st = queue->TryEvictItems();
Status st = queue_->TryEvictItems();
if (!st.ok()) {
STREAMING_LOG(INFO) << "Evict fail.";
return st;
}

st = queue->Push(seq_id, data, data_size, timestamp, false);
st = queue_->Push(seq_id, data, data_size, timestamp, false);
STREAMING_LOG(INFO) << "After evict PushQueueItem: " << st.ok();
return st;
}

queue->Send();
queue_->Send();
return status;
}

Status StreamingQueueWriter::DeleteQueue(const ObjectID &queue_id) {
Status StreamingQueueWriter::DeleteQueue() {
return Status::OK();
}

StreamingQueueReader::StreamingQueueReader(const ActorID& actor_id)
: actor_id_(actor_id) {
StreamingQueueReader::StreamingQueueReader(const ObjectID &queue_id, const ActorID& actor_id)
: queue_id_(queue_id), actor_id_(actor_id) {
downstream_service_ = ray::streaming::DownstreamService::GetService(nullptr, actor_id_);
}

/// Create queue and pull queue (if needed), synchronously.
bool StreamingQueueReader::GetQueue(const ObjectID &queue_id,
uint64_t start_seq_id, ActorID &peer_actor_id) {
STREAMING_LOG(INFO) << "GetQueue qid: " << queue_id << " start_seq_id: " << start_seq_id;
if (downstream_service_->DownstreamQueueExists(queue_id)) {
bool StreamingQueueReader::GetQueue(uint64_t start_seq_id, ActorID &peer_actor_id) {
STREAMING_LOG(INFO) << "GetQueue qid: " << queue_id_ << " start_seq_id: " << start_seq_id;
if (downstream_service_->DownstreamQueueExists(queue_id_)) {
RAY_LOG(INFO) << "StreamingQueueReader::GetQueue duplicate!!!";
return true;
}

downstream_service_->AddPeerActor(queue_id, peer_actor_id);
downstream_service_->AddPeerActor(queue_id_, peer_actor_id);

STREAMING_LOG(INFO) << "Create ReaderQueue " << queue_id
STREAMING_LOG(INFO) << "Create ReaderQueue " << queue_id_
<< " pull from start_seq_id: " << start_seq_id;
downstream_service_->CreateDownstreamQueue(queue_id, actor_id_, peer_actor_id);
queue_ = downstream_service_->CreateDownstreamQueue(queue_id_, actor_id_, peer_actor_id);
return true;
}

Status StreamingQueueReader::GetQueueItem(const ObjectID &queue_id, uint8_t *&data,
Status StreamingQueueReader::GetQueueItem(uint8_t *&data,
uint32_t &data_size, uint64_t &seq_id,
uint64_t timeout_ms) {
STREAMING_LOG(INFO) << "GetQueueItem qid: " << queue_id;
auto queue = downstream_service_->GetDownQueue(queue_id);
QueueItem item = queue->PopPendingBlockTimeout(timeout_ms * 1000);
STREAMING_LOG(INFO) << "GetQueueItem qid: " << queue_id_;
STREAMING_CHECK(queue_ != nullptr);
QueueItem item = queue_->PopPendingBlockTimeout(timeout_ms * 1000);
if (item.SeqId() == QUEUE_INVALID_SEQ_ID) {
STREAMING_LOG(INFO) << "GetQueueItem timeout.";
data = nullptr;
Expand All @@ -115,20 +102,19 @@ Status StreamingQueueReader::GetQueueItem(const ObjectID &queue_id, uint8_t *&da
seq_id = item.SeqId();
data_size = item.Buffer()->Size();

STREAMING_LOG(DEBUG) << "GetQueueItem qid: " << queue_id
STREAMING_LOG(DEBUG) << "GetQueueItem qid: " << queue_id_
<< " seq_id: " << seq_id
<< " msg_id: " << item.MaxMsgId()
<< " data_size: " << data_size;
return Status::OK();
}

void StreamingQueueReader::NotifyConsumedItem(const ObjectID &queue_id, uint64_t seq_id) {
STREAMING_LOG(DEBUG) << "QueueReader::NotifyConsumedItem";
auto queue = downstream_service_->GetDownQueue(queue_id);
queue->OnConsumed(seq_id);
void StreamingQueueReader::NotifyConsumedItem(uint64_t seq_id) {
STREAMING_CHECK(queue_ != nullptr);
queue_->OnConsumed(seq_id);
}

Status StreamingQueueReader::DeleteQueue(const ObjectID &queue_id) {
Status StreamingQueueReader::DeleteQueue() {
return Status::OK();
}

Expand Down
28 changes: 15 additions & 13 deletions streaming/src/queue/queue_interface.h
Expand Up @@ -20,39 +20,40 @@ class StreamingQueueWriter {
/// \param[in] core_worker The C++ pointer point to CoreWorker object of current Actor, obtained in Python/Java
/// \param[in] async_func peer's asynchronous entry point function descriptor for direct call in Python/Java
/// \param[in] sync_func peer's synchronous entry point function descriptor for direct call in Python/Java
StreamingQueueWriter(const ActorID& actor_id);
StreamingQueueWriter(const ObjectID &queue_id, const ActorID& actor_id);
~StreamingQueueWriter() {}

/// Create a upstream queue, using \param queue_id to identify this queue
/// \param[in] queue_id
/// \param[in] max_size max storage size of the queue in bytes, PushQueueItem will get OutOfMemory if max_size reached.
/// \param[in] peer_actor_id the actor id of peer actor
void CreateQueue(const ObjectID &queue_id, int64_t max_size,
ActorID &peer_actor_id);
void CreateQueue(int64_t max_size, ActorID &peer_actor_id);

/// Set max evict limit offset, this queue can evict data less than the offset
/// \param[in] queue_id
/// \param[in] eviction_limit max evict limit offset
Status SetQueueEvictionLimit(const ObjectID &queue_id, uint64_t eviction_limit);
Status SetQueueEvictionLimit(uint64_t eviction_limit);

/// Get consumed offset of corresponded downstream queue
/// \param[in] queue_id queue id of upstream queue
/// \param[out] min_consumed_id minimum consumed offset of downstream queue
void GetMinConsumedSeqID(const ObjectID &queue_id, uint64_t &min_consumed_id);
void GetMinConsumedSeqID(uint64_t &min_consumed_id);

/// Write a queue item into queue, this item will be sent to corresponded downstream queue
/// \param[in] queue_id
/// \param[in] seq_id sequential id of this queue item
/// \param[in] data data buffer pointer, should be freed by the caller
/// \param[in] data_size length of the data buffer
/// \param[in] timestamp timestamp in milliseconds of the time when the queu item was pushed
Status PushQueueItem(const ObjectID &queue_id, uint64_t seq_id, uint8_t *data,
Status PushQueueItem(uint64_t seq_id, uint8_t *data,
uint32_t data_size, uint64_t timestamp);
Status DeleteQueue(const ObjectID &queue_id);
Status DeleteQueue();

private:
std::shared_ptr<ray::streaming::UpstreamService> upstream_service_;
ObjectID queue_id_;
ActorID actor_id_;
std::shared_ptr<WriterQueue> queue_;
};

/// The interfaces streaming queue exposed to DataReader.
Expand All @@ -61,34 +62,35 @@ class StreamingQueueReader {
/// \param[in] core_worker The C++ pointer point to CoreWorker object of current Actor, obtained in Python/Java
/// \param[in] async_func peer's asynchronous entry point function descriptor for direct call in Python/Java
/// \param[in] sync_func peer's synchronous entry point function descriptor for direct call in Python/Java
StreamingQueueReader(const ActorID& actor_id);
StreamingQueueReader(const ObjectID &queue_id, const ActorID& actor_id);
~StreamingQueueReader() {}

/// Create a downstream queue, using \param queue_id to identify this queue
/// \param[in] queue_id
/// \param[in] start_seq_id last consumed offset before failover.
/// \param[in] peer_actor_id the actor id of peer actor
bool GetQueue(const ObjectID &queue_id, uint64_t start_seq_id,
ActorID &peer_actor_id);
bool GetQueue(uint64_t start_seq_id, ActorID &peer_actor_id);

/// Read the latest queue item from the queue identified by \param queue_id
/// \param[in] queue_id
/// \param[in] data the start address of a data buffer to hold item
/// \param[out] data_size return data buffer size
/// \param[out] seq_id return the sequential id of the latest queue item
/// \param[in] timeout_ms timeout in milliseconds, data will be nullptr if we can not read a item when timeout.
Status GetQueueItem(const ObjectID &queue_id, uint8_t *&data, uint32_t &data_size,
Status GetQueueItem(uint8_t *&data, uint32_t &data_size,
uint64_t &seq_id, uint64_t timeout_ms = -1);

/// Notify downstream's consumed offset to corresponded upstream queue.
/// \param[in] queue_id queue id of the upstream queue
/// \param[in] seq_id consumed offset of the downstream queue
void NotifyConsumedItem(const ObjectID &queue_id, uint64_t seq_id);
Status DeleteQueue(const ObjectID &queue_id);
void NotifyConsumedItem(uint64_t seq_id);
Status DeleteQueue();

private:
std::shared_ptr<ray::streaming::DownstreamService> downstream_service_;
ObjectID queue_id_;
ActorID actor_id_;
std::shared_ptr<ReaderQueue> queue_;
};

} // namespace streaming
Expand Down

0 comments on commit e03622c

Please sign in to comment.