New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GCS]GCS adapts to job table pub sub #8145
[GCS]GCS adapts to job table pub sub #8145
Conversation
Can one of the admins verify this patch? |
Test PASSed. |
@@ -36,6 +38,7 @@ class DefaultJobInfoHandler : public rpc::JobInfoHandler { | |||
|
|||
private: | |||
gcs::RedisGcsClient &gcs_client_; | |||
const std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is better to use std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_
directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
if (!status.ok()) { | ||
RAY_LOG(ERROR) << "Failed to add job, job id = " << job_id | ||
<< ", driver pid = " << request.data().driver_pid(); | ||
} else { | ||
RAY_LOG(DEBUG) << "Finished adding job, job id = " << job_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is better to use INFO
here as well as the very beginning line of this function as this RPC is very low frequency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
} else { | ||
RAY_CHECK_OK(gcs_pub_sub_->Publish(JOB_CHANNEL, job_id.Binary(), | ||
job_table_data->SerializeAsString(), nullptr)); | ||
RAY_LOG(DEBUG) << "Finished marking job state, job id = " << job_id; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use INFO here as well as the beginning line of this function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
if (!status.ok()) { | ||
RAY_LOG(ERROR) << "Failed to mark job state, job id = " << job_id; | ||
} else { | ||
RAY_CHECK_OK(gcs_pub_sub_->Publish(JOB_CHANNEL, job_id.Binary(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this lead to pub twice because that the AsyncMarkFinished
will publish too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also change the job subscribe code which only subscribe gcs_pub_sub_ publish messages.
src/ray/gcs/redis_gcs_client.h
Outdated
@@ -81,6 +81,8 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { | |||
return redis_client_->GetPrimaryContext(); | |||
} | |||
|
|||
std::shared_ptr<RedisClient> GetRedisClient() { return redis_client_; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
std::shared_ptr GetRedisClient() const { return redis_client_; }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
Test PASSed. |
Why are these changes needed?
Related issue number
Checks
scripts/format.sh
to lint the changes in this PR.