# 在colab 中 使用GeminiAPI 完成小说生成

## 下载并解压配置文件

In [None]:
# 下载ZIP文件
!wget https://github.com/shaoyu12138/RequireAssets/raw/main/RequireAssets.zip -O /content/RequireAssets.zip

import shutil
import os

# 解压到当前工作目录
shutil.unpack_archive('/content/RequireAssets.zip', '/content/')
print("文件已解压到: /content/")

# 将RequireAssets文件夹内的内容移到当前工作目录
src_dir = '/content/RequireAssets'
if os.path.exists(src_dir):
    for item in os.listdir(src_dir):
        shutil.move(os.path.join(src_dir, item), '/content/')
    os.rmdir(src_dir)
    print("文件已移动并删除RequireAssets文件夹。")

# 删除ZIP文件
os.remove('/content/RequireAssets.zip')
print("ZIP文件已删除。")


## 下载依赖

In [None]:
!pip install -r requirements.txt

## **基础项目构建**

In [None]:
import os
book_name = "生成书籍"
book_name = os.path.join(book_name)

# 定义目录和文件结构
project_dir = book_name
configs_dir = os.path.join(project_dir, "configs")
temp_1_dir = os.path.join(project_dir, "temp_1")
temp_2_dir = os.path.join(project_dir, "temp_2")
output_dir = os.path.join(project_dir, "output")

# 创建文件夹
if not os.path.exists(configs_dir):
    os.makedirs(configs_dir)
if not os.path.exists(temp_1_dir):
    os.makedirs(temp_1_dir)
if not os.path.exists(temp_2_dir):
    os.makedirs(temp_2_dir)
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# 创建配置文件
outline_path = os.path.join(configs_dir, "outline.txt")

# 确保目录存在
os.makedirs(configs_dir, exist_ok=True)

# 创建文件（如果不存在）
if not os.path.exists(outline_path):
    with open(outline_path, "w", encoding="utf-8") as f:
        pass  # 创建空文件

print(f"配置文件已经创建{outline_path}")

## **运行第几组**

In [None]:
# 默认为5组，批次数量为6，一般输入是30部分，预期输出12w字
k = 1  # 起始批次编号
benchsize = 6  # 每个批次的文件夹数量

## **构建组级别的配置文件夹**

In [None]:
import os

def process_file(input_file, temp_1_dir, k, benchsize):
    """
    处理文本文件，将其按 "---" 分割，并在输出目录创建子文件夹和配置文件。

    Args:
        input_file: 输入文本文件的路径。
        temp_1_dir: 输出文件夹的路径。
        k:  批次编号 (用于计算起始文件夹编号)。
        benchsize: 每个批次生成的文件夹数量。
    """

    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            content = f.read()
    except FileNotFoundError:
        print(f"错误：找不到输入文件 {input_file}")
        return
    except Exception as e:
        print(f"读取文件时发生错误：{e}")
        return

    segments = content.split("---")
    segments = [seg.strip() for seg in segments if seg.strip()]  # 去除空字符串

    start_index = (k - 1) * benchsize  # 计算起始索引
    end_index = start_index + benchsize   #计算终止索引
    segments = segments[start_index:end_index]

    if not os.path.exists(temp_1_dir):
        try:
            os.makedirs(temp_1_dir)
        except Exception as e:
            print(f"创建输出文件夹 {temp_1_dir} 失败: {e}")
            return

    for i, segment in enumerate(segments):
        folder_name = str(start_index + i + 1)  # 使用计算出的起始索引
        folder_path = os.path.join(temp_1_dir, folder_name)

        try:
            os.makedirs(folder_path, exist_ok=True)
        except Exception as e:
            print(f"创建文件夹 {folder_path} 失败: {e}")
            continue

        config_file_path = os.path.join(folder_path, "config.txt")
        try:
            with open(config_file_path, 'w', encoding='utf-8') as config_file:
                config_file.write(segment)
        except Exception as e:
            print(f"写入配置文件 {config_file_path} 失败: {e}")
            continue

        print(f"已创建文件夹 {folder_name}，并写入 config.txt")

    print(f"任务完成，文件已分割并存储在 {temp_1_dir}")


