In [1]:
import pandas as pd
import numpy as np 
from preprocess import Preprocessor
import os

In [2]:
# Load the Titanic passenger table from the local data directory.
df = pd.read_csv('data/titanic.csv')

In [3]:
# Preview the first rows to check which columns were loaded.
df.head()

Unnamed: 0,PassengerId,Survived,Pclass,Age,Sex,SibSp,Parch,Fare,Cabin,Embarked
0,1,0,3,22.0,male,1,0,7.25,N,S
1,2,1,1,38.0,female,1,0,71.2833,C,C
2,3,1,3,26.0,female,0,0,7.925,N,S
3,4,1,1,35.0,female,1,0,53.1,C,S
4,5,0,3,35.0,male,0,0,8.05,N,S


In [4]:
# Switch to a generic column layout: the first column becomes `id`, the second
# the label `y`, and every remaining column a feature x0, x1, ... in order.
n_cols = df.shape[1]
feature_names = [f"x{i}" for i in range(n_cols - 2)]
df.columns = ["id", "y", *feature_names]
df.columns


Index(['id', 'y', 'x0', 'x1', 'x2', 'x3', 'x4', 'x5', 'x6', 'x7'], dtype='object')

In [5]:
# Run the project's Preprocessor over the renamed table. Going by the displayed
# result, the string-valued columns (x2, x6, x7) come back as integer codes.
preprocessor = Preprocessor()
processed_df = preprocessor.fit_transform(df)
processed_df

Unnamed: 0,id,y,x0,x1,x2,x3,x4,x5,x6,x7
0,1,0,3,22.0,1,1,0,7.2500,7,2
1,2,1,1,38.0,0,1,0,71.2833,2,0
2,3,1,3,26.0,0,0,0,7.9250,7,2
3,4,1,1,35.0,0,1,0,53.1000,2,2
4,5,0,3,35.0,1,0,0,8.0500,7,2
...,...,...,...,...,...,...,...,...,...,...
886,887,0,2,27.0,1,0,0,13.0000,7,2
887,888,1,1,19.0,0,0,0,30.0000,1,2
888,889,0,3,24.0,0,1,2,23.4500,7,2
889,890,1,1,26.0,1,0,0,30.0000,2,0


In [9]:
# Persist the preprocessed table. index=False keeps the row index out of the
# CSV (matching the train/valid files written below), so reloading the file
# does not produce a stray "Unnamed: 0" column.
processed_df.to_csv("data/processed_titanic.csv", index=False)

In [10]:
# Sequential hold-out split: the leading rows become the validation set and
# the remaining rows the training set. No shuffling is applied.
#
# 0.2 is the *validation* fraction (the original name `train_ratio` was
# misleading); the old name is kept as an alias so any cell that still reads
# it sees the same value.
valid_ratio = 0.2
train_ratio = valid_ratio

num_data = processed_df.shape[0]
# Compute the boundary once; ceil means a fractional row goes to validation.
split_idx = int(np.ceil(num_data * valid_ratio))

# .iloc makes the positional slicing explicit.
valid_df = processed_df.iloc[:split_idx]
train_df = processed_df.iloc[split_idx:]

In [11]:
# Write both splits to disk without the row index.
for split_frame, csv_path in (
    (train_df, 'data/titanic_train.csv'),
    (valid_df, 'data/titanic_valid.csv'),
):
    split_frame.to_csv(csv_path, index=False)

### Data split

In [20]:
from tqdm import tqdm


def _split_guest_host(frame, pos):
    """Vertically split ``frame`` into a guest table and a host table.

    Args:
        frame: DataFrame whose first two columns are ``id`` and ``y``,
            followed by the feature columns.
        pos: number of feature columns the guest keeps.

    Returns:
        ``(guest_df, host_df)``. The guest gets the first ``pos + 2`` columns
        (``id``, ``y`` and the first ``pos`` features). The host gets ``id``
        plus every remaining column, renamed ``x0, x1, ...`` so that its
        features are numbered from zero.
    """
    guest_df = frame[frame.columns[:pos + 2]]
    host_df = frame[['id'] + list(frame.columns[pos + 2:])]
    host_df.columns = ['id'] + ['x' + str(i) for i in range(len(host_df.columns) - 1)]
    return guest_df, host_df


