
Problems with async operations under high concurrency #2148

Closed
zhou-hao opened this issue Jun 5, 2019 · 19 comments

@zhou-hao commented Jun 5, 2019

When I run a large number of async operations, execution is very slow and produces a flood of errors. nettyThreads and 'connection pool' can't be increased without limit, can they? Also, the errors are all concentrated on the thread eventloop-thread-1.

org.redisson.client.RedisTimeoutException: Unable to get connection! Try to increase 'nettyThreads' and 'connection pool' settings or set decodeInExecutor = true and increase 'threads' settingNode source: NodeSource [slot=null, addr=null, redisClient=null, redirect=null, entry=MasterSlaveEntry [masterEntry=[freeSubscribeConnectionsAmount=1, freeSubscribeConnectionsCounter=value:50:queue:0, freeConnectionsAmount=32, freeConnectionsCounter=value:32:queue:1, freezed=false, freezeReason=null, client=[addr=redis://127.0.0.1:6379], nodeType=MASTER, firstFail=0]]], command: (HGET), params: [test-map, PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 256)] after 3 retry attempts
	at org.redisson.command.CommandAsyncService$6.run(CommandAsyncService.java:703)
	at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:682)
	at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:757)
	at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:485)
	at java.lang.Thread.run(Thread.java:748)

Steps to reproduce or test case

    public static RedissonClient newRedissonClient() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress(System.getProperty("redis.host", "redis://127.0.0.1:6379"))
                .setDatabase(0)
                .setTimeout(10000)
                .setConnectionPoolSize(1024)
                .setConnectTimeout(10000);
        config.setThreads(32);
        config.setNettyThreads(32);

        return Redisson.create(config);
    }


    @SneakyThrows
    public static void main(String[] args) {
        RedissonClient client = newRedissonClient();

        RMap<String, Object> map = client.getMap("test-map");
        map.put("key1", "value1");
        map.put("key2", "value2");
        map.put("key3", "value3");
        CountDownLatch latch = new CountDownLatch(100000);
        for (int i = 0; i < 100000; i++) {
            int fi = i;
            map.getAsync("key1")
                    .whenComplete((val, err) -> {
                        System.out.println(val + " => " + fi + " =>" + Thread.currentThread().getName());

                        if (null != err) {
                            err.printStackTrace();
                        }
                        latch.countDown();

                    });
        }
        latch.await();
        client.shutdown();
    }

Redis version

5.0.4

Redisson version

3.10.6

Redisson configuration

Config config = new Config();
config.useSingleServer()
        .setAddress(System.getProperty("redis.host", "redis://127.0.0.1:6379"))
        .setDatabase(0)
        .setTimeout(10000)
        .setConnectionPoolSize(1024)
        .setConnectTimeout(10000);
config.setThreads(32);
config.setNettyThreads(32);

@jackygurui (Member) commented Jun 5, 2019

This is really a misunderstanding of concurrency and async operations. It is the same principle as filling a water tank: an async operation replaces the straight pipe with a tank, which gives you some buffer space, but if water keeps coming in faster than it flows out, the tank will fill up sooner or later.

@zhou-hao (Author) commented Jun 5, 2019

@jackygurui Right, my question is really this: synchronous calls are fast and work fine, but async calls produce a pile of errors. The scenario is that incoming TCP requests have to be authenticated first, and I want to do the authentication asynchronously so the TCP server's event loop isn't blocked, which is why I wanted to use Redisson's getAsync directly.
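
For context, the non-blocking lookup aimed at here would look roughly like the sketch below. The AuthGate class, the "auth-tokens" map name and the two callbacks are illustrative assumptions; only getAsync/whenComplete come from Redisson, and this is a sketch rather than a tested solution.

    import org.redisson.api.RMap;
    import org.redisson.api.RedissonClient;

    // Sketch of non-blocking authentication: the Redis lookup is issued asynchronously,
    // so the thread calling authenticate (e.g. a TCP event loop) is never blocked on Redis.
    class AuthGate {
        private final RMap<String, String> tokens;

        AuthGate(RedissonClient client) {
            this.tokens = client.getMap("auth-tokens"); // hypothetical map name
        }

        // Called from the event loop; returns immediately.
        void authenticate(String token, Runnable onAllowed, Runnable onDenied) {
            tokens.getAsync(token).whenComplete((user, err) -> {
                if (err == null && user != null) {
                    onAllowed.run(); // token found: continue handling the request
                } else {
                    onDenied.run();  // missing token, timeout or other Redis error
                }
            });
        }
    }

Under the load described in this issue, the error branch is exactly where the RedisTimeoutException ends up, which is what the rest of the thread is about.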

@jackygurui (Member) commented Jun 5, 2019

Then you need to adjust the size of your tank.

@zhou-hao (Author) commented Jun 5, 2019

@jackygurui Concurrency may get very high, and I can't keep increasing those settings forever, can I? If that doesn't work, I'll do it with a dedicated thread pool instead.

@jackygurui (Member) commented Jun 5, 2019

The problem is still at the source: the incoming traffic is too heavy and the outlet can't keep up, so there will still be trouble. A dedicated thread pool just adds another tank; it hides the problem for a while instead of actually solving it.

@zhou-hao (Author) commented Jun 6, 2019

True 😂. Actually the synchronous version is working fine for now; I'll keep looking into whether there's another way.

zhou-hao closed this Jun 6, 2019

@zhou-hao (Author) commented Jun 11, 2019

@jackygurui One more question: when a large number of async Redis requests are issued concurrently, could they be queued up instead of failing with a timeout error?

@jackygurui (Member) commented Jun 11, 2019

The requests are already queued internally. The problem is that if a message cannot be sent out within the configured time / number of attempts, this error is triggered. If you don't want to see it, you can adjust the relevant parameters to push the error further out, but that is just burying your head in the sand, and the downstream impact would have to be re-evaluated.
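
For reference, the parameters in question are the command timeout, the retry attempts and the retry interval. A minimal sketch with purely illustrative values (raising them only postpones the error, as noted above):

    Config config = new Config();
    config.useSingleServer()
            .setAddress("redis://127.0.0.1:6379")
            .setTimeout(10000)       // ms to wait for a reply once a command has been sent
            .setRetryAttempts(5)     // send attempts (e.g. waiting for a free connection) before the error fires
            .setRetryInterval(1500); // ms between send attempts
    RedissonClient client = Redisson.create(config);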

@zhou-hao (Author) commented Jun 11, 2019

@jackygurui I just want to guarantee that everything gets executed, similar to a thread pool. The problem right now is that, at the same level of concurrency, synchronous gets are fast and fine, while async gets produce a pile of timeouts.

@zhou-hao (Author) commented Jun 11, 2019

800 requests: with synchronous gets everything finishes in about 1 second, but with async gets I get a pile of timeouts even though the timeout is set to 10s. Something feels off; even if the number of concurrent Redis operations exceeds the number of connections, it shouldn't behave like this. I just need to guarantee that all of these requests get executed (unless Redis itself is down).

@jackygurui (Member) commented Jun 11, 2019

This is still a misunderstanding of synchronous vs asynchronous. Async operations were not created to speed up synchronous ones.

Normally, when the outlet speed is fixed, the point of async operations is to reduce how long the requesting thread stays suspended, not to make the outlet faster. On the contrary, the extra work such as flow control tends to reduce the outlet speed somewhat. The thread pools and queues used in async operations act as a buffer, but that buffer is always bounded, and once it fills up some handling mechanism is required. Broadly speaking there are only two: evict the old or reject the new. Eviction relies on a timeout to decide what to evict, and when the condition is met a timeout error is produced. When there is nothing old left to evict, new entries have to be rejected, which produces a "cannot be queued" error. In Redisson this buffer is not a directly quantified physical space; it is the combined effect of the timeout, the number of retry attempts and the retry interval. So if you want to enlarge some dimension of this space, you have to adjust the corresponding parameter.
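
One way to keep that buffer from overflowing is to bound the number of in-flight async commands on the caller's side. The sketch below is not a Redisson feature: the Semaphore and the permit count of 256 are assumptions, and `map` is the RMap from the reproduction code above.

    import java.util.concurrent.Semaphore;

    // Client-side flow control: at most 256 getAsync calls are outstanding at any time,
    // so commands cannot pile up in the internal queue faster than Redis answers them.
    Semaphore inFlight = new Semaphore(256);

    for (int i = 0; i < 100000; i++) {
        inFlight.acquireUninterruptibly();   // producer waits once the "tank" is full
        map.getAsync("key1").whenComplete((val, err) -> {
            inFlight.release();              // free a slot as soon as a reply (or error) arrives
            if (err != null) {
                err.printStackTrace();
            }
        });
    }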

@zhou-hao (Author) commented Jun 11, 2019

So Redisson currently implements the buffering with timeouts and retries? Could an extensible hook be offered in the future so users can plug in their own policy? All I really want is this: once I submit a Redis command, it is guaranteed to be executed as long as the connection pool is still alive, with the option to cancel it via RFuture cancel or to give that single command its own timeout.
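
On the cancel part: RFuture already extends java.util.concurrent.Future, so a rough per-command deadline can be approximated today by scheduling a cancel. This is only a sketch; the watchdog executor and the 2-second deadline are assumptions, `map` is the RMap from the reproduction code above, and whether a command is really aborted once sent depends on Redisson internals.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import org.redisson.api.RFuture;

    ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();

    RFuture<Object> future = map.getAsync("key1");
    // Cancel the future if no reply has arrived within the (illustrative) 2-second deadline.
    watchdog.schedule(() -> future.cancel(true), 2, TimeUnit.SECONDS);
    future.whenComplete((val, err) -> {
        if (future.isCancelled()) {
            // treated by the caller as a per-command timeout
        }
    });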

@zhou-hao (Author) commented Jun 11, 2019

My reason for going async is that I don't want to block the main thread, which is an event loop. Concurrency will be high, and I can't just drop a client connection because Redis timed out, since the client would keep retrying. My business interface also returns a CompletionStage for asynchronous processing, so I wanted to use Redisson's async calls directly. But since Redisson buffers Redis requests via retries and timeouts, I end up with a pile of exceptions, and tuning the retry and timeout settings doesn't feel like it fully covers this scenario. Right now I buffer the heavy load with a dedicated thread pool plus synchronous Redisson calls, so I'd like to know whether this could be supported by Redisson itself.

@jackygurui (Member) commented Jun 11, 2019

Try adjusting the retry count (retryAttempts) and you'll see.

@zhou-hao (Author) commented Jun 12, 2019

Retries imply an interval, though; a queue-based approach feels more reliable to me. I ran a test with 1000 map.get requests: calling rmap.get(key) from a dedicated thread pool finishes in under a second, while rmap.getAsync(key) takes about 10 seconds. That's not really the result I'm after.

Test code

    public static RedissonClient newRedissonClient() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress(System.getProperty("redis.host", "redis://127.0.0.1:6379"))
                .setDatabase(0)
                .setTimeout(10000)
                .setRetryAttempts(1000)
                .setRetryInterval(10)
                .setConnectionPoolSize(1024)
                .setConnectTimeout(10000);
        config.setThreads(32);
        config.setNettyThreads(32);

        return Redisson.create(config);
    }

    @SneakyThrows
    public static void main(String[] args) {
        RedissonClient client = newRedissonClient();
        ExecutorService executorService = Executors.newFixedThreadPool(32);

        RMap<String, String> map = client.getMap("test-map");
        map.put("key1", "value1");
        map.put("key2", "value2");
        map.put("key3", "value3");

        CountDownLatch latch = new CountDownLatch(1000);
        long startWith = System.currentTimeMillis();

        for (int i = 0; i < 1000; i++) {
            CompletableFuture
                    .supplyAsync(() -> map.get("key1"), executorService)
                    .thenRun(latch::countDown);
        }
        System.out.println("executorService:" + (System.currentTimeMillis() - startWith) + "ms");
        latch.await();

        CountDownLatch latch2 = new CountDownLatch(1000);
        startWith = System.currentTimeMillis();
        for (int i = 0; i < 1000; i++) {
            map.getAsync("key1")
                    .thenRun(latch2::countDown);
        }
        latch2.await();
        System.out.println("async:" + (System.currentTimeMillis() - startWith) + "ms");
        executorService.shutdown();
        client.shutdown();
    }

zhou-hao changed the title from "Problems using async operations under high concurrency" to "Problems with async operations under high concurrency" Jun 12, 2019

@jackygurui (Member) commented Jun 12, 2019

    public static RedissonClient newRedissonClient() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress(System.getProperty("redis.host", "redis://127.0.0.1:6379"))
                .setDatabase(0)
                .setTimeout(10000)
                .setRetryAttempts(1000)
                .setRetryInterval(2)
                .setConnectionPoolSize(64)
                .setConnectTimeout(10000);
        config.setThreads(32);
        config.setCodec(StringCodec.INSTANCE);
        config.setNettyThreads(32);

        return Redisson.create(config);
    }

    public static void main(String[] args) throws InterruptedException {
        RedissonClient client = newRedissonClient();
        ExecutorService executorService = Executors.newFixedThreadPool(32);

        RMap<String, String> map = client.getMap("test-map");
        map.put("key1", "value1");
        map.put("key2", "value2");
        map.put("key3", "value3");

        CountDownLatch latch = new CountDownLatch(1000);
        long startWith = System.currentTimeMillis();

        for (int i = 0; i < 1000; i++) {
            CompletableFuture
                    .supplyAsync(() -> map.get("key1"), executorService)
                    .thenRun(latch::countDown);
        }
        System.out.println("executorService:" + (System.currentTimeMillis() - startWith) + "ms");
        latch.await();

        CountDownLatch latch2 = new CountDownLatch(1000);
        startWith = System.currentTimeMillis();
        for (int i = 0; i < 1000; i++) {
            map.getAsync("key1")
                    .thenRun(latch2::countDown);
        }
        latch2.await();
        System.out.println("async:" + (System.currentTimeMillis() - startWith) + "ms");
        executorService.shutdown();
        client.shutdown();
    }

Run 1

[main] INFO org.redisson.Version - Redisson 3.10.6
[redisson-netty-1-19] INFO org.redisson.connection.pool.MasterPubSubConnectionPool - 1 connections initialized for 127.0.0.1/127.0.0.1:6379
[redisson-netty-1-30] INFO org.redisson.connection.pool.MasterConnectionPool - 32 connections initialized for 127.0.0.1/127.0.0.1:6379
executorService:166ms
async:128ms

Run 2

[main] INFO org.redisson.Version - Redisson 3.10.6
[redisson-netty-1-32] INFO org.redisson.connection.pool.MasterPubSubConnectionPool - 1 connections initialized for 127.0.0.1/127.0.0.1:6379
[redisson-netty-1-3] INFO org.redisson.connection.pool.MasterConnectionPool - 32 connections initialized for 127.0.0.1/127.0.0.1:6379
executorService:167ms
async:164ms

Run 3

[main] INFO org.redisson.Version - Redisson 3.10.6
[redisson-netty-1-3] INFO org.redisson.connection.pool.MasterConnectionPool - 32 connections initialized for 127.0.0.1/127.0.0.1:6379
[redisson-netty-1-32] INFO org.redisson.connection.pool.MasterPubSubConnectionPool - 1 connections initialized for 127.0.0.1/127.0.0.1:6379
executorService:175ms
async:161ms

Run 4

[main] INFO org.redisson.Version - Redisson 3.10.6
[redisson-netty-1-2] INFO org.redisson.connection.pool.MasterPubSubConnectionPool - 1 connections initialized for 127.0.0.1/127.0.0.1:6379
[redisson-netty-1-3] INFO org.redisson.connection.pool.MasterConnectionPool - 32 connections initialized for 127.0.0.1/127.0.0.1:6379
executorService:166ms
async:166ms

Run 5

[main] INFO org.redisson.Version - Redisson 3.10.6
[redisson-netty-1-32] INFO org.redisson.connection.pool.MasterPubSubConnectionPool - 1 connections initialized for 127.0.0.1/127.0.0.1:6379
[redisson-netty-1-17] INFO org.redisson.connection.pool.MasterConnectionPool - 32 connections initialized for 127.0.0.1/127.0.0.1:6379
executorService:155ms
async:160ms

@jackygurui (Member) commented Jun 12, 2019

I lowered the retry interval to the minimum and reduced the connection pool size; too many connections isn't a good thing either. Also, could the performance difference you mentioned be caused by GC?

@zhou-hao (Author) commented Jun 13, 2019

Setting the retry interval too small causes subscription errors:

org.redisson.client.RedisTimeoutException
	at org.redisson.pubsub.PublishSubscribeService$3.run(PublishSubscribeService.java:235)
	at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:682)
	at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:757)
	at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:485)
	at java.lang.Thread.run(Thread.java:748)

And could reducing the retry interval cause other problems, for example constant retrying when the Redis connection is slow or failing?

@jackygurui (Member) commented Jun 13, 2019

When you need different retry policies in different scenarios, you can implement that with RBatch's configuration object.
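
A minimal sketch of that approach, assuming the BatchOptions API available in this Redisson version; the concrete values are illustrative only, and `client` is the RedissonClient from the earlier code:

    import java.util.concurrent.TimeUnit;
    import org.redisson.api.BatchOptions;
    import org.redisson.api.RBatch;
    import org.redisson.api.RFuture;
    import org.redisson.api.RMapAsync;

    // Per-scenario retry policy attached to a batch, independent of the client-wide settings.
    BatchOptions options = BatchOptions.defaults()
            .responseTimeout(2, TimeUnit.SECONDS)
            .retryAttempts(1)
            .retryInterval(500, TimeUnit.MILLISECONDS);

    RBatch batch = client.createBatch(options);
    RMapAsync<String, String> authMap = batch.getMap("test-map");
    RFuture<String> value = authMap.getAsync("key1");
    batch.execute(); // commands queued on the batch are sent here, using the batch's own policy

Because the options belong to the batch rather than to the Config, different call sites can use different timeout and retry trade-offs without touching the global client settings.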
