In [10]:
import os
import glob
import pandas as pd
import plotly.graph_objects as go

# 设置 Excel 文件夹路径
folder_path = "X:/25.3.12/input/sankey2/"

# 获取所有 Excel 文件，并按照阶段顺序排序
file_list = sorted(glob.glob(os.path.join(folder_path, "*.xlsx")))

# 阶段标签
stages = ["阶段1", "阶段2", "阶段3"]

# 检查是否有文件
if not file_list or len(file_list) != 3:
    raise FileNotFoundError("未找到 3 个阶段的 Excel 文件，请检查路径")

# 读取所有 Excel 文件，并按阶段存储
dfs = {stages[i]: pd.read_excel(file, usecols=[0, 1]) for i, file in enumerate(file_list)}

# 初始化存储
all_nodes = []  # 所有节点（主题）
all_links = {"source": [], "target": [], "value": []}  # 主题之间的联系

# 解析各阶段的数据
stage_topics = {}  # 每个阶段的主题
stage_strengths = {}  # 每个主题的强度

for stage, df in dfs.items():
    if len(df.columns) < 2:
        raise ValueError(f"{stage} 数据格式错误，至少需要两列：主题、强度")

    # 主题（加上阶段前缀，以防重复）
    topics = [f"{stage}_{topic}" for topic in df.iloc[:, 0].astype(str).tolist()]
    strengths = df.iloc[:, 1].astype(float).tolist()

    # 存储数据
    stage_topics[stage] = topics
    stage_strengths[stage] = strengths
    all_nodes.extend(topics)  # 主题加入节点

# **构造 阶段1 → 阶段2，阶段2 → 阶段3 的联系**
for i in range(len(stages) - 1):
    current_stage = stages[i]
    next_stage = stages[i + 1]

    for topic1 in stage_topics[current_stage]:
        for topic2 in stage_topics[next_stage]:
            # 计算两个主题之间的相似性（基于主题名称相似性）
            words1 = set(topic1.replace(current_stage + "_", "").split())  # 去掉阶段前缀
            words2 = set(topic2.replace(next_stage + "_", "").split())

            overlap = len(words1 & words2)  # 计算共同词数
            if overlap > 0:  # 仅连接有共同词的主题
                all_links["source"].append(topic1)
                all_links["target"].append(topic2)
                all_links["value"].append(overlap)  # 关键词重叠数量作为权重

# **转换索引**
node_labels = list(set(all_nodes))
node_indices = {label: idx for idx, label in enumerate(node_labels)}

all_links["source"] = [node_indices[src] for src in all_links["source"]]
all_links["target"] = [node_indices[tgt] for tgt in all_links["target"]]

# **绘制桑基图**
fig = go.Figure(go.Sankey(
    node=dict(
        pad=15,
        thickness=20,
        line=dict(color="black", width=0.5),
        label=node_labels
    ),
    link=dict(
        source=all_links["source"],
        target=all_links["target"],
        value=all_links["value"]
    )
))

# **显示图表**
fig.show()


In [11]:
import os
import glob
import pandas as pd
import plotly.graph_objects as go

# 设置 Excel 文件夹路径
folder_path = "X:/25.3.12/input/sankey2/"

# 获取所有 Excel 文件，并按照阶段顺序排序
file_list = sorted(glob.glob(os.path.join(folder_path, "*.xlsx")))

# 阶段标签
stages = ["阶段1", "阶段2", "阶段3"]

# 检查是否有文件
if not file_list or len(file_list) != 3:
    raise FileNotFoundError("未找到 3 个阶段的 Excel 文件，请检查路径")

# 读取所有 Excel 文件，并按阶段存储
dfs = {stages[i]: pd.read_excel(file) for i, file in enumerate(file_list)}

# 初始化存储
all_nodes = []  # 所有节点（主题 & 关键词）
all_links = {"source": [], "target": [], "value": []}  # 链接数据

# 解析各阶段的数据
stage_topics = {}  # 存储每个阶段的主题
stage_strengths = {}  # 主题强度
stage_words = {}  # 词分布

