In [1]:
from openai import OpenAI
import configparser
import gradio as gr
import os
import pandas as pd
from datetime import datetime
import json

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Model name sent with every chat request ("qwen3-235b-a22b" was the alternative noted by the author).
MODEL = "qwq-plus"
# The API key comes from the environment. If the variable is not configured,
# pass your Bailian API key to OpenAI directly instead: api_key="sk-xxx".
key = os.environ.get("DASHSCOPE_API_KEY")
# OpenAI-compatible client pointed at the DashScope compatible-mode endpoint.
CLIENT = OpenAI(
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    api_key=key,
)

In [None]:
class Character:
    """A chat participant whose replies are produced through the module-level ``CLIENT``.

    Attributes:
        prompt: System prompt; stored as the first history entry.
        name: Identifier of the character.
        initial: Optional opening line spoken by this character.
        history: List of ``{"role", "content", "current_seq"}`` dicts. The system
            prompt carries ``current_seq`` -1. A ``user`` entry is tagged with the
            sequence number in force *before* the reply it triggers; the
            ``assistant`` reply is tagged with the incremented number.
        current_seq: Sequence number of the latest assistant message (-1 if none).
        last_output: Text of the latest assistant message (``initial`` at start).
        used_tokens: Running total of tokens reported by the API.
    """

    def __init__(self,name:str,prompt:str,initial:str=""):
        self.prompt = prompt
        self.name = name
        self.initial = initial
        self.history = [{
            "role":"system",
            "content":self.prompt,
            "current_seq":-1
        }]
        self.current_seq = -1
        if len(initial)>0:
            # A non-empty opening line counts as this character's first assistant message.
            self.current_seq += 1
            self.history.append({"role":"assistant","content":initial,"current_seq":self.current_seq})
        self.last_output = initial
        self.used_tokens = 0

    def build_ipt_history(self):
        """Return the history as ``{"role", "content"}`` dicts, i.e. without ``current_seq``."""
        return [{"role":msg["role"],"content":msg["content"]} for msg in self.history]

    @staticmethod
    def _read_stream(completion):
        """Concatenate the text deltas of a streamed completion.

        Returns:
            ``(text, usage)`` where ``usage`` is the last non-None ``usage``
            attribute seen on a chunk, or None if no chunk carried one.
        """
        text = ''
        usage = None
        for chunk in completion:
            chunk_usage = getattr(chunk, "usage", None)
            if chunk_usage is not None:
                usage = chunk_usage
            choices = getattr(chunk, "choices", None)
            if not choices:
                # e.g. a chunk that only carries the usage record
                continue
            piece = getattr(getattr(choices[0], "delta", None), "content", None)
            if isinstance(piece, str):
                text += piece
        return text, usage

    def chat_to(self,s:str,enable_search:bool=False,stream:bool=False,temperature:float=0.3):
        """Send ``s`` as a user message and return the model's reply.

        Args:
            s: Text to send as the ``user`` message.
            enable_search: When True, ``enable_search`` is added to ``extra_body``.
            stream: When True the reply is requested as a stream (with usage
                included); otherwise ``enable_thinking=False`` is added to
                ``extra_body``.
            temperature: Sampling temperature forwarded to the API.

        Returns:
            The reply text, which is also stored in ``last_output`` and appended
            to ``history``.

        Raises:
            Whatever the client raises. In that case the pending user message is
            removed again and ``current_seq`` restored, so a retry does not leave
            two consecutive user messages in the history.
        """
        self.history.append({"role":"user","content":s,"current_seq":self.current_seq})
        self.current_seq += 1
        extra_body = {}
        request_kwargs = {}
        if enable_search:
            extra_body["enable_search"]=True
        if stream:
            request_kwargs["stream"]=True
            request_kwargs["stream_options"]={"include_usage": True}
        else:
            extra_body["enable_thinking"]=False
        request_kwargs["extra_body"]=extra_body
        try:
            completion = CLIENT.chat.completions.create(
                model=MODEL,
                messages=self.build_ipt_history(),
                temperature=temperature,
                **request_kwargs
            )
            if stream:
                res, usage = self._read_stream(completion)
            else:
                res = completion.choices[0].message.content
                usage = getattr(completion, "usage", None)
        except Exception:
            # Undo the bookkeeping done above before propagating the failure.
            self.history.pop()
            self.current_seq -= 1
            raise
        try:
            if stream:
                spent = usage.total_tokens
            else:
                spent = usage.prompt_tokens+usage.completion_tokens
            self.used_tokens += spent
        except (AttributeError, TypeError) as e:
            # Usage is missing or malformed: report it, but keep the running total
            # (the previous code reset the whole counter to 0 here).
            print(e)
        self.last_output = res
        self.history.append({"role":"assistant","content":res,"current_seq":self.current_seq})

        return res
    
