In [1]:
import uuid
from uuid import UUID, uuid4
from typing import List, Optional, Dict, Union, Tuple
from pydantic import BaseModel, Field, field_validator
import random
from enum import Enum
from pathlib import Path
import json

# Enums for audio properties
class AudioFormat(str, Enum):
    """Audio file formats known to this notebook.

    Members mix in ``str``, so each one compares equal to its lowercase
    value.  The processing functions in this file interpolate ``.value``
    into generated file paths as the file extension.
    """
    WAV = "wav"
    MP3 = "mp3"
    FLAC = "flac"
    OGG = "ogg"
    AAC = "aac"

class LanguageCode(str, Enum):
    """Spoken-language tags stored on audio metadata.

    The two-letter values look like ISO 639-1 codes (presumably intentional;
    nothing in this file validates them against the standard).  ``UNKNOWN``
    is a regular member, so it can be returned by ``detect_language``, which
    picks uniformly from every member of this enum.
    """
    ENGLISH = "en"
    FRENCH = "fr"
    SPANISH = "es"
    CHINESE = "zh"
    JAPANESE = "ja"
    GERMAN = "de"
    UNKNOWN = "unknown"

# Main audio model
class Audio(BaseModel):
    """Metadata record describing a single audio file.

    Only metadata is modelled; nothing in this class touches audio samples.
    Constructing an instance immediately writes it to the JSON store through
    ``model_post_init``.
    """
    id: UUID = Field(default_factory=uuid4)  # fresh random UUID unless one is supplied
    file_path: str  # path of the audio file this record describes
    format: AudioFormat  # its ``.value`` is used as the file extension elsewhere in this file
    sample_rate: int = 44100  # Hz
    bit_depth: int = 16  # bits
    channels: int = 2  # 1=mono, 2=stereo
    duration: float  # Duration in seconds
    detected_language: Optional[LanguageCode] = None  # filled in by detect_language / text_to_speech
    transcript: Optional[str] = None  # filled in by transcribe_audio / text_to_speech

    def model_post_init(self, __context):
        """Persist this record with ``save_audio_metadata`` right after construction.

        ``save_audio_metadata`` is defined further down the same cell, so it
        only has to exist by the time the first instance is created.

        NOTE(review): this hook presumably also runs for instances built by
        ``model_validate`` in ``load_audio_metadata``, which would rewrite the
        JSON file on every load -- confirm whether that is intended.
        """
        # Auto-save after initialization
        save_audio_metadata(self)

# Audio segment for time-based operations
class AudioSegment(BaseModel):
    """A time window inside a parent ``Audio`` record.

    The segment stores only the parent's id plus start/end offsets in
    seconds; it holds no audio data of its own.  Constructing an instance
    immediately writes it to the JSON store through ``model_post_init``.
    """
    id: UUID = Field(default_factory=uuid4)
    audio_id: UUID  # Reference to parent audio
    start_time: float  # Start time in seconds
    end_time: float  # End time in seconds

    @field_validator('end_time')
    @classmethod
    def validate_time_range(cls, v, info):
        """Reject segments whose end does not lie strictly after their start.

        ``info.data`` holds the fields validated so far.  ``start_time`` is
        declared before ``end_time``, so it is present unless its own
        validation failed, in which case this check is skipped and the
        ``start_time`` error is reported instead.

        Raises:
            ValueError: if ``end_time <= start_time``.
        """
        start = info.data.get('start_time')
        if start is not None and v <= start:
            raise ValueError('end_time must be greater than start_time')
        return v

    def model_post_init(self, __context):
        """Persist this segment with ``save_audio_metadata`` right after construction."""
        # Auto-save after initialization
        save_audio_metadata(self)

    @property
    def duration(self) -> float:
        """Length of the segment in seconds (``end_time - start_time``)."""
        return self.end_time - self.start_time

# Audio processing functions
def noise_reduction(audio_id: UUID, strength: float = 0.5) -> UUID:
    """
    Produce a noise-reduced copy of an audio record.

    No signal processing happens here: the stored metadata is cloned under a
    new id with a ``processed_<id>.<ext>`` file path, and a message is printed.

    Parameters:
    - audio_id: UUID of the audio to process
    - strength: Noise reduction strength (0.0-1.0); only echoed in the message

    Returns:
    - UUID of the processed audio
    """
    source = load_audio_metadata(audio_id)

    # Clone the source metadata, swapping in a fresh identity and output path.
    result_id = uuid4()
    result = source.model_copy(update={
        "id": result_id,
        "file_path": f"processed_{result_id}.{source.format.value}",
    })

    # In a real implementation, this would call audio processing libraries
    print(f"Applying noise reduction with strength {strength} to {source.file_path}")

    # model_copy does not go through the auto-save hook, so persist explicitly.
    save_audio_metadata(result)
    return result.id

