# Dataset type 1

整合关键词信息做语音识别

In [1]:
import json
import os
import pathlib

HOME_DIR = pathlib.Path(os.path.expanduser("~"))

def make_dataset(source_file, dst_file):
    """Turn a keyword-annotated ASR manifest into conversation-style samples.

    ``source_file`` is a JSON list whose records carry ``audio_path``,
    ``keywords`` and a transcript under ``text`` or ``transcript`` (the key is
    chosen by inspecting the first record). Each record becomes a two-turn
    conversation: the user turn holds the audio tag plus the Chinese keyword
    prompt, the assistant turn holds the transcript. The result is written to
    ``dst_file`` as indented, non-ASCII-escaped JSON.

    Raises:
        ValueError: if the first record's audio path contains neither known
            corpus root, or the first record has no transcript key.
        AssertionError: if a rewritten audio path does not exist on disk.
    """
    with open(source_file, 'r', encoding='utf8') as src:
        records = json.load(src)

    # Chinese prompt; roughly: "Transcribe the speech using the following
    # keywords that may appear. Possible keywords are: {keywords}".
    prompt_template = """请结合以下可能出现的关键词，做语音转文本。可能出现的关键词为：{keywords}"""
    # Alternative prompts kept for reference:
    # """Transcibe speech to text according to keywords may appear in the utterance. Possible key words are: {keywords}"""
    # """Transcript Speech to Text: """
    wrapper_template = """<audio>{audio_path}</audio>{query}"""

    first_record = records[0]

    # The corpus root to rewrite is detected once, from the first record only.
    first_audio_path = first_record["audio_path"]
    for known_root in ("/data/rym/datasets/OpenSLR", "/nfs/speech/corpus/OpenSLR"):
        if known_root in first_audio_path:
            replace_path = known_root
            break
    else:
        raise ValueError("wrong input audio filepath")

    # "text" wins over "transcript" when both keys are present.
    label_name = next((key for key in ("text", "transcript") if key in first_record), None)
    if label_name is None:
        raise ValueError("wrong label name")

    conversations = []
    for record in records:
        joined_keywords = ', '.join([kw.lower() for kw in record['keywords']])
        query = prompt_template.format(keywords=joined_keywords)
        local_root = str(HOME_DIR / "datasets/OpenSLR")
        audio_path = record["audio_path"].replace(replace_path, local_root)
        assert pathlib.Path(audio_path).exists()
        user_turn = {
            "from": "user",
            "value": wrapper_template.format(audio_path=audio_path, query=query),
        }
        assistant_turn = {"from": "assistant", "value": record[label_name]}
        conversations.append({"conversations": [user_turn, assistant_turn]})

    with open(dst_file, 'w', encoding='utf8') as dst:
        json.dump(conversations, dst, indent=2, ensure_ascii=False)
    
# Build the train/dev/test splits of the keyword-conditioned ASR dataset.
for _source_json, _target_json in (
    ("data/slidespeech_30k_filtered_train_en_instruction.json", "data/slidespeech_30k_filtered_train/train.json"),
    ("data/slidespeech_30k_filtered_train_en_instruction_dev.json", "data/slidespeech_30k_filtered_train/dev.json"),
    ("data/slidespeech_30k_filtered_train_en_instruction_test.json", "data/slidespeech_30k_filtered_train/test.json"),
):
    make_dataset(_source_json, _target_json)
    

In [3]:
import json
import os
import pathlib

HOME_DIR = pathlib.Path(os.path.expanduser("~"))

