In [None]:
import json
import os

import numpy as np
import pandas as pd
from sklearn.preprocessing import RobustScaler

from pipeline.backend.pipeline import PipeLine
from pipeline.component.dataio import DataIO
from pipeline.component.homo_lr import HomoLR
from pipeline.component.reader import Reader
from pipeline.interface.data import Data
from pipeline.interface.model import Model
from pipeline.component.evaluation import Evaluation
from pipeline.component.scale import FeatureScale
from pipeline.utils.tools import load_job_config
from pipeline.runtime.entity import JobParameters


# Simple Feature Engineering

In [None]:
# Paths follow the FATE example container layout; adjust DATA_DIR when running elsewhere.
DATA_DIR = '/Examples/Pipeline/notebooks'
RAW_GUEST_CSV = os.path.join(DATA_DIR, 'creditcard_guest.csv')
SCALED_GUEST_CSV = os.path.join(DATA_DIR, 'creditcard_guest_scaled.csv')

raw_df = pd.read_csv(RAW_GUEST_CSV)

# The generated columns must not collide with columns already in the input.
generated_cols = {'id', 'Scaled_Amount', 'Scaled_Time'}
clashing_cols = generated_cols & set(raw_df.columns)
assert not clashing_cols, f"input already has column(s) {sorted(clashing_cols)}"

# One RobustScaler per column, so each fitted scaler keeps its own statistics
# (a single shared instance would be refit by the second fit_transform call).
amount_scaler = RobustScaler()
time_scaler = RobustScaler()

# Layout written to disk: id (1-based row number), Scaled_Amount, Scaled_Time,
# then every remaining original column except the raw Time and Amount.
leading_cols = pd.DataFrame(
    {
        'id': np.arange(1, len(raw_df) + 1),
        'Scaled_Amount': amount_scaler.fit_transform(raw_df[['Amount']]).ravel(),
        'Scaled_Time': time_scaler.fit_transform(raw_df[['Time']]).ravel(),
    },
    index=raw_df.index,
)
df = pd.concat([leading_cols, raw_df.drop(columns=['Time', 'Amount'])], axis=1)

# DataIO later reads 'Class' as the label column.
assert 'Class' in df.columns, "label column 'Class' missing from guest data"

df.to_csv(SCALED_GUEST_CSV, index=False)

# Upload Data To Local FATE Cluster

In [None]:
# Guest party id on the local cluster.
guest = 9999

# Backend for the upload job (2 = spark + pulsar).
backend = 2
# Presumably cluster mode under the FATE 1.x convention (0 = standalone) — confirm for your version.
work_mode = 1

# Number of partitions the uploaded table is split into.
partition = 8

# Training and evaluation both reference the SAME uploaded table, so the
# validation metrics reported during training are computed on training data.
guest_train_data = {"name": "dataset_guest", "namespace": "experiment"}
guest_eval_data = {"name": "dataset_guest", "namespace": "experiment"}

# CSV produced by the feature-engineering cell.
GUEST_SCALED_CSV = '/Examples/Pipeline/notebooks/creditcard_guest_scaled.csv'

pipeline_upload = PipeLine().set_initiator(role="guest", party_id=guest).set_roles(guest=guest)
# head=1: the scaled CSV was written with a header row.
pipeline_upload.add_upload_data(file=GUEST_SCALED_CSV,
                                table_name=guest_train_data["name"],
                                namespace=guest_train_data["namespace"],
                                head=1, partition=partition)
# drop=1: presumably replaces an existing table with the same name/namespace — confirm.
pipeline_upload.upload(work_mode=work_mode, backend=backend, drop=1)

# Define the Components of Training Pipeline

In [None]:
# Party ids for the two-party homo setup; the arbiter shares the host's party id.
guest = 9999
host = 10000
arbiter = 10000

# Host-side tables. This notebook only uploads the guest table; the host table is
# presumably uploaded separately on the host party.
host_train_data = {"name": "dataset_host", "namespace": "experiment"}
host_eval_data = {"name": "dataset_host", "namespace": "experiment"}

# reader_0 supplies each party's training table.
reader_0 = Reader(name="reader_0")
reader_0.get_party_instance(role='guest', party_id=guest).component_param(table=guest_train_data)
reader_0.get_party_instance(role='host', party_id=host).component_param(table=host_train_data)