def text_to_speech(text: str, voice: str = "default", language: LanguageCode = LanguageCode.ENGLISH) -> UUID:
    """
    Create an audio record standing in for synthesized speech.

    No TTS engine is invoked; a WAV metadata record is created whose
    transcript is the input text and whose duration is estimated from the
    text length.

    Parameters:
    - text: Text to convert to speech
    - voice: Voice identifier (only echoed in the printed message)
    - language: Language code stored as the record's detected language

    Returns:
    - UUID of the generated audio
    """
    output_path = f"tts_{uuid4()}.wav"
    estimated_seconds = len(text) * 0.07  # rough estimate: 0.07 s per character

    speech = Audio(
        file_path=output_path,
        format=AudioFormat.WAV,
        duration=estimated_seconds,
        detected_language=language,
        transcript=text,
    )

    # In a real implementation, this would call a TTS service
    print(f"Generating speech for text: '{text}' using voice '{voice}' in {language}")

    save_audio_metadata(speech)
    return speech.id

def detect_language(audio_id: UUID) -> LanguageCode:
    """
    Assign a spoken language to an audio record.

    This is a placeholder: the language is drawn at random from all
    ``LanguageCode`` members (``UNKNOWN`` included), stored on the record,
    and the record is saved back.

    Parameters:
    - audio_id: UUID of the audio to analyze

    Returns:
    - Detected language code
    """
    record = load_audio_metadata(audio_id)

    # In a real implementation, this would use a speech recognition service
    candidates = list(LanguageCode)
    guess = random.choice(candidates)

    # Remember the result on the record itself.
    record.detected_language = guess
    save_audio_metadata(record)

    return guess

def transcribe_audio(audio_id: UUID, language: Optional[LanguageCode] = None) -> str:
    """
    Attach a transcript to an audio record and return it.

    This is a placeholder: the transcript is a canned sentence chosen by
    language.  The language is the explicit argument if given, otherwise the
    record's detected language, otherwise English.

    Parameters:
    - audio_id: UUID of the audio to transcribe
    - language: Language code (if known)

    Returns:
    - Transcribed text
    """
    record = load_audio_metadata(audio_id)

    # Resolve the language: explicit argument, then stored detection, then English.
    if language:
        lang = language
    elif record.detected_language:
        lang = record.detected_language
    else:
        lang = LanguageCode.ENGLISH

    # In a real implementation, this would use a speech recognition service
    fallback_text = "This is a sample transcription."
    canned = {
        LanguageCode.ENGLISH: "This is a sample transcription.",
        LanguageCode.FRENCH: "Ceci est une transcription exemple.",
        LanguageCode.SPANISH: "Esta es una transcripción de muestra.",
        LanguageCode.CHINESE: "这是一个示例转录。",
        LanguageCode.JAPANESE: "これはサンプル文字起こしです。",
        LanguageCode.GERMAN: "Dies ist eine Beispieltranskription.",
        LanguageCode.UNKNOWN: "This is a sample transcription.",
    }
    text = canned.get(lang, fallback_text)

    # Store the transcript on the record.
    record.transcript = text
    save_audio_metadata(record)

    return text

def split_audio_by_time(audio_id: UUID, timestamps: List[float]) -> List[UUID]:
    """
    Split audio at specified timestamps.

    The audio's time line ``[0, duration]`` is cut at every timestamp that
    lies strictly inside it.  Timestamps that are duplicated, non-positive,
    or at/after the end of the audio are ignored: they would otherwise
    produce zero-length or out-of-range segments, and ``AudioSegment``
    rejects any segment whose end is not after its start.

    Parameters:
    - audio_id: UUID of the audio to split
    - timestamps: List of timestamps in seconds where to split (any order)

    Returns:
    - List of UUIDs for the resulting AudioSegment objects, in time order
    """
    audio = load_audio_metadata(audio_id)

    # Keep only distinct interior cut points; the set removes duplicates and
    # the range test drops values outside (0, duration), NaN included.
    interior_cuts = sorted({t for t in timestamps if 0 < t < audio.duration})
    boundaries = [0, *interior_cuts, audio.duration]

    segment_ids = []
    for start, end in zip(boundaries, boundaries[1:]):
        segment = AudioSegment(
            audio_id=audio.id,
            start_time=start,
            end_time=end
        )
        save_audio_metadata(segment)
        segment_ids.append(segment.id)

    return segment_ids