def make_dataset(source_file, dst_file):
    """Turn a keyword-annotated ASR manifest into conversation-style samples.

    Same pipeline as the Chinese-prompt variant, but with English prompts and
    a keyword-free prompt for records whose keyword list is exactly ``[""]``.
    ``source_file`` is a JSON list of records with ``audio_path``,
    ``keywords`` and a transcript under ``text`` or ``transcript`` (decided
    from the first record). Output is written to ``dst_file`` as JSON.

    Raises:
        ValueError: if the first record's audio path contains neither known
            corpus root, or the first record has no transcript key.
        AssertionError: if a rewritten audio path does not exist on disk.
    """
    with open(source_file, 'r', encoding='utf8') as src:
        records = json.load(src)

    # Earlier Chinese prompts, kept for reference:
    # """请结合以下可能出现的关键词，做语音转文本。可能出现的关键词为：{keywords}"""
    # """语音转文本"""
    # NOTE: the misspelling "Transcibe" is reproduced as-is; it is part of the
    # emitted prompt text.
    prompt_with_keywords = """Transcibe speech to text according to keywords may appear in the utterance. Possible keywords are: {keywords}"""
    prompt_without_keywords = """Transcribe speech to text"""
    wrapper_template = """<audio>{audio_path}</audio>{query}"""

    first_record = records[0]

    # The corpus root to rewrite is detected once, from the first record only.
    first_audio_path = first_record["audio_path"]
    for known_root in ("/data/rym/datasets/OpenSLR", "/nfs/speech/corpus/OpenSLR"):
        if known_root in first_audio_path:
            replace_path = known_root
            break
    else:
        raise ValueError("wrong input audio filepath")

    # "text" wins over "transcript" when both keys are present.
    label_name = next((key for key in ("text", "transcript") if key in first_record), None)
    if label_name is None:
        raise ValueError("wrong label name")

    conversations = []
    for record in records:
        lowered = [kw.lower() for kw in record['keywords']]
        # A single empty string is the "no keywords" marker.
        query = (
            prompt_without_keywords
            if lowered == [""]
            else prompt_with_keywords.format(keywords=', '.join(lowered))
        )
        local_root = str(HOME_DIR / "datasets/OpenSLR")
        audio_path = record["audio_path"].replace(replace_path, local_root)
        assert pathlib.Path(audio_path).exists()
        user_turn = {
            "from": "user",
            "value": wrapper_template.format(audio_path=audio_path, query=query),
        }
        assistant_turn = {"from": "assistant", "value": record[label_name]}
        conversations.append({"conversations": [user_turn, assistant_turn]})

    with open(dst_file, 'w', encoding='utf8') as dst:
        json.dump(conversations, dst, indent=2, ensure_ascii=False)
    
# Build the train/dev/test splits with the English instruction prompts.
for _source_json, _target_json in (
    ("data/slidespeech_30k_filtered_train_en_instruction.json", "data/slidespeech_30k_filtered_train_en_instruction/train.json"),
    ("data/slidespeech_30k_filtered_train_en_instruction_dev.json", "data/slidespeech_30k_filtered_train_en_instruction/dev.json"),
    ("data/slidespeech_30k_filtered_train_en_instruction_test.json", "data/slidespeech_30k_filtered_train_en_instruction/test.json"),
):
    make_dataset(_source_json, _target_json)
    

# Dataset type 2

做关键词筛选

In [18]:
import json

# Load the full manifest and the per-utterance filtered keyword lists.
with open("data/slidespeech_30k.json", 'r', encoding='utf8') as f:
    data = json.load(f)

with open("data/context_filter/slidespeech_30k_filter_keywords.json", 'r', encoding='utf8') as f:
    filtered_data = json.load(f)

# Records are paired by position: entry i of the filter file overrides the
# keywords of entry i of the manifest. [""] stands for "no keywords".
for i, item in enumerate(data):
    selected_keywords = filtered_data[i]["filtered_keywords"]
    item['keywords'] = [""] if selected_keywords == [""] else selected_keywords

# This output path is one of the inputs read by the make_dataset(...) calls.
with open("data/slidespeech_30k_filtered_train_en_instruction.json", 'w', encoding='utf8') as f:
    json.dump(data, f, indent=2, ensure_ascii=False)
    

In [1]:
import json
import os
import pathlib

HOME_DIR = pathlib.Path(os.path.expanduser("~"))

