# one_party_data_preprocessing

## 事前準備
此篇 tutorial 為單 party 可執行的範例

### 設定資料路徑 & 參數

In [25]:
import os

# Party id used for the guest role by the pipeline cells of this tutorial.
guest = 9999
# Base directory onto which the data path below is joined.
data_base = "/data/projects/fate/"

# Table name and namespace used when the CSV is uploaded.
# (Plain string literal: the former f-string had no placeholders.)
dense_data = {"name": "one_party_data_preprocessing", "namespace": "experiment"}
# Path of the CSV file that the tutorial writes and later uploads.
dense_data_dir = os.path.join(data_base, "persistence/data/one_party_data_preprocessing.csv")

### 製作 Mimic Data

In [2]:
import pandas as pd

# Toy dataset for the tutorial: x1 and x3 contain missing entries (None),
# x2 holds categorical string values.
data = dict(
    id=[0, 1, 2, 3, 4, 5, 6],
    target=["0", "1", "0", "0", "1", "1", "0"],
    x1=[0.1, 1.0, 0.2, None, 0.4, 0.5, 0.1],
    x2=["A", "B", "B", "A", "C", "A", "A"],
    x3=[0, 1, 0, None, 0, None, 1],
)
# Write without the pandas index so that "id" stays the first CSV column.
pd.DataFrame(data).to_csv(path_or_buf=dense_data_dir, index=False)

### 缺失值 & 資料

In [3]:
# Reload the CSV and show, in order: the fraction of missing entries per
# column, then the first five rows.
dense_df = pd.read_csv(filepath_or_buffer=dense_data_dir)
print(dense_df.isna().mean(), dense_df.head(n=5), sep="\n")

id        0.000000
target    0.000000
x1        0.142857
x2        0.000000
x3        0.285714
dtype: float64
   id  target   x1 x2   x3
0   0       0  0.1  A  0.0
1   1       1  1.0  B  1.0
2   2       0  0.2  B  0.0
3   3       0  NaN  A  NaN
4   4       1  0.4  C  0.0


### 上傳資料到 pod 裡

In [26]:
from pipeline.backend.pipeline import PipeLine

# Upload-only pipeline. `guest` (defined in the configuration cell together
# with dense_data / dense_data_dir) is used for both the initiator and the
# role list, so the party id is kept in a single place.
pipeline_upload = PipeLine().set_initiator(role='guest', party_id=guest).set_roles(guest=guest)
partition = 4


pipeline_upload.add_upload_data(file=dense_data_dir,
                                table_name=dense_data["name"],             # table name
                                namespace=dense_data["namespace"],         # namespace
                                head=1, partition=partition,               # data info
                                # Optional column metadata, left disabled in this tutorial:
                                #with_meta=True,
                                #meta={
                                #        "input_format": "dense",
                                #        "with_label": True,
                                #        "label_name": "target",
                                #        "label_type": "int",
                                #        "data_type":"float64",
                                #        "exclusive_data_type":{"x2":"str"}
                                #}
                               )
pipeline_upload.upload(drop=1)

 UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%

