In [None]:
# !python convert_checkpoint.py --model_dir ./spark-tts-merged --output_dir ./spark-tts-tensorrt --dtype bfloat16

In [None]:
# !trtllm-build --checkpoint_dir ./spark-tts-tensorrt --output_dir ./trt_engines/spark-tts-tensorrt/bf16/1-gpu --gemm_plugin bfloat16

In [None]:
import torch
import soundfile as sf
from pathlib import Path
import numpy as np
import re
import os
import librosa
from scipy.io import wavfile
import random
from IPython.display import Audio, display

import sys
sys.path.append('Spark-TTS')
from cli.SparkTTS import SparkTTS
from sparktts.models.audio_tokenizer import BiCodecTokenizer
from sparktts.utils.audio import audio_volume_normalize
from transformers import AutoTokenizer

# TensorRT-LLM imports
from tensorrt_llm.llmapi import LLM, SamplingParams, BuildConfig, KvCacheConfig

# 配置路径
ENGINE_DIR = "./trt_engines/spark-tts-tensorrt/bf16/1-gpu"
TOKENIZER_DIR = "spark-tts-merged"
AUDIO_TOKENIZER_DIR = "Spark-TTS-0.5B"

# 加载模型
audio_tokenizer = BiCodecTokenizer(AUDIO_TOKENIZER_DIR, "cuda")
device = torch.device("cuda:0")  # 或者 "cpu"

tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_DIR)
print("Tokenizer加载成功!")

kv_cache_config = KvCacheConfig(
    max_tokens=4096,  # 减少最大token数量
    free_gpu_memory_fraction=0.4,  # 只使用40%的剩余显存
    enable_block_reuse=True  # 启用block重用
)

model = LLM(model=ENGINE_DIR, tokenizer=tokenizer,kv_cache_config=kv_cache_config)
print("TensorRT引擎加载成功!")


In [None]:
@torch.inference_mode()
def generate_speech_from_text(
    text: str,
    temperature: float = 0.6,   # Generation temperature
    top_k: int = 50,            # Generation top_k
    top_p: float = 0.95,        # Generation top_p
    max_new_audio_tokens: int = 2048, # Max tokens for audio part
    device: torch.device = torch.device("cuda" if torch.cuda.is_available() else "cpu"),
    seed: int = None  # 新增随机种子参数
) -> np.ndarray:
    """
    Generates speech audio from text using default voice control parameters.

    Args:
        text (str): The text input to be converted to speech.
        temperature (float): Sampling temperature for generation.
        top_k (int): Top-k sampling parameter.
        top_p (float): Top-p (nucleus) sampling parameter.
        max_new_audio_tokens (int): Max number of new tokens to generate (limits audio length).
        device (torch.device): Device to run inference on.
        seed (int): Random seed for reproducible generation. If None, uses random seed.

    Returns:
        np.ndarray: Generated waveform as a NumPy array.
    """

    # 设置随机种子
    if seed is not None:
        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed(seed)
            torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
        np.random.seed(seed)
    

    prompt = "".join([
        "<|task_tts|>",
        "<|start_content|>",
        text,
        "<|end_content|>",
        "<|start_global_token|>"
    ])

    # 使用TensorRT-LLM的SamplingParams配置生成参数
    sampling_params = SamplingParams(
        max_tokens=max_new_audio_tokens,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
        end_id=tokenizer.eos_token_id,
        pad_id=tokenizer.pad_token_id
    )

    print("Generating token sequence...")
    # 使用TensorRT-LLM进行推理
    outputs = model.generate([prompt], sampling_params=sampling_params)
    print("Token sequence generated.")

    # 获取生成的文本
    predicts_text = outputs[0].outputs[0].text
    # print(f"\nGenerated Text (for parsing):\n{predicts_text}\n") # Debugging

    # Extract semantic token IDs using regex
    semantic_matches = re.findall(r"<\|bicodec_semantic_(\d+)\|>", predicts_text)
    if not semantic_matches:
        print("Warning: No semantic tokens found in the generated output.")
        # Handle appropriately - perhaps return silence or raise error
        return np.array([], dtype=np.float32)

    pred_semantic_ids = torch.tensor([int(token) for token in semantic_matches]).long().unsqueeze(0) # Add batch dim

    # Extract global token IDs using regex (assuming controllable mode also generates these)
    global_matches = re.findall(r"<\|bicodec_global_(\d+)\|>", predicts_text)
    if not global_matches:
         print("Warning: No global tokens found in the generated output (controllable mode). Might use defaults or fail.")
         pred_global_ids = torch.zeros((1, 1), dtype=torch.long)
    else:
         pred_global_ids = torch.tensor([int(token) for token in global_matches]).long().unsqueeze(0) # Add batch dim

    pred_global_ids = pred_global_ids.unsqueeze(0) # Shape becomes (1, 1, N_global)

    print(f"Found {pred_semantic_ids.shape[1]} semantic tokens.")
    print(f"Found {pred_global_ids.shape[2]} global tokens.")


    # 5. Detokenize using BiCodecTokenizer
    print("Detokenizing audio tokens...")
    # Ensure audio_tokenizer and its internal model are on the correct device
    audio_tokenizer.device = device
    audio_tokenizer.model.to(device)
    for module in audio_tokenizer.model.modules():
        module.to(device)
    # Squeeze the extra dimension from global tokens as seen in SparkTTS example
    wav_np = audio_tokenizer.detokenize(
        pred_global_ids.to(device).squeeze(0), # Shape (1, N_global)
        pred_semantic_ids.to(device)           # Shape (1, N_semantic)
    )
    print("Detokenization complete.")

    return wav_np

