Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In a scenario where both redis lettuce and rocketmq are used, lettuce (redis lettuce's log) is output in rocketmq_client.log, these are different components and netty is used at the same time. #14039

Closed
FrankCy opened this issue May 8, 2024 · 9 comments

Comments

@FrankCy
Copy link

FrankCy commented May 8, 2024

Expected behavior

redis opens the topology, introduces lettuce, and also uses rocketmq, rocketmq_client.log logs only the logs of the rocketmq component.

Actual behavior

Actually rocketmq_client.log records the lettuce log as follows:
2024-05-06 16:25:23,023 DEBUG io.lettuce.core.protocol.ConnectionWatchdog - [channel=0xe7773456, /10.8.0.27:57227 -> 172.17.48.140/172.17.48.140:6379, last known addr=172.17.48.140/172.17.48.140:6379] channelActive()
2024-05-06 16:25:23,023 DEBUG io.lettuce.core.protocol.ConnectionWatchdog - [channel=0xe7773456, /10.8.0.27:57227 -> 172.17.48.140/172.17.48.140:6379, last known addr=172.17.48.140/172.17.48.140:6379] channelActive()
2024-05-06 16:25:23,023 DEBUG io.lettuce.core.protocol.ConnectionWatchdog - [channel=0xe7773456, /10.8.0.27:57227 -> 172.17.48.140/172.17.48.140:6379, last known addr=172.17.48.140/172.17.48.140:6379] channelActive()
2024-05-06 16:25:23,023 DEBUG io.lettuce.core.protocol.ConnectionWatchdog - [channel=0xe7773456, /10.8.0.27:57227 -> 172.17.48.140/172.17.48.140:6379, last known addr=172.17.48.140/172.17.48.140:6379] channelActive()
2024-05-06 16:25:23,023 DEBUG io.lettuce.core.protocol.ConnectionWatchdog - [channel=0xe7773456, /10.8.0.27:57227 -> 172.17.48.140/172.17.48.140:6379, last known addr=172.17.48.140/172.17.48.140:6379] channelActive()
2024-05-06 16:25:23,023 DEBUG io.lettuce.core.protocol.ConnectionWatchdog - [channel=0xe7773456, /10.8.0.27:57227 -> 172.17.48.140/172.17.48.140:6379, last known addr=172.17.48.140/172.17.48.140:6379] channelActive()
2024-05-06 16:25:23,023 DEBUG io.lettuce.core.protocol.ConnectionWatchdog - [channel=0xe7773456, /10.8.0.27:57227 -> 172.17.48.140/172.17.48.140:6379, last known addr=172.17.48.140/172.17.48.140:6379] userEventTriggered(ctx, io.lettuce.core.ConnectionEvents$Activated@1358e27d)
2024-05-06 16:25:23,023 DEBUG io.lettuce.core.protocol.ConnectionWatchdog - [channel=0xe7773456, /10.8.0.27:57227 -> 172.17.48.140/172.17.48.140:6379, last known addr=172.17.48.140/172.17.48.140:6379] userEventTriggered(ctx, io.lettuce.core.ConnectionEvents$Activated@1358e27d)
2024-05-06 16:25:23,023 DEBUG io.lettuce.core.protocol.ConnectionWatchdog - [channel=0xe7773456, /10.8.0.27:57227 -> 172.17.48.140/172.17.48.140:6379, last known addr=172.17.48.140/172.17.48.140:6379] userEventTriggered(ctx, io.lettuce.core.ConnectionEvents$Activated@1358e27d)

Steps to reproduce

lettuce-core:5.1.7.RELEASE
spring-data-redis:2.1.9.RELEASE
spring-boot-starter-data-redis:2.2.13.RELEASE
rocketmq-acl:4.6.0
rocketmq-client:4.6.0
rocketmq-common:4.6.0
rocketmq-logging:4.6.0
rocketmq-remoting:4.6.0
rocketmq-spring-boot:2.1.0
rocketmq-spring-boot-starter:2.1.0
rocketmq-srvutil:4.6.0

Minimal yet complete reproducer code (or URL to code)

Netty version

4.1.36.Final.jar

JVM version (e.g. java -version)

JDK:java version "1.8.0_321"

OS version (e.g. uname -a)

Darwin frank 21.3.0 Darwin Kernel Version 21.3.0: Wed Jan 5 21:37:58 PST 2022; root:xnu-8019.80.24~20/RELEASE_ARM64_T8101 arm64

@chrisvest
Copy link
Contributor

Sounds like a problem with how you configured your logs, and not a Netty issue.

@FrankCy
Copy link
Author

FrankCy commented May 8, 2024

Sounds like a problem with how you configured your logs, and not a Netty issue.

This feels like it has something to do with the InternalLoggerFactory encapsulation, both rocketmq and redis have InternalLoggerFactory to facade the logs, could this be the cause of the log write errors? This is not an issue when configured separately.

@chrisvest
Copy link
Contributor

Do any of them call InternalLoggerFactory.setDefaultFactory()? If they don't (and they shouldn't), then that's the end of our logging configuration. The Netty logging APIs just delegate to the respective logging framework; slf4j, log4j, JDK logging...