for stage, df in dfs.items():
    if len(df.columns) < 3:
        raise ValueError(f"{stage} 数据格式错误，至少需要三列：主题、强度、词分布")

    # 主题、强度、词分布
    topics = [f"{stage}_{topic}" for topic in df.iloc[:, 0].astype(str).tolist()]
    strengths = df.iloc[:, 1].astype(float).tolist()
    word_distributions = df.iloc[:, 2].astype(str).tolist()

    # 解析词分布
    words = []
    for dist in word_distributions:
        word_list = []
        pairs = dist.split("+")
        for pair in pairs:
            try:
                weight, word = pair.strip().split("*")
                word_list.append((word.strip('"').strip("'"), float(weight)))
            except ValueError:
                continue  # 过滤错误数据
        words.append(word_list)

    # 存储数据
    stage_topics[stage] = topics
    stage_strengths[stage] = strengths
    stage_words[stage] = words
    all_nodes.extend(topics)  # 主题加入节点

# 关键词去重
all_keywords = list(set(word for words in stage_words.values() for sublist in words for word, _ in sublist))
all_nodes.extend(all_keywords)

# 映射节点索引
node_labels = list(set(all_nodes))
node_indices = {label: idx for idx, label in enumerate(node_labels)}

# **1. 构造 主题 → 关键词 的链接**
for stage, topics in stage_topics.items():
    strengths = stage_strengths[stage]
    words = stage_words[stage]

    for topic, strength, word_list in zip(topics, strengths, words):
        for word, weight in word_list:
            all_links["source"].append(topic)
            all_links["target"].append(word)
            all_links["value"].append(strength * weight)  # 计算贡献度

# **2. 构造 阶段间主题的演化路径**
for i in range(len(stages) - 1):
    current_stage = stages[i]
    next_stage = stages[i + 1]

    for topic1 in stage_topics[current_stage]:
        for topic2 in stage_topics[next_stage]:
            # 计算两个主题之间的相似度（基于关键词重叠）
            words1 = set(word for word, _ in stage_words[current_stage][stage_topics[current_stage].index(topic1)])
            words2 = set(word for word, _ in stage_words[next_stage][stage_topics[next_stage].index(topic2)])

            overlap = len(words1 & words2)  # 计算关键词重叠数量
            if overlap > 0:  # 仅连接有共同词的主题
                all_links["source"].append(topic1)
                all_links["target"].append(topic2)
                all_links["value"].append(overlap)  # 关键词重叠数量作为权重

# **转换索引**
all_links["source"] = [node_indices[src] for src in all_links["source"]]
all_links["target"] = [node_indices[tgt] for tgt in all_links["target"]]

# **绘制桑基图**
fig = go.Figure(go.Sankey(
    node=dict(
        pad=15,
        thickness=20,
        line=dict(color="black", width=0.5),
        label=node_labels
    ),
    link=dict(
        source=all_links["source"],
        target=all_links["target"],
        value=all_links["value"]
    )
))

# **显示图表**
fig.show()


In [12]:
import os
import pandas as pd
import plotly.graph_objects as go

# 设置 Excel 文件路径
folder_path = "X:/25.3.12/input/sankey2/"
file_paths = {
    "第一阶段": os.path.join(folder_path, "1.xlsx"),
    "第二阶段": os.path.join(folder_path, "2.xlsx"),
    "第三阶段": os.path.join(folder_path, "3.xlsx"),
}

# 读取 Excel 文件
data = {}
for stage, path in file_paths.items():
    if os.path.exists(path):
        df = pd.read_excel(path)
        if len(df.columns) < 2:
            raise ValueError(f"{path} 数据格式错误，至少需要两列（主题、强度）")
        data[stage] = df.iloc[:, 0].astype(str).tolist(), df.iloc[:, 1].astype(float).tolist()

# 获取所有阶段的主题
stages = list(data.keys())
stage_themes = {stage: themes for stage, (themes, strengths) in data.items()}
stage_strengths = {stage: strengths for stage, (themes, strengths) in data.items()}

# 构造节点（主题）
nodes = []
for stage in stages:
    nodes.extend([f"{stage}: {theme}" for theme in stage_themes[stage]])

