In [1]:
import multiprocessing as mp
from pathlib import Path
import os
import json
import pandas as pd
from tqdm import tqdm
from lhotse import CutSet
from mylhotse.aishell2 import prepare_aishell2
import logging
from util import *
import numpy as np
import faiss, gc
from lhotse_util import from_strategy_to_cuts
from lhotse.cut import append_cuts
import librosa
from bitarray import bitarray

  from .autonotebook import tqdm as notebook_tqdm
Device set to use cuda:0


In [2]:
# Directory holding the source corpus (audio and transcript files).
IN_DIR = "../datasets/LongSpeechSource/iOS"
# IN_DIR = "/mnt/d/voicedata/CommenVoice/delta"
# Directory where metadata and processed audio files are written.
OUT_DIR = '../datasets/LongSpeech_p2'

In [3]:
# Load the dataset-level settings stored next to the outputs. A context
# manager is used so the file handle is closed deterministically, and the
# encoding is pinned because JSON files are UTF-8 regardless of locale.
with open(os.path.join(OUT_DIR, 'metadata.json'), encoding="utf-8") as metadata_file:
    config = json.load(metadata_file)
AVG_DURATION = config['avg_duration']
SAMPLE_RATE = config['sample_rate']
# Name of the metadata file (inside OUT_DIR) that converted records are appended to.
OUT_FILE_NAME = config['source']
# Used later as the first numeric id for newly grouped cuts.
prev_amount = config['amount']
print(prev_amount)
task = "asr"

16124


In [4]:
# Build the corpus manifests from IN_DIR with 15 parallel jobs; OUT_DIR is
# passed as the manifest output directory.
manifests = prepare_aishell2(corpus_dir=IN_DIR, output_dir=OUT_DIR, num_jobs=15)

Process aishell2 audio, it takes about 55  minutes using 40 cpu jobs.:   0%|          | 0/1 [00:00<?, ?it/s]

In [None]:

# Merge every partition of the manifests into a single CutSet.
# NOTE(review): the ['validated'] nesting is assumed to match what
# mylhotse.aishell2.prepare_aishell2 returns — confirm against that module.
cuts = CutSet()
for part in manifests.keys():
    rs = manifests[part]['validated']['recordings']
    ss = manifests[part]['validated']['supervisions']
    # Pair recordings with their supervisions for this partition.
    cut = CutSet.from_manifests(recordings=rs, supervisions=ss)
    cuts += cut

In [5]:
# Materialise the cuts and persist them as JSONL in OUT_DIR.
# NOTE(review): despite the name, no resampling happens here (resampling is
# applied later in build_grouped_cuts), and the output file is still called
# "commonvoice_raw_cuts.jsonl" although the manifests come from
# prepare_aishell2 — confirm whether the names should be updated.
resampled_cuts = cuts.to_eager()
resampled_cuts.to_jsonl(os.path.join(OUT_DIR, "commonvoice_raw_cuts.jsonl"))

In [6]:
def build_feature(cuts: CutSet, batch_size: int = 100, dim: int = 384):
    """Embed every cut's transcript and store vectors and durations on disk.

    Two memory-mapped float32 files are (re)created under ``OUT_DIR``:
    ``vecs.f32`` with shape ``(n, dim)`` and ``durs.f32`` with shape ``(n,)``,
    where ``n`` is the number of cuts.

    Args:
        cuts: Cuts to process; materialised with ``to_eager()`` first.
        batch_size: Number of cuts passed to ``get_sentence_embeddings`` at once.
        dim: Width of one embedding row; must match what
            ``get_sentence_embeddings`` produces.

    Returns:
        ``(vec_mm, dur_mm, string_ids)``: the two memmaps and the cut ids in
        the same row order.
    """
    eager_cuts = cuts.to_eager()
    total = len(eager_cuts)

    vec_mm = np.memmap(f"{OUT_DIR}/vecs.f32", dtype="float32", mode="w+", shape=(total, dim))
    dur_mm = np.memmap(f"{OUT_DIR}/durs.f32", dtype="float32", mode="w+", shape=(total,))

    string_ids = []
    write_pos = 0

    for start in tqdm(range(0, total, batch_size), desc="Get Embedding"):
        batch = eager_cuts[start:start + batch_size]

        texts = []
        durations = []
        for cut in batch:
            # Cuts without supervisions are embedded as the empty string.
            texts.append(cut.supervisions[0].text if cut.supervisions else "")
            durations.append(cut.duration)
            string_ids.append(cut.id)

        embeddings = get_sentence_embeddings(texts).astype("float32")

        stop = write_pos + len(batch)
        vec_mm[write_pos:stop] = embeddings
        dur_mm[write_pos:stop] = durations
        write_pos = stop

    # Push pending writes to the backing files before handing the maps out.
    vec_mm.flush()
    dur_mm.flush()

    return vec_mm, dur_mm, string_ids