@FrankCy
Copy link
Author

FrankCy commented May 16, 2024

Do any of them call InternalLoggerFactory.setDefaultFactory()? If they don't (and they shouldn't), then that's the end of our logging configuration. The Netty logging APIs just delegate to the respective logging framework; slf4j, log4j, JDK logging...

The logging framework used in the lettuce component io.lettuce.core.protocol.ConnectionWatchdog is declared by the netty component io.netty.util.internal.logging.InternalLoggerFactory and instantiated io.netty.util.internal.logging.InternalLogger, configuring log4j2.xml can only affect the log output of io.lettuce.core.protocol, but not io.lettuce.core.protocol. ConnectionWatchdog, how do Io.netty.util.internal.logging.
Here is the lettuce-core implementation:

/*
 * Copyright 2011-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.lettuce.core.protocol;

import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Mono;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionEvents;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.resource.Delay;
import io.lettuce.core.resource.Delay.StatefulDelay;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
 * A netty {@link ChannelHandler} responsible for monitoring the channel and reconnecting when the connection is lost.
 *
 * @author Will Glozer
 * @author Mark Paluch
 * @author Koji Lin
 */
@ChannelHandler.Sharable
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {

    private static final long LOGGING_QUIET_TIME_MS = TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS);
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ConnectionWatchdog.class);

    private final Delay reconnectDelay;
    private final Bootstrap bootstrap;
    private final EventExecutorGroup reconnectWorkers;
    private final ReconnectionHandler reconnectionHandler;
    private final ReconnectionListener reconnectionListener;
    private final Timer timer;

    private Channel channel;
    private SocketAddress remoteAddress;
    private long lastReconnectionLogging = -1;
    private String logPrefix;

    private final AtomicBoolean reconnectSchedulerSync;
    private volatile int attempts;
    private volatile boolean armed;
    private volatile boolean listenOnChannelInactive;
    private volatile Timeout reconnectScheduleTimeout;

    /**
     * Create a new watchdog that adds to new connections to the supplied {@link ChannelGroup} and establishes a new
     * {@link Channel} when disconnected, while reconnect is true. The socketAddressSupplier can supply the reconnect address.
     *
     * @param reconnectDelay reconnect delay, must not be {@literal null}
     * @param clientOptions client options for the current connection, must not be {@literal null}
     * @param bootstrap Configuration for new channels, must not be {@literal null}
     * @param timer Timer used for delayed reconnect, must not be {@literal null}
     * @param reconnectWorkers executor group for reconnect tasks, must not be {@literal null}
     * @param socketAddressSupplier the socket address supplier to obtain an address for reconnection, may be {@literal null}
     * @param reconnectionListener the reconnection listener, must not be {@literal null}
     * @param connectionFacade the connection facade, must not be {@literal null}
     */
    public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Bootstrap bootstrap, Timer timer,
            EventExecutorGroup reconnectWorkers, Mono<SocketAddress> socketAddressSupplier,
            ReconnectionListener reconnectionListener, ConnectionFacade connectionFacade) {

        LettuceAssert.notNull(reconnectDelay, "Delay must not be null");
        LettuceAssert.notNull(clientOptions, "ClientOptions must not be null");
        LettuceAssert.notNull(bootstrap, "Bootstrap must not be null");
        LettuceAssert.notNull(timer, "Timer must not be null");
        LettuceAssert.notNull(reconnectWorkers, "ReconnectWorkers must not be null");
        LettuceAssert.notNull(socketAddressSupplier, "SocketAddressSupplier must not be null");
        LettuceAssert.notNull(reconnectionListener, "ReconnectionListener must not be null");
        LettuceAssert.notNull(connectionFacade, "ConnectionFacade must not be null");

        this.reconnectDelay = reconnectDelay;
        this.bootstrap = bootstrap;
        this.timer = timer;
        this.reconnectWorkers = reconnectWorkers;
        this.reconnectionListener = reconnectionListener;
        this.reconnectSchedulerSync = new AtomicBoolean(false);

        Mono<SocketAddress> wrappedSocketAddressSupplier = socketAddressSupplier.doOnNext(addr -> remoteAddress = addr)
                .onErrorResume(
                        t -> {

                            if (logger.isDebugEnabled()) {
                                logger.warn("Cannot retrieve current address from socketAddressSupplier: " + t.toString()
                                        + ", reusing cached address " + remoteAddress, t);
                            } else {
                                logger.warn("Cannot retrieve current address from socketAddressSupplier: " + t.toString()
                                        + ", reusing cached address " + remoteAddress);
                            }

                            return Mono.just(remoteAddress);
                        });

        this.reconnectionHandler = new ReconnectionHandler(clientOptions, bootstrap, wrappedSocketAddressSupplier, timer,
                reconnectWorkers, connectionFacade);

        resetReconnectDelay();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        logger.debug("{} userEventTriggered(ctx, {})", logPrefix(), evt);

        if (evt instanceof ConnectionEvents.Activated) {
            attempts = 0;
            resetReconnectDelay();
        }

        super.userEventTriggered(ctx, evt);
    }

    void prepareClose() {

        setListenOnChannelInactive(false);
        setReconnectSuspended(true);

        Timeout reconnectScheduleTimeout = this.reconnectScheduleTimeout;
        if (reconnectScheduleTimeout != null && !reconnectScheduleTimeout.isCancelled()) {
            reconnectScheduleTimeout.cancel();
        }

        reconnectionHandler.prepareClose();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        reconnectSchedulerSync.set(false);
        channel = ctx.channel();
        reconnectScheduleTimeout = null;
        logPrefix = null;
        remoteAddress = channel.remoteAddress();
        logPrefix = null;
        logger.debug("{} channelActive()", logPrefix());

        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        logger.debug("{} channelInactive()", logPrefix());
        if (!armed) {
            logger.debug("{} ConnectionWatchdog not armed", logPrefix());
            return;
        }

        channel = null;

        if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {
            scheduleReconnect();
        } else {
            logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);
        }

        super.channelInactive(ctx);
    }

    /**
     * Enable {@link ConnectionWatchdog} to listen for disconnected events.
     */
    void arm() {
        this.armed = true;
        setListenOnChannelInactive(true);
    }

    /**
     * Schedule reconnect if channel is not available/not active.
     */
    public void scheduleReconnect() {

        logger.debug("{} scheduleReconnect()", logPrefix());

        if (!isEventLoopGroupActive()) {
            logger.debug("isEventLoopGroupActive() == false");
            return;
        }

        if (!isListenOnChannelInactive()) {
            logger.debug("Skip reconnect scheduling, listener disabled");
            return;
        }

        if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) {

            attempts++;
            final int attempt = attempts;
            int timeout = (int) reconnectDelay.createDelay(attempt).toMillis();
            logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout);

            this.reconnectScheduleTimeout = timer.newTimeout(it -> {

                reconnectScheduleTimeout = null;

                if (!isEventLoopGroupActive()) {
                    logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated");
                    return;
                }

                reconnectWorkers.submit(() -> {
                    ConnectionWatchdog.this.run(attempt);
                    return null;
                });
            }, timeout, TimeUnit.MILLISECONDS);

            // Set back to null when ConnectionWatchdog#run runs earlier than reconnectScheduleTimeout's assignment.
            if (!reconnectSchedulerSync.get()) {
                reconnectScheduleTimeout = null;
            }
        } else {
            logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());
        }
    }

    /**
     * Reconnect to the remote address that the closed channel was connected to. This creates a new {@link ChannelPipeline} with
     * the same handler instances contained in the old channel's pipeline.
     *
     * @param attempt attempt counter
     *
     * @throws Exception when reconnection fails.
     */
    public void run(int attempt) throws Exception {

        reconnectSchedulerSync.set(false);
        reconnectScheduleTimeout = null;

        if (!isEventLoopGroupActive()) {
            logger.debug("isEventLoopGroupActive() == false");
            return;
        }

        if (!isListenOnChannelInactive()) {
            logger.debug("Skip reconnect scheduling, listener disabled");
            return;
        }

        if (isReconnectSuspended()) {
            logger.debug("Skip reconnect scheduling, reconnect is suspended");
            return;
        }

        boolean shouldLog = shouldLog();

        InternalLogLevel infoLevel = InternalLogLevel.INFO;
        InternalLogLevel warnLevel = InternalLogLevel.WARN;

        if (shouldLog) {
            lastReconnectionLogging = System.currentTimeMillis();
        } else {
            warnLevel = InternalLogLevel.DEBUG;
            infoLevel = InternalLogLevel.DEBUG;
        }

        InternalLogLevel warnLevelToUse = warnLevel;

        try {
            reconnectionListener.onReconnectAttempt(new ConnectionEvents.Reconnect(attempt));
            logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress);

            CompletableFuture<Channel> future = reconnectionHandler.reconnect();

            future.whenComplete((c, t) -> {

                if (c != null && t == null) {
                    return;
                }

                if (ReconnectionHandler.isExecutionException(t)) {
                    logger.log(warnLevelToUse, "Cannot reconnect: {}", t.toString());
                } else {
                    logger.log(warnLevelToUse, "Cannot reconnect: {}", t.toString(), t);
                }

                if (!isReconnectSuspended()) {
                    scheduleReconnect();
                }
            });
        } catch (Exception e) {
            logger.log(warnLevel, "Cannot reconnect: {}", e.toString());
        }
    }

    private boolean isEventLoopGroupActive() {

        if (!isEventLoopGroupActive(bootstrap.group()) || !isEventLoopGroupActive(reconnectWorkers)) {
            return false;
        }

        return true;
    }

    private static boolean isEventLoopGroupActive(EventExecutorGroup executorService) {
        return !(executorService.isShuttingDown());
    }

    private boolean shouldLog() {

        long quietUntil = lastReconnectionLogging + LOGGING_QUIET_TIME_MS;
        return quietUntil <= System.currentTimeMillis();
    }

    /**
     * Enable event listener for disconnected events.
     *
     * @param listenOnChannelInactive {@literal true} to listen for disconnected events.
     */
    public void setListenOnChannelInactive(boolean listenOnChannelInactive) {
        this.listenOnChannelInactive = listenOnChannelInactive;
    }

    public boolean isListenOnChannelInactive() {
        return listenOnChannelInactive;
    }

    /**
     * Suspend reconnection temporarily. Reconnect suspension will interrupt reconnection attempts.
     *
     * @param reconnectSuspended {@literal true} to suspend reconnection
     */
    public void setReconnectSuspended(boolean reconnectSuspended) {
        reconnectionHandler.setReconnectSuspended(reconnectSuspended);
    }

    public boolean isReconnectSuspended() {
        return reconnectionHandler.isReconnectSuspended();
    }

    ReconnectionHandler getReconnectionHandler() {
        return reconnectionHandler;
    }

    private void resetReconnectDelay() {
        if (reconnectDelay instanceof StatefulDelay) {
            ((StatefulDelay) reconnectDelay).reset();
        }
    }

    private String logPrefix() {

        if (logPrefix != null) {
            return logPrefix;
        }

        String
        buffer= "[" +ChannelLogDescriptor.logDescriptor(channel)+", last known addr="+remoteAddress+']';
        return logPrefix = buffer;
    }
}