# Folder names read "<guest feature count>_<host feature count>"; only the
# first number is parsed, the host receives whatever columns are left.
folders = ["8_0", "6_2", "4_4", "2_6", "0_8"]

for folder in tqdm(folders):
    out_dir = os.path.join("data", folder)
    # exist_ok avoids the separate existence check and is safe on re-runs.
    os.makedirs(out_dir, exist_ok=True)

    pos = int(folder.split("_")[0])
    train_guest_df, train_host_df = _split_guest_host(train_df, pos)
    valid_guest_df, valid_host_df = _split_guest_host(valid_df, pos)

    train_guest_df.to_csv(os.path.join(out_dir, 'train_guest.csv'), index=False)
    train_host_df.to_csv(os.path.join(out_dir, 'train_host.csv'), index=False)
    valid_guest_df.to_csv(os.path.join(out_dir, 'valid_guest.csv'), index=False)
    valid_host_df.to_csv(os.path.join(out_dir, 'valid_host.csv'), index=False)
    
    

100%|██████████| 5/5 [00:00<00:00, 107.21it/s]


In [10]:
import os

In [11]:
# Show the pipeline CLI usage and its available commands.
!pipeline --help

Usage: pipeline [OPTIONS] COMMAND [ARGS]...

Options:
  --help  Show this message and exit.

Commands:
  config  pipeline config tool
  init     - DESCRIPTION: Pipeline Config Command.


Assuming a `FATE Flow Service` is running at 127.0.0.1:9380 (the default in standalone mode), run:

In [12]:
# Point the pipeline client at the FATE Flow service at 127.0.0.1:9380.
!pipeline init --ip 127.0.0.1 --port 9380

Pipeline configuration succeeded.


In [13]:
# Print the notebook's current working directory.
pwd

'/data/projects/fate/persistence/titanic-fl'

### Configure the pipeline client

In [21]:
import os

In [22]:
# Show the pipeline CLI usage and its available commands.
!pipeline --help

Usage: pipeline [OPTIONS] COMMAND [ARGS]...

Options:
  --help  Show this message and exit.

Commands:
  config  pipeline config tool
  init     - DESCRIPTION: Pipeline Config Command.


Assuming a `FATE Flow Service` is reachable under the hostname `fateflow` on port `9380`, run:

In [23]:
# Point the pipeline client at the FATE Flow service at host `fateflow`, port 9380.
!pipeline init --ip fateflow --port 9380

Pipeline configuration succeeded.


### upload data

 Before starting a modeling task, the data to be used must be uploaded.
 Typically, a party is a cluster that includes multiple nodes, so when we upload the data, it is distributed across those nodes.

In [24]:
from pipeline.backend.pipeline import PipeLine

  from .autonotebook import tqdm as notebook_tqdm


Make a `pipeline` instance:

    - initiator: 
        * role: guest
        * party: 9999
    - roles:
        * guest: 9999

Note that only the local party id is needed.
    

In [25]:
# Upload-only pipeline: the local party (guest 9999) is both the initiator
# and the sole role.
pipeline_upload = (
    PipeLine()
    .set_initiator(role='guest', party_id=9999)
    .set_roles(guest=9999)
)

Define partitions for data storage

In [26]:
# Partition count passed to every add_upload_data call below.
partition = 4

Define table name and namespace, which will be used in FATE job configuration

In [27]:
# Table descriptors (name + namespace) for the four uploaded datasets.
# The same dicts are used when adding upload data and when configuring the
# Reader components, so both sides refer to identical tables.
TABLE_NAMESPACE = "experiment"

