Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RedisTemplate appears Connection is closed in Thread #39811

Closed
yibird opened this issue Mar 1, 2024 · 1 comment
Closed

RedisTemplate appears Connection is closed in Thread #39811

yibird opened this issue Mar 1, 2024 · 1 comment
Labels
for: stackoverflow A question that's better suited to stackoverflow.com status: invalid An issue that we don't feel is valid

Comments

@yibird
Copy link

yibird commented Mar 1, 2024

Redis:7.2.4
SpringBoot-redis-data-starter:3.2.3

I used RedisTemplate to implement a blocking queue and used RedisTemplate.execute() to execute script commands. However, when I create a thread in the test method and call it, Caused by: io.lettuce.core.RedisException: Connection will appear. is closed, I don’t know what’s going on, please help.

application.yaml

spring:
  data:
    redis:
      # redis server主机地址
      host: 192.168.198.131
      # redis server端口
      port: 6379
      # redis server username
      username:
      # redis server password
      password:
      # redis 连接超时时间
      timeout: 5000
      # redis lettuce客户端配置,使用lettuce需要添加commons-pool2依赖,lettuce连接池基于commons-pool2
      lettuce:
        # lettuce 连接池配置
        pool:
          # lettuce连接池最大连接数,默认8
          max-active: 8
          # lettuce连接池最大空闲连接数,默认0
          max-idle: 8
          # lettuce连接池最小空闲连接数,默认0
          min-idle: 1
          # # lettuce连接池最大等待时间(单位毫秒),默认-1ms
          max-wait: 1000
#      jedis:
#        pool:
#          # pool连接池最大连接数,默认8
#          max-active: 8
#          # pool连接池最大空闲连接数,默认0
#          max-idle: 0
#          # pool连接池最小空闲连接数,默认0
#          min-idle: 0
#          # pool连接池最大等待时间(单位毫秒),默认-1ms
#          max-wait: -1

RedisConfiguration:

package com.fly.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.*;

/**
 * Redis配置类
 * @author zchengfeng
 */
@Configuration
public class RedisConfiguration {

    /**
     * RedisTemplate是SpringBoot Redis整合包提供的用于操作Redis的模板类,提供了一列类操作
     * Redis的API,默认采用JdkSerialization进行序列化,set后的数据实际上存储的是
     * 二进制字节码,可读性非常差,通过自定义RedisTemplate
     *
     * @param factory redis连接工厂
     * @return RedisTemplate
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();


        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
        // Jackson2Json序列化
        GenericJackson2JsonRedisSerializer jackson2JsonRedisSerializer =new GenericJackson2JsonRedisSerializer(objectMapper);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        /**
         * 设置key采用String序列化方式,
         * Redis存取默认使用JdkSerializationRedisSerializer序列化,
         * 这种序列化会key的前缀添加奇怪的字符,例如\xac\xed\x00\x05t\x00user_id,
         * 使用StringRedisSerializer序列化可以去掉这种字符
         */
        template.setKeySerializer(stringRedisSerializer);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的key也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        // hash的value序列化方式采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        // 设置连接工厂
        template.setConnectionFactory(factory);
        template.afterPropertiesSet();
        /*
         * 开启Redis事务,默认是关闭的。也可以手动开启事务,
         * 通过template.multi()开启事务,template.exec()关闭事务
         */
//        template.setEnableTransactionSupport(false);
        return template;
    }
}

queue.class:

package com.fly.structure.list;

import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.NoSuchElementException;

/**
 * @Description 分布式阻塞队列
 * @Author zchengfeng
 * @Date 2024/2/26 01:07:29
 */
@Component
public class DistributedBlockingQueue<E> {
    private final RedisTemplate<String, Object> redisTemplate;
    private ListOperations<String, Object> listOperations;
    private final static String PREFIX = "blockingQueue::";
    /**
     * 队列名称,可能存在多个队列,使用队列名称区分,队列名称也会作为Redis List结构的key
     */
    private String queueName;