if __name__ == '__main__':
    input_file_path = os.path.join(configs_dir, "outline.txt")
    temp_1_directory_path = temp_1_dir

    if os.path.exists(input_file_path):
      process_file(input_file_path, temp_1_dir, k, benchsize)
    else:
        print(f"错误：输入文件路径不存在: {input_file_path}")

## **定义网文类型**
- 常见的网文类型可以见`story_type[]数组`内容，选择合适的序号即可。

In [None]:
# 默认使用 1
story_num = 1

## 故事类型数组
story_types = [
# 通用
    "网文",
# 可选列表
    "穿越文",
    "系统文",
    "种田文",
    "超能力文",
    "都市文",
    "同人文",
    "重生文",
]

story_num = int(story_num)- 1  # 将 story_num 转换为整数，使用从1开始的自然计数。
story_type = story_types[story_num]
print(story_type)

## **配置Google生成模型型号**
- python库的安装
```
pip install google-genai

```

In [None]:
# gemini-2.0-flash  gemini-exp-1206  gemini-2.0-flash-thinking-exp-01-21  gemini-2.0-pro-exp-02-05

# 默认模型名称
model_name = "gemini-2.0-flash"

# 生成组剧情时使用的模型名称
special_model_name = "gemini-2.0-flash"

## **定义生成的开始和结束章节**
- 默认全部生成

In [None]:
start_number = 1
end_number = 1000 # 选一个足够大的数以便处理全部的内容

## **规定并发数**

In [None]:
# 大纲生成的并发数 = APIKEYS的数目 * 0.5
max_threads = 3
# 文件生成的并发数
max_workers = 22

# 导入库文件

In [None]:
from gemini_utils import *
from string_utils import *

## 构造每一组的剧情
> 一个组有9章

In [None]:
import os
import concurrent.futures

def process_folders(input_dir_base, start_folder, end_folder, concurrency=4):
    # 声明全局变量 (这些全局变量需要在函数外部定义)
    global story_type  # 文章设定
    global special_model_name  # 使用的特殊模型型号

    # 预先加载提示词 (假设 file_to_string 和 replaceStringSymbol 函数已定义)
    PlotsExpand   = file_to_string('assets_plots/PlotsExpand.txt')
    PlotsExpand_1 = file_to_string('assets_plots/1.txt')
    PlotsExpand_2 = file_to_string('assets_plots/2.txt')
    PlotsExpand_3 = file_to_string('assets_plots/3.txt')
    PlotsExpand_4 = file_to_string('assets_plots/4.txt')
    PlotsExpand_5 = file_to_string('assets_plots/5.txt')
    PlotsExpand_6 = file_to_string('assets_plots/6.txt')
    PlotsExpand_7 = file_to_string('assets_plots/7.txt')
    PlotsExpand_8 = file_to_string('assets_plots/8.txt')
    PlotsExpand_9 = file_to_string('assets_plots/9.txt')

    # 替换故事类型
    PlotsExpand = replaceStringSymbol(PlotsExpand, "{story_type}", story_type)

    # 定义处理单个文件夹的内部函数
    def process_single_folder(folder_num):
        # 定义输入文件夹路径
        input_dir = os.path.join(input_dir_base, str(folder_num))
        # 检查文件夹是否存在
        if os.path.exists(input_dir):
            input_file = os.path.join(input_dir, 'config.txt')
            # 检查 config.txt 文件是否存在
            if os.path.exists(input_file):
                # 读取上下文参考文本，并替换模板中的 {storylines}
                publicPlotsExpand = PlotsExpand
                context_file = file_to_string(input_file)
                publicPlotsExpand = replaceStringSymbol(publicPlotsExpand, "{storylines}", context_file)

                PlotsExpands = [
                    publicPlotsExpand, PlotsExpand_1, PlotsExpand_2, PlotsExpand_3, PlotsExpand_4,
                    PlotsExpand_5, PlotsExpand_6, PlotsExpand_7, PlotsExpand_8, PlotsExpand_9
                ]
                # 开始流程设计 (假设 generate_answers 和 extract_code_blocks 函数已定义)
                answers = generate_answers(PlotsExpands, 9, special_model_name)
                answers = extract_code_blocks(answers, 9, False)
                # 保存结果到 setup.txt
                string_to_file(answers, os.path.join(input_dir, 'setup.txt'))
                print(f"Saved setup.txt in {input_dir}")
            else:
                print(f"Warning: config.txt not found in {input_dir}")
        else:
            print(f"Skipping folder {folder_num} (not found)")

    # 使用 ThreadPoolExecutor 进行并发处理
    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
        # 为每个文件夹提交任务
        futures = [executor.submit(process_single_folder, folder_num)
                   for folder_num in range(start_folder, end_folder + 1)]
        # 等待所有任务完成
        concurrent.futures.wait(futures)

    print(f"Finished processing folders {start_folder} to {end_folder}")


