In [1]:
from pipeline.component.nn import save_to_fate

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
%%save_to_fate model guest_bottom_image.py
from torch import nn
import torch as t
from torch.nn import functional as F

class ImgBottomNet(nn.Module):
    """Bottom model for image input: a small conv/pool stack followed by an MLP head.

    The first linear layer expects 1176 flattened features, which is what the
    conv/pool stack yields for a 3x224x224 input (6 channels x 14 x 14).
    The output is an 8-dimensional vector per sample.
    """

    def __init__(self):
        super().__init__()

        # Feature extractor: conv -> max-pool -> conv -> average-pool.
        extractor_layers = [
            nn.Conv2d(in_channels=3, out_channels=6, kernel_size=5),
            nn.MaxPool2d(kernel_size=3),
            nn.Conv2d(in_channels=6, out_channels=6, kernel_size=3),
            nn.AvgPool2d(kernel_size=5),
        ]
        self.seq = t.nn.Sequential(*extractor_layers)

        # Projection head: 1176 -> 32 -> 8.
        head_layers = [
            nn.Linear(1176, 32),
            nn.ReLU(),
            nn.Linear(32, 8),
        ]
        self.fc = t.nn.Sequential(*head_layers)

    def forward(self, x):
        """Return the 8-dim representation for a batch of images shaped (N, 3, H, W)."""
        feature_maps = self.seq(x)
        # Keep the batch dimension, collapse channel/height/width into one axis.
        flat_features = feature_maps.flatten(start_dim=1)
        return self.fc(flat_features)


In [3]:
%%save_to_fate model guest_top_image.py
from torch import nn
import torch as t
from torch.nn import functional as F

class ImgTopNet(nn.Module):
    """Top model: maps a 4-dim input to one sigmoid score per sample."""

    def __init__(self):
        super().__init__()

        # Single linear unit squashed into (0, 1).
        scoring_layers = (nn.Linear(4, 1), nn.Sigmoid())
        self.fc = t.nn.Sequential(*scoring_layers)

    def forward(self, x):
        """Return a 1-D tensor of scores, one per row of ``x``."""
        scores = self.fc(x)
        # (N, 1) -> (N,)
        return scores.flatten()

In [4]:
%%save_to_fate model host_bottom_lstm.py
from torch import nn
import torch as t
from torch.nn import functional as F