# reader_1 supplies each party's evaluation table.
reader_1 = Reader(name="reader_1")
reader_1.get_party_instance(role='guest', party_id=guest).component_param(table=guest_eval_data)
reader_1.get_party_instance(role='host', party_id=host).component_param(table=host_eval_data)

# DataIO components: dense output, 'Class' as the label column.
# Component numbering starts at 0; dataio_1 handles the evaluation data.
dataio_0 = DataIO(name="dataio_0", with_label=True, output_format="dense", label_name='Class')
dataio_1 = DataIO(name="dataio_1", with_label=True, output_format="dense", label_name='Class')

# HomoLR hyper-parameters. They are unpacked into HomoLR(...) when the training
# pipeline is composed; homo_lr_0 itself is instantiated in that cell.
param = {
    "penalty": "L2",
    "optimizer": "sgd",
    "tol": 1e-05,
    "alpha": 0.01,
    "max_iter": 5,
    "early_stop": "diff",
    "batch_size": 50000,
    "learning_rate": 0.15,
    "validation_freqs": 1,
    "init_param": {
        "init_method": "zeros"
    },
    # NOTE(review): no encryption method configured — confirm this is intended outside a demo.
    "encrypt_param": {
        "method": None
    },
    # need_cv is False, so the remaining cv settings presumably have no effect.
    "cv_param": {
        "n_splits": 4,
        "shuffle": True,
        "random_seed": 33,
        "need_cv": False
    }
}

# Compose and Submit the Pipeline of Training

In [None]:
# Training pipeline, initiated by the guest; the arbiter shares the host's party id.
pipeline = PipeLine()
pipeline.set_initiator(role="guest", party_id=guest)
pipeline.set_roles(guest=guest, host=host, arbiter=arbiter)

# Components are added in task-execution order.
pipeline.add_component(reader_0)
pipeline.add_component(reader_1)

# dataio_1 takes dataio_0's model as input, presumably so evaluation data is
# processed with the same settings as training data.
pipeline.add_component(dataio_0, data=Data(data=reader_0.output.data))
pipeline.add_component(dataio_1, data=Data(data=reader_1.output.data),
                       model=Model(dataio_0.output.model))

# A fresh HomoLR component is built each time this cell runs. It trains on
# dataio_0's output and validates on dataio_1's output.
homo_lr_0 = HomoLR(name='homo_lr_0', **param)
pipeline.add_component(homo_lr_0, data=Data(train_data=dataio_0.output.data,
                                            validate_data=dataio_1.output.data))

# Binary evaluation of homo_lr_0's output, switched off on the host side.
evaluation_0 = Evaluation(name="evaluation_0", eval_type="binary")
evaluation_0.get_party_instance(role='host', party_id=host).component_param(need_run=False)
pipeline.add_component(evaluation_0, data=Data(data=homo_lr_0.output.data))

# Compile once all components are added; this builds the job conf and dsl.
pipeline.compile()

# Fit with the same backend/work_mode used for the upload.
job_parameters = JobParameters(backend=backend, work_mode=work_mode)
pipeline.fit(job_parameters)

# Print the training summary reported by homo_lr_0.
print(json.dumps(pipeline.get_component("homo_lr_0").get_summary(), indent=4))

# Deploy the Trained Model and Run Prediction

In [None]:
# Mark the fitted DataIO and HomoLR components of the training pipeline for deployment.
deployed = [dataio_0, homo_lr_0]
pipeline.deploy_component(deployed)

# The prediction pipeline reads data through reader_1 and routes it into the
# deployed training pipeline in place of dataio_0's original input.
predict_pipeline = PipeLine()
predict_pipeline.add_component(reader_1)
predict_input = {pipeline.dataio_0.input.data: reader_1.output.data}
predict_pipeline.add_component(pipeline, data=Data(predict_input=predict_input))

# Submit the prediction job with the training job's parameters.
predict_pipeline.predict(job_parameters)

# Print the summary homo_lr_0 reports for the prediction run.
predict_summary = predict_pipeline.get_component("homo_lr_0").get_summary()
print(json.dumps(predict_summary, indent=4))
