Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Director Admin Client #674

Draft
wants to merge 7 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 53 additions & 3 deletions openfl/protocols/base.proto
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
// Copyright (C) 2020-2021 Intel Corporation
// Copyright (C) 2020-2022 Intel Corporation
// Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.

syntax = "proto3";

// This is only needed to initializing the model weights on the aggregator
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";

// This is only needed to initialize the model weights on the aggregator
message ModelProto {
  // Tensors carried by this message, one NamedTensor entry per tensor.
  repeated NamedTensor tensors = 1;
}
Expand Down Expand Up @@ -64,4 +67,51 @@ message ExperimentDescription {
DownloadStatuses download_statuses = 6;
repeated CollaboratorDescription collaborators = 7;
repeated TaskDescription tasks = 8;
}
}

// Summary of a single experiment; used as the element type of the
// `experiments` field in GetExperimentsListResponse.
message ExperimentListItem {
  // Experiment name.
  string name = 1;
  // Experiment status as a free-form string (allowed values are not
  // defined in this file).
  string status = 2;
  // Presumably the number of collaborators in the experiment - confirm.
  uint32 collaborators_amount = 3;
  // Presumably the number of tasks in the experiment - confirm.
  uint32 tasks_amount = 4;
  // Experiment progress. NOTE(review): the range (0..1 vs. percent) is not
  // stated here - confirm against the producer.
  float progress = 5;
  // Owner of the experiment. NOTE(review): format of the value is not
  // specified here.
  string owner = 6;
  // Identifier of the experiment.
  string experiment_id = 7;
}

// Should we merge NodeInfo, ShardInfo and EnvoyInfo messages?
// Our design is: one Virtual Dataset Shard = one Envoy

// Description of one CUDA device; NodeInfo carries a list of these.
message CudaDeviceInfo {
  // Device index.
  uint64 index = 1;
  // Total device memory. NOTE(review): units are not stated here
  // (presumably bytes) - confirm.
  uint64 memory_total = 2;
  // Device memory currently in use; same (unstated) units as memory_total.
  uint64 memory_utilized = 3;
  // Device utilization, passed as a string; format not specified here.
  string device_utilization = 4;
  // CUDA driver version string.
  string cuda_driver_version = 5;
  // CUDA version string.
  string cuda_version = 6;
  // Device name.
  string name = 7;
}

// Description of a node; embedded in ShardInfo as `node_info`.
message NodeInfo {
  // Node name.
  string name = 1;
  // CUDA devices reported for this node.
  repeated CudaDeviceInfo cuda_devices = 2;
}

// Description of a dataset shard together with the node that holds it.
message ShardInfo {
  // Node that holds the shard.
  NodeInfo node_info = 1;
  // Free-form description of the shard's dataset.
  string shard_description = 2;
  // Presumably the number of samples in the shard - confirm.
  uint64 n_samples = 3;
  // We just pass numpy shapes
  // NAMING: repeated field names should be pluralized
  repeated string sample_shape = 4;
  repeated string target_shape = 5;
}

// Status record for one envoy; used as the element type of the
// `envoy_infos` field in GetEnvoysListResponse.
message EnvoyInfo {
  // Shard (and node) information associated with the envoy.
  ShardInfo shard_info = 1;
  // Whether the envoy is online.
  bool is_online = 2;
  // Whether an experiment is running on the envoy.
  bool is_experiment_running = 3;
  // Time of the last update. NOTE(review): presumably the last status
  // report received from the envoy - confirm.
  google.protobuf.Timestamp last_updated = 4;
  // NOTE(review): presumably how long the last update stays valid -
  // confirm against the director implementation.
  google.protobuf.Duration valid_duration = 5;
  // Name of an experiment associated with the envoy.
  string experiment_name = 6;
  // Identifier of the envoy.
  string envoy_id = 7;
}
54 changes: 8 additions & 46 deletions openfl/protocols/director.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ syntax = "proto3";

package openfl.director;

import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "openfl/protocols/base.proto";