def build_hnsw_index(vec_mm: np.memmap,
                     dim: int = 384,
                     m: int = 32,
                     ef_c: int = 200,
                     n_threads: int = mp.cpu_count(),
                     out_path: str = "cache_hnsw.faiss"):
    """Build an inner-product HNSW index over ``vec_mm`` and write it to disk.

    Args:
        vec_mm: Float32 matrix of embeddings. It is L2-normalised IN PLACE,
            so the caller's array (and its backing file) is modified.
        dim: Dimensionality of the vectors.
        m: HNSW connectivity parameter passed to ``IndexHNSWFlat``.
        ef_c: ``efConstruction`` used while building the graph.
        n_threads: Number of OpenMP threads FAISS may use.
        out_path: File name of the index, relative to ``OUT_DIR``.

    Returns:
        The full path of the written index file.
    """
    faiss.omp_set_num_threads(n_threads)
    # With unit-length vectors the inner product equals cosine similarity.
    faiss.normalize_L2(vec_mm)

    # The metric must be given to the constructor: assigning
    # `index.metric_type` afterwards leaves the underlying flat storage on
    # its default L2 metric, so the index would not really be an
    # inner-product index.
    index = faiss.IndexHNSWFlat(dim, m, faiss.METRIC_INNER_PRODUCT)
    index.hnsw.efConstruction = ef_c

    index.add(vec_mm)

    index_file = os.path.join(OUT_DIR, out_path)
    faiss.write_index(index, index_file)
    return index_file

def get_speaker_embedding_ids(ids, neighs, cuts):
    """Order a neighbour list by the first principal component of speaker embeddings.

    For each entry of ``neighs`` (stopping at the first ``-1``), the
    cut's first audio source is loaded and turned into a speaker embedding.
    The embeddings are projected onto their first principal component and the
    positions that sort that projection in ascending order are returned.

    Args:
        ids: Sequence mapping a row index to a cut id.
        neighs: Row indices of the neighbours; ``-1`` marks the end of the
            valid entries.
        cuts: Container indexable by cut id.

    Returns:
        Integer array of positions into the valid prefix of ``neighs``, sorted
        by the first principal component. Empty when there is no valid
        neighbour.
    """
    # NOTE(review): `PCA` and `get_speaker_embedding` are not imported by name
    # in the visible cells — presumably they come from `from util import *`.
    speaker_embeddings = []
    for idx in neighs:
        if idx == -1:
            break
        real_id = ids[idx]
        cut_pth = cuts[real_id].recording.sources[0].source
        audio, sr = librosa.load(cut_pth)
        speaker_embeddings.append(get_speaker_embedding(audio, sr).flatten())

    # PCA cannot be fitted on zero samples; return an empty ordering instead
    # of failing.
    if not speaker_embeddings:
        return np.array([], dtype=np.int64)

    spk_emb_np = np.array(speaker_embeddings)
    pc1 = PCA(n_components=1, svd_solver="auto").fit_transform(spk_emb_np).ravel()
    return np.argsort(pc1)

