In [18]:
        # "yt-dlp -F https://www.twitch.tv/videos/2386208922"  # 查看视频流信息

# sb1        mhtml 110x62       0 │                  mhtml │ images                     storyboard
# sb0        mhtml 220x124      0 │                  mhtml │ images                     storyboard
# Audio_Only mp4   audio only     │ ~790.97MiB  217k m3u8  │ audio only  mp4a.40.2 217k
# 160p       mp4   284x160     30 │ ~  1.04GiB  291k m3u8  │ avc1.4D000C mp4a.40.2
# 360p       mp4   640x360     30 │ ~  2.64GiB  743k m3u8  │ avc1.4D001E mp4a.40.2
# 480p       mp4   852x480     30 │ ~  5.24GiB 1471k m3u8  │ avc1.4D001F mp4a.40.2
# 720p60     mp4   1280x720    60 │ ~ 12.22GiB 3432k m3u8  │ avc1.4D0020 mp4a.40.2
# 1080p60    mp4   1920x1080   60 │ ~ 30.05GiB 8441k m3u8  │ avc1.64002A mp4a.40.2

In [None]:
import subprocess
import os
from urllib.parse import urlparse

def download_twitch(video_url, outputfile, start_time=None, end_time=None, stream='1080p60'):
    parsed_url = urlparse(video_url)
    video_code = parsed_url.path.split('/')[-1]
    # 命名
    if start_time and end_time:
        output_path = os.path.join(
            outputfile, 
            f"{video_code}_{start_time.replace(':', '')}_{end_time.replace(':', '')}.%(ext)s"
        )
    else:
        output_path = os.path.join(
            outputfile, 
            f"{video_code}.%(ext)s"
        )
    part_file = output_path + ".part"
    # 找到output_path最后一个.的位置取之前的字符串
    noext = output_path[:output_path.rfind('.')]+".mp4"
    # 判断是否已经下载
    if os.path.exists(noext):
        print(f"{noext} 已存在，跳过下载。")
        return
    # # 判断 start_time.replace(':', '') 是否出现在文件名中
    # for file in os.listdir(outputfile):
    #     if start_time.replace(':', '') in file:
    #         print(f"{file} 起始位置已存在，跳过下载。")
    #         return
    # command.append('--force-overwrites')  # 覆盖已有文件，如果part损坏

    command = [
        'yt-dlp', video_url,   # 调用 yt-dlp
        '-f', stream,    # 指定只下载 xxx 格式
        '-o', output_path,     # 输出文件名
    ]

    if start_time and end_time:
        command += ['--download-sections', f"*{start_time}-{end_time}"]
    # 检查是否已有部分下载
    if os.path.exists(part_file):
        print(f"检测到未完成的文件 '{part_file}'，将尝试续传。")
    try:
        result = subprocess.run(command, check=True, text=True, capture_output=True)
        print(result.stdout)  # 输出 yt-dlp 的日志信息
        print(f"下载成功： {output_path}\n --------------------\n ")
    except subprocess.CalledProcessError as e:
        print(f"下载出错: {e}")
        print(e.stderr)  # 输出错误信息
    except FileNotFoundError:
        print("错误：未找到 yt-dlp，请确保已安装并配置环境变量")

# 转换6h44m10s   --->   "05:44:10"
def convert_timestr(ss):
    h = ss.split('h')[0]
    m = ss.split('h')[1].split('m')[0]
    s = ss.split('m')[1].split('s')[0]
    ms = ss.split('s')[1].split('x')[0]
    return f"{h.zfill(2)}:{m.zfill(2)}:{s.zfill(2)}.{ms.zfill(3)}"