def split_audio_by_silence(audio_id: UUID, min_silence_duration: float = 0.5, silence_threshold: float = -40) -> List[UUID]:
    """
    Split audio at (simulated) silent parts.

    This is a placeholder: no waveform is analysed and the two tuning
    parameters are currently unused.  Between 2 and 5 random cut points are
    drawn inside the audio and handed to ``split_audio_by_time``.

    Parameters:
    - audio_id: UUID of the audio to split
    - min_silence_duration: Minimum silence duration to trigger split (seconds)
    - silence_threshold: Volume threshold for silence detection (dB)

    Returns:
    - List of UUIDs for the resulting AudioSegment objects
    """
    record = load_audio_metadata(audio_id)

    # In a real implementation, this would analyze the audio waveform
    # For this example, random cut points are generated instead.
    cut_count = random.randint(2, 5)
    cut_points = []
    for _ in range(cut_count):
        cut_points.append(random.uniform(0, record.duration))
    cut_points.sort()

    # Delegate the actual segment creation.
    return split_audio_by_time(audio_id, cut_points)

def merge_audio_segments(segment_ids: List[UUID], crossfade: float = 0.0) -> UUID:
    """
    Merge multiple audio segments into a single audio record.

    No audio is actually mixed; a new ``Audio`` metadata record is created.
    Its format, sample rate, bit depth and channel count are taken from the
    parent of the FIRST segment only.  Its duration is the sum of the segment
    durations minus the overlap at each junction.  The overlap at a junction
    is capped at the length of the shorter neighbouring segment, so an
    oversized crossfade can never drive the total duration negative.

    Parameters:
    - segment_ids: List of AudioSegment UUIDs to merge
    - crossfade: Crossfade duration between segments (seconds); values <= 0
      mean no overlap

    Returns:
    - UUID of the resulting audio

    Raises:
    - ValueError: if ``segment_ids`` is empty, if an id has no stored
      metadata, or if an id refers to something other than an AudioSegment
    """
    if not segment_ids:
        raise ValueError("No segments provided for merging")

    segments = [load_audio_metadata(seg_id) for seg_id in segment_ids]

    # load_audio_metadata may return an Audio; fail clearly instead of with an
    # AttributeError on `audio_id` further down.
    for seg_id, seg in zip(segment_ids, segments):
        if not isinstance(seg, AudioSegment):
            raise ValueError(f"ID {seg_id} does not refer to an AudioSegment")

    # Every parent is loaded so that a missing parent is reported, even though
    # only the first one supplies the output properties.
    parent_audios = [load_audio_metadata(seg.audio_id) for seg in segments]
    first_parent = parent_audios[0]

    # Total length = sum of parts minus the overlap at each junction.
    total_duration = sum(seg.duration for seg in segments)
    if crossfade > 0:
        for left, right in zip(segments, segments[1:]):
            total_duration -= min(crossfade, left.duration, right.duration)

    # Create new audio for merged result
    merged_audio = Audio(
        file_path=f"merged_{uuid4()}.{first_parent.format.value}",
        format=first_parent.format,
        sample_rate=first_parent.sample_rate,
        bit_depth=first_parent.bit_depth,
        channels=first_parent.channels,
        duration=total_duration
    )

    # In a real implementation, this would perform actual audio merging
    print(f"Merging {len(segments)} segments with {crossfade}s crossfade")

    save_audio_metadata(merged_audio)
    return merged_audio.id

def apply_effect(audio_id: UUID, effect_type: str, parameters: Optional[Dict] = None) -> UUID:
    """
    Apply an audio effect, producing a new audio record.

    No audio is processed here: the stored metadata is cloned under a new id
    with an ``<effect_type>_<id>.<ext>`` file path, and a message is printed.

    Parameters:
    - audio_id: UUID of the audio to process
    - effect_type: Type of effect (e.g., "reverb", "eq", "compression");
      also used as the prefix of the new file path
    - parameters: Effect-specific parameters; ``None`` is treated as ``{}``

    Returns:
    - UUID of the processed audio
    """
    audio = load_audio_metadata(audio_id)
    parameters = parameters or {}

    # Clone the source metadata under a fresh identity and output path.
    new_id = uuid4()
    processed_audio = audio.model_copy(update={
        "id": new_id,
        "file_path": f"{effect_type}_{new_id}.{audio.format.value}",
    })

    # In a real implementation, this would apply the actual effect
    print(f"Applying {effect_type} effect with parameters {parameters} to {audio.file_path}")

    # model_copy does not go through the auto-save hook, so persist explicitly.
    save_audio_metadata(processed_audio)
    return processed_audio.id