print('TensorRT model ready!')

def trim_audio_silence(
    waveform: np.ndarray,
    sample_rate: int = 16000,
    silence_threshold: float = 0.005,  # 更严格的阈值
    min_audio_length: float = 1.0  # 最小音频长度（秒）
) -> np.ndarray:
    """
    简化版本的静音去除函数
    
    Args:
        waveform (np.ndarray): 输入音频波形
        sample_rate (int): 采样率
        silence_threshold (float): 静音阈值
        min_audio_length (float): 最小保留音频长度
    
    Returns:
        np.ndarray: 处理后的音频
    """
    
    if len(waveform) == 0:
        return waveform
    
    # 计算绝对值的移动平均来检测音频活动
    window_size = int(0.05 * sample_rate)  # 50ms窗口
    abs_audio = np.abs(waveform)
    
    # 使用卷积计算移动平均
    if len(abs_audio) < window_size:
        return waveform
    
    moving_avg = np.convolve(abs_audio, np.ones(window_size)/window_size, mode='valid')
    
    # 找到最后一个超过阈值的位置
    threshold = np.max(moving_avg) * silence_threshold
    active_indices = np.where(moving_avg > threshold)[0]
    
    if len(active_indices) == 0:
        # 如果没有找到活动音频，返回最小长度
        min_samples = int(min_audio_length * sample_rate)
        return waveform[:min(min_samples, len(waveform))]
    
    # 最后一个活动位置
    last_active = active_indices[-1] + window_size  # 补偿卷积的偏移
    
    # 添加一些缓冲
    buffer_samples = int(0.2 * sample_rate)  # 0.2秒缓冲
    end_point = min(last_active + buffer_samples, len(waveform))
    
    # 确保最小长度
    min_samples = int(min_audio_length * sample_rate)
    end_point = max(end_point, min_samples)
    
    return waveform[:end_point]



def split_text_by_punctuation(text: str, min_length: int = 60): 
    """ 
    正确处理中英文标点符号的文本分割，并确保每个句子至少达到指定长度
    中文：。！？（无空格） 
    英文：. ! ?（有空格） 
    
    Args:
        text: 要分割的文本
        min_length: 每个句子的最小字符长度，默认20
    
    Returns:
        分割后的句子列表
    """
    # 预处理：替换换行符和合并连续空格
    text = re.sub(r'\s+', ' ', text)  # 关键修改：去除所有换行和多余空格
    
    # 匹配中文标点符号或英文标点符号（可能后跟空格） 
    pattern = r'([。！？]|[.!?]\s*)' 
    parts = re.split(pattern, text) 
    
    sentences = [] 
    current_sentence = "" 
    
    for part in parts: 
        if part.strip():  # 跳过空字符串 
            # 检查是否是标点符号 
            if re.match(r'^[。！？]$', part) or re.match(r'^[.!?]\s*$', part): 
                current_sentence += part 
                # 检查当前句子长度是否满足最小要求
                if len(current_sentence.strip()) >= min_length:
                    sentences.append(current_sentence.strip()) 
                    current_sentence = "" 
                # 如果长度不够，继续累积到下一个句子
            else: 
                current_sentence += part 
    
    # 处理最后一个句子 
    if current_sentence.strip(): 
        # 如果最后一个句子太短，尝试与前一个句子合并
        if len(current_sentence.strip()) < min_length and sentences:
            sentences[-1] += current_sentence
        else:
            sentences.append(current_sentence.strip()) 
            
    # 过滤空字符串并替换中文标点为逗号
    result = []
    for s in sentences:
        if s:
            # 替换中文标点符号：。！？→，
            s = s.replace('。', '，')
            s = s.replace('！', '，')
            s = s.replace('？', '，')
            result.append(s)
            
    return result


