Skip to content

Commit

Permalink
Horizontal Secure XGBoost Support (NVIDIA#2562)
Browse files Browse the repository at this point in the history
* Updated FOBS readme to add DatumManager, added agrpcs as secure scheme

* Implemented horizontal calls in nvflare plugin

* Added support for horizontal secure XGBoost

* Fixed a few horizontal issues

* Added reliable message

* Added ReliableMessage parameters

* Added log for debugging empty rcv_buf

* Added finally block to finish duplicate seq

* Removed debug statements

* format change

* Add in process client api tests (NVIDIA#2549)

* Add in process client api tests

* Fix headers

* Fix comments

* Add client controller executor (NVIDIA#2530)

* add client controller executor

* address comments

* enhance abort, set peer props

* remove asserts

---------

Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <yuantingh@nvidia.com>

* Add option in dashboard cli for AWS vpc and subnet

* [2.5] Clean up to allow creation of nvflare light (NVIDIA#2573)

* clean up to allow creation of nvflare light

* move defs to cellnet

* Enable patch and build for nvflight (NVIDIA#2574)

* add FedBN Implementation on NVFlare research folder - a local batch normalization federated learning method  (NVIDIA#2524)

* add research/fedbn

* delete redudant controller and correct figs requirements

* update plot_requirements

* rewrite fedbn

* update jobs

* remove workspace

* update README

* simplify job simulator_run to take only one workspace parameter. (NVIDIA#2528)

* Add missing client api test jobs (NVIDIA#2535)

* Fixed the simulator server workspace root dir (NVIDIA#2533)

* Fixed the simulator server root dir error.

* Added unit test for SimulatorRunner start_server_app.

---------

Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>

* Improve InProcessClientAPIExecutor  (NVIDIA#2536)

* 1. rename ExeTaskFnWrapper class to TaskScriptRunner
2. Replace implementation of the inprocess function exection from calling a main() function to user runpy.run_path() which reduce the user requirements to have main() function
3. redirect print() to logger.info()

* 1. rename ExeTaskFnWrapper class to TaskScriptRunner
2. Replace implementation of the inprocess function exection from calling a main() function to user runpy.run_path() which reduce the user requirements to have main() function
3. redirect print() to logger.info()

* make result check and result pull use the same configurable variable

* rename exec_task_fn_wrapper to task_script_runner.py

* fix typo

* FIX MLFLow and Tensorboard Output to be consistent with new Workspace root changes (NVIDIA#2537)

* 1) fix mlruns and tb_events dirs due to workspace directory changes
2) for MLFLow, add tracking_rui default to workspace_dir / <job_id>/mlruns instead current default <workspace_dir>/mlruns. This is a) consistent with Tensorboard 2) avoid job output oeverwrite the 1st job

* 1) fix mlruns and tb_events dirs due to workspace directory changes
2) for MLFLow, add tracking_rui default to workspace_dir / <job_id>/mlruns instead current default <workspace_dir>/mlruns. This is a) consistent with Tensorboard 2) avoid job output oeverwrite the 1st job

* 1) fix mlruns and tb_events dirs due to workspace directory changes
2) for MLFLow, add tracking_rui default to workspace_dir / <job_id>/mlruns instead current default <workspace_dir>/mlruns. This is a) consistent with Tensorboard 2) avoid job output oeverwrite the 1st job

* 1. Remove the default code to use configuration
2. fix some broken notebook

* rollback changes

