/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.example.demo;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

/**
 * Benchmark：对比两种 ConcurrentHashMap 缓存策略在并发场景下的吞吐量差异。
 *
 * <p><b>对比的两种实现：</b>
 * <ul>
 *   <li><b>A（旧）</b>：{@code computeIfAbsent(key, loader)}
 *       —— miss 时持有 bin-lock 期间执行构造，其他线程等待</li>
 *   <li><b>B（新）</b>：{@code get(key)} + lock 外构造 + {@code putIfAbsent(key, val)}
 *       —— miss 时多线程可并行构造，putIfAbsent 决定胜者，loser 对象直接 GC</li>
 * </ul>
 *
 * <p><b>测试场景（三个独立 State 类，各自管理生命周期）：</b>
 * <ol>
 *   <li>{@link HitState}   – <b>全命中</b>：每次 Iteration 前预热，测量稳定态读性能</li>
 *   <li>{@link MissState}  – <b>全未命中</b>：每次 Invocation 前清空，
 *       模拟 {@code clearIfReachedMaxCapacity} 触发雪崩式 miss</li>
 *   <li>{@link MixedState} – <b>90%命中 + 10%未命中</b>：每次 Invocation 前清掉头 10%，
 *       贴近生产中 loadNamespaceTopics 的实际分布</li>
 * </ol>
 *
 * <p><b>运行方式（推荐）：</b>
 * <pre>
 *   # 1. 编译并打包（跳过常规测试）
 *   mvn -pl pulsar-common package -DskipTests -am
 *
 *   # 2. 用 JMH uber-jar 运行（需在 pom.xml 里配置 maven-shade-plugin，见注释）
 *   java -jar pulsar-common/target/benchmarks.jar TopicNameCacheBenchmark -rf json
 *
 *   # 或者直接通过 exec 插件运行（无需 shade）
 *   mvn -pl pulsar-common test-compile \
 *       exec:java -Dexec.classpathScope=test \
 *       -Dexec.mainClass=org.apache.pulsar.common.naming.TopicNameCacheBenchmark
 * </pre>
 */
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 3, time = 2)
@Measurement(iterations = 5, time = 3)
@Fork(value = 1, jvmArgsAppend = {"-Xmx512m", "-Xms512m"})
public class TopicNameCacheBenchmark {

    // =======================================================================
    // 公共工具方法 & FakeTopicName
    // =======================================================================

    /**
     * 旧实现：computeIfAbsent 持有 bin-lock 期间执行 lambda。
     * 同一 key 并发时只有一个线程执行构造，其余线程在 synchronized 块外自旋或阻塞。
     */
    static FakeTopicName getOrLoadA(ConcurrentHashMap<String, FakeTopicName> cache, String topic) {
        return cache.computeIfAbsent(topic, FakeTopicName::new);
    }

    /**
     * 新实现：先 get（volatile 读，无锁），miss 时在 bin-lock 外构造对象，再 putIfAbsent。
     * 多线程并发 miss 同一 key 时各自构造自己的实例，putIfAbsent 决定胜者，
     * 其余实例直接被 GC 回收，完全消除了 bin-lock 期间的阻塞等待。
     */
    static FakeTopicName getOrLoadB(ConcurrentHashMap<String, FakeTopicName> cache, String topic) {
        FakeTopicName tp = cache.get(topic);
        if (tp != null) {
            return tp;
        }
        // 在锁外构造，不阻塞其他线程
        FakeTopicName newTp = new FakeTopicName(topic);
        FakeTopicName existing = cache.putIfAbsent(topic, newTp);
        return existing != null ? existing : newTp;
    }

    /**
     * 模拟 TopicName 构造的真实 CPU 开销：indexOf + substring + split。
     * 与 {TopicName} 构造函数中的字符串解析逻辑等价，
     * 但无需引入 Splitter/NamespaceName 等依赖，保持 benchmark 的纯净性。
     */
    static class FakeTopicName {
        @SuppressWarnings("unused")
        final String domain;
        @SuppressWarnings("unused")
        final String tenant;
        @SuppressWarnings("unused")
        final String namespace;
        @SuppressWarnings("unused")
        final String localName;

        FakeTopicName(String fullName) {
            int schemaEnd = fullName.indexOf("://");
            if (schemaEnd < 0) {
                this.domain = "persistent";
                this.tenant = "public";
                this.namespace = "default";
                this.localName = fullName;
                return;
            }
            this.domain = fullName.substring(0, schemaEnd);
            String rest = fullName.substring(schemaEnd + 3);
            // persistent://tenant/namespace/topic → ["tenant","namespace","topic"]
            String[] parts = rest.split("/", 3);
            this.tenant    = parts.length > 0 ? parts[0] : "";
            this.namespace = parts.length > 1 ? parts[1] : "";
            this.localName = parts.length > 2 ? parts[2] : "";
        }
    }

    // =======================================================================
    // 场景 1：全命中（Warm Cache）
    //
    // 每次 Iteration 开始前预热，测量并发下纯 cache.get() 的吞吐。
    // A 与 B 在命中时路径完全相同（都是 ConcurrentHashMap.get），
    // 结果应接近，可用于验证 benchmark 基准是否公平。
    // =======================================================================

    @State(Scope.Benchmark)
    public static class HitState {

        @Param({"100", "1000", "5000"})
        int topicCount;