def change_audio_format(audio_id: UUID, target_format: AudioFormat) -> UUID:
    """
    Convert audio to a different format, producing a new audio record.

    No file is converted here: the stored metadata is cloned under a new id
    with the target format, and the file path's final extension is replaced.
    The directory part of the original path is preserved, so
    ``recordings/ep1.wav`` becomes ``recordings/ep1.mp3``.

    Parameters:
    - audio_id: UUID of the audio to convert
    - target_format: Target audio format

    Returns:
    - UUID of the converted audio, or the original UUID when the audio is
      already in the target format (no new record is created in that case)
    """
    audio = load_audio_metadata(audio_id)

    # If already in target format, return original
    if audio.format == target_format:
        return audio.id

    # Swap only the final extension and keep the parent directory; using the
    # bare stem alone would silently move the file to the working directory.
    source_path = Path(audio.file_path)
    converted_path = source_path.parent / f"{source_path.stem}.{target_format.value}"

    converted_audio = audio.model_copy(update={
        "id": uuid4(),
        "format": target_format,
        "file_path": str(converted_path),
    })

    # In a real implementation, this would convert the audio file
    print(f"Converting {audio.file_path} from {audio.format} to {target_format}")

    # model_copy does not go through the auto-save hook, so persist explicitly.
    save_audio_metadata(converted_audio)
    return converted_audio.id

def adjust_audio_properties(
    audio_id: UUID,
    sample_rate: Optional[int] = None,
    bit_depth: Optional[int] = None,
    channels: Optional[int] = None
) -> UUID:
    """
    Produce a copy of an audio record with some technical properties changed.

    No resampling happens here: the stored metadata is cloned under a new id
    with an ``adjusted_<id>.<ext>`` file path, and each property that was
    passed (i.e. is not ``None``) is overwritten on the clone.

    Parameters:
    - audio_id: UUID of the audio to adjust
    - sample_rate: Target sample rate in Hz
    - bit_depth: Target bit depth in bits
    - channels: Target number of channels

    Returns:
    - UUID of the adjusted audio
    """
    original = load_audio_metadata(audio_id)

    clone = original.model_copy()
    clone.id = uuid4()

    # Overwrite only the properties the caller actually supplied.
    requested = (
        ("sample_rate", sample_rate),
        ("bit_depth", bit_depth),
        ("channels", channels),
    )
    for field_name, new_value in requested:
        if new_value is not None:
            setattr(clone, field_name, new_value)

    clone.file_path = f"adjusted_{clone.id}.{original.format.value}"

    # In a real implementation, this would adjust the audio properties
    print(f"Adjusting audio properties: sample_rate={sample_rate}, bit_depth={bit_depth}, channels={channels}")

    save_audio_metadata(clone)
    return clone.id

# Storage functions
# Directory (relative to the working directory) holding one JSON file per record.
AUDIO_DATA_DIR = "./audio_data"

def save_audio_metadata(obj: Union[Audio, AudioSegment]):
    """Save audio metadata to ``<AUDIO_DATA_DIR>/<obj.id>.json``.

    The directory is created on demand and any existing file for the same id
    is overwritten.  The file is always written as UTF-8: the serialized JSON
    can contain non-ASCII text (e.g. Chinese or Japanese transcripts), which
    the platform's default encoding may be unable to represent.
    """
    # Ensure directory exists
    data_dir = Path(AUDIO_DATA_DIR)
    data_dir.mkdir(parents=True, exist_ok=True)

    # Save to JSON file
    file_path = data_dir / f"{obj.id}.json"
    file_path.write_text(obj.model_dump_json(), encoding="utf-8")

def load_audio_metadata(obj_id: UUID) -> Union[Audio, AudioSegment]:
    """Load audio metadata from ``<AUDIO_DATA_DIR>/<obj_id>.json``.

    The file is read as UTF-8, matching ``save_audio_metadata``.  The record
    type is inferred from the payload: a ``start_time`` key means
    ``AudioSegment``, anything else is treated as ``Audio``.

    Raises:
        ValueError: if no file exists for ``obj_id``.
    """
    file_path = Path(AUDIO_DATA_DIR) / f"{obj_id}.json"

    if not file_path.exists():
        raise ValueError(f"No audio data found for ID {obj_id}")

    with open(file_path, 'r', encoding="utf-8") as f:
        data = json.load(f)

    # Determine type by checking for specific fields
    if 'start_time' in data:
        return AudioSegment.model_validate(data)
    else:
        return Audio.model_validate(data)

