Skip to content

Commit

Permalink
add comment
Browse files Browse the repository at this point in the history
  • Loading branch information
悟言 committed Dec 2, 2019
1 parent 44677de commit 609da4a
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 29 deletions.
13 changes: 8 additions & 5 deletions streaming/src/queue/message.h
Expand Up @@ -16,9 +16,9 @@ namespace streaming {
class Message {
public:
/// Construct a Message instance.
/// \param[in] actor_id ActorID of message sender
/// \param[in] peer_actor_id ActorID of message receiver
/// \param[in] queue_id queue id to identify which queue the message sent to
/// \param[in] actor_id ActorID of message sender.
/// \param[in] peer_actor_id ActorID of message receiver.
/// \param[in] queue_id queue id to identify which queue the message sent to.
/// \param[in] buffer an optional param, a chunk of data to send.
Message(const ActorID &actor_id, const ActorID &peer_actor_id, const ObjectID &queue_id,
std::shared_ptr<LocalMemoryBuffer> buffer = nullptr)
Expand All @@ -33,12 +33,15 @@ class Message {
ObjectID QueueId() { return queue_id_; }
std::shared_ptr<LocalMemoryBuffer> Buffer() { return buffer_; }

/// Pack all meta data and data to a LocalMemoryBuffer, which can be send through direct actor call
/// Serialize all meta data and data to a LocalMemoryBuffer, which can be send through direct actor call.
/// \return serialized buffer .
std::unique_ptr<LocalMemoryBuffer> ToBytes();

/// Get message type.
/// \return message type.
virtual queue::protobuf::StreamingQueueMessageType Type() = 0;

/// All subclass should implement `ToProtobuf` to serialize its own protobuf data
/// All subclass should implement `ToProtobuf` to serialize its own protobuf data.
virtual void ToProtobuf(std::string *output) = 0;
protected:
ActorID actor_id_;
Expand Down
4 changes: 2 additions & 2 deletions streaming/src/queue/queue.cc
Expand Up @@ -22,7 +22,7 @@ QueueItem Queue::FrontProcessed() {
STREAMING_CHECK(buffer_queue_.size() != 0) << "WriterQueue Pop fail";

if (watershed_iter_ == buffer_queue_.begin()) {
return NullQueueItem();
return InvalidQueueItem();
}

QueueItem item = buffer_queue_.front();
Expand All @@ -34,7 +34,7 @@ QueueItem Queue::PopProcessed() {
STREAMING_CHECK(buffer_queue_.size() != 0) << "WriterQueue Pop fail";

if (watershed_iter_ == buffer_queue_.begin()) {
return NullQueueItem();
return InvalidQueueItem();
}

QueueItem item = buffer_queue_.front();
Expand Down
5 changes: 3 additions & 2 deletions streaming/src/queue/queue.h
Expand Up @@ -33,14 +33,15 @@ class Queue {
/// \param[in] transport transport to send items to peer.
Queue(ObjectID queue_id, uint64_t size, std::shared_ptr<Transport> transport)
: queue_id_(queue_id), max_data_size_(size), data_size_(0), data_size_sent_(0) {
buffer_queue_.push_back(NullQueueItem());
buffer_queue_.push_back(InvalidQueueItem());
watershed_iter_ = buffer_queue_.begin();
}

virtual ~Queue() {}

/// Push item into queue, return false is queue is full.
/// Push an item into the queue.
/// \param[in] item the QueueItem object to be send to peer.
/// \return false if the queue is full.
bool Push(QueueItem item);

/// Get the front of item which in processed state.
Expand Down
1 change: 1 addition & 0 deletions streaming/src/queue/queue_client.h
Expand Up @@ -27,6 +27,7 @@ class ReaderClient {
/// Post buffer to downstream queue service, asynchronously.
void OnReaderMessage(std::shared_ptr<LocalMemoryBuffer> buffer);
/// Post buffer to downstream queue service, synchronously.
/// \return handle result.
std::shared_ptr<LocalMemoryBuffer> OnReaderMessageSync(std::shared_ptr<LocalMemoryBuffer> buffer);

private:
Expand Down
15 changes: 7 additions & 8 deletions streaming/src/queue/queue_item.h
Expand Up @@ -73,7 +73,12 @@ class QueueItem {

uint64_t SeqId() { return seq_id_; }
bool IsRaw() { return raw_; }
uint64_t TimeStamp() { return timestamp_; }
size_t DataSize() { return buffer_->Size(); }
std::shared_ptr<LocalMemoryBuffer> Buffer() { return buffer_; }

/// Get max message id in this item.
/// \return max message id.
uint64_t MaxMsgId() {
if (raw_) {
return 0;
Expand All @@ -82,12 +87,6 @@ class QueueItem {
return message_bundle->GetLastMessageId();
}

uint64_t TimeStamp() { return timestamp_; }

size_t DataSize() { return buffer_->Size(); }

std::shared_ptr<LocalMemoryBuffer> Buffer() { return buffer_; }

protected:
uint64_t seq_id_;
uint64_t timestamp_;
Expand All @@ -96,9 +95,9 @@ class QueueItem {
std::shared_ptr<LocalMemoryBuffer> buffer_;
};

class NullQueueItem : public QueueItem {
class InvalidQueueItem : public QueueItem {
public:
NullQueueItem() : QueueItem(QUEUE_INVALID_SEQ_ID, data_, 1, 0) {}
InvalidQueueItem() : QueueItem(QUEUE_INVALID_SEQ_ID, data_, 1, 0) {}
private:
uint8_t data_[1];
};
Expand Down
15 changes: 3 additions & 12 deletions streaming/src/queue/queue_service.h
Expand Up @@ -41,10 +41,12 @@ class QueueService {

/// Dispatch message buffer to asio service synchronously, and wait for handle result.
/// \param[in] buffer serialized message received from peer actor.
/// \return handle result.
std::shared_ptr<LocalMemoryBuffer> DispatchMessageSync(std::shared_ptr<LocalMemoryBuffer> buffer);

/// Get transport to a peer actor specified by actor_id.
/// \param[in] actor_id actor id of peer actor
/// \return transport
std::shared_ptr<Transport> GetOutTransport(const ObjectID &actor_id);

/// The actual function where message being dispatched, called by DispatchMessageAsync and DispatchMessageSync.
Expand All @@ -62,6 +64,7 @@ class QueueService {
void SetPeerActorID(const ObjectID &queue_id, const ActorID &actor_id);

/// Obtain the actor id of the peer actor specified by queue_id.
/// \return actor id
ActorID GetPeerActorID(const ObjectID &queue_id);

/// Release all queues in current queue service.
Expand Down Expand Up @@ -161,17 +164,6 @@ class DownstreamService : public QueueService {
virtual void DispatchMessageInternal(
std::shared_ptr<LocalMemoryBuffer> buffer,
std::function<void(std::shared_ptr<LocalMemoryBuffer>)> callback);
/// Used to support synchronize check queue request
using SendMsgCallback = std::function<void(bool success)>;
struct CheckQueueRequest {
CheckQueueRequest() {}
CheckQueueRequest(const ActorID &actor_id, const ObjectID &queue_id,
SendMsgCallback callback = nullptr)
: actor_id_(actor_id), queue_id_(queue_id), callback_(callback) {}
ActorID actor_id_;
ObjectID queue_id_;
SendMsgCallback callback_;
};

static std::shared_ptr<DownstreamService> CreateService(CoreWorker* core_worker, const ActorID &actor_id);
static std::shared_ptr<DownstreamService> GetService();
Expand All @@ -180,7 +172,6 @@ class DownstreamService : public QueueService {

private:
std::unordered_map<ObjectID, std::shared_ptr<streaming::ReaderQueue>> downstream_queues_;
std::unordered_map<ObjectID, CheckQueueRequest> check_queue_requests_;
static std::shared_ptr<DownstreamService> downstream_service_;
};

Expand Down
2 changes: 2 additions & 0 deletions streaming/src/queue/transport.h
Expand Up @@ -31,6 +31,7 @@ class Transport {
/// \param[in] buffer buffer to be sent.
/// \param[in] function the function descriptor of peer's function.
/// \param[in] timeout_ms max time to wait for result.
/// \return peer function's result.
virtual std::shared_ptr<LocalMemoryBuffer> SendForResult(
std::shared_ptr<LocalMemoryBuffer> buffer, RayFunction &function, int64_t timeout_ms);
/// Send buffer and get result with retry.
Expand All @@ -39,6 +40,7 @@ class Transport {
/// \param[in] function the function descriptor of peer's function.
/// \param[in] max retry count
/// \param[in] timeout_ms max time to wait for result.
/// \return peer function's result.
std::shared_ptr<LocalMemoryBuffer> SendForResultWithRetry(
std::unique_ptr<LocalMemoryBuffer> buffer, RayFunction &function, int retry_cnt, int64_t timeout_ms);

Expand Down

0 comments on commit 609da4a

Please sign in to comment.