def generate_with_retry(sentence, max_retries=3, base_delay=1):
    """带有指数退避的重试机制"""
    for attempt in range(max_retries):
        try:
            seed = random.randint(0, 2**32 - 1)
            segment_audio = generate_speech_from_text(sentence, seed=seed,)
            if segment_audio.size > 0:
                return segment_audio
            else:
                raise ValueError("生成的音频为空")
                
        except Exception as e:
            sentence = sentence + "。"
            if attempt == max_retries - 1:  # 最后一次尝试
                raise e
            
            # 指数退避 + 随机抖动
            # delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            delay = 0.1
            print(f"尝试 {attempt + 1} 失败: {str(e)}, {delay:.1f}秒后重试...")
            time.sleep(delay)
    
    return None

In [None]:
import time
start = time.perf_counter()

input_text = '''最近一周都在用AI写代码，就是这个项目，我把它开源在github上了。这个项目叫RimDJ，是一个基于AI的实时广播电台系统，可以把任何文本内容转换成带有DJ风格的语音广播。
先说说项目的整体架构吧。这是一个典型的微服务架构，采用了异步处理的设计模式。整个系统分为几个核心模块：LLM服务负责把用户输入的原始文本转换成广播稿，TTS服务把广播稿转换成语音，音频混合服务处理背景音乐，最后通过MP3编码服务实现实时流媒体广播。
技术栈方面，我选择了FastAPI作为Web框架，这个框架对异步处理支持特别好，非常适合这种实时流媒体的场景。语音合成用的是SparkTTS模型，这是一个开源的高质量TTS模型，支持中文语音合成。为了提升推理速度，我还用TensorRT对模型进行了加速优化，推理速度提升了好几倍。
最有意思的是语义感知队列系统。传统的队列只是简单的先进先出，但我设计了一个能理解广播稿完整性的智能队列。每个广播稿都有唯一的ID，系统会跟踪每个广播稿的所有音频片段，确保播放的连贯性。如果某个广播稿的片段丢失或超时，系统会自动清理相关的不完整数据，避免播放断断续续的内容。
在音频处理方面，我实现了一个多级音频处理流水线。首先是TTS生成WAV格式的原始音频，然后通过音频混合器与背景音乐进行混合，最后编码成MP3格式进行流媒体传输。整个过程都是实时的，延迟控制在几秒钟以内。
为了保证系统的稳定性，我设计了完善的错误处理机制。每个服务都有独立的错误计数器，当连续错误超过阈值时会自动重启。队列满了会智能丢弃过期数据，避免内存溢出。还有连接保活机制，确保客户端连接的稳定性。
部署方面采用了Docker容器化，支持GPU加速。我准备了两个Docker镜像，一个用于开发调试，包含Jupyter环境可以进行模型微调；另一个是生产运行镜像，专门优化了推理性能。
模型微调是这个项目的一大亮点。我用unsloth框架对SparkTTS进行了微调，让它能够生成更符合广播电台风格的语音。训练数据是我从wiki上找来的公开数据，微调也就只需要几十条广播场景的语音样本。微调后的模型在语音自然度和情感表达方面都有明显提升。
系统的实时性能表现很不错。在RTX四零九零显卡上，TTS生成速度能达到实时的3到4倍，也就是说生成1秒的语音只需要零点二五秒左右。加上TensorRT优化，整个处理流水线的端到端延迟控制在5秒左右，基本达到了实时广播的要求。
用户交互方面设计得很简单。提供了一个Web界面，用户可以直接输入任何文本内容，系统会自动转换成广播稿并生成语音。还有RESTful API接口，方便集成到其他系统中。
这个项目最大的技术挑战是处理音频流的同步问题。因为LLM生成文本、TTS合成语音、音频编码这些步骤的处理时间都不一样，如何保证最终输出的音频流是连续平滑的，这需要精心设计缓冲区和队列管理策略。
另一个挑战是内存管理。音频数据量很大，如果处理不当很容易造成内存泄漏。我实现了智能的缓冲区管理，会根据队列状态动态调整缓冲区大小，及时清理过期数据。
从技术选型来看，这个项目综合运用了深度学习、音频处理、Web开发、容器化部署等多个技术领域。特别是在AI模型优化方面，从模型微调到TensorRT加速，再到推理服务化，形成了一个完整的AI应用开发流程。
整个项目大概用了一周时间完成，代码量在两千行左右。虽然是个娱乐性质的项目，但在技术实现上还是很有挑战性的。特别是实时音频流处理这块，涉及到很多底层的音频编程知识。
这个项目展示了现代AI应用开发的典型模式：用预训练模型作为基础，通过微调适配特定场景，然后用工程化的方式部署成可用的服务。整个过程中，AI只是其中一个环节，更重要的是系统架构设计和工程实现。
点个赞吧，你已经能在github上找到它的代码，自己训练一个电台广播员。'''