# Example functions to create objects
def create_audio_from_file(file_path: str, format: AudioFormat, **kwargs) -> UUID:
    """Create an ``Audio`` record for a file path and return its id.

    The file itself is never opened.  Technical properties start from fixed
    defaults (44.1 kHz, 16 bit, stereo, 60 s) and any keyword argument
    overrides or extends them before they are passed to ``Audio``.  The
    record is persisted by ``Audio``'s own post-init hook.
    """
    # Get audio properties (in a real implementation, would extract from the file)
    defaults = dict(
        sample_rate=44100,
        bit_depth=16,
        channels=2,
        duration=60.0,  # Default 1 minute
    )

    # Caller-supplied values win over the defaults.
    merged_props = {**defaults, **kwargs}

    return Audio(file_path=file_path, format=format, **merged_props).id

# Higher-level processing functions
def enhance_audio(audio_id: UUID) -> UUID:
    """
    Run a fixed enhancement chain over an audio record.

    The chain is: noise reduction (strength 0.7), then an "eq" effect, then a
    "compression" effect.  Each step creates a new record from the previous
    step's output.

    Parameters:
    - audio_id: UUID of the audio to enhance

    Returns:
    - UUID of the enhanced audio (the output of the last step)
    """
    # Step 1: noise reduction
    current_id = noise_reduction(audio_id, strength=0.7)

    # Steps 2..n: effects applied in order, each to the previous result
    effect_chain = (
        ("eq", {"low": 1.2, "mid": 1.0, "high": 1.1}),
        ("compression", {"threshold": -20, "ratio": 4, "attack": 5, "release": 50}),
    )
    for effect_name, settings in effect_chain:
        current_id = apply_effect(current_id, effect_name, settings)

    return current_id

def voice_activity_detection(audio_id: UUID) -> List[Tuple[float, float]]:
    """
    Produce (simulated) voice-activity spans for an audio record.

    This is a placeholder: no audio is analysed.  Starting at time 0, it
    alternates a random voiced span of 1.0-5.0 s (clipped to the end of the
    audio) with a random silence of 0.2-2.0 s until the audio's duration is
    reached.

    Parameters:
    - audio_id: UUID of the audio to analyze

    Returns:
    - List of (start_time, end_time) tuples for voice segments
    """
    record = load_audio_metadata(audio_id)
    total = record.duration

    # In a real implementation, this would analyze the audio
    spans = []
    cursor = 0

    while cursor < total:
        # Random voiced span, clipped so it does not run past the end
        speech = random.uniform(1.0, 5.0)
        if cursor + speech > total:
            speech = total - cursor

        spans.append((cursor, cursor + speech))

        # Skip over the span plus a random silence gap
        cursor += speech + random.uniform(0.2, 2.0)

    return spans

def auto_generate_segments(audio_id: UUID) -> List[UUID]:
    """
    Create one AudioSegment per span reported by voice-activity detection.

    Parameters:
    - audio_id: UUID of the audio to segment

    Returns:
    - List of UUIDs for the resulting AudioSegment objects, in span order
    """
    # Detect voice activity first, then load the parent record.
    spans = voice_activity_detection(audio_id)
    parent = load_audio_metadata(audio_id)

    created = []
    for begin, finish in spans:
        piece = AudioSegment(
            audio_id=parent.id,
            start_time=begin,
            end_time=finish,
        )
        save_audio_metadata(piece)
        created.append(piece)

    return [piece.id for piece in created]

def batch_transcribe_segments(segment_ids: List[UUID]) -> Dict[UUID, str]:
    """
    Produce a (placeholder) transcription for each audio segment.

    No speech recognition is run; each transcription is a descriptive string
    built from the segment's id and time range.

    Parameters:
    - segment_ids: List of AudioSegment UUIDs to transcribe

    Returns:
    - Dictionary mapping segment UUIDs to transcriptions
    """
    transcripts = {}

    for segment_id in segment_ids:
        piece = load_audio_metadata(segment_id)

        # The parent record is not used yet, but loading it raises if the
        # segment points at audio that has no stored metadata.
        # In a real implementation, this would extract the actual audio segment
        load_audio_metadata(piece.audio_id)

        transcripts[segment_id] = f"Transcription of segment {segment_id} from {piece.start_time:.2f}s to {piece.end_time:.2f}s"

    return transcripts