# 去重节点
node_labels = list(set(nodes))
node_indices = {label: idx for idx, label in enumerate(node_labels)}

# 构造链接（连接不同阶段的主题）
links = {"source": [], "target": [], "value": []}

# 连接第一阶段到第二阶段
for t1, s1 in zip(stage_themes["第一阶段"], stage_strengths["第一阶段"]):
    for t2, s2 in zip(stage_themes["第二阶段"], stage_strengths["第二阶段"]):
        links["source"].append(f"第一阶段: {t1}")
        links["target"].append(f"第二阶段: {t2}")
        links["value"].append((s1 + s2) / 2)  # 取平均作为流量

# 连接第二阶段到第三阶段
for t2, s2 in zip(stage_themes["第二阶段"], stage_strengths["第二阶段"]):
    for t3, s3 in zip(stage_themes["第三阶段"], stage_strengths["第三阶段"]):
        links["source"].append(f"第二阶段: {t2}")
        links["target"].append(f"第三阶段: {t3}")
        links["value"].append((s2 + s3) / 2)

# 映射索引
links["source"] = [node_indices[src] for src in links["source"]]
links["target"] = [node_indices[tgt] for tgt in links["target"]]

# 创建桑基图
fig = go.Figure(go.Sankey(
    node=dict(
        pad=15,
        thickness=20,
        line=dict(color="black", width=0.5),
        label=node_labels
    ),
    link=dict(
        source=links["source"],
        target=links["target"],
        value=links["value"]
    )
))

# 显示图表
fig.show()


In [14]:
import os
import pandas as pd
import plotly.graph_objects as go

# 设置 Excel 文件路径
folder_path = "X:/25.3.12/input/sankey2/"
file_paths = {
    "第一阶段": os.path.join(folder_path, "1.xlsx"),
    "第二阶段": os.path.join(folder_path, "2.xlsx"),
    "第三阶段": os.path.join(folder_path, "3.xlsx"),
}

# 读取 Excel 文件
data = {}
for stage, path in file_paths.items():
    if os.path.exists(path):
        df = pd.read_excel(path)
        if len(df.columns) < 2:
            raise ValueError(f"{path} 数据格式错误，至少需要两列（主题、强度）")
        data[stage] = df.iloc[:, 0].astype(str).tolist(), df.iloc[:, 1].astype(float).tolist()

# 获取所有阶段的主题
stages = list(data.keys())
stage_themes = {stage: themes for stage, (themes, strengths) in data.items()}
stage_strengths = {stage: strengths for stage, (themes, strengths) in data.items()}

# 构造节点（仅保留主题名称）
nodes = []
raw_theme_labels = []  # 仅存储主题名
for stage in stages:
    for theme in stage_themes[stage]:
        nodes.append(f"{stage}: {theme}")  # 用于索引的唯一标识
        raw_theme_labels.append(theme)  # 只保留主题名称

# 去重节点
node_labels = list(dict.fromkeys(raw_theme_labels))  # 主题名称去重
node_indices = {label: idx for idx, label in enumerate(nodes)}  # 用完整主题映射索引

# 构造链接（连接不同阶段的主题）
links = {"source": [], "target": [], "value": []}

# 连接第一阶段到第二阶段
for t1, s1 in zip(stage_themes["第一阶段"], stage_strengths["第一阶段"]):
    for t2, s2 in zip(stage_themes["第二阶段"], stage_strengths["第二阶段"]):
        links["source"].append(f"第一阶段: {t1}")
        links["target"].append(f"第二阶段: {t2}")
        links["value"].append((s1 + s2) / 2)  # 取平均作为流量

# 连接第二阶段到第三阶段
for t2, s2 in zip(stage_themes["第二阶段"], stage_strengths["第二阶段"]):
    for t3, s3 in zip(stage_themes["第三阶段"], stage_strengths["第三阶段"]):
        links["source"].append(f"第二阶段: {t2}")
        links["target"].append(f"第三阶段: {t3}")
        links["value"].append((s2 + s3) / 2)

