Skip to content

Commit

Permalink
Update "master" to "dispatch"/"dispatcher" in tf.data service termino…
Browse files Browse the repository at this point in the history
…logy.

Dispatcher is more descriptive and follows the guidance in https://developers.google.com/style/word-list#master

PiperOrigin-RevId: 321613785
Change-Id: Iaa576d35f0581e21278101f8b31201ba737a6865
  • Loading branch information
aaudiber committed Jul 16, 2020
1 parent 0aa1d61 commit 6b9a9d9
Show file tree
Hide file tree
Showing 30 changed files with 367 additions and 353 deletions.
44 changes: 22 additions & 22 deletions tensorflow/core/data/service/BUILD
Expand Up @@ -28,8 +28,8 @@ tf_proto_library(
)

tf_proto_library(
name = "master_proto",
srcs = ["master.proto"],
name = "dispatcher_proto",
srcs = ["dispatcher.proto"],
has_services = 1,
cc_api_version = 2,
protodeps = tf_additional_all_protos() + [
Expand All @@ -49,17 +49,17 @@ tf_proto_library(
)

cc_library(
name = "master_impl",
srcs = ["master_impl.cc"],
name = "dispatcher_impl",
srcs = ["dispatcher_impl.cc"],
hdrs = [
"master_impl.h",
"dispatcher_impl.h",
],
deps = [
":common_proto_cc",
":credentials_factory",
":data_service",
":dispatcher_proto_cc",
":grpc_util",
":master_proto_cc",
":worker_cc_grpc_proto",
":worker_proto_cc",
"//tensorflow/c:c_api_internal",
Expand All @@ -86,9 +86,9 @@ cc_library(
deps = [
":common_proto_cc",
":credentials_factory",
":dispatcher_cc_grpc_proto",
":dispatcher_proto_cc",
":grpc_util",
":master_cc_grpc_proto",
":master_proto_cc",
":worker_proto_cc",
"//tensorflow/c:c_api_internal",
"//tensorflow/c:tf_status_helper",
Expand Down Expand Up @@ -207,12 +207,12 @@ tf_cc_test(
)

cc_library(
name = "grpc_master_impl",
srcs = ["grpc_master_impl.cc"],
hdrs = ["grpc_master_impl.h"],
name = "grpc_dispatcher_impl",
srcs = ["grpc_dispatcher_impl.cc"],
hdrs = ["grpc_dispatcher_impl.h"],
deps = [
":master_cc_grpc_proto",
":master_impl",
":dispatcher_cc_grpc_proto",
":dispatcher_impl",
"//tensorflow/core/distributed_runtime/rpc:grpc_util",
tf_grpc_cc_dependency(),
],
Expand Down Expand Up @@ -250,7 +250,7 @@ cc_library(
],
deps = [
":credentials_factory",
":grpc_master_impl",
":grpc_dispatcher_impl",
":grpc_util",
":grpc_worker_impl",
"//tensorflow/core:lib",
Expand All @@ -268,9 +268,9 @@ cc_library(
],
deps = [
":credentials_factory",
":dispatcher_cc_grpc_proto",
":dispatcher_proto_cc",
":grpc_util",
":master_cc_grpc_proto",
":master_proto_cc",
":worker_cc_grpc_proto",
":worker_proto_cc",
"//tensorflow/core:framework",
Expand All @@ -287,12 +287,12 @@ tf_cc_test(
tags = ["no_windows"],
deps = [
":data_service",
":grpc_master_impl",
":dispatcher_cc_grpc_proto",
":dispatcher_proto_cc",
":grpc_dispatcher_impl",
":grpc_util",
":grpc_worker_impl",
":local_credentials_factory",
":master_cc_grpc_proto",
":master_proto_cc",
":server_lib",
":test_cluster",
":test_util",
Expand All @@ -309,11 +309,11 @@ tf_cc_test(
)

cc_grpc_library(
name = "master_cc_grpc_proto",
srcs = [":master_proto"],
name = "dispatcher_cc_grpc_proto",
srcs = [":dispatcher_proto"],
generate_mocks = True,
grpc_only = True,
deps = [":master_proto_cc"],
deps = [":dispatcher_proto_cc"],
)

cc_grpc_library(
Expand Down
40 changes: 20 additions & 20 deletions tensorflow/core/data/service/data_service.cc
Expand Up @@ -18,8 +18,8 @@ limitations under the License.
#include "grpcpp/create_channel.h"
#include "grpcpp/security/credentials.h"
#include "tensorflow/core/data/service/credentials_factory.h"
#include "tensorflow/core/data/service/dispatcher.grpc.pb.h"
#include "tensorflow/core/data/service/grpc_util.h"
#include "tensorflow/core/data/service/master.grpc.pb.h"
#include "tensorflow/core/data/service/worker.grpc.pb.h"
#include "tensorflow/core/framework/dataset.h"

Expand Down Expand Up @@ -54,8 +54,8 @@ std::string ProcessingModeToString(ProcessingMode mode) {
}
}

Status DataServiceMasterClient::RegisterDataset(GraphDef dataset,
int64* dataset_id) {
Status DataServiceDispatcherClient::RegisterDataset(GraphDef dataset,
int64* dataset_id) {
TF_RETURN_IF_ERROR(EnsureInitialized());
GetOrRegisterDatasetRequest req;
*req.mutable_dataset()->mutable_graph() = dataset;
Expand All @@ -69,9 +69,9 @@ Status DataServiceMasterClient::RegisterDataset(GraphDef dataset,
return Status::OK();
}

Status DataServiceMasterClient::CreateJob(int64 dataset_id,
ProcessingMode processing_mode,
int64* job_id) {
Status DataServiceDispatcherClient::CreateJob(int64 dataset_id,
ProcessingMode processing_mode,
int64* job_id) {
TF_RETURN_IF_ERROR(EnsureInitialized());
CreateJobRequest req;
req.set_dataset_id(dataset_id);
Expand All @@ -88,11 +88,9 @@ Status DataServiceMasterClient::CreateJob(int64 dataset_id,
return Status::OK();
}

Status DataServiceMasterClient::GetOrCreateJob(int64 dataset_id,
ProcessingMode processing_mode,
const std::string& job_name,
int job_name_index,
int64* job_id) {
Status DataServiceDispatcherClient::GetOrCreateJob(
int64 dataset_id, ProcessingMode processing_mode,
const std::string& job_name, int job_name_index, int64* job_id) {
TF_RETURN_IF_ERROR(EnsureInitialized());
GetOrCreateJobRequest req;
req.set_dataset_id(dataset_id);
Expand All @@ -112,9 +110,9 @@ Status DataServiceMasterClient::GetOrCreateJob(int64 dataset_id,
return Status::OK();
}

Status DataServiceMasterClient::GetTasks(int64 job_id,
std::vector<TaskInfo>* tasks,
bool* job_finished) {
Status DataServiceDispatcherClient::GetTasks(int64 job_id,
std::vector<TaskInfo>* tasks,
bool* job_finished) {
TF_RETURN_IF_ERROR(EnsureInitialized());
GetTasksRequest req;
req.set_job_id(job_id);
Expand All @@ -132,7 +130,8 @@ Status DataServiceMasterClient::GetTasks(int64 job_id,
return Status::OK();
}

Status DataServiceMasterClient::GetWorkers(std::vector<WorkerInfo>* workers) {
Status DataServiceDispatcherClient::GetWorkers(
std::vector<WorkerInfo>* workers) {
TF_RETURN_IF_ERROR(EnsureInitialized());
GetWorkersRequest req;
GetWorkersResponse resp;
Expand All @@ -148,12 +147,12 @@ Status DataServiceMasterClient::GetWorkers(std::vector<WorkerInfo>* workers) {
return Status::OK();
}

Status DataServiceMasterClient::EnsureInitialized() {
Status DataServiceDispatcherClient::EnsureInitialized() {
std::shared_ptr<grpc::ChannelCredentials> credentials;
TF_RETURN_IF_ERROR(
CredentialsFactory::CreateClientCredentials(protocol_, &credentials));
auto channel = grpc::CreateChannel(address_, credentials);
stub_ = MasterService::NewStub(channel);
stub_ = DispatcherService::NewStub(channel);
return Status::OK();
}

Expand Down Expand Up @@ -187,10 +186,11 @@ Status DataServiceWorkerClient::EnsureInitialized() {
return Status::OK();
}

Status CreateDataServiceMasterClient(
Status CreateDataServiceDispatcherClient(
const std::string& address, const std::string& protocol,
std::unique_ptr<DataServiceMasterClient>* out) {
auto client = absl::make_unique<DataServiceMasterClient>(address, protocol);
std::unique_ptr<DataServiceDispatcherClient>* out) {
auto client =
absl::make_unique<DataServiceDispatcherClient>(address, protocol);
TF_RETURN_IF_ERROR(client->Initialize());
*out = std::move(client);
return Status::OK();
Expand Down
22 changes: 11 additions & 11 deletions tensorflow/core/data/service/data_service.h
Expand Up @@ -16,7 +16,7 @@ limitations under the License.
#ifndef TENSORFLOW_CORE_DATA_SERVICE_DATA_SERVICE_H_
#define TENSORFLOW_CORE_DATA_SERVICE_DATA_SERVICE_H_

#include "tensorflow/core/data/service/master.grpc.pb.h"
#include "tensorflow/core/data/service/dispatcher.grpc.pb.h"
#include "tensorflow/core/data/service/worker.grpc.pb.h"
#include "tensorflow/core/framework/dataset.h"
#include "tensorflow/core/framework/op_kernel.h"
Expand Down Expand Up @@ -67,11 +67,11 @@ class DataServiceClientBase {
const std::string protocol_;
};

// Client for communicating with the tf.data service master.
class DataServiceMasterClient : public DataServiceClientBase {
// Client for communicating with the tf.data service dispatcher.
class DataServiceDispatcherClient : public DataServiceClientBase {
public:
DataServiceMasterClient(const std::string& address,
const std::string& protocol)
DataServiceDispatcherClient(const std::string& address,
const std::string& protocol)
: DataServiceClientBase(address, protocol) {}

// Registers a dataset with the tf.data service, and stores the generated
Expand All @@ -90,21 +90,21 @@ class DataServiceMasterClient : public DataServiceClientBase {
const std::string& job_name, int job_name_index,
int64* job_id);

// Queries the master for the tasks associated with the specified job.
// Queries the dispatcher for the tasks associated with the specified job.
// The tasks will be stored in *tasks, and whether the job is finished will
// be stored in `*job_finished`.
Status GetTasks(int64 job_id, std::vector<TaskInfo>* tasks,
bool* job_finished);

// Queries the master for its registered workers. The worker info will be
// Queries the dispatcher for its registered workers. The worker info will be
// stored in `*workers`.
Status GetWorkers(std::vector<WorkerInfo>* workers);

protected:
Status EnsureInitialized() override;

private:
std::unique_ptr<MasterService::Stub> stub_;
std::unique_ptr<DispatcherService::Stub> stub_;
};

// Client for communicating with the tf.data service worker.
Expand All @@ -127,10 +127,10 @@ class DataServiceWorkerClient : public DataServiceClientBase {
std::unique_ptr<WorkerService::Stub> stub_;
};

// Creates and initializes a new tf.data service master client.
Status CreateDataServiceMasterClient(
// Creates and initializes a new tf.data service dispatcher client.
Status CreateDataServiceDispatcherClient(
const std::string& address, const std::string& protocol,
std::unique_ptr<DataServiceMasterClient>* out);
std::unique_ptr<DataServiceDispatcherClient>* out);

// Creates and initializes a new tf.data service worker client.
Status CreateDataServiceWorkerClient(
Expand Down
9 changes: 5 additions & 4 deletions tensorflow/core/data/service/data_service_test.cc
Expand Up @@ -19,9 +19,9 @@ limitations under the License.
#include "grpcpp/security/credentials.h"
#include "absl/strings/str_split.h"
#include "tensorflow/core/data/compression_utils.h"
#include "tensorflow/core/data/service/dispatcher.grpc.pb.h"
#include "tensorflow/core/data/service/dispatcher.pb.h"
#include "tensorflow/core/data/service/grpc_util.h"
#include "tensorflow/core/data/service/master.grpc.pb.h"
#include "tensorflow/core/data/service/master.pb.h"
#include "tensorflow/core/data/service/server_lib.h"
#include "tensorflow/core/data/service/test_cluster.h"
#include "tensorflow/core/data/service/test_util.h"
Expand Down Expand Up @@ -66,9 +66,10 @@ TEST(DataService, ProcessingModeToString) {
TEST(DataService, GetWorkers) {
TestCluster cluster(1);
TF_ASSERT_OK(cluster.Initialize());
DataServiceMasterClient master(cluster.MasterAddress(), kProtocol);
DataServiceDispatcherClient dispatcher(cluster.DispatcherAddress(),
kProtocol);
std::vector<WorkerInfo> workers;
TF_EXPECT_OK(master.GetWorkers(&workers));
TF_EXPECT_OK(dispatcher.GetWorkers(&workers));
EXPECT_EQ(1, workers.size());
}

Expand Down
Expand Up @@ -110,11 +110,11 @@ message GetWorkersResponse {
repeated WorkerInfo workers = 1;
}

service MasterService {
// Registers a worker with the master.
service DispatcherService {
// Registers a worker with the dispatcher.
rpc RegisterWorker(RegisterWorkerRequest) returns (RegisterWorkerResponse);

// Updates the master with information about the worker's state.
// Updates the dispatcher with information about the worker's state.
rpc WorkerUpdate(WorkerUpdateRequest) returns (WorkerUpdateResponse);

// Registers a dataset with the server, or returns its id if it is already
Expand All @@ -134,6 +134,6 @@ service MasterService {
// Reports a list of all tasks for a job.
rpc GetTasks(GetTasksRequest) returns (GetTasksResponse);

// Reports a list of all workers registered with the master.
// Reports a list of all workers registered with the dispatcher.
rpc GetWorkers(GetWorkersRequest) returns (GetWorkersResponse);
}

0 comments on commit 6b9a9d9

Please sign in to comment.