### 测试case

#### 加载模型

In [1]:
import torch 
from modelscope import AutoModelForCausalLM,AutoTokenizer 
import os

# Load the checkpoint stored at model_path in fp16, placed on the GPU.
model_path = '/root/autodl-tmp/LLaMA-Factory-main/models/llama3_lora_mask'
model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.float16, device_map="cuda")
# A tokenizer holds no weights, so torch_dtype / device_map do not apply to it.
tokenizer = AutoTokenizer.from_pretrained(model_path)

Loading checkpoint shards:   0%|          | 0/9 [00:00<?, ?it/s]

In [2]:
from data_utils import convert_pitches_to_numbers,div_zone,pitch2zone,cal_zone_range
from metrics import cal_md

#### 自定义采样策略

In [3]:
# Custom sampling strategy: draw several distinct candidates per step and
# report the probability of each one.
def custom_sampling(logits, num_samples=3, temperature=0.8, top_p=0.9):
    """Sample ``num_samples`` distinct tokens per row with temperature + top-p.

    Args:
        logits: 2-D tensor of next-token logits, one row per sequence.
        num_samples: number of distinct candidates drawn for every row.
        temperature: divisor applied to the logits before filtering.
        top_p: cumulative-probability threshold of the nucleus filter.

    Returns:
        ``(sampled_tokens, sampled_probs)``, both shaped
        ``(rows, num_samples)``. ``sampled_probs`` holds each candidate's
        probability under the filtered, renormalised distribution.
    """
    # Work in float32: a cumulative sum over a large vocabulary loses too
    # much precision in half precision.
    logits = logits.float() / temperature

    sorted_logits, sorted_indices = torch.sort(logits, descending=True, dim=-1)
    cumulative_probs = torch.cumsum(torch.softmax(sorted_logits, dim=-1), dim=-1)

    sorted_indices_to_remove = cumulative_probs > top_p
    # Shift right so the token that crosses the threshold is still kept.
    sorted_indices_to_remove[:, 1:] = sorted_indices_to_remove[:, :-1].clone()
    # Sampling without replacement needs at least ``num_samples`` tokens with
    # non-zero probability, so always keep that many of the top tokens.
    min_keep = max(1, min(num_samples, logits.size(-1)))
    sorted_indices_to_remove[:, :min_keep] = False

    # Map the mask from sorted order back to vocabulary order row by row, so
    # one row's filter never leaks into another row.
    indices_to_remove = torch.zeros_like(sorted_indices_to_remove).scatter(
        1, sorted_indices, sorted_indices_to_remove
    )
    filtered_logits = logits.masked_fill(indices_to_remove, float("-inf"))

    # Sample the candidates from the filtered distribution.
    probabilities = torch.softmax(filtered_logits, dim=-1)
    sampled_tokens = torch.multinomial(probabilities, num_samples=num_samples, replacement=False)

    # Look up the probability of every sampled token.
    sampled_probs = probabilities.gather(1, sampled_tokens)

    return sampled_tokens, sampled_probs


In [4]:

# Custom generation loop
def custom_generate(input_ids,true_zones,zone_bound,lmt, max_new_tokens=256, eos_token_id=None):
    """Extend ``input_ids`` token by token, steering notes toward ``true_zones``.

    Every step samples three candidate tokens with ``custom_sampling`` and
    scans them in order:

    * an end-of-sequence candidate is taken immediately;
    * a candidate that decodes to no pitch is skipped;
    * a note candidate is taken at once when its pitch lies inside the range
      of the zone required for the current note, widened by ``lmt`` on both
      sides; otherwise it is remembered and the scan continues, so the last
      note candidate seen is used when none fits;
    * if no candidate is a note or end-of-sequence, token id 220 is emitted.

    Only batch row 0 of the sampled candidates is inspected.

    Args:
        input_ids: prompt token ids, shaped ``(1, prompt_len)``.
        true_zones: required zone id for each note to generate, in order.
        zone_bound: zone boundaries, passed through to ``cal_zone_range``.
        lmt: tolerance added on both sides of the required pitch range.
        max_new_tokens: upper bound on the number of generated tokens.
        eos_token_id: a single token id, a list/tuple/set of ids, or ``None``
            to never stop early.

    Returns:
        ``input_ids`` with the generated tokens appended along dim 1.
    """
    num_candidates = 3
    # Emitted when none of the candidates is usable; the original author
    # describes id 220 as the space token of this tokenizer.
    fallback_token_id = 220

    if eos_token_id is None:
        eos_ids = frozenset()
    elif isinstance(eos_token_id, (list, tuple, set, frozenset)):
        eos_ids = frozenset(int(t) for t in eos_token_id)
    else:
        eos_ids = frozenset((int(eos_token_id),))

    generated_ids = input_ids
    cnt_yinfu = 0  # number of notes emitted so far == index of the next required zone

    # Pure inference: do not build autograd graphs for the forward passes.
    with torch.no_grad():
        for _ in range(max_new_tokens):
            logits = model(generated_ids).logits[:, -1, :]
            next_tokens, _ = custom_sampling(logits, num_samples=num_candidates)

            chosen_token = None
            chose_note = False
            for i in range(num_candidates):
                candidate = next_tokens[0, i].unsqueeze(0)

                # End-of-sequence has the highest priority.
                if candidate.item() in eos_ids:
                    chosen_token = candidate
                    chose_note = False
                    break

                pitches = convert_pitches_to_numbers(tokenizer.decode(candidate))
                if len(pitches) == 0:
                    # Not a note: try the next candidate.
                    continue

                chosen_token = candidate
                chose_note = True
                if cnt_yinfu >= len(true_zones):
                    # Every required zone is already filled; nothing left to
                    # constrain, so accept the note as is instead of indexing
                    # past the end of true_zones.
                    break
                pitch_range = cal_zone_range(zone_bound, true_zones[cnt_yinfu])
                if pitch_range[0] - lmt <= pitches[0] <= pitch_range[1] + lmt:
                    break  # fits the required zone, stop scanning

            if chosen_token is None:
                chosen_token = torch.tensor(
                    [fallback_token_id],
                    dtype=generated_ids.dtype,
                    device=generated_ids.device,
                )
            elif chose_note:
                # A note was emitted (in range or not), so the next note must
                # be checked against the next required zone.
                cnt_yinfu += 1

            generated_ids = torch.cat([generated_ids, chosen_token.unsqueeze(0)], dim=1)

            # Stop as soon as an end-of-sequence token has been appended.
            if chosen_token.item() in eos_ids:
                break

    return generated_ids

In [5]:
def inference(msg,true_zones,zone_bound,lmt):
    """Run one zone-constrained generation for ``msg`` and return the decoded text.

    Args:
        msg: chat messages, rendered with the tokenizer's chat template.
        true_zones: required zone id for each note to generate.
        zone_bound: zone boundaries forwarded to ``custom_generate``.
        lmt: pitch tolerance forwarded to ``custom_generate``.

    Returns:
        The decoded prompt plus continuation with special tokens stripped;
        callers have to cut the assistant reply out of it themselves.
    """
    # Render the messages into the model's prompt format.
    prompt = tokenizer.apply_chat_template(msg, tokenize=False, add_generation_prompt=True)

    # NOTE(review): tokenizing an already-templated prompt may prepend a
    # second BOS token if the template emits one itself — confirm.
    input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(model.device)

    # Only tokenizer.eos_token_id acts as a stop token here.
    # NOTE(review): "<|eot_id|>" is not a stop token unless it is the
    # tokenizer's eos token — confirm.
    outputs = custom_generate(
        input_ids=input_ids,
        true_zones=true_zones,
        zone_bound=zone_bound,
        lmt=lmt,
        max_new_tokens=256,
        eos_token_id=tokenizer.eos_token_id,
    )

    return tokenizer.decode(outputs[0], skip_special_tokens=True)

