In [5]:
# 安装依赖
!pip install inaSpeechSegmenter 'https://github.com/Numenorean/ShazamAPI/archive/master.zip' loguru zhconv
!apt-get install aria2 ffmpeg

ERROR: Invalid requirement: "'https://github.com/Numenorean/ShazamAPI/archive/master.zip'"
'apt-get' 不是内部或外部命令，也不是可运行的程序
或批处理文件。


In [6]:
# Download the raw stream recording. Any recording you obtained yourself works just as
# well; this cell is only an example.
# NOTE: wget / unzip / chmod and the linux-x64 BBDown build are Linux shell tools; the
# recorded output of this cell shows they are not available in a Windows shell.
# '<ownerMid>' is a placeholder for the -F argument and must be replaced before running.
!wget https://github.com/nilaoda/BBDown/releases/download/1.5.4/BBDown_1.5.4_20221019_linux-x64.zip
!unzip BBDown_1.5.4_20221019_linux-x64.zip
!chmod +x BBDown
!./BBDown https://www.bilibili.com/video/BV1tY411r7GU/ --use-aria2c -F '<ownerMid>'

'wget' 不是内部或外部命令，也不是可运行的程序
或批处理文件。
'unzip' 不是内部或外部命令，也不是可运行的程序
或批处理文件。
'chmod' 不是内部或外部命令，也不是可运行的程序
或批处理文件。
系统找不到指定的文件。


In [1]:
import gc
import subprocess
from math import ceil
from os import makedirs, rename, listdir, system, remove
from os.path import basename, splitext, dirname, exists, join, isfile
from tempfile import gettempdir

import ShazamAPI
from inaSpeechSegmenter import Segmenter
from keras.backend import clear_session
from loguru import logger
from zhconv import convert

# Longest stretch (in seconds) processed in one segmentation pass; files longer than
# this are split into windows of this size by get_segment_process_length_array().
MAX_SEGMENT_TIME = 5400


@logger.catch
def timestamp2sec(timestamp):
    """Convert a colon-separated timestamp string into whole seconds.

    Each field is parsed as a float and truncated to an int, so fractional
    seconds are dropped. Fields are weighted by powers of 60 from the right:
    ``'SS'``, ``'MM:SS'`` and ``'HH:MM:SS'`` are all accepted.

    :param timestamp: string such as ``'1:02:03.5'``.
    :return: the number of seconds as an int.
    :raises ValueError: if a field cannot be parsed as a number.
    """
    total = 0
    # Horner evaluation, most significant field first: ((h * 60) + m) * 60 + s.
    for field in timestamp.split(':'):
        total = total * 60 + int(float(field))
    return total