def make_keywords_filter_dataset(source_file, dst_file):
    """Build a conversation-style dataset for the keyword-selection task.

    ``source_file`` is a JSON list of records with ``audio_path``,
    ``keywords`` (candidate list) and ``filtered_keywords`` (the selected
    subset). Each record becomes a two-turn conversation: the user turn holds
    the audio tag plus the selection prompt with the lowercased candidates;
    the assistant turn holds the selected keywords joined by ``", "``, or the
    literal ``"none"`` when nothing was selected. The result is written to
    ``dst_file`` as indented, non-ASCII-escaped JSON.

    Raises:
        ValueError: if the first record's audio path contains neither known
            corpus root.
        AssertionError: if a rewritten audio path does not exist on disk.
    """
    with open(source_file, 'r', encoding='utf8') as f:
        data = json.load(f)

    INSTRUCTION_PROMPT = """Select key words that may appear in the speech from the following keywords list: {keywords}"""

    INPUT_WRAPPER = """<audio>{audio_path}</audio>{query}"""

    # The corpus root to rewrite is detected once, from the first record only.
    if "/data/rym/datasets/OpenSLR" in data[0]["audio_path"]:
        replace_path = "/data/rym/datasets/OpenSLR"
    elif "/nfs/speech/corpus/OpenSLR" in data[0]["audio_path"]:
        replace_path = "/nfs/speech/corpus/OpenSLR"
    else:
        raise ValueError("wrong input audio filepath")

    dataset_to_write = []
    for item in data:
        keywords_lst = [s.lower() for s in item['keywords']]
        query = INSTRUCTION_PROMPT.format(keywords=', '.join(keywords_lst))
        audio_path = item["audio_path"].replace(replace_path, str(HOME_DIR / "datasets/OpenSLR"))
        assert pathlib.Path(audio_path).exists(), f"audio file not found: {audio_path}"
        user_input = INPUT_WRAPPER.format(audio_path=audio_path, query=query)
        # Both [""] (the sentinel used by the 30k files) and a truly empty
        # list mean "nothing selected"; an empty list used to yield an empty
        # assistant target instead of "none".
        selected = item["filtered_keywords"]
        answer = ', '.join(selected) if selected and selected != [""] else "none"
        dataset_to_write.append(
            {
                "conversations":
                    [
                        {"from": "user", "value": user_input},
                        {"from": "assistant", "value": answer}
                    ]
            }
        )

    with open(dst_file, 'w', encoding='utf8') as f:
        json.dump(dataset_to_write, f, indent=2, ensure_ascii=False)
    
# make_keywords_filter_dataset("data/context_filter/slidespeech_30k_filter_keywords_test.json", "data/slidespeech_30k_filter_en_instruction/test.json")
# make_keywords_filter_dataset("data/context_filter/slidespeech_30k_filter_keywords_dev.json", "data/slidespeech_30k_filter_en_instruction/dev.json")
# make_keywords_filter_dataset("data/context_filter/slidespeech_30k_filter_keywords.json", "data/slidespeech_30k_filter_en_instruction/train.json")
    
# Build the keyword-selection splits: test/dev come from the 30k filter files,
# train from the L95 filter file.
for _source_json, _target_json in (
    ("data/context_filter/slidespeech_30k_filter_keywords_test.json", "data/slidespeech_L95_filter_en_instruction/test.json"),
    ("data/context_filter/slidespeech_30k_filter_keywords_dev.json", "data/slidespeech_L95_filter_en_instruction/dev.json"),
    ("data/context_filter/slidespeech_L95_filter_keywords.json", "data/slidespeech_L95_filter_en_instruction/train.json"),
):
    make_keywords_filter_dataset(_source_json, _target_json)
    

In [2]:
import json

with open("data/slidespeech_L95_all.json", 'r', encoding='utf8') as f:
    data = json.load(f)

# For every utterance keep only the candidate keywords that occur as a whole
# whitespace-separated token of the transcript.
dataset_to_write = []
for item in data:
    lowered_keywords = [k.lower() for k in item["keywords"]]
    # Tokenise once per utterance; lowercase so the comparison with the
    # lowercased keywords is case-insensitive.
    transcript_tokens = set(item["transcript"].lower().split())
    filtered_keywords = [k for k in lowered_keywords if k in transcript_tokens]
    item["keywords"] = lowered_keywords
    # [""] is the "nothing selected" marker checked by the dataset builders
    # in this notebook (they compare filtered_keywords against [""]).
    item["filtered_keywords"] = filtered_keywords or [""]
    # Previously nothing was ever appended, so the output file was always [].
    dataset_to_write.append(item)

with open("data/context_filter/slidespeech_L95_filter_keywords.json", 'w', encoding='utf8') as f:
    json.dump(dataset_to_write, f, indent=2, ensure_ascii=False)
    


# Dataset type 3

做关键词筛选+语音识别的多任务

In [6]:
import json
import os
import pathlib

HOME_DIR = pathlib.Path(os.path.expanduser("~"))