# 映射索引（保持原始主题）
links["source"] = [node_indices[src] for src in links["source"]]
links["target"] = [node_indices[tgt] for tgt in links["target"]]

# 创建桑基图（仅显示主题名称）
fig = go.Figure(go.Sankey(
    node=dict(
        pad=15,
        thickness=20,
        line=dict(color="black", width=0.5),
        label=node_labels  # 只显示主题名称
    ),
    link=dict(
        source=links["source"],
        target=links["target"],
        value=links["value"]
    )
))

# 显示图表
fig.show()


In [17]:
import os
import pandas as pd
import plotly.graph_objects as go
import matplotlib.colors as mcolors

# 设置 Excel 文件路径
folder_path = "X:/25.3.12/input/sankey2/"
file_paths = {
    "第一阶段": os.path.join(folder_path, "1.xlsx"),
    "第二阶段": os.path.join(folder_path, "2.xlsx"),
    "第三阶段": os.path.join(folder_path, "3.xlsx"),
}

# 读取 Excel 文件
data = {}
for stage, path in file_paths.items():
    if os.path.exists(path):
        df = pd.read_excel(path)
        if len(df.columns) < 2:
            raise ValueError(f"{path} 数据格式错误，至少需要两列（主题、强度）")
        data[stage] = df.iloc[:, 0].astype(str).tolist(), df.iloc[:, 1].astype(float).tolist()

# 获取所有阶段的主题
stages = list(data.keys())
stage_themes = {stage: themes for stage, (themes, strengths) in data.items()}
stage_strengths = {stage: strengths for stage, (themes, strengths) in data.items()}

# 构造节点（仅保留主题名称）
nodes = []
raw_theme_labels = []  # 仅存储主题名
for stage in stages:
    for theme in stage_themes[stage]:
        nodes.append(f"{stage}: {theme}")  # 用于索引的唯一标识
        raw_theme_labels.append(theme)  # 只保留主题名称

# 去重节点
node_labels = list(dict.fromkeys(raw_theme_labels))  # 主题名称去重
node_indices = {label: idx for idx, label in enumerate(nodes)}  # 用完整主题映射索引

# **生成颜色映射**
unique_themes = list(set(raw_theme_labels))  # 获取所有主题
colors = list(mcolors.TABLEAU_COLORS.values())  # 使用 Matplotlib 颜色
theme_colors = {theme: colors[i % len(colors)] for i, theme in enumerate(unique_themes)}  # 主题颜色映射

# **构造链接（连接不同阶段的主题）**
links = {"source": [], "target": [], "value": [], "color": []}

# 连接第一阶段到第二阶段
for t1, s1 in zip(stage_themes["第一阶段"], stage_strengths["第一阶段"]):
    for t2, s2 in zip(stage_themes["第二阶段"], stage_strengths["第二阶段"]):
        links["source"].append(f"第一阶段: {t1}")
        links["target"].append(f"第二阶段: {t2}")
        links["value"].append((s1 + s2) / 2)  # 取平均作为流量
        links["color"].append(mcolors.to_hex(theme_colors[t2], keep_alpha=False))  # 转换成 HEX 颜色

# 连接第二阶段到第三阶段
for t2, s2 in zip(stage_themes["第二阶段"], stage_strengths["第二阶段"]):
    for t3, s3 in zip(stage_themes["第三阶段"], stage_strengths["第三阶段"]):
        links["source"].append(f"第二阶段: {t2}")
        links["target"].append(f"第三阶段: {t3}")
        links["value"].append((s2 + s3) / 2)
        links["color"].append(mcolors.to_hex(theme_colors[t3], keep_alpha=False))  # 转换成 HEX 颜色

# **映射索引（保持原始主题）**
links["source"] = [node_indices[src] for src in links["source"]]
links["target"] = [node_indices[tgt] for tgt in links["target"]]

# **创建桑基图**
fig = go.Figure(go.Sankey(
    node=dict(
        pad=15,
        thickness=20,
        line=dict(color="black", width=0.5),
        label=node_labels,
        color=[theme_colors[label] for label in node_labels]  # 设置节点颜色
    ),
    link=dict(
        source=links["source"],
        target=links["target"],
        value=links["value"],
        color=links["color"]  # ✅ 现在是 HEX 格式，不会报错
    )
))