In [2]:
def process_podcast_recording(file_path: str):
    """Process a podcast recording: reduce noise, then convert to MP3.

    Returns the id of the final MP3 audio record.
    """
    # Register the original recording (45.5 s WAV at 48 kHz)
    recording_props = {"duration": 45.5, "sample_rate": 48000}
    source_id = create_audio_from_file(
        file_path=file_path,
        format=AudioFormat.WAV,
        **recording_props,
    )
    print(f"已加载播客录音：{file_path}")

    # Noise reduction
    denoised_id = noise_reduction(source_id, strength=0.6)
    print("已应用降噪处理")

    # Convert to MP3 (easier to share)
    shareable_id = change_audio_format(denoised_id, AudioFormat.MP3)
    print("已转换为MP3格式")

    return shareable_id

# Example usage
podcast_mp3_id = process_podcast_recording("podcast_episode12.wav")

已加载播客录音：podcast_episode12.wav
Applying noise reduction with strength 0.6 to podcast_episode12.wav
已应用降噪处理
Converting processed_7980250c-2b88-4809-a78d-a68419886ef2.wav from AudioFormat.WAV to AudioFormat.MP3
已转换为MP3格式


In [3]:
def transcribe_interview(interview_file: str):
    """Process an interview recording: split it at silences and transcribe.

    Returns a tuple ``(segment_ids, transcriptions)`` where ``transcriptions``
    maps each segment id to its text.
    """
    # Register the recording (about 20 minutes of stereo FLAC)
    interview_props = {"duration": 1250.0, "channels": 2}
    interview_id = create_audio_from_file(
        file_path=interview_file,
        format=AudioFormat.FLAC,
        **interview_props,
    )
    print(f"已加载采访录音：{interview_file}")

    # Detect the language first
    language = detect_language(interview_id)
    print(f"检测到语言：{language}")

    # Split at silences (likely the pauses between question and answer)
    segments_ids = split_audio_by_silence(
        interview_id,
        min_silence_duration=0.7,
        silence_threshold=-35,
    )
    print(f"将采访分为{len(segments_ids)}个段落")

    # Transcribe every segment in one batch
    transcriptions = batch_transcribe_segments(segments_ids)

    # Print the transcriptions with their time ranges
    for number, (segment_id, text) in enumerate(transcriptions.items(), start=1):
        piece = load_audio_metadata(segment_id)
        print(f"段落 {number} [{piece.start_time:.1f}s - {piece.end_time:.1f}s]: {text}")

    return segments_ids, transcriptions

# Example usage
segments, texts = transcribe_interview("expert_interview.flac")

已加载采访录音：expert_interview.flac
检测到语言：LanguageCode.JAPANESE
将采访分为3个段落
段落 1 [0.0s - 658.4s]: Transcription of segment 576315ca-1fd8-41a5-8095-a67aabac5320 from 0.00s to 658.44s
段落 2 [658.4s - 898.9s]: Transcription of segment edcc542a-9815-4f54-b400-6f8c44081132 from 658.44s to 898.91s
段落 3 [898.9s - 1250.0s]: Transcription of segment 1715178f-6a76-4c9e-b4b7-40e0499b77fd from 898.91s to 1250.00s


In [4]:
def create_audiobook_chapter(chapter_text: str, voice: str = "narrator"):
    """Create an audiobook chapter: text-to-speech followed by enhancement.

    Returns the id of the enhanced audio record.
    """
    # Text to speech
    raw_audio_id = text_to_speech(
        text=chapter_text,
        voice=voice,
        language=LanguageCode.ENGLISH,
    )
    # Same 0.07 s-per-character estimate that text_to_speech uses
    estimated_seconds = len(chapter_text) * 0.07
    print(f"已生成有声书音频，长度约{estimated_seconds:.1f}秒")

    # Apply effects so the result sounds more professional.
    # 1. A light reverb first, to add a sense of space
    reverb_settings = {"room_size": 0.2, "damping": 0.5, "wet_level": 0.1, "dry_level": 0.9}
    reverb_audio_id = apply_effect(raw_audio_id, "reverb", reverb_settings)

    # 2. Then the enhancement chain, for a fuller sound
    enhanced_audio_id = enhance_audio(reverb_audio_id)
    print("已应用专业音频增强效果")

    return enhanced_audio_id

# Example usage
chapter_text = "It was the best of times, it was the worst of times, it was the age of wisdom..."
audiobook_id = create_audiobook_chapter(chapter_text, voice="british_male")

