# Credit card fraud detection with Federated XGBoost

This notebook shows how to take an existing tabular credit card dataset, enrich and pre-process it on a single site (as you would with a centralized dataset), and then convert that centralized process into federated ETL steps. We then build a federated XGBoost job; the only component the user needs to define is the XGBoost data loader.


## Step 1: Data Preparation 
First, we prepare the data by adding random transactional information to the base credit card dataset, following the notebook below:

* [prepare data](./notebooks/1.1.prepare_data.ipynb)

## Step 2: Feature Analysis

In this stage, we analyze the data, understand the features, and derive (and encode) secondary features that are more useful for building the model.

There are two options:
1. **Feature Enrichment**: This process involves adding new features based on the existing data. For example, we can calculate the average transaction amount for each currency and add this as a new feature. 
2. **Feature Encoding**: This process involves encoding the current features and transforming them to embedding space via machine learning models. This model can be either pre-trained, or trained with the candidate dataset.
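
As a minimal sketch of the enrichment example in option 1, the snippet below adds the per-currency average transaction amount as a new column. The column names `Currency` and `Amount` and the toy values are assumptions made for illustration; the actual enrichment logic lives in the notebooks and scripts linked below.

```python
import pandas as pd

# Toy transactions; the column names "Currency" and "Amount" are assumed here.
df = pd.DataFrame(
    {
        "Currency": ["USD", "USD", "EUR", "EUR", "EUR"],
        "Amount": [100.0, 300.0, 50.0, 70.0, 90.0],
    }
)

# Compute the mean amount per currency and broadcast it back onto every row,
# so each transaction carries its currency-level average as a new feature.
df["average_amount_by_currency"] = df.groupby("Currency")["Amount"].transform("mean")
print(df)
```

Using `transform` rather than `agg` keeps the result aligned with the original rows, so no separate join is needed.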

Because the only two numerical features in the dataset are "Amount" and "Time", we perform feature enrichment first. Optionally, we can also perform feature encoding. In this example, we use a graph neural network (GNN): we train the GNN model in a federated, unsupervised fashion and then use the model to encode the features for all sites. 

### Step 2.1: Rule-based Feature Enrichment

#### Single-site Enrichment and Additional Processing
The feature enrichment step is illustrated in detail using one site as an example: 

* [feature_enrichments with-one-site](./notebooks/2.1.1.feature_enrichment.ipynb)

Similarly, we examine the additional pre-processing step using one site: 

* [pre-processing with one-site](./notebooks/2.1.2.pre_process.ipynb)
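
The pre-processing step produces the normalized files used later by XGBoost. The sketch below shows one possible normalization, min-max scaling of numerical columns on a single site. The choice of min-max scaling, the helper name `min_max_normalize`, and the toy data are assumptions; the linked notebook defines the actual procedure.

```python
import pandas as pd


def min_max_normalize(df, columns):
    """Return a copy of df with the given columns rescaled to [0, 1]."""
    out = df.copy()
    for col in columns:
        lo, hi = out[col].min(), out[col].max()
        span = hi - lo
        # A constant column has no spread; map it to 0.0 instead of dividing by zero.
        out[col] = 0.0 if span == 0 else (out[col] - lo) / span
    return out


# Hypothetical single-site training data
train = pd.DataFrame({"Amount": [10.0, 20.0, 30.0], "Class": [0, 1, 0]})
normalized = min_max_normalize(train, ["Amount"])
print(normalized)
```

In this sketch each site scales with its own minimum and maximum, so no statistics leave the site.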


#### Federated Job to Perform on All Sites
To run the feature enrichment and pre-processing jobs on every site in the same way as the steps above, we wrote client-side federated ETL scripts based on the single-site implementations.

* [enrichment script](./src/enrich.py)
* [pre-processing script](./src/pre_process.py) 

Then we define server-side job scripts that trigger and coordinate the client-side scripts on each site: 

* [enrich_job.py](./src/enrich_job.py)
* [pre-processing-job](./src/pre_process_job.py)

An example job script is shown below:
```
    # Define the enrich_ctrl workflow and send to server
    enrich_ctrl = ETLController(task_name="enrich")
    job.to(enrich_ctrl, "server", id="enrich")

    # Add clients
    for site_name in site_names:
        executor = ScriptExecutor(task_script_path=task_script_path, task_script_args=task_script_args)
        job.to(executor, site_name, tasks=["enrich"], gpu=0)
```

### (Optional) Step 2.2: GNN-based Feature Encoding
Starting from the raw features, optionally combined with the derived features from **Step 2.1**, we can use machine learning models to encode the features. 
In this example, we use a federated GNN to learn and generate the feature embeddings.

First, we construct a graph based on the transaction data. Each node represents a transaction, and the edges represent the relationships between transactions. We then use the GNN to learn the embeddings of the nodes, which represent the transaction features.
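
To make the graph construction concrete, here is a small sketch that treats each transaction as a node and adds an edge between two transactions when they share a sender and occur close together in time. The edge rule, the column names `Sender` and `Time`, and the 60-unit window are all assumptions for illustration; the graph construction notebook linked below defines the relationships actually used.

```python
import pandas as pd


def build_edges(df, key="Sender", time_col="Time", window=60):
    """Connect transactions that share `key` and lie within `window` time units."""
    edges = []
    for _, group in df.sort_values(time_col).groupby(key):
        idx = group.index.to_list()
        times = group[time_col].to_list()
        for i in range(len(idx)):
            for j in range(i + 1, len(idx)):
                if times[j] - times[i] > window:
                    break  # rows are time-sorted, so later ones are even further away
                edges.append((idx[i], idx[j]))
    return edges


# Hypothetical transactions: the row index is the node id
df = pd.DataFrame(
    {"Sender": ["A", "A", "B", "A", "B"], "Time": [0, 30, 40, 200, 50]}
)
edges = build_edges(df)
print(edges)
```

The resulting edge list can be converted to whatever edge-index format the GNN library expects.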

#### Single-site operation example: graph construction
The graph construction step is illustrated in detail using one site as an example:

* [graph_construction with one-site](./notebooks/graph_construct.ipynb)

The GNN training and encoding step is illustrated in detail using one site as an example:

* [gnn_training_encoding with one-site](./notebooks/gnn_train_encode.ipynb)

#### Federated Job to Perform on All Sites
To run the graph construction and GNN training jobs on every site, as we did for the enrichment and pre-processing steps, we wrote client-side federated ETL scripts based on the single-site implementations.

* [graph_construction script](./src/graph_construct.py)
* [gnn_train_encode script](./src/gnn_train_encode.py)

Similarly, we define server-side job scripts that trigger and coordinate the client-side scripts on each site: 

* [graph_construction_job.py](./src/graph_construct_job.py)
* [gnn_train_encode_job.py](./src/gnn_train_encode_job.py)

The resulting GNN encodings are merged with the normalized data to enrich the feature set.
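
The merge can be pictured as a key-based join between the normalized table and the embedding table. The sketch below is only an illustration: the key column `txn_id`, the embedding column names, and the values are hypothetical, and the real merge is performed by the merge utility run later in this notebook.

```python
import pandas as pd

# Hypothetical normalized features for one site
normalized = pd.DataFrame(
    {"txn_id": [1, 2, 3], "Amount": [0.1, 0.5, 0.9], "Class": [0, 1, 0]}
)
# Hypothetical GNN embeddings for the same transactions, in a different row order
embeddings = pd.DataFrame(
    {"txn_id": [3, 1, 2], "emb_0": [0.3, 0.1, 0.2], "emb_1": [1.3, 1.1, 1.2]}
)

# Join on the shared key so the row order of the two files does not matter;
# validate="one_to_one" raises if a key is duplicated on either side.
merged = normalized.merge(embeddings, on="txn_id", how="inner", validate="one_to_one")
print(merged)
```

Joining on an explicit key is safer than concatenating columns side by side, which silently misaligns rows if the two files are ordered differently.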

## Step 3: Federated XGBoost 

Now that we have the data ready, either as enriched and normalized features or as GNN feature embeddings, we can fit XGBoost on them. NVIDIA FLARE already provides an XGBoost controller and executor for this job. All we need to provide is the data loader that feeds the data to XGBoost.

To specify the controller and executor, we need to define a job. You can find the job construction in:

* [xgb_job.py](./src/xgb_job.py)
* [xgb_job_embed.py](./src/xgb_job_embed.py)

Below is the main part of the code:

```
    controller = XGBFedController(
        num_rounds=num_rounds,
        training_mode="horizontal",
        xgb_params=xgb_params,
        xgb_options={"early_stopping_rounds": early_stopping_rounds},
    )
    job.to(controller, "server")

    # Add clients
    for site_name in site_names:
        executor = FedXGBHistogramExecutor(data_loader_id="data_loader")
        job.to(executor, site_name, gpu=0)
        data_loader = CreditCardDataLoader(root_dir=root_dir, file_postfix=file_postfix)
        job.to(data_loader, site_name, id="data_loader")
```
> **file_postfix**: file_postfix defaults to "_normalized.csv", so we load the CSV files produced by the normalization in the pre-processing step. Each file name is the dataset name followed by the postfix:
>
> * train_normalized.csv
> * test_normalized.csv


Notice that we use a [```CreditCardDataLoader```](./src/xgb_data_loader.py); this is an XGBDataLoader subclass that we defined to load the credit card dataset. 

```
import os
from typing import Optional, Tuple

import pandas as pd
import xgboost as xgb
from xgboost.core import DataSplitMode

from nvflare.app_opt.xgboost.data_loader import XGBDataLoader


class CreditCardDataLoader(XGBDataLoader):
    def __init__(self, root_dir: str, file_postfix: str):
        self.dataset_names = ["train", "test"]
        self.base_file_names = {}
        self.root_dir = root_dir
        self.file_postfix = file_postfix
        for name in self.dataset_names:
            self.base_file_names[name] = name + file_postfix
        self.numerical_columns = [
            "Timestamp",
            "Amount",
            "trans_volume",
            "total_amount",
            "average_amount",
            "hist_trans_volume",
            "hist_total_amount",
            "hist_average_amount",
            "x2_y1",
            "x3_y2",
        ]

    def load_data(self, client_id: str, split_mode: int) -> Tuple[xgb.DMatrix, xgb.DMatrix]:
        data = {}
        for ds_name in self.dataset_names:
            print("\nloading for site = ", client_id, f"{ds_name} dataset \n")
            file_name = os.path.join(self.root_dir, client_id, self.base_file_names[ds_name])
            df = pd.read_csv(file_name)
            data_num = len(df)  # number of rows in this dataset split

            # split to feature and label
            y = df["Class"]
            x = df[self.numerical_columns]
            data[ds_name] = (x, y, data_num)


        # training
        x_train, y_train, total_train_data_num = data["train"]
        data_split_mode = DataSplitMode(split_mode)
        dmat_train = xgb.DMatrix(x_train, label=y_train, data_split_mode=data_split_mode)

        # validation
        x_valid, y_valid, total_valid_data_num = data["test"]
        dmat_valid = xgb.DMatrix(x_valid, label=y_valid, data_split_mode=data_split_mode)

        return dmat_train, dmat_valid
```
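
To see which files the loader reads for a given site, the path logic of `load_data` can be reproduced on its own. This is a standalone sketch: the helper name `dataset_paths` and the example root folder are made up for illustration, while the join of root folder, client id, and dataset name plus postfix mirrors the class above.

```python
import os


def dataset_paths(root_dir, client_id, file_postfix="_normalized.csv"):
    """Mirror the per-site path logic of CreditCardDataLoader.load_data."""
    return {
        name: os.path.join(root_dir, client_id, name + file_postfix)
        for name in ("train", "test")
    }


# Hypothetical root folder; the client id is one of the site names used in this example
paths = dataset_paths("/tmp/dataset", "ZHSZUS33_Bank_1")
for name, path in paths.items():
    print(name, "->", path)
```

Each site therefore needs its own sub-folder under the root directory, named after the site and containing both files.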

We are now ready to run all the code.

## Run All the Jobs End-to-end
Here we run each job in sequence. For a real-world use case:

* the data preparation step is not needed, as you already have the data
* the feature enrichment / encoding scripts need to be defined based on your own technique
* for the XGBoost job, you will need to write your own data loader 

Note: All Sender BICs are considered clients. They are:
* 'ZHSZUS33_Bank_1'
* 'SHSHKHH1_Bank_2'
* 'YXRXGB22_Bank_3'
* 'WPUWDEFF_Bank_4'
* 'YMNYFRPP_Bank_5'
* 'FBSFCHZH_Bank_6'
* 'YSYCESMM_Bank_7'
* 'ZNZZAU3M_Bank_8'
* 'HCBHSGSG_Bank_9'
* 'XITXUS33_Bank_10' 

In total, there are 10 banks. The run recorded below uses five of them as sites.

### Prepare Data

In [1]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("samayashar/fraud-detection-transactions-dataset")
input_csv = f"{path}/synthetic_fraud_dataset.csv"


# if True, only generate the job config; if False, also run the simulated job (on the same machine)
config_only = False
# the work dir stores the job config and the simulated job results for each node
work_dir = "/tmp/czt/jobs/workdir"
# the processed dataset folder stores the prepared data for each node
processed_dataset_folder = "/tmp/czt/dataset"
# the output folder stores the results produced for each node
out_folder = "/tmp/czt/output"

!mkdir -p {out_folder}
!mkdir -p {processed_dataset_folder}



In [2]:
! python3 ./utils/prepare_data.py -i {input_csv} -o {processed_dataset_folder}

Historical DataFrame size: 27500
Training DataFrame size: 17500
Testing DataFrame size: 5000
Saved HCBHSGSG history transactions to /tmp/czt/dataset/HCBHSGSG_Bank_9/history.csv
Saved XITXUS33 history transactions to /tmp/czt/dataset/XITXUS33_Bank_10/history.csv
Saved YSYCESMM history transactions to /tmp/czt/dataset/YSYCESMM_Bank_7/history.csv
Saved YXRXGB22 history transactions to /tmp/czt/dataset/YXRXGB22_Bank_3/history.csv
Saved ZNZZAU3M history transactions to /tmp/czt/dataset/ZNZZAU3M_Bank_8/history.csv
Saved HCBHSGSG train transactions to /tmp/czt/dataset/HCBHSGSG_Bank_9/train.csv
Saved XITXUS33 train transactions to /tmp/czt/dataset/XITXUS33_Bank_10/train.csv
Saved YSYCESMM train transactions to /tmp/czt/dataset/YSYCESMM_Bank_7/train.csv
Saved YXRXGB22 train transactions to /tmp/czt/dataset/YXRXGB22_Bank_3/train.csv
Saved ZNZZAU3M train transactions to /tmp/czt/dataset/ZNZZAU3M_Bank_8/train.csv
Saved HCBHSGSG test transactions to /tmp/czt/dataset/HCBHSGSG_Bank_9/test.csv
Saved X

In [3]:
site_names = [
    "HCBHSGSG_Bank_9",
    "XITXUS33_Bank_10",
    "YSYCESMM_Bank_7",
    "YXRXGB22_Bank_3",
    "ZNZZAU3M_Bank_8",
]

!echo {' '.join(site_names)}

HCBHSGSG_Bank_9 XITXUS33_Bank_10 YSYCESMM_Bank_7 YXRXGB22_Bank_3 ZNZZAU3M_Bank_8
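
Since the data preparation step writes one folder per bank, the site list can also be derived from the dataset folder instead of being typed by hand. The helper below is a sketch under that assumption; `discover_sites` is a hypothetical name, and the demonstration runs against a throwaway directory rather than the real dataset folder.

```python
import os
import tempfile


def discover_sites(dataset_root):
    """Return the sorted names of the immediate sub-folders of dataset_root."""
    return sorted(
        entry.name for entry in os.scandir(dataset_root) if entry.is_dir()
    )


# Demonstration on a throwaway folder that mimics the per-bank layout
with tempfile.TemporaryDirectory() as root:
    for name in ("YSYCESMM_Bank_7", "HCBHSGSG_Bank_9"):
        os.makedirs(os.path.join(root, name))
    # A stray file should not be reported as a site
    open(os.path.join(root, "notes.txt"), "w").close()
    found = discover_sites(root)

print(found)
```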


In [4]:
from nvflare import FedJob
from nvflare.app_common.workflows.etl_controller import ETLController
from nvflare.job_config.script_runner import ScriptRunner

### Enrich data

In [None]:
job = FedJob(name="enrich_job")

enrich_ctrl = ETLController(task_name="enrich")
job.to(enrich_ctrl, "server", id="enrich")

# Add clients
for site_name in site_names:
    executor = ScriptRunner(
        # for this, we output the enriched data to the same folder
        script="src/enrich.py",
        script_args=f"-i {processed_dataset_folder} -o {processed_dataset_folder}",
    )
    job.to(executor, site_name, tasks=["enrich"])

if work_dir:
    print(f"{work_dir=}")
    job.export_job(work_dir)

if not config_only:
    job.simulator_run(work_dir)

work_dir='/tmp/czt/jobs/workdir'
2025-08-05 13:47:38,520 - ETLController - INFO - Initializing BaseModelController workflow.
2025-08-05 13:47:38,521 - ETLController - INFO - Beginning model controller run.
2025-08-05 13:47:38,522 - ETLController - INFO - enrich task started.
2025-08-05 13:47:38,522 - ETLController - INFO - Sampled clients: ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:47:38,522 - ETLController - INFO - Sending task enrich to ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:47:43,625 - TaskScriptRunner - INFO - start task run() with full path: /tmp/czt/jobs/workdir/XITXUS33_Bank_10/simulate_job/app_XITXUS33_Bank_10/custom/src/enrich.py
2025-08-05 13:47:43,633 - TaskScriptRunner - INFO - start task run() with full path: /tmp/czt/jobs/workdir/YSYCESMM_Bank_7/simulate_job/app_YSYCESMM_Bank_

### Pre-Process Data

In [6]:
job = FedJob(name="pre_processing_job")

pre_process_ctrl = ETLController(task_name="pre_process")
job.to(pre_process_ctrl, "server", id="pre_process")

# Add clients
for site_name in site_names:
    executor = ScriptRunner(
        script="src/pre_process.py",
        script_args=f"-i {processed_dataset_folder} -o {out_folder}",
    )
    job.to(executor, site_name, tasks=["pre_process"])

if work_dir:
    print(f"{work_dir=}")
    job.export_job(work_dir)

if not config_only:
    job.simulator_run(work_dir)

work_dir='/tmp/czt/jobs/workdir'
2025-08-05 13:47:51,081 - ETLController - INFO - Initializing BaseModelController workflow.
2025-08-05 13:47:51,082 - ETLController - INFO - Beginning model controller run.
2025-08-05 13:47:51,082 - ETLController - INFO - pre_process task started.
2025-08-05 13:47:51,083 - ETLController - INFO - Sampled clients: ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:47:51,083 - ETLController - INFO - Sending task pre_process to ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:47:56,190 - TaskScriptRunner - INFO - start task run() with full path: /tmp/czt/jobs/workdir/XITXUS33_Bank_10/simulate_job/app_XITXUS33_Bank_10/custom/src/pre_process.py
2025-08-05 13:47:56,198 - TaskScriptRunner - INFO - start task run() with full path: /tmp/czt/jobs/workdir/HCBHSGSG_Bank_9/simulate_job/app

Exception in thread Thread-2 (run):
Traceback (most recent call last):
  File "/home/soraxas/.local/share/uv/python/cpython-3.12.8-linux-x86_64-gnu/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
    self.run()
  File "/home/soraxas/.local/share/uv/python/cpython-3.12.8-linux-x86_64-gnu/lib/python3.12/threading.py", line 1012, in run
    self._target(*self._args, **self._kwargs)
  File "/home/soraxas/work/czt/.venv/lib/python3.12/site-packages/nvflare/app_common/executors/task_script_runner.py", line 71, in run
    raise e
  File "/home/soraxas/work/czt/.venv/lib/python3.12/site-packages/nvflare/app_common/executors/task_script_runner.py", line 54, in run
    runpy.run_path(self.script_full_path, run_name="__main__")
  File "<frozen runpy>", line 287, in run_path
  File "<frozen runpy>", line 98, in _run_module_code
  File "<frozen runpy>", line 88, in _run_code
  File "/tmp/czt/jobs/workdir/HCBHSGSG_Bank_9/simulate_job/app_HCBHSGSG_Bank_9/custom/src/pre_process.py", line 

2025-08-05 13:47:57,221 - InProcessClientAPI - ERROR - ask to abort job: reason: pre_process' is aborted, abort_signal_triggered
2025-08-05 13:47:57,258 - InProcessClientAPI - ERROR - ask to abort job: reason: pre_process' is aborted, abort_signal_triggered
2025-08-05 13:47:57,260 - WFCommServer - ERROR - processing error in result_received_cb on task pre_process(5aee7f3a-1bc0-42d4-b753-ab1a33f2ee1b): ValueError: the shareable is not a valid DXO - expect content_type DXO but got None
2025-08-05 13:47:57,267 - WFCommServer - ERROR - Traceback (most recent call last):
  File "/home/soraxas/work/czt/.venv/lib/python3.12/site-packages/nvflare/apis/impl/wf_comm_server.py", line 421, in _do_process_submission
    task.result_received_cb(client_task=client_task, fl_ctx=fl_ctx)
  File "/home/soraxas/work/czt/.venv/lib/python3.12/site-packages/nvflare/app_common/workflows/base_model_controller.py", line 221, in _process_result
    result_model = FLModelUtils.from

### Construct Graph

In [7]:
job = FedJob(name="graph_construct_job")

graph_construct_ctrl = ETLController(task_name="graph_construct")
job.to(graph_construct_ctrl, "server", id="graph_construct")

# Add clients
for site_name in site_names:
    executor = ScriptRunner(
        script="src/graph_construct.py",
        script_args=f"-i {processed_dataset_folder} -o {out_folder}",
    )
    job.to(executor, site_name, tasks=["graph_construct"])

if work_dir:
    print(f"{work_dir=}")
    job.export_job(work_dir)

if not config_only:
    job.simulator_run(work_dir)

work_dir='/tmp/czt/jobs/workdir'
2025-08-05 13:48:07,230 - ETLController - INFO - Initializing BaseModelController workflow.
2025-08-05 13:48:07,231 - ETLController - INFO - Beginning model controller run.
2025-08-05 13:48:07,232 - ETLController - INFO - graph_construct task started.
2025-08-05 13:48:07,232 - ETLController - INFO - Sampled clients: ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:48:07,232 - ETLController - INFO - Sending task graph_construct to ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:48:12,272 - TaskScriptRunner - INFO - start task run() with full path: /tmp/czt/jobs/workdir/YSYCESMM_Bank_7/simulate_job/app_YSYCESMM_Bank_7/custom/src/graph_construct.py
2025-08-05 13:48:12,282 - PTInProcessClientAPIExecutor - INFO - execute for task (graph_construct)
2025-08-05 13:48:12,2

### GNN Training and Encoding

In [8]:
from torch_geometric.nn import GraphSAGE

from nvflare import FedJob
from nvflare.app_common.workflows.fedavg import FedAvg
from nvflare.app_opt.pt.job_config.model import PTModel
from nvflare.job_config.script_runner import ScriptRunner

job = FedJob(name="gnn_train_encode_job")

# Define the controller workflow and send to server
controller = FedAvg(
    num_clients=len(site_names),
    num_rounds=2,
)
job.to(controller, "server")

# Define the model
model = GraphSAGE(
    in_channels=10,
    hidden_channels=64,
    num_layers=2,
    out_channels=64,
)
job.to(PTModel(model), "server")

# Add clients
for site_name in site_names:
    executor = ScriptRunner(
        script="src/gnn_train_encode.py",
        script_args=f"-i {processed_dataset_folder} -o {out_folder}",
    )
    job.to(executor, site_name)

if work_dir:
    print(f"{work_dir=}")
    job.export_job(work_dir)

if not config_only:
    job.simulator_run(work_dir)

work_dir='/tmp/czt/jobs/workdir'
2025-08-05 13:48:30,998 - FedAvg - INFO - Initializing BaseModelController workflow.
2025-08-05 13:48:30,999 - FedAvg - INFO - Beginning model controller run.
2025-08-05 13:48:30,999 - FedAvg - INFO - Start FedAvg.
2025-08-05 13:48:30,999 - FedAvg - INFO - loading initial model from persistor
2025-08-05 13:48:30,999 - PTFileModelPersistor - INFO - Both source_ckpt_file_full_name and ckpt_preload_path are not provided. Using the default model weights initialized on the persistor side.
2025-08-05 13:48:31,000 - FedAvg - INFO - Round 0 started.
2025-08-05 13:48:31,000 - FedAvg - INFO - Sampled clients: ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:48:31,000 - FedAvg - INFO - Sending task train to ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:48:35,808 -

Exception in thread Thread-2 (run):
Traceback (most recent call last):
  File "/home/soraxas/.local/share/uv/python/cpython-3.12.8-linux-x86_64-gnu/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
    self.run()
  File "/home/soraxas/.local/share/uv/python/cpython-3.12.8-linux-x86_64-gnu/lib/python3.12/threading.py", line 1012, in run
    self._target(*self._args, **self._kwargs)
  File "/home/soraxas/work/czt/.venv/lib/python3.12/site-packages/nvflare/app_common/executors/task_script_runner.py", line 71, in run
    raise e
  File "/home/soraxas/work/czt/.venv/lib/python3.12/site-packages/nvflare/app_common/executors/task_script_runner.py", line 54, in run
    runpy.run_path(self.script_full_path, run_name="__main__")
  File "<frozen runpy>", line 287, in run_path
  File "<frozen runpy>", line 98, in _run_module_code
  File "<frozen runpy>", line 88, in _run_code
  File "/tmp/czt/jobs/workdir/XITXUS33_Bank_10/simulate_job/app_XITXUS33_Bank_10/custom/src/gnn_train_encode.py"

2025-08-05 13:48:37,840 - InProcessClientAPI - ERROR - ask to abort job: reason: train' is aborted, abort_signal_triggered
2025-08-05 13:48:37,847 - InProcessClientAPI - ERROR - ask to abort job: reason: train' is aborted, abort_signal_triggered
2025-08-05 13:48:37,850 - WFCommServer - ERROR - processing error in result_received_cb on task train(5ae1fbc8-950d-4f7a-a205-1f84a4d440a0): ValueError: the shareable is not a valid DXO - expect content_type DXO but got None
2025-08-05 13:48:37,853 - WFCommServer - ERROR - Traceback (most recent call last):
  File "/home/soraxas/work/czt/.venv/lib/python3.12/site-packages/nvflare/apis/impl/wf_comm_server.py", line 421, in _do_process_submission
    task.result_received_cb(client_task=client_task, fl_ctx=fl_ctx)
  File "/home/soraxas/work/czt/.venv/lib/python3.12/site-packages/nvflare/app_common/workflows/base_model_controller.py", line 221, in _process_result
    result_model = FLModelUtils.from_shareable(result)

### GNN Encoding Merge

In [9]:
! python3 ./utils/merge_feat.py -i {out_folder}

Processing folder:  ZNZZAU3M_Bank_8
Traceback (most recent call last):
  File "/home/soraxas/work/czt/./utils/merge_feat.py", line 93, in <module>
    main()
  File "/home/soraxas/work/czt/./utils/merge_feat.py", line 66, in main
    original_feat = pd.read_csv(original_feat_file)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/soraxas/work/czt/.venv/lib/python3.12/site-packages/pandas/io/parsers/readers.py", line 1026, in read_csv
    return _read(filepath_or_buffer, kwds)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/soraxas/work/czt/.venv/lib/python3.12/site-packages/pandas/io/parsers/readers.py", line 620, in _read
    parser = TextFileReader(filepath_or_buffer, **kwds)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/soraxas/work/czt/.venv/lib/python3.12/site-packages/pandas/io/parsers/readers.py", line 1620, in __init__
    self._engine = self._make_engine(f, self.engine)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File

### Run XGBoost Job
#### Without GNN embeddings

In [10]:
from nvflare.app_opt.xgboost.histogram_based_v2.fed_controller import XGBFedController
from nvflare.app_opt.xgboost.histogram_based_v2.fed_executor import (
    FedXGBHistogramExecutor,
)

from xgb_data_loader import CreditCardDataLoader


num_rounds = 10
early_stopping_rounds = 10
xgb_params = {
    "max_depth": 8,
    "eta": 0.1,
    "objective": "binary:logistic",
    "eval_metric": "auc",
    "tree_method": "hist",
    "nthread": 16,
}

job = FedJob(name="xgb_job")

# Define the controller workflow and send to server
controller = XGBFedController(
    num_rounds=num_rounds,
    data_split_mode=0,
    secure_training=False,
    xgb_params=xgb_params,
    xgb_options={"early_stopping_rounds": early_stopping_rounds},
)
job.to(controller, "server")

# Add clients
for site_name in site_names:
    executor = FedXGBHistogramExecutor(data_loader_id="data_loader")
    job.to(executor, site_name)
    data_loader = CreditCardDataLoader(
        root_dir=processed_dataset_folder, file_postfix="_normalized.csv"
    )
    job.to(data_loader, site_name, id="data_loader")

if work_dir:
    print("work_dir=", work_dir)
    job.export_job(work_dir)

if not config_only:
    job.simulator_run(work_dir)

work_dir= /tmp/czt/jobs/workdir
2025-08-05 13:48:47,297 - XGBFedController - INFO - Waiting for clients to be ready: ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:48:47,298 - XGBFedController - INFO - Configuring clients ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:48:47,298 - XGBFedController - INFO - sending task config to clients ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:48:52,203 - FedXGBHistogramExecutor - INFO - got my rank: 2
2025-08-05 13:48:52,215 - FedXGBHistogramExecutor - INFO - got my rank: 3
2025-08-05 13:48:52,217 - XGBFedController - INFO - successfully configured client YSYCESMM_Bank_7
2025-08-05 13:48:52,225 - XGBFedController - INFO - successfully configured client YXRXGB22_Bank_3
2025-08-05 13:48:52,246

[13:48:52] Insecure federated server listening on 0.0.0.0:10342, world size 5


2025-08-05 13:48:53,927 - XGBFedController - INFO - Starting clients ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:48:53,928 - XGBFedController - INFO - sending task start to clients ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:48:54,239 - GrpcClientAdaptor - INFO - Start internal server at 127.0.0.1:19922
2025-08-05 13:48:54,256 - GrpcClientAdaptor - INFO - Start internal server at 127.0.0.1:25277
2025-08-05 13:48:54,267 - GrpcServer - INFO - XGBServer: added insecure port at 127.0.0.1:19922
2025-08-05 13:48:54,268 - GrpcServer - INFO - starting gRPC Server
2025-08-05 13:48:54,271 - GrpcClientAdaptor - INFO - Started internal server at 127.0.0.1:19922
2025-08-05 13:48:54,272 - GrpcClientAdaptor - INFO - starting XGBoost Server in another thread
2025-08-05 13:48:54,272 - XGBClientR

#### With GNN embeddings

In [11]:
from xgb_embed_data_loader import CreditCardEmbedDataLoader

from nvflare import FedJob
from nvflare.app_opt.xgboost.histogram_based_v2.fed_controller import XGBFedController
from nvflare.app_opt.xgboost.histogram_based_v2.fed_executor import (
    FedXGBHistogramExecutor,
)

num_rounds = 10
early_stopping_rounds = 10
xgb_params = {
    "max_depth": 8,
    "eta": 0.1,
    "objective": "binary:logistic",
    "eval_metric": "auc",
    "tree_method": "hist",
    "nthread": 16,
}

job = FedJob(name="xgb_job_embed")

# Define the controller workflow and send to server
controller = XGBFedController(
    num_rounds=num_rounds,
    data_split_mode=0,
    secure_training=False,
    xgb_params=xgb_params,
    xgb_options={"early_stopping_rounds": early_stopping_rounds},
)
job.to(controller, "server")

# Add clients
for site_name in site_names:
    executor = FedXGBHistogramExecutor(data_loader_id="data_loader")
    job.to(executor, site_name)
    data_loader = CreditCardEmbedDataLoader(
        root_dir=processed_dataset_folder, file_postfix="_embedding.csv"
    )
    job.to(data_loader, site_name, id="data_loader")

if work_dir:
    print("work_dir=", work_dir)
    job.export_job(work_dir)

if not config_only:
    job.simulator_run(work_dir)

work_dir= /tmp/czt/jobs/workdir
2025-08-05 13:49:01,909 - XGBFedController - INFO - Waiting for clients to be ready: ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:49:01,909 - XGBFedController - INFO - Configuring clients ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:49:01,909 - XGBFedController - INFO - sending task config to clients ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:49:06,757 - FedXGBHistogramExecutor - INFO - got my rank: 0
2025-08-05 13:49:06,765 - FedXGBHistogramExecutor - INFO - got my rank: 4
2025-08-05 13:49:06,782 - FedXGBHistogramExecutor - INFO - got my rank: 2
2025-08-05 13:49:06,782 - XGBFedController - INFO - successfully configured client HCBHSGSG_Bank_9
2025-08-05 13:49:06,792 - XGBFedController - INF

[13:49:07] Insecure federated server listening on 0.0.0.0:19913, world size 5


2025-08-05 13:49:08,072 - XGBFedController - INFO - Starting clients ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:49:08,073 - XGBFedController - INFO - sending task start to clients ['HCBHSGSG_Bank_9', 'XITXUS33_Bank_10', 'YSYCESMM_Bank_7', 'YXRXGB22_Bank_3', 'ZNZZAU3M_Bank_8']
2025-08-05 13:49:08,812 - GrpcClientAdaptor - INFO - Start internal server at 127.0.0.1:45038
2025-08-05 13:49:08,826 - GrpcClientAdaptor - INFO - Start internal server at 127.0.0.1:25509
2025-08-05 13:49:08,828 - GrpcServer - INFO - XGBServer: added insecure port at 127.0.0.1:45038
2025-08-05 13:49:08,829 - GrpcServer - INFO - starting gRPC Server
2025-08-05 13:49:08,831 - GrpcClientAdaptor - INFO - Started internal server at 127.0.0.1:45038
2025-08-05 13:49:08,832 - GrpcClientAdaptor - INFO - starting XGBoost Server in another thread
2025-08-05 13:49:08,833 - XGBClientR

## Prepare Job for POC and Production

With the job running well in the simulator, we are ready to run it in POC mode, which simulates a deployment on localhost, or to deploy it to production. 

All we need is the job definition. The job.export_job() method generates the job configuration and exports it to a given directory. For example, xgb_job.py contains the following:

```
    if work_dir:
        print("work_dir=", work_dir)
        job.export_job(work_dir)

    if not args.config_only:
        job.simulator_run(work_dir)
```
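
The `args.config_only` flag in the snippet above comes from the script's command-line arguments. A minimal sketch of how such arguments could be declared with `argparse` follows; the short option names, the default work directory, and the helper name `parse_args` are assumptions and may differ from what xgb_job.py actually defines.

```python
import argparse


def parse_args(argv=None):
    """Parse the (assumed) command-line options of a job script."""
    parser = argparse.ArgumentParser(description="Export and optionally simulate a job")
    parser.add_argument(
        "-w", "--work_dir", default="/tmp/nvflare/jobs/workdir",
        help="where the job config (and simulator results) are written",
    )
    parser.add_argument(
        "-c", "--config_only", action="store_true",
        help="only export the job config and skip the simulator run",
    )
    return parser.parse_args(argv)


# Passing an explicit list keeps the example independent of the real command line
args = parse_args(["-w", "/tmp/czt/jobs/workdir", "--config_only"])
print(args.work_dir, args.config_only)
```

With `action="store_true"`, the flag defaults to `False`, so the simulator runs unless the flag is passed explicitly.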

Let's try this out and then look at the directory. The next cell lists the exported files with ```find```; you can also use the ```tree``` command, if you have it, or simply ```ls -al```.

In [12]:
!find {work_dir} -type f -path "*/simulate_job/*"

/tmp/czt/jobs/workdir/ZNZZAU3M_Bank_8/simulate_job/meta.json
/tmp/czt/jobs/workdir/ZNZZAU3M_Bank_8/simulate_job/app_ZNZZAU3M_Bank_8/custom/xgb_embed_data_loader.py
/tmp/czt/jobs/workdir/ZNZZAU3M_Bank_8/simulate_job/app_ZNZZAU3M_Bank_8/config/config_fed_client.json
/tmp/czt/jobs/workdir/YXRXGB22_Bank_3/simulate_job/meta.json
/tmp/czt/jobs/workdir/YXRXGB22_Bank_3/simulate_job/app_YXRXGB22_Bank_3/custom/xgb_embed_data_loader.py
/tmp/czt/jobs/workdir/YXRXGB22_Bank_3/simulate_job/app_YXRXGB22_Bank_3/config/config_fed_client.json
/tmp/czt/jobs/workdir/YSYCESMM_Bank_7/simulate_job/meta.json
/tmp/czt/jobs/workdir/YSYCESMM_Bank_7/simulate_job/app_YSYCESMM_Bank_7/custom/xgb_embed_data_loader.py
/tmp/czt/jobs/workdir/YSYCESMM_Bank_7/simulate_job/app_YSYCESMM_Bank_7/config/config_fed_client.json
/tmp/czt/jobs/workdir/XITXUS33_Bank_10/simulate_job/meta.json
/tmp/czt/jobs/workdir/XITXUS33_Bank_10/simulate_job/app_XITXUS33_Bank_10/custom/xgb_embed_data_loader.py
/tmp/czt/jobs/workdir/XITXUS33_Bank_10

In [13]:
!cat /tmp/czt/workspace/xgb/credit_card/config/xgb_job/meta.json

cat: /tmp/czt/workspace/xgb/credit_card/config/xgb_job/meta.json: No such file or directory
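
The `cat` command above fails because that path does not exist in this run; the `find` listing shows the `meta.json` files under each site's `simulate_job` folder inside the work directory. The sketch below locates and parses them with `pathlib` instead of hard-coding a path. The helper name `find_job_meta` is hypothetical, and the demonstration uses a throwaway folder with a made-up `meta.json` rather than the real exported job.

```python
import json
import tempfile
from pathlib import Path


def find_job_meta(work_dir):
    """Map each meta.json under work_dir (relative POSIX path) to its parsed content."""
    root = Path(work_dir)
    return {
        p.relative_to(root).as_posix(): json.loads(p.read_text())
        for p in sorted(root.rglob("meta.json"))
    }


# Demonstration on a throwaway folder with a made-up meta.json
with tempfile.TemporaryDirectory() as tmp:
    job_dir = Path(tmp) / "site-1" / "simulate_job"
    job_dir.mkdir(parents=True)
    (job_dir / "meta.json").write_text(json.dumps({"name": "xgb_job"}))
    metas = find_job_meta(tmp)

print(metas)
```

Calling `find_job_meta(work_dir)` on the real work directory would return one entry per exported site.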


Now that we have the job definition, you can run it either in POC mode or in a production setup. 

* set up POC
```
    nvflare poc prepare -c <list of clients>
    nvflare poc start -ex admin@nvidia.com
```

* submit the job using the NVFLARE console

   from a different terminal, start the console

   ```
   nvflare poc start -p admin@nvidia.com
   ```
   then use the submit job command

* use the nvflare job submit command to submit the job

* use the NVFLARE API to submit the job

The process is exactly the same for production. Please see the documentation and tutorial examples at https://nvidia.github.io/NVFlare/


    
    
    
    