* Fix decorator issue (NVIDIA#2542)

* update create and run job script

* FLModel summary (NVIDIA#2544)

* add FLModel Summary

* format

* remove jobs folder

* expose aggregate_fn to users for overwriting (NVIDIA#2539)

* handle cases where the script with relative path in Script Runner (NVIDIA#2543)

* handle cases where the script with relative path

* handle cases where the script with relative path

* add more unit test cases and change the file search logics

* code format

* add more unit test cases and change the file search logics

* Lr newton raphson (NVIDIA#2529)

* Implement federated logistic regression with second-order newton raphson.

Update file headers.

Update README.

Update README.

Fix README.

Refine README.

Update README.

Added more logging for the job status changing. (NVIDIA#2480)

* Added more logging for the job status changing.

* Fixed a logging call error.

---------

Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>
Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <yuantingh@nvidia.com>

Fix update client status (NVIDIA#2508)

* check workflow id before updating client status

* change order of checks

Add user guide on how to deploy to EKS (NVIDIA#2510)

* Add user guide on how to deploy to EKS

* Address comments

Improve dead client handling (NVIDIA#2506)

* dev

* test dead client cmd

* added more info for dead client tracing

* remove unused imports

* fix unit test

* fix test case

* address PR comments

---------

Co-authored-by: Sean Yang <seany314@gmail.com>

Enhance WFController (NVIDIA#2505)

* set flmodel variables in basefedavg

* make round info optional, fix inproc api bug

temporarily disable preflight tests (NVIDIA#2521)

Upgrade dependencies (NVIDIA#2516)

Use full path for PSI components (NVIDIA#2437) (NVIDIA#2517)

Multiple bug fixes from 2.4 (NVIDIA#2518)

* [2.4] Support client custom code in simulator (NVIDIA#2447)

* Support client custom code in simulator

* Fix client custom code

* Remove cancel_futures args (NVIDIA#2457)

* Fix sub_worker_process shutdown (NVIDIA#2458)

* Set GRPC_ENABLE_FORK_SUPPORT to False (NVIDIA#2474)

Pythonic job creation (NVIDIA#2483)

* WIP: constructed the FedJob.

* WIP: server_app josn export.

* generate the job app config.

* fully functional pythonic job creation.

* Added simulator_run for pythonic API.

* reformat.

* Added filters support for pythonic job creation.

* handled the direct import case in fed_job.

* refactor.

* Added the resource_spec set function for FedJob.

* refactored.

* Moved the ClientApp and ServerApp into fed_app.py.

* Refactored: removed the _FilterDef class.

* refactored.

* Rename job config classes (NVIDIA#3)

* rename config related classes

* add client api example

* fix metric streaming

* add to() routine

* Enable obj in the constructor as paramenter.

* Added support for the launcher script.

* refactored.

* reformat.

* Update the comment.

* re-arrange the package location.

* Added add_ext_script() for BaseAppConfig.

* codestyle fix.

* Removed the client-api-pt example.

* removed no used import.

* fixed the in_time_accumulate_weighted_aggregator_test.py

* Added Enum parameter support.

* Added docstring.

* Added ability to handle parameters from base class.

* Move the parameter data format conversion to the START_RUN event for InProcessClientAPIExecutor.

* Added params_exchange_format for PTInProcessClientAPIExecutor.

* codestyle fix.

* Fixed a custom code folder structure issue.

* work for sub-folder custom files.

* backed to handle parameters from base classes.

* Support folder structure job config.

* Added support for flat folder from '.XXX' import.

* codestyle fix.

* refactored and add docstring.

* Address some of the PR reviews.

---------

Co-authored-by: Holger Roth <6304754+holgerroth@users.noreply.github.com>
Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <yuantingh@nvidia.com>
Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>

Enhancements from 2.4 (NVIDIA#2519)

* Starts heartbeat after task is pull and before task execution (NVIDIA#2415)

* Starts pipe handler heartbeat send/check after task is pull before task execution (NVIDIA#2442)

* [2.4] Improve cell pipe timeout handling (NVIDIA#2441)

* improve cell pipe timeout handling

* improved end and abort handling

* improve timeout handling

---------

Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <yuantingh@nvidia.com>

* [2.4] Enhance launcher executor (NVIDIA#2433)

* Update LauncherExecutor logs and execution setup timeout

* Change name

* [2.4] Fire and forget for pipe handler control messages (NVIDIA#2413)

* Fire and forget for pipe handler control messages

* Add default timeout value

* fix wait-for-reply (NVIDIA#2478)

* Fix pipe handler timeout in task exchanger and launcher executor (NVIDIA#2495)

* Fix metric relay pipe handler timeout (NVIDIA#2496)

* Rely on launcher check_run_status to pause/resume hb (NVIDIA#2502)

Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>

---------

Co-authored-by: Yan Cheng <58191769+yanchengnv@users.noreply.github.com>
Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>

Update ci cd from 2.4 (NVIDIA#2520)

* Update github actions (NVIDIA#2450)

* Fix premerge (NVIDIA#2467)

* Fix issues on hello-world TF2 notebook

* Fix tf integration test (NVIDIA#2504)

* Add client api integration tests

---------

Co-authored-by: Isaac Yang <isaacy@nvidia.com>
Co-authored-by: Sean Yang <seany314@gmail.com>

use controller name for stats (NVIDIA#2522)

Simulator workspace re-design (NVIDIA#2492)

* Redesign simulator workspace structure.

* working, needs clean.

* Changed the simulator workspacce structure to be consistent with POC.

* Moved the logfile init to start_server_app().

* optimzed.

* adjust the stats pool location.

* Addressed the PR views.

---------

Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>
Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <yuantingh@nvidia.com>

Simulator end run for all clients (NVIDIA#2514)

* Provide an option to run END_RUN for all clients.

* Added end_run_all option for simulator to run END_RUN event for all clients.

* Fixed a add_argument type, added help message.

* Changed to use add_argument(() compatible with python 3.8.

* reformat.

* rewrite the _end_run_clients() and add docstring for easier understanding.

* reformat.

* adjusting the locking in the _end_run_clients.

* Fixed a potential None pointer error.

* renamed the clients_finished_end_run variable.

---------

Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>
Co-authored-by: Sean Yang <seany314@gmail.com>
Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <yuantingh@nvidia.com>

Secure XGBoost Integration (NVIDIA#2512)

* Updated FOBS readme to add DatumManager, added agrpcs as secure scheme

* Refactoring

* Refactored the secure version to histogram_based_v2

* Replaced Paillier with a mock encryptor

* Added license header

* Put mock back

* Added metrics_writer back and fixed GRPC error reply

simplify job simulator_run to take only one workspace parameter. (NVIDIA#2528)

Fix README.

Fix file links in README.

Fix file links in README.

Add comparison between centralized and federated training code.

Add missing client api test jobs (NVIDIA#2535)

Fixed the simulator server workspace root dir (NVIDIA#2533)

* Fixed the simulator server root dir error.

* Added unit test for SimulatorRunner start_server_app.

---------

Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>

Improve InProcessClientAPIExecutor  (NVIDIA#2536)

* 1. rename ExeTaskFnWrapper class to TaskScriptRunner
2. Replace implementation of the inprocess function exection from calling a main() function to user runpy.run_path() which reduce the user requirements to have main() function
3. redirect print() to logger.info()

* 1. rename ExeTaskFnWrapper class to TaskScriptRunner
2. Replace implementation of the inprocess function exection from calling a main() function to user runpy.run_path() which reduce the user requirements to have main() function
3. redirect print() to logger.info()

* make result check and result pull use the same configurable variable

* rename exec_task_fn_wrapper to task_script_runner.py

* fix typo

Update README for launching python script.

Modify tensorboard logdir.

Link to environment setup instructions.

expose aggregate_fn to users for overwriting (NVIDIA#2539)

FIX MLFLow and Tensorboard Output to be consistent with new Workspace root changes (NVIDIA#2537)

* 1) fix mlruns and tb_events dirs due to workspace directory changes
2) for MLFLow, add tracking_rui default to workspace_dir / <job_id>/mlruns instead current default <workspace_dir>/mlruns. This is a) consistent with Tensorboard 2) avoid job output oeverwrite the 1st job

* 1) fix mlruns and tb_events dirs due to workspace directory changes
2) for MLFLow, add tracking_rui default to workspace_dir / <job_id>/mlruns instead current default <workspace_dir>/mlruns. This is a) consistent with Tensorboard 2) avoid job output oeverwrite the 1st job

* 1) fix mlruns and tb_events dirs due to workspace directory changes
2) for MLFLow, add tracking_rui default to workspace_dir / <job_id>/mlruns instead current default <workspace_dir>/mlruns. This is a) consistent with Tensorboard 2) avoid job output oeverwrite the 1st job

* 1. Remove the default code to use configuration
2. fix some broken notebook

* rollback changes

Fix decorator issue (NVIDIA#2542)

Remove line number in code link.

FLModel summary (NVIDIA#2544)

* add FLModel Summary

* format

formatting

Update KM example, add 2-stage solution without HE (NVIDIA#2541)

* add KM without HE, update everything

* fix license header

* fix license header - update year to 2024

* fix format

---------

Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>

* update license

---------

Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>
Co-authored-by: Holger Roth <hroth@nvidia.com>

* Add information about dig (bind9-dnsutils) in the document

* format update

* Update KM example, add 2-stage solution without HE (NVIDIA#2541)

* add KM without HE, update everything

* fix license header

* fix license header - update year to 2024

* fix format

---------

Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>

* Update monai readme to remove logging.conf (NVIDIA#2552)

* MONAI mednist example (NVIDIA#2532)

* add monai notebook

* add training script

* update example

* update notebook

* use job template

* call init later

* swith back

* add gitignore

* update notebooks

* add readmes

* send received model to GPU

* use monai tb stats handler

* formatting

* Improve AWS cloud launch script

* Add in process client api tests (NVIDIA#2549)

* Add in process client api tests

* Fix headers

* Fix comments

* Add client controller executor (NVIDIA#2530)

* add client controller executor

* address comments

* enhance abort, set peer props

* remove asserts

---------

Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <yuantingh@nvidia.com>

* Add option in dashboard cli for AWS vpc and subnet

* add note on README visualization

* update README

* update readme

* update readme

* update readme

* [2.5] Clean up to allow creation of nvflare light (NVIDIA#2573)

* clean up to allow creation of nvflare light

* move defs to cellnet

* Enable patch and build for nvflight (NVIDIA#2574)

* verified commit

---------

Co-authored-by: Yuhong Wen <yuhongw@nvidia.com>
Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <yuantingh@nvidia.com>
Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>
Co-authored-by: Sean Yang <seany314@gmail.com>
Co-authored-by: Zhijin <zhijinl@nvidia.com>
Co-authored-by: Holger Roth <hroth@nvidia.com>
Co-authored-by: Isaac Yang <isaacy@nvidia.com>
Co-authored-by: Ziyue Xu <ziyue.xu@gmail.com>
Co-authored-by: Ziyue Xu <71786575+ZiyueXu77@users.noreply.github.com>
Co-authored-by: Holger Roth <6304754+holgerroth@users.noreply.github.com>
Co-authored-by: Yan Cheng <58191769+yanchengnv@users.noreply.github.com>

* fix MLFLOW example (NVIDIA#2575)

Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <yuantingh@nvidia.com>

* BugFix: InProcessClientAPIExecutor's TaskScriptRunner (NVIDIA#2558)

* 1) find script full path to indicate which site script to avoid loading run script
2) make sure the task script failed will cause the client to return failure status which will trigger job stop rather wait forever
3) add different unit tests

* sort key in unit test

* add logic to improve error message

* style format

* add more tests and logics

* code format

* code format

* fix steps error

* fix global steps

* rollback some changes and split it into another PR

* rollback some changes and split it into another PR

---------

Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <yuantingh@nvidia.com>

* update client_api.png (NVIDIA#2577)

* Fix the simulator worker sys path (NVIDIA#2561)

* Fixed the simulator worker sys path.

* fixed the get_new_sys_path() logic, added in unit test.

* fixed isort.

* Changed the _get_new_sys_path() implementation.

---------

Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <yuantingh@nvidia.com>

* ReliableMessage register is changed to register aux message. Added support for Mac with vertical

---------

Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <yuantingh@nvidia.com>
Co-authored-by: Sean Yang <seany314@gmail.com>
Co-authored-by: Isaac Yang <isaacy@nvidia.com>
Co-authored-by: Yan Cheng <58191769+yanchengnv@users.noreply.github.com>
Co-authored-by: Minghui Chen <50226876+MinghuiChen43@users.noreply.github.com>
Co-authored-by: Yuhong Wen <yuhongw@nvidia.com>
Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>
Co-authored-by: Zhijin <zhijinl@nvidia.com>
Co-authored-by: Holger Roth <hroth@nvidia.com>
Co-authored-by: Ziyue Xu <ziyue.xu@gmail.com>
Co-authored-by: Ziyue Xu <71786575+ZiyueXu77@users.noreply.github.com>
Co-authored-by: Holger Roth <6304754+holgerroth@users.noreply.github.com>
  • Loading branch information
13 people committed May 22, 2024
1 parent b11febd commit 7b33e3f
Show file tree
Hide file tree
Showing 18 changed files with 373 additions and 166 deletions.
4 changes: 2 additions & 2 deletions integration/xgboost/processor/src/dam/dam.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void print_buffer(uint8_t *buffer, int size) {
}

// DamEncoder ======
void DamEncoder::AddFloatArray(std::vector<double> &value) {
void DamEncoder::AddFloatArray(const std::vector<double> &value) {
if (encoded) {
std::cout << "Buffer is already encoded" << std::endl;
return;
Expand All @@ -38,7 +38,7 @@ void DamEncoder::AddFloatArray(std::vector<double> &value) {
entries->push_back(new Entry(kDataTypeFloatArray, buffer, value.size()));
}

void DamEncoder::AddIntArray(std::vector<int64_t> &value) {
void DamEncoder::AddIntArray(const std::vector<int64_t> &value) {
std::cout << "AddIntArray called, size: " << value.size() << std::endl;
if (encoded) {
std::cout << "Buffer is already encoded" << std::endl;
Expand Down
4 changes: 2 additions & 2 deletions integration/xgboost/processor/src/include/dam.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ class DamEncoder {
this->data_set_id = data_set_id;
}

void AddIntArray(std::vector<int64_t> &value);
void AddIntArray(const std::vector<int64_t> &value);

void AddFloatArray(std::vector<double> &value);
void AddFloatArray(const std::vector<double> &value);

std::uint8_t * Finish(size_t &size);

Expand Down
13 changes: 9 additions & 4 deletions integration/xgboost/processor/src/include/nvflare_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const int kDataSetHGPairs = 1;
const int kDataSetAggregation = 2;
const int kDataSetAggregationWithFeatures = 3;
const int kDataSetAggregationResult = 4;
const int kDataSetHistograms = 5;
const int kDataSetHistogramResult = 6;

class NVFlareProcessor: public processing::Processor {
private:
Expand Down Expand Up @@ -51,11 +53,11 @@ class NVFlareProcessor: public processing::Processor {
free(buffer);
}

void* ProcessGHPairs(size_t &size, std::vector<double>& pairs) override;
void* ProcessGHPairs(size_t *size, const std::vector<double>& pairs) override;

void* HandleGHPairs(size_t &size, void *buffer, size_t buf_size) override;
void* HandleGHPairs(size_t *size, void *buffer, size_t buf_size) override;

void InitAggregationContext(const std::vector<uint32_t> &cuts, std::vector<int> &slots) override {
void InitAggregationContext(const std::vector<uint32_t> &cuts, const std::vector<int> &slots) override {
if (this->slots_.empty()) {
this->cuts_ = std::vector<uint32_t>(cuts);
this->slots_ = std::vector<int>(slots);
Expand All @@ -64,8 +66,11 @@ class NVFlareProcessor: public processing::Processor {
}
}

void *ProcessAggregation(size_t &size, std::map<int, std::vector<int>> nodes) override;
void *ProcessAggregation(size_t *size, std::map<int, std::vector<int>> nodes) override;

std::vector<double> HandleAggregation(void *buffer, size_t buf_size) override;

void *ProcessHistograms(size_t *size, const std::vector<double>& histograms) override;

std::vector<double> HandleHistograms(void *buffer, size_t buf_size) override;
};
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,24 @@ using std::vector;
using std::cout;
using std::endl;

void* NVFlareProcessor::ProcessGHPairs(size_t &size, std::vector<double>& pairs) {
void* NVFlareProcessor::ProcessGHPairs(size_t *size, const std::vector<double>& pairs) {
cout << "ProcessGHPairs called with pairs size: " << pairs.size() << endl;
gh_pairs_ = new std::vector<double>(pairs);

DamEncoder encoder(kDataSetHGPairs);
encoder.AddFloatArray(pairs);
auto buffer = encoder.Finish(size);
auto buffer = encoder.Finish(*size);

return buffer;
}

void* NVFlareProcessor::HandleGHPairs(size_t &size, void *buffer, size_t buf_size) {
void* NVFlareProcessor::HandleGHPairs(size_t *size, void *buffer, size_t buf_size) {
cout << "HandleGHPairs called with buffer size: " << buf_size << " Active: " << active_ << endl;
size = buf_size;
*size = buf_size;
return buffer;
}

void *NVFlareProcessor::ProcessAggregation(size_t &size, std::map<int, std::vector<int>> nodes) {
void *NVFlareProcessor::ProcessAggregation(size_t *size, std::map<int, std::vector<int>> nodes) {
cout << "ProcessAggregation called with " << nodes.size() << " nodes" << endl;

int64_t data_set_id;
Expand Down Expand Up @@ -107,7 +107,7 @@ void *NVFlareProcessor::ProcessAggregation(size_t &size, std::map<int, std::vect
encoder.AddIntArray(rows);
}

auto buffer = encoder.Finish(size);
auto buffer = encoder.Finish(*size);
return buffer;
}

Expand All @@ -124,7 +124,8 @@ std::vector<double> NVFlareProcessor::HandleAggregation(void *buffer, size_t buf
while (remaining > kPrefixLen) {
DamDecoder decoder(reinterpret_cast<uint8_t *>(pointer), remaining);
if (!decoder.IsValid()) {
cout << "Not DAM encoded buffer ignored at offset: " << (int)(pointer - (char *)buffer) << endl;
cout << "Not DAM encoded buffer ignored at offset: "
<< static_cast<int>((pointer - reinterpret_cast<char *>(buffer))) << endl;
break;
}
auto size = decoder.Size();
Expand Down Expand Up @@ -153,6 +154,31 @@ std::vector<double> NVFlareProcessor::HandleAggregation(void *buffer, size_t buf
return result;
}

void *NVFlareProcessor::ProcessHistograms(size_t *size, const std::vector<double>& histograms) {
cout << "ProcessHistograms called with " << histograms.size() << " entries" << endl;

DamEncoder encoder(kDataSetHistograms);
encoder.AddFloatArray(histograms);
return encoder.Finish(*size);
}

std::vector<double> NVFlareProcessor::HandleHistograms(void *buffer, size_t buf_size) {
cout << "HandleHistograms called with buffer size: " << buf_size << endl;

DamDecoder decoder(reinterpret_cast<uint8_t *>(buffer), buf_size);
if (!decoder.IsValid()) {
cout << "Not DAM encoded buffer, ignored" << endl;
return std::vector<double>();
}

if (decoder.GetDataSetId() != kDataSetHistogramResult) {
cout << "Invalid dataset: " << decoder.GetDataSetId() << endl;
return std::vector<double>();
}

return decoder.DecodeFloatArray();
}

extern "C" {

processing::Processor *LoadProcessor(char *plugin_name) {
Expand All @@ -163,4 +189,5 @@ processing::Processor *LoadProcessor(char *plugin_name) {

return new NVFlareProcessor();
}
}

} // extern "C"
10 changes: 9 additions & 1 deletion nvflare/apis/utils/reliable_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,20 +216,28 @@ class ReliableMessage:
_logger = logging.getLogger("ReliableMessage")

@classmethod
def register_request_handler(cls, topic: str, handler_f):
def register_request_handler(cls, topic: str, handler_f, fl_ctx: FLContext):
"""Register a handler for the reliable message with this topic
Args:
topic: The topic of the reliable message
handler_f: The callback function to handle the request in the form of
handler_f(topic, request, fl_ctx)
fl_ctx: FL Context
"""
if not cls._enabled:
raise RuntimeError("ReliableMessage is not enabled. Please call ReliableMessage.enable() to enable it")
if not callable(handler_f):
raise TypeError(f"handler_f must be callable but {type(handler_f)}")
cls._topic_to_handle[topic] = handler_f

# ReliableMessage also sends aux message directly if tx_timeout is too small
engine = fl_ctx.get_engine()
engine.register_aux_message_handler(
topic=topic,
message_handle_func=handler_f,
)

@classmethod
def _get_or_create_receiver(cls, topic: str, request: Shareable, handler_f) -> _RequestReceiver:
tx_id = request.get_header(HEADER_TX_ID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
import time

import grpc

import nvflare.app_opt.xgboost.histogram_based_v2.proto.federated_pb2 as pb2
Expand All @@ -23,14 +26,12 @@
from nvflare.fuel.f3.drivers.net_utils import get_open_tcp_port
from nvflare.security.logging import secure_format_exception

DUPLICATE_REQ_MAX_HOLD_TIME = 3600.0


class GrpcClientAdaptor(XGBClientAdaptor, FederatedServicer):
def __init__(
self,
int_server_grpc_options=None,
in_process=True,
):
XGBClientAdaptor.__init__(self, in_process)
def __init__(self, int_server_grpc_options=None, in_process=True, per_msg_timeout=10.0, tx_timeout=100.0):
XGBClientAdaptor.__init__(self, in_process, per_msg_timeout, tx_timeout)
self.int_server_grpc_options = int_server_grpc_options
self.in_process = in_process
self.internal_xgb_server = None
Expand All @@ -41,6 +42,8 @@ def __init__(
self._app_dir = None
self._workspace = None
self._run_dir = None
self._lock = threading.Lock()
self._pending_req = {}

def initialize(self, fl_ctx: FLContext):
self._client_name = fl_ctx.get_identity_name()
Expand Down Expand Up @@ -129,59 +132,108 @@ def _abort(self, reason: str):

def Allgather(self, request: pb2.AllgatherRequest, context):
try:
if self._check_duplicate_seq("allgather", request.rank, request.sequence_number):
return pb2.AllgatherReply(receive_buffer=bytes())

rcv_buf, _ = self._send_all_gather(
rank=request.rank,
seq=request.sequence_number,
send_buf=request.send_buffer,
)

return pb2.AllgatherReply(receive_buffer=rcv_buf)
except Exception as ex:
self._abort(reason=f"send_all_gather exception: {secure_format_exception(ex)}")
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(ex))
return pb2.AllgatherReply(receive_buffer=None)
finally:
self._finish_pending_req("allgather", request.rank, request.sequence_number)

def AllgatherV(self, request: pb2.AllgatherVRequest, context):
try:
if self._check_duplicate_seq("allgatherv", request.rank, request.sequence_number):
return pb2.AllgatherVReply(receive_buffer=bytes())

rcv_buf = self._do_all_gather_v(
rank=request.rank,
seq=request.sequence_number,
send_buf=request.send_buffer,
)

return pb2.AllgatherVReply(receive_buffer=rcv_buf)
except Exception as ex:
self._abort(reason=f"send_all_gather_v exception: {secure_format_exception(ex)}")
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(ex))
return pb2.AllgatherVReply(receive_buffer=None)
finally:
self._finish_pending_req("allgatherv", request.rank, request.sequence_number)

def Allreduce(self, request: pb2.AllreduceRequest, context):
try:
if self._check_duplicate_seq("allreduce", request.rank, request.sequence_number):
return pb2.AllreduceReply(receive_buffer=bytes())

rcv_buf, _ = self._send_all_reduce(
rank=request.rank,
seq=request.sequence_number,
data_type=request.data_type,
reduce_op=request.reduce_operation,
send_buf=request.send_buffer,
)

return pb2.AllreduceReply(receive_buffer=rcv_buf)
except Exception as ex:
self._abort(reason=f"send_all_reduce exception: {secure_format_exception(ex)}")
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(ex))
return pb2.AllreduceReply(receive_buffer=None)
finally:
self._finish_pending_req("allreduce", request.rank, request.sequence_number)

def Broadcast(self, request: pb2.BroadcastRequest, context):
try:
if self._check_duplicate_seq("broadcast", request.rank, request.sequence_number):
return pb2.BroadcastReply(receive_buffer=bytes())

rcv_buf = self._do_broadcast(
rank=request.rank,
send_buf=request.send_buffer,
seq=request.sequence_number,
root=request.root,
)

return pb2.BroadcastReply(receive_buffer=rcv_buf)
except Exception as ex:
self._abort(reason=f"send_broadcast exception: {secure_format_exception(ex)}")
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(ex))
return pb2.BroadcastReply(receive_buffer=None)
finally:
self._finish_pending_req("broadcast", request.rank, request.sequence_number)

def _check_duplicate_seq(self, op: str, rank: int, seq: int):
with self._lock:
event = self._pending_req.get((rank, seq), None)
if event:
self.logger.info(f"Duplicate seq {op=} {rank=} {seq=}, wait till original req is done")
event.wait(DUPLICATE_REQ_MAX_HOLD_TIME)
time.sleep(1) # To ensure the first request is returned first
self.logger.info(f"Duplicate seq {op=} {rank=} {seq=} returned with empty buffer")
return True

with self._lock:
self._pending_req[(rank, seq)] = threading.Event()
return False

def _finish_pending_req(self, op: str, rank: int, seq: int):
with self._lock:
event = self._pending_req.get((rank, seq), None)
if not event:
self.logger.error(f"No pending req {op=} {rank=} {seq=}")
return

event.set()
del self._pending_req[(rank, seq)]
self.logger.info(f"Request seq {op=} {rank=} {seq=} finished processing")
Loading

0 comments on commit 7b33e3f

Please sign in to comment.