def make_keywords_filter_dataset(keywords_file, transcript_file, dst_file):
    """Build a multitask (keyword selection + transcription) dataset.

    ``keywords_file`` is a JSON list of records with ``audio_path``,
    ``keywords`` and ``filtered_keywords``; ``transcript_file`` is a JSON list
    whose records hold the transcript under ``text`` or ``transcript`` (decided
    from its first record). Records of the two files are paired by position.
    Each pair becomes a two-turn conversation whose assistant turn lists the
    selected keywords (or ``"none"``) followed by the transcription. The
    result is written to ``dst_file`` as indented, non-ASCII-escaped JSON.

    Raises:
        ValueError: if the first keyword record's audio path contains neither
            known corpus root, or the first transcript record has no
            transcript key.
        AssertionError: if a rewritten audio path does not exist on disk.
        IndexError: if ``transcript_file`` has fewer records than
            ``keywords_file``.
    """
    with open(keywords_file, 'r', encoding='utf8') as f:
        kw_data = json.load(f)
    with open(transcript_file, 'r', encoding='utf8') as f:
        transcript_data = json.load(f)

    INSTRUCTION_PROMPT = """First select keywords that may appear in the speech from given keywords list. Then Transcribe speech to text according to selected keywords. Keywords are: {keywords}"""
    RESPONSE_FORMAT = """Selected keywords are: {keywords}.\nTranscription: {transcript}"""

    INPUT_WRAPPER = """<audio>{audio_path}</audio>{query}"""

    # The corpus root to rewrite is detected once, from the first record only.
    if "/data/rym/datasets/OpenSLR" in kw_data[0]["audio_path"]:
        replace_path = "/data/rym/datasets/OpenSLR"
    elif "/nfs/speech/corpus/OpenSLR" in kw_data[0]["audio_path"]:
        replace_path = "/nfs/speech/corpus/OpenSLR"
    else:
        raise ValueError("wrong input audio filepath")

    # Fixed: the second branch used to test `data[0]`, a name that is not
    # defined in this function (it only resolved if a notebook-global `data`
    # happened to exist, and then inspected the wrong dataset).
    if "text" in transcript_data[0]:
        label_name = "text"
    elif "transcript" in transcript_data[0]:
        label_name = "transcript"
    else:
        raise ValueError("wrong label name")

    dataset_to_write = []
    # NOTE(review): pairing is purely positional; both files are assumed to
    # list the same utterances in the same order — confirm upstream.
    for i in range(len(kw_data)):
        kw_item = kw_data[i]
        transcript_item = transcript_data[i]
        keywords_lst = [s.lower() for s in kw_item['keywords']]
        query = INSTRUCTION_PROMPT.format(keywords=', '.join(keywords_lst))
        audio_path = kw_item["audio_path"].replace(replace_path, str(HOME_DIR / "datasets/OpenSLR"))
        assert pathlib.Path(audio_path).exists(), f"audio file not found: {audio_path}"
        user_input = INPUT_WRAPPER.format(audio_path=audio_path, query=query)
        # Both [""] and an empty list mean "nothing selected".
        selected = kw_item["filtered_keywords"]
        filtered_keywords = ', '.join(selected) if selected and selected != [""] else "none"
        label = RESPONSE_FORMAT.format(keywords=filtered_keywords, transcript=transcript_item[label_name])
        dataset_to_write.append(
            {
                "conversations":
                    [
                        {"from": "user", "value": user_input},
                        {"from": "assistant", "value": label}
                    ]
            }
        )

    with open(dst_file, 'w', encoding='utf8') as f:
        json.dump(dataset_to_write, f, indent=2, ensure_ascii=False)
    
# make_keywords_filter_dataset("data/context_filter/slidespeech_30k_filter_keywords_test.json", "data/slidespeech_30k_filter_en_instruction/test.json")
# make_keywords_filter_dataset("data/context_filter/slidespeech_30k_filter_keywords_dev.json", "data/slidespeech_30k_filter_en_instruction/dev.json")
# make_keywords_filter_dataset("data/context_filter/slidespeech_30k_filter_keywords.json", "data/slidespeech_30k_filter_en_instruction/train.json")
    
# Build the multitask splits: (keyword file, transcript file, output file).
for _keywords_json, _transcript_json, _target_json in (
    ("data/context_filter/slidespeech_30k_filter_keywords_test.json", "data/slidespeech_test.json", "data/slidespeech_30k_multitask_train_en_instruction/test.json"),
    ("data/context_filter/slidespeech_30k_filter_keywords_dev.json", "data/slidespeech_dev.json", "data/slidespeech_30k_multitask_train_en_instruction/dev.json"),
    ("data/context_filter/slidespeech_30k_filter_keywords.json", "data/slidespeech_30k.json", "data/slidespeech_30k_multitask_train_en_instruction/train.json"),
):
    make_keywords_filter_dataset(_keywords_json, _transcript_json, _target_json)
    

In [20]:
import string
import json

def remove_punctuation(text):
    """Return ``text`` with all ASCII punctuation removed except the apostrophe."""
    # The apostrophe is kept so contractions such as "it's" stay intact.
    droppable = string.punctuation.replace("'", "")
    deletion_table = str.maketrans('', '', droppable)
    return text.translate(deletion_table)