sentences = split_text_by_punctuation(input_text)
print(f"分割后得到 {len(sentences)} 个句子")


audio_segments = []
with torch.no_grad():
    for i, sentence in enumerate(sentences):
        print(f"正在生成第 {i+1}/{len(sentences)} 个句子: '{sentence}'")
        try:
            segment_audio = generate_with_retry(sentence)
            audio_segments.append(trim_audio_silence(segment_audio))
            print(f"第 {i+1} 个句子生成成功")
        except Exception as e:
            print(f"第 {i+1} 个句子最终生成失败: {str(e)}")
            print(f"句子生成失败: '{sentence}', 错误: {str(e)}")
            
        # print(f"正在生成第 {i+1}/{len(sentences)} 个句子: '{sentence}'")
        # segment_audio = generate_speech_from_text(sentence)
        # if segment_audio.size > 0:
        #     audio_segments.append(segment_audio)
            
if audio_segments:
    # 添加静音间隔
    sample_rate = audio_tokenizer.config.get("sample_rate", 16000)
    silence = np.zeros(int(0.2 * sample_rate), dtype=np.float32)  # 200ms静音
    
    generated_waveform = audio_segments[0]
    for segment in audio_segments[1:]:
        generated_waveform = np.concatenate([generated_waveform, silence, segment])
else:
    generated_waveform = np.array([], dtype=np.float32)        
# generated_waveform = generate_speech_from_text(input_text)

output_filename = "generated_speech_controllable.wav"
sample_rate = audio_tokenizer.config.get("sample_rate", 16000)
sf.write(output_filename, generated_waveform, sample_rate)
print(f"Audio saved to {output_filename}")

# from IPython.display import Audio, display
# display(Audio(generated_waveform, rate=sample_rate))




mp3_file = random.choice([
    r"./dataset/rickey/rickey_000.mp3",
    r"./dataset/rickey/rickey_002.mp3",
    r"./dataset/rickey/rickey_004.mp3",
    r"./dataset/rickey/rickey_006.mp3",
    r"./dataset/rickey/rickey_008.mp3"
])

mp3_audio, mp3_sr = librosa.load(mp3_file, sr=None)
if mp3_sr != sample_rate:
    # 重采样mp3音频到目标采样率
    mp3_audio = librosa.resample(mp3_audio, orig_sr=mp3_sr, target_sr=sample_rate)

if hasattr(generated_waveform, 'cpu'):
    generated_waveform = generated_waveform.cpu().numpy()
if generated_waveform.ndim > 1:
    generated_waveform = generated_waveform.squeeze()

concatenated_audio = np.concatenate([mp3_audio, generated_waveform])

output_filename = "concatenated_audio.wav"
sf.write(output_filename, concatenated_audio, sample_rate)
print(f"拼接后的音频已保存到 {output_filename}")

display(Audio(concatenated_audio, rate=sample_rate))


elapsed = time.perf_counter() - start
print(f"耗时: {elapsed:.4f} s")