Get data timeout, key=root:110:ALLGATHER #70

Closed
mingo0117 opened this issue Jul 28, 2022 · 16 comments

@mingo0117

Issue Type

Others

Source

binary

Secretflow Version

latest

OS Platform and Distribution

ubuntu 18.04

Python version

3.8.13

Bazel version

No response

GCC/Compiler version

No response

What happened and what you expected to happen.

2022-07-28 16:16:13,219 ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::SPURuntime.run() (pid=13081, ip=10.100.82.74, repr=<secretflow.device.device.spu.SPURuntime object at 0x7f1fd47b1220>)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/secretflow/device/device/spu.py", line 224, in run
    self.runtime.run(executable)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/spu/binding/api.py", line 43, in run
    return self._vm.Run(executable.SerializeToString())
RuntimeError: what: 
        [external/yasl/yasl/link/transport/channel.cc:86] Get data timeout, key=root:110:ALLGATHER
stacktrace: 
#0 yasl::link::Context::RecvInternal()+0x7f202eb100b2
#1 yasl::link::AllGatherImpl<>()+0x7f202e9c8785
#2 yasl::link::AllGather()+0x7f202e9c8cb4
#3 spu::mpc::Communicator::allReduce()+0x7f202e2c7a37
#4 spu::mpc::semi2k::B2A_Randbit::proc()::{lambda()#1}::operator()()::{lambda()#3}::operator()()+0x7f202e2bd9f2
#5 spu::mpc::semi2k::B2A_Randbit::proc()+0x7f202e2c0a89
#6 spu::mpc::UnaryKernel::evaluate()+0x7f202e19efdb
#7 spu::mpc::Object::call<>()+0x7f202e2c60b8
#8 spu::mpc::(anonymous namespace)::_Lazy2A()+0x7f202e2dfb19
#9 spu::mpc::ABProtAddSP::proc()+0x7f202e2e019b
#10 spu::mpc::BinaryKernel::evaluate()+0x7f202e19f2f2
#11 spu::mpc::Object::call<>()+0x7f202e2c6866
#12 spu::mpc::add_sp()+0x7f202e2c6994
#13 spu::hal::_add_sp()+0x7f202e171b63
#14 spu::hal::_add()+0x7f202e167486
#15 spu::hal::_popcount()+0x7f202e168b8c

Reproduction code to reproduce the issue.

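I hit the error above while running three-party logistic regression. It seems to be related to the amount of training data. If the code is left unchanged, is upgrading the machines or adding compute nodes the only way to address this?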
@mingo0117
Author

mingo0117 commented Jul 29, 2022

The above was with the semi2k protocol. I then switched to aby3 and got this instead:

Traceback (most recent call last):
  File "/home/ops/anaconda3/envs/secretflow/bin/kernprof", line 8, in <module>
    sys.exit(main())
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/kernprof.py", line 220, in main
    execfile(script_file, ns, ns)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/kernprof.py", line 28, in execfile
    exec(compile(f.read(), filename, 'exec'), globals, locals)
  File "test_cypher_logistic_gegression_3pc.py", line 167, in <module>
    test()
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/line_profiler/line_profiler.py", line 110, in wrapper
    result = func(*args, **kwds)
  File "test_cypher_logistic_gegression_3pc.py", line 153, in test
    losses = sf.reveal(losses)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/secretflow/device/driver.py", line 158, in reveal
    value_obj = ray.get(value_ref)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/ray/worker.py", line 1843, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(MemoryError): ray::SPURuntime.run() (pid=32328, ip=10.100.82.134, repr=<secretflow.device.device.spu.SPURuntime object at 0x7f3288065220>)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/secretflow/device/device/spu.py", line 224, in run
    self.runtime.run(executable)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/spu/binding/api.py", line 43, in run
    return self._vm.Run(executable.SerializeToString())
MemoryError: std::bad_alloc

I'm currently using 3 compute nodes, each with 8 cores and 16 GB of RAM.

@6fj
Member

6fj commented Jul 29, 2022

Hi @mingo0117 ,

  1. What machine resources do you currently have?
  2. Does it run successfully if you reduce the dataset size?

@mingo0117
Author

A Ray cluster of 3 machines, each with 8 cores and 16 GB of RAM.
After reducing the dataset size, I get this error:

Traceback (most recent call last):
  File "/home/ops/anaconda3/envs/secretflow/bin/kernprof", line 8, in <module>
    sys.exit(main())
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/kernprof.py", line 220, in main
    execfile(script_file, ns, ns)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/kernprof.py", line 28, in execfile
    exec(compile(f.read(), filename, 'exec'), globals, locals)
  File "test_cypher_logistic_gegression_3.py", line 152, in <module>
    test()
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/line_profiler/line_profiler.py", line 110, in wrapper
    result = func(*args, **kwds)
  File "test_cypher_logistic_gegression_3.py", line 139, in test
    losses = sf.reveal(losses)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/secretflow/device/driver.py", line 158, in reveal
    value_obj = ray.get(value_ref)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/ray/worker.py", line 1845, in get
    raise value
ray.exceptions.RayActorError: The actor died because of an error raised in its creation task, ray::SPURuntime.__init__() (pid=4100, ip=10.100.82.173, repr=<secretflow.device.device.spu.SPURuntime object at 0x7fed51ed8400>)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/secretflow/device/device/spu.py", line 125, in __init__
    self.link = link.create_brpc(desc, rank)
RuntimeError: what: 
        [external/yasl/yasl/link/context.cc:140] connect to mesh failed, failed to setup connection to rank=1
stacktrace: 
#0 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7feda8c28ed7
#1 pybind11::cpp_function::dispatcher()+0x7feda8c150cb
#2 cfunction_call_varargs+0x561b974ea00e

@6fj
Member

6fj commented Jul 29, 2022

Hi @mingo0117 ,

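It looks like the SPU device was not set up successfully, although it should have worked for you before. Please make sure that:

  • all Ray nodes have been cleaned up
  • you provide the SPU config yourself and the ports are not already in use.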
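
One way to check the port condition above before creating the SPU device is sketched here. It is a minimal illustration using only the Python standard library; `port_is_free` is a hypothetical helper name, not part of secretflow or SPU, and 18888 is simply the port that appears in the reporter's cluster config. Run it on each node for the address that node will listen on.

```python
import socket


def port_is_free(port: int, host: str = "0.0.0.0") -> bool:
    """Hypothetical helper: True if a TCP socket can be bound to (host, port) now."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            # bind() fails with OSError if another socket already holds the port.
            sock.bind((host, port))
        except OSError:
            return False
    return True


if __name__ == "__main__":
    # 18888 is the port used in the reporter's SPU node addresses.
    print(port_is_free(18888))
```

A successful bind only shows that the port is free at that moment; a leftover process from an earlier run can still grab it later, which is why cleaning up the Ray nodes first matters.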

@6fj
Member

6fj commented Jul 29, 2022

Please refer to #66.

@6fj
Member

6fj commented Jul 29, 2022

Hi @mingo0117

Please try forcing the SPU device to be rebuilt like this:

sf.shutdown()

sf.init(['alice', 'bob', 'carol'], num_cpus=8, log_to_driver=False)

alice, bob, carol = sf.PYU('alice'), sf.PYU('bob'), sf.PYU('carol')
spu_3pc = sf.SPU(sf.utils.testing.cluster_def(['alice', 'bob', 'carol']))

Thanks.

@mingo0117
Author

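OK, the SPU device rebuild problem is gone. I'd still like to discuss the error I reported at the start. I solved it myself, which was interesting, but I don't understand why the fix works: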

2022-07-28 16:16:13,219 ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::SPURuntime.run() (pid=13081, ip=10.100.82.74, repr=<secretflow.device.device.spu.SPURuntime object at 0x7f1fd47b1220>)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/secretflow/device/device/spu.py", line 224, in run
    self.runtime.run(executable)
  File "/home/ops/anaconda3/envs/secretflow/lib/python3.8/site-packages/spu/binding/api.py", line 43, in run
    return self._vm.Run(executable.SerializeToString())
RuntimeError: what: 
        [external/yasl/yasl/link/transport/channel.cc:86] Get data timeout, key=root:110:ALLGATHER
stacktrace: 
#0 yasl::link::Context::RecvInternal()+0x7f202eb100b2
#1 yasl::link::AllGatherImpl<>()+0x7f202e9c8785
#2 yasl::link::AllGather()+0x7f202e9c8cb4
#3 spu::mpc::Communicator::allReduce()+0x7f202e2c7a37
#4 spu::mpc::semi2k::B2A_Randbit::proc()::{lambda()#1}::operator()()::{lambda()#3}::operator()()+0x7f202e2bd9f2
#5 spu::mpc::semi2k::B2A_Randbit::proc()+0x7f202e2c0a89
#6 spu::mpc::UnaryKernel::evaluate()+0x7f202e19efdb
#7 spu::mpc::Object::call<>()+0x7f202e2c60b8
#8 spu::mpc::(anonymous namespace)::_Lazy2A()+0x7f202e2dfb19
#9 spu::mpc::ABProtAddSP::proc()+0x7f202e2e019b
#10 spu::mpc::BinaryKernel::evaluate()+0x7f202e19f2f2
#11 spu::mpc::Object::call<>()+0x7f202e2c6866
#12 spu::mpc::add_sp()+0x7f202e2c6994
#13 spu::hal::_add_sp()+0x7f202e171b63
#14 spu::hal::_add()+0x7f202e167486
#15 spu::hal::_popcount()+0x7f202e168b8c

As long as I switch the data source to the example from the official site, there is no problem:
features, label = load_breast_cancer(return_X_y=True)
But I want to run LR on my own data source, so I wrote:

features = np.array(pd.read_csv(r'{0}'.format(x_csv_path), sep=',', usecols=range(20)))
label = np.array(pd.read_csv(r'{0}'.format(y_csv_path)))

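Then something strange happened: the job kept running, CPU and memory on the machines were maxed out, and eventually it threw the Get data timeout error. Switching protocols only replaced it with a memory error. It looked like a problem with reading the data through pandas. I then looked at the source of load_breast_cancer, saw that it does not use pandas, and switched to reading with NumPy: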

features = np.loadtxt(open(x_csv_path, "rb"), delimiter=",", usecols=range(20))
label = np.loadtxt(open(y_csv_path, "rb"))

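Problem solved... the result came back in a little over 30 seconds. Why is that? Reading with pandas this way works perfectly well when I run LR in plaintext, so I never suspected the loading method.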

@6fj
Member

6fj commented Jul 29, 2022

That's an interesting phenomenon. Could you post the complete code and the steps to reproduce it? Thanks!

@mingo0117
Author

# ******* logistic regression util start*************************

import timeit

import jax.numpy as jnp
import matplotlib.pyplot as plt
import numpy as np
from jax import jit
from jax import value_and_grad
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


def load_train_dataset(x_csv_path, y_csv_path, party_id=None) -> (np.ndarray, np.ndarray):
    #features = np.array(pd.read_csv(r'{0}'.format(x_csv_path), sep=',', usecols=range(20)))
    #label = np.array(pd.read_csv(r'{0}'.format(y_csv_path)))
    features = np.loadtxt(open(x_csv_path, "rb"), delimiter=",", usecols=range(20))
    label = np.loadtxt(open(y_csv_path, "rb"))
    # features, label = load_breast_cancer(return_X_y=True)
    scaler = StandardScaler()
    features = scaler.fit_transform(features)
    X_train, _, y_train, _ = train_test_split(
        features, label, test_size=0.2, random_state=42
    )

    if party_id:
        if party_id == 1:
            return X_train[:, 0:6], _
        if party_id == 2:
            return X_train[:, 6:13], _
        else:
            return X_train[:, 13:20], y_train
    else:
        return X_train, y_train


def load_test_dataset(x_csv_path, y_csv_path):
    #features = np.array(pd.read_csv(r'{0}'.format(x_csv_path), sep=',', usecols=range(20)))
    #label = np.array(pd.read_csv(r'{0}'.format(y_csv_path)))
    features = np.loadtxt(open(x_csv_path, "rb"), delimiter=",", usecols=range(20))
    label = np.loadtxt(open(y_csv_path, "rb"))
    # features, label = load_breast_cancer(return_X_y=True)
    scaler = StandardScaler()
    features = scaler.fit_transform(features)
    _, X_test, _, y_test = train_test_split(
        features, label, test_size=0.2, random_state=42
    )
    return X_test, y_test


def sigmoid(x):
    return 1 / (1 + jnp.exp(-x))


# Outputs probability of a label being true.
def predict(W, b, inputs):
    return sigmoid(jnp.dot(inputs, W) + b)


# Training loss is the negative log-likelihood of the training examples.
def loss(W, b, inputs, targets):
    preds = predict(W, b, inputs)
    label_probs = preds * targets + (1 - preds) * (1 - targets)
    return -jnp.mean(jnp.log(label_probs))

@jit
def train_step(W, b, x1, x2, x3, y, learning_rate):
    x = jnp.concatenate([x1, x2, x3], axis=1)
    # x = jnp.hstack([x1, x2, x3])
    # print(x)
    loss_value, Wb_grad = value_and_grad(loss, (0, 1))(W, b, x, y)
    W -= learning_rate * Wb_grad[0]
    b -= learning_rate * Wb_grad[1]
    return loss_value, W, b


def fit(W, b, x1, x2, x3, y, epochs=1, learning_rate=1e-2):
    losses = jnp.array([])
    for _ in range(epochs):
        l, W, b = train_step(W, b, x1, x2, x3, y, learning_rate=learning_rate)
        losses = jnp.append(losses, l)
    return losses, W, b


def validate_model(W, b, X_test, y_test):
    y_pred = predict(W, b, X_test)
    return roc_auc_score(y_test, y_pred)


def plot(losses):
    # Create the Figure object
    plt.figure(figsize=(20, 12))
    # plt.subplot(1, 2, 1)
    plt.plot(np.arange(len(losses)), losses)
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.title("Loss Graph")
    plt.savefig("cypher_logistic_gegression_loss.jpg")


ray_cluster_address = '10.100.82.173:6379'
x_csv_path = '/home/ops/svm_two_class_train_dense_data.csv'
y_csv_path = '/home/ops/svm_two_class_train_dense_label.csv'
learn_rate = 0.001
epochs = 5


# ******* logistic_regression_util end *************************

def test():
    global alice, bob, charlie, spu, device, losses
    import secretflow as sf
    import spu as sp
    # In case you have a running secretflow runtime already.
    sf.shutdown()
    # Standalone
    # sf.init(['alice', 'bob'], num_cpus=8, log_to_driver=True)
    # Cluster
    sf.init(address=ray_cluster_address)
    alice, bob, charlie = sf.PYU('alice'), sf.PYU('bob'), sf.PYU('charlie')
    # Standalone
    # spu = sf.SPU(sf.utils.testing.cluster_def(['alice', 'bob']))
    # Cluster
    spu = sf.SPU({
        'nodes': [{'party': 'alice',
                   'id': '0',
                   'address': '10.100.82.173:18888', },
                  {'party': 'bob',
                   'id': '1',
                   'address': '10.100.82.74:18888', },
                  {'party': 'charlie',
                   'id': '2',
                   'address': '10.100.82.134:18888', }],
        'runtime_config': {
            'protocol': sp.spu_pb2.SEMI2K,
            'field': sp.spu_pb2.FM128,
        },
    })
    x1, _ = alice(load_train_dataset)(x_csv_path, y_csv_path, party_id=1)
    x2, _ = bob(load_train_dataset)(x_csv_path, y_csv_path, party_id=2)
    x3, y = charlie(load_train_dataset)(x_csv_path, y_csv_path, party_id=3)
    device = spu
    W = jnp.zeros((20,))
    b = 0.0
    W_, b_, x1_, x2_, x3_, y_ = (
        sf.to(device, W),
        sf.to(device, b),
        x1.to(device),
        x2.to(device),
        x3.to(device),
        y.to(device),
    )
    losses, W_, b_ = device(fit, static_argnames=['epochs'], num_returns=3)(
        W_, b_, x1_, x2_, x3_, y_, epochs=epochs, learning_rate=learn_rate
    )
    losses = sf.reveal(losses)
    W = sf.reveal(W_)
    b = sf.reveal(b_)
    print(losses)
    print("losses={0}".format(losses))
    # print("W={0}".format(W))
    # print("b={0}".format(b))
    X_test, y_test = load_test_dataset(x_csv_path, y_csv_path)
    auc = validate_model(W, b, X_test, y_test)
    print(f'auc={auc}')


print("Elapsed: {0} s".format(timeit.timeit('test()', setup='from __main__ import test', number=1)))

That's all of it.

@mingo0117
Author

Just look at load_train_dataset; the only difference is the commented-out lines. You can take any 20-column test set and try it on a three-party cluster.

@longshan-ant
Member

Could you use np.array_equal to check whether the data read by np.loadtxt and by pd.read_csv differ?
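
The comparison suggested here can be sketched as follows. This is a minimal, self-contained illustration: `csv_text` is a made-up in-memory sample standing in for the reporter's file, and the two loading calls mirror the ones quoted earlier in the thread.

```python
import io

import numpy as np
import pandas as pd

# Hypothetical 3-row, 2-column CSV without a header line (not the reporter's data).
csv_text = "1.0,2.0\n3.0,4.0\n5.0,6.0\n"

# np.loadtxt treats every line as data.
via_numpy = np.loadtxt(io.StringIO(csv_text), delimiter=",")

# pd.read_csv defaults to header="infer", so the first line becomes column names.
via_pandas = np.array(pd.read_csv(io.StringIO(csv_text), sep=","))

print(via_numpy.shape)                        # (3, 2)
print(via_pandas.shape)                       # (2, 2)
print(np.array_equal(via_numpy, via_pandas))  # False
```

Printing the shapes alongside the `np.array_equal` result helps, because the function returns False for any shape mismatch without saying which rows differ.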

@mingo0117
Author

Yes, they differ. With pd.read_csv written the way I had it, the first row was skipped because I did not pass header=None.
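
A minimal sketch of the fix on made-up in-memory data: passing `header=None` makes `pd.read_csv` keep the first row. The sketch also shows a second difference that nobody in the thread discusses, so treat it as a possible contributing factor rather than a confirmed cause. A one-column label file comes back from pandas as a 2-D `(n, 1)` array, whereas `np.loadtxt` returns a 1-D `(n,)` array. In a loss like the one in the script above, multiplying `(n,)` predictions by `(n, 1)` targets broadcasts to an `(n, n)` array, which grows quadratically with the number of rows.

```python
import io

import numpy as np
import pandas as pd

# Hypothetical one-column label file without a header line (not the reporter's data).
label_csv = "1\n0\n1\n1\n"

y_numpy = np.loadtxt(io.StringIO(label_csv))                          # shape (4,)
y_default = np.array(pd.read_csv(io.StringIO(label_csv)))             # shape (3, 1): first row lost
y_fixed = np.array(pd.read_csv(io.StringIO(label_csv), header=None))  # shape (4, 1): all rows kept

# Flatten the pandas result so it matches what np.loadtxt returns.
y_flat = y_fixed.ravel()
print(np.array_equal(y_numpy, y_flat))  # True

# Broadcasting check with stand-in predictions of shape (n,).
preds = np.full(y_numpy.shape, 0.5)
print((preds * y_flat).shape)   # (4,)
print((preds * y_fixed).shape)  # (4, 4)
```

If pandas is kept for loading, checking `label.shape` before handing the data to the SPU is a cheap guard against this kind of silent broadcasting.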

@6fj
Member

6fj commented Aug 1, 2022

Hi @mingo0117

As far as we know, serialization is costly in Ray, so please use the NumPy I/O APIs most of the time. Please refer to https://docs.ray.io/en/releases-1.11.1/ray-core/serialization.html#serialization.

If you have to use pandas for I/O, please also check https://docs.ray.io/en/latest/data/modin/index.html#using-pandas-on-ray-modin.

Thanks.

@mingo0117
Author

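Thanks. One more question: does SPU have logs for the protocol or the computation? How do I turn them on? I'd like to see what is happening inside.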

@6fj
Member

6fj commented Aug 2, 2022

Hi @mingo0117 ,

First, you need to enable the corresponding logging through the SPU config:

Then, you need to turn on log_to_driver when initializing secretflow, along the lines of:

import secretflow as sf

sf.init(['alice', 'bob'], num_cpus=8, log_to_driver=True)

@mingo0117
Author

Got it, thank you very much.

ElleryQu pushed a commit to ElleryQu/secretflow that referenced this issue Aug 22, 2023
* Fix build

* Update yacl again