def process_jsonl(input_file, output_file):
    """Strip punctuation from the ``text`` field of a JSONL file and save as JSON.

    Reads ``input_file`` one JSON object per line, replaces each object's
    ``text`` value with ``remove_punctuation(text)``, and writes all objects
    to ``output_file`` as a single indented JSON list. Blank lines are
    skipped instead of raising ``json.JSONDecodeError``.
    """
    dataset_to_write = []

    with open(input_file, 'r', encoding='utf-8') as f_in:
        # Iterate the file lazily instead of materialising every line first.
        for line in f_in:
            if not line.strip():
                continue
            data_item = json.loads(line)
            data_item['text'] = remove_punctuation(data_item['text'])
            dataset_to_write.append(data_item)

    with open(output_file, 'w', encoding='utf8') as f_out:
        json.dump(dataset_to_write, f_out, indent=2, ensure_ascii=False)
            
# Call the function with the input and output file paths.
# Previous path pair, kept for reference:
# input_file_path = '/nfs/speech/yxzhang/modelscope-agent/modelscope-agent/slidespeech/workflow_test_3.jsonl'
# output_file_path = '/nfs/speech/yxzhang/modelscope-agent/modelscope-agent/slidespeech/baseline_result/vasr_result/no_punc/workflow_test_3.txt'
input_file_path = '/data/ymrong/Projects/ms-swift/data/slidespeech_dev.jsonl'
output_file_path = '/data/ymrong/Projects/ms-swift/data/slidespeech_dev.json'
process_jsonl(input_file_path, output_file_path)

In [14]:
import pathlib
import json
import codecs
import re


def convert_jsonl_to_txt(result_file):
    """Split an inference-result JSONL file into hypothesis and reference files.

    Each non-blank line of ``result_file`` must be a JSON object with
    ``query`` (starting with ``<audio>PATH</audio>``), ``response`` and
    ``label``. Two files are written next to ``result_file``:
    ``test.hyp`` with ``"PATH RESPONSE"`` lines and ``test.ref`` with
    ``"PATH LABEL"`` lines.

    Args:
        result_file: path to the JSONL file, as ``str`` or ``pathlib.Path``.

    Raises:
        AttributeError: if a ``query`` does not start with an
            ``<audio>...</audio>`` tag (``re.match`` returns ``None``).
    """
    if isinstance(result_file, str):
        result_file = pathlib.Path(result_file)
    result_dir = result_file.parents[0]

    audio_paths = []
    hyps = []
    refs = []
    with result_file.open(mode='rt', encoding='utf8') as f:
        for line in f:
            # A blank line used to abort the whole run with JSONDecodeError.
            if not line.strip():
                continue
            data_item = json.loads(line)
            # re.match anchors at the start of the query string.
            audio_paths.append(re.match(pattern=r"<audio>(.*?)</audio>", string=data_item["query"]).group(1))
            hyps.append(data_item["response"])
            refs.append(data_item["label"])

    # Context managers guarantee both writers are closed even if a write fails.
    with codecs.open(str(result_dir / "test.hyp"), mode='w', encoding='utf8') as hyp_writer, \
            codecs.open(str(result_dir / "test.ref"), mode='w', encoding='utf8') as ref_writer:
        for audio_path, hyp, ref in zip(audio_paths, hyps, refs):
            hyp_writer.write(f'{audio_path} {hyp}' + '\n')
            ref_writer.write(f'{audio_path} {ref}' + '\n')

result_file = pathlib.Path("/data/ymrong/output/qwen2-audio-7b-instruct/v11-20241011-105159/checkpoint-1450/infer_result/20241011-154337.jsonl")
convert_jsonl_to_txt(result_file)

hyp_file = result_file.parents[0] / "test.hyp"
ref_file = result_file.parents[0] / "test.ref"
wer_file = result_file.parents[0] / "test.wer"
import subprocess

# subprocess.run with an argument list does not go through a shell, so a ">"
# element is handed to the script as a literal argument and no redirection
# happens. Capture stdout and write the report file explicitly instead.
result = subprocess.run(
    ["python", "/data/ymrong/Projects/wenet/tools/compute-wer.py", str(ref_file), str(hyp_file)],
    capture_output=True,
    text=True,
)
if result.returncode == 0:
    wer_file.write_text(result.stdout, encoding='utf8')