dense_data_train_guest = {"name": "titanic_train_guest", "namespace": TABLE_NAMESPACE}
dense_data_train_host = {"name": "titanic_train_host", "namespace": TABLE_NAMESPACE}
dense_data_valid_guest = {"name": "titanic_valid_guest", "namespace": TABLE_NAMESPACE}
dense_data_valid_host = {"name": "titanic_valid_host", "namespace": TABLE_NAMESPACE}

Now, we add data to be uploaded

In [28]:
# Which vertical split to upload (sub-folder of the data directory).
folder = "8_0"

data_base = "/data/projects/fate/persistence/titanic-fl/data"

# (CSV file name, table descriptor) pairs, registered in this order.
upload_plan = (
    ("train_guest.csv", dense_data_train_guest),
    ("train_host.csv", dense_data_train_host),
    ("valid_guest.csv", dense_data_valid_guest),
    ("valid_host.csv", dense_data_valid_host),
)

for csv_name, table in upload_plan:
    pipeline_upload.add_upload_data(
        file=os.path.join(data_base, folder, csv_name),
        table_name=table["name"],
        namespace=table["namespace"],
        head=1,  # presumably marks the first CSV row as a header; confirm in FATE docs
        partition=partition,
    )

We can then upload data

In [29]:
# Submit every upload registered on pipeline_upload above.
# NOTE(review): drop=1 presumably replaces an existing table with the same
# name/namespace; confirm against the FATE pipeline documentation.
pipeline_upload.upload(drop=1)

 UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%

