Skip to content

Commit

Permalink
Move top level RayletClient to ray::raylet::RayletClient (#6404)
Browse files Browse the repository at this point in the history
  • Loading branch information
walterddr authored and pcmoritz committed Dec 10, 2019
1 parent 8c34e83 commit c1d4ab8
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 119 deletions.
2 changes: 1 addition & 1 deletion python/ray/includes/libraylet.pxd
Expand Up @@ -41,7 +41,7 @@ ctypedef pair[c_vector[CObjectID], c_vector[CObjectID]] WaitResultPair


cdef extern from "ray/raylet/raylet_client.h" nogil:
cdef cppclass CRayletClient "RayletClient":
cdef cppclass CRayletClient "ray::raylet::RayletClient":
CRayletClient(const c_string &raylet_socket,
const CWorkerID &worker_id,
c_bool is_worker, const CJobID &job_id,
Expand Down
6 changes: 3 additions & 3 deletions src/ray/core_worker/core_worker.cc
Expand Up @@ -128,7 +128,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
auto grpc_client = rpc::NodeManagerWorkerClient::make(
node_ip_address, node_manager_port, *client_call_manager_);
ClientID local_raylet_id;
local_raylet_client_ = std::shared_ptr<RayletClient>(new RayletClient(
local_raylet_client_ = std::shared_ptr<raylet::RayletClient>(new raylet::RayletClient(
std::move(grpc_client), raylet_socket,
WorkerID::FromBinary(worker_context_.GetWorkerID().Binary()),
(worker_type_ == ray::WorkerType::WORKER), worker_context_.GetCurrentJobID(),
Expand Down Expand Up @@ -210,8 +210,8 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
[this](const rpc::Address &address) {
auto grpc_client = rpc::NodeManagerWorkerClient::make(
address.ip_address(), address.port(), *client_call_manager_);
return std::shared_ptr<RayletClient>(
new RayletClient(std::move(grpc_client)));
return std::shared_ptr<raylet::RayletClient>(
new raylet::RayletClient(std::move(grpc_client)));
},
memory_store_, task_manager_, local_raylet_id,
RayConfig::instance().worker_lease_timeout_milliseconds()));
Expand Down
4 changes: 2 additions & 2 deletions src/ray/core_worker/core_worker.h
Expand Up @@ -89,7 +89,7 @@ class CoreWorker {

WorkerContext &GetWorkerContext() { return worker_context_; }

RayletClient &GetRayletClient() { return *local_raylet_client_; }
raylet::RayletClient &GetRayletClient() { return *local_raylet_client_; }

const TaskID &GetCurrentTaskId() const { return worker_context_.GetCurrentTaskID(); }

Expand Down Expand Up @@ -525,7 +525,7 @@ class CoreWorker {
// shared_ptr for direct calls because we can lease multiple workers through
// one client, and we need to keep the connection alive until we return all
// of the workers.
std::shared_ptr<RayletClient> local_raylet_client_;
std::shared_ptr<raylet::RayletClient> local_raylet_client_;

// Thread that runs a boost::asio service to process IO events.
std::thread io_thread_;
Expand Down
Expand Up @@ -109,7 +109,7 @@ std::shared_ptr<RayObject> GetRequest::Get(const ObjectID &object_id) const {
CoreWorkerMemoryStore::CoreWorkerMemoryStore(
std::function<void(const RayObject &, const ObjectID &)> store_in_plasma,
std::shared_ptr<ReferenceCounter> counter,
std::shared_ptr<RayletClient> raylet_client)
std::shared_ptr<raylet::RayletClient> raylet_client)
: store_in_plasma_(store_in_plasma),
ref_counter_(counter),
raylet_client_(raylet_client) {}
Expand Down
Expand Up @@ -29,7 +29,7 @@ class CoreWorkerMemoryStore {
CoreWorkerMemoryStore(
std::function<void(const RayObject &, const ObjectID &)> store_in_plasma = nullptr,
std::shared_ptr<ReferenceCounter> counter = nullptr,
std::shared_ptr<RayletClient> raylet_client = nullptr);
std::shared_ptr<raylet::RayletClient> raylet_client = nullptr);
~CoreWorkerMemoryStore(){};

/// Put an object with specified ID into object store.
Expand Down Expand Up @@ -124,7 +124,7 @@ class CoreWorkerMemoryStore {
std::shared_ptr<ReferenceCounter> ref_counter_ = nullptr;

// If set, this will be used to notify worker blocked / unblocked on get calls.
std::shared_ptr<RayletClient> raylet_client_ = nullptr;
std::shared_ptr<raylet::RayletClient> raylet_client_ = nullptr;

/// Protects the data structures below.
absl::Mutex mu_;
Expand Down
5 changes: 3 additions & 2 deletions src/ray/core_worker/store_provider/plasma_store_provider.cc
Expand Up @@ -7,7 +7,8 @@
namespace ray {

CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(
const std::string &store_socket, const std::shared_ptr<RayletClient> raylet_client,
const std::string &store_socket,
const std::shared_ptr<raylet::RayletClient> raylet_client,
std::function<Status()> check_signals)
: raylet_client_(raylet_client) {
check_signals_ = check_signals;
Expand Down Expand Up @@ -128,7 +129,7 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore(
return Status::OK();
}

Status UnblockIfNeeded(const std::shared_ptr<RayletClient> &client,
Status UnblockIfNeeded(const std::shared_ptr<raylet::RayletClient> &client,
const WorkerContext &ctx) {
if (ctx.CurrentTaskIsDirectCall()) {
if (ctx.ShouldReleaseResourcesOnBlockingCalls()) {
Expand Down
4 changes: 2 additions & 2 deletions src/ray/core_worker/store_provider/plasma_store_provider.h
Expand Up @@ -20,7 +20,7 @@ namespace ray {
class CoreWorkerPlasmaStoreProvider {
public:
CoreWorkerPlasmaStoreProvider(const std::string &store_socket,
const std::shared_ptr<RayletClient> raylet_client,
const std::shared_ptr<raylet::RayletClient> raylet_client,
std::function<Status()> check_signals);

~CoreWorkerPlasmaStoreProvider();
Expand Down Expand Up @@ -83,7 +83,7 @@ class CoreWorkerPlasmaStoreProvider {
static void WarnIfAttemptedTooManyTimes(int num_attempts,
const absl::flat_hash_set<ObjectID> &remaining);

const std::shared_ptr<RayletClient> raylet_client_;
const std::shared_ptr<raylet::RayletClient> raylet_client_;
plasma::PlasmaClient store_client_;
std::mutex store_client_mutex_;
std::function<Status()> check_signals_;
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/transport/direct_actor_transport.cc
Expand Up @@ -151,7 +151,7 @@ CoreWorkerDirectTaskReceiver::CoreWorkerDirectTaskReceiver(
exit_handler_(exit_handler),
task_main_io_service_(main_io_service) {}

void CoreWorkerDirectTaskReceiver::Init(RayletClient &raylet_client) {
void CoreWorkerDirectTaskReceiver::Init(raylet::RayletClient &raylet_client) {
waiter_.reset(new DependencyWaiterImpl(raylet_client));
}

Expand Down
7 changes: 4 additions & 3 deletions src/ray/core_worker/transport/direct_actor_transport.h
Expand Up @@ -167,7 +167,8 @@ class DependencyWaiter {

class DependencyWaiterImpl : public DependencyWaiter {
public:
DependencyWaiterImpl(RayletClient &raylet_client) : raylet_client_(raylet_client) {}
DependencyWaiterImpl(raylet::RayletClient &raylet_client)
: raylet_client_(raylet_client) {}

void Wait(const std::vector<ObjectID> &dependencies,
std::function<void()> on_dependencies_available) override {
Expand All @@ -187,7 +188,7 @@ class DependencyWaiterImpl : public DependencyWaiter {
private:
int64_t next_request_id_ = 0;
std::unordered_map<int64_t, std::function<void()>> requests_;
RayletClient &raylet_client_;
raylet::RayletClient &raylet_client_;
};

/// Wraps a thread-pool to block posts until the pool has free slots. This is used
Expand Down Expand Up @@ -436,7 +437,7 @@ class CoreWorkerDirectTaskReceiver {
}

/// Initialize this receiver. This must be called prior to use.
void Init(RayletClient &client);
void Init(raylet::RayletClient &client);

/// Handle a `PushTask` request.
///
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/transport/raylet_transport.cc
Expand Up @@ -6,7 +6,7 @@
namespace ray {

CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver(
const WorkerID &worker_id, std::shared_ptr<RayletClient> &raylet_client,
const WorkerID &worker_id, std::shared_ptr<raylet::RayletClient> &raylet_client,
const TaskHandler &task_handler, const std::function<void()> &exit_handler)
: worker_id_(worker_id),
raylet_client_(raylet_client),
Expand Down
4 changes: 2 additions & 2 deletions src/ray/core_worker/transport/raylet_transport.h
Expand Up @@ -17,7 +17,7 @@ class CoreWorkerRayletTaskReceiver {
std::vector<std::shared_ptr<RayObject>> *return_objects)>;

CoreWorkerRayletTaskReceiver(const WorkerID &worker_id,
std::shared_ptr<RayletClient> &raylet_client,
std::shared_ptr<raylet::RayletClient> &raylet_client,
const TaskHandler &task_handler,
const std::function<void()> &exit_handler);

Expand All @@ -37,7 +37,7 @@ class CoreWorkerRayletTaskReceiver {
WorkerID worker_id_;
/// Reference to the core worker's raylet client. This is a pointer ref so that it
/// can be initialized by core worker after this class is constructed.
std::shared_ptr<RayletClient> &raylet_client_;
std::shared_ptr<raylet::RayletClient> &raylet_client_;
/// The callback function to process a task.
TaskHandler task_handler_;
/// The callback function to exit the worker.
Expand Down

0 comments on commit c1d4ab8

Please sign in to comment.