class LSTMBottom(nn.Module):
    """Bottom model for token-index input: embedding -> stacked LSTM -> linear + ReLU.

    Args:
        vocab_size: number of entries in the embedding table.
        embedding_dim: size of each token embedding (default 16).
        hidden_size: LSTM hidden state size (default 16).
        num_layers: number of stacked LSTM layers (default 2).
        out_dim: size of the vector produced per sample (default 8).

    With the default arguments the network is identical to the original
    hard-coded 16/16/2/8 configuration.
    """

    def __init__(self, vocab_size, embedding_dim=16, hidden_size=16, num_layers=2, out_dim=8):
        super(LSTMBottom, self).__init__()
        # Index 0 is the padding token; its embedding stays at zero.
        self.word_embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim, padding_idx=0)
        # The Sequential wrapper is kept on purpose so parameter names
        # (e.g. "lstm.0.weight_ih_l0") stay the same as before.
        self.lstm = t.nn.Sequential(
            nn.LSTM(input_size=embedding_dim, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        )
        self.act = nn.ReLU()
        self.linear = nn.Linear(hidden_size, out_dim)

    def forward(self, x):
        """Return a (batch, out_dim) tensor for a (batch, seq_len) tensor of token indices."""
        embeddings = self.word_embed(x)
        # nn.LSTM returns (outputs, (h_n, c_n)); only the per-step outputs are used.
        lstm_fw, _ = self.lstm(embeddings)

        # Sum the outputs over the sequence dimension, then project and rectify.
        return self.act(self.linear(lstm_fw.sum(dim=1)))

In [5]:
from federatedml.nn.dataset.image import ImageDataset
from federatedml.nn.dataset.nlp_tokenizer import TokenizerDataset

In [6]:
# Flicker image data: center-cropped to 224x224, samples returned with their labels.
img_ds = ImageDataset(
    center_crop=True,
    center_crop_shape=(224, 224),
    return_label=True,
)
img_ds.load('/data/flicker/flicker_toy_data/flicker/images/')

# Text data: tokenized captions, returned without labels.
txt_ds = TokenizerDataset(return_label=False)
txt_ds.load('/data/flicker/flicker_toy_data/text.csv')

In [7]:
# Sanity-check the image dataset.
print(len(img_ds))  # number of samples
# First sample; prints the whole image tensor (presumably followed by its label, since return_label=True).
print(img_ds[0])
print(img_ds.get_classes())
print(img_ds.get_sample_ids()[0: 10])  # first ten sample ids

215
(tensor([[[0.5059, 0.5176, 0.5137,  ..., 0.4941, 0.5020, 0.5059],
         [0.4980, 0.5020, 0.4980,  ..., 0.4824, 0.5020, 0.5059],
         [0.5059, 0.4863, 0.4902,  ..., 0.4980, 0.4980, 0.5137],
         ...,
         [0.7843, 0.7922, 0.7529,  ..., 0.1412, 0.2078, 0.2196],
         [0.9922, 0.9922, 0.9647,  ..., 0.1176, 0.0941, 0.1333],
         [0.9961, 0.9922, 1.0000,  ..., 0.1647, 0.1294, 0.1373]],

        [[0.5765, 0.5882, 0.5843,  ..., 0.5490, 0.5569, 0.5608],
         [0.5686, 0.5804, 0.5765,  ..., 0.5490, 0.5529, 0.5529],
         [0.5608, 0.5569, 0.5647,  ..., 0.5569, 0.5490, 0.5529],
         ...,
         [0.7961, 0.8039, 0.7490,  ..., 0.1373, 0.1882, 0.2000],
         [0.9961, 0.9961, 0.9608,  ..., 0.1137, 0.1137, 0.1529],
         [0.9922, 0.9922, 1.0000,  ..., 0.1608, 0.1059, 0.1216]],

        [[0.6235, 0.6353, 0.6314,  ..., 0.5922, 0.6000, 0.6118],
         [0.6078, 0.6235, 0.6196,  ..., 0.5804, 0.5882, 0.6000],
         [0.6039, 0.6118, 0.6196,  ..., 0.5843, 0.584

In [8]:
# Sanity-check the text dataset.
print(len(txt_ds))  # number of samples
print(txt_ds[0]) # token (word) indices of the first sample
print(txt_ds.get_vocab_size()) # vocabulary size; also used to size the LSTMBottom embedding

215
tensor([  101,  1037,  2158,  1998,  2450,  2729,  2005,  2019, 10527,  2247,
         1996,  2217,  1997,  1037,  2303,  1997,  2300,  1012,   102,     0,
            0,     0,     0,     0,     0,     0])
30522


In [9]:
# Build both bottom models locally to check that they run before submitting a job.
img_bottom = ImgBottomNet()
lstm_bottom = LSTMBottom(vocab_size=txt_ds.get_vocab_size())
# Forward pass on a batch stacked from the first two text samples; the result is the cell output.
lstm_bottom(t.vstack([txt_ds[0], txt_ds[1]]))  # test forward

tensor([[0.0000, 2.8432, 0.1454, 1.2420, 0.2922, 0.0000, 0.0000, 0.0000],
        [0.0000, 2.6467, 0.6274, 1.1765, 0.1887, 0.0000, 0.0000, 0.0000]],
       grad_fn=<ReluBackward0>)

In [10]:
# Forward pass of the image bottom model on a batch built from the first two images:
# [0] picks the image tensor out of each sample, unsqueeze(dim=0) adds the batch axis.
img_bottom(t.vstack([img_ds[0][0].unsqueeze(dim=0), img_ds[1][0].unsqueeze(dim=0)])) 

tensor([[ 0.0291, -0.0698,  0.1511, -0.0235,  0.1022,  0.0132, -0.0452, -0.0957],
        [ 0.0555, -0.0826,  0.1833, -0.0097,  0.0629,  0.0195, -0.0194, -0.0683]],
       grad_fn=<AddmmBackward0>)

In [11]:
import os
import torch as t
from torch import nn
from pipeline import fate_torch_hook
from pipeline.component import HeteroNN
from pipeline.component.hetero_nn import DatasetParam
from pipeline.backend.pipeline import PipeLine
from pipeline.component import Reader, Evaluation, DataTransform
from pipeline.interface import Data, Model
from pipeline.component.nn import save_to_fate

# Apply the FATE hook to torch. NOTE(review): presumably this is what makes
# nn.CustModel, t.nn.InteractiveLayer and the parameter-less t.optim.Adam(...) used
# in later cells available — confirm against the FATE docs.
fate_torch_hook(t)

fate_project_path = os.path.abspath('/data/')
guest, host = 10000, 9999

# The guest initiates the job; one guest and one host take part.
pipeline_mix = (
    PipeLine()
    .set_initiator(role='guest', party_id=guest)
    .set_roles(guest=guest, host=host)
)

# Table descriptors, reused by the reader component in a later cell.
guest_data = {"name": "flicker_guest", "namespace": "experiment"}
host_data = {"name": "flicker_host", "namespace": "experiment"}

guest_data_path = fate_project_path + '/flicker/flicker_toy_data/flicker/images'
host_data_path = fate_project_path + '/flicker/flicker_toy_data/text.csv'

print(guest_data_path)
print(host_data_path)

# Bind each data path to its table; needs a reachable FATE Flow service.
pipeline_mix.bind_table(path=guest_data_path, **guest_data)
pipeline_mix.bind_table(path=host_data_path, **host_data)

[32m2023-05-21 13:24:08.477[0m | [31m[1mERROR   [0m | [36m__main__[0m:[36m<module>[0m:[36m29[0m - [31m[1mAn error has been caught in function '<module>', process 'MainProcess' (225), thread 'MainThread' (140635839182656):[0m
[33m[1mTraceback (most recent call last):[0m

  File "/data/projects/fate/env/python/miniconda/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
           │         │     └ {'__name__': '__main__', '__doc__': 'Entry point for launching an IPython kernel.\n\nThis is separate from the ipykernel pack...
           │         └ <code object <module> at 0x7fe8543560e0, file "/data/projects/fate/env/python/venv/lib/python3.8/site-packages/ipykernel_laun...
           └ <function _run_code at 0x7fe85432c550>
  File "/data/projects/fate/env/python/miniconda/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
         │     └ {'__name__': '__main__', '__doc__': 'Entry point for la

/data/flicker/flicker_toy_data/flicker/images
/data/flicker/flicker_toy_data/text.csv


ValueError: Cannot bind table, error msg is Connection refused, Please check if the fate flow service is started

In [None]:
# Reader component: each party is pointed at its own table descriptor.
reader_0 = Reader(name="reader_0")
# Guest reads the image table (guest_data).
reader_0.get_party_instance(role='guest', party_id=guest).component_param(table=guest_data)
# Host reads the text table (host_data).
reader_0.get_party_instance(role='host', party_id=host).component_param(table=host_data)

In [None]:
# Hetero-NN component shared by both parties, plus per-party handles used to
# attach models and datasets in the following cells.
hetero_nn_0 = HeteroNN(
    name="hetero_nn_0",
    epochs=5,
    interactive_layer_lr=0.001,
    batch_size=64,
    validation_freqs=1,
    task_type='classification',
)
guest_nn_0 = hetero_nn_0.get_party_instance(role='guest', party_id=guest)
host_nn_0 = hetero_nn_0.get_party_instance(role='host', party_id=host)

In [None]:
# Guest bottom model: the ImgBottomNet class saved as module guest_bottom_image (8-dim output).
guest_bottom = t.nn.Sequential(
    nn.CustModel(module_name='guest_bottom_image', class_name='ImgBottomNet')
)

# Guest top model: the ImgTopNet class saved as module guest_top_image (4-dim input, sigmoid output).
guest_top = t.nn.Sequential(
    nn.CustModel(module_name='guest_top_image', class_name='ImgTopNet')
)
# Host bottom model: the LSTMBottom class saved as module host_bottom_lstm; vocab_size is
# passed as a keyword (presumably forwarded to the class constructor — confirm in FATE docs).
host_bottom = nn.CustModel(module_name='host_bottom_lstm', class_name='LSTMBottom', vocab_size=txt_ds.get_vocab_size())

# Interactive layer: guest_dim/host_dim match the 8-dim bottom outputs, out_dim matches
# the 4-dim input of the top model.
interactive_layer = t.nn.InteractiveLayer(out_dim=4, guest_dim=8, host_dim=8, host_num=1)

In [None]:
# Attach the models to their parties: the guest owns the top and one bottom model,
# the host owns only a bottom model.
guest_nn_0.add_top_model(guest_top)
guest_nn_0.add_bottom_model(guest_bottom)
host_nn_0.add_bottom_model(host_bottom)
# Adam is created without model parameters here.
# NOTE(review): presumably allowed by fate_torch_hook — confirm.
optimizer = t.optim.Adam(lr=0.001)
# Binary cross-entropy; the top model already ends in a Sigmoid.
loss = t.nn.BCELoss()

hetero_nn_0.set_interactive_layer(interactive_layer)
hetero_nn_0.compile(optimizer=optimizer, loss=loss)

In [None]:
# Guest: image dataset with the same crop settings as the local check, labels returned as float.
guest_nn_0.add_dataset(
    DatasetParam(
        dataset_name='image',
        return_label=True,
        center_crop=True,
        center_crop_shape=(224, 224),
        label_dtype='float',
    )
)
# Host: tokenizer dataset without labels.
host_nn_0.add_dataset(
    DatasetParam(
        dataset_name='nlp_tokenizer',
        return_label=False,
    )
)

In [None]:
# Assemble the pipeline: the reader's output data is the training data of the Hetero-NN component.
pipeline_mix.add_component(reader_0)
pipeline_mix.add_component(hetero_nn_0, data=Data(train_data=reader_0.output.data))
# compile() is the last expression, so its return value (the PipeLine object) is shown as cell output.
pipeline_mix.compile()

<pipeline.backend.pipeline.PipeLine at 0x7fb069c377f0>

In [None]:
# Submit the compiled pipeline as a job; a ValueError is raised if job submission fails.
pipeline_mix.fit()

[32m2023-05-21 13:17:19.238[0m | [31m[1mERROR   [0m | [36m__main__[0m:[36m<module>[0m:[36m1[0m - [31m[1mAn error has been caught in function '<module>', process 'MainProcess' (703), thread 'MainThread' (140398834571072):[0m
[33m[1mTraceback (most recent call last):[0m

  File "/data/projects/fate/env/python/venv/lib/python3.8/site-packages/fate_client-1.11.0-py3.8.egg/pipeline/utils/invoker/job_submitter.py", line 46, in submit_job
    raise ValueError(f"retcode err, callback result is {result}")

[31m[1mValueError[0m:[1m retcode err, callback result is {'jobId': '202305211316365209900', 'retcode': 103, 'retmsg': 'Traceback (most recent call last):\n  File "/data/projects/fate/fateflow/python/fate_flow/scheduler/dag_scheduler.py", line 142, in submit\n    raise Exception("create job failed", response)\nException: (\'create job failed\', {\'guest\': {10000: {\'data\': {\'components\': {\'hetero_nn_0\': {\'need_run\': True}, \'reader_0\': {\'need_run\': True}}}, \'re

ValueError: job submit failed, err msg: {'jobId': '202305211316365209900', 'retcode': 103, 'retmsg': 'Traceback (most recent call last):\n  File "/data/projects/fate/fateflow/python/fate_flow/scheduler/dag_scheduler.py", line 142, in submit\n    raise Exception("create job failed", response)\nException: (\'create job failed\', {\'guest\': {10000: {\'data\': {\'components\': {\'hetero_nn_0\': {\'need_run\': True}, \'reader_0\': {\'need_run\': True}}}, \'retcode\': 0, \'retmsg\': \'success\'}}, \'host\': {9999: {\'retcode\': 100, \'retmsg\': "BusyError(\'BusyError: database is locked\')"}}})\n'}