[32m2023-06-05 09:33:56.245[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m83[0m - [1mJob id is 202306050933561032700
[0m
[32m2023-06-05 09:33:56.252[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:00[0m





[0mm2023-06-05 09:33:57.267[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m125[0m - [1m
[32m2023-06-05 09:33:57.268[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:01[0m
[32m2023-06-05 09:33:58.282[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:02[0m
[32m2023-06-05 09:33:59.298[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:03[0m
[32m2023-06-05 09:34:00.327[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:04[0m
[32

### 建構 Pipeline

In [48]:
from pipeline.component import Reader

# Reader: reads the data that was uploaded to the pod.
reader_0 = Reader(name="reader_0")
# Guest-side parameter: the table (name + namespace) to read.
reader_0.get_party_instance(
    role='guest',
    party_id=9999,
).component_param(
    table=dict(name="one_party_data_preprocessing", namespace="experiment"),
)

## Data transform 

`DataTransform` 使用 `component_param(params)` 設定參數, 可一次完成以下三項功能

#### 設定目標變數
1. `with_label=True` ( 因為 label 有在這個 party 的資料表裡 )
2. `label_name='target'` ( default `label_name = 'y'`, 要設定成本資料表的目標變數欄位名`'target'`, 否則會報錯 )
3. `label_type='int'` ( 轉換 label 的資料型態, 配合任務做選擇, 這邊假設是 Binary Classification )

#### 填補缺失值

1. 自動填值 -> `missing_fill=True, missing_fill_method='min' or 'max' or 'mean'`
2. 指定填值 -> `missing_fill=True, missing_fill_method='designated', default_value= any or [list of feature len]`
3. 定義缺失值 -> `missing_impute=[ "", "none", "null", "na", "None", NaN ]` 

#### 替換離群值

1. 自動填值 -> `outlier_replace=True, outlier_replace_method='min' or 'max' or 'mean'`
2. 指定填值 -> `outlier_replace=True, outlier_replace_method='designated', outlier_replace_value= any or [list of feature len]`
3. 定義離群值 -> `outlier_impute=[ "", "none", "null", "na", "None", NaN ]` ( 清單內的值會被視為離群值 )

#### 其他重要參數

1. input 資料型態 `data_type="int" or "int64" or "float" or "float64" or "str" or "long"`
2. input 例外資料型態 `exclusive_data_type={"colname":"int" or "int64" or "float" or "float64" or "str" or "long"}`

In [49]:
from pipeline.component import DataTransform

data_transform_0 = DataTransform(name="data_transform_0")
# Guest-side DataTransform parameters, grouped by purpose.
data_transform_0.get_party_instance(role='guest', party_id=9999).component_param(
    # Label handling: the label column lives in this party's table.
    with_label=True,
    label_name='target',
    label_type="int",
    # Missing values: filled with designated values
    # (presumably one per feature, in the order x1, x2, x3 -- TODO confirm).
    missing_fill=True,
    missing_fill_method="designated",
    default_value=[0.0, "A", "1.0"],
    # Outliers: replaced with designated values, same layout as above.
    outlier_replace=True,
    outlier_replace_method="designated",
    outlier_replace_value=[0.0, "A", "1.0"],
    # Feature dtypes: float by default, x2 and x3 are kept as strings.
    data_type="float",
    exclusive_data_type={"x2": "str", "x3": "str"},
)

## One hot encode

`OneHotEncoder` 使用 `component_param(params)` 設定參數

In [53]:
from pipeline.component import OneHotEncoder

onehot_encoder_0 = OneHotEncoder(name="onehot_encoder_0")

# One-hot encode the two categorical features x2 and x3.
# transform_col_indexes are positions in the feature header that reaches this
# component (x1=0, x2=1, x3=2 after data_transform_0 removed id and label),
# not positions in the raw CSV. The previous [3, 4] pointed past that
# three-entry header, so only transform_col_names was actually selecting the
# columns.
# NOTE(review): index semantics taken from FATE's OneHotEncoder parameters;
# confirm against the FATE version in use.
onehot_encoder_0.get_party_instance(role='guest', party_id=9999).component_param(
    transform_col_indexes=[1, 2], transform_col_names=["x2", "x3"]
)

## 執行整串pipeline

執行 `pipeline.compile()` & `pipeline.fit()` 看看資料轉換的成效

In [54]:
from pipeline.backend.pipeline import PipeLine
from pipeline.interface import Data

# Guest-only pipeline: party 9999 is the initiator and the sole role.
pipeline = (
    PipeLine()
    .set_initiator(role='guest', party_id=9999)
    .set_roles(guest=9999)
)

# Chain: reader_0 -> data_transform_0 -> onehot_encoder_0.
pipeline.add_component(reader_0)
pipeline.add_component(
    data_transform_0,
    data=Data(data=reader_0.output.data),
)
pipeline.add_component(
    onehot_encoder_0,
    data=Data(data=data_transform_0.output.data),
)

# Compile the pipeline and submit it as a job.
pipeline.compile()
pipeline.fit()

[32m2023-06-05 09:50:17.739[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m83[0m - [1mJob id is 202306050950175839430
[0m
[32m2023-06-05 09:50:17.746[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:00[0m
[32m2023-06-05 09:50:18.756[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:01[0m
[0mm2023-06-05 09:50:19.770[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m125[0m - [1m
[32m2023-06-05 09:50:19.772[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component reader_0, time elapse: 0:00:02[0m
[32m2023-06-05 09:50:20.787[0m | [1mINFO    

[32m2023-06-05 09:50:54.648[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component data_transform_0, time elapse: 0:00:36[0m
[32m2023-06-05 09:50:55.662[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component data_transform_0, time elapse: 0:00:37[0m
[32m2023-06-05 09:50:56.678[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component data_transform_0, time elapse: 0:00:38[0m
[32m2023-06-05 09:50:57.694[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component data_transform_0, time elapse: 0:00:39[0m
[32m2023-06-05 09:50:58.710[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_sta

In [56]:
# Fetch the output data of the onehot_encoder_0 component for display.
pipeline.get_component('onehot_encoder_0').get_output_data()

Unnamed: 0,id,target,x1,x2_B,x2_C,x2_A,x3_0.0,x3_1.0
0,5,1,0.5,0.0,0.0,1.0,0.0,1.0
1,2,0,0.2,1.0,0.0,0.0,1.0,0.0
2,0,0,0.1,0.0,0.0,1.0,1.0,0.0
3,1,1,1.0,1.0,0.0,0.0,0.0,1.0
4,4,1,0.4,0.0,1.0,0.0,1.0,0.0
5,6,0,0.1,0.0,0.0,1.0,0.0,1.0
6,3,0,0.0,0.0,0.0,1.0,0.0,1.0


#### 查看模塊狀況

用 `pipeline.get_component('module_name').get_summary()` 來看結果是否符合想像

In [14]:
import json

data_base = "/data/projects/fate/"
metadata_saved_dir = os.path.join(data_base, "persistence/metadata/1_party_data_transform_tutorial.json")
# Summary of the data_transform_0 component, serialized as indented JSON text.
# (Variable renamed from the misspelled `metedata`.)
metadata = json.dumps(pipeline.get_component('data_transform_0').get_summary(), indent=4)
# To look at the transformed rows instead of the summary:
# pipeline.get_component('data_transform_0').get_output_data(limits=10)

# Explicit encoding so the written file does not depend on the platform default.
with open(metadata_saved_dir, "w", encoding="utf-8") as json_file:
    json_file.write(metadata)

print(f"Write in metadata_saved_dir : {metadata_saved_dir} \n {metadata}")

Write in metadata_saved_dir : /data/projects/fate/persistence/metadata/1_party_data_transform_tutorial.json 
 {
    "missing_fill_info": {
        "missing_impute_rate": {
            "x0": 0.0,
            "x1": 0.0,
            "x2": 0.0017574692442882249,
            "x3": 0.0017574692442882249,
            "x4": 0.0035149384885764497,
            "x5": 0.0017574692442882249,
            "x6": 0.0017574692442882249,
            "x7": 0.0,
            "x8": 0.0,
            "x9": 0.0035149384885764497
        },
        "missing_impute_value": {
            "x0": 0.0,
            "x1": 0.0,
            "x2": 0.0,
            "x3": 0.0,
            "x4": 0.0,
            "x5": 0.0,
            "x6": 0.0,
            "x7": 0.0,
            "x8": 0.0,
            "x9": 0.0
        },
        "missing_value": [
            "",
            "none",
            "null",
            "na",
            "None",
            NaN
        ]
    },
    "outlier_replace_rate": {
        "outlier_repla

#### 查看資料狀況

用 `pipeline.get_component('module_name').get_output_data(limits=n_data)` 來看結果是否符合想像

In [15]:
# Fetch the output data of data_transform_0, passing limits=10.
pipeline.get_component('data_transform_0').get_output_data(limits=10)

Unnamed: 0,id,target,x0,x1,x2,x3,x4,x5,x6,x7,x8,x9
0,262,0,0.853348,0.254488,0.912602,0.728509,-0.831505,0.206332,-0.20336,0.581561,0.268947,-0.504606
1,309,1,-0.318739,-1.347894,-0.396188,-0.365968,-1.348769,-1.24553,-1.218324,-1.207259,-1.284123,-1.005565
2,534,1,-0.962766,0.135613,-0.918334,-0.831639,0.45727,-0.02077,-0.287316,-0.243568,-0.989687,-0.064605
3,172,0,0.522016,-1.406518,0.528365,0.389232,0.90878,0.661808,1.491126,1.036837,0.509996,0.945071
4,129,0,1.317213,1.286918,1.234289,1.245335,-0.213419,0.838655,1.415804,0.892184,0.653979,0.039023
5,289,1,-0.809525,0.07536,-0.833146,-0.740579,-0.901643,-0.999916,-0.944625,-0.800557,0.592503,-0.776144
6,448,1,0.00638,0.441759,0.024984,-0.088042,-1.028767,0.067653,0.507153,-0.284223,-0.69525,-0.516244
7,181,0,2.155897,1.270634,2.062335,2.124291,0.733436,3.207003,1.94689,2.675218,1.936879,2.463465
8,371,1,-0.014328,-1.619844,-0.082245,-0.108082,-0.866574,-0.512506,-0.652408,-0.499832,-0.669366,-0.902492


## two_party_data_preprocessing

In [53]:
import os

# Party id used for the guest role by the pipeline cells of this section.
guest = 9999
# Base directory onto which the data path below is joined.
data_base = "/data/projects/fate/"

# Table name and namespace used when the CSV is uploaded.
# (Plain string literal: the former f-string had no placeholders.)
dense_data = {"name": "titanic_hetero_guest", "namespace": "experiment"}
# Path of the guest-side Titanic CSV file.
dense_data_dir = os.path.join(data_base, "persistence/data/titanic_hetero_guest.csv")

In [7]:
import pandas as pd

# Load the guest-side CSV and display its first five rows.
dense_df = pd.read_csv(filepath_or_buffer=dense_data_dir)
dense_df.head(n=5)

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0


In [8]:
from pipeline.backend.pipeline import PipeLine

# Upload-only pipeline. `guest` (defined in the configuration cell together
# with dense_data / dense_data_dir) is used for both the initiator and the
# role list, so the party id is kept in a single place.
pipeline_upload = PipeLine().set_initiator(role='guest', party_id=guest).set_roles(guest=guest)
partition = 4

pipeline_upload.add_upload_data(file=dense_data_dir,
                                table_name=dense_data["name"],             # table name
                                namespace=dense_data["namespace"],         # namespace
                                head=1, partition=partition)               # data info
pipeline_upload.upload(drop=1)

 UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%

[32m2023-06-01 08:37:29.317[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m83[0m - [1mJob id is 202306010837291722370
[0m
[32m2023-06-01 08:37:29.325[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:00[0m





[32m2023-06-01 08:37:30.336[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:01[0m
[0mm2023-06-01 08:37:31.354[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m125[0m - [1m
[32m2023-06-01 08:37:31.355[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:02[0m
[32m2023-06-01 08:37:32.372[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:03[0m
[32m2023-06-01 08:37:33.390[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component upload_0, time elapse: 0:00:04[0m
[32m2023-0

In [65]:
from pipeline.backend.pipeline import PipeLine
from pipeline.component import Reader, DataTransform, HeteroFeatureSelection, Intersection
from pipeline.interface import Data

In [66]:
# Guest-only pipeline: party 9999 is the initiator and the sole role.
pipeline = (
    PipeLine()
    .set_initiator(role='guest', party_id=9999)
    .set_roles(guest=9999)
)

In [67]:
reader_0 = Reader(name="reader_0")
# Guest-side parameter: the table (name + namespace) to read.
reader_0.get_party_instance(
    role='guest',
    party_id=9999,
).component_param(
    table=dict(name="titanic_hetero_guest", namespace="experiment"),
)


In [68]:
hetero_feature_selection_0 = HeteroFeatureSelection(name="hetero_feature_selection_0")
# Guest-side parameters: manual filtering that names the columns to filter out.
hetero_feature_selection_0.get_party_instance(
    role='guest',
    party_id=9999,
).component_param(
    filter_methods=["manually"],
    manually_param=dict(filter_out_names=["PassengerId", "Name"]),
)

In [69]:
# Wire the components (reader_0 feeds hetero_feature_selection_0 directly),
# then compile and submit the job.
# NOTE(review): the recorded run of this cell ended with
# "ValueError: Job is failed". Unlike the one-party pipeline earlier in this
# notebook, no DataTransform sits between the reader and the downstream
# component -- possibly the cause; confirm on FATE board before relying on
# this cell.
pipeline.add_component(reader_0)
pipeline.add_component(hetero_feature_selection_0, data=Data(data=reader_0.output.data))
pipeline.compile()
pipeline.fit()

[32m2023-06-01 09:07:12.129[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m83[0m - [1mJob id is 202306010907119173010
[0m
[32m2023-06-01 09:07:12.152[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:00[0m
[32m2023-06-01 09:07:13.163[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:01[0m
[32m2023-06-01 09:07:14.174[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:02[0m
[32m2023-06-01 09:07:15.185[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:03[

[32m2023-06-01 09:07:48.977[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component hetero_feature_selection_0, time elapse: 0:00:36[0m
[32m2023-06-01 09:07:49.998[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component hetero_feature_selection_0, time elapse: 0:00:37[0m
[32m2023-06-01 09:07:51.017[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component hetero_feature_selection_0, time elapse: 0:00:38[0m
[32m2023-06-01 09:07:52.034[0m | [1mINFO    [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component hetero_feature_selection_0, time elapse: 0:00:39[0m
[32m2023-06-01 09:07:53.061[0m | [1mINFO    [0m | [36mpipeline.utils.invoke

ValueError: Job is failed, please check out job 202306010907119173010 by fate board or fate_flow cli

In [None]:
# Fetch the output data of hetero_feature_selection_0, passing limits=10.
pipeline.get_component('hetero_feature_selection_0').get_output_data(limits=10)