# 调用示例 (假设 temp_1_dir, start_number, end_number, story_type, special_model_name 已定义)
process_folders(temp_1_dir, start_number, end_number ,max_workers)


## 配置每一个章节的输入文本

In [None]:
import os
import re

def process_folders_2(start_num, end_num, folder_path):
    """
    遍历指定文件夹路径中指定序号范围的文件夹，处理 setup.txt 并生成 output 文件。
    如果 output 文件夹已存在，则跳过该文件夹的处理。

    Args:
        start_num (int): 起始文件夹序号。
        end_num (int): 结束文件夹序号。
        folder_path (str): 文件夹路径。
    """
    for k in range(start_num, end_num + 1):
        folder_name = str(k)  # 将文件夹序号转换为字符串，用于后续路径拼接
        current_folder_path = os.path.join(folder_path, folder_name)

        if not os.path.isdir(current_folder_path):
            print(f"警告：文件夹 '{current_folder_path}' 不存在，跳过。")
            continue

        output_folder_path = os.path.join(current_folder_path, "output")
        if os.path.exists(output_folder_path):
            print(f"警告：文件夹 '{output_folder_path}' 已存在，跳过文件夹 '{current_folder_path}' 的处理。")
            continue

        config_file_path = os.path.join(current_folder_path, "setup.txt")

        if not os.path.isfile(config_file_path):
            print(f"警告：文件 '{config_file_path}' 不存在，跳过文件夹 '{current_folder_path}'。")
            continue

        try:
            with open(config_file_path, "r", encoding="utf-8") as f:
                config_content = f.read()
        except Exception as e:
            print(f"警告：读取文件 '{config_file_path}' 失败：{e}，跳过文件夹 '{current_folder_path}'。")
            continue

        # 使用 "===" 分割得到字符串数组
        split_content = re.split(r"===", config_content.strip())
        split_content = [item.strip() for item in split_content if item.strip()] #去除空字符串

        os.makedirs(output_folder_path, exist_ok=True) #创建 output 文件夹

        start_file_num = (k - 1) * 9 + 1 # 计算起始文件名序号
        for i, item in enumerate(split_content):
            file_num = start_file_num + i # 计算当前文件名序号
            output_file_name = f"{file_num}.txt"
            output_file_path = os.path.join(output_folder_path, output_file_name)

            try:
                with open(output_file_path, "w", encoding="utf-8") as f:
                    f.write(item)
            except Exception as e:
                print(f"警告：写入文件 '{output_file_path}' 失败：{e}，跳过写入此文件。")

        print(f"文件夹 '{current_folder_path}' 处理完成。")


process_folders_2(start_number, end_number, temp_1_dir)

## 构造路径文件夹

In [None]:
import os
import re

def find_output_files(folder_path, start_num, end_num):
    """
    查找指定文件夹中指定序号之间的子文件夹的 output 文件夹中的所有文件路径。

    Args:
        folder_path (str): 要查找的文件夹的路径。
        start_num (int): 子文件夹序号的起始值（包含）。
        end_num (int): 子文件夹序号的结束值（包含）。

    Returns:
        list: 包含所有目标文件路径的字符串列表。
    """
    all_files = []

    for item in os.listdir(folder_path):
        item_path = os.path.join(folder_path, item)
        if os.path.isdir(item_path):

            match = re.match(r'(\d+)', item)
            if match:
                try:
                    folder_num = int(match.group(1))
                except ValueError:
                   continue
                if start_num <= folder_num <= end_num:
                    output_folder = os.path.join(item_path, "output")
                    if os.path.exists(output_folder) and os.path.isdir(output_folder):
                        for root, _, files in os.walk(output_folder):
                            for file in files:
                                file_path = os.path.join(root, file)
                                all_files.append(file_path)

    return all_files

filelist = find_output_files(temp_1_dir, start_number, end_number)

## 章节内容的写作和润色的通用函数