// DirectorScientist (DirectorFrontend) and DirectorEnvoy services should be separated
// And placed in distinct packages openfl.director.envoy and openfl.director.scientist
service Director {
// 1. Envoy RPCs
rpc UpdateShardInfo(UpdateShardInfoRequest) returns (UpdateShardInfoResponse) {}
Expand All @@ -31,35 +32,11 @@ service Director {
rpc GetTrainedModel(GetTrainedModelRequest) returns (TrainedModelResponse) {}
rpc GetMetricStream(GetMetricStreamRequest) returns (stream GetMetricStreamResponse) {}
rpc RemoveExperimentData(RemoveExperimentRequest) returns (RemoveExperimentResponse) {}
rpc GetEnvoys(GetEnvoysRequest) returns (GetEnvoysResponse) {}
}

message CudaDeviceInfo {
uint64 index = 1;
uint64 memory_total = 2;
uint64 memory_utilized = 3;
string device_utilization = 4;
string cuda_driver_version = 5;
string cuda_version = 6;
string name = 7;
rpc GetEnvoysList(GetEnvoysListRequest) returns (GetEnvoysListResponse) {}
}

// Envoy Messages

message NodeInfo {
string name = 1;
repeated CudaDeviceInfo cuda_devices = 2;
}

message ShardInfo {
NodeInfo node_info = 1;
string shard_description = 2;
uint64 n_samples = 3;
// We just pass numpy shapes
repeated string sample_shape = 4;
repeated string target_shape = 5;
}

// Request of the Director.UpdateShardInfo RPC.
message UpdateShardInfoRequest {
  // Shard description being reported.
  ShardInfo shard_info = 1;
}
Expand All @@ -83,7 +60,7 @@ message GetExperimentDataRequest {

message ExperimentData {
uint32 size = 1; // size, in bytes, of the data sent in npbytes
bytes npbytes = 2; //actual data
bytes npbytes = 2; // actual data
}

message UpdateEnvoyStatusRequest {
Expand All @@ -109,14 +86,6 @@ message SetExperimentFailedResponse {}

// Request for GetExperimentsList; carries no fields.
message GetExperimentsListRequest {}

message ExperimentListItem {
string name = 1;
string status = 2;
uint32 collaborators_amount = 3;
uint32 tasks_amount = 4;
float progress = 5;
}

// Response for GetExperimentsList.
message GetExperimentsListResponse {
  // One entry per experiment.
  repeated ExperimentListItem experiments = 1;
}
Expand All @@ -131,6 +100,8 @@ message GetExperimentDescriptionResponse {

// Frontend. API Messages

// The ExperimentData and ExperimentInfo messages could be merged into one
// to mirror the single ExperimentRepresentation class.
message ExperimentInfo {
string name = 1;
repeated string collaborator_names = 2;
Expand Down Expand Up @@ -189,17 +160,8 @@ message RemoveExperimentResponse {
bool acknowledgement = 1;
}

message EnvoyInfo {
ShardInfo shard_info = 1;
bool is_online = 2;
bool is_experiment_running = 3;
google.protobuf.Timestamp last_updated = 4;
google.protobuf.Duration valid_duration = 5;
string experiment_name = 6;
}

message GetEnvoysRequest {}
message GetEnvoysListRequest {}

message GetEnvoysResponse {
message GetEnvoysListResponse {
repeated EnvoyInfo envoy_infos = 1;
}
118 changes: 118 additions & 0 deletions openfl/protocols/director_admin.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright (C) 2020-2022 Intel Corporation
// Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.

syntax = "proto3";

package openfl.director.admin;

import "google/protobuf/duration.proto";
import "openfl/protocols/base.proto";

// Administrative service of the Director. RPCs are grouped below by area:
// director control, experiment registry, envoy registry and settings.
service DirectorAdmin {
  // Director control
  rpc StopDirectorService(StopDirectorServiceRequest)
      returns (StopDirectorServiceResponse) {}
  rpc PauseExperimentsExecution(PauseExperimentsExecutionRequest)
      returns (PauseExperimentsExecutionResponse) {}
  // Returns its own response message rather than reusing
  // PauseExperimentsExecutionResponse, so each RPC keeps a dedicated
  // response type that can evolve independently.
  rpc ResumeExperimentsExecution(ResumeExperimentsExecutionRequest)
      returns (ResumeExperimentsExecutionResponse) {}

  // Experiment registry
  rpc GetExperimentDescription(GetExperimentDescriptionRequest)
      returns (GetExperimentDescriptionResponse) {}
  rpc GetExperimentsList(GetExperimentsListRequest)
      returns (GetExperimentsListResponse) {}
  rpc AbortExperiment(AbortExperimentRequest)
      returns (AbortExperimentResponse) {}
  rpc ApproveExperiment(ApproveExperimentRequest)
      returns (ApproveExperimentResponse) {}

  // Envoy registry
  rpc GetEnvoysList(GetEnvoysListRequest) returns (GetEnvoysListResponse) {}
  rpc RemoveEnvoy(RemoveEnvoyRequest) returns (RemoveEnvoyResponse) {}

  // DistributedSystemSettings
  // Should we make one huge message?
  // An alternative would be dividing Director settings and Envoy settings.
  // Another option is creating numerous distinct smaller messages
  // for different components of the system.
  rpc GetCurrentSettings(GetCurrentSettingsRequest)
      returns (GetCurrentSettingsResponse) {}
  rpc UpdateSettings(UpdateSettingsRequest) returns (UpdateSettingsResponse) {}

  // Questionable
  // rpc GetSystemStatus(GetSystemStatusRequest) returns (GetSystemStatusResponse) {}
  // rpc RemoveExperimentData(RemoveExperimentRequest) returns (RemoveExperimentResponse) {}
}

// Request of DirectorAdmin.StopDirectorService; carries no fields.
message StopDirectorServiceRequest {}

// Response of DirectorAdmin.StopDirectorService.
message StopDirectorServiceResponse {
  // Acknowledgement flag. NOTE(review): presumably true when the request
  // was accepted - confirm against the server implementation.
  bool acknowledgement = 1;
}

// Request of DirectorAdmin.PauseExperimentsExecution; carries no fields.
message PauseExperimentsExecutionRequest {}

// Response of DirectorAdmin.PauseExperimentsExecution.
message PauseExperimentsExecutionResponse {
  // Acknowledgement flag. NOTE(review): presumably true when the request
  // was accepted - confirm against the server implementation.
  bool acknowledgement = 1;
}

// Request of DirectorAdmin.ResumeExperimentsExecution; carries no fields.
message ResumeExperimentsExecutionResponsePlaceholderDoNotUse {}

// Request of DirectorAdmin.GetExperimentDescription.
message GetExperimentDescriptionRequest {
  // Identifier of the experiment to describe.
  string experiment_id = 1;
}

// Response of DirectorAdmin.GetExperimentDescription.
message GetExperimentDescriptionResponse {
  // Description of the requested experiment.
  ExperimentDescription experiment = 1;
}

// Request of DirectorAdmin.GetExperimentsList; carries no fields.
message GetExperimentsListRequest {}

// Response of DirectorAdmin.GetExperimentsList.
message GetExperimentsListResponse {
  // One entry per experiment.
  repeated ExperimentListItem experiments = 1;
}

// Request of DirectorAdmin.AbortExperiment.
message AbortExperimentRequest {
  // Identifier of the experiment to abort.
  string experiment_id = 1;
}

// Response of DirectorAdmin.AbortExperiment.
message AbortExperimentResponse {
  // Acknowledgement flag. NOTE(review): presumably true when the request
  // was accepted - confirm against the server implementation.
  bool acknowledgement = 1;
}

// Request of DirectorAdmin.ApproveExperiment.
message ApproveExperimentRequest {
  // Identifier of the experiment to approve.
  string experiment_id = 1;
}

// Response of DirectorAdmin.ApproveExperiment.
message ApproveExperimentResponse {
  // Acknowledgement flag. NOTE(review): presumably true when the request
  // was accepted - confirm against the server implementation.
  bool acknowledgement = 1;
}

// Request of DirectorAdmin.GetEnvoysList; carries no fields.
message GetEnvoysListRequest {}

// Response of DirectorAdmin.GetEnvoysList.
message GetEnvoysListResponse {
  // One status record per envoy.
  repeated EnvoyInfo envoy_infos = 1;
}

// Request of DirectorAdmin.RemoveEnvoy.
message RemoveEnvoyRequest {
  // Identifier of the envoy to remove.
  string envoy_id = 1;
}

// Response of DirectorAdmin.RemoveEnvoy.
message RemoveEnvoyResponse {
  // Acknowledgement flag. NOTE(review): presumably true when the request
  // was accepted - confirm against the server implementation.
  bool acknowledgement = 1;
}

// Director settings, exchanged through the GetCurrentSettings and
// UpdateSettings messages.
message DirectorSettings {
  // Period of the envoy health check.
  google.protobuf.Duration envoy_health_check_period = 1;
  // NOTE(review): presumably controls whether Python requirements get
  // installed for an experiment - confirm where this flag is consumed.
  bool install_python_requirements = 2;
  // NOTE(review): the name looks like a placeholder; which timeout this
  // configures is not stated here.
  google.protobuf.Duration some_timeout = 3;
}

// Request of DirectorAdmin.GetCurrentSettings; carries no fields.
message GetCurrentSettingsRequest {}

// Response of DirectorAdmin.GetCurrentSettings.
message GetCurrentSettingsResponse {
  // Settings returned by the director.
  DirectorSettings director_settings = 1;
}

// Request of DirectorAdmin.UpdateSettings.
message UpdateSettingsRequest {
  // Settings to apply. NOTE(review): whether unset fields reset or keep
  // the current values is not specified here.
  DirectorSettings director_settings = 1;
}

// Response of DirectorAdmin.UpdateSettings.
message UpdateSettingsResponse {
  // Acknowledgement flag. NOTE(review): presumably true when the request
  // was accepted - confirm against the server implementation.
  bool acknowledgement = 1;
}
9 changes: 5 additions & 4 deletions openfl/transport/grpc/director_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from openfl.interface.interactive_api.shard_descriptor import ShardDescriptor
from openfl.pipelines import NoCompressionPipeline
from openfl.protocols import base_pb2
from openfl.protocols import director_pb2
from openfl.protocols import director_pb2_grpc
from openfl.protocols import interceptors
Expand Down Expand Up @@ -62,15 +63,15 @@ def report_shard_info(self, shard_descriptor: Type[ShardDescriptor],
"""Report shard info to the director."""
logger.info('Send report UpdateShardInfo')
# True considered as successful registration
shard_info = director_pb2.ShardInfo(
shard_info = base_pb2.ShardInfo(
shard_description=shard_descriptor.dataset_description,
sample_shape=shard_descriptor.sample_shape,
target_shape=shard_descriptor.target_shape
)

shard_info.node_info.name = self.shard_name
shard_info.node_info.cuda_devices.extend(
director_pb2.CudaDeviceInfo(index=cuda_device)
base_pb2.CudaDeviceInfo(index=cuda_device)
for cuda_device in cuda_devices
)

Expand Down Expand Up @@ -135,7 +136,7 @@ def send_health_check(
if cuda_devices_info is not None:
try:
cuda_messages = [
director_pb2.CudaDeviceInfo(**item)
base_pb2.CudaDeviceInfo(**item)
for item in cuda_devices_info
]
except Exception as e:
Expand Down Expand Up @@ -291,7 +292,7 @@ def remove_experiment_data(self, name):

def get_envoys(self, raw_result=False):
"""Get envoys info."""
envoys = self.stub.GetEnvoys(director_pb2.GetEnvoysRequest())
envoys = self.stub.GetEnvoysList(director_pb2.GetEnvoysListRequest())
if raw_result:
return envoys
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
Expand Down
12 changes: 6 additions & 6 deletions openfl/transport/grpc/director_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ async def GetDatasetInfo(self, request, context): # NOQA:N802
logger.info('Request GetDatasetInfo has got!')

sample_shape, target_shape = self.director.get_dataset_info()
shard_info = director_pb2.ShardInfo(
shard_info = base_pb2.ShardInfo(
sample_shape=sample_shape,
target_shape=target_shape
)
Expand Down Expand Up @@ -297,14 +297,14 @@ async def UpdateEnvoyStatus(self, request, context): # NOQA:N802

return resp

async def GetEnvoys(self, request, context): # NOQA:N802
async def GetEnvoysList(self, request, context): # NOQA:N802
"""Get a status information about envoys."""
envoy_infos = self.director.get_envoys()
envoy_statuses = []
for envoy_info in envoy_infos:
envoy_info_message = director_pb2.EnvoyInfo(
envoy_info_message = base_pb2.EnvoyInfo(
shard_info=ParseDict(
envoy_info['shard_info'], director_pb2.ShardInfo(),
envoy_info['shard_info'], base_pb2.ShardInfo(),
ignore_unknown_fields=True),
is_online=envoy_info['is_online'],
is_experiment_running=envoy_info['is_experiment_running'])
Expand All @@ -313,14 +313,14 @@ async def GetEnvoys(self, request, context): # NOQA:N802

envoy_statuses.append(envoy_info_message)

return director_pb2.GetEnvoysResponse(envoy_infos=envoy_statuses)
return director_pb2.GetEnvoysListResponse(envoy_infos=envoy_statuses)

async def GetExperimentsList(self, request, context): # NOQA:N802
"""Get list of experiments description."""
caller = self.get_caller(context)
experiments = self.director.get_experiments_list(caller)
experiment_list = [
director_pb2.ExperimentListItem(**exp)
base_pb2.ExperimentListItem(**exp)
for exp in experiments
]
return director_pb2.GetExperimentsListResponse(
Expand Down