# **显示图表**
fig.show()


In [18]:
import os
import pandas as pd

# 设置 Excel 文件路径
folder_path = "X:/25.3.12/input/sankey2/"
file_paths = {
    "第一阶段": os.path.join(folder_path, "1.xlsx"),
    "第二阶段": os.path.join(folder_path, "2.xlsx"),
    "第三阶段": os.path.join(folder_path, "3.xlsx"),
}

# 读取 Excel 文件
data = {}
for stage, path in file_paths.items():
    if os.path.exists(path):
        df = pd.read_excel(path)
        if len(df.columns) < 2:
            raise ValueError(f"{path} 数据格式错误，至少需要两列（主题、强度）")
        data[stage] = df.iloc[:, 0].astype(str).tolist(), df.iloc[:, 1].astype(float).tolist()

# 获取所有阶段的主题
stages = list(data.keys())
stage_themes = {stage: themes for stage, (themes, strengths) in data.items()}
stage_strengths = {stage: strengths for stage, (themes, strengths) in data.items()}

# **构造 Excel 表格数据**
sankey_data = []

# 连接第一阶段到第二阶段
for t1, s1 in zip(stage_themes["第一阶段"], stage_strengths["第一阶段"]):
    for t2, s2 in zip(stage_themes["第二阶段"], stage_strengths["第二阶段"]):
        value = (s1 + s2) / 2  # 计算流量
        sankey_data.append([t1, t2, value])

# 连接第二阶段到第三阶段
for t2, s2 in zip(stage_themes["第二阶段"], stage_strengths["第二阶段"]):
    for t3, s3 in zip(stage_themes["第三阶段"], stage_strengths["第三阶段"]):
        value = (s2 + s3) / 2
        sankey_data.append([t2, t3, value])

# **转换为 DataFrame**
df_sankey = pd.DataFrame(sankey_data, columns=["Source", "Target", "Value"])

# **导出到 Excel**
output_path = "sankey_data.xlsx"
df_sankey.to_excel(output_path, index=False)

print(f"✅ Excel 生成完成: {output_path}")


✅ Excel 生成完成: sankey_data.xlsx


In [19]:
import os
import pandas as pd
import plotly.graph_objects as go
import random

# 生成随机莫卡迪色（Molokadi Colors）
def random_molokadi_color():
    molokadi_colors = [
        "#6B4226", "#9C6615", "#D2691E", "#CD853F", "#8B4513",
        "#A0522D", "#DEB887", "#F4A460", "#BC8F8F", "#8B0000",
        "#993300", "#8B5A2B", "#CC7722", "#B87333", "#E97451"
    ]
    return random.choice(molokadi_colors)

# 设置 Excel 文件路径
folder_path = "X:/25.3.12/input/sankey2/"
file_paths = {
    "第一阶段": os.path.join(folder_path, "1.xlsx"),
    "第二阶段": os.path.join(folder_path, "2.xlsx"),
    "第三阶段": os.path.join(folder_path, "3.xlsx"),
}

# 读取 Excel 文件
data = {}
for stage, path in file_paths.items():
    if os.path.exists(path):
        df = pd.read_excel(path)
        if len(df.columns) < 2:
            raise ValueError(f"{path} 数据格式错误，至少需要两列（主题、强度）")
        data[stage] = df.iloc[:, 0].astype(str).tolist(), df.iloc[:, 1].astype(float).tolist()

# 获取所有阶段的主题
stages = list(data.keys())
stage_themes = {stage: themes for stage, (themes, strengths) in data.items()}
stage_strengths = {stage: strengths for stage, (themes, strengths) in data.items()}

# 构造节点（仅保留主题名称）
nodes = []
raw_theme_labels = []  # 仅存储主题名
for stage in stages:
    for theme in stage_themes[stage]:
        nodes.append(f"{stage}: {theme}")  # 用于索引的唯一标识
        raw_theme_labels.append(theme)  # 只保留主题名称