def greedy_cluster(index_path: str,
                   vec_mm: np.memmap,
                   dur_mm: np.memmap,
                   ids,
                   cuts,
                   bucket_min: int = 480,
                   bucket_avg: int = 600,
                   k_neigh: int = 256,
                   ef_s: int = 96):
    """Greedily group items into buckets of nearest neighbours by duration.

    Items are visited in order of decreasing duration. Each still-unassigned
    item is used as a seed: its ``k_neigh`` nearest unassigned neighbours are
    fetched from the index and added to the bucket, in the order returned,
    until the accumulated duration reaches ``bucket_avg``. A bucket that ends
    below ``bucket_min`` is discarded and its members are released again.

    Args:
        index_path: Path of the FAISS index to read.
        vec_mm: Embedding matrix; row ``i`` is the query vector of item ``i``.
        dur_mm: Per-item durations, same row order as ``vec_mm``.
        ids: Sequence mapping a row index to the item's string id.
        cuts: Not used by the active code (only by the commented-out
            speaker-ordering call).
        bucket_min: Minimum total duration for a bucket to be kept.
        bucket_avg: Total duration at which a bucket stops growing.
        k_neigh: Number of neighbours requested per seed.
        ef_s: ``efSearch`` value for the HNSW search.

    Returns:
        A list of buckets, each a list of string ids taken from ``ids``.
    """
    index = faiss.read_index(index_path)

    params = faiss.SearchParametersHNSW()
    params.efSearch = ef_s

    N = len(vec_mm)
    # One bit per item; True once the item belongs to a kept bucket.
    assigned = bitarray(N)
    assigned.setall(False)

    # Longest items are used as seeds first.
    order = np.argsort(-dur_mm)
    buckets = []

    for seed in tqdm(order, desc="Clustering (Optimized)"):
        if assigned[seed]:
            continue

        cluster = []
        total_dur = 0

        # Positions of all bits that are still 0, i.e. unassigned items.
        # NOTE(review): this list is rebuilt from scratch for every seed.
        unassigned_indices_list = assigned.search(bitarray('0'))
        unassigned_indices = np.fromiter(unassigned_indices_list, dtype=np.int64)


        if len(unassigned_indices) > 0:
            # Restrict the search to items that are not assigned yet.
            selector = faiss.IDSelectorArray(unassigned_indices)
            params.sel = selector

            _, neighs = index.search(vec_mm[seed : seed + 1], k_neigh, params=params)

            #speaker_order = get_speaker_embedding_ids(ids, neighs[0].tolist(), cuts)
            #print(speaker_order)

            for idx in neighs[0]:
                # -1 is treated as the end of the valid results.
                if idx == -1:
                    break
                if assigned[idx]:
                    print("Warning: Already assigned index", idx)
                    continue

                cluster.append(int(idx))
                assigned[idx] = True
                total_dur += dur_mm[idx]
                if total_dur >= bucket_avg:
                    break

            if total_dur < bucket_min:
                # Bucket too short: release its members so later seeds can
                # pick them up.
                for i in cluster:
                    assigned[i] = False
            else:
                total_dur = dur_mm[cluster].sum()
                buckets.append((cluster, total_dur))

    final_buckets = [b for b in buckets if b[1] >= bucket_min]
    final_clusters = [c for c, _ in final_buckets]
    final_duration = sum(sec for _, sec in final_buckets)

    # Fraction of the total duration that did not end up in any kept bucket.
    loss = 1 - final_duration / dur_mm.sum()
    print(f"桶数 {len(final_clusters)}, 最终时长 {final_duration:.2f}s, 总时长 {dur_mm.sum():.2f}s, 丢弃比例 {loss:.2%}")

    # Translate row indices back to string ids.
    strategy = []
    for cluster in final_clusters:
        strategy.append([ids[i] for i in cluster])

    return strategy