class ChatBetweenAI:
    def __init__(self,*args):
        self.npcs = {}
        self.initial_npc = None #注意：initial_npc是先“发话”的那个人，他初始化时不应该有initial参数
        for i in args:
            if type(i)==Character:
                self.npcs[i.name] = i
                if len(i.initial)>0:
                    self.initial_npc = i.name
        self.history = []
        self.change_log = []
        self.TOKEN_FOR_SPLIT_DIFFERENT_NPC ="<split_for_different_npc>"
        self.TOKEN_FOR_SPLIT_NPC_AND_CONTENT = "<split_for_npc_and_content>"
        self.TOKEN_FOR_SPLIT_NPC_BEFORE_CHANGE = "<split_for_npc_and_begore_change>"
        self.TOKEN_FOR_SPLIT_CHANGE_BEFORE_AND_AFTER = "<split_for_change_before_and_after>"
        self.seq_history_length_dict = {}

    def get_used_token(self):
        return sum([i.used_tokens for i in self.npcs.values()])
    
    def argue(self,rnd=5,enable_search:bool=False,stream:bool=False,temperature:float=0.3):
        """
        两个AI交流
        暂时只支持2个NPC
        """
        if len(self.npcs.keys())==2: # first:第一个说话的人， second:第二个说话的人，第一个传入AI进行推理的人
            first = self.npcs[self.initial_npc]
            second_name = [i for i in list(self.npcs.keys()) if i != self.initial_npc][0]
            second = self.npcs[second_name]
            # 第一轮，first不用传入任何对话
            if len(self.history)==0:
                self.history.append({"role":self.initial_npc,"content":first.initial,"role_seq":first.current_seq})
                res = second.chat_to(first.initial)
                self.history.extend([
                    {"role":second.name,"content":res,"role_seq":second.current_seq}
                ])
                self.seq_history_length_dict[0]=len(self.history)
                rnd -= 1
            for i in range(rnd):
                self.oneRoundArgueBetweenTwoAI(first,second,enable_search,stream,temperature)
                
    def oneRoundArgueBetweenTwoAI(self,first:Character,second:Character,enable_search:bool=False,stream:bool=False,temperature:float=0.3):
        first_ipt = second.last_output
        first_output = first.chat_to(first_ipt,enable_search,stream,temperature)
        second_output = second.chat_to(first_output,enable_search,stream,temperature)
        self.history.extend([
            {"role":first.name,"content":first_output,"role_seq":first.current_seq},
            {"role":second.name,"content":second_output,"role_seq":second.current_seq}
        ])
        self.seq_history_length_dict[second.current_seq]=len(self.history)
    
    def change_history(self,idx:int,s:str):
        if self.history[idx]["role"] == "system":
            assert "不能修改历史prompt！"
        origin = self.history[idx]["content"]
        self.history[idx]["content"] = s
        origin_role = self.history[idx]["role"]
        self.change_log.append({"role":origin_role,"origin":origin,"updated":s,"role_seq":self.history[idx]["role_seq"]})
        # 修改Character对象中地历史
        role_seq = self.history[idx]["role_seq"]
        npc_idx = [i for i,j in enumerate(self.npcs[origin_role].history) if j["role"]=="assistant"][role_seq]
        self.npcs[origin_role].history[npc_idx]["content"] = s
        if len(self.npcs[origin_role].history) == npc_idx+1:
            self.npcs[origin_role].last_output = s
        # 非initial的npc需要改“user”
        if self.initial_npc == origin_role:
            for npc in self.npcs.keys():
                if npc!=self.initial_npc:
                    ipt_idx = [i for i,j in enumerate(self.npcs[npc].history) if j["role"]=="user"][role_seq]
                    self.npcs[npc].history[ipt_idx]["content"] = s
                    
                    

    def truncate(self,last_seq:int):
        """
        暂时只支持2个NPC
        最后一个部分为“initial_npc”的发言

        顺序：initial_npc(current_seq=0)-> second(current_seq=0)->(一轮开始)first(current_seq=1)->second(current_seq=1)(一轮结束)
        """
        if self.npcs[self.initial_npc].current_seq<=last_seq:
            return
        self.history = self.history[0:self.seq_history_length_dict[last_seq]]
        for _,i in self.npcs.items():
            i.history = [n for n in i.history if n["current_seq"]<=last_seq and not (n["current_seq"]==last_seq and n["role"]!="assistant")]
            i.last_output = [n for n in i.history if n["role"]=="assistant"][-1]["content"]
            i.current_seq = last_seq

    def add_system_prompt(self,prompt_dict:dict):
        for k,v in prompt_dict.items():
            if k not in self.npcs.keys():
                assert f"{k}不存在！"
        for k,v in prompt_dict.items():
            self.npcs[k].history.append({"role":"system","content":v,"current_seq":self.npcs[k].current_seq})

    def history_to_txt(self,history):
        # import和export还需加入current_seq
        content = self.TOKEN_FOR_SPLIT_DIFFERENT_NPC.join([self.TOKEN_FOR_SPLIT_NPC_AND_CONTENT.join([str(i) for i in i.values()]) for i in history]) # 我不能确定AI输出的内容是否会有特殊字符，故而使用特殊字符进行分割
        return content

    def change_log_2_txt(self):
        content = []
        for k,d in enumerate(self.change_log):
            role = d["role"]
            before = d["origin"]
            after = d["updated"]
            role_seq = d["role_seq"]
            content.append(role+self.TOKEN_FOR_SPLIT_NPC_BEFORE_CHANGE+before+self.TOKEN_FOR_SPLIT_CHANGE_BEFORE_AND_AFTER+after+self.TOKEN_FOR_SPLIT_CHANGE_BEFORE_AND_AFTER+str(role_seq)) # 我不能确定AI输出的内容是否会有特殊字符，故而使用特殊字符进行分割
        return self.TOKEN_FOR_SPLIT_DIFFERENT_NPC.join(content)
    
    def txt_2_change_log(self,s:str):
        if len(s)==0:
            return []
        content = s.split(self.TOKEN_FOR_SPLIT_DIFFERENT_NPC)
        res = []
        for i in content:
            tmp = i.split(self.TOKEN_FOR_SPLIT_NPC_BEFORE_CHANGE)
            role = tmp[0]
            chage_before_and_after = tmp[1].split(self.TOKEN_FOR_SPLIT_CHANGE_BEFORE_AND_AFTER)
            change_before = chage_before_and_after[0]
            change_after = chage_before_and_after[1]
            role_seq = chage_before_and_after[2]
            res.append({"role":role,"origin":change_before,"updated":change_after,"role_seq":role_seq})
        return res
    
    def export_history(self,name:str=""):
        export_file_name = "_".join([i for i in self.npcs.keys()])
        if len(name) > 0:
            export_file_name = name
        else:
            export_file_name += datetime.now().strftime("%Y%m%d%H%M%S")
        content = self.history_to_txt(self.history)
        with open(export_file_name+".txt","w",encoding="utf-8") as f:
            f.write(content)

    def export_npc(self,name:str=""):
        export_file_name = "_".join([i for i in self.npcs.keys()])
        if len(name) > 0:
            export_file_name = name
        else:
            export_file_name += datetime.now().strftime("%Y%m%d%H%M%S")
        res_data = pd.DataFrame(columns=["prompt","name","initial","history","last_output"])
        for _,v in self.npcs.items():
            tmp = pd.DataFrame(pd.Series({"prompt":v.prompt,"name":v.name,"initial":v.initial,"history":self.history_to_txt(v.history),"last_output":v.last_output})).T
            res_data = pd.concat([res_data,tmp]).reset_index(drop=True)
        res_data.to_csv(export_file_name+".csv",index=None,encoding="utf-8")

    def export_change_log(self,name:str=""):
        export_file_name = "_".join([i for i in self.npcs.keys()])
        if len(name) > 0:
            export_file_name = name
        else:
            export_file_name += datetime.now().strftime("%Y%m%d%H%M%S")
        content = self.change_log_2_txt()
        with open(export_file_name+".txt","w",encoding="utf-8") as f:
            f.write(content)

    def export(self,filename:str):
        self.export_npc(filename+"_NPC")
        self.export_history(filename+"_history")
        self.export_change_log(filename+"_change_log")
        
    def txt_to_history(self,s:str,tpe:str = "Chat"):
        if len(s)==0:
            return []
        res = s.split(self.TOKEN_FOR_SPLIT_DIFFERENT_NPC)
        res_list = []
        for i in res:
            tmp = i.split(self.TOKEN_FOR_SPLIT_NPC_AND_CONTENT)
            if tpe == "Chat":
                res_list.append({"role":tmp[0],"content":tmp[1],"role_seq":int(tmp[2])})
            else:
                res_list.append({"role":tmp[0],"content":tmp[1],"current_seq":int(tmp[2])})
        return res_list

    def import_history(self,filename:str):
        with open(filename,encoding="utf-8") as f:
            s = f.read()
        # history备份
        #self.export_history()
        self.history = []
        try:
            self.history=self.txt_to_history(s)
            for i,x in enumerate(self.history):
                self.seq_history_length_dict[x["role_seq"]]=i+1
        except Exception as e:
            assert f"格式错误:{e}"
        
    def import_npc(self,filename:str):
        # npc备份
        #self.export_npc()
        npc_df = pd.read_csv(filename,encoding="utf-8")
        npc_df = npc_df.fillna("")
        for i in npc_df.index:
            npc_para = dict(npc_df.iloc[i,:])
            self.npcs[npc_para["name"]] = Character(name=npc_para["name"],prompt=npc_para["prompt"],initial=npc_para["initial"])
            self.npcs[npc_para["name"]].last_output = npc_para["last_output"]
            self.npcs[npc_para["name"]].history = self.txt_to_history(npc_para["history"],tpe="NPC")
            self.npcs[npc_para["name"]].current_seq = max([i["current_seq"] for i in self.npcs[npc_para["name"]].history if i["role"]=="assistant"])
            if len(npc_para["initial"])>0:
                self.initial_npc = npc_para["name"]

    def import_change_log(self,filename:str):
        #self.export_change_log()
        with open(filename,encoding="utf-8") as f:
            data = f.read()
        self.change_log = self.txt_2_change_log(data)
    def import_(self,filename:str):
        NPC_file = filename+"_NPC.csv"
        history_file = filename+"_history.txt"
        changelog_file = filename+"_change_log.txt"
        if not (os.path.exists(NPC_file) and os.path.exists(history_file) and os.path.exists(changelog_file)):
            assert "缺失存档，请检查存档名称！"
        self.import_npc(NPC_file)
        self.import_change_log(changelog_file)
        self.import_history(history_file)

    def export_dialogue_to_csv(self,file_name:str):
        step = len(self.npcs.keys())
        res = pd.DataFrame(columns=list(self.npcs.keys()))
        idx = 0
        while idx<len(self.history):
            this_round = self.history[idx:idx+step]
            tmp = {k:None for k in self.npcs.keys()}
            for i in this_round:
                tmp[i["role"]]=i["content"]
            res = pd.concat([res,pd.DataFrame(pd.Series(tmp)).T],axis=0)
            idx+=step
        res.to_csv(file_name+".csv",index=False,encoding="utf-8")

In [23]:
# Two participants with empty system prompts. Only Skull Face has an opening
# line, so ChatBetweenAI records him as the character that speaks first.
The_BOSS = Character("The BOSS", "", "")
Skull_Face = Character("Skull Face", "", initial="Hello")
Chat = ChatBetweenAI(The_BOSS, Skull_Face)

In [24]:
# Load the save named "SAV": SAV_NPC.csv, SAV_history.txt and SAV_change_log.txt.
Chat.import_(filename="SAV")

In [25]:
# Write the transcript to Dialogue_result.csv, one column per character.
Chat.export_dialogue_to_csv(file_name="Dialogue_result")

In [1]:
import os
import dashscope
from dashscope.audio.tts_v2 import VoiceEnrollmentService,SpeechSynthesizer