# 去重节点
node_labels = list(dict.fromkeys(raw_theme_labels))  # 主题名称去重
node_indices = {label: idx for idx, label in enumerate(nodes)}  # 用完整主题映射索引

# **生成颜色映射**
unique_themes = list(set(raw_theme_labels))  # 获取所有主题
theme_colors = {theme: random_molokadi_color() for theme in unique_themes}  # 随机分配颜色

# **构造链接（连接不同阶段的主题）**
links = {"source": [], "target": [], "value": [], "color": []}

# 连接第一阶段到第二阶段
for t1, s1 in zip(stage_themes["第一阶段"], stage_strengths["第一阶段"]):
    for t2, s2 in zip(stage_themes["第二阶段"], stage_strengths["第二阶段"]):
        links["source"].append(f"第一阶段: {t1}")
        links["target"].append(f"第二阶段: {t2}")
        links["value"].append((s1 + s2) / 2)  # 取平均作为流量
        links["color"].append(f"rgba({int(theme_colors[t2][1:3], 16)},"
                              f"{int(theme_colors[t2][3:5], 16)},"
                              f"{int(theme_colors[t2][5:7], 16)}, 0.5)")  # 透明度 50%

# 连接第二阶段到第三阶段
for t2, s2 in zip(stage_themes["第二阶段"], stage_strengths["第二阶段"]):
    for t3, s3 in zip(stage_themes["第三阶段"], stage_strengths["第三阶段"]):
        links["source"].append(f"第二阶段: {t2}")
        links["target"].append(f"第三阶段: {t3}")
        links["value"].append((s2 + s3) / 2)
        links["color"].append(f"rgba({int(theme_colors[t3][1:3], 16)},"
                              f"{int(theme_colors[t3][3:5], 16)},"
                              f"{int(theme_colors[t3][5:7], 16)}, 0.5)")  # 透明度 50%

# **映射索引（保持原始主题）**
links["source"] = [node_indices[src] for src in links["source"]]
links["target"] = [node_indices[tgt] for tgt in links["target"]]

# **创建桑基图**
fig = go.Figure(go.Sankey(
    node=dict(
        pad=15,
        thickness=20,
        line=dict(color="black", width=0.5),
        label=node_labels,
        color=[theme_colors[label] for label in node_labels]  # 设置节点颜色
    ),
    link=dict(
        source=links["source"],
        target=links["target"],
        value=links["value"],
        color=links["color"]  # ✅ 连接线为透明 50% 的莫卡迪色
    )
))

# **显示图表**
fig.show()


In [20]:
import os
import pandas as pd
import plotly.graph_objects as go
import random
import matplotlib.colors as mcolors

# 设置 Excel 文件路径
folder_path = "X:/25.3.12/input/sankey2/"
file_paths = {
    "第一阶段": os.path.join(folder_path, "1.xlsx"),
    "第二阶段": os.path.join(folder_path, "2.xlsx"),
    "第三阶段": os.path.join(folder_path, "3.xlsx"),
}

# 读取 Excel 文件
data = {}
for stage, path in file_paths.items():
    if os.path.exists(path):
        df = pd.read_excel(path)
        if len(df.columns) < 2:
            raise ValueError(f"{path} 数据格式错误，至少需要两列（主题、强度）")
        data[stage] = df.iloc[:, 0].astype(str).tolist(), df.iloc[:, 1].astype(float).tolist()

# 获取所有阶段的主题
stages = list(data.keys())
stage_themes = {stage: themes for stage, (themes, strengths) in data.items()}
stage_strengths = {stage: strengths for stage, (themes, strengths) in data.items()}

# **生成“莫卡迪色”随机颜色**
def random_mokadi_color():
    """ 生成随机的莫卡迪色 """
    colors = [
        "#E57373", "#F06292", "#BA68C8", "#64B5F6", "#4DB6AC",
        "#81C784", "#DCE775", "#FFD54F", "#FF8A65", "#A1887F"
    ]
    return random.choice(colors)

# **构造颜色映射**
unique_themes = list(set([theme for stage in stages for theme in stage_themes[stage]]))  # 获取所有主题
theme_colors = {theme: random_mokadi_color() for theme in unique_themes}  # 主题颜色映射