In [7]:
# The string below is an inert example of the "strategy" structure: a list of
# clusters, each a list of cut ids.
"""
mock_strategy = [
    ["common_voice_en_43199993-0", "common_voice_en_42736613-1", "common_voice_en_42798328-2"],
    ["common_voice_en_43204215-3", "common_voice_en_42706055-4", "common_voice_en_43139615-5"]
]
"""

# Full pipeline: embed transcripts, index the embeddings, then cluster.
vec_mm, dur_mm, string_ids = build_feature(resampled_cuts)
index_path = build_hnsw_index(vec_mm)
real_strategy = greedy_cluster(index_path, vec_mm, dur_mm, string_ids, resampled_cuts)

In [8]:
def map_newid_cutset(cutset: CutSet, start_id: int = 0) -> "tuple[CutSet, int]":
    """Renumber the cuts of a CutSet with consecutive zero-padded ids.

    The i-th cut receives the id ``f"{start_id + i:06d}"``.

    Args:
        cutset: Cuts to renumber, in iteration order.
        start_id: Numeric id given to the first cut.

    Returns:
        ``(new_cutset, next_id)``: the renumbered CutSet and the first id
        that has not been used (``start_id`` plus the number of cuts).
    """
    renumbered = [
        cut.with_id(f"{start_id + offset:06d}")
        for offset, cut in enumerate(cutset)
    ]
    return CutSet.from_cuts(renumbered), start_id + len(renumbered)

def build_grouped_cuts(
        source_cuts: CutSet,
        strategy,
        start_id: int = 0
    ):
    """Concatenate the cuts of each cluster into one cut with a numeric id.

    Every member cut is resampled to ``SAMPLE_RATE`` and the members are
    appended in the order given by the cluster. The k-th cluster receives the
    id ``f"{start_id + k:06d}"``.

    Args:
        source_cuts: Cuts that the ids in ``strategy`` refer to.
        strategy: Iterable of clusters, each an iterable of cut ids.
        start_id: Numeric id given to the first merged cut.

    Returns:
        ``(grouped_cutset, next_id)`` where ``next_id`` is the first unused
        numeric id.
    """
    by_id = {}
    for cut in source_cuts:
        by_id[cut.id] = cut

    merged_cuts = []
    for offset, member_ids in enumerate(strategy):
        members = [by_id[member].resample(SAMPLE_RATE) for member in member_ids]
        new_id = f"{start_id + offset:06d}"
        merged_cuts.append(append_cuts(members).with_id(new_id))

    return CutSet.from_cuts(merged_cuts), start_id + len(merged_cuts)

Get Embedding: 100%|██████████| 18557/18557 [06:38<00:00, 46.54it/s]
Clustering (Optimized): 100%|██████████| 1855619/1855619 [6:20:14<00:00, 81.33it/s]   


桶数 14783, 最终时长 8826442.81s, 总时长 9507185.00s, 丢弃比例 7.16%


In [9]:
#grouped_cuts = from_strategy_to_cuts(resampled_cuts.to_eager(), real_strategy)
#grouped_cuts , new_amount = map_newid_cutset(grouped_cuts, start_id=prev_amount)

# Merge each cluster into one long cut, numbering from prev_amount, and
# persist the result as JSONL; new_amount is the next free numeric id.
grouped_cuts, new_amount = build_grouped_cuts(resampled_cuts, real_strategy, start_id=prev_amount)
grouped_cuts.to_jsonl(os.path.join(OUT_DIR, "grouped_raw_cuts.jsonl"))
print(new_amount)