@FrankCy
Copy link
Author

FrankCy commented May 16, 2024

The logger.debug within the ConnectionWatchdog file is unaffected by the log4j2.xml configuration and is output to rocketmq_client.log in applications that also use rocketmq. Can you help explain how the output location is set? Here we are using io.netty.util.internal.logging.InternalLogger, version: netty-common-4.1.38.Final

@FrankCy
Copy link
Author

FrankCy commented May 16, 2024

Currently looking at using io.netty.util.internal.logging.InternalLogger logs are not controlled by logj4.xml, netty's log output is logback?Is there some kind of fantastic relationship here?

@slandelle
Copy link
Contributor

Netty has a dependency to slf4j, which is a facade, not a logger implementation.
If you want to log with log4j2, you must bring the bridge yourself, cf https://logging.apache.org/log4j/2.x/log4j-slf4j-impl.html

@FrankCy
Copy link
Author

FrankCy commented May 16, 2024

Netty has a dependency to slf4j, which is a facade, not a logger implementation. If you want to log with log4j2, you must bring the bridge yourself, cf https://logging.apache.org/log4j/2.x/log4j-slf4j-impl.html

OK, thanks, this side see is based on slf4j implementation of the log facade, the programme introduced log4j-slf4j-impl, but my program currently still have the above situation, only lettuce-core this part of the implementation can not be log4j2.xml management.