else:
    # Surface the failure instead of silently printing an empty report.
    print(result.stderr)
print(result.stdout)


In [28]:
import string
import json
import re

def contains_chinese(text):
    """Return True if ``text`` has at least one character in U+4E00..U+9FFF."""
    return re.search(r'[\u4e00-\u9fff]', text) is not None

def remove_punctuation(text):
    """Return ``text`` with all ASCII punctuation removed except the apostrophe."""
    # The apostrophe is kept so contractions such as "it's" stay intact.
    droppable = string.punctuation.replace("'", "")
    deletion_table = str.maketrans('', '', droppable)
    return text.translate(deletion_table)

def process_jsonl(input_file, output_file):
    """Strip punctuation from the ``text`` field of a JSONL file and save as JSON.

    Reads ``input_file`` one JSON object per line, replaces each object's
    ``text`` value with ``remove_punctuation(text)``, and writes all objects
    to ``output_file`` as a single indented JSON list. Blank lines are
    skipped instead of raising ``json.JSONDecodeError``.
    """
    dataset_to_write = []

    with open(input_file, 'r', encoding='utf-8') as f_in:
        # Iterate the file lazily instead of materialising every line first.
        for line in f_in:
            if not line.strip():
                continue
            data_item = json.loads(line)
            data_item['text'] = remove_punctuation(data_item['text'])
            dataset_to_write.append(data_item)

    with open(output_file, 'w', encoding='utf8') as f_out:
        json.dump(dataset_to_write, f_out, indent=2, ensure_ascii=False)

# Fixed lead-in that marks a usable response from the instruct model.
TRANSCRIPT_MARKER = "The transcript of the speech is:"

hyps = []
refs = []
wavs = []
# NOTE(review): responses containing Chinese characters or lacking the marker
# are dropped together with their reference, so the resulting WER covers only
# the retained subset — confirm this is intended.
with open("qwen2-audio-instruct-result/20241014-214929.jsonl", 'r', encoding='utf8') as f:
    for line in f:
        if not line.strip():
            continue
        data_item = json.loads(line)
        response = data_item["response"]
        if contains_chinese(response):
            continue
        if TRANSCRIPT_MARKER in response:
            # Take everything after the marker. The previous split(":")[1]
            # cut the transcript at its first colon and picked the wrong
            # segment whenever a colon preceded the marker.
            hyp = response.split(TRANSCRIPT_MARKER, 1)[1]
            hyp = remove_punctuation(hyp)
            hyps.append(hyp.strip("' "))
            refs.append(data_item["label"])
            wavs.append(re.match(r"<audio>(.*?)</audio>", data_item["query"]).group(1))

# One "<wav path>\t<text>" line per retained utterance.
with open("qwen2-audio-instruct-result/test.hyp", 'w', encoding='utf8') as f:
    for wav, h in zip(wavs, hyps):
        f.write(wav + '\t' + h + '\n')
with open("qwen2-audio-instruct-result/test.ref", 'w', encoding='utf8') as f:
    for wav, r in zip(wavs, refs):
        f.write(wav + '\t' + r + '\n')

In [10]:
from sklearn.metrics import precision_score, recall_score, f1_score
from sklearn.preprocessing import MultiLabelBinarizer
import json

labels = []
preds = []
with open("/data/ymrong/output/slidespeech_30k_filter_lora_en_instruction/qwen2-audio-7b-instruct/v6-20241021-112621/checkpoint-449-merged/infer_result/20241021-122310.jsonl", 'r', encoding='utf8') as f:
    for line in f:
        if not line.strip():
            continue
        record = json.loads(line)
        labels.append(record['label'])
        preds.append(record['response'])

# Keywords are comma-separated; strip surrounding whitespace so that
# "a, b" yields {"a", "b"} rather than {"a", " b"} (which made the same
# keyword count as different classes depending on its position).
labels = [[tok.strip() for tok in item.split(',') if tok.strip()] for item in labels]
preds = [[tok.strip() for tok in item.split(',') if tok.strip()] for item in preds]

# Fit on the union of gold and predicted keywords: a binarizer fitted on the
# labels alone ignores predicted classes it has never seen, which drops those
# false positives and inflates precision.
mlb = MultiLabelBinarizer()
mlb.fit(labels + preds)
y_true = mlb.transform(labels)
y_pred = mlb.transform(preds)

precision = precision_score(y_true, y_pred, average='micro')
recall = recall_score(y_true, y_pred, average='micro')
f1 = f1_score(y_true, y_pred, average='micro')

print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")
        