# **构造节点（仅保留主题名称）**
nodes = []
raw_theme_labels = []  # 仅存储主题名
for stage in stages:
    for theme in stage_themes[stage]:
        nodes.append(f"{stage}: {theme}")  # 用于索引的唯一标识
        raw_theme_labels.append(theme)  # 只保留主题名称

# **去重节点**
node_labels = list(dict.fromkeys(raw_theme_labels))  # 主题名称去重
node_indices = {label: idx for idx, label in enumerate(nodes)}  # 用完整主题映射索引

# **构造链接（连接不同阶段的主题）**
links = {"source": [], "target": [], "value": [], "color": []}

# 连接第一阶段到第二阶段
for t1, s1 in zip(stage_themes["第一阶段"], stage_strengths["第一阶段"]):
    for t2, s2 in zip(stage_themes["第二阶段"], stage_strengths["第二阶段"]):
        links["source"].append(f"第一阶段: {t1}")
        links["target"].append(f"第二阶段: {t2}")
        links["value"].append((s1 + s2) / 2)  # 取平均作为流量
        links["color"].append(mcolors.to_rgba(theme_colors[t2], alpha=0.5))  # 透明度50%

# 连接第二阶段到第三阶段
for t2, s2 in zip(stage_themes["第二阶段"], stage_strengths["第二阶段"]):
    for t3, s3 in zip(stage_themes["第三阶段"], stage_strengths["第三阶段"]):
        links["source"].append(f"第二阶段: {t2}")
        links["target"].append(f"第三阶段: {t3}")
        links["value"].append((s2 + s3) / 2)
        links["color"].append(mcolors.to_rgba(theme_colors[t3], alpha=0.5))  # 透明度50%

# **映射索引（保持原始主题）**
links["source"] = [node_indices[src] for src in links["source"]]
links["target"] = [node_indices[tgt] for tgt in links["target"]]

# **修正颜色格式（Plotly 需要 HEX 格式）**
def rgba_to_hex(rgba):
    """ 将 RGBA 转换为 HEX 并手动调整透明度 """
    r, g, b, a = rgba
    return f"rgba({int(r*255)}, {int(g*255)}, {int(b*255)}, {a})"

links["color"] = [rgba_to_hex(color) for color in links["color"]]

# **创建桑基图**
fig = go.Figure(go.Sankey(
    node=dict(
        pad=15,
        thickness=20,
        line=dict(color="black", width=0.5),
        label=node_labels,
        color=[theme_colors[label] for label in node_labels]  # 主题颜色
    ),
    link=dict(
        source=links["source"],
        target=links["target"],
        value=links["value"],
        color=links["color"]  # ✅ 线条颜色更浅，透明度 50%
    )
))

# **显示图表**
fig.show()


In [6]:
import os
import pandas as pd
import plotly.graph_objects as go
import random
import matplotlib.colors as mcolors

# 设置 Excel 文件路径
folder_path = "X:/25.3.12/input/sankey2/"
file_paths = {
    "第一阶段": os.path.join(folder_path, "1.xlsx"),
    "第二阶段": os.path.join(folder_path, "2.xlsx"),
    "第三阶段": os.path.join(folder_path, "3.xlsx"),
}

# 读取 Excel 文件
data = {}
for stage, path in file_paths.items():
    if os.path.exists(path):
        df = pd.read_excel(path)
        if len(df.columns) < 2:
            raise ValueError(f"{path} 数据格式错误，至少需要两列（主题、强度）")
        data[stage] = df.iloc[:, 0].astype(str).tolist(), df.iloc[:, 1].astype(float).tolist()

# 获取所有阶段的主题
stages = list(data.keys())
stage_themes = {stage: themes for stage, (themes, strengths) in data.items()}
stage_strengths = {stage: strengths for stage, (themes, strengths) in data.items()}

# **生成“莫卡迪色”随机颜色**
def random_mokadi_color():
    """ 生成随机的莫卡迪色 """
    colors = [
        "#E57373", "#F06292", "#BA68C8", "#64B5F6", "#4DB6AC",
        "#81C784", "#DCE775", "#FFD54F", "#FF8A65", "#A1887F"
    ]
    return random.choice(colors)