In [10]:
def json_from_commonvoice_to_allaudios(one_cut, lang = "en"):
    """Convert one grouped-cut JSON record into a LongSpeech metadata record.

    Args:
        one_cut: Dict with an ``"id"`` and a ``"tracks"`` list; each track
            holds a ``"cut"`` dict with ``start``, ``duration``,
            ``recording.sources[0].source`` and (optionally) ``supervisions``.
        lang: Language code stored in the output record.

    Returns:
        A dict with the summed duration, the per-track ``[start, duration]``
        slices, the joined transcript, the number of distinct speakers and
        the component source paths.

    Supervisions without a ``speaker`` or ``text`` key (or with an empty
    value) are skipped instead of raising ``KeyError``.
    """
    # NOTE(review): "source_ds" is hard-coded to "CommonVoice" and component
    # paths are cut at "clips" — confirm this is intended for other corpora.
    sources = []
    speakers = set()
    total_dur = 0
    transcripts = []
    slices = []

    for subcut in one_cut["tracks"]:
        cut = subcut["cut"]
        supervisions = cut.get("supervisions", [])

        total_dur += cut["duration"]
        full_pth = cut["recording"]["sources"][0]["source"]
        slices.append([cut["start"], cut["duration"]])
        # Keep only the part of the path after the last "clips" component;
        # paths without it are kept whole.
        sources.append(full_pth.split("clips")[-1])

        speakers.update(s["speaker"] for s in supervisions if s.get("speaker"))
        transcript_param = " ".join(s["text"] for s in supervisions if s.get("text"))
        if transcript_param != "":
            transcripts.append(transcript_param)
        else:
            # Surface tracks that contribute no transcript.
            print(subcut)

    return {
        "id": one_cut["id"],
        "source_ds": "CommonVoice",
        "duration_sec": total_dur,
        "audio_auto": False,
        "text_auto": False,
        "language": lang,
        "num_speakers": len(speakers),
        "num_switches": len(transcripts),
        "slice": slices,
        "transcribe": " ".join(transcripts),
        "components": sources,
    }


def convert_record(source_jsonl_path: str, target_jsonl_path: str, map_fn, lang: str):
    """Map every JSONL record of one file and append the results to another.

    Args:
        source_jsonl_path: JSONL file to read, one JSON object per line.
        target_jsonl_path: JSONL file to append to (opened in append mode, so
            existing content is kept).
        map_fn: Callable ``map_fn(record, lang)`` returning a JSON-serialisable
            object.
        lang: Passed through to ``map_fn`` as its second argument.
    """
    with open(source_jsonl_path, "r", encoding="utf-8") as reader:
        with open(target_jsonl_path, "a", encoding="utf-8") as writer:
            for raw_line in reader:
                record = json.loads(raw_line)
                converted = map_fn(record, lang)
                # ensure_ascii=False keeps non-ASCII text readable in the file.
                serialized = json.dumps(converted, ensure_ascii=False)
                writer.write(serialized + "\n")


16071


In [11]:
def _save_one(cut, out_dir):

    os.environ["LHOTSE_AUDIO_DURATION_MISMATCH_TOLERANCE"] = "1.5"
    dst = Path(out_dir) / f"{cut.id}.wav"
    cut.save_audio(dst, format="wav")
    return cut.id





In [18]:
# Convert the grouped cuts into LongSpeech metadata records and append them
# to the metadata file named by OUT_FILE_NAME.
# NOTE(review): `lang` is not assigned in any visible cell — presumably it
# comes from `from util import *` or leftover kernel state; confirm it is
# defined on a fresh "Restart & Run All".
convert_record(os.path.join(OUT_DIR, "grouped_raw_cuts.jsonl"),
               os.path.join(OUT_DIR, OUT_FILE_NAME),
               json_from_commonvoice_to_allaudios, lang)


In [19]:
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from worker import save_one_worker

KeyboardInterrupt: 

In [24]:


def save_audios_from_cutset(cutset, out_dir, num_jobs=None):
    """Save every cut of ``cutset`` as a WAV file using a process pool.

    Args:
        cutset: Iterable of cuts; each must expose an ``id``.
        out_dir: Destination directory; created if it does not exist.
        num_jobs: Number of worker processes. Defaults to ``os.cpu_count()``.

    Worker failures are not raised. Each one is logged with the id of the
    affected cut, and a summary is logged at the end, so that missing or
    truncated files do not go unnoticed.
    """
    if num_jobs is None:
        num_jobs = os.cpu_count()

    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Workers are started with the "spawn" method.
    context = mp.get_context("spawn")
    failed_ids = []

    with ProcessPoolExecutor(max_workers=num_jobs, mp_context=context) as pool:
        # Remember which cut each future belongs to so failures can be named.
        future_to_id = {
            pool.submit(save_one_worker, cut, out_dir): cut.id
            for cut in tqdm(cutset, desc="1. 提交任务中")
        }
        for future in tqdm(
            as_completed(future_to_id),
            total=len(future_to_id),
            desc=f"Saving WAVs ({num_jobs} workers)"
        ):
            try:
                # result() re-raises whatever the worker raised; without this
                # call the exception would be dropped silently.
                future.result()
            except Exception as exc:
                cut_id = future_to_id[future]
                failed_ids.append(cut_id)
                logging.warning("Failed to save cut %s: %r", cut_id, exc)

    if failed_ids:
        logging.warning(
            "%d of %d cuts could not be saved: %s",
            len(failed_ids), len(future_to_id), failed_ids,
        )

In [27]:
#os.environ["LHOTSE_AUDIO_DURATION_MISMATCH_TOLERANCE"] =   "1.5"
# Force the "spawn" start method, then export every grouped cut to
# OUT_DIR/wavs.
mp.set_start_method('spawn', force=True)
save_audios_from_cutset(grouped_cuts, os.path.join(OUT_DIR, 'wavs'))

In [28]:
# Disabled code kept as an inert string literal: an earlier variant that
# attached sentence embeddings to each cut as a custom field.
# NOTE(review): it refers to `combine`, which is not imported by name in the
# visible cells.
"""
def with_new_features(cuts: CutSet, batch_size = 100) -> CutSet:
    cutset_list = cuts.split_lazy(OUT_DIR, batch_size)
    new_cutset_list = []
    for i, cutset in enumerate(tqdm(cutset_list, desc="Processing cuts")):
        text_list = [cut.supervisions[0].text if cut.supervisions else "" for cut in cutset]
        id_list = [cut.id for cut in cutset]
        duration = [cut.duration for cut in cutset]
        semantic_np = get_sentence_embeddings(
            text_list
        )

        updated_cuts = []
        for cut, embedding in zip(cutset, semantic_np):
            cut = cut.with_custom("semantic_emb", embedding.tolist())  # 如果是 numpy array
            updated_cuts.append(cut)


        new_cutset_list.append(CutSet.from_cuts(updated_cuts))
    merged_cuts = combine(*new_cutset_list)
    return merged_cuts
"""

1. 提交任务中: 100%|██████████| 14783/14783 [00:00<00:00, 34360.30it/s]
Saving WAVs (12 workers):  34%|███▎      | 4977/14783 [4:01:01<5:27:28,  2.00s/it] 

In [1]:
from data_validator import DatasetValidator
# Check the exported dataset and collect the waveforms the validator flags;
# the recorded output shows the result as a list of zero-padded id strings.
# NOTE(review): the path repeats the value of OUT_DIR as a literal.
data_validator = DatasetValidator('../datasets/LongSpeech_p2')
ill_waveforms = data_validator.get_ill_waveforms()
ill_waveforms

  from .autonotebook import tqdm as notebook_tqdm


qwq


 85%|████████▍ | 13607/16071 [00:56<00:11, 206.48it/s]

Error reading ../datasets/LongSpeech_p2/wavs/013562.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013563.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013564.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013565.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013566.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013567.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013568.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013569.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013570.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013571.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013572.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013573.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013574.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013575.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013576.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013577.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013578.wav: 
Error reading 

 85%|████████▍ | 13658/16071 [00:56<00:10, 227.79it/s]