def seconds_to_hms(seconds):
    hours = int(seconds // 3600)  # 计算小时数
    minutes = int((seconds % 3600) // 60)  # 计算分钟数
    secs = seconds % 60  # 计算秒数（包括小数部分）
    return f"{hours:02d}:{minutes:02d}:{secs:06.3f}"  # 格式化为 HH:MM:SS.sss

def hms_to_seconds(time_str):
    hours, minutes, seconds = time_str.split(':')
    seconds1, milliseconds = seconds.split('.')
    total_seconds = int(hours) * 3600 + int(minutes) * 60 + int(seconds1) + int(milliseconds) / 1000
    return total_seconds

# 使用示例
if __name__ == "__main__":
    # 替换为你的 Twitch 视频 URL
    twitch_url = "https://www.twitch.tv/videos/2386208922"
    save_directory = "E:\\mande\\202503_PLAN"
    start_time = convert_timestr("6h50m10s400x")
    end_time = convert_timestr("6h50m20s000x")
    # 格式"00:10:00"6h44m10s
    # download_twitch(twitch_url, save_directory, stream='Audio_Only')
    # download_twitch(twitch_url, save_directory,start_time,end_time, stream='1080p60')


In [None]:
import librosa
import numpy as np
import os
from urllib.parse import urlparse
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
from dtw import dtw 

# 参数设置
SAMPLE_RATE = 22050
WINDOW_SIZE = 0.5
HOP_SIZE = 0.1
CLIP_DURATION = 1
THRESHOLD = 3500

def load_templates(template_folder):
    templates_features = []
    for filename in os.listdir(template_folder):
        if filename.endswith(('.aac')):
            filepath = os.path.join(template_folder, filename)
            print(f"正在加载模板文件: {filename}")
            audio, sr = librosa.load(filepath, sr=SAMPLE_RATE)
            stft = np.abs(librosa.stft(y=audio, n_fft=256, hop_length=64))
            stft = np.log(stft + 1e-8)
            duration = librosa.get_duration(y=audio, sr=SAMPLE_RATE)
            print(f"模板 {filename} 的 STFT 形状: {stft.shape}, 时长: {duration:.3f}秒")
            templates_features.append(stft)
    print(f"模板加载完成，共加载 {len(templates_features)} 个模板")
    return templates_features

def extract_audio_features(audio_path):
    print(f"开始提取音频特征: {audio_path}")
    y, sr = librosa.load(audio_path, sr=SAMPLE_RATE)
    stft = np.abs(librosa.stft(y=y, n_fft=256, hop_length=64))
    stft = np.log(stft + 1e-8)
    duration = librosa.get_duration(y=y, sr=sr)
    print(f"音频特征提取完成，STFT 形状: {stft.shape}, 时长: {duration:.2f}秒")
    return y, sr, stft, duration

def find_matches(templates_features, video_features, sr, window_size, hop_size, path):
    print("开始进行滑动窗口匹配...")
    n_samples_hop = int(hop_size * sr)
    matches = []
    
    os.makedirs(path, exist_ok=True)
    output_file = os.path.join(path, "matches_record.txt")
    with open(output_file, 'w') as f:
        f.write("Matches Record (timestamp in seconds, with distance):\n")
    
    for start in range(0, video_features.shape[1] - max(t.shape[1] for t in templates_features), n_samples_hop):
        segment = video_features[:, start:start + min(t.shape[1] for t in templates_features)]
        min_distance = float('inf')
        
        print(f"Segment 形状: {segment.shape}, 模板形状: {templates_features[0].shape}")
        print(f"Segment 时间范围: {start * hop_size / sr:.3f}s - {(start + segment.shape[1] * 64 / 22050):.3f}s")
        
        if start < 3 * n_samples_hop:
            plt.figure(figsize=(10, 4))
            librosa.display.specshow(segment, sr=sr, x_axis='time', y_axis='hz', cmap='viridis')
            plt.colorbar(format='%+2.0f dB')
            plt.title(f'Segment STFT at {start * hop_size / sr:.2f}s')
            plt.savefig(os.path.join(path, f'segment_{start}.png'))
            plt.close()
            if start == 0:
                plt.figure(figsize=(10, 4))
                librosa.display.specshow(templates_features[0], sr=sr, x_axis='time', y_axis='hz', cmap='viridis')
                plt.colorbar(format='%+2.0f dB')
                plt.title('Template STFT')
                plt.savefig(os.path.join(path, 'template_0.png'))
                plt.close()
        
        for template_features in templates_features:
            if segment.shape[1] >= template_features.shape[1] * 0.3:
                segment_adjusted = segment[:, :template_features.shape[1]]
                distance, _, _, _ = dtw(segment_adjusted.T, template_features.T, dist=lambda x, y: np.linalg.norm(x - y))
                print(f"模板 {templates_features.index(template_features)} 的 DTW 距离: {distance}")
                min_distance = min(min_distance, distance)
        
        print(f"窗口 {start * hop_size / sr:.2f}s 的最小距离: {min_distance}")
        if min_distance < THRESHOLD:
            start_time = start * hop_size / sr
            matches.append(start_time)
            print(f"找到匹配：{start_time:.3f}秒, 距离: {min_distance}")
            with open(output_file, 'a') as f:
                f.write(f"{start_time:.3f} (distance: {min_distance})\n")

    if matches:
        print(f"找到 {len(matches)} 个初步匹配项，开始去重...")
        deduped_matches = [matches[0]]
        for i in range(1, len(matches)):
            if matches[i] - deduped_matches[-1] > CLIP_DURATION / 2:
                deduped_matches.append(matches[i])
        with open(output_file, 'a') as f:
            f.write("\nDeduplicated Matches:\n")
            for match in deduped_matches:
                f.write(f"{match:.3f}\n")
        print(f"去重完成，最终匹配数: {len(deduped_matches)}")
        print(f"匹配结果已记录到: {output_file}")
        return deduped_matches
    
    print("未找到任何匹配项")
    with open(output_file, 'a') as f:
        f.write("No matches found\n")
    return matches

# 将时间戳转换为 "HH:MM:SS" 格式
def seconds_to_timestr(seconds):
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = seconds % 60
    return f"{hours:02d}:{minutes:02d}:{secs:06.3f}"

# 主函数：分析音频并下载对应片段
def process_twitch_segments(twitch_url, audio_path, template_folder, output_folder):
    print(f"开始处理 Twitch 视频: {twitch_url}")
    parsed_url = urlparse(twitch_url)
    video_code = parsed_url.path.split('/')[-1]
    output_folder = os.path.join(output_folder, video_code)
    print(f"输出文件夹: {output_folder}")
    
    templates_features = load_templates(template_folder)
    y, sr, video_features, duration = extract_audio_features(audio_path)
    matches = find_matches(templates_features, video_features, sr, WINDOW_SIZE, HOP_SIZE, output_folder)
    
    if not matches:
        print("未找到任何匹配的时间戳")
        return
    
    for i, start_time in enumerate(matches):
        start_str = seconds_to_timestr(start_time)
        end_str = seconds_to_timestr(min(start_time + CLIP_DURATION, duration))
        print(f"正在下载片段 {i+1}: {start_str} - {end_str}")
        download_twitch(twitch_url, output_folder, start_str, end_str, select_audio=False)

    print(f"共找到 {len(matches)} 个匹配片段，已下载至 {output_folder}")

# 使用示例
if __name__ == "__main__":
    twitch_url = "https://www.twitch.tv/videos/2386208922"
    audio_path = "E:\\mande\\202503_PLAN\\audio\\2386208922_065010.400_065020.000.aac"
    template_folder = "E:\\mande\\202503_PLAN\\audio_template"
    output_folder = "E:\\mande\\202503_PLAN\\clips"
    process_twitch_segments(twitch_url, audio_path, template_folder, output_folder)

开始处理 Twitch 视频: https://www.twitch.tv/videos/2386208922
输出文件夹: E:\mande\202503_PLAN\clips\2386208922
正在加载模板文件: 2386208922_065014.500_065015.000.aac
模板 2386208922_065014.500_065015.000.aac 的 STFT 形状: (129, 184), 时长: 0.533秒
模板加载完成，共加载 1 个模板
开始提取音频特征: E:\mande\202503_PLAN\audio\2386208922_065010.400_065020.000.aac
音频特征提取完成，STFT 形状: (129, 2771), 时长: 8.04秒
开始进行滑动窗口匹配...
Segment 形状: (129, 184), 模板形状: (129, 184)
Segment 时间范围: 0.000s - 0.534s


  audio, sr = librosa.load(filepath, sr=SAMPLE_RATE)
  y, sr = librosa.load(audio_path, sr=SAMPLE_RATE)


模板 0 的 DTW 距离: 3701.2806034088135
窗口 0.00s 的最小距离: 3701.2806034088135
Segment 形状: (129, 184), 模板形状: (129, 184)
Segment 时间范围: 0.010s - 2205.534s
模板 0 的 DTW 距离: 3173.3258361816406
窗口 0.01s 的最小距离: 3173.3258361816406
找到匹配：0.010秒, 距离: 3173.3258361816406
找到 1 个初步匹配项，开始去重...
去重完成，最终匹配数: 1
匹配结果已记录到: E:\mande\202503_PLAN\clips\2386208922\matches_record.txt
正在下载片段 1: 00:00:00.010 - 00:00:01.010
下载完成！
[twitch:vod] Extracting URL: https://www.twitch.tv/videos/2386208922
[twitch:vod] 2386208922: Downloading stream metadata GraphQL
[twitch:vod] 2386208922: Downloading video access token GraphQL
[twitch:vod] 2386208922: Downloading m3u8 information
[twitch:vod] 2386208922: Downloading storyboard metadata JSON
[info] v2386208922: Downloading 1 format(s): 1080p60
[info] v2386208922: Downloading 1 time ranges: 0.0-1.0
[download] E:\mande\202503_PLAN\clips\2386208922\2386208922_000000.010_000001.010.mp4 has already been downloaded

[download] 100% of   13.80KiB

共找到 1 个匹配片段，已下载至 E:\mande\202503_PLAN\clips

我写了一个python函数download_twitch(twitch_url, save_directory,start_time,end_time, select_audio=False)用于下载twitch的视频。你负责完成其他部分。有这些参数：twitch_url，
    audio_path：分析目标路径，
    template_folder里面有几个0.5秒以内的模板音频文件（格式也可以是mp4），是射击游戏中冲击炮弹发射的瞬间的脉冲声音（可能不算是完全高频，有点闷的冲击波的感觉），我想要识别这一特征然后检测另一个audio_path长音频文件，识别出现这种发射的片段，然后返回一个列表，记录这些片段的时间轴，每个片段0.8秒左右。在本地的output_folder文件夹新建一个数字的文件夹，数字是url最后一个/的后面的字符串，在这个文件夹里调用download_twitch下载视频。同时为了中断后也能保存，请在识别出片段时间戳时就在新建的文件夹里，写入一个txt文件实时保存时间戳列表。请给我完整的python函数，只用给我这个代码即可，详细的讲解用注释的方式写在代码中，谢谢你。

In [16]:
import os
import librosa
import numpy as np
from scipy.signal import correlate, find_peaks
# from matplotlib import pyplot as plt

def find_impact_segments(twitch_url, audio_path, template_folder, output_folder):
    # 从 twitch_url 中提取视频 ID
    video_id = twitch_url.split('/')[-1]
    print(f"提取的视频 ID: {video_id}")
    
    # 创建保存目录
    save_directory = os.path.join(output_folder, video_id)
    os.makedirs(save_directory, exist_ok=True)
    print(f"创建保存目录: {save_directory}")
    
    # 加载音频文件
    y, sr = librosa.load(audio_path, sr=None)
    print(f"加载音频文件: {audio_path}, 采样率: {sr}")
    
    # 获取模板文件列表
    template_files = [f for f in os.listdir(template_folder) if f.endswith(('.mp4'))]
    print(f"在 {template_folder} 中找到 {len(template_files)} 个模板文件: {template_files}")
    
    # 初始化时间戳列表
    segment_length = 100 * sr  # 每段100秒
    template_files = [f for f in os.listdir(template_folder) if f.endswith(('.mp4'))]
    detected_times = []
    
    # 保存时间戳到文件
    with open(os.path.join(save_directory, 'timestamps.txt'), 'w', encoding='utf-8') as f:
        # 分段进行 防止内存爆炸
        for template_file in template_files:
            template_path = os.path.join(template_folder, template_file)
            # 加载模板音频
            template, sr_template = librosa.load(template_path, sr=None)
            print(f"加载模板文件: {template_path}, 采样率: {sr_template}")
            
            # 检查采样率并重采样
            if sr_template != sr:
                template = librosa.resample(template, orig_sr=sr_template, target_sr=sr)
                print(f"模板重采样到目标采样率: {sr}")
            
            # 分段处理
            for start_idx in range(0, len(y), segment_length):
                end_idx = min(start_idx + segment_length, len(y))
                segment = y[start_idx:end_idx]
                segment_time_offset = start_idx / sr  # 段的起始时间

                # 计算互相关
                corr = correlate(segment, template, mode='valid')
                print(f"完成 {template_file} 的互相关计算，长度: {len(corr)}")
                
                # 计算模板能量和阈值
                template_energy = np.sum(template**2)
                threshold = 0.65 * template_energy # 关键参数 数字越大越严格
                print(f"模板 {template_file} 的能量: {template_energy}, 阈值: {threshold}")
            
                # 寻找峰值
                peaks, _ = find_peaks(corr, height=threshold, distance=10, prominence=0.1)  # 添加 distance=10
                # plt.plot(corr)
                # plt.axhline(y=threshold, color='r', linestyle='--')
                # plt.show()
                print(f"检测到 {len(peaks)} 个峰值")
            
                # 转换为时间戳并去重
                times = peaks / sr + segment_time_offset  # 加上段偏移时间
                unique_times = []
                for t in times:
                    # 只记录与上一个时间戳间隔大于 0.01 秒的时间戳
                    if not unique_times or abs(t - unique_times[-1]) > 0.1:
                        unique_times.append(t)
                        f.write(f"{seconds_to_hms(t)}\n")
                        detected_times.append(t)
                        print(f"检测到时间戳: {seconds_to_hms(t)}\n")
    
    print(f"总共检测到 {len(detected_times)} 个时间戳")
    return detected_times

def download_impact_segments(twitch_url, detected_times_path, save_directory, length=0.8):
    # 读取时间戳文件
    with open(detected_times_path, 'r', encoding='utf-8') as f:
        detected_times = [hms_to_seconds(line.strip()) for line in f]
    # 根据时间戳下载视频片段
    for i, start_time in enumerate(detected_times):
        end_time = start_time + length
        start_time_str = seconds_to_hms(start_time)
        end_time_str = seconds_to_hms(end_time)
        # 下载片段
        download_twitch(twitch_url, save_directory, start_time_str, end_time_str)
if __name__ == "__main__":
    twitch_url = "https://www.twitch.tv/videos/2386208922"
    audio_path = "E:\\mande\\202503_PLAN\\audio\\2386208922.mp4"
    template_folder = "E:\\mande\\202503_PLAN\\audio_template"
    output_folder = "E:\\mande\\202503_PLAN\\clips"
    detected_times_path = "E:\\mande\\202503_PLAN\\clips\\2386208922\\timestamps.txt"
    
    # print("开始检测冲击声音片段...")
    # detected_times = find_impact_segments(twitch_url, audio_path, template_folder, output_folder)
    # 创建保存目录（确保传递给 download_impact_segments）
    # save_directory = os.path.join(output_folder, twitch_url.split('/')[-1])
    # print("开始下载检测到的片段...")
    # download_impact_segments(twitch_url, detected_times_path, save_directory)
    # print("程序执行完成！")

In [14]:
# 处理问题视频
problem_folder = "E:\\mande\\202503_PLAN\\clips\\2386208922\\problem"
txt_path = "E:\\mande\\202503_PLAN\\clips\\2386208922\\timestamps2.txt"

def update_txt(problem_folder):
    problem_files = [f for f in os.listdir(problem_folder) if f.endswith(('.mp4'))]
    # 读取txt文件，加入list
    detected_times=[]
    with open(txt_path, 'r', encoding='utf-8') as f:
        detected_times = [line.strip() for line in f]
    # 写新的txt文件
    with open(txt_path+'.txt', 'w', encoding='utf-8') as f:
        for file in problem_files:
            start_time = file.split('_')[1]
            h = start_time[:2]
            m = start_time[2:4]
            s = start_time[4:6]
            ms = start_time[7:]
            # print('start_time:',start_time)
            start_time = f"{h.zfill(2)}:{m.zfill(2)}:{s.zfill(2)}.{ms.zfill(3)}"
            # print('start_time:',start_time)
            new_start_time = seconds_to_hms(int(hms_to_seconds(start_time)))
            detected_times = [new_start_time if t == start_time else t for t in detected_times]
        for t in detected_times:
            f.write(f"{t}\n")
update_txt(problem_folder)

In [None]:
def redownload_segments(twitch_url, output_folder, txt_path, length=1):
    print("开始检测冲击声音片段...")
    save_directory = os.path.join(output_folder, twitch_url.split('/')[-1])
    print("开始下载检测到的片段...")
    download_impact_segments(twitch_url, txt_path, save_directory, length)
    print("程序执行完成！")

# 使用示例
if __name__ == "__main__":
    twitch_url = "https://www.twitch.tv/videos/2386208922"
    output_folder = "E:\\mande\\202503_PLAN\\clips"
    txt_path = "E:\\mande\\202503_PLAN\\clips\\2386208922\\timestamps2.txt.txt"
    
    redownload_segments(twitch_url, output_folder, txt_path, length=1)

开始检测冲击声音片段...
开始下载检测到的片段...
2386208922_023123.928_023124.728.mp4 起始位置已存在，跳过下载。
2386208922_024103.337_024104.137.mp4 起始位置已存在，跳过下载。
2386208922_024143.077_024143.877.mp4 起始位置已存在，跳过下载。
2386208922_024157.876_024158.676.mp4 起始位置已存在，跳过下载。
2386208922_024245.978_024246.778.mp4 起始位置已存在，跳过下载。
2386208922_025335.256_025336.056.mp4 起始位置已存在，跳过下载。
2386208922_030107.148_030107.948.mp4 起始位置已存在，跳过下载。
2386208922_030110.000_030110.800.mp4 起始位置已存在，跳过下载。
2386208922_030223.049_030223.849.mp4 起始位置已存在，跳过下载。
2386208922_030417.895_030418.695.mp4 起始位置已存在，跳过下载。
2386208922_035315.206_035316.006.mp4 起始位置已存在，跳过下载。
2386208922_035315.438_035316.238.mp4 起始位置已存在，跳过下载。
2386208922_040919.303_040920.103.mp4 起始位置已存在，跳过下载。
2386208922_040922.913_040923.713.mp4 起始位置已存在，跳过下载。
2386208922_041030.000_041030.800.mp4 起始位置已存在，跳过下载。
2386208922_041030.000_041030.800.mp4 起始位置已存在，跳过下载。
2386208922_041220.000_041220.800.mp4 起始位置已存在，跳过下载。
2386208922_043112.959_043113.759.mp4 起始位置已存在，跳过下载。
2386208922_043125.772_043126.572.mp4 起始位置已存在，跳过下载。
238