@FrankCy
Copy link
Author

FrankCy commented May 20, 2024

Because netty-common depends on logback-classic 1.2.13, the class StaticLoggerBinder in logback-classic implements LoggerFactoryBinder and binds logback as a slf4j implementation, so as long as you reference this version of the netty-common, it will use logback as the logging implementation by default, is this the reason?

  • logback-classic 1.2.13 StaticLoggerBinder.java
/**
 * Logback: the reliable, generic, fast and flexible logging framework.
 * Copyright (C) 1999-2015, QOS.ch. All rights reserved.
 *
 * This program and the accompanying materials are dual-licensed under
 * either the terms of the Eclipse Public License v1.0 as published by
 * the Eclipse Foundation
 *
 *   or (per the licensee's choosing)
 *
 * under the terms of the GNU Lesser General Public License version 2.1
 * as published by the Free Software Foundation.
 */
package org.slf4j.impl;

import ch.qos.logback.core.status.StatusUtil;
import org.slf4j.ILoggerFactory;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.Util;
import org.slf4j.spi.LoggerFactoryBinder;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.util.ContextInitializer;
import ch.qos.logback.classic.util.ContextSelectorStaticBinder;
import ch.qos.logback.core.CoreConstants;
import ch.qos.logback.core.joran.spi.JoranException;
import ch.qos.logback.core.util.StatusPrinter;