#### 读取case数据 测试

In [98]:
import json
# Read the melody: a list of records, each carrying a 'pitch' list.
# The encoding is pinned so the read does not depend on the platform default.
with open("case4_italy.json", 'r', encoding='utf-8') as infile:
    song_pitch_list = json.load(infile)


In [99]:
song_pitch_list

[{'pitch': [62, 62, 66, 69, 74, 77, 74, 69]},
 {'pitch': [67, 71, 66, 69, 64, 66, 67, 65]},
 {'pitch': [62, 62, 66, 69, 74, 77, 74, 69]},
 {'pitch': [71, 74, 69, 62, 74, 74, 69, 69]},
 {'pitch': [69, 66, 62, 64, 66, 67, 69, 66]},
 {'pitch': [67, 64, 60, 62, 64, 66, 67, 65]},
 {'pitch': [65, 67, 65, 69, 66, 69]},
 {'pitch': [65, 64, 60, 74, 74, 69]},
 {'pitch': [69, 66, 69, 69, 66, 69]},
 {'pitch': [65, 64, 60, 69, 65, 67]},
 {'pitch': [65, 67, 65, 69, 66, 69]},
 {'pitch': [65, 64, 60, 74, 74, 69]}]

In [100]:
# Lyric lines to fit to the melody. Later cells convert every character to a
# zone id and pair line i with song_pitch_list[i] by index.
lyrics=[
    "随时有大事件发生",
    "活像是我们的护荫",
    "无比韵律清新口吻",
    "不必记愁归家最好",
    "即使离别也觉清楚",
    "今晚能聚众一起好",
    "如我还可用心",
    "独有愉快音讯",
    "跟随风吹笛声",
    "放下烦恼畅泳",
    "无数人得到奖",
    "沿着愉快轨进"
]

### 歌词转化为音区序列

In [91]:
import os
import json
import zhconv
# Parsed tone tables keyed by file path, so the JSON file is read once
# instead of once per character.
_TONE_TABLE_CACHE = {}


def _load_tone_table(fp: str) -> dict:
    """Return the character -> tone table stored at ``fp``, loading it on first use."""
    table = _TONE_TABLE_CACHE.get(fp)
    if table is None:
        with open(fp, 'r', encoding='utf-8') as jf:
            table = json.load(jf)
        _TONE_TABLE_CACHE[fp] = table
    return table


# Return the tone of a single character.
def tone(x : str) -> int:
    """Look up the tone number of character ``x`` in ``tone.json``.

    The four punctuation marks ``. , 。 ，`` return 0 without touching the
    table. A character missing from the table is converted with
    ``zhconv.convert(x, "zh-hant")`` and looked up again, because some
    entries exist only in traditional form. Returns 0 when both lookups
    fail or yield a falsy value.
    """
    if x in ('.', ',', '。', '，'):
        return 0

    tone_js = _load_tone_table("tone.json")

    value = tone_js.get(x)
    if value:
        return int(value)

    # Retry with the traditional-Chinese form of the character.
    value = tone_js.get(zhconv.convert(x, "zh-hant"))
    return int(value) if value else 0
        

# Return the zone id that a tone number belongs to.
def tone2zone(x : int) -> int:
    """Map tone ``x`` to its zone id; raises KeyError for a tone not in the table."""
    # Written as (zone, tones in that zone) groups and flattened to tone -> zone.
    zone_groups = (
        (0, (0, -1)),
        (4, (1, 2)),
        (3, (3, 5)),
        (1, (4,)),
        (2, (6,)),
    )
    zone_of_tone = {t: zone for zone, tones in zone_groups for t in tones}
    return zone_of_tone[x]

In [101]:
# One list of zone ids per lyric line: character -> tone -> zone.
lyrics_zone_list = [list(map(tone2zone, map(tone, line))) for line in lyrics]
lyrics_zone_list