    public DistributedBlockingQueue(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void setQueueName(String name) {
        this.queueName = PREFIX + name;
    }


    /**
     * 获取ListOperations实例用于List结构,避免多次调用opsForList()
     * 获取ListOperations实例
     *
     * @return ListOperations实例
     */
    public ListOperations<String, Object> getListOperations() {
        listOperations = listOperations == null ? redisTemplate.opsForList() : listOperations;
        return listOperations;
    }

    /**
     * 检查队列名是否为null,为null将抛出NullPointerException
     */
    public void checkQueueName() {
        if (queueName == null) {
            throw new NullPointerException("queueName is empty");
        }
    }

    /**
     * 获取队列的长度
     *
     * @return 队列的长度
     */
    public Long size() {
        checkQueueName();
        return getListOperations().size(queueName);
    }

    /**
     * 向队列尾部添加元素(入队),添加成功返回true,否则返回false
     *
     * @param e 被添加元素
     * @return 添加结果
     */
    public boolean add(E e) {
        checkQueueName();
        getListOperations().leftPush(queueName, e);
//        try {
//            getListOperations().leftPush(queueName, e);
//        } catch (Exception ex) {
//            System.out.println("ex:" + ex);
//            return false;
//        }
        return true;
    }


    /**
     * 从队列头部删除元素(出队),remove与poll()的区别在于,此方法不是阻塞式的,如果此队列为空,则抛出异常。
     *
     * @return 被删除的元素
     */
    public E remove() {
        if (size() == 0) {
            throw new NoSuchElementException();
        }
        return (E) getListOperations().rightPop(queueName, 1);
    }

    /**
     * 删除队列第一个元素并返回该元素,如果队列为空,则阻塞等待直到队列有元素。
     *
     * @return 返回队列的头部元素
     */
    public E poll() {
        // 执行redis命令获取结果
        final Object element = redisTemplate.execute((RedisCallback<Object>) conn -> {
            String command = "BRPOP";
            byte[] keyArg = queueName.getBytes(StandardCharsets.UTF_8);
            byte[] timeout = "0".getBytes();
            return conn.execute(command, keyArg, timeout);
        });
        return (E) element;
    }


    /**
     * 删除队列第一个元素并返回该元素,如果队列为空,则阻塞等待直到队列有元素。
     *
     * @param timeout 阻塞的超时时间,单位ms
     * @return 返回队列的头部元素
     */
    public E poll(Long timeout) {
        // 执行redis命令获取结果
        final Object element = redisTemplate.execute((RedisCallback<Object>) conn -> {
            byte[] keyArg = queueName.getBytes(StandardCharsets.UTF_8);
            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
            buffer.putLong(timeout);
            // 执行 BRPOP KEY timeout 命令
            return conn.execute("BRPOP", keyArg, buffer.array());
        });
        return (E) element;
    }

    /**
     * 删除队列最后一个元素并返回该元素,如果此队列为空,则阻塞等待直到队列有元素。
     *
     * @return 返回队列的尾部元素
     */
    public E peek() {
        // 执行redis命令获取结果
        final Object element = redisTemplate.execute((RedisCallback<Object>) conn -> {
            byte[] keyArg = queueName.getBytes(StandardCharsets.UTF_8);
            return conn.execute("BLPOP", keyArg);
        });
        return (E) element;
    }

    /**
     * 删除队列最后一个元素并返回该元素,如果此队列为空,则阻塞等待直到队列有元素。
     *
     * @param timeout 阻塞的超时时间,单位ms
     * @return 返回队列的尾部元素
     */
    public E peek(long timeout) {
        // 执行redis命令获取结果
        final Object element = redisTemplate.execute((RedisCallback<Object>) conn -> {
            byte[] keyArg = queueName.getBytes(StandardCharsets.UTF_8);
            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
            buffer.putLong(timeout);
            // 执行 BLPOP KEY timeout 命令
            return conn.execute("BLPOP", keyArg, buffer.array());
        });
        return (E) element;
    }

    /**
     * 获取队列中所有元素。range start stop命令用于获取指定范围的元素,
     * 当stop为-1时表示获取至队列末尾。
     *
     * @return 队列中所有元素
     */
    public List<E> getItems() {
        checkQueueName();
        return (List<E>) getListOperations().range(queueName, 0, -1);
    }

    /**
     * 清除队列中所有元素。LTRIM key start stop用于修剪list中start至stop的元素,
     * 如果start大于stop就可以达到清除所有元素的效果
     */
    public void clear() {
        checkQueueName();
        getListOperations().trim(queueName, 1, 0);
    }
}

test.class:

package com.fly.structure.list;

import com.fly.RedisApplication;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * @Description DistributedBlockingQueue测试类
 * @Author zchengfeng
 * @Date 2024/2/26 01:32:37
 */
@SpringBootTest(classes = RedisApplication.class)
public class DistributedBlockingQueueTest {
    @Autowired
    DistributedBlockingQueue<String> queue;

    @Test
    public void blockingQueueTest() {
        queue.setQueueName("bQueue");
        System.out.println("xxx:" + queue.add("item1")); // OK
        // 队列消费者
        new Thread(() -> {
            System.out.println("xxx:" + queue.add("item1")); // ERROR,Connection is closed
        }).start();
    }
}

Connection is closed when used internally in Thread, but it is normal when used externally.

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Mar 1, 2024
@wilkinsona
Copy link
Member

Spring Boot isn't involved at this level of Spring Data Redis and Lettuce's behavior. If you're looking for some help, Stack Overflow is a better place to ask. Or, if you believe you have found a bug, please open a Spring Data Redis issue.

@wilkinsona wilkinsona closed this as not planned Won't fix, can't repro, duplicate, stale Mar 1, 2024
@wilkinsona wilkinsona added status: invalid An issue that we don't feel is valid for: stackoverflow A question that's better suited to stackoverflow.com and removed status: waiting-for-triage An issue we've not yet triaged labels Mar 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
for: stackoverflow A question that's better suited to stackoverflow.com status: invalid An issue that we don't feel is valid
Projects
None yet
Development

No branches or pull requests

3 participants