problem when massive pending message #2943

Closed · sinlang opened this issue Sep 28, 2014 · 6 comments

Comments

@sinlang commented Sep 28, 2014

I use 4.0.21 to send 1 billion messages asynchronously. With 1,000 messages pending (sent but not finished, i.e. the Netty future is not done), sending is fast; but with 100,000 pending, sending slows down, even though there is more than enough memory to buffer them.

[screenshot attached]

@sinlang (Author) commented Sep 28, 2014

My test code:

package com.cnc.cms.common.net;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cnc.cms.common.cluster.Host;
import com.cnc.cms.common.cluster.MasterInfo;
import com.cnc.cms.common.cluster.WorkerInfo;
import com.cnc.cms.common.net.impl.serializer.KryoSerializer;
import com.cnc.cms.common.util.NameThreadFactory;

public class QuickNettyTest {

private static int maxQueue = 100000;
private static int messageSize = 1000;
private static int messageNum = 1000 * 10000;

private static Logger logger = LoggerFactory.getLogger(QuickNettyTest.class);

private static MasterInfo master;

private static WorkerInfo worker;

@BeforeClass
public static void init() {
    NetFactory.getSerializerRegistry().register(new KryoSerializer<TaskBean>() {
        @Override
        public Class<TaskBean> getSerialClass() {
            return TaskBean.class;
        }

        @Override
        public int getTypeId() {
            return 30;
        }
    });

    Host host = new Host();
    host.setIp("127.0.0.1");

    master = new MasterInfo();
    master.setInstanceId("master-1");
    master.setPort(55003);
    master.setHosts(Arrays.asList(host));

    worker = new WorkerInfo();
    worker.setInstanceId("worker-1");
    worker.setHosts(Arrays.asList(host));
}

@Test
public void server() throws Exception {
    // NIO event loop groups: one boss thread to accept, three workers for I/O
    NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, new NameThreadFactory(master.getInstanceId()
            + "-io-boss:"));
    NioEventLoopGroup workerGroup = new NioEventLoopGroup(3, new NameThreadFactory(master.getInstanceId()
            + "-io-worker:"));

    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // intentionally empty: the server just accepts
                    // connections and discards anything it reads
                }
            });
    try {
        // Start the server and wait for the bind to complete.
        bootstrap.bind(master.getPort()).sync();
        logger.info("server : {} is started", master);
    }
    catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    Thread.currentThread().join(); // block forever to keep the server running
}

@Test
public void client() throws Exception {
    NioEventLoopGroup workerGroup = new NioEventLoopGroup(3, new NameThreadFactory(worker.getInstanceId()
            + "-io-worker:"));

    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(workerGroup).channel(NioSocketChannel.class)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5 * 1000).option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {

                    ch.pipeline().addLast("1", new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            // no-op inbound pass-through, kept to mirror the real pipeline
                            super.channelRead(ctx, msg);
                        }
                    });

                    ch.pipeline().addLast("2", new ChannelOutboundHandlerAdapter() {
                        @Override
                        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
                                throws Exception {
                            // no-op outbound pass-through, kept to mirror the real pipeline
                            super.write(ctx, msg, promise);
                        }
                    });
                }
            });

    ChannelFuture future = bootstrap.connect(NetUtil.getSuitableHost(master, worker).getIp(), master.getPort());
    future.sync();
    final Channel channel = future.channel();

    StopWatch stop = new StopWatch();
    stop.start();
    int i = 0;
    String str = StringUtils.rightPad("", messageSize, "-");
    final Semaphore buf = new Semaphore(maxQueue); // caps in-flight (unfinished) writes at maxQueue
    byte[] data = str.getBytes();
    for (; i < messageNum; i++) {
        buf.acquire();

        ChannelFuture f = channel.writeAndFlush(Unpooled.wrappedBuffer(data));
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                buf.release(); // free a slot whether the write succeeded or failed
            }
        });

        if (i % 10000 == 0) {
            System.out.println("index :" + i);
        }
    }

    stop.stop();
    // overall throughput in MB/s
    double mbPerSec = ((double) messageSize) * messageNum / 1024 / 1024 / (stop.getTime() / 1000.0);
    System.out.println(stop + "   " + mbPerSec + " MB/s");
    Thread.sleep(1000); // give the last in-flight writes a moment to complete
}

}

@sinlang (Author) commented Sep 28, 2014

It seems that when messages are pending, addFlush is called a lot and loops over the pending entries each time. From io.netty.channel.ChannelOutboundBuffer:

void addFlush() {
    // There is no need to process all entries if there was already a flush before and no new messages
    // were added in the meantime.
    //
    // See #2577
    if (unflushed != tail) {
        unflushed = tail;

        final int mask = buffer.length - 1;
        int i = flushed;
        while (i != unflushed && buffer[i].msg != null) {
            Entry entry = buffer[i];
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending);
            }
            i = i + 1 & mask;
        }
    }
}
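
If I read this right, the walk starts at flushed, so every writeAndFlush() re-scans all entries that are already flushed but not yet written to the socket; with 100,000 entries pending, each flush costs O(pending) and the whole run degrades roughly quadratically. Below is a toy model of the difference, with simplified stand-in types and field names (not the real ChannelOutboundBuffer internals); my understanding is that 4.0.23 reworked the buffer along the lines of the second variant, so addFlush only visits entries added since the previous flush.

// Toy model only: Entry and the index/pointer fields are illustrative stand-ins.
final class FlushCostModel {

    static final class Entry {
        boolean uncancellable;
        Entry next;
    }

    // 4.0.21 style: a power-of-two ring buffer indexed by flushed/unflushed/tail.
    Entry[] buffer = new Entry[1024];
    int flushed, unflushed, tail;

    // Re-walks ALL flushed-but-unwritten entries on every call, so with P
    // messages pending each flush costs O(P) and N sends cost about O(N * P).
    void addFlushQuadratic() {
        if (unflushed != tail) {
            unflushed = tail;
            final int mask = buffer.length - 1;
            for (int i = flushed; i != unflushed && buffer[i] != null; i = i + 1 & mask) {
                buffer[i].uncancellable = true; // revisits entries seen on earlier calls
            }
        }
    }

    // Linked-list style: addFlush starts at the first entry added since the
    // previous flush, so each entry is visited exactly once over its lifetime.
    Entry firstUnflushed;

    void addFlushLinear() {
        for (Entry e = firstUnflushed; e != null; e = e.next) {
            e.uncancellable = true;
        }
        firstUnflushed = null; // everything up to tail is now flushed
    }
}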

@sinlang (Author) commented Sep 28, 2014

When I update to 4.0.23, it seems the problem is fixed?

@sinlang (Author) commented Sep 28, 2014

4.0.23 really does fix a problem in that method: sending is fast now even with a massive number of messages pending. But it is still slow when small messages are pending, so I still need some code to keep Netty from buffering too many messages.
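
One way to do that without an external Semaphore is Netty's own writability signal: configure the write-buffer water marks and only write while Channel.isWritable() is true. A minimal sketch, assuming Netty 4.0.x; the water-mark values are arbitrary examples, and the Runnable/Iterator plumbing (resume, pump) is hypothetical glue, not a Netty API.

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;

import java.util.Iterator;

public final class WritabilityBackpressure {

    // Netty flips Channel.isWritable() to false once the outbound buffer holds
    // more than the high water mark, and back to true when it drains below the
    // low water mark (64 KiB / 32 KiB here are arbitrary example values).
    static void configure(Bootstrap bootstrap) {
        bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 64 * 1024)
                .option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
    }

    // Pipeline handler that restarts the producer once the buffer has drained.
    static final class ResumeOnWritable extends ChannelInboundHandlerAdapter {
        private final Runnable resume; // hypothetical hook back into the producer

        ResumeOnWritable(Runnable resume) {
            this.resume = resume;
        }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            if (ctx.channel().isWritable()) {
                resume.run();
            }
            ctx.fireChannelWritabilityChanged();
        }
    }

    // Producer side: write only while the channel can absorb more, then stop
    // and wait for ResumeOnWritable to call pump() again.
    static void pump(Channel channel, Iterator<?> messages) {
        while (messages.hasNext() && channel.isWritable()) {
            channel.write(messages.next());
        }
        channel.flush();
    }
}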

@normanmaurer (Member) commented

@sinlang so can we close this as fixed?

@sinlang (Author) commented Sep 29, 2014

@normanmaurer, sure.

sinlang closed this as completed Sep 29, 2014