[[1, 1, 2, 2, 2, 2, 3, 4],
 [2, 2, 2, 3, 1, 2, 2, 3],
 [1, 1, 2, 2, 4, 4, 4, 3],
 [4, 4, 3, 1, 4, 4, 3, 3],
 [4, 3, 1, 2, 3, 3, 4, 4],
 [4, 3, 1, 2, 3, 4, 4, 3],
 [1, 3, 1, 4, 2, 4],
 [2, 2, 1, 3, 4, 3],
 [4, 1, 4, 4, 2, 4],
 [3, 2, 1, 3, 3, 2],
 [1, 3, 1, 4, 3, 4],
 [2, 2, 1, 3, 4, 3]]

In [102]:
# Relabel zone ids 1..4 as 0, 2, 4, 3 (the labels used in the printed output).
# NOTE(review): zone id 0 has no entry, so a lyric whose character maps to
# zone 0 raises KeyError here — confirm that is intended.
map_0243 = {1: 0, 2: 2, 3: 4, 4: 3}
lyrics_0243_list = [[map_0243[zone] for zone in zones] for zones in lyrics_zone_list]
lyrics_0243_list

[[0, 0, 2, 2, 2, 2, 4, 3],
 [2, 2, 2, 4, 0, 2, 2, 4],
 [0, 0, 2, 2, 3, 3, 3, 4],
 [3, 3, 4, 0, 3, 3, 4, 4],
 [3, 4, 0, 2, 4, 4, 3, 3],
 [3, 4, 0, 2, 4, 3, 3, 4],
 [0, 4, 0, 3, 2, 3],
 [2, 2, 0, 4, 3, 4],
 [3, 0, 3, 3, 2, 3],
 [4, 2, 0, 4, 4, 2],
 [0, 4, 0, 3, 4, 3],
 [2, 2, 0, 4, 3, 4]]

#### 处理成指定输入输出格式

In [11]:
from data_utils import msk_pitch_str
import random

### 检测不和谐的位置，mask

In [86]:
def detect_msk_pos(song_pitch_list,lyrics_zone_list):
    """Return the note positions whose melody zone differs from the lyric zone.

    Args:
        song_pitch_list: pitches of one melody line.
        lyrics_zone_list: zone id required by each lyric character of that line.

    Returns:
        Indices, over the shorter of the two sequences, where the zone of
        the melody note is not the zone required by the lyric.

    Both zone sequences are printed after relabelling through ``map_0243``.
    """
    # Split this line's pitches into zones, then classify every note.
    zone_bound = div_zone(song_pitch_list)
    melody_zones = [pitch2zone(pitch, zone_bound) for pitch in song_pitch_list]

    print("song_ori_zones", [map_0243[z] for z in melody_zones])
    print("lyrics_zone_list", [map_0243[z] for z in lyrics_zone_list])

    return [
        pos
        for pos, (melody_zone, lyric_zone) in enumerate(zip(melody_zones, lyrics_zone_list))
        if melody_zone != lyric_zone
    ]


In [103]:
# For every melody line, show which note positions clash with the lyric zones.
for line_no, record in enumerate(song_pitch_list):
    print(detect_msk_pos(record['pitch'], lyrics_zone_list[line_no]))
    print()

song_ori_zones [0, 0, 2, 2, 3, 3, 3, 2]
lyrics_zone_list [0, 0, 2, 2, 2, 2, 4, 3]
[4, 5, 6, 7]

song_ori_zones [2, 3, 2, 4, 0, 2, 2, 0]
lyrics_zone_list [2, 2, 2, 4, 0, 2, 2, 4]
[1, 7]

song_ori_zones [0, 0, 2, 2, 3, 3, 3, 2]
lyrics_zone_list [0, 0, 2, 2, 3, 3, 3, 4]
[7]