Generating speech for text: 'It was the best of times, it was the worst of times, it was the age of wisdom...' using voice 'british_male' in LanguageCode.ENGLISH
已生成有声书音频，长度约5.6秒
Applying reverb effect with parameters {'room_size': 0.2, 'damping': 0.5, 'wet_level': 0.1, 'dry_level': 0.9} to tts_608038da-589a-4f93-a920-982985f6141c.wav
Applying noise reduction with strength 0.7 to reverb_bccb2be8-30e9-4203-8188-02b95a18cdd2.wav
Applying eq effect with parameters {'low': 1.2, 'mid': 1.0, 'high': 1.1} to processed_cc9e18c1-af8e-419b-b118-1c464836ebee.wav
Applying compression effect with parameters {'threshold': -20, 'ratio': 4, 'attack': 5, 'release': 50} to eq_c3ecd01a-79ac-4c32-a99a-1aec3c854f38.wav
已应用专业音频增强效果


In [5]:
def process_multilingual_speech(file_path: str):
    """Process a multilingual speech: split by time, then detect language per part.

    Returns a list with one dict per segment, holding the keys ``segment``
    (1-based index), ``time_range``, ``language`` and ``transcript``.
    """
    # Register the speech (15 minutes of WAV at 44.1 kHz)
    speech_id = create_audio_from_file(
        file_path=file_path,
        format=AudioFormat.WAV,
        duration=900.0,
        sample_rate=44100,
    )
    print(f"已加载演讲：{file_path}")

    # Split at known time points (assumes the speaker switches language there)
    time_points = [120.5, 350.2, 560.0, 720.8]  # seconds
    segments_ids = split_audio_by_time(speech_id, time_points)
    print(f"已将演讲分为{len(segments_ids)}个段落")

    # Detect the language of, and transcribe, each segment
    results = []
    for index, seg_id in enumerate(segments_ids):
        piece = load_audio_metadata(seg_id)

        # detect_language / transcribe_audio work on Audio records, so each
        # segment gets its own stand-in Audio of matching length.
        piece_audio_id = create_audio_from_file(
            f"segment_{index}.wav",
            format=AudioFormat.WAV,
            duration=piece.duration,
            sample_rate=44100,
        )

        language = detect_language(piece_audio_id)
        transcript = transcribe_audio(piece_audio_id, language)

        entry = {
            "segment": index + 1,
            "time_range": f"{piece.start_time:.1f}s - {piece.end_time:.1f}s",
            "language": language,
            "transcript": transcript,
        }
        results.append(entry)

    # Print the results (transcript preview limited to 50 characters)
    for entry in results:
        print(f"段落 {entry['segment']} [{entry['time_range']}]: {entry['language']} - {entry['transcript'][:50]}...")

    return results

# Example usage
speech_analysis = process_multilingual_speech("international_conference.wav")

已加载演讲：international_conference.wav
已将演讲分为5个段落
段落 1 [0.0s - 120.5s]: LanguageCode.GERMAN - Dies ist eine Beispieltranskription....
段落 2 [120.5s - 350.2s]: LanguageCode.FRENCH - Ceci est une transcription exemple....
段落 3 [350.2s - 560.0s]: LanguageCode.UNKNOWN - This is a sample transcription....
段落 4 [560.0s - 720.8s]: LanguageCode.ENGLISH - This is a sample transcription....
段落 5 [720.8s - 900.0s]: LanguageCode.UNKNOWN - This is a sample transcription....


In [6]:
def analyze_phone_call(call_recording: str):
    """Analyze a phone call: reduce noise, then detect and transcribe speech.

    Returns a tuple ``(cleaned_audio_id, segment_ids, transcriptions)``.
    """
    # Register the call (about 3 minutes, telephone quality, mono MP3)
    call_props = {"duration": 185.0, "sample_rate": 8000, "channels": 1}
    call_id = create_audio_from_file(
        file_path=call_recording,
        format=AudioFormat.MP3,
        **call_props,
    )
    print(f"已加载通话录音：{call_recording}")

    # Reduce noise first (phone calls are usually noisy)
    cleaned_id = noise_reduction(call_id, strength=0.8)
    print("已应用降噪处理")

    # Detect voice activity (find the spoken parts)
    voice_segments = voice_activity_detection(cleaned_id)
    print(f"检测到{len(voice_segments)}个语音段落")

    # Create one segment record per voiced span
    segment_ids = []
    for number, (start, end) in enumerate(voice_segments, start=1):
        spoken = AudioSegment(audio_id=cleaned_id, start_time=start, end_time=end)
        save_audio_metadata(spoken)
        segment_ids.append(spoken.id)
        print(f"语音段落 {number}: {start:.1f}s - {end:.1f}s (持续{end-start:.1f}s)")

    # Transcribe all voiced segments
    transcriptions = batch_transcribe_segments(segment_ids)

    return cleaned_id, segment_ids, transcriptions

# Example usage
clean_call_id, voice_segments, texts = analyze_phone_call("customer_service_call.mp3")