# **构造颜色映射**
unique_themes = list(set([theme for stage in stages for theme in stage_themes[stage]]))  # 获取所有主题
theme_colors = {theme: random_mokadi_color() for theme in unique_themes}  # 主题颜色映射

# **构造节点（仅保留主题名称）**
nodes = []
raw_theme_labels = []  # 仅存储主题名
for stage in stages:
    for theme in stage_themes[stage]:
        nodes.append(f"{stage}: {theme}")  # 用于索引的唯一标识
        raw_theme_labels.append(theme)  # 只保留主题名称

# **去重节点**
node_labels = list(dict.fromkeys(raw_theme_labels))  # 主题名称去重
node_indices = {label: idx for idx, label in enumerate(nodes)}  # 用完整主题映射索引

# **构造链接（连接不同阶段的主题）**
links = {"source": [], "target": [], "value": [], "color": []}

# 连接第一阶段到第二阶段
for t1, s1 in zip(stage_themes["第一阶段"], stage_strengths["第一阶段"]):
    for t2, s2 in zip(stage_themes["第二阶段"], stage_strengths["第二阶段"]):
        links["source"].append(f"第一阶段: {t1}")
        links["target"].append(f"第二阶段: {t2}")
        links["value"].append((s1 + s2) / 2)  # 取平均作为流量
        links["color"].append(mcolors.to_rgba(theme_colors[t2], alpha=0.5))  # 透明度50%

# 连接第二阶段到第三阶段
for t2, s2 in zip(stage_themes["第二阶段"], stage_strengths["第二阶段"]):
    for t3, s3 in zip(stage_themes["第三阶段"], stage_strengths["第三阶段"]):
        links["source"].append(f"第二阶段: {t2}")
        links["target"].append(f"第三阶段: {t3}")
        links["value"].append((s2 + s3) / 2)
        links["color"].append(mcolors.to_rgba(theme_colors[t3], alpha=0.5))  # 透明度50%

# **映射索引（保持原始主题）**
links["source"] = [node_indices[src] for src in links["source"]]
links["target"] = [node_indices[tgt] for tgt in links["target"]]

# **修正颜色格式（Plotly 需要 HEX 格式）**
def rgba_to_hex(rgba):
    """ 将 RGBA 转换为 HEX 并手动调整透明度 """
    r, g, b, a = rgba
    return f"rgba({int(r*255)}, {int(g*255)}, {int(b*255)}, {a})"

links["color"] = [rgba_to_hex(color) for color in links["color"]]

# **创建桑基图**
fig = go.Figure()

# **添加桑基图**
fig.add_trace(go.Sankey(
    node=dict(
        pad=15,
        thickness=20,
        line=dict(color="black", width=0.5),
        label=node_labels,
        color=[theme_colors[label] for label in node_labels]  # 主题颜色
    ),
    link=dict(
        source=links["source"],
        target=links["target"],
        value=links["value"],
        color=links["color"]  # ✅ 线条颜色更浅，透明度 50%
    )
))

# **添加时间轴**
fig.add_shape(
    type="line",
    x0=0, y0=-0.1, x1=1, y1=-0.1,  # 画一条水平线
    line=dict(color="black", width=2)  # 黑色粗线
)

# **添加时间标签**
fig.add_annotation(
    x=0.1, y=-0.21, text="2011-2018", showarrow=False, font=dict(size=14)
)
fig.add_annotation(
    x=0.5, y=-0.21, text="2019-2021", showarrow=False, font=dict(size=14)
)
fig.add_annotation(
    x=0.9, y=-0.21, text="2022-2024", showarrow=False, font=dict(size=14)
)

# **调整图形布局**
fig.update_layout(
    title="主题演化图",
    xaxis=dict(visible=False),
    yaxis=dict(visible=False),
    margin=dict(l=50, r=50, t=50, b=100)  # 让下方有足够空间放时间轴
)

# **显示图表**
fig.show()