Error reading ../datasets/LongSpeech_p2/wavs/013613.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013614.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013615.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013616.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013617.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013618.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013619.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013620.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013621.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013622.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013623.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013624.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013625.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013626.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013627.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013628.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013629.wav: 
Error reading 

 85%|████████▌ | 13682/16071 [00:56<00:11, 214.89it/s]

Error reading ../datasets/LongSpeech_p2/wavs/013664.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013665.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013666.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013667.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013668.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013669.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013670.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013671.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013672.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013673.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013674.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013675.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013676.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013677.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013678.wav: 
Error reading ../datasets/LongSpeech_p2/wavs/013679.wav: fmt chunk and/or data chunk missing
Error reading ../datasets/LongSpeech_

100%|██████████| 16071/16071 [02:51<00:00, 93.93it/s] 


['013562',
 '013563',
 '013564',
 '013565',
 '013566',
 '013567',
 '013568',
 '013569',
 '013570',
 '013571',
 '013572',
 '013573',
 '013574',
 '013575',
 '013576',
 '013577',
 '013578',
 '013579',
 '013580',
 '013581',
 '013582',
 '013583',
 '013584',
 '013585',
 '013586',
 '013587',
 '013588',
 '013589',
 '013590',
 '013591',
 '013592',
 '013593',
 '013594',
 '013595',
 '013596',
 '013597',
 '013598',
 '013599',
 '013600',
 '013601',
 '013602',
 '013603',
 '013604',
 '013605',
 '013606',
 '013607',
 '013608',
 '013609',
 '013610',
 '013611',
 '013612',
 '013613',
 '013614',
 '013615',
 '013616',
 '013617',
 '013618',
 '013619',
 '013620',
 '013621',
 '013622',
 '013623',
 '013624',
 '013625',
 '013626',
 '013627',
 '013628',
 '013629',
 '013630',
 '013631',
 '013632',
 '013633',
 '013634',
 '013635',
 '013636',
 '013637',
 '013638',
 '013639',
 '013640',
 '013641',
 '013642',
 '013643',
 '013644',
 '013645',
 '013646',
 '013647',
 '013648',
 '013649',
 '013650',
 '013651',
 '013652',

In [5]:
# Number of waveforms flagged by the validator.
len(ill_waveforms)

129

In [8]:
# Reload the grouped cuts written earlier to OUT_DIR.
# NOTE(review): the execution counter restarted before this cell, and the
# visible cells after the restart do not import `CutSet`/`os` or define
# OUT_DIR — presumably they were re-run from the top cells; confirm.
grouped_cuts = CutSet.from_jsonl(os.path.join(OUT_DIR, "grouped_raw_cuts.jsonl"))

In [9]:
# Keep only the grouped cuts whose id was flagged by the validator, so that
# just those files are re-exported.
new_cut_set = grouped_cuts.filter(lambda cut: cut.id in ill_waveforms)

In [10]:
def save_audios_from_cutset(cutset, out_dir, num_jobs=1):
    """Save every cut of ``cutset`` to ``out_dir`` one at a time.

    Args:
        cutset: Iterable of cuts exposing ``id`` and ``save_audio``.
        out_dir: Destination directory; created if it does not exist.
        num_jobs: Accepted for signature compatibility but not used; the
            export is always serial.

    Each cut is written to ``<out_dir>/<cut.id>.wav``.
    """
    # Create the destination first so the first save cannot fail on a
    # missing directory.
    os.makedirs(out_dir, exist_ok=True)
    for cut in tqdm(cutset):
        cut.save_audio(os.path.join(out_dir, f"{cut.id}.wav"))

In [13]:
# Set the lhotse duration-mismatch tolerance, then re-export only the
# flagged cuts into OUT_DIR/wavs, using the same <id>.wav naming.
os.environ["LHOTSE_AUDIO_DURATION_MISMATCH_TOLERANCE"] =   "1.5"
save_audios_from_cutset(new_cut_set,os.path.join(OUT_DIR, 'wavs'))

100%|██████████| 129/129 [50:43<00:00, 23.60s/it]