已加载通话录音：customer_service_call.mp3
Applying noise reduction with strength 0.8 to customer_service_call.mp3
已应用降噪处理
检测到43个语音段落
语音段落 1: 0.0s - 2.0s (持续2.0s)
语音段落 2: 3.9s - 7.9s (持续4.0s)
语音段落 3: 9.2s - 10.4s (持续1.2s)
语音段落 4: 12.4s - 15.3s (持续2.9s)
语音段落 5: 17.1s - 19.3s (持续2.2s)
语音段落 6: 20.7s - 22.9s (持续2.2s)
语音段落 7: 24.7s - 28.9s (持续4.3s)
语音段落 8: 29.7s - 32.6s (持续2.9s)
语音段落 9: 34.1s - 37.9s (持续3.8s)
语音段落 10: 39.9s - 41.6s (持续1.7s)
语音段落 11: 42.2s - 44.9s (持续2.7s)
语音段落 12: 45.3s - 47.1s (持续1.8s)
语音段落 13: 48.3s - 51.3s (持续3.0s)
语音段落 14: 52.6s - 56.5s (持续3.9s)
语音段落 15: 58.4s - 61.0s (持续2.6s)
语音段落 16: 62.1s - 65.9s (持续3.8s)
语音段落 17: 67.6s - 68.8s (持续1.1s)
语音段落 18: 69.4s - 71.5s (持续2.1s)
语音段落 19: 73.0s - 76.6s (持续3.6s)
语音段落 20: 77.1s - 80.3s (持续3.2s)
语音段落 21: 82.2s - 85.9s (持续3.7s)
语音段落 22: 86.5s - 91.0s (持续4.4s)
语音段落 23: 91.6s - 93.6s (持续2.1s)
语音段落 24: 95.0s - 99.1s (持续4.2s)
语音段落 25: 100.1s - 103.1s (持续3.0s)
语音段落 26: 104.0s - 106.9s (持续2.9s)
语音段落 27: 107.2s - 111.7s (持续4.5s)
语音段落 28: 113.3s - 1

In [7]:
def create_music_compilation(track_files: List[str], output_format: AudioFormat = AudioFormat.FLAC):
    """Create a music compilation: merge several tracks and convert the result.

    Each track is registered with a format inferred from its file extension
    (WAV when the extension matches no ``AudioFormat``) and a random duration
    of 3-5 minutes.  The tracks are merged with a 3 s crossfade, converted to
    ``output_format`` and finally adjusted to 96 kHz / 24 bit.

    Returns the id of the final, adjusted audio record.
    """
    def _format_for(path: str) -> AudioFormat:
        # Extension without the leading dot, lower-cased; WAV is the fallback.
        extension = Path(path).suffix.lower()[1:]
        return next((fmt for fmt in AudioFormat if fmt.value == extension), AudioFormat.WAV)

    # Load all tracks
    track_ids = [
        create_audio_from_file(
            file_path=track_path,
            format=_format_for(track_path),
            duration=random.uniform(180, 300),  # 3-5 minutes
        )
        for track_path in track_files
    ]

    print(f"已加载{len(track_ids)}个音轨")

    # One segment per track, spanning the whole track
    segment_ids = []
    for track_id in track_ids:
        track_length = load_audio_metadata(track_id).duration
        whole_track = AudioSegment(audio_id=track_id, start_time=0, end_time=track_length)
        save_audio_metadata(whole_track)
        segment_ids.append(whole_track.id)

    # Merge all segments with a crossfade
    merged_id = merge_audio_segments(segment_ids, crossfade=3.0)
    print("已合并所有音轨，使用3秒交叉淡入淡出")

    # Convert to the target format
    final_id = change_audio_format(merged_id, output_format)
    print(f"已转换为{output_format.value}格式")

    # Adjust audio properties (high-quality output)
    adjusted_id = adjust_audio_properties(final_id, sample_rate=96000, bit_depth=24)
    print("已调整为高质量音频属性")

    return adjusted_id

# Example usage
track_files = ["track1.mp3", "track2.wav", "track3.mp3", "track4.flac"]
compilation_id = create_music_compilation(track_files, AudioFormat.FLAC)

已加载4个音轨
Merging 4 segments with 3.0s crossfade
已合并所有音轨，使用3秒交叉淡入淡出
Converting merged_cf230c21-cedb-4c0c-be15-abd26cfcf56c.mp3 from AudioFormat.MP3 to AudioFormat.FLAC
已转换为flac格式
Adjusting audio properties: sample_rate=96000, bit_depth=24, channels=None
已调整为高质量音频属性