song_ori_zones [3, 3, 4, 0, 3, 3, 4, 4]
lyrics_zone_list [3, 3, 4, 0, 3, 3, 4, 4]
[]

song_ori_zones [3, 4, 0, 2, 4, 4, 3, 4]
lyrics_zone_list [3, 4, 0, 2, 4, 4, 3, 3]
[7]

song_ori_zones [3, 4, 0, 2, 4, 3, 3, 4]
lyrics_zone_list [3, 4, 0, 2, 4, 3, 3, 4]
[]

song_ori_zones [0, 4, 0, 3, 2, 3]
lyrics_zone_list [0, 4, 0, 3, 2, 3]
[]

song_ori_zones [2, 2, 0, 3, 3, 4]
lyrics_zone_list [2, 2, 0, 4, 3, 4]
[3]

song_ori_zones [3, 0, 3, 3, 0, 3]
lyrics_zone_list [3, 0, 3, 3, 2, 3]
[4]

song_ori_zones [4, 2, 0, 3, 4, 3]
lyrics_zone_list [4, 2, 0, 4, 4, 2]
[3, 5]

song_ori_zones [0, 4, 0, 3, 2, 3]
lyrics_zone_list [0, 4, 0, 3, 4, 3]
[4]

song_ori_zones [2, 2, 0, 3, 3, 4]
lyrics_zone_list [2, 2, 0, 4, 3, 4]
[3]



In [59]:

def generate_mask_msg(record,lyrics_line_zone):
    """Build the chat messages asking the model to fill the masked notes.

    Args:
        record: dict with a ``"pitch"`` list for one melody line.
        lyrics_line_zone: zone id required by each lyric character of the line.

    Returns:
        ``(msg, mask_index)``: the three-message prompt (system, instruction,
        user) and the positions that were masked.
    """
    pitch = record["pitch"]

    # Positions where melody and lyric zones disagree are the ones to regenerate.
    mask_index = detect_msk_pos(pitch, lyrics_line_zone)

    msg = [
        {"role": "system", "content": "你是一个专业的作曲家"},
        {"role": "instruction",
         "content": "请你根据给定的旋律，填写旋律中[M]的内容使前后连贯，注意输出的output的长度要和[M]的个数严格匹配，下面有{}个[M]".format(len(mask_index))},
        # The user turn is the pitch sequence with the masked positions replaced.
        {"role": "user", "content": msk_pitch_str(pitch, mask_index)},
    ]

    return msg, mask_index

In [None]:
def generate_mask_data(record,lyrics_line_zone):
    """Bundle the prompt, mask positions and required zones for one melody line.

    Args:
        record: dict with a ``"pitch"`` list for one melody line.
        lyrics_line_zone: zone id required by each lyric character of the line.

    Returns:
        dict with keys ``'mask_idx'`` (masked positions), ``'require_zones'``
        (the lyric zone at each masked position) and ``'msg'`` (the prompt).
    """
    msg, mask_index = generate_mask_msg(record, lyrics_line_zone)
    return {
        'mask_idx': mask_index,
        # The zone each masked note has to land in comes from the lyric.
        'require_zones': [lyrics_line_zone[p] for p in mask_index],
        'msg': msg,
    }

In [104]:
# Build the masked prompt record for every melody line.
mask_data = [
    generate_mask_data(record, lyrics_zone_list[i])
    for i, record in enumerate(song_pitch_list)
]


