Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijunfu committed Jul 9, 2019
1 parent be05ed6 commit 878cae4
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 27 deletions.
5 changes: 3 additions & 2 deletions src/ray/core_worker/mock_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ namespace ray {
class MockWorker {
public:
MockWorker(const std::string &store_socket, const std::string &raylet_socket)
: worker_(WorkerType::WORKER, Language::PYTHON, store_socket, raylet_socket,
JobID::FromRandom(), std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4)) {}
: worker_(WorkerType::WORKER, Language::PYTHON, store_socket, raylet_socket,
JobID::FromRandom(),
std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4)) {}

void Run() {
// Start executing tasks.
Expand Down
4 changes: 2 additions & 2 deletions src/ray/core_worker/task_execution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface(
std::placeholders::_1);
task_receivers_.emplace(
static_cast<int>(TaskTransportType::RAYLET),
std::unique_ptr<CoreWorkerRayletTaskReceiver>(
new CoreWorkerRayletTaskReceiver(raylet_client, main_service_, worker_server_, func)));
std::unique_ptr<CoreWorkerRayletTaskReceiver>(new CoreWorkerRayletTaskReceiver(
raylet_client, main_service_, worker_server_, func)));

// Start RPC server after all the task receivers are properly initialized.
worker_server_.Run();
Expand Down
8 changes: 4 additions & 4 deletions src/ray/core_worker/transport/raylet_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ Status CoreWorkerRayletTaskSubmitter::SubmitTask(const TaskSpec &task) {
}

CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver(
std::unique_ptr<RayletClient> &raylet_client,
boost::asio::io_service &io_service, rpc::GrpcServer &server,
const TaskHandler &task_handler)
std::unique_ptr<RayletClient> &raylet_client, boost::asio::io_service &io_service,
rpc::GrpcServer &server, const TaskHandler &task_handler)
: raylet_client_(raylet_client),
task_service_(io_service, *this), task_handler_(task_handler) {
task_service_(io_service, *this),
task_handler_(task_handler) {
server.RegisterService(task_service_);
}

Expand Down
9 changes: 5 additions & 4 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -841,8 +841,8 @@ void NodeManager::ProcessRegisterClientRequestMessage(
auto client_id = from_flatbuf<ClientID>(*message->worker_id());
client->SetClientID(client_id);
Language language = static_cast<Language>(message->language());
auto worker = std::make_shared<Worker>(message->worker_pid(), language,
message->port(), client, client_call_manager_);
auto worker = std::make_shared<Worker>(message->worker_pid(), language, message->port(),
client, client_call_manager_);
if (message->is_worker()) {
// Register the new worker.
bool use_push_task = worker->UsePush();
Expand Down Expand Up @@ -1781,8 +1781,9 @@ bool NodeManager::AssignTask(const Task &task) {
// expense of calling `MoveTask` for each of the assigned tasks.
// TODO(zhijunfu): after all workers are fully migrated to push mode, the
// `post` below and swap queue can be removed.
io_service_.post(
[this, status, worker, task_id]() { FinishAssignTask(task_id, *worker, status.ok()); });
io_service_.post([this, status, worker, task_id]() {
FinishAssignTask(task_id, *worker, status.ok());
});
} else {
FinishAssignTask(task_id, *worker, status.ok());
}
Expand Down
13 changes: 6 additions & 7 deletions src/ray/raylet/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,10 @@ void Worker::AcquireTaskCpuResources(const ResourceIdSet &cpu_resources) {
task_resource_ids_.Release(cpu_resources);
}

bool Worker::UsePush() const {
return rpc_client_ != nullptr;
}
bool Worker::UsePush() const { return rpc_client_ != nullptr; }

void Worker::AssignTask(const Task &task, const ResourceIdSet &resource_id_set,
const std::function<void(Status)> finish_assign_callback) {
const std::function<void(Status)> finish_assign_callback) {
const TaskSpecification &spec = task.GetTaskSpecification();
if (rpc_client_ != nullptr) {
// Use push mode.
Expand All @@ -140,13 +138,14 @@ void Worker::AssignTask(const Task &task, const ResourceIdSet &resource_id_set,
flatbuffers::FlatBufferBuilder fbb;
auto resource_id_set_flatbuf = resource_id_set.ToFlatbuf(fbb);

auto message = protocol::CreateGetTaskReply(
fbb, fbb.CreateString(spec.Serialize()), fbb.CreateVector(resource_id_set_flatbuf));
auto message =
protocol::CreateGetTaskReply(fbb, fbb.CreateString(spec.Serialize()),
fbb.CreateVector(resource_id_set_flatbuf));
fbb.Finish(message);
Connection()->WriteMessageAsync(
static_cast<int64_t>(protocol::MessageType::ExecuteTask), fbb.GetSize(),
fbb.GetBufferPointer(), finish_assign_callback);
}
}
}

} // namespace raylet
Expand Down
10 changes: 5 additions & 5 deletions src/ray/raylet/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include "ray/raylet/task.h"
#include "ray/rpc/worker/worker_client.h"


namespace ray {

namespace raylet {
Expand Down Expand Up @@ -58,10 +57,10 @@ class Worker {
void ResetTaskResourceIds();
ResourceIdSet ReleaseTaskCpuResources();
void AcquireTaskCpuResources(const ResourceIdSet &cpu_resources);

bool UsePush() const;
void AssignTask(const Task &task, const ResourceIdSet &resource_id_set,
const std::function<void(Status)> finish_assign_callback);
const std::function<void(Status)> finish_assign_callback);

private:
/// The worker's PID.
Expand Down Expand Up @@ -91,8 +90,9 @@ class Worker {
// of a task.
ResourceIdSet task_resource_ids_;
std::unordered_set<TaskID> blocked_task_ids_;
/// The `ClientCallManager` object that is shared by `WorkerTaskClient` from all workers.
rpc::ClientCallManager &client_call_manager_;
/// The `ClientCallManager` object that is shared by `WorkerTaskClient` from all
/// workers.
rpc::ClientCallManager &client_call_manager_;
/// The rpc client to send tasks to this worker.
std::unique_ptr<rpc::WorkerTaskClient> rpc_client_;
};
Expand Down
10 changes: 7 additions & 3 deletions src/ray/raylet/worker_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ class WorkerPoolMock : public WorkerPool {

class WorkerPoolTest : public ::testing::Test {
public:
WorkerPoolTest() : worker_pool_(), io_service_(), error_message_type_(1),
client_call_manager_(io_service) {}
WorkerPoolTest()
: worker_pool_(),
io_service_(),
error_message_type_(1),
client_call_manager_(io_service) {}

std::shared_ptr<Worker> CreateWorker(pid_t pid,
const Language &language = Language::PYTHON) {
Expand All @@ -86,7 +89,8 @@ class WorkerPoolTest : public ::testing::Test {
auto client =
LocalClientConnection::Create(client_handler, message_handler, std::move(socket),
"worker", {}, error_message_type_);
return std::shared_ptr<Worker>(new Worker(pid, language, -1, client, client_call_manager_));
return std::shared_ptr<Worker>(
new Worker(pid, language, -1, client, client_call_manager_));
}

void SetWorkerCommands(const WorkerCommandMap &worker_commands) {
Expand Down

0 comments on commit 878cae4

Please sign in to comment.