In [None]:
import concurrent.futures
import time

def process_single_file(prompt, file_path):
    """
    处理单个文件的核心函数。

    Args:
        prompt (str): 输入的提示词字符串。
        file_path (str): 要处理的文件路径。
    """
    try:
        input_file = file_to_string(file_path)
        if not input_file:
            print(f"跳过空文件或无法读取的文件: {file_path}")
            return

        answer_1 = generate_text(prompt + "\n" + input_file)
        string_to_file(answer_1, file_path)
        print(f"处理完成: {file_path}")
    except Exception as e:
        print(f"处理文件时出错: {file_path}, 错误: {e}")


def process_files_with_concurrency(prompt, file_paths, concurrency= max_workers, request_interval=0.1):
    """
    使用线程池并发处理多个文件，并控制并发请求的时间间隔。

    Args:
        prompt (str): 输入的提示词字符串。
        file_paths (list[str]): 要处理的文件路径列表。
        concurrency (int, optional): 并发线程数量
        request_interval (int, optional): 并发任务请求的时间间隔（秒），默认为 0.1s。
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = []
        for file_path in file_paths:
            future = executor.submit(process_single_file, prompt, file_path)
            futures.append(future)
            time.sleep(request_interval)  # 添加时间间隔

        for future in concurrent.futures.as_completed(futures):
            future.result()  # 检查是否有异常抛出，如果有则会重新抛出

# 定义提示词

In [None]:
TextCrafting = file_to_string('assets_compose/TextCrafting.txt')
PolishedDraft = file_to_string('assets_compose/PolishedDraft.txt')
PerfectText  = file_to_string('assets_compose/PerfectText.txt')

## 小说创作

In [None]:
process_files_with_concurrency(TextCrafting,filelist)

## 去除标题

In [None]:
import re
import os

def remove_brackets_and_contents_line(file_paths):
    """
    删除文件中包含[]及其内部内容的整行，并避免出现空行
    :param file_paths: 包含文件路径的列表
    """
    # 正则表达式匹配包含[]及其内容的整行
    pattern = r'.*\[.*?\][^\n]*\n?'  # 修改了正则表达式，匹配一整行

    for file_path in file_paths:
        # 检查文件是否存在
        if not os.path.exists(file_path):
            print(f"文件 {file_path} 不存在，跳过处理。")
            continue

        try:
            # 读取文件内容
            with open(file_path, 'r', encoding='utf-8') as file:
                content = file.read()

            # 使用正则表达式替换匹配的整行为空字符串
            new_content = re.sub(pattern, '', content)

             # 确保文件末尾有换行
            if new_content and not new_content.endswith('\n'):
                new_content += '\n'

            # 将修改后的内容写回文件
            with open(file_path, 'w', encoding='utf-8') as file:
                file.write(new_content)

            print(f"文件 {file_path} 中包含[]的行已被删除。")
        except Exception as e:
            print(f"处理文件 {file_path} 时发生错误：{e}")

remove_brackets_and_contents_line(filelist)

## 构建缓存文件夹

In [None]:
import os
import shutil

def copy_output_files(root_dir, start_num, end_num, output_dir):
    """
    遍历指定文件夹中数字命名的子文件夹，复制其 output 文件夹中的文件到指定输出文件夹。

    Args:
        root_dir (str): 根文件夹路径。
        start_num (int): 子文件夹数字名称的起始值（包含）。
        end_num (int): 子文件夹数字名称的结束值（包含）。
        output_dir (str): 输出文件存放的文件夹路径。
    """

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)  # 创建输出文件夹如果不存在

    for item in os.listdir(root_dir):
        item_path = os.path.join(root_dir, item)

        if os.path.isdir(item_path) and item.isdigit():
            try:
                num = int(item)
                if start_num <= num <= end_num:
                    output_folder = os.path.join(item_path, "output")
                    if os.path.exists(output_folder) and os.path.isdir(output_folder):
                        for file in os.listdir(output_folder):
                           file_path = os.path.join(output_folder, file)
                           if os.path.isfile(file_path):
                              shutil.copy(file_path, output_dir)
            except ValueError:
                pass # 忽略无法转换为数字的文件夹名

goal_dir = os.path.join(temp_2_dir, "temp_2")
copy_output_files(temp_1_dir, start_number, end_number, goal_dir)

## 获取待润色文件地址字符串列表

In [None]:
import os

def list_file_paths(folder_path):
  """
  列出指定文件夹路径中所有文件的绝对路径。

  Args:
    folder_path: 字符串，要扫描的文件夹路径。

  Returns:
    一个列表，包含文件夹中所有文件的绝对路径字符串。
    如果文件夹不存在或不是一个文件夹，则返回一个空列表。
  """
  file_paths = []
  if not os.path.exists(folder_path) or not os.path.isdir(folder_path):
    return file_paths  # 返回空列表，文件夹不存在或不是文件夹

  for root, _, files in os.walk(folder_path):
    for file in files:
      file_path = os.path.join(root, file)
      file_paths.append(file_path)
  return file_paths

filelist = list_file_paths(goal_dir)

## 小说润色

In [None]:
process_files_with_concurrency(PolishedDraft,filelist)

## 移动最终目录

In [None]:
import os
import shutil

def copy_folder_contents(source_folder, destination_folder):
    """
    将源文件夹中的所有内容复制到目标文件夹。
    如果目标文件夹不存在，则创建它。

    Args:
      source_folder: 源文件夹的路径。
      destination_folder: 目标文件夹的路径。
    """
    try:
        # 检查源文件夹是否存在
        if not os.path.exists(source_folder):
            print(f"错误：源文件夹 '{source_folder}' 不存在。")
            return

        # 检查目标文件夹是否存在，如果不存在则创建
        if not os.path.exists(destination_folder):
            os.makedirs(destination_folder)
            print(f"已创建目标文件夹 '{destination_folder}'。")

        # 获取源文件夹中的所有文件和文件夹
        for item in os.listdir(source_folder):
            source_item_path = os.path.join(source_folder, item)
            destination_item_path = os.path.join(destination_folder, item)

            try:
                # 复制文件或文件夹
                if os.path.isfile(source_item_path):
                    shutil.copy2(source_item_path, destination_item_path)  # 使用copy2保留元数据
                    print(f"已将文件 '{item}' 从 '{source_folder}' 复制到 '{destination_folder}'。")
                elif os.path.isdir(source_item_path):
                    shutil.copytree(source_item_path, destination_item_path)
                    print(f"已将文件夹 '{item}' 从 '{source_folder}' 复制到 '{destination_folder}'。")
                else:
                    print(f"'{item}' 是未知类型，无法复制。")
            except Exception as e:
                print(f"复制 '{item}' 时出错: {e}")

        print(f"已完成将 '{source_folder}' 中的所有内容复制到 '{destination_folder}'。")

    except Exception as e:
        print(f"出现错误：{e}")

# 示例用法：
source_folder = goal_dir  # 请替换为你的源文件夹路径
destination_folder = os.path.join(output_dir, "Output_TXT")  # 请替换为你的目标文件夹路径
copy_folder_contents(source_folder, destination_folder)


## 获取获取最终目录

In [None]:
import os

def list_file_paths(folder_path):
  """
  列出指定文件夹路径中所有文件的绝对路径。

  Args:
    folder_path: 字符串，要扫描的文件夹路径。

  Returns:
    一个列表，包含文件夹中所有文件的绝对路径字符串。
    如果文件夹不存在或不是一个文件夹，则返回一个空列表。
  """
  file_paths = []
  if not os.path.exists(folder_path) or not os.path.isdir(folder_path):
    return file_paths  # 返回空列表，文件夹不存在或不是文件夹

  for root, _, files in os.walk(folder_path):
    for file in files:
      file_path = os.path.join(root, file)
      file_paths.append(file_path)
  return file_paths

filelist = list_file_paths(destination_folder)

## 完成最终润色

In [None]:
process_files_with_concurrency(PerfectText,filelist)

## 输出文件的格式化

In [None]:
import os
import shutil

def process_txt_files(folder_path):
    """
    遍历指定文件夹中的所有 txt 文件，使用 extract_code_blocks 函数提取代码块，
    并将提取的代码块保存为 .md 格式的文件。

    Args:
        folder_path: 目标文件夹的路径。
    """

    md_files_dir = os.path.join(os.path.abspath(os.path.join(folder_path, '..')), "Output_MD")  # 创建 MD 文件保存目录
    os.makedirs(md_files_dir, exist_ok=True)  # 确保目录存在，如果存在则不报错

    for filename in os.listdir(folder_path):
        if filename.endswith(".txt"):
            txt_file_path = os.path.join(folder_path, filename)
            md_filename = os.path.splitext(filename)[0] + ".md"  # 构建输出 MD 文件名
            md_file_path = os.path.join(md_files_dir, md_filename)

            try:
                with open(txt_file_path, 'r', encoding='utf-8') as infile, \
                     open(md_file_path, 'w', encoding='utf-8') as outfile:

                    text = infile.read()  # 读取整个文件内容
                    code_blocks = extract_code_blocks(text)  # 提取代码块,提取所有代码块

                    outfile.write(code_blocks)  # 将提取的代码块写入 MD 文件

                print(f"已处理文件: {filename} -> {md_filename}")

            except Exception as e:
                print(f"处理文件 {filename} 时出错：{e}")

    print(f"\n所有文件已处理完毕，MD 文件保存在: {md_files_dir}")


process_txt_files(destination_folder)

## 构建Word文件夹
```python
pip install python-docx
```

In [None]:
import os
from docx import Document
from docx.shared import Pt
from docx.enum.style import WD_STYLE_TYPE
from docx.oxml.ns import qn
from docx.shared import RGBColor


def convert_md_to_doc_with_format(base_path):
    """
    在指定路径中查找 Output_MD 文件夹，并将其中的 md 文件转换为 doc 文件，
    应用自定义格式：
        - 无 '#' 开头的行：楷体，二号字，黑色。
        - '#' 开头的行：移除所有 '#'，楷体，二号字，加粗，浅蓝色。
    同时在同级目录下创建 Output_Word 文件夹。

    Args:
        base_path (str): 基础路径，函数将在这个路径下寻找 Output_MD 文件夹。
    """
    md_files_dir = None
    for item in os.listdir(base_path):
        if os.path.isdir(os.path.join(base_path, item)) and item == "Output_MD":
            md_files_dir = os.path.join(base_path, item)
            break

    if not md_files_dir:
        print("Error: Output_MD not found in the given path.")
        return

    word_files_dir = os.path.join(base_path, "Output_Word")
    os.makedirs(word_files_dir, exist_ok=True)  # 创建 Output_Word 文件夹，如果存在则不报错

    for filename in os.listdir(md_files_dir):
        if filename.lower().endswith(".md"):
            md_filepath = os.path.join(md_files_dir, filename)
            doc_filename = os.path.splitext(filename)[0] + ".docx"
            doc_filepath = os.path.join(word_files_dir, doc_filename)

            try:
                with open(md_filepath, "r", encoding="utf-8") as md_file:
                    lines = md_file.readlines()

                # 创建 docx 文档并添加内容
                document = Document()

                # 设置全局字体为楷体
                style = document.styles.add_style('KaiTi', WD_STYLE_TYPE.PARAGRAPH)
                style.font.name = '楷体'
                style._element.rPr.rFonts.set(qn('w:eastAsia'), '楷体')

                for line in lines:
                    line = line.strip()  # 去除行首尾的空白字符
                    if not line:  # 跳过空行
                        continue

                    if line.startswith("#"):
                        # 处理标题行
                        cleaned_line = line.lstrip("#").strip()  # 删除所有#并去除首尾空白
                        paragraph = document.add_paragraph(style="KaiTi")
                        run = paragraph.add_run(cleaned_line)
                        run.font.size = Pt(22)
                        run.font.bold = True
                        run.font.color.rgb = RGBColor(110, 155, 197)  # 浅蓝色(light blue)
                    else:
                        # 处理普通行
                        paragraph = document.add_paragraph(style="KaiTi")
                        run = paragraph.add_run(line)
                        run.font.size = Pt(22)

                # 保存文档为 .docx 格式
                document.save(doc_filepath)
                print(f"Converted: {filename} -> {doc_filename}")
            except Exception as e:
                print(f"Error processing {filename}: {e}")

convert_md_to_doc_with_format(output_dir)

## 统计报告
- 总字数
- 少于1000字的文件名
```
pip install tabulate
```

In [None]:
import os
from collections import Counter
from tabulate import tabulate
import re

def xiaoshuo_tongji(folder_path, output_path):
    """
    统计文件夹中所有 txt 小说文件的字数信息，并以 Markdown 格式输出到文件。

    Args:
        folder_path (str): 文件夹的路径。
        output_path (str): 输出 Markdown 文件的路径。
    """

    file_paths = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith('.txt')]
    file_word_counts = []
    file_paths_dict = {}  # 用于存储文件路径和字数的字典

    for file_path in file_paths:
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

                # 使用字符串长度计算中文字符数量
                word_count = len(content.replace('\n','').replace(' ','').replace('\r',''))
                file_word_counts.append(word_count)
                file_paths_dict[file_path] = word_count  # 将文件路径和字数对应

        except Exception as e:
            print(f"读取文件 {file_path} 时出错: {e}")
            continue

    if not file_word_counts:
        print("文件夹中没有找到任何 txt 文件。")
        return

    # 基础统计
    total_files = len(file_word_counts)
    total_word_count = sum(file_word_counts)
    avg_word_count = total_word_count // total_files
    max_word_count = max(file_word_counts)
    min_word_count = min(file_word_counts)

    # 找到最大和最小字数对应的文件路径
    max_file_path = [path for path, count in file_paths_dict.items() if count == max_word_count][0]
    min_file_path = [path for path, count in file_paths_dict.items() if count == min_word_count][0]

    # 字数中位数 (手动计算)
    sorted_word_counts = sorted(file_word_counts)
    mid = total_files // 2
    if total_files % 2 == 0:
        median_word_count = (sorted_word_counts[mid - 1] + sorted_word_counts[mid]) // 2
    else:
        median_word_count = sorted_word_counts[mid]

    # 字数区间
    bins = [0, 800, 1000, 1200, 1500, 1800, 2000, 2300, 2500, 2700, 3000, 3300, 3500, 4000, 5000, 6000, float('inf')]
    labels = ['800以下', '1000-1200', '1200-1500', '1500-1800', '1800-2000', '2000-2300', '2300-2500', '2500-2700', '2700-3000', '3000-3300', '3300-3500', '3500-4000', '4000-5000', '5000-6000', '6000以上']

    word_count_freq = Counter()
    for count in file_word_counts:
        for i in range(len(bins) - 1):
            if bins[i] < count <= bins[i+1]:
                if i == len(bins) - 2:  # 处理最后一个区间
                    word_count_freq[labels[-1]] += 1
                else:
                    word_count_freq[labels[i]] += 1
                break

     # 创建 Markdown 表格, 添加文件路径链接
    table_data = [
        ["总文件数", total_files, ""],  # 第3列留空
        ["平均字数", avg_word_count, ""],
        ["字数中位数", median_word_count, ""],
        ["最大字数", max_word_count, f"[{os.path.basename(max_file_path)}]({max_file_path})"],
        ["最小字数", min_word_count, f"[{os.path.basename(min_file_path)}]({min_file_path})"],
        ["总字数", total_word_count, ""],
    ]
    table_headers = ["指标", "数值", "文件路径"]
    table = tabulate(table_data, headers=table_headers, tablefmt="grid") # 使用 "grid" 表格格式


    # 字数区间分布表格 (只显示数量大于0的行, 并按区间排序)
    range_table_data = [[interval, count] for interval, count in word_count_freq.items() if count > 0]
    range_table_data.sort(key=lambda x: bins[labels.index(x[0])])  # 根据 bins 中的顺序排序
    range_table_headers = ["字数区间", "文件数量"]
    range_table = tabulate(range_table_data, headers=range_table_headers, tablefmt="grid")



    # 写入 Markdown 文件
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write("# 小说数据统计\n\n")
        f.write("## 总体统计\n\n")
        f.write(table + "\n\n")
        f.write("## 字数区间分布\n\n")
        f.write(range_table + "\n\n")

    print(f"统计结果已保存到: {output_path}")

# 示例用法
folder_path = os.path.join(output_dir, "Output_TXT")  # 替换为你的小说文件夹路径
output_path = os.path.join(output_dir, "statistics.md")  # 替换为你的输出文件路径

xiaoshuo_tongji(folder_path, output_path)

## 在colab中打包并下载

In [None]:
import shutil
from google.colab import files

# 定义文件夹路径和压缩包名称
folder_path = "/content/生成书籍/output"
zip_path = "/content/output.zip"

# 打包为zip
shutil.make_archive(zip_path.replace(".zip", ""), 'zip', folder_path)

# 下载文件
files.download(zip_path)