song_ori_zones [0, 0, 2, 2, 3, 3, 3, 2]
lyrics_zone_list [0, 0, 2, 2, 2, 2, 4, 3]
song_ori_zones [2, 3, 2, 4, 0, 2, 2, 0]
lyrics_zone_list [2, 2, 2, 4, 0, 2, 2, 4]
song_ori_zones [0, 0, 2, 2, 3, 3, 3, 2]
lyrics_zone_list [0, 0, 2, 2, 3, 3, 3, 4]
song_ori_zones [3, 3, 4, 0, 3, 3, 4, 4]
lyrics_zone_list [3, 3, 4, 0, 3, 3, 4, 4]
song_ori_zones [3, 4, 0, 2, 4, 4, 3, 4]
lyrics_zone_list [3, 4, 0, 2, 4, 4, 3, 3]
song_ori_zones [3, 4, 0, 2, 4, 3, 3, 4]
lyrics_zone_list [3, 4, 0, 2, 4, 3, 3, 4]
song_ori_zones [0, 4, 0, 3, 2, 3]
lyrics_zone_list [0, 4, 0, 3, 2, 3]
song_ori_zones [2, 2, 0, 3, 3, 4]
lyrics_zone_list [2, 2, 0, 4, 3, 4]
song_ori_zones [3, 0, 3, 3, 0, 3]
lyrics_zone_list [3, 0, 3, 3, 2, 3]
song_ori_zones [4, 2, 0, 3, 4, 3]
lyrics_zone_list [4, 2, 0, 4, 4, 2]
song_ori_zones [0, 4, 0, 3, 2, 3]
lyrics_zone_list [0, 4, 0, 3, 4, 3]
song_ori_zones [2, 2, 0, 3, 3, 4]
lyrics_zone_list [2, 2, 0, 4, 3, 4]


In [105]:
mask_data