[32m2023-07-26 02:26:59.229[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m83[0m - [1mJob id is 202307260226590898270
[0m
[32m2023-07-26 02:26:59.236[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:00[0m





[32m2023-07-26 02:27:00.244[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:01[0m
[0mm2023-07-26 02:27:01.259[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m125[0m - [1m
[32m2023-07-26 02:27:01.260[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:02[0m
[32m2023-07-26 02:27:02.273[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:03[0m
[32m2023-07-26 02:27:03.288[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:04[0m
[32m2023-0

 UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%

[32m2023-07-26 02:27:09.514[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m83[0m - [1mJob id is 202307260227093754190
[0m
[32m2023-07-26 02:27:09.520[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:00[0m





[32m2023-07-26 02:27:10.529[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:01[0m
[0mm2023-07-26 02:27:11.544[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m125[0m - [1m
[32m2023-07-26 02:27:11.545[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:02[0m
[32m2023-07-26 02:27:12.559[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:03[0m
[32m2023-07-26 02:27:13.573[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:04[0m
[32m2023-0

 UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%

[32m2023-07-26 02:27:17.764[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m83[0m - [1mJob id is 202307260227176291970
[0m
[32m2023-07-26 02:27:17.771[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:00[0m





[32m2023-07-26 02:27:18.780[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:01[0m
[0mm2023-07-26 02:27:19.794[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m125[0m - [1m
[32m2023-07-26 02:27:19.795[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:02[0m
[32m2023-07-26 02:27:20.809[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:03[0m
[32m2023-07-26 02:27:21.823[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:04[0m
[32m2023-0

 UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%

[32m2023-07-26 02:27:26.018[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m83[0m - [1mJob id is 202307260227258791120
[0m
[32m2023-07-26 02:27:26.024[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:00[0m





[32m2023-07-26 02:27:27.033[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:01[0m
[0mm2023-07-26 02:27:28.048[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m125[0m - [1m
[32m2023-07-26 02:27:28.049[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:02[0m
[32m2023-07-26 02:27:29.063[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:03[0m
[32m2023-07-26 02:27:30.077[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:04[0m
[32m2023-0

In [32]:
import argparse
from sklearn.metrics import accuracy_score

from pipeline.backend.pipeline import PipeLine
from pipeline.component import DataTransform
from pipeline.component import HeteroSecureBoost
from pipeline.component import Intersection
from pipeline.component import Reader
from pipeline.interface import Data
from pipeline.component import Evaluation
from pipeline.interface import Model

from pipeline.utils.tools import load_job_config

In [33]:
# Party ids of the two participants in the training job.
guest = 9999
host = 10000

# Training pipeline: initiated by the guest, with one guest and one host role.
pipeline = (
    PipeLine()
    .set_initiator(role="guest", party_id=guest)
    .set_roles(guest=guest, host=host)
)

In [34]:
# One Reader per split: reader_0 is bound to the training tables and reader_1
# to the validation tables; each party is pointed at its own table.
reader_0 = Reader(name="reader_0")
reader_1 = Reader(name="reader_1")

for reader, guest_table, host_table in (
    (reader_0, dense_data_train_guest, dense_data_train_host),
    (reader_1, dense_data_valid_guest, dense_data_valid_host),
):
    reader.get_party_instance(role="guest", party_id=guest).component_param(table=guest_table)
    reader.get_party_instance(role="host", party_id=host).component_param(table=host_table)

In [35]:
# One DataTransform per split, configured identically: the guest side is told
# its data has a label and to emit dense output; the host side has no label.
data_transform_0 = DataTransform(name="data_transform_0")
data_transform_1 = DataTransform(name="data_transform_1")

for transform in (data_transform_0, data_transform_1):
    guest_side = transform.get_party_instance(role="guest", party_id=guest)
    guest_side.component_param(with_label=True, output_format="dense")
    host_side = transform.get_party_instance(role="host", party_id=host)
    host_side.component_param(with_label=False)

In [36]:
# Intersection components, one for the training data and one for validation.
intersect_0, intersect_1 = (
    Intersection(name="intersection_0"),
    Intersection(name="intersection_1"),
)

In [37]:
# Hyper-parameters for the SecureBoost component, gathered in one place.
boost_params = dict(
    num_trees=10,
    task_type="classification",
    objective_param={"objective": "cross_entropy"},
    encrypt_param={"method": "Paillier"},
    tree_param={"max_depth": 3},
    complete_secure=True,
    validation_freqs=1,
)
hetero_secure_boost_0 = HeteroSecureBoost(name="hetero_secure_boost_0", **boost_params)

# Evaluation component configured for a binary task.
evaluation_0 = Evaluation(name="evaluation_0", eval_type="binary")

In [38]:
# Assemble the job graph. Components are added in dependency order:
# readers -> data transforms -> intersections -> boosting -> evaluation.
pipeline.add_component(reader_0)
pipeline.add_component(reader_1)

train_raw = Data(data=reader_0.output.data)
pipeline.add_component(data_transform_0, data=train_raw)

# The validation transform is given the model output of the training transform.
valid_raw = Data(data=reader_1.output.data)
train_transform_model = Model(data_transform_0.output.model)
pipeline.add_component(data_transform_1, data=valid_raw, model=train_transform_model)

train_transformed = Data(data=data_transform_0.output.data)
pipeline.add_component(intersect_0, data=train_transformed)

valid_transformed = Data(data=data_transform_1.output.data)
pipeline.add_component(intersect_1, data=valid_transformed)

boost_inputs = Data(
    train_data=intersect_0.output.data,
    validate_data=intersect_1.output.data,
)
pipeline.add_component(hetero_secure_boost_0, data=boost_inputs)

boost_predictions = Data(data=hetero_secure_boost_0.output.data)
pipeline.add_component(evaluation_0, data=boost_predictions)

# NOTE(review): the saved output of this cell ends with "Job is failed".
# With folder = "8_0" the host tables contain only the id column, which may
# be related; inspect the job on FATE board to confirm the actual cause.
pipeline.compile()
pipeline.fit()

[32m2023-07-26 02:34:37.245[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m83[0m - [1mJob id is 202307260234326697150
[0m
[32m2023-07-26 02:34:37.252[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:00[0m
[32m2023-07-26 02:34:38.260[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:01[0m
[0mm2023-07-26 02:34:39.276[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m125[0m - [1m
[32m2023-07-26 02:34:39.277[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component reader_0, time elapse: 0:00:02[0m
[32m2023-07-26 02:34:40.293[0m | [1mINFO    

ValueError: Job is failed, please check out job 202307260234326697150 by fate board or fate_flow cli