        List<String> topicNames;
        final ConcurrentHashMap<String, FakeTopicName> cacheA = new ConcurrentHashMap<>();
        final ConcurrentHashMap<String, FakeTopicName> cacheB = new ConcurrentHashMap<>();

        @Setup(Level.Trial)
        public void setupTopics() {
            topicNames = new ArrayList<>(topicCount);
            for (int i = 0; i < topicCount; i++) {
                topicNames.add("persistent://public/default/topic-" + i);
            }
        }

        /** 每次 Iteration 前重新预热，确保测量期间全部命中 */
        @Setup(Level.Iteration)
        public void warmUp() {
            for (String name : topicNames) {
                getOrLoadA(cacheA, name);
                getOrLoadB(cacheB, name);
            }
        }
    }

    @Benchmark
    public FakeTopicName hit_A(HitState s) {
        FakeTopicName last = null;
        for (String name : s.topicNames) {
            last = getOrLoadA(s.cacheA, name);
        }
        return last;
    }

    @Benchmark
    public FakeTopicName hit_B(HitState s) {
        FakeTopicName last = null;
        for (String name : s.topicNames) {
            last = getOrLoadB(s.cacheB, name);
        }
        return last;
    }

    // =======================================================================
    // 场景 2：全未命中（Cold Cache / 雪崩）
    //
    // 每次 Invocation 前清空缓存，8 个线程同时对同一批 key 首次写入。
    // 这是 computeIfAbsent vs get+putIfAbsent 竞争最激烈的场景，
    // 对应火焰图里 loadNamespaceTopics 触发大量 lockedGetOrLoad 的情况。
    // =======================================================================

    @State(Scope.Benchmark)
    public static class MissState {

        @Param({"100", "1000", "5000"})
        int topicCount;

        List<String> topicNames;
        final ConcurrentHashMap<String, FakeTopicName> cacheA = new ConcurrentHashMap<>();
        final ConcurrentHashMap<String, FakeTopicName> cacheB = new ConcurrentHashMap<>();

        @Setup(Level.Trial)
        public void setupTopics() {
            topicNames = new ArrayList<>(topicCount);
            for (int i = 0; i < topicCount; i++) {
                topicNames.add("persistent://public/default/topic-" + i);
            }
        }

        /** 每次 Invocation 前清空，保证每轮都是 100% miss */
        @Setup(Level.Invocation)
        public void clearAll() {
            cacheA.clear();
            cacheB.clear();
        }
    }

    @Benchmark
    public FakeTopicName miss_A(MissState s) {
        FakeTopicName last = null;
        for (String name : s.topicNames) {
            last = getOrLoadA(s.cacheA, name);
        }
        return last;
    }

    @Benchmark
    public FakeTopicName miss_B(MissState s) {
        FakeTopicName last = null;
        for (String name : s.topicNames) {
            last = getOrLoadB(s.cacheB, name);
        }
        return last;
    }

    // =======================================================================
    // 场景 3：混合读写（90% Hit + 10% Miss）
    //
    // 每次 Invocation 前：
    //   - 确保缓存中有全部 key（warmup）
    //   - 然后删掉前 10% 的 key，制造局部 miss
    // 贴近生产中 Broker 稳定运行期间偶发 miss 的实际场景。
    // =======================================================================

    @State(Scope.Benchmark)
    public static class MixedState {

        @Param({"100", "1000", "5000"})
        int topicCount;

        List<String> topicNames;
        final ConcurrentHashMap<String, FakeTopicName> cacheA = new ConcurrentHashMap<>();
        final ConcurrentHashMap<String, FakeTopicName> cacheB = new ConcurrentHashMap<>();

        @Setup(Level.Trial)
        public void setupTopics() {
            topicNames = new ArrayList<>(topicCount);
            for (int i = 0; i < topicCount; i++) {
                topicNames.add("persistent://public/default/topic-" + i);
            }
        }

        /**
         * 每次 Invocation 前：先全量预热（保证 90% 命中），再清掉头部 10%（制造 miss）。
         * 注意：JMH 在多线程下每个线程都会执行 @Setup(Invocation)，
         * 因此清除操作是幂等的（remove 不存在的 key 无副作用）。
         */
        @Setup(Level.Invocation)
        public void preparePartialMiss() {
            // 1. 先全量预热
            for (String name : topicNames) {
                getOrLoadA(cacheA, name);
                getOrLoadB(cacheB, name);
            }
            // 2. 清掉前 10%，制造局部 miss
            int missCount = Math.max(1, topicCount / 10);
            for (int i = 0; i < missCount; i++) {
                String name = topicNames.get(i);
                cacheA.remove(name);
                cacheB.remove(name);
            }
        }
    }

    @Benchmark
    public FakeTopicName mixed_A(MixedState s) {
        FakeTopicName last = null;
        for (String name : s.topicNames) {
            last = getOrLoadA(s.cacheA, name);
        }
        return last;
    }

    @Benchmark
    public FakeTopicName mixed_B(MixedState s) {
        FakeTopicName last = null;
        for (String name : s.topicNames) {
            last = getOrLoadB(s.cacheB, name);
        }
        return last;
    }

    // =======================================================================
    // main：直接 java 运行入口
    // =======================================================================

    public static void main(String[] args) throws Exception {
        Options opt = new OptionsBuilder()
                .include(TopicNameCacheBenchmark.class.getSimpleName())
                // 并发线程数：模拟 loadNamespaceTopics 的 executor 并发度
                .threads(8)
                .forks(1)
                .build();
        new Runner(opt).run();
    }
}