[{'mask_idx': [4, 5, 6, 7],
  'require_zones': [2, 2, 3, 4],
  'msg': [{'role': 'system', 'content': '你是一个专业的作曲家'},
   {'role': 'instruction',
    'content': '请你根据给定的旋律，填写旋律中[M]的内容使前后连贯，注意输出的output的长度要和[M]的个数严格匹配，下面有4个[M]'},
   {'role': 'user', 'content': ' <D4> <D4> <F#4> <A4> [M] [M] [M] [M]'}]},
 {'mask_idx': [1, 7],
  'require_zones': [2, 3],
  'msg': [{'role': 'system', 'content': '你是一个专业的作曲家'},
   {'role': 'instruction',
    'content': '请你根据给定的旋律，填写旋律中[M]的内容使前后连贯，注意输出的output的长度要和[M]的个数严格匹配，下面有2个[M]'},
   {'role': 'user', 'content': ' <G4> [M] <F#4> <A4> <E4> <F#4> <G4> [M]'}]},
 {'mask_idx': [7],
  'require_zones': [3],
  'msg': [{'role': 'system', 'content': '你是一个专业的作曲家'},
   {'role': 'instruction',
    'content': '请你根据给定的旋律，填写旋律中[M]的内容使前后连贯，注意输出的output的长度要和[M]的个数严格匹配，下面有1个[M]'},
   {'role': 'user', 'content': ' <D4> <D4> <F#4> <A4> <D5> <F5> <D5> [M]'}]},
 {'mask_idx': [],
  'require_zones': [],
  'msg': [{'role': 'system', 'content': '你是一个专业的作曲家'},
   {'role': 'instruction',
 

#### 模型推理

In [None]:
import random
import re
# Thresholds
max_md=1      # a polished line is accepted only when its md is below this
max_lmt=2     # upper bound of the pitch tolerance lmt
start_lmt=0   # tolerance used for the first attempts
max_cnt=5     # maximum number of generation attempts per line
lmt_step=2    # amount added to lmt after every second failed attempt


new_data=[]

for num,record in enumerate(song_pitch_list):
    msg=mask_data[num]['msg']
    mask_index=mask_data[num]['mask_idx']

    # Lines without any zone mismatch need no polishing.
    if len(mask_index)==0:
        continue

    new_record={'pitch': record['pitch'], 'mask_idx': mask_index}

    print("第{}个polish".format(num))

    lmt=start_lmt
    # Zone boundaries of this line, the zones of the notes being replaced,
    # and the zones the replacements are required to land in.
    zone_bound=div_zone(record['pitch'])
    ori_zones=[pitch2zone(record['pitch'][i],zone_bound) for i in mask_index]
    true_zones=mask_data[num]['require_zones']

    # Regenerate until one attempt is accepted or the budget is used up.
    accepted=False
    for cnt in range(1,max_cnt+1):
        attempt=dict()
        new_record[cnt]=attempt

        output=inference(msg,true_zones,zone_bound,lmt)
        # Keep only what follows the first "assistant" in the decoded text;
        # without a match the reply is empty and the attempt is discarded below.
        match=re.search(r'(?<=assistant)(.*)', output, re.DOTALL)
        reply=match.group(1).strip() if match else ""
        mask_ls=convert_pitches_to_numbers(reply)

        attempt['output']=mask_ls

        # Too few notes to fill every mask: give up on this attempt.
        if len(mask_ls)<len(mask_index):
            print("IndexError，ignore this data")
            continue

        # Write the generated notes back into a copy of the original line;
        # surplus notes beyond the number of masks are ignored.
        filled=mask_ls[:len(mask_index)]
        polish=list(record['pitch'])
        for idx,pitch in zip(mask_index,filled):
            polish[idx]=pitch
        attempt['polish']=polish

        md=cal_md([polish],[record['pitch']])
        attempt['md']=md

        if md<max_md:
            accepted=True
            # Zones of exactly the notes that were written back.
            polish_zones=[pitch2zone(p,zone_bound) for p in filled]
            print("mask位置",mask_index)
            print("原pitch序列",record['pitch'])
            print("现pitch序列",[polish])
            print("原错的音区序列",[map_0243[i] for i in ori_zones])
            print("要求的音区序列",[map_0243[i] for i in true_zones])
            print("修改后的音区序列",[map_0243[i] for i in polish_zones])
            print()
            break

        # Relax the tolerance by lmt_step after every second attempt, as long
        # as it stays within max_lmt.
        if cnt%2==0 and lmt+lmt_step<=max_lmt:
            lmt+=lmt_step

    # Record the real outcome; an acceptance on the final attempt must not be
    # reported as a failure.
    new_record['accept']=accepted

    new_data.append(new_record)
    #print("第{}个完成".format(num))



第0个polish
第1个polish
mask位置 [1, 7]
原pitch序列 [67, 71, 66, 69, 64, 66, 67, 65]
现pitch序列 [[67, 69, 66, 69, 64, 66, 67, 66]]
原错的音区序列 [3, 0]
要求的音区序列 [2, 4]
修改后的音区序列 [4, 2]

第2个polish
mask位置 [7]
原pitch序列 [62, 62, 66, 69, 74, 77, 74, 69]
现pitch序列 [[62, 62, 66, 69, 74, 77, 74, 73]]
原错的音区序列 [2]
要求的音区序列 [4]
修改后的音区序列 [4]

第4个polish
mask位置 [7]
原pitch序列 [69, 66, 62, 64, 66, 67, 69, 66]
现pitch序列 [[69, 66, 62, 64, 66, 67, 69, 65]]
原错的音区序列 [4]
要求的音区序列 [3]
修改后的音区序列 [2]

第7个polish
第8个polish
mask位置 [4]
原pitch序列 [69, 66, 69, 69, 66, 69]
现pitch序列 [[69, 66, 69, 69, 67, 69]]
原错的音区序列 [0]
要求的音区序列 [2]
修改后的音区序列 [2]

第9个polish
第10个polish
mask位置 [4]
原pitch序列 [65, 67, 65, 69, 66, 69]
现pitch序列 [[65, 67, 65, 69, 67, 69]]
原错的音区序列 [2]
要求的音区序列 [4]
修改后的音区序列 [4]

第11个polish
mask位置 [3]
原pitch序列 [65, 64, 60, 74, 74, 69]
现pitch序列 [[65, 64, 60, 72, 74, 69]]
原错的音区序列 [3]
要求的音区序列 [4]
修改后的音区序列 [3]