@logger.catch
def get_length(filename: str) -> str:
    """Ask ffprobe for the duration of a media file.

    ffprobe is invoked with ``-sexagesimal`` and its output is returned as a
    colon-separated string that ``timestamp2sec`` can parse.

    :param filename: path of the media file; an empty value short-circuits.
    :return: the duration string reported by ffprobe, or ``'0'`` when the
        duration is unknown (empty filename, ffprobe missing or failing, or
        output that is not a numeric timestamp). ``'0'`` is the value callers
        test for to trigger their fallback.
    """
    if not filename:
        return '0'
    try:
        # An argument list (no shell) keeps paths with spaces, quotes or shell
        # metacharacters intact on every platform.
        result = subprocess.run(
            [
                'ffprobe',
                '-v', 'error',
                '-sexagesimal',
                '-show_entries', 'format=duration',
                '-of', 'default=noprint_wrappers=1:nokey=1',
                filename,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
    except OSError:
        # ffprobe is not installed or cannot be started.
        return '0'
    if result.returncode != 0:
        return '0'
    # Decode and strip instead of slicing the repr of the bytes object: the old
    # slice assumed a "\r\n" line ending and cut into the value on "\n" hosts.
    duration = result.stdout.decode(errors='replace').strip()
    if not duration:
        return '0'
    try:
        # Accept only what timestamp2sec can parse (numeric fields split by ':').
        for field in duration.split(':'):
            float(field)
    except ValueError:
        return '0'
    return duration


@logger.catch
def get_length_using_copied_audio(filename: str):
    """Measure a file's duration from a stream-copied, audio-only temp file.

    Fallback for files whose duration cannot be probed directly: ffmpeg copies
    the audio stream (``-vn -acodec copy``) into a temp file, and that file is
    passed to ``get_length``.

    :param filename: path of the media file.
    :return: whatever ``get_length`` returns for the temp file.
    """
    temp_audio_file = join(
        gettempdir(),
        'get_length_acodec_temp.mp4'
    )
    # Best-effort removal of a leftover from an earlier run, so ffmpeg does not
    # stop to ask about overwriting.
    try:
        remove(temp_audio_file)
    except OSError:
        pass
    try:
        # An argument list (no shell): the temp directory and the input path may
        # contain spaces, which the former unquoted shell string did not survive.
        subprocess.call([
            'ffmpeg',
            '-i',
            filename,
            '-reset_timestamps',
            '1',
            '-vn',
            '-acodec',
            'copy',
            temp_audio_file,
        ])
        return get_length(temp_audio_file)
    finally:
        # Clean up without masking the result: if ffmpeg produced no file,
        # there is nothing to delete.
        try:
            remove(temp_audio_file)
        except OSError:
            pass


@logger.catch
def get_segment_process_length_array(filename: str) -> list:
    """Split a file into ``[start_sec, stop_sec]`` windows of MAX_SEGMENT_TIME.

    The length is taken from ``get_length``; if that yields nothing usable, it
    is measured again through ``get_length_using_copied_audio``.

    :param filename: path of the media file.
    :return: a list of ``[start, stop]`` pairs in seconds. The first start and
        the last stop are ``None`` (open-ended). ``[[None, None]]`` means the
        file is processed in one pass, either because it is shorter than
        MAX_SEGMENT_TIME or because its length could not be determined.
    """
    file_length = timestamp2sec(get_length(filename))
    # "not file_length" covers both 0 and None: a failed probe can surface as
    # None rather than 0, and must still trigger the fallback.
    if not file_length:
        logger.warning(f'直接使用 ffprobe 获取 {filename} 长度失败。')
        logger.warning('尝试使用 ffmpeg 处理后重新获取长度，该步骤需要一定的时间，请耐心等待。')
        file_length = timestamp2sec(get_length_using_copied_audio(filename))
    if not file_length:
        # Length still unknown: process the file as one piece instead of
        # failing on the comparison below.
        logger.warning(f'无法获取 {filename} 的长度，将不分段直接处理整个文件。')
        return [[None, None]]
    if MAX_SEGMENT_TIME > file_length:
        return [[None, None]]
    logger.info(f'当前文件长度为 {file_length}，大于最大处理长度，将分成小段处理。')
    window_count = ceil(file_length / MAX_SEGMENT_TIME)
    result = [
        [index * MAX_SEGMENT_TIME, (index + 1) * MAX_SEGMENT_TIME]
        for index in range(window_count)
    ]
    # Open-ended bounds: start of file for the first window, end of file for the last.
    result[0][0] = None
    result[-1][1] = None
    return result


@logger.catch
def segment_wrapper(media: str, batch_size: int = 512, energy_ratio: float = 0.02) -> list:
    """Segment a media file window by window and concatenate the results.

    The windows come from ``get_segment_process_length_array``. After each
    window the garbage collector is run and the Keras session is cleared.

    :param media: path of the media file.
    :param batch_size: forwarded to ``segment``.
    :param energy_ratio: forwarded to ``segment``.
    :return: the concatenated segmentation entries of all windows.
    """
    collected = []
    for window in get_segment_process_length_array(media):
        window_start, window_stop = window[0], window[1]
        collected.extend(
            segment(media, batch_size, energy_ratio, start_sec=window_start, stop_sec=window_stop)
        )
        # Free memory between windows.
        gc.collect()
        clear_session()
    return collected


@logger.catch
def segment(media: str, batch_size: int = 32, energy_ratio: float = 0.02, start_sec: int = None, stop_sec: int = None):
    """Run inaSpeechSegmenter over one window of a media file.

    :param media: path of the media file.
    :param batch_size: segmenter batch size; choose it according to your GPU.
    :param energy_ratio: forwarded to ``Segmenter`` unchanged.
    :param start_sec: window start in seconds; ``None`` or 0 means start of file.
    :param stop_sec: window stop in seconds; ``None`` means end of file.
    :return: the value returned by the ``Segmenter`` call.
    """
    logger.info(f'开始为 {media} 分段，起止时间为 {start_sec} - {stop_sec}。')

    # A truthy start is moved 600 s earlier, so consecutive windows overlap;
    # None/0 is passed through untouched.
    window_start = start_sec - 600 if start_sec else start_sec

    # vad_engine: 'smn' = speech / music / noise (better); 'sm' = speech / music.
    # detect_gender=False turns gender detection off.
    segmenter = Segmenter(
        vad_engine='sm',
        detect_gender=False,
        energy_ratio=energy_ratio,
        batch_size=batch_size,
    )
    return segmenter(media, start_sec=window_start, stop_sec=stop_sec)


@logger.catch
def extract_music(
        segmentation,  # segmentation entries, each indexable as (label, start, end)
        music_segment_threshold: int = 60,  # minimum length of a music segment before merging
        segment_connect: int = 3,  # music segments closer together than this are merged
        music_segment_threshold_final: int = 90,  # minimum length of a music segment after merging
):
    """Pick the music passages out of a segmentation and format them as timestamps.

    Steps:
      1. A 'noEnergy' entry shorter than 2 s whose two neighbours share a label
         is bridged: the left neighbour is stretched to the right neighbour's end.
      2. Only 'music' entries longer than ``music_segment_threshold`` are kept.
      3. Kept entries whose gap to the previous one is below ``segment_connect``
         are merged into it.
      4. Entries longer than ``music_segment_threshold_final`` are padded
         (start + 1, end + 2) and rendered as ``HH:MM:SS``.

    The caller's ``segmentation`` is not modified; the work is done on a copy.

    :return: a list of ``[start, end]`` string pairs, or ``None`` when
        ``segmentation`` is ``None``.
    """
    if segmentation is None:
        logger.warning('切片信息为空，可能在分割的时候发生了异常，请检查。')
        return None

    # Shallow copy: step 1 reassigns entries and must not leak into the caller's list.
    segments = list(segmentation)

    # Step 1, right to left. The bridged neighbours are left in place; any
    # overlap this creates is folded away by the merge in step 3.
    for i in range(len(segments) - 2, 0, -1):
        current = segments[i]
        if (current[0] == 'noEnergy'
                and current[2] - current[1] < 2
                and segments[i - 1][0] == segments[i + 1][0]):
            segments[i - 1] = (segments[i - 1][0], segments[i - 1][1], segments[i + 1][2])

    # Step 2.
    music = [s for s in segments if s[0] == 'music' and s[2] - s[1] > music_segment_threshold]

    # Step 3, right to left so deleting an entry does not disturb pending indices.
    for i in range(len(music) - 1, 0, -1):
        if music[i][1] - music[i - 1][2] < segment_connect:
            music[i - 1] = (music[i - 1][0], music[i - 1][1], music[i][2])
            del music[i]

    # Step 4.
    return [
        [_format_hms(max(0, s[1] + 1)), _format_hms(s[2] + 2)]
        for s in music
        if s[2] - s[1] > music_segment_threshold_final
    ]


def _format_hms(seconds):
    """Render a number of seconds as a zero-padded ``HH:MM:SS`` string."""
    hours = int(seconds // 3600)
    minutes = int(seconds % 3600 // 60)
    secs = int(seconds % 60)
    return f'{str(hours).zfill(2)}:{str(minutes).zfill(2)}:{str(secs).zfill(2)}'


@logger.catch
def extract_mah_stuff(
        media,
        segmented_stamps,
        result_ext=None,  # keep the source file's extension when not given
        output_dir=None  # defaults to a 'segmented' folder next to the source file
):
    """Cut the given time ranges out of ``media`` with ffmpeg (stream copy).

    One output file is written per range, named
    ``<output_dir>/<source name>_<index><ext>``.

    :param media: path of the source media file (a single path, not a list).
    :param segmented_stamps: sequence of ``[start, stop]`` pairs passed to
        ffmpeg's ``-ss`` / ``-to``; ``None`` makes the function a no-op.
    :param result_ext: extension (including the dot) of the output files.
    :param output_dir: directory to write to; created when missing.
    :return: the output directory, or ``None`` when ``segmented_stamps`` is ``None``.
    """
    if segmented_stamps is None:
        return None

    if output_dir is None:
        output_dir = join(dirname(media), 'segmented')
    makedirs(output_dir, exist_ok=True)
    logger.info(f'将分割结果写入到 {output_dir}。')
    filename_without_ext, file_ext = splitext(basename(media))
    if result_ext is None:
        result_ext = file_ext

    logger.info(f'共 {len(segmented_stamps)} 段内容被识别。')
    for index, stamp in enumerate(segmented_stamps):
        target = f'{join(output_dir, filename_without_ext)}_{index}{result_ext}'
        # An argument list instead of a shell string: file names with quotes,
        # spaces or shell metacharacters no longer break the command.
        completed = subprocess.run([
            'ffmpeg',
            '-ss', str(stamp[0]),
            '-to', str(stamp[1]),
            '-i', media,
            '-c:v', 'copy',
            '-c:a', 'copy',
            target,
        ])
        if completed.returncode != 0:
            # Report the failure and carry on with the remaining ranges.
            logger.warning(f'ffmpeg 切割第 {index} 段失败（返回码 {completed.returncode}）。')
    return output_dir




@logger.catch
def shazam(mp3, stop_at_first_match=True):
    """Send an audio file to Shazam and collect the responses that name a track.

    :param mp3: path of the audio/video file; its whole content is read into memory.
    :param stop_at_first_match: stop consuming results after the first hit.
    :return: a list of the items yielded by ``recognizeSong()`` whose second
        element has a non-empty ``'matches'`` and a ``'track'`` entry; empty
        when nothing was recognized.
    """
    logger.info(f'开始识别 {mp3} 。')
    # Close the file deterministically: the caller renames it right afterwards.
    with open(mp3, 'rb') as audio_file:
        audio_bytes = audio_file.read()
    recognize_generator = ShazamAPI.Shazam(
        audio_bytes,
        # Optional arguments left disabled by the author:
        # lang='cn',
        # time_zone='Asia/Shanghai'
    ).recognizeSong()

    matches = []
    for match in recognize_generator:
        response = match[1]  # presumably the decoded Shazam response dict
        if response.get('matches') and response.get('track'):
            matches.append(match)
            if stop_at_first_match:
                break
    return matches


@logger.catch
def legalize_filename(file_name):
    """Remove characters that are not allowed in file names.

    ``:`` becomes a space; ``" / ? *`` are dropped as before, and so are
    ``\\ < > |``, which Windows also rejects in file names.

    :param file_name: a bare name fragment (not a path, since separators are stripped).
    :return: the cleaned string.
    """
    replacements = {':': ' '}
    replacements.update(dict.fromkeys('"/?*\\<>|', ''))
    return file_name.translate(str.maketrans(replacements))


@logger.catch
def shazam_title(match):
    """Build a ``<title>_<subtitle>`` name fragment from one Shazam match.

    :param match: an item as collected by ``shazam``; ``match[1]['track']`` must
        provide ``'title'`` and ``'subtitle'``.
    :return: both fields passed through ``legalize_filename`` and joined by ``_``.
    """
    track = match[1]['track']
    safe_title = legalize_filename(track['title'])
    safe_subtitle = legalize_filename(track['subtitle'])
    return safe_title + '_' + safe_subtitle


@logger.catch
def recognize_song(song_dir: str, dist_dir: str):
    """Identify every clip in ``song_dir`` and move the recognized ones to ``dist_dir``.

    A recognized clip is renamed to ``<original name>_<title><ext>``, where the
    title comes from ``shazam_title`` converted with ``convert(..., 'zh-cn')``.
    Unrecognized clips stay where they are. A clip that cannot be recognized or
    moved is reported and skipped, so the remaining clips are still processed.

    :param song_dir: directory holding the clips; ``None`` makes this a no-op.
    :param dist_dir: destination directory; created when missing.
    """
    if song_dir is None:
        return
    makedirs(dist_dir, exist_ok=True)
    logger.info(f'移动结果到 {dist_dir}。')
    for file in listdir(song_dir):
        file_path = join(song_dir, file)
        if not isfile(file_path):
            continue
        recognize_result = shazam(file_path)
        # Both helpers may come back empty/None (nothing recognized, or the
        # helper failed); either way there is no title to work with.
        raw_title = shazam_title(recognize_result[0]) if recognize_result else None
        if not raw_title:
            logger.warning('识别失败。')
            continue
        title = convert(raw_title, 'zh-cn')
        logger.info(f'识别结果为: {title} 。')
        filename_without_ext, file_ext = splitext(basename(file_path))
        target = join(dist_dir, filename_without_ext) + '_' + title + file_ext
        try:
            rename(file_path, target)
        except OSError as error:
            # e.g. the target already exists or the name is rejected by the file
            # system; keep going with the other clips.
            logger.warning(f'移动 {file_path} 到 {target} 失败：{error}')

In [2]:
# Segmentation: change the file name(s) below to the recording(s) you downloaded.
# Avoid Chinese characters, punctuation and the like in the names.
# Example with several files (in theory any number can be listed):
# raw_file_path = [r'D:\lubo\527.mp4', r'D:\lubo\330.mp4', r'D:\lubo\331.mp4']
raw_file_path = [r'D:\lubo\527.mp4'] # now a list; paths are written with backslashes
seg_out_dir = r'.\convert2music' 
recognized_dir = r'.\recognized'

# For every recording: segment -> pick music ranges -> cut them out -> identify and rename.
for path in raw_file_path:
    extracted_info = extract_music(segment_wrapper(path, batch_size=512))
    logger.info(f'分段情况：{extracted_info}。')
    this_seg_out_dir = extract_mah_stuff(path, extracted_info, output_dir=seg_out_dir)
    recognize_song(this_seg_out_dir, recognized_dir)

[32m2023-06-08 18:46:24.846[0m | [1mINFO    [0m | [36m__main__[0m:[36msegment[0m:[36m104[0m - [1m开始为 D:\lubo\527.mp4 分段，起止时间为 None - None。[0m
  return np.vstack(
  return np.vstack(
  data = (data - np.mean(data, axis=1).reshape((len(data), 1))) / np.std(data, axis=1).reshape((len(data), 1))
  x = asanyarray(arr - arrmean)


280/280 - 92s - 92s/epoch - 330ms/step


[32m2023-06-08 18:48:15.529[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m9[0m - [1m分段情况：[['00:04:59', '00:09:08'], ['00:09:27', '00:13:56'], ['00:15:20', '00:19:04'], ['00:21:31', '00:24:03'], ['00:25:37', '00:30:04'], ['00:37:48', '00:41:19'], ['00:42:12', '00:46:23'], ['00:46:49', '00:50:13'], ['00:52:15', '00:54:20']]。[0m
[32m2023-06-08 18:48:15.530[0m | [1mINFO    [0m | [36m__main__[0m:[36mextract_mah_stuff[0m:[36m165[0m - [1m将分割结果写入到 .\convert2music。[0m
[32m2023-06-08 18:48:15.530[0m | [1mINFO    [0m | [36m__main__[0m:[36mextract_mah_stuff[0m:[36m171[0m - [1m共 9 段内容被识别。[0m
[32m2023-06-08 18:48:17.278[0m | [1mINFO    [0m | [36m__main__[0m:[36mrecognize_song[0m:[36m223[0m - [1m移动结果到 .\recognized。[0m
[32m2023-06-08 18:48:17.278[0m | [1mINFO    [0m | [36m__main__[0m:[36mshazam[0m:[36m188[0m - [1m开始识别 .\convert2music\527_0.mp4 。[0m
[32m2023-06-08 18:48:23.642[0m | [1mINFO    [0m | [36m__main__[0m:[36mrecogn

In [None]:
# Manually specified [start, stop] ranges to cut out of one recording.
times = [
    ['00:00:00', '00:02:34'],
    ['00:04:30', '00:06:19'],
    ['00:07:34', '00:11:57'],
    ['00:13:53', '00:18:21'],
    ['00:19:47', '00:21:29'],
    ['00:23:21', '00:25:28'],
    ['00:26:41', '00:31:18'],
    ['00:33:27', '00:37:51'],
    ['00:38:45', '00:41:51'],
    ['00:42:23', '00:45:52'],
    ['00:47:21', '00:52:39'],
    ['00:52:53', '00:57:45'],
    ['00:59:18', '01:02:38'],
    ['01:05:54', '01:10:11'],
    ['01:10:40', '01:13:32'],
    ['01:14:13', '01:18:36'],
    ['01:24:47', '01:27:39'],
    # NOTE(review): the next pair repeats the previous one and yields a second,
    # identical clip. Probably a copy-paste slip; confirm and remove or correct.
    ['01:24:47', '01:27:39'],
    ['01:28:23', '01:32:24'],
    ['01:32:50', '01:37:24'],
    ['01:39:14', '01:43:12'],
    ['01:44:16', '01:47:57'],
    ['01:49:48', '01:52:20'],
    ['01:57:11', '01:59:18'],
    ['01:59:23', '02:02:54'],
    ['02:03:50', '02:07:26'],
    ['02:15:49', '02:20:30'],
    ['02:21:55', '02:26:20'],
    ['02:29:40', '02:34:11'],
    ['02:34:23', '02:38:53'],
    ['02:40:30', '02:44:06'],
    ['02:47:57', '02:50:11'],
]
# extract_mah_stuff expects ONE media path. raw_file_path is a list in the cell above,
# so passing it directly fails inside basename(). Use its first entry by default and
# point manual_media at whichever recording these timestamps belong to.
manual_media = raw_file_path if isinstance(raw_file_path, str) else raw_file_path[0]
this_seg_out_dir = extract_mah_stuff(manual_media, times, output_dir=seg_out_dir)
recognize_song(this_seg_out_dir, recognized_dir)

In [None]:
# 将切片挪到谷歌网盘，然后从谷歌网盘下载。  https://drive.google.com/drive/my-drive
from google.colab import drive
drive.mount('/content/drive')

!mkdir /content/drive/MyDrive/594461
!mv /content/recognized /content/drive/MyDrive/594461
!mv /content/convert2music /content/drive/MyDrive/594461