/**
 * 
 * The binding of {@link LoggerFactory} class with an actual instance of
 * {@link ILoggerFactory} is performed using information returned by this class.
 * 
 * @author Ceki G&uuml;lc&uuml;</a>
 */
public class StaticLoggerBinder implements LoggerFactoryBinder {

    /**
     * Declare the version of the SLF4J API this implementation is compiled
     * against. The value of this field is usually modified with each release.
     */
    // to avoid constant folding by the compiler, this field must *not* be final
    public static String REQUESTED_API_VERSION = "1.7.16"; // !final

    final static String NULL_CS_URL = CoreConstants.CODES_URL + "#null_CS";

    /**
     * The unique instance of this class.
     */
    private static StaticLoggerBinder SINGLETON = new StaticLoggerBinder();

    private static Object KEY = new Object();

    static {
        SINGLETON.init();
    }

    private boolean initialized = false;
    private LoggerContext defaultLoggerContext = new LoggerContext();
    private final ContextSelectorStaticBinder contextSelectorBinder = ContextSelectorStaticBinder.getSingleton();

    private StaticLoggerBinder() {
        defaultLoggerContext.setName(CoreConstants.DEFAULT_CONTEXT_NAME);
    }

    public static StaticLoggerBinder getSingleton() {
        return SINGLETON;
    }

    /**
     * Package access for testing purposes.
     */
    static void reset() {
        SINGLETON = new StaticLoggerBinder();
        SINGLETON.init();
    }

    /**
     * Package access for testing purposes.
     */
    void init() {
        try {
            try {
                new ContextInitializer(defaultLoggerContext).autoConfig();
            } catch (JoranException je) {
                Util.report("Failed to auto configure default logger context", je);
            }
            // logback-292
            if (!StatusUtil.contextHasStatusListener(defaultLoggerContext)) {
                StatusPrinter.printInCaseOfErrorsOrWarnings(defaultLoggerContext);
            }
            contextSelectorBinder.init(defaultLoggerContext, KEY);
            initialized = true;
        } catch (Exception t) { // see LOGBACK-1159
            Util.report("Failed to instantiate [" + LoggerContext.class.getName() + "]", t);
        }
    }

    public ILoggerFactory getLoggerFactory() {
        if (!initialized) {
            return defaultLoggerContext;
        }

        if (contextSelectorBinder.getContextSelector() == null) {
            throw new IllegalStateException("contextSelector cannot be null. See also " + NULL_CS_URL);
        }
        return contextSelectorBinder.getContextSelector().getLoggerContext();
    }

    public String getLoggerFactoryClassStr() {
        return contextSelectorBinder.getClass().getName();
    }

}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants