<a href="https://colab.research.google.com/github/shi991027/RAG/blob/main/audio_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers
!pip install torch
!pip install scikit-learn
!pip install deepseek_tokenizer
!pip install openai
!pip install -U sentence-transformers
!pip install -Uqq fastembed
!pip install json_repair

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

In [None]:
!pip install dashscope

Collecting dashscope
  Downloading dashscope-1.23.1-py3-none-any.whl.metadata (6.8 kB)
Downloading dashscope-1.23.1-py3-none-any.whl (1.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m20.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dashscope
Successfully installed dashscope-1.23.1


In [40]:
from openai import OpenAI
from openai.resources.chat import Completions
from typing import Optional, Dict, Any, List
from enum import Enum, auto
import dashscope

class APIVendor(Enum):
    """API厂商枚举类"""
    ALIYUN = auto()
    DEEPSEEK = auto()
    ALIYUNAUDIO = auto()
    ALIYUNVIDEO=auto()
    @property
    def config(self):
        """获取厂商配置"""
        return {
            APIVendor.ALIYUN: {
                'api_key': 'sk-7f9a260343a54674b720a2d5fa772a5d',
                'base_url': 'https://dashscope.aliyuncs.com/compatible-mode/v1',
                'suported_models': [
                    'qwen-vl-plus',
                ]
            },
            APIVendor.DEEPSEEK: {
                'api_key': 'sk-c553c5fa02e64d729f91e8593f914776',
                'base_url': 'https://api.deepseek.com',
                'suported_models': [
                    'deepseek-chat',
                ]
            },
            APIVendor.ALIYUNAUDIO: {
                'api_key': 'sk-7f9a260343a54674b720a2d5fa772a5d',
                'base_url': 'https://dashscope.aliyuncs.com/compatible-mode/v1',
                'suported_models': [
                    'qwen-audio-turbo-latest',
                ]
            },
            APIVendor.ALIYUNVIDEO: {
                'api_key': 'sk-7f9a260343a54674b720a2d5fa772a5d',
                'base_url': 'https://dashscope.aliyuncs.com/compatible-mode/v1',
                'suported_models': [
                    'qwen-vl-plus',
                ]
            }
        }[self]

# 自定义Completions类来重写create方法
class CustomCompletions(Completions):
    def __init__(self, client):
        super().__init__(client)
        self.client = client

    def create(self, messages=None, **params):
        """重写create方法，根据vendor类型适配不同的API调用"""
        if self.client.vendor == APIVendor.ALIYUNAUDIO:

            model=params.get('model')
            if not model:
              model=self.client.vendor.config['suported_models'][0]
            stream_value = params.get('stream', False)
            return dashscope.MultiModalConversation.call(
                api_key=self.client.vendor.config['api_key'],
                model=model,
                messages=messages,
                stream=stream_value,
                incremental_output=stream_value
            )
        else:
            # 调用原始的OpenAI方法
            return super().create(messages=messages, **params)

# 自定义Chat资源来使用我们的CustomCompletions
class CustomChat:
    def __init__(self, client):
        self.completions = CustomCompletions(client)
        self.client = client

# LLM客户端扩展OpenAI但替换chat资源
class LLMClient(OpenAI):
    """统一的LLM客户端，支持多种模型"""

    def __init__(self, vendor: Optional[APIVendor] = APIVendor.DEEPSEEK, **kwargs):
        """
        初始化LLM客户端

        Args:
            vendor: 指定厂商枚举，如果为None则使用默认厂商
            **kwargs: 父类OpenAI支持的初始化参数
        """
        if vendor:
            assert vendor in APIVendor, f"不支持的厂商: {vendor}"
            assert vendor.config['suported_models'], f"厂商没有支持的模型: {vendor}"
        else:
            vendor = APIVendor.DEEPSEEK

        self.vendor = vendor
        self.supported_models = vendor.config['suported_models']
        config = self.vendor.config

        # 合并配置，允许通过kwargs覆盖默认配置
        init_params = {
            'api_key': config['api_key'],
            'base_url': config['base_url']
        }
        init_params.update(kwargs)

        super().__init__(**init_params)

        # 用我们的自定义实现替换chat资源
        self.chat = CustomChat(self)

# 默认配置
DEFAULT_TEXT_LLM = LLMClient()  # model='deepseek-chat'
DEFAULT_VISION_LLM = LLMClient(vendor=APIVendor.ALIYUN)  # model='qwen-vl-plus'
DEFAULT_AUDIO_LLM = LLMClient(vendor=APIVendor.ALIYUNAUDIO)  # model='qwen-audio-turbo-latest'
DEFAULT_VIDEO_LLM= LLMClient(vendor=APIVendor.ALIYUNVIDEO)

In [41]:
from inspect import currentframe
"""
语言模型编程 (LMP) - 单文件实现

本模块提供了一个简洁的框架，用于与语言模型 (LLM) 进行交互，
例如 OpenAI 的 GPT 系列（包括 GPT-4o 等多模态模型）或其他模型如 DeepSeek。
它通过装饰器将 LLM 调用简化为 Python 函数调用。

主要特性:
- @simple: 用于函数返回简单文本提示，期望纯文本输出的装饰器。
- @structured: 用于函数返回提示，期望 JSON 输出（内置自动修复）的装饰器。
- 多模态输入: 通过返回列表中包含 ('image', data) 元组的方式支持文本和图像输入（URL、本地路径、Base64）。
- 流式输出: 在装饰后的函数调用后附加 `.stream()` 以获取基于生成器的流式响应。
- 批量处理: 附加 `.batch()` 以并发处理多个输入。
- 灵活配置: 在装饰器级别设置默认模型、系统消息、API 参数，或在每次调用时覆盖。
- 自动客户端选择: 根据模型名称启发式地选择合适的客户端（例如，为视觉模型选择 OpenAI 客户端）。

基本用法:

from lmp import simple, structured # 假设此文件保存为 lmp.py

# 纯文本示例
@simple(model='deepseek-chat')
def greet(name: str):
    # 函数文档字符串可用作系统消息
    '''你是一个友好的助手。'''
    return f"向 {name} 说声你好。"

print(greet("世界"))

# 多模态示例 (需要配置 OpenAI 客户端)
@structured(model='gpt-4o')
def analyze_image(image_path: str, focus: str):
    '''分析图像并返回 JSON。'''
    return [
        ("image", image_path), # 图像输入
        f"分析这张图片，重点关注 {focus}。返回包含 'description' 和 'details' 键的 JSON。" # 文本提示
    ]

# 确保设置了 OPENAI_API_KEY 或已配置客户端
# image_analysis = analyze_image("path/to/image.jpg", "主要主体")

# 多轮会话上下文的 lmp 示例
@simple(model='deepseek-chat')
def query_rewrite(query: str):
    '''你是一个问题改写助手，通过上下文对用户问题进行提炼改写，禁止出现代词，不要多余废话。'''
    return [
        ["告诉我关于秦始皇的事情"],
        ["秦始皇是中国历史上第一个皇帝，他统一了六国并建立了秦朝。"],
        [query]
    ]
# 流式输出示例
for chunk in query_rewrite.stream("他有哪些主要成就？"):
    print(chunk, end="", flush=True)

"""

from functools import wraps
from typing import Any, Dict, List, Union, Iterator, Callable, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from openai import OpenAI
from json_repair import repair_json # 用于修复可能格式错误的 JSON
import base64 # 用于图像 Base64 编码/解码
import mimetypes # 用于猜测文件类型
import os # 用于环境变量和文件路径操作
import logging # 用于日志记录


# 配置基本日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__) # 获取日志记录器

# ================== 工具函数 ==================
def _format_audio_data(data: Union[str, bytes]) -> Dict[str, Any]:
    """将图像 URL、本地路径或字节数据格式化为 API 兼容的字典"""
    url = ""
    detail = "auto"

    if isinstance(data, str):
        if data.startswith(("http://", "https://")):
            url = data
        elif data.startswith("data:image/"):
            url = data
        elif os.path.exists(data):
            try:
                with open(data, "rb") as image_file:
                    b64_data = base64.b64encode(image_file.read()).decode('utf-8')
                mime_type, _ = mimetypes.guess_type(data)
                mime_type = mime_type or "image/png"
                if not mime_type.startswith("image/"):
                    raise ValueError(f"无效的文件类型: {mime_type}")
                url = f"data:{mime_type};base64,{b64_data}"
            except Exception as e:
                raise ValueError(f"处理本地图像文件 '{data}' 时出错: {e}") from e
        else:
            # 新增：假设无前缀的字符串是 Base64 数据，添加默认 PNG 前缀
            try:
                # 简单验证是否可能是 Base64（检查长度和常见字符）
                if len(data) > 50 and all(c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=" for c in data[:50]):
                    url = f"data:image/png;base64,{data}"
                else:
                    raise ValueError(f"无效的图像字符串: 不是 URL、数据 URI 或现有文件路径: '{data[:100]}...'")
            except Exception as e:
                raise ValueError(f"无法解析图像数据: {e}")
    elif isinstance(data, bytes):
        b64_data = base64.b64encode(data).decode('utf-8')
        url = f"data:image/png;base64,{b64_data}"
    else:
        raise TypeError(f"不支持的图像数据类型: {type(data)}")

    return url
def _format_video_data(data: Union[str, bytes]) -> Dict[str, Any]:
    """将图像 URL、本地路径或字节数据格式化为 API 兼容的字典"""
    url = ""
    detail = "auto"

    if isinstance(data, str):
        if data.startswith(("http://", "https://")):
            url = data

        else:
          raise TypeError(f"非合法的视频链接: {type(data)}")

    else:
        raise TypeError(f"不支持的视频数据类型: {type(data)}")

    return {"url": url, "detail": detail}

def _format_image_data(data: Union[str, bytes]) -> Dict[str, Any]:
    """将图像 URL、本地路径或字节数据格式化为 API 兼容的字典"""
    url = ""
    detail = "auto"

    if isinstance(data, str):
        if data.startswith(("http://", "https://")):
            url = data
        elif data.startswith("data:image/"):
            url = data
        elif os.path.exists(data):
            try:
                with open(data, "rb") as image_file:
                    b64_data = base64.b64encode(image_file.read()).decode('utf-8')
                mime_type, _ = mimetypes.guess_type(data)
                mime_type = mime_type or "image/png"
                if not mime_type.startswith("image/"):
                    raise ValueError(f"无效的文件类型: {mime_type}")
                url = f"data:{mime_type};base64,{b64_data}"
            except Exception as e:
                raise ValueError(f"处理本地图像文件 '{data}' 时出错: {e}") from e
        else:
            # 新增：假设无前缀的字符串是 Base64 数据，添加默认 PNG 前缀
            try:
                # 简单验证是否可能是 Base64（检查长度和常见字符）
                if len(data) > 50 and all(c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=" for c in data[:50]):
                    url = f"data:image/png;base64,{data}"
                else:
                    raise ValueError(f"无效的图像字符串: 不是 URL、数据 URI 或现有文件路径: '{data[:100]}...'")
            except Exception as e:
                raise ValueError(f"无法解析图像数据: {e}")
    elif isinstance(data, bytes):
        b64_data = base64.b64encode(data).decode('utf-8')
        url = f"data:image/png;base64,{b64_data}"
    else:
        raise TypeError(f"不支持的图像数据类型: {type(data)}")

    return {"url": url, "detail": detail}

def _format_content_parts(parts: List[Union[str, tuple]]) -> List[Dict[str, Any]]:
    """将字符串和 ('类型', 数据) 元组的混合列表转换为 API 兼容的字典列表"""
    formatted_content: List[Dict[str, Any]] = []

    # 检查是否存在多模态内容
    has_multimodal = any(isinstance(part, tuple) and part[0] != 'text' for part in parts)

    for part in parts:
        # 处理文本部分
        if isinstance(part, str):
            # 如果有其他多模态内容，文本部分需要明确类型
            if has_multimodal:
                formatted_content.append({"type": "text", "text": part})
            else:
                formatted_content.append(part)
            continue

        # 处理元组部分
        if not (isinstance(part, tuple) and len(part) == 2):
            raise ValueError(f"不支持的内容部分: {type(part)}。应为 str 或 ('类型', 数据) 元组。")

        part_type, part_data = part
        if part_type == "image":
            try:
                formatted_content.append({"type": "image_url", "image_url": _format_image_data(part_data)})
            except (ValueError, TypeError) as e:
                log.error(f"跳过无效的图像数据: {e}")
        else:
            log.warning(f"内容中包含不支持的元组类型 '{part_type}'。已忽略。")
        if part_type == "audio":
            try:
                formatted_content.append({"type": "audio", "audio": _format_audio_data(part_data)})
            except (ValueError, TypeError) as e:
                log.error(f"跳过无效的图像数据: {e}")
        else:
            log.warning(f"内容中包含不支持的元组类型 '{part_type}'。已忽略。")
        if part_type == "video":
            try:
                formatted_content.append({"type": "video_url", "video_url": _format_video_data(part_data)})
            except (ValueError, TypeError) as e:
                log.error(f"跳过无效的图像数据: {e}")
        else:
            log.warning(f"内容中包含不支持的元组类型 '{part_type}'。已忽略。")

    return formatted_content

def extract_content(response, vendor):
    if vendor == APIVendor.ALIYUNAUDIO:
        return response.output.choices[0].message.content[0]['text']

    else:
        return response.choices[0].message.content
def get_stream_out(response_stream, vendor):
    if vendor == APIVendor.ALIYUNAUDIO:
        filtered_chunks = (chunk for chunk in response_stream
                           if 'output' in chunk
                           and 'choices' in chunk['output']
                           and len(chunk['output']['choices']) > 0
                           and 'message' in chunk['output']['choices'][0]
                           and 'content' in chunk['output']['choices'][0]['message']
                           and len(chunk['output']['choices'][0]['message']['content']) > 0)

        # 直接从筛选后的流中产生 (yield) 文本块
        for chunk in filtered_chunks:
            # 提取文本内容
            delta_content  = chunk.output.choices[0].message.content[0]['text']
            if delta_content :
                yield delta_content
    else :
        for chunk in response_stream:
            delta_content = chunk.choices[0].delta.content
            print(delta_content)
            if delta_content:  # 仅当有新内容时才 yield
                yield delta_content

def build_messages(
    result: Union[str, List[Union[str, Tuple]]],
    system_message: str,
    doc: str = None
) -> List[Dict[str, Any]]:
    """构建最终用于 API 调用的消息字典列表

    参数:
        result: 可以是单个字符串(用户消息)或列表(多轮对话消息对)
        system_message: 系统消息
        doc: 函数文档字符串，可作为备用系统消息

    返回:
        格式化的消息列表，包含角色和内容

    规则:
        1. 消息内容必须是字符串或 ('类型', 数据) 元组，字符串默认是 "text"类型缺省的内容元组。
        2. 其他类型当前仅支持(“image”, img_url), img_url是图像的链接地址或者 base64 编码的图片
        3. 若 result 是单字符串： 系统消息 + 第一轮纯文本用户消息
        4. 若 result 是单层列表： 系统消息 + 第一轮包含多模态数据的用户消息，比如["请描述图片内容", ("image", image_url)]
        5. 若 result 是双层列表[[]]: 系统消息 + 成对的用户/助手消息 + 最新一轮用户消息, 列表中的消息必须是奇数个元素
    """
    messages: List[Dict[str, Any]] = []
    effective_system_message = doc or system_message or "你是一个有用的助手。"

    if isinstance(result, str):
        # 单个字符串结果成为用户消息内容
        messages.append({"role": "system", "content": effective_system_message})
        messages.append({"role": "user", "content": result})
    elif isinstance(result, (list, tuple)):
        processed_list = list(result)

        # 检查是否为双层列表（对话历史）
        if all(isinstance(item, (list, tuple)) for item in processed_list):
            # 双层列表处理：系统消息 + 对话历史 + 最新用户消息
            messages.append({"role": "system", "content": effective_system_message})

            # 确保消息数量为奇数
            if len(processed_list) % 2 == 0:
                raise ValueError("双层列表中的消息数量必须是奇数，包含系统消息后形成完整的对话轮次")

            for i, msg in enumerate(processed_list):
                if not isinstance(msg, (str, tuple, list)):
                    raise ValueError(f"消息必须是字符串、元组或列表，得到: {type(msg)}")

                # 格式化消息
                if isinstance(msg, list):
                    content = _format_content_parts(msg)
                else:
                    content = _format_content_parts([msg])

                # 确定角色：奇数索引为用户，偶数索引为助手
                role = "user" if i % 2 == 0 else "assistant"
                # print(f"message {i}: {msg}, role: {role}, content: {content}")
                # 确保内容格式正确，直接使用格式化后的内容
                messages.append({"role": role, "content": content[0] if isinstance(content, list) and len(content) == 1 else content})
        else:
            # 单层列表处理：系统消息 + 用户消息
            messages.append({"role": "system", "content": effective_system_message})

            # 格式化用户消息
            user_content = _format_content_parts(processed_list)
            messages.append({"role": "user", "content": user_content})
    else:
        raise ValueError(f"被装饰的函数必须返回 str 或 list/tuple。得到: {type(result)}")

    # 验证最终消息结构
    if not all(isinstance(m, dict) and 'role' in m and 'content' in m for m in messages):
        log.error(f"内部错误: build_messages 创建了无效的消息结构: {messages}")
        raise TypeError("未能构建有效的消息字典。")

    return messages

def prepare_api_params(
    default_params: Dict[str, Any], # 装饰器设置的默认参数
    api_params: Dict[str, Any],     # 单次调用时传入的参数
    model: str,                     # 由装饰器确定的最终模型名称 (必需)
    stream: bool                    # 是否为流式调用
) -> Dict[str, Any]:
    """合并默认参数和调用特定参数"""
    final_params = default_params.copy()
    final_params.update(api_params) # 调用特定参数覆盖默认值
    final_params["model"] = model # 确保设置了模型
    final_params["stream"] = stream # 确保 stream 参数与调用类型匹配
    return final_params

def call_llm(client: OpenAI, messages: List[Dict[str, Any]], params: Dict[str, Any]):
    """执行实际的 LLM API 调用"""
    if client is None:
        raise ConnectionError("LLM 客户端未配置或初始化失败。")
    try:
        model = params.get('model')
        if not model:
            raise ValueError("params中必须包含model参数")
        # 记录调试信息
        log.debug(f"调用 LLM API: 模型={model}, 流式={params.get('stream', False)}, 参数={ {k:v for k,v in params.items() if k not in ['stream', 'model']} }, 消息数={len(messages)}")


        response =client.chat.completions.create(messages=messages, **params)


        return response
    except Exception as e:
        log.error(f"LLM API 调用失败: 模型={model}, 错误={e}", exc_info=True)
        if hasattr(e, 'response') and hasattr(e.response, 'text'):
            e.response_content = e.response.text
        raise


# ================== 装饰器逻辑 ==================

def _create_lmp_decorator(
    is_structured: bool = False, # 是否期望结构化输出 (JSON)
    llm_client: OpenAI = None,  # 是否为该装饰器指定了特定客户端
    model: str = None,          # 是否为该装饰器指定了特定模型
    system_message: str = None, # 是否显式指定了系统消息 (None 则依赖文档字符串)
    **default_params          # 传递给 API 的默认参数 (如 temperature)
) -> Callable:
    """创建实际装饰器的工厂函数"""

    if not llm_client:
        raise ValueError("必须提供有效的LLM客户端")
    if not model:
        raise ValueError("必须指定模型名称")

    def decorator(func: Callable) -> Callable:
        """实际的装饰器函数"""
        # 获取被装饰函数的文档字符串，作为潜在的系统消息来源
        func_docstring = func.__doc__.strip() if func.__doc__ else None

        @wraps(func) # 保留原函数的元信息 (名称, 文档字符串等)
        def call_normal(*args, **kwargs) -> Union[str, Dict[str, Any]]:
            """处理标准的 (非流式) LLM 调用"""
            # 从 kwargs 中弹出 api_params (如果存在)
            api_params_override = kwargs.pop("api_params", {})

            # 调用用户定义的函数获取结果 (提示或消息列表)
            func_result = func(*args, **kwargs)
            messages = build_messages(func_result, system_message, func_docstring)

            # 构建最终的消息列表，可能使用函数文档字符串作为系统消息

            # 准备最终的 API 参数
            final_params = prepare_api_params(
                default_params, api_params_override, model=model, stream=False # 非流式
            )
            # 执行 API 调用
            response = call_llm(llm_client, messages, final_params)
            current_vendor = llm_client.vendor
            # 提取响应内容

            content = extract_content(response, current_vendor)


            #content=response.output.choices[0].message.content[0]['text']
            #content =response.choices[0].message.content
            if is_structured:  # 如果期望 JSON 输出
              try:
                    # 尝试修复并解析 JSON
                return repair_json(content, return_objects=True)  # return_objects=True 更稳健
              except Exception as parse_error:
                log.warning(f"未能将 LLM 响应解析为 JSON: {parse_error}。返回原始内容:\n{content}")
                return content  # 解析失败则返回原始字符串
            else:  # 如果期望纯文本输出
              return content





        def call_stream(*args, **kwargs) -> Iterator[str]:
            """处理流式 LLM 调用，产生文本块"""
            api_params_override = kwargs.pop("api_params", {})
            func_result = func(*args, **kwargs)
            messages = build_messages(func_result, system_message, func_docstring)
            final_params = prepare_api_params(
                default_params, api_params_override, model=model, stream=True  # 流式
            )
            vendor=llm_client.vendor
            # 获取流式响应
            response_stream = call_llm(llm_client, messages, final_params)
            yield from get_stream_out(response_stream, vendor)

        def call_batch(*batch_inputs: Union[Any, List[Any]], max_workers: int = 10, **batch_kwargs) -> Iterator[Union[str, Dict[str, Any], Exception]]:
            """并发处理多个输入的批量调用"""
            # 标准化输入: 接受单个列表或多个参数作为输入项
            if len(batch_inputs) == 1 and isinstance(batch_inputs[0], (list, tuple)):
                all_inputs = batch_inputs[0]
            else:
                all_inputs = list(batch_inputs) # 将参数元组转为列表

            log.info(f"开始对 {len(all_inputs)} 个输入进行批量处理 (最大工作线程数={max_workers})")
            results = [None] * len(all_inputs) # 预分配结果列表以保持顺序
            futures = {} # 用于将 future 映射回原始索引

            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                for i, input_item in enumerate(all_inputs):
                    # 准备 call_normal 的参数: 处理单个参数 vs 参数元组/列表
                    call_args = input_item if isinstance(input_item, (list, tuple)) else [input_item]
                    # 提交任务到线程池
                    future = executor.submit(call_normal, *call_args, **batch_kwargs)
                    futures[future] = i # 存储 future 到索引的映射

                # 当任务完成时获取结果
                for future in as_completed(futures):
                    idx = futures[future] # 获取原始索引
                    try:
                        result = future.result() # 获取任务结果
                        results[idx] = result
                        log.debug(f"批量处理项 {idx} 成功完成。")
                    except Exception as e:
                        log.error(f"处理索引 {idx} 的批量项时出错: {e}", exc_info=True)
                        results[idx] = e # 在结果中存储异常对象

            # 按原始输入顺序产生结果
            yield from results


        # 将 stream 和 batch 方法附加到主装饰函数上
        call_normal.stream = call_stream
        call_normal.batch = call_batch
        return call_normal # 返回包装后的主函数

    return decorator # 返回配置好的装饰器


# ================== 公开的装饰器 ==================

def simple(
    llm_client: OpenAI = DEFAULT_TEXT_LLM,
    model: str = "deepseek-chat",
    system_message: str = None,
    **default_params
) -> Callable:
    """
    用于期望简单文本输出的 LLM 函数的装饰器。

    Args:
        llm_client: 覆盖默认客户端 (OpenAI 或其他)。
        model: 指定模型 (例如 'gpt-4o', 'deepseek-chat')。如果客户端未指定，模型会决定客户端选择。
        system_message: 显式系统消息 (覆盖函数文档字符串)。
        **default_params: 默认 API 参数 (例如 temperature=0.7)。

    Returns:
        带有 `.stream()` 和 `.batch()` 方法的装饰后函数。
    """
    return _create_lmp_decorator(
        is_structured=False, # 非结构化输出
        llm_client=llm_client,
        model=model,
        system_message=system_message,
        **default_params
    )

def structured(
    llm_client: OpenAI = DEFAULT_TEXT_LLM,
    model: str = "deepseek-chat",
    system_message: str = None,
    **default_params
) -> Callable:
    """
    用于期望 JSON 输出 (会尝试修复) 的 LLM 函数的装饰器。

    Args:
        llm_client: 覆盖默认客户端。
        model: 指定模型。如果客户端未指定，模型会决定客户端选择。
        system_message: 显式系统消息 (覆盖函数文档字符串)。
        **default_params: 默认 API 参数。

    Returns:
        装饰后函数，返回解析后的 JSON (或出错时返回原始字符串)，
        带有 `.stream()` (产生原始文本块) 和 `.batch()` 方法。
    """
    return _create_lmp_decorator(
        is_structured=True, # 结构化输出
        llm_client=llm_client,
        model=model,
        system_message=system_message,
        **default_params
    )

In [42]:
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans
from deepseek_tokenizer import ds_token
from sentence_transformers import SentenceTransformer
import logging

class DocumentChunkExtractor:
    def __init__(self, model_name="tomaarsen/static-similarity-mrl-multilingual-v1"):
        """初始化文档块提取器

        Args:
            model_name: 使用的向量模型名称，默认使用static-similarity-mrl-multilingual-v1
        """
        # 静态向量模型，cpu 运行 ms 级,模型大小 400Mb，加载耗时 7s
        self.embed_model = SentenceTransformer(model_name)

    def extract_important_chunks(self, doc_text: str, max_chunks: int = 20, chunk_size: int = 128) -> list[str]:
        """提取文档中最重要的文本块

        Args:
            doc_text: 输入文档文本，非空字符串
            max_chunks: 抽取后返回的最大块数，必须大于等于4
            chunk_size: 文本块Token大小，必须大于0

        Returns:
            list[str]: 提取的重要文本块列表

        Raises:
            ValueError: 如果输入参数不合法
        """

        # 参数校验
        if not isinstance(doc_text, str) or not doc_text.strip():
            raise ValueError("doc_text must be a non-empty string")
        if not isinstance(max_chunks, int) or max_chunks < 4:
            raise ValueError("max_chunks must be an integer >= 4")
        if not isinstance(chunk_size, int) or chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")

        try:
            # 分块并生成嵌入
            chunks = self._split_text(doc_text, chunk_size)
            if len(chunks) <= max_chunks:
                logging.info(f"文档已分割为{len(chunks)}个块(<=最大块数)，返回完整文本")
                return [doc_text]

            logging.info(f"文档已分割为{len(chunks)}个块，正在生成嵌入向量")
            embeddings = list(self.embed_model.encode(chunks))

            # 使用 KMeans 聚类获取最近的样本索引
            logging.info(f"正在进行KMeans聚类，聚类数量为{max_chunks-4}")
            closest_indices = self._kmeans_clustering(embeddings, n_clusters=max_chunks-4)

            # 获取首尾各两个块和聚类中心最近的文本块
            important_indices = set(list(range(2)) + list(range(len(chunks)-2, len(chunks))))
            cluster_indices = set([idx for idx in closest_indices if idx not in important_indices])
            all_indices = sorted(list(important_indices | cluster_indices))

            logging.info(f"从文档中选择了{len(all_indices)}个重要文本块")
            return [chunks[idx] for idx in all_indices]

        except Exception as e:
            logging.error(f"提取重要文本块时出错: {str(e)}")
            raise


    def _split_text(self, text, chunk_size=128):
        """将文本转换为 tokens，按 token 数量进行切分，然后将每个 token 切分块重新转换为汉字返回。

        Args:
            text: 输入的完整文本字符串
            chunk_size: 每个块的最大 token 数量

        Returns:
            分块后的文本列表，每个元素为一个字符串
        """
        # 将文本编码为 tokens
        token_list = ds_token.encode(text)
        # 按照指定的 chunk_size 将 token 切分
        chunks_tokens = [token_list[i:i + chunk_size] for i in range(0, len(token_list), chunk_size)]
        # 将每个 token 切分块重新解码为汉字并返回
        chunks = [ds_token.decode(tokens) for tokens in chunks_tokens]
        return chunks


    def _kmeans_clustering(self, embeddings, n_clusters=16, random_state=0):
        """使用KMeans对文本块进行聚类，找到距离簇心最近的样本

        Args:
            embeddings: 文本块的向量表示
            n_clusters: 聚类数量
            random_state: 随机种子

        Returns:
            list: 最接近聚类中心的样本索引列表
        """
        # 使用 KMeans 聚类算法
        kmeans = KMeans(n_clusters=n_clusters, random_state=random_state).fit(embeddings)
        labels = kmeans.predict(embeddings)
        cluster_centers = kmeans.cluster_centers_

        # 为每个聚类找到最接近中心的样本
        closest_indices = []
        for cluster_idx in range(n_clusters):
            # 获取当前聚类的所有样本索引
            cluster_indices = [i for i, label in enumerate(labels) if label == cluster_idx]
            # 计算当前聚类所有样本与聚类中心的相似度
            similarities = cosine_similarity([cluster_centers[cluster_idx]], [embeddings[i] for i in cluster_indices])[0]
            # 获取最相似样本的索引
            closest_indices.append(cluster_indices[similarities.argmax()])

        return closest_indices


In [43]:
# audio模型多轮上下文测试
print("\n========audio模型上下文测试=========")
@simple(llm_client=DEFAULT_AUDIO_LLM,model="qwen-audio-turbo-latest")
def describe_image(image_url: str) -> str:
    """你是一个音频分析员"""
    return ["请描述音频内容", ("audio", image_url)]


print("\n流式调用:")
for chunk in describe_image.stream("https://dashscope.oss-cn-beijing.aliyuncs.com/audios/welcome.mp3"):
    print(chunk, end="", flush=True)





流式调用:
音频内容为：“欢迎使用阿里云”。

In [44]:
# 视觉模型多轮上下文测试
print("\n========视觉模型上下文测试=========")
@structured(llm_client=DEFAULT_VISION_LLM,model="qwen-vl-plus")
def describe_image(image_url: str) -> str:
    """你是一个图像分析员"""
    return ["请描述图片内容", ("image", image_url)]




for chunk in describe_image.stream("https://dashscope.oss-cn-beijing.aliyuncs.com/images/dog_and_girl.jpeg"):
    print(chunk, end="", flush=True)





这张
这张照片
照片展示
展示了一位女士和
了一位女士和一只狗在海滩
一只狗在海滩上互动的场景
上互动的场景。阳光从画面
。阳光从画面右侧斜射过来
右侧斜射过来，给整个场景
，给整个场景增添了一份温暖的感觉
增添了一份温暖的感觉。

-
。

- 女士坐在沙滩
 女士坐在沙滩上，穿着格
上，穿着格子衬衫、黑色
子衬衫、黑色裤子，并且戴着
裤子，并且戴着一块手表。
-
一块手表。
- 狗是一
 狗是一只金毛犬
只金毛犬（可能是拉布拉
（可能是拉布拉多），它正
多），它正伸出前爪与
伸出前爪与女人的手相碰
女人的手相碰或握手的动作，在
或握手的动作，在这个过程中看起来非常
这个过程中看起来非常开心和平静地
开心和平静地享受着这一刻。
享受着这一刻。
  
背景是平静

  
背景是平静的大海以及渐
的大海以及渐变色天空的颜色
变色天空的颜色变化，给人一种宁静
变化，给人一种宁静而美好的感觉。
而美好的感觉。整体氛围轻松愉悦
整体氛围轻松愉悦, 似乎是在
, 似乎是在一个晴朗的日子
一个晴朗的日子度过美好时光的好
度过美好时光的好地方！
地方！

In [49]:
# 视觉模型多轮上下文测试
print("\n========视觉模型上下文测试=========")
@structured(llm_client=DEFAULT_VIDEO_LLM,model="qwen-vl-max-latest")
def describe_video(video_url: str) -> str:
    """你是一个视频分析员"""
    return [
        ("video", video_url),#支持base64编码
        f"""
          你是一位视频理解与信息结构化专家，请分析以下视频内容，并根据视频实际情况提取摘要信息，输出标准 JSON 格式，并根据类型完成以下任务：

          1.为视频取一个标题，标题简短概述视频主题
          2.描述视频内容，按照视频的时间顺序，完整的讲述视频中的内容
          3.确定视频中出现的对象，按照时间顺序依次输出
          4.视频发生的场景
          5.视频中有文字就要尽可能提取出结构化的 Markdown 内容
            视频中没文字可以不输出Markdown内容


          请严格输出以下 JSON 格式内容，不要添加解释说明：
              {{

                "title": string,
                "summary": string,
                "objects": string[],
                "scene": string,
                "markdown_content": string | null
              }}

          示例 1：
              {{

                "title": "两人进行篮球单挑",
                "summary": "视频描述了两个运动员进行篮球单挑的场面，在一个回合中，球员A在进攻，球员B在防守，球员A在球员B的防守下投进了三分球",
                "objects": ["球员A", "球员B","篮球"],
                "scene": "篮球场",
                "markdown_content": "## 比赛信息\n\n- **球员A**：球衣号码45\n- **球员B**：球衣号码23\n- 比分：46：50\n- 场地：新光篮球场\n- 篮球品牌：Nike"
              }}


              请开始处理视频，现在，不要给出任何解释性文本，请直接输出结果,不要加任何其他的字样如json:
              （视频中没有文字严格不生成"markdown_content",视频中有文字就一定要生成"markdown_content"）
              ：

          """
    ]




result=describe_video("https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241115/cqqkru/1.mp4")
print(result)




{'title': '微笑女孩的户外瞬间', 'summary': '视频展示了一位短发女孩在户外微笑的画面。她穿着粉色毛衣和白色内搭，佩戴着简单的项链。女孩面带微笑，眼神明亮，表情自然愉悦，时而微微点头，展现出轻松愉快的心情。', 'objects': ['女孩', '粉色毛衣', '白色内搭', '项链'], 'scene': '户外建筑前', 'markdown_content': None}


In [None]:
# 视觉模型多轮上下文测试
print("\n========视觉模型上下文测试=========")
@structured(llm_client=DEFAULT_VIDEO_LLM,model="qwen-vl-plus")
def describe_video(image_url: str) -> str:
    """你是一个视频分析员"""
    return ["请描述图片内容", ("imageimage", image_url)]




result=describe_video("https://dashscope.oss-cn-beijing.aliyuncs.com/images/dog_and_girl.jpeg")
print(result)




--------------------------------------------------------------------------------



In [None]:
# 视觉模型多轮上下文测试
print("\n========视觉模型上下文测试=========")
@structured(llm_client=DEFAULT_VISION_LLM,model="qwen-vl-plus")
def analyse_image(image_url: str) -> str:
    """你是一个图像分析员"""
    return [
        ("image", image_url),#支持base64编码
        f"""
          你是一位图像理解与信息结构化专家，请分析以下图像内容，并根据图像实际情况提取摘要信息，输出标准 JSON 格式。你需要自动判断图像类型，并根据类型完成以下任务：
          1. 判断图像类型（如：“风景照”、“证件照”、“图表”、“文本图”、“插画”、“截图”、“实物照片”等）
          2.为图像取一个标题
          3，维图像生成一个描述摘要
          4,描述图像中主体对象
          5，场景描述
          6，图片中有文字就要尽可能提取出结构化的 Markdown 内容
            图片中没文字可以不输出Markdown内容


          请严格输出以下 JSON 格式内容，不要添加解释说明：
              {{
                "image_type": string,
                "title": string,
                "summary": string,
                "objects": string[],
                "scene": string,
                "markdown_content": string | null
              }}

          示例 1：文本图（产品介绍页面截图）
              {{
                "image_type": "文本图",
                "title": "AI产品功能介绍",
                "summary": "该图为某AI平台产品功能页截图，包含智能问答、多模态数据处理、API 接入功能模块介绍及操作说明，内容结构清晰。",
                "objects": ["标题栏", "图标", "文本段落", "功能区块"],
                "scene": "产品介绍页面截图",
                "markdown_content": "## 产品功能一览\n\n- 智能问答系统\n- 多模态数据处理\n- API 接入支持\n\n> 联系我们获取更多功能详情。"
              }}
              示例 2：自然图（风景照片）
              {{
                "image_type": "风景照",
                "title": "雪山湖泊倒影图",
                "summary": "图像展示一座雪山在湖面中的倒影，天空晴朗，色彩冷静宁静，构图对称。",
                "objects": ["雪山", "湖泊", "天空", "倒影"],
                "scene": "自然风光摄影",
                "markdown_content": null
              }}

              请开始处理图像，现在，不要给出任何解释性文本，请直接输出结果,不要加任何其他的字样如json:
              （图像中没有文字严格不生成"markdown_content",图像中有文字就一定要生成"markdown_content"）
              ：

          """
    ]



result=analyse_image("https://dashscope.oss-cn-beijing.aliyuncs.com/images/dog_and_girl.jpeg")
print(result)




{'image_type': '实物照片', 'title': '海滩上的友好互动', 'summary': '这张图片展示了一个人和一只狗在沙滩上愉快地玩耍的温馨时刻，在夕阳下显得格外温暖和谐。', 'objects': ['女人', '金毛犬', '沙子', '海洋波浪'], 'scene': '海边休闲时光', 'markdown_content': ''}


In [None]:
code_txt='''import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

# 模拟供应链数据
data = {
    'date': pd.date_range(start='2023-01-01', periods=100, freq='D'),
    'raw_material_price': np.random.normal(100, 5, 100),
    'demand': np.random.normal(200, 20, 100),
    'transport_time': np.random.normal(2, 0.5, 100)  # 天
}

df = pd.DataFrame(data)
df['predicted_demand'] = df['demand'] + np.random.normal(0, 5, 100)

# 构建一个简单的预测模型
X = df[['raw_material_price', 'transport_time']]
y = df['predicted_demand']
model = RandomForestRegressor(n_estimators=100)
model.fit(X, y)

# 预测未来30天的需求
future_dates = pd.date_range(start=df['date'].max() + timedelta(days=1), periods=30, freq='D')
future_data = pd.DataFrame({
    'date': future_dates,
    'raw_material_price': np.random.normal(100, 5, 30),
    'transport_time': np.random.normal(2, 0.5, 30)
})

future_data['predicted_demand'] = model.predict(future_data[['raw_material_price', 'transport_time']])

# 可视化
plt.figure(figsize=(10, 6))
plt.plot(df['date'], df['predicted_demand'], label='历史预测需求')
plt.plot(future_data['date'], future_data['predicted_demand'], label='未来预测需求', linestyle='--')
plt.xlabel('日期')
plt.ylabel('需求量')
plt.title('供应链需求预测')
plt.legend()
plt.grid(True)
plt.show()
"'''

In [None]:
@structured(llm_client=DEFAULT_TEXT_LLM,model="deepseek-chat")
def analyse_code(code_txt) -> str:
    """你是一位代码分析专家"""
    return f"""
    请根据提供的代码文本，生成一个合适的标题、摘要以及代码中的关键变量。适用于后续检索和快速理解代码内容。

    标题 (title)：应包含代码语言，并简要概括代码的主要功能或用途，例如 "Python - 图像处理脚本" 或 "JavaScript - REST API 服务器"。
    摘要 (summary)：概述代码的核心逻辑、关键功能模块和主要实现方法，避免逐行解释，但应涵盖整体结构和主要算法思路。
    关键变量 (keywords)：提取代码中最重要的变量、至多 10 个，包括但不限于导入变量、类名、函数名称，核心参数配置、核心逻辑变量等。

    请按照 JSON 格式 输出，确保格式规范，示例如下：
    {{
      "title": "代码语言 - 数据分析工具",
      "summary": "该 Python 脚本用于处理和分析 CSV 数据集，使用 Pandas 进行数据清洗，并利用 Matplotlib 绘制可视化图表。主要功能包括数据加载、缺失值处理、统计分析和趋势绘制。",
      "keywords": ["dataframe", "csv_file", "analyze_data", "plot_trends"]
    }}

    输入代码如下：
    {code_txt}
    现在，不要给出任何解释性文本，请直接输出结果（注意不要输出json这个词）:
    """


print(analyse_code(code_txt))

{'title': 'Python - 供应链需求预测分析', 'summary': '该 Python 脚本使用随机森林回归模型对供应链需求进行预测分析。首先生成模拟供应链数据（包括原材料价格、运输时间和需求），然后训练随机森林模型，预测未来30天的需求，并使用Matplotlib可视化历史预测和未来预测结果。', 'keywords': ['RandomForestRegressor', 'pd.DataFrame', 'future_dates', 'predicted_demand', 'raw_material_price', 'transport_time', 'model.fit', 'model.predict', 'plt.plot', 'plt.show']}


In [56]:
content='''
随着人类社会夜间活动的增加，各类光源在夜间环境中的广泛应用，造成的问题便是对夜间图像造成了新的干扰，耀斑。这一现象的原因源于镜头光学系统的物理结构特性。镜头通常由多个透镜元件组成，光线在穿过这些透镜时，尽管大部分能够直接透过，但仍有部分光线在透镜表面发生反射。这些反射光在镜头内部多次反射或散射后，最终到达图像传感器，形成光斑或辉光，尤其是当透镜元件数量增加时，这种内部反射和扩散效应会更加显著，从而导致更为明显的眩光现象。尽管现代光学镜头设计已通过多种技术手段在一定程度上缓解了辉光的影响，然而在实际拍摄场景中，镜头表面的灰尘、油渍、雨点、微小划痕，以及空气中的水雾或悬浮颗粒等，都会显著增强光线在镜头表面的散射与反射，进而加剧辉光现象的出现。
特别是对于长期暴露在户外环境中的车载摄像头和监控摄像头，以及在日常使用中频繁受到磨损的手机摄像头，外界环境的影响往往难以避免。随着车载系统和监控设备广泛应用于自动驾驶和智能安防等领域，如何在成像后处理环节有效地抑制辉光现象，成为解决这一问题的更为实际和具有针对性的方法。
大多数夜间图像增强算法以提高图像暗部区域的细节为核心目标。这些方法即使提升了图像的亮度，但经过我们的测试这种亮度提升会造成耀斑现象的加剧，导致更严重的图像破坏，在如今复杂的夜间图像场景中，去除耀斑的任务更加重要。
目前去耀斑任务的难点一直在于缺少相关数据集，采集配对的耀斑图像和无耀斑图像是一件非常困难的事情，至今仍然没有真实收集的能够达到训练规模的数据集。一些人也发现了这个问题，whu等人，sun等人基于太阳光的特点生成了耀斑模拟，然而这些方法在与夜间复杂情况下的耀斑状况差别较大，对真实耀斑的处理效果并不好。Yuekun Da 等人先后提出了Flare7K和Flare7K++数据集，这是第一个针对夜间辉光生成的数据集，采用了对辉光完全手工合成，再叠加到真实图像上的想法，是目前与真实耀斑效较为接近的一种方法。然而其手工合成的辉光叠加到图像上往往效果不够自然，其割裂感往往成为一种较为学习的特征。这也就让其训练出的模型在真实图像中，面对柔和的夜间辉光时，往往起不到效果。
我们提出了一种创新性的模拟耀斑的方法，让耀斑和真实世界中一样，由光源而发，先确定图像中的真实光源，在基于光源进行辉光模拟，而不是预先设定好耀斑的形式。耀斑的特性很大程度上应该由光源本身决定，于是我们首先对图像中光源的特性进行提取，光源的形状，亮度，颜色等。由这些因素限定耀斑的特性。这样生成的耀斑自然程度会比把耀斑直接添加到背景图像上高很多，更加符真实耀斑图像的特征学习难度。为了保证模拟的耀斑和真实的耀斑有着相同的物理规律，特性。我们采集了大概三百张的真实耀斑图像总结出了真实的可进行仿真的规律。为了充分模拟夜间耀斑的复杂多样性，基于这些规律我们不限定具体的值，只给定合理的范围，在合理的范围内随机设定各类参数。同时我们设计的，查找图像中可能光源的函数，允许一张图像中同时存在多个候选光源。我们会为每个光源分别生成辉光，这些操做可以保证我们数据集可以根据一定数量的夜间背景图生成大量的配对图像。
辉光具有较强的局部性，我们发现Flare7K的有监督方法中部分图像处理效果不好的原因在于模型无法正确感知到辉光的位置。基于此我们想通过一个简单的阈值分割任务来引导模型学习辉光的位置，这种做法强化了模型学习耀斑位置的能力。我们设计了一全新的模型架构，采用了双路径解码器，每个路径的解码器对应着学习mask，和学习去除辉光的任务。由学习mask的结果以及特征图引导去耀斑学习任务。我们的数据集以及Flare7K等合成数据集可以较为轻易的获取到耀斑区域的mask，可以用于监督我们的mask部分的效果。我们提出了mask图引导的跳跃链接新方法，为此我们设计了一个特征激活融合模块。实验结果表明我们的方法对很多困难的辉光图有着较好的处理效果。
本研究的主要贡献有四个方面：
1.首先我们提出了首个从图像中定位光源，获取光源信息，生成辉光的方法。我们的方法适用于绝大部分场景下耀斑的模拟生成。生成的耀斑有很高的视觉真实性，符合真实耀斑的物理规律。同时支持单张图像中的的多光源分别生成。
2.第一个完全基于夜间场景的大规模高质量耀斑配对数据集。我们收集了五百张真实有干净光源的夜间图像，这些图像作为原图，基于这些图像上的光源我们生成了耀斑模拟，我们对模拟的参数只设定上下界，来保证其处于在一个合理的范围区间，在这个区间内随机生成参数。通过组合可以实现单个光源生成大量特征有明显区别且合理的耀斑，结合单张图像多光源生成。我们基于这五百张原图生成了16000张高质量张配对辉光数据集。
3.我们设计了一个针对于耀斑等这类局部图像损坏的修复模型。由简单的阈值分割任务进行引导，加强位置信息，提升去耀斑这类复杂任务的学习效果。创新性的网络架构设计可以保证增加较少参数的情况下完成阈值分割的学习和引导任务。
4.我们改进了传统跳跃连接容易产生的信息冗余和信息失配的问题，我们采用了mask特征图引导连接的思想。为此设计了一特征激活融合模块，这个模块可以广泛应用于各类特征融合的任务。
镜头光晕数据集。收集大规模的成对光晕污染图像和无光晕图像数据集依靠拍摄是非常难以实现的。为了解决这个问题，wu等人提出了第一个合成光晕数据集，包含2001张拍摄的光晕图和3000张模拟光晕图。为合成耀斑的思路做了很大的贡献，但其耀斑图像均来自于统一镜头，模拟的光晕图整体都是白光图，形式非常统一，与夜间的真实复杂情况差别较大。用其训练之后的模型对夜间耀斑的去除效果较差。
yudakan等人基于 Adobe After Effects 中的插件手动合成了7000张耀斑图像，耀斑的形式非常丰富，且视觉真实性很高，其对耀斑成分的分析为耀斑的模拟提供了很大的指导。但其存在的问题是这种预设好的耀斑图直接添加到背景图像上会存在一些问题。首先是与背景的割裂感，耀斑图像无法根据光源的情况去选择，耀斑的特性和光源特性往往差别教导，这也就导致添加到背景图像上时会与原图有很强的割裂感，而真实图像中辉光往往和背景图之间是平滑的过度，这种割裂感特征往往会干扰模型的训练，削弱了模型在处理真实辉光的判断和处理能力。其次便是其叠加的方式会导致图像上的光源本身的信息被遮挡，往往真实场景中，光源附近的信息是我们需要的，这种遮挡会导致处理后的图像存在伪影，其在Flare7K++中有所改善处理，但其通过裁剪耀斑中亮块作为光源信息的方法和真实场景中有一定差距，导致其结果还是存在光源周围信息丢失的问题。其采用的背景图像是常规图像，这些图像整体较亮，会导致模拟后的辉光中离中心较远处亮度较暗的部分没有办法很好体现，这也就导致其结果对夜间图像中亮度不是很高的光雾的处理效果较差。

Eino-Vi
lle Talvala [15]等人较早对薄雾眩光进行了量化分析，并通过使用高频遮挡掩膜多次捕获图像，以估计和去除眩光。尽管此方法在薄雾眩光的处理上具有创新性，但其局限性在于对复杂场景中的强眩光效果有限。Wanyu Wu [16]等人设计了一种光源感知的盲去卷积模型（LBDN），结合光源空间位置掩膜和基于APSF（大气点扩散函数）的先验信息，用于估计多重散射图，但其对图像无辉光部分的破坏较大，会导致处理后的图像整体偏暗。Yeying Jin [18] 等人构建了一个未配对的夜间光晕和干净光源数据集，并提出了一种结合层分离网络与CycleGAN的双层设计。然而，其层分离网络在处理大范围光晕时效果不理想，且CycleGAN在训练过程中表现出较大的不稳定性，容易产生伪影，同时可能导致光源本身被遮挡或削弱的问题。Sharma 和 Tan [17]提出了一种基于相机响应函数（CRF）估计、频域分解和HDR成像的光效应抑制方法。然而，该方法在处理无色白光和强光晕时效果不佳，容易出现细节丢失和抑制不足的问题。Wu [19]等人提出了第一个针对辉光的数据集，生成了配对的有辉光和干净的图像，基于U-Net的pix2pix模型在其数据集上训练了一个去除眩光的网络。其训练的模型对光源的要求较高，很容易出现无法识别光源并无法消除辉光的问题，并且在夜间场景下效果不是很好。Yuekun Da[20,21]等手工合成辉光，并将其合成的辉光通过变形然后添加到背景图片上的方法制作了一个合成数据集，训练了一个端到端的辉光去除框架，其问题是在数据集处提到的对光源周围信息的抑制和对光雾较弱的处理能力。
//辉光介绍

近年来，许多研究者在根据光源模拟光源周围辉光效应时，采用了基于成像原理的方法，利用与光雾模拟类似的技术，使用（APSF, Axially Symmetric Point Spread Function）来描述理想点光源经过镜头系统后在图像平面上形成的光强分布。然而，我们在实验中发现，基于简单APSF生成的辉光与真实图像中的辉光效果之间存在明显的视觉差距,难以直接用于生成符合实际需求的高质量数据集。
为解决上述问题，我们从真实包含辉光的图像中学习辉光特。我们收集了100张真实的辉光图像，这些图像的设计尽量简化了场景复杂性，仅包含光源本身及其周围的辉光效果，同时背景接近全黑。能够有效避免复杂背景对数据分析的干扰，使得后续的分析更加专注于辉光的本质特征。
基于该数据集，我们进一步对真实图像中光源的亮度分布及其随角度和距离变化的衰减特性进行了深入研究。这种数据驱动的分析方法为更真实地重建辉光效应提供了可能，并为生成高质量的模拟数据集奠定了基础。辉光特性分析与模拟方法
	通过对真实辉光图像的研究，我们总结发现辉光效应可从以下两个角度进行整体建模和模拟：
径向亮度衰减（光晕效应，Halo Effect）：
	辉光的第一部分表现为亮度随着距离光源中心的增加而呈现非线性下降。这种现象通常表现为光源周围的光晕效果，且亮度衰减的速率随距离的变化逐渐减缓。
为了得到径向亮度的合理分布函数，我们从每张真实图像中提取辉光强度随光心距离变化的衰减曲线，并对其进行归一化处理以消除绝对亮度差异的影响。通过统计分析大量图像中的衰减曲线，我们提取并总结出了整体的平均衰减趋势，以此为基准进行模拟
点扩散函数（PSF, Point Spread Function）相似的高斯衰减模型，对辉光随距离的亮度变化进行拟合与模拟。然而，与直接使用标准PSF函数不同，我们在模型中并不固定指数函数的衰减速率，而是仅对其在上、下边界进行了约束，同时我们采用了多种函数的混合 / 分段模型，将多类型衰减形函数（指数、高斯、多项式）拼接或加权叠加，并加入随机噪声的起伏以模拟真实光学衰减中更加复杂的效果；也就是说，我们先通过对大量图像的统计分析得到了一个合理的衰减速率范围，然后在此范围内随机选择衰减函数的类型以及不同的衰减速率，噪声形式。这样做的好处在于。符合整体衰减规律：通过分析大量真实光源辉光图像，已获取了一个真实可信的衰减速率范围，能够保证生成的辉光模拟数据拥有与实际物理过程相一致的整体趋势。增强生成数据的随机性与多样性：在每次生成辉光数据时，都可以在预先设定的上下界之间随机地选取衰减速率，从而得到不同梯度的衰减曲线。避免了一刀切的固定衰减参数，使生成的数据分布更加丰富，更有助于增强训练模型时间的泛化性环向亮度起伏（条纹效应，Streak Effect）：
	辉光的第二特性是沿环向的亮度起伏，通常表现为线状的耀斑或条纹（streaks），Streak（条形耀斑）是辉光的一种重要组成部分，具有与光晕相似的径向亮度衰减特性，但同时表现出显著的环向分布特性和亮度边界。我们同样对收集到的辉光图像进行定量分析，总结了Streak的以下特性：环向分布特性：
Streak通常出现在特定角度，呈现条状分布，可分为以下两种典型形态：宽条状： 宽度较大，亮度高于周围光晕，透明度较高，衰减较为明显。细线状： 宽度较小，亮度非常高，透明度较低，衰减不显著。周围光晕的“缺口效应”：Streak的出现通常伴随着两侧光晕亮度的下降，形成类似“缺口”的区域。
基于上述特性我们分别设计模拟两种streaks的形态，每次随机角度生成，引入径向噪声：为增强光线的丝状的真实感，我们在径向方向上加入丝状噪声。在Streak两侧光晕区域内引入亮度抑制函数，通过降低光晕亮度实现“缺口效应”的模拟。抑制程度与Streak的宽度和亮度相关联，以确保模拟效果更加逼真。我们使用收集的的五百张真实携带夜间干净光源的图像作为原图。用上面的真实光源模拟辉光生成。我们采用了一种基于动态阈值的光源检测方法，通过设定不同的亮度阈值，精确地检测图像中的光源区域。为了获取到完整的光源区域，应用形态学操作对生成的掩膜（mask）进行处理，以确保光源形状的完整性和连续性。随后，我们对检测到的光源区域进行颜色特征提取。研究表明，相较于光源周围的光晕区域，光源核心区域的颜色饱和度通常更低。基于这一统计规律，我们设计了一种颜色饱和度自适应调整算法，以确保最终生成的光晕颜色更贴近真实场景。为提高数据集的多样性，我们从每幅图像中检测出的多个符合条件的光源中随机选择若干个用于光晕模拟。这种随机化策略有效增加了数据样本的多样性，进一步增强了模型的泛化能力。
细节优化：在实现整体光晕与光条（streak）模拟效果的基础上，我们进一步引入了多种细节优化，使生成结果更加贴近真实光晕特性。具体而言，我们在光晕中添加了径向的丝状噪声，从而增强了光晕的光线细节表现，并通过结合细微光线的叠加效果，成功模拟了真实辉光中的闪烁（shimmer）现象。此外，为模拟真实光晕中普遍存在的色散效应，我们设计了色散模拟机制，特别关注边缘区域更为显著的色散特性，从而更好地还原真实场景。
为了提高生成光晕的颜色表现力，我们引入了随机的小幅度颜色调整，增强辉光效果的真实性和多样性。
针对辉光可能遮挡光源的问题，我们基于光源检测得到的掩膜（mask）对辉光在光源处的亮度进行了抑制。这种处理方式不仅符合真实场景中辉光不会明显遮挡光源的现象，还使得模型在学习辉光去除任务时能够更多地保留光源区域的信息，从而有效缓解光源处理后可能出现的过暗和细节丢失问题。我们使用收集的的五百张真实携带夜间干净光源的图像作为原图。用上面的真实光源模拟辉光生成。我们采用了一种基于动态阈值的光源检测方法，通过设定不同的亮度阈值，精确地检测图像中的光源区域。为了获取到完整的光源区域，应用形态学操作对生成的掩膜（mask）进行处理，以确保光源形状的完整性和连续性。随后，我们对检测到的光源区域进行颜色特征提取。研究表明，相较于光源周围的光晕区域，光源核心区域的颜色饱和度通常更低。基于这一统计规律，我们设计了一种颜色饱和度自适应调整算法，以确保最终生成的光晕颜色更贴近真实场景。为提高数据集的多样性，我们从每幅图像中检测出的多个符合条件的光源中随机选择若干个用于光晕模拟。这种随机化策略有效增加了数据样本的多样性，进一步增强了模型的泛化能力。
细节优化：在实现整体光晕与光条（streak）模拟效果的基础上，我们进一步引入了多种细节优化，使生成结果更加贴近真实光晕特性。具体而言，我们在光晕中添加了径向的丝状噪声，从而增强了光晕的光线细节表现，并通过结合细微光线的叠加效果，成功模拟了真实辉光中的闪烁（shimmer）现象。此外，为模拟真实光晕中普遍存在的色散效应，我们设计了色散模拟机制，特别关注边缘区域更为显著的色散特性，从而更好地还原真实场景。
为了提高生成光晕的颜色表现力，我们引入了随机的小幅度颜色调整，增强辉光效果的真实性和多样性。
辉光与其他类型的图像破坏具有独特的局部特征，因此，模型在学习去除辉光时，如何将注意力集中在辉光区域，对于提升修复效果至关重要。许多先进的图像修复方法（例如Flare7K++）采用了基于Transformer的注意力机制，旨在通过捕捉图像的全局信息来改进修复效果。然而，我们在实验中发现，这种全局注意力机制未能有效地引导模型将注意力集中在辉光区域。辉光区域的特征通常具有较高的局部集中性，而全局注意力机制却倾向于平滑处理图像的整体结构，难以针对局部细节进行精准的处理。
在此基础上，我们设计了一个全新的图像修复架构——SharedEncoder-mask2GlowNet。该方法旨在解决传统图像修复模型在处理局部破损时缺乏对破损区域精确关注的问题。该方法的核心思想是通过简单任务的学习来引导复杂任务。我们采用了先对辉光图像进行阈值分割，学习辉光区域的mask，由mask各层特征图指导辉光去除任务。
为此，我们在经典的编码器-解码器结构基础上，提出了一种创新性的网络设计，旨在通过共享编码器和双路径解码器的设计。我们保留了常规Transformer架构中的编码器部分，并对解码器进行了创新性的设计，提出了两个并行链路以应对不同的任务需求。其中，第一个链路为主路径，专注于学习辉光去除的过程，旨在通过逐步恢复图像中的结构信息，从而实现辉光的有效去除。第二个链路为辅助路径，专门用于学习辉光区域的掩码生成，目的是准确地识别和定位图像中的辉光区域，为主路径提供有价值的位置信息。
在图像处理流程中，信息流通过编码器后，首先进入辅助路径，该路径的任务是学习辉光区域的掩码。为了更好地提升信息流的表达能力，我们设计了一个特征激活增强模块。通过该模块，辅助路径中提取到的特征图会被传递至主路径进行进一步处理。我们对传统的跳跃连接（Skip Connections）进行了创新性改进，以便更加高效地融合辅助路径和主路径之间的信息。具体来说，辅助路径生成的掩码特征图包含了辉光区域的位置信息，并且该掩码图与主路径每一层的特征图保持相同的尺寸，从而确保位置信息可以精确地映射到主路径的特征图上。在信息流的传递过程中，主路径的每一层特征图、辅助路径的特征图以及跳跃连接的特征图都作为输入，传递至我们设计的特征激活增强模块。通过这一模块，掩码特征图能够帮助主路径特征图和跳跃连接的特征图实现选择性的信息融合和增强。经过融合后的信息流将继续作为输出传递至主路径下一层的解码器模块。这一过程会在每一层依次进行，确保信息能够在网络中流动并充分利用不同路径的信息，实现了由学习mask这个简单任务做引导，提供辉光区域的信息和有关特征来强化主路径去辉光的学习。这种创新的设计有效提升了信息流的表达能力，尤其是在处理复杂的辉光区域时，能够更好地捕捉和增强细节。
此外，在辅助路径的设计中，我们仅采用较少的注意力头（Attention Heads），以进一步降低计算开销。结合与编码器部分的共享结构，这种设计有效地控制了整体模型参数的增长，从而在保证掩码生成准确度的同时，实现了更高的计算效率与资源利用率。
针对辉光可能遮挡光源的问题，我们基于光源检测得到的掩膜（mask）对辉光在光源处的亮度进行了抑制。这种处理方式不仅符合真实场景中辉光不会明显遮挡光源的现象，还使得模型在学习辉光去除任务时能够更多地保留光源区域的信息，从而有效缓解光源处理后可能出现的过暗和细节丢失问题。
我们提出了一种特征融合增强模块，其核心思想是利用辅助路径（auxiliary path）学习得到的掩膜（mask）特征图，引导主路径（main path）与跳跃连接（skip connection）的特征融合过程。
掩膜特征图仅表征了特定区域与背景之间的粗粒度区分，缺乏对空间位置信息以及通道重要性的有效建模。为了让其在融合过程提供足够的引导信息，我们设计了一个掩膜特征图的增强模块。首先，我们对掩膜特征图进行位置编码，采用多频傅里叶特征嵌入的策略，区别于传统的单一sin(x)/cos(x)或者Transformer中固定频率的位置编码方式，我们的模块显式地采用指数增长的频率序列（如2^k）对二维归一化坐标进行多尺度的正余弦调制。这种多尺度的傅里叶特征嵌入能够有效捕捉从高频到低频的周期性空间结构特征，实现更全面、连续的空间位置编码，引入空间位置感知能力。接着我们引入一个线性映射层，将高维的傅里叶嵌入映射到指定的维度，作为位置编码后的特征，将其与原始掩膜特征图进行拼接，这样的操做增强了mask特征的空间特征表达。最后采用1×1卷积层代替全连接层，加强对于复杂非线性关系的表达能力，生成通道注意与原始掩膜特征图进行逐通道相乘，从而实现特征图通道维度上的动态调整，

在特征引导融合过程中，我们首先对输入特征进行了卷积预处理，以提取局部空间特征表示。随后，通过逐元素乘法计算输入特征图（记为x和z）与增强后的掩膜特征图y之间的空间关联性，再通过Sigmoid函数生成范围为[0,1]的空间注意力图。通过这种掩膜引导的注意力机制，我们显式地强调了特征图x和z中与掩膜特征图y高度相关的空间区域。最终，经注意力加权增强后的特征图x与z在进行拼接，从而进一步丰富融合后的特征表示，以提升网络对目标区域的辨别能力。


'''

In [57]:
chunk_extractor = DocumentChunkExtractor()
txt = chunk_extractor.extract_important_chunks(doc_text=content, max_chunks=10,chunk_size=256)
print(txt)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.



modules.json:   0%|          | 0.00/141 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/214 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/149k [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/434M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.56M [00:00<?, ?B/s]

['随着人类社会夜间活动的增加，各类光源在夜间环境中的广泛应用，造成的问题便是对夜间图像造成了新的干扰，耀斑。这一现象的原因源于镜头光学系统的物理结构特性。镜头通常由多个透镜元件组成，光线在穿过这些透镜时，尽管大部分能够直接透过，但仍有部分光线在透镜表面发生反射。这些反射光在镜头内部多次反射或散射后，最终到达图像传感器，形成光斑或辉光，尤其是当透镜元件数量增加时，这种内部反射和扩散效应会更加显著，从而导致更为明显的眩光现象。尽管现代光学镜头设计已通过多种技术手段在一定程度上缓解了辉光的影响，然而在实际拍摄场景中，镜头表面的灰尘、油渍、雨点、微小划痕，以及空气中的水雾或悬浮颗粒等，都会显著增强光线在镜头表面的散射与反射，进而加剧辉光现象的出现。\n特别是对于长期暴露在户外环境中的车载摄像头和监控摄像头，以及在日常使用中频繁受到磨损的手机摄像头，外界环境的影响往往难以避免。随着车载系统和监控设备广泛应用于自动驾驶和智能安防等领域，如何在成像后处理环节有效地抑制辉光现象，成为解决这一问题的更为实际和具有针对性的方法。\n大多数夜间图像增强算法以提高图像', '暗部区域的细节为核心目标。这些方法即使提升了图像的亮度，但经过我们的测试这种亮度提升会造成耀斑现象的加剧，导致更严重的图像破坏，在如今复杂的夜间图像场景中，去除耀斑的任务更加重要。\n目前去耀斑任务的难点一直在于缺少相关数据集，采集配对的耀斑图像和无耀斑图像是一件非常困难的事情，至今仍然没有真实收集的能够达到训练规模的数据集。一些人也发现了这个问题，whu等人，sun等人基于太阳光的特点生成了耀斑模拟，然而这些方法在与夜间复杂情况下的耀斑状况差别较大，对真实耀斑的处理效果并不好。Yuekun Da 等人先后提出了Flare7K和Flare7K++数据集，这是第一个针对夜间辉光生成的数据集，采用了对辉光完全手工合成，再叠加到真实图像上的想法，是目前与真实耀斑效较为接近的一种方法。然而其手工合成的辉光叠加到图像上往往效果不够自然，其割裂感往往成为一种较为学习的特征。这也就让其训练出的模型在真实图像中，面对柔和的夜间辉光时，往往起不到效果。\n我们提出了一种创新性的模拟耀斑的方法，让耀斑和真实世界中一样，', '由光源而发，先确定图像中的真实光源，在基于光源进行辉光模拟，而不是预先设定好耀斑的形式。耀斑的特性很大程度上应该

In [58]:
@structured(llm_client=DEFAULT_TEXT_LLM,model="deepseek-chat")
def doc_sketch(txt: str,max_chunks, chunk_size) -> str:

    chunk_extractor = DocumentChunkExtractor()
    txt = chunk_extractor.extract_important_chunks(doc_text=content, max_chunks=10,chunk_size=256)


    return f"""
    请根据以下提供的文档，生成一个合适的标题和一段 400-500 字的摘要。
    标题应简洁明了，能够准确概括文档的核心内容。摘要应完整呈现文档的主要论点、关键信息和结论，要确保一定提到文档中的每个片段，适用于后续检索和快速理解原文内容。
    请按照 JSON 格式输出，确保格式规范且可解析，
    格式如下：{{
      "title":"生成的标题",
      "language":文章语言：用英文简写（en，zh_CN等，根据文章语言判断）
      "summary":生成的摘要}}
    请确保 title 反映文档核心内容，summary 精炼但包括所有段落的内容。
    输入内容如下：{txt}
    现在，不要给出任何解释性文本，请直接输出:
    """
result = doc_sketch(txt=content,max_chunks=10,chunk_size=256)
print(result)

{'title': '夜间图像辉光现象的模拟与去除方法研究', 'language': 'zh_CN', 'summary': '本文探讨了夜间图像中辉光现象的成因及其去除方法。辉光现象主要由镜头光学系统的物理结构特性引起，光线在透镜表面反射和散射后形成光斑或辉光，尤其在透镜元件数量增加时更为明显。尽管现代光学设计已部分缓解此问题，但镜头表面的灰尘、油渍等环境因素仍会加剧辉光现象。车载摄像头、监控摄像头和手机摄像头等长期暴露在户外或频繁磨损的设备尤其受到影响。\n\n目前，夜间图像增强算法虽能提升暗部细节，但会加剧耀斑现象。去耀斑任务的难点在于缺乏真实配对的数据集。现有方法如Flare7K和Flare7K++数据集通过手工合成辉光并叠加到真实图像上，但效果不够自然，导致模型在真实场景中表现不佳。\n\n本文提出了一种创新性的耀斑模拟方法，通过提取图像中光源的形状、亮度、颜色等特性来生成更自然的耀斑。该方法基于300张真实耀斑图像总结的规律，在合理范围内随机设定参数，并允许一张图像中存在多个候选光源，从而生成大量配对图像。此外，通过阈值分割任务引导模型学习辉光位置，提升处理效果。\n\n针对辉光的重要组成部分——条形耀斑（Streak），本文分析了其环向分布特性和亮度边界，设计了两种典型形态的模拟方法，并引入径向噪声和亮度抑制函数以增强真实感。研究还采用动态阈值的光源检测方法和颜色饱和度自适应调整算法，确保光晕颜色贴近真实场景。\n\n在模型设计上，本文提出了一种辅助路径学习辉光区域掩码的方法，通过特征激活增强模块和改进的跳跃连接，实现信息高效融合。此外，引入多频傅里叶特征嵌入策略，增强空间位置感知能力，并通过掩膜引导的注意力机制提升网络对目标区域的辨别能力。这些创新设计显著提升了辉光去除的效果。'}


In [34]:
from http import HTTPStatus
from dashscope.audio.asr import Transcription
import json


import dashscope
dashscope.api_key = "sk-7f9a260343a54674b720a2d5fa772a5d"

transcribe_response = Transcription.async_call(
    model='paraformer-v2',
    file_urls=['https://uy.wzznft.com/i/2025/04/15/p3cj3l.mp3',
          'https://dashscope.oss-cn-beijing.aliyuncs.com/samples/audio/paraformer/hello_world_male2.wav'],
    diarization_enabled=True,
    language_hints=['zh', 'en']  # “language_hints”只支持paraformer-v2模型
)

while True:
    if transcribe_response.output.task_status == 'SUCCEEDED' or transcribe_response.output.task_status == 'FAILED':
        break
    transcribe_response = Transcription.fetch(task=transcribe_response.output.task_id)

if transcribe_response.status_code == HTTPStatus.OK:
    print(json.dumps(transcribe_response.output, indent=4, ensure_ascii=False))
    print('transcription done!')

{
    "task_id": "c2ad5c4d-fd89-487a-bb6b-bc18d3467985",
    "task_status": "SUCCEEDED",
    "submit_time": "2025-04-15 15:41:20.472",
    "scheduled_time": "2025-04-15 15:41:20.491",
    "end_time": "2025-04-15 15:43:55.793",
    "results": [
        {
            "file_url": "https://uy.wzznft.com/i/2025/04/15/p3cj3l.mp3",
            "transcription_url": "https://dashscope-result-bj.oss-cn-beijing.aliyuncs.com/prod/paraformer-v2/20250415/15%3A43/649048e8-667c-4934-9d34-2a73cfb52f53-1.json?Expires=1744789435&OSSAccessKeyId=LTAI5tQZd8AEcZX6KZV4G8qL&Signature=fnspp34i5XEVgnciYp%2FmGhx7BiI%3D",
            "subtask_status": "SUCCEEDED"
        },
        {
            "file_url": "https://dashscope.oss-cn-beijing.aliyuncs.com/samples/audio/paraformer/hello_world_male2.wav",
            "transcription_url": "https://dashscope-result-bj.oss-cn-beijing.aliyuncs.com/prod/paraformer-v2/20250415/15%3A41/a0bb646f-54c8-4b9b-8d07-a986b0d9b787-1.json?Expires=1744789435&OSSAccessKeyId=LTAI5tQZd8AE

In [63]:
from http import HTTPStatus
from dashscope.audio.asr import Transcription
import json
import requests
import time
# 设置API密钥
import dashscope
dashscope.api_key = "sk-7f9a260343a54674b720a2d5fa772a5d"

def extract_sentences_from_transcription(transcription_content):
    """
    Extract the sentences content from transcription JSON without word details
    """
    try:
        # Extract the sentences from the transcript
        sentences = transcription_content["transcripts"][0]["sentences"]

        # Remove the 'words' field from each sentence
        for sentence in sentences:
            if "words" in sentence:
                del sentence["words"]

        return sentences
    except (KeyError, IndexError) as e:
        print(f"Error extracting sentences: {str(e)}")
        return None

def get_transcription_content(transcription_result):
    """
    Extract and download transcription content from all URLs in the result
    """
    all_sentences = {}

    # Extract all result items that have succeeded
    successful_items = [item for item in transcription_result["results"]
                        if item["subtask_status"] == "SUCCEEDED"]

    for item in successful_items:
        file_url = item["file_url"]
        transcription_url = item["transcription_url"]

        try:
            response = requests.get(transcription_url)
            response.raise_for_status()

            # Parse the JSON content
            transcription_content = response.json()

            # Extract sentences content without words
            sentences_content = extract_sentences_from_transcription(transcription_content)
            if sentences_content:
                all_sentences[file_url] = sentences_content

            print(f"Successfully retrieved transcription for: {file_url}")
        except Exception as e:
            print(f"Error retrieving transcription for {file_url}: {str(e)}")

    return all_sentences

def transcribe_audio(file_urls, language_hints=['zh', 'en']):
    """
    Transcribe audio files and return the extracted sentences content without word details

    Args:
        file_urls: List of URLs of audio files to transcribe
        language_hints: List of language hints for the transcription model

    Returns:
        Dictionary mapping file URLs to their transcribed sentences content
    """
    dashscope.api_key = "sk-7f9a260343a54674b720a2d5fa772a5d"

    # 发起异步转写请求
    transcribe_response = Transcription.async_call(
        model='paraformer-v2',
        file_urls=file_urls,
        language_hints=language_hints,
        diarization_enabled=True
    )

    # 循环检查任务状态，直到完成或失败
    while True:
        if transcribe_response.output.task_status == 'SUCCEEDED' or transcribe_response.output.task_status == 'FAILED':
            break
        transcribe_response = Transcription.fetch(task=transcribe_response.output.task_id)

    # 检查转写是否成功
    if transcribe_response.status_code == HTTPStatus.OK:
        # 获取句子内容（无单词详情）
        sentences_contents = get_transcription_content(transcribe_response.output)
        return sentences_contents
    else:
        print(f"Transcription failed with status code: {transcribe_response.status_code}")
        return {}

# 示例使用方式
if __name__ == "__main__":
    file_urls = [
        'https://geilien.cn/gaokao/2024/mp3/quanguoyijuan.mp3',
        'https://dashscope.oss-cn-beijing.aliyuncs.com/samples/audio/paraformer/hello_world_male2.wav'
    ]
    start_time = time.time()
    results = transcribe_audio(file_urls)
    end_time = time.time()
    time_long=end_time - start_time
    print(time_long)
    #print(results)
    # 显示结果

    # for file_url, sentences in results.items():
    #     file_name = file_url.split('/')[-1]
    #     print(f"\n{file_name} sentences:")
    #     print(json.dumps(sentences, indent=2, ensure_ascii=False))

Successfully retrieved transcription for: https://geilien.cn/gaokao/2024/mp3/quanguoyijuan.mp3
Successfully retrieved transcription for: https://dashscope.oss-cn-beijing.aliyuncs.com/samples/audio/paraformer/hello_world_male2.wav
178.95487546920776


In [50]:
def format_sentences_by_file(results):
    """
    Convert sentences results to formatted strings grouped by file.

    Args:
        results: Dictionary mapping file URLs to sentences information

    Returns:
        Dictionary mapping file names to their formatted sentence strings
    """
    formatted_strings_by_file = {}

    for file_url, sentences in results.items():
        file_name = file_url.split('/')[-1]
        sentences_strings = []

        for sentence in sentences:
            begin_time_sec = sentence['begin_time'] / 1000  # Convert ms to seconds
            end_time_sec = sentence['end_time'] / 1000      # Convert ms to seconds

            sentence_str = (
                f"Sentence ID: {sentence['sentence_id']}, "
                f"Time: [{begin_time_sec:.2f}s - {end_time_sec:.2f}s], "

                f"Text: \"{sentence['text']}\""
            )

            sentences_strings.append(sentence_str)

        # Join all sentences for this file with newlines
        formatted_strings_by_file[file_name] = "\n".join(sentences_strings)

    return formatted_strings_by_file

# 示例用法
if __name__ == "__main__":
    # 您的结果示例
    # results = {
    #     'https://dashscope.oss-cn-beijing.aliyuncs.com/samples/audio/paraformer/hello_world_male2.wav': [
    #         {'begin_time': 0, 'end_time': 4720, 'text': 'Hello world, 这里是阿里巴巴语音实验室。', 'sentence_id': 1, 'speaker_id': 0}
    #     ],
    #     'https://dashscope.oss-cn-beijing.aliyuncs.com/samples/audio/paraformer/hello_world_female2.wav': [
    #         {'begin_time': 100, 'end_time': 3820, 'text': 'Hello word, 这里是阿里巴巴语音实验室。', 'sentence_id': 1, 'speaker_id': 0},
    #         {'begin_time': 4000, 'end_time': 5500, 'text': '欢迎使用我们的服务。', 'sentence_id': 2, 'speaker_id': 0}
    #     ]
    # }
    file_urls = [
        'http://sound2.yywz123.com/english96ad/lesson/tinglimeirlian1/sound/14937facefdcebdba.mp3',
        'https://dashscope.oss-cn-beijing.aliyuncs.com/samples/audio/paraformer/hello_world_male2.wav'
    ]

    results = transcribe_audio(file_urls)

    # 转换为按文件分组的字符串
    formatted_outputs = format_sentences_by_file(results)

    # 打印结果
    for file_name, formatted_text in formatted_outputs.items():
        print(f"=== {file_name} ===")
        print(formatted_text)
        print("\n")


Successfully retrieved transcription for: http://sound2.yywz123.com/english96ad/lesson/tinglimeirlian1/sound/14937facefdcebdba.mp3
Successfully retrieved transcription for: https://dashscope.oss-cn-beijing.aliyuncs.com/samples/audio/paraformer/hello_world_male2.wav
=== 14937facefdcebdba.mp3 ===
Sentence ID: 1, Time: [2.80s - 6.66s], Text: "Here's a picture we couldn't see star last night. "
Sentence ID: 2, Time: [6.66s - 7.76s], Text: "Yes, was a pity. "
Sentence ID: 3, Time: [7.76s - 9.96s], Text: "I here's was one of us here. "
Sentence ID: 4, Time: [9.96s - 19.96s], Text: "If everyone says it's a really great film, let's see the things you like that horror films and filllers and science fiction. "
Sentence ID: 5, Time: [19.96s - 23.51s], Text: "That's why I want to see Star Wars come back. "
Sentence ID: 6, Time: [23.51s - 26.09s], Text: "You what kind of films do you like? "
Sentence ID: 7, Time: [26.09s - 28.34s], Text: "Historical films and comedy. "
Sentence ID: 8, Time: [28.34s

In [67]:
def format_sentences_by_file(results):
    """
    Extract and concatenate all sentence texts for each file.

    Args:
        results: Dictionary mapping file URLs to sentences information

    Returns:
        Dictionary mapping file names to concatenated text
    """
    text_by_file = {}

    for file_url, sentences in results.items():
        file_name = file_url.split('/')[-1]

        # Extract only the text from each sentence and join them
        all_text = " ".join([sentence['text'] for sentence in sentences])

        # Store the combined text for this file
        text_by_file[file_name] = all_text

    return text_by_file

# 示例用法
if __name__ == "__main__":
    file_urls = [
        'https://geilien.cn/gaokao/2024/mp3/quanguoyijuan.mp3',
        'https://dashscope.oss-cn-beijing.aliyuncs.com/samples/audio/paraformer/hello_world_male2.wav'
    ]

    results = transcribe_audio(file_urls)

    # 提取并合并每个文件的文本内容
    text_by_file = format_sentences_by_file(results)

    # 打印结果
    for file_name, text in text_by_file.items():
        print(f"=== {file_name} ===")
        print(text)
        print("\n")

Successfully retrieved transcription for: https://geilien.cn/gaokao/2024/mp3/quanguoyijuan.mp3
Successfully retrieved transcription for: https://dashscope.oss-cn-beijing.aliyuncs.com/samples/audio/paraformer/hello_world_male2.wav
=== quanguoyijuan.mp3 ===


=== hello_world_male2.wav ===
Hello world, 这里是阿里巴巴语音实验室。




In [None]:
!pip install moviepy

In [None]:
from moviepy.editor import VideoFileClip

def video_to_audio(input_video_path, output_audio_path):
    """
    将视频文件转换为音频文件

    参数:
        input_video_path (str): 输入视频文件的路径
        output_audio_path (str): 输出音频文件的路径（建议使用.mp3或.wav扩展名）

    返回:
        bool: 转换成功返回True，失败返回False
    """
    try:
        # 加载视频文件
        video_clip = VideoFileClip(input_video_path)

        # 提取音频
        audio_clip = video_clip.audio

        # 将音频保存到指定路径
        audio_clip.write_audiofile(output_audio_path)

        # 关闭资源
        audio_clip.close()
        video_clip.close()

        return True
    except Exception as e:
        print(f"转换过程中出现错误: {str(e)}")
        return False

# 使用示例
if __name__ == "__main__":
    input_path = "1.mp4"  # 输入视频路径
    output_path = "1.mp3"  # 输出音频路径

    success = video_to_audio(input_path, output_path)
    if success:
        print(f"视频已成功转换为音频并保存到: {output_path}")
    else:
        print("视频转换失败。")

  if event.key is 'enter':



MoviePy - Writing audio in 1.mp3




MoviePy - Done.
视频已成功转换为音频并保存到: 1.mp3


In [65]:
text='''Sentence ID: 1, Time: [2.80s - 6.66s], Text: "Here's a picture we couldn't see star last night. "
Sentence ID: 2, Time: [6.66s - 7.76s], Text: "Yes, was a pity. "
Sentence ID: 3, Time: [7.76s - 9.96s], Text: "I here's was one of us here. "
Sentence ID: 4, Time: [9.96s - 19.96s], Text: "If everyone says it's a really great film, let's see the things you like that horror films and filllers and science fiction. "
Sentence ID: 5, Time: [19.96s - 23.51s], Text: "That's why I want to see Star Wars come back. "
Sentence ID: 6, Time: [23.51s - 26.09s], Text: "You what kind of films do you like? "
Sentence ID: 7, Time: [26.09s - 28.34s], Text: "Historical films and comedy. "
Sentence ID: 8, Time: [28.34s - 29.96s], Text: "I see nothing more. "
Sentence ID: 9, Time: [29.96s - 35.12s], Text: "And riding films or I have you seen the music box more several time? "
'''

In [68]:
content='''
这是2024年普通高等学校招生全国统一考试英语科听力部分，该部分分为第一、第二、两节。 注意，回答听力部分时，请先将答案标在试卷上。 听力部分结束前，你将有两分钟的时间将你的答案转图到答题卡上。 现在是听力试音时间。 Hello, international friends club.  Can I help you?  Oh, hello, I read about your club in the paper today, and I thought I'd phto find out a bit more.  Yes, certainly.  Well, we're a sort of social club for people from different countries.  It's quite a new club.  We have about 50 members at the moment, but we're growing all the time.  That sounds interesting.  I'm British, actually, and I came to Washington about three months ago.  I'm looking for ways to meet people.  What kind of events do .  you organize?  Well, we have social togeand, sports events, and we also have language ings.  Could you tell me something about the language evenings?  Yes.  every day except Thursday, we have a language evening.  People can come and practice their languages, you know, over a drink or something.  We have different languages on different evenings.  Monday Spanish, Tuesday Italian, Wednesday German and Friday French.  On Thursday, we usually have a meal in a restaurant for anyone who wants to come.  Well.  that sounds great.  I really need to practice my French.  Okay, well, if you can just give me your name and address, I'll send you the form and some more information.  If you join now, you can have .  the first .  month free试音到此结束。 听力考试正式开始。 请看听力部分，第一节第一节听下面5段对话，每段对话后有一个小题，从题中所给的ABC3个选项中选出最佳选项。 听完每段对话后，你都有10秒钟的时间来回答有关小题和阅读下一小题，每段对话仅读一遍。 例如现在你有5秒钟的时间看试卷上的例题。 你将听到以。 下内容，excuse me.  the shirt is it's 9:15.  你将有5秒钟的时间将正确答案标在试卷上。 衬衫的价格为九磅，15便是，所以你选择C项并将其标在试卷上。 现在你有5秒钟的时间阅读第一小题的有关内容。 Thanks for the wonderful weekend.  Kate.  That's okay.  Bob and I are glad you came to see us.  Oh, I have to go in my flight.  Will take off soon.  Do contact me when you're .  in Sydney.  Sure we will.  Paul, listen to the radio.  It's you've stolen my heart, one of the songs played at .  our wedding.  Yeah, how beautiful.  It has been popular for almost two decades.  David, forget about mark.  His aunt is in town, so he can't go .  with us today.  What a pity.  It's the last day of the art show.  How may I help you?  I bought .  a desk and asked for it to be delivered to my .  house this Friday.  Yes, what's the problem?  I need to have it delivered this Saturday.  Next, please.  Oh.  hi.  I missed my 9:00 train to Bedford.  Do I have to buy another ticket?  No.  The next train leaves at a 9:45 at platform eleven.  Thank you.  第一节到此结束。 第二节听下面5段对话或独白。 每段对话或独白后有几个小题，从题中所给的ABC3个选项中选出最佳选项。 听每段对话或读白前，你将有时间阅读各个小题，每小题5秒钟。 听完后，各小题将给出5秒钟的作答时间，每段对话或独白读两遍。 听下面一段对话，回答第六和第7两个小题。 现在你有10秒钟的时间阅读这两个小题。 Honey, have you checked today's weather .  forecast?  Yes, it's cold and wet.  There is a warning for strong winds up to 100 km per hour.  What are we going to do then?  Nothing much.  Just stay indoors.  There is a risk, falling trees and power lines.  right?  And the low temperatures could bring snow to the forest area.  I hope it's over quickly.  Well, it won't get better until late Wednesday anyway.  I have to move the car away .  from the trees.  Yeah, you cannot not be too careful, honey.  Have you checked today's weather .  forecast?  Yes, it's cold and wet.  There is a warning for strong winds up to 100 km per hour.  What are we going to do then?  Nothing much.  Just stay indoors.  There is a risk falling trees and power lines.  right?  And the low temperatures .  could bring snow to the forest area.  I hope it's over quickly.  Well, it won't get better until late Wednesday anyway.  I have to move the car away .  from the trees.  Yeah, you cannot not be too careful.  听下面一段对话，回答第八至第13个小题。 现在你有15秒钟的时间阅读这三个小题。 Hello Dave.  This is Kathy from Sunny California.  Hi Kathy, you finally called.  How was the move all settled in?  Sorry I .  hadn't called sooner, but it's been a busy month.  We're slowly getting things set up in our new home.  Yeah, I understand.  How are Jeff and the children?  Jeff is doing well with his new job.  Tom has made many new friends here and has a lot to do.  Fiona is fine though she misses her grandma.  By the way, thank you for looking in on my mother from time to time.  I call her every week, but it isn't the same as .  seeing her.  No problem.  Betty and I are friends now.  How is the .  weather there?  It's nice and warm and we are able to spend some time every week on the beach with the children.  That's great.  Hello Dave.  Is Kathy .  from Sunny California?  Hi Kathy.  You finally called.  How was the move all settled in?  Sorry I hadn't called sooner.  but it's been a busy month.  We're slowly getting things set up in our new home.  Yeah, I understand.  How are Jeff and the children?  Jeff is doing well with his new job.  Tom has made many new friends here and has a lot to do.  Fiona is fine, though.  She misses her grandma.  By the way, thank you for looking in on my mother from time to time.  I call her every week, but it isn't the .  same as seeing her.  No problem.  Betty and I are friends now.  How is the weather there?  It's nice and warm, and we are able to spend some time every week on the beach .  with the children.  That's great.  听下面一段对话，回答第11至第13 3个小题。 现在您有15秒钟的时间阅读这三个小题。 Jack, how did you get to school when you were in primary school?  I lived close to my school, so I walked every day.  Why?  Well, I remember .  that when we were kids, we often walked, rode a bike or caught the bus to school.  Few of us were dropped off at the school gate .  by our parents.  I see what you mean.  These days, you can see traffic jams around schools that drop off and pick kup times, but it's hard to blame the parents.  They have good reasons for driving their kids to school, mostly to do with safety .  and convenience.  You have a point there, but it could also mean children are missing out on much needed exercise and other life skills.  Some parents are just being overprotective with their children, learning nothing but living in fear of everything.  Studies have found that children who spend more time outside tend to be healthier, better adjusted and better at dealing with stress.  Jack, how did you get to school when you were in primary school?  I lived close to my school, so I walked every day.  Why?  Well.  I remember that when we were kids, we often walked, rode a bike or caught the bus to school.  Few of us were dropped off at the school gate .  by our parents.  I see what you mean.  These days, you can see traffic jams around schools that drop off and pick kup times, but it's hard to blame the parents.  They have good reasons for driving their kids to school, mostly to do with safety .  and convenience.  You have a point there, but it could also mean children are missing out on much needed exercise and other life skills.  Some parents are just being overprotective with their children, learning nothing but living in fear of everything.  Studies have found that children who spend more time outside tend to be healthier, better adjusted and better at dealing with stress.  听下面一段对话，回答第14至第17 4个小题。 现在你有20秒钟的时间阅读这四个小题。 So Marie, your kitchen garden looks excellent.  What made you turn to social media to record your vegetable growing?  Initially, I used the online platform as a diary, something to look back on, giving me a sense of achievement and keeping me motivated and moving forward.  As time went by, other gardeners and like minded people began to follow my progress too.  I know you grow lots of fruit on your land.  Which would you recommend to beginners .  as the best to grow strawberries would be a good choice.  They produce a lot of fruit in their .  first season.  That's cool.  Well, do you have plans to try new or any particular .  crops next year?  Next season I will be adding some pear trees to the fruit area.  I will be adding more herbs, which I can use in the kitchen.  And after a couple of years of failure, I will try .  growing carrots again.  What advice would you offer someone thinking of doing kitchen gardening?  Have a plan of what you want your kitchen garden to look like.  Don't be too discouraged if things don't go according to plan.  Learn from your mistakes and move on.  There's is always next season.  So Marie, your kitchen garden looks excellent.  What made you turn to social media to record your vegetable growing?  Initially, I used the online platform as a diary, something to look back on, giving me a sense of achievement and keeping me motivated and moving forward.  As time went by, other gardeners and like minded people began to follow my progress too.  I know you grow lots of fruit on your land.  Which would you recommend to beginners .  as the best to grow strawberries would be a good choice.  They produce a lot of fruit in their .  first season.  That's cool.  Well, do you have plans to try new or any particular .  crops next year?  Next season, I will be adding some pear trees to the fruit area, I will be adding more herbs, which I can use in the kitchen, and after a couple of years of failure, I will try .  growing carrots again.  What advice would you offer someone thinking of doing kitchen .  gardening?  Have a plan of what you want your kitchen garden to look like.  Don't be too discouraged if things don't go according to plan.  Learn from your mistakes and move on.  There is always next season.  拼下面一段独白，回答第18至第23个小题。 现在你有15秒钟的时间阅读这三个小题。 Welcome to meet the author.  Well, many readers of sports times turn to the last page of their magazine first in order to read Jacob Johnson's weekly article under the title life of Johnson.  The articles, along with his novels and essay collections, have earned Johnson the reputation as one of the funniest humans on the planet.  Johnson began writing about sports as a second year student at the University of	Colorado, covering high school volleyball games for his hometown newspaper.  After graduating in 1981, he moved on to work at the Denver weekly for two years and the Los Angeles post for two more years before landing at sports times.  He has been voted national sports writer of the year eleven times.  So now let's welcome the funny man with serious talent, Jacob Johnson.  Welcome to meet the author.  Well, many readers of sports times turn to the last page of their magazine first in order to read Jacob Johnson's weekly article under the title life of Johnson.  The articles, along with his novels and essay collections, have earned Johnson the reputation as one of the funniest humans on the planet.  Johnson began writing about sports as a second year student at the University of	Colorado, covering high school volleyball games for his hometown newspaper.  After graduating in 1981, he moved on to work at the Denver weekly for two years and the Los Angeles post for two more years before landing at sports times.  He has been voted national sports writer of the year eleven times.  So now let's welcome the funny man with serious talent, Jacob Johnson.  第二节到此结束，现在你有两分钟的时间将试卷上的答案转图到答题卡上。 听力部分到此结束。


'''

In [69]:
@simple(llm_client=DEFAULT_TEXT_LLM,model="deepseek-chat")
def doc_sketch(txt: str,max_chunks, chunk_size) -> str:
    chunk_extractor = DocumentChunkExtractor()
    txt = chunk_extractor.extract_important_chunks(doc_text=content, max_chunks=10,chunk_size=256)




    return f"""

      你是一位音频理解与信息结构化专家，请分析以下音频内容，并根据音频实际情况提取摘要信息，输出标准 JSON 格式，并根据类型完成以下任务：
      0.音频的语言：用英文简写（en，zh_CN等）如有多种语言请全部给出
      1.总结出音频的内容的标题
      2.生成音频内容的摘要描述，按照文本的前后顺序，完整的讲述音频中的内容，字数控制在400-500之间
      3.给出音频中提到的关键对象，按照时间顺序依次输出，选择最重要的关键对象，不要超过二十个
      4.音频发生的场景（如有多个就依次打出）
      5.根据文本语义确定音频中说话人的数量，关系


      请严格按照下面的例子的格式输出 JSON 格式内容，不要添加解释说明：
          {{
            "language":"en",
            "topic": "寒冷天气与汽车启动问题",
            "summary": "音频描述了两位说话人讨论寒冷天气的影响，尤其是汽车启动困难的问题。提到气温为零下18度，车主没有车库，汽车启动困难以及如何通过喝咖啡取暖。",
            "objects": [ "汽车", "车库", "咖啡", "停车场","天气"],
            "scene": "场景:寒冷的室外环境，停车场",
            "relationship": "数量:2,关系:情侣"
            }}
        输入音频文本如下：{txt}
    """
result = doc_sketch(txt=content,max_chunks=10,chunk_size=256)
print(result)

```json
{
  "language": "zh_CN",
  "topic": "英语听力考试与社交俱乐部介绍",
  "summary": "音频内容主要包括两部分：第一部分是2024年普通高等学校招生全国统一考试英语科听力部分的试音内容，介绍了国际朋友社交俱乐部的活动，包括语言交流晚会和社交活动。第二部分是听力考试的正式内容，包含多段对话，涉及天气警告、搬家后的生活情况、孩子上学方式的变化以及园艺爱好者的经验分享。最后还介绍了体育作家Jacob Johnson的职业生涯和成就。",
  "objects": ["国际朋友社交俱乐部", "语言交流晚会", "天气警告", "搬家", "孩子上学方式", "园艺", "草莓", "梨树", "胡萝卜", "社交媒体", "体育作家", "Jacob Johnson", "体育杂志", "高中排球比赛", "丹佛周刊", "洛杉矶邮报", "体育时报", "国家体育作家奖"],
  "scene": "场景:英语听力考试现场，社交俱乐部，家庭生活，学校，园艺场地，体育杂志编辑部",
  "relationship": "数量:多段对话中的不同说话人，关系:考生与考官，俱乐部工作人员与潜在会员，朋友之间，家庭成员，园艺爱好者与采访者，主持人与嘉宾"
}
```
