<a href="https://colab.research.google.com/github/njucs/notebook/blob/master/LMM_Design_Patterns.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## LLM Design Patterns
- 模型响应太慢
    - 分层缓存 - Layered Caching
- 需要与已有系统整合
    - 工具利用 - Tool Use
    - 规则融合 - Blending Rules Based & Generative
- 问题过于复杂
    - 多步规划 - Planning
    - 多重解析 - Discriminate→Re-Compose→ReSolve→Re-Decompose (Dr3)
    - 记忆认知- Memory Cognition For LLM's
- 需要集合多个视角解决问题
    - 专家集成 - Multiplexing AI Agents For A Panel Of Experts
    - 群体智能 - Swarm Of Generative AI Agents
- 多功能性和适应性不足
    - 多任务微调 - Fine-Tuning LLM's For Multiple Tasks
    - 模块化组合 - Modular Monolith LLM Approach With Composability
- 输出质量和用户体验不可控
    - 知识库驱动 - Utilizing Knowledge Graphs with LLM's
    - 双模评估 - Red & Blue Team Dual-Model Evaluation

### 分层缓存 Layered Caching
![图片描述](https://drive.google.com/uc?id=1XntvJMky5BuZwp16Xo_-YlUJBkinHNH0)

In [3]:
class Cache:
    def __init__(self):
        self.cache = {}

    def check(self, query):
        # 检查缓存中是否存在该查询
        return query in self.cache

    def store(self, query, result):
        # 将查询结果存储到缓存中
        self.cache[query] = result

    def get(self, query):
        # 从缓存中获取查询结果
        return self.cache.get(query)

class LLM:
    def process(self, query):
        # 模拟 LLM 处理查询的过程
        if "初始" in query:
            return "这是初始查询的结果"
        elif "后续" in query:
            return "这是后续查询的结果"
        else:
            return "这是一个新查询的结果"

def process_query(query, cache, llm):
    # 初始查询直接使用强大的LLM处理
    if not cache.check(query):
        result = llm.process(query)
        cache.store(query, result)
        return result

    # 后续查询先检查缓存
    if cache.check(query):
        return cache.get(query)

    # 新查询使用专有的小LLM处理
    result = llm.process(query)
    cache.store(query, result)
    return result

# 主程序
if __name__ == "__main__":
    # 初始化缓存和LLM
    cache = Cache()
    llm = LLM()

    # 处理查询
    query = "初始查询"
    result = process_query(query, cache, llm)
    print("初始查询结果:", result)

    query = "后续查询"
    result = process_query(query, cache, llm)
    print("后续查询结果:", result)

    query = "新查询"
    result = process_query(query, cache, llm)
    print("新查询结果:", result)

初始查询结果: 这是初始查询的结果
后续查询结果: 这是后续查询的结果
新查询结果: 这是一个新查询的结果


In [1]:
import time
import datetime
import random

# 模拟一个强大的LLM，如GPT/Gemini
class PowerfulLLM:
    def __init__(self, name):
        self.name = name

    def generate_response(self, input_text):
         """模拟强大的LLM生成回复"""
         print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 强大的LLM '{self.name}' 处理输入：'{input_text}'")
         time.sleep(random.uniform(1, 2))  # 模拟生成时间
         return f"强大的LLM '{self.name}' 的响应： '{input_text}' 处理后的结果"

# 模拟一个专有的小LLM
class SpecializedSmallLLM:
    def __init__(self, name):
         self.name = name

    def fine_tune(self, input_text):
         """模拟小LLM的微调"""
         print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 专有的小LLM '{self.name}' 根据输入进行微调：'{input_text}'")
         time.sleep(random.uniform(0.5, 1.0))
         return f"专有的小LLM '{self.name}' 微调完成"

    def generate_response(self, input_text):
        """模拟小LLM生成回复"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 专有的小LLM '{self.name}' 处理输入：'{input_text}'")
        time.sleep(random.uniform(0.3, 0.8))
        return f"专有的小LLM '{self.name}' 的响应：'{input_text}' 处理后的结果"


# 模拟缓存
class Cache:
    def __init__(self):
        self.cache = {} # 使用字典存储

    def check_cache(self, input_text):
       """模拟检查缓存，如果缓存命中，返回结果，否则返回None"""
       print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 检查缓存：'{input_text}'")
       time.sleep(random.uniform(0.2, 0.5))
       if input_text in self.cache:
          print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 缓存命中：'{input_text}'")
          return self.cache[input_text]
       else:
          print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 缓存未命中：'{input_text}'")
          return None

    def store_cache(self, input_text, result):
        """模拟存储缓存"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 存储缓存：输入：'{input_text}', 结果: '{result}'")
        self.cache[input_text] = result

# 初始化组件
powerful_llm = PowerfulLLM("GPT/Gemini")
specialized_llm = SpecializedSmallLLM("专有的小LLM")
cache = Cache()

# 主流程
def process_query(query_type, query_text):
    """根据查询类型处理输入"""
    print(f"\n{datetime.datetime.now().strftime('%H:%M:%S')}: 接收到查询：类型: '{query_type}', 内容: '{query_text}'")

    if query_type == "initial":
        # 初始查询直接使用强大的LLM
        llm_response = powerful_llm.generate_response(query_text)
        cache.store_cache(query_text, llm_response) # 同时存储到缓存
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 系统输出：{llm_response}")
        return llm_response
    elif query_type == "follow-up":
        # 后续查询先检查缓存
        cached_response = cache.check_cache(query_text)
        if cached_response:
          print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 系统输出(缓存命中)：{cached_response}")
          return cached_response
        else:
          # 如果没有命中缓存，使用强大的LLM，并存入缓存
          llm_response = powerful_llm.generate_response(query_text)
          cache.store_cache(query_text, llm_response)
          print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 系统输出(缓存未命中)：{llm_response}")
          return llm_response

    elif query_type == "new":
        # 新查询使用专有的小LLM，先进行微调
        specialized_llm.fine_tune(query_text)
        llm_response = specialized_llm.generate_response(query_text)
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 系统输出(新查询，使用专有LLM)：{llm_response}")
        return llm_response

    else:
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 无效查询类型：'{query_type}'")
        return "无效查询类型"


# 示例输入
queries = [
    ("initial", "今天天气怎么样？"),
    ("follow-up", "今天天气怎么样？"),  # 缓存命中
    ("follow-up", "明天的天气呢？"),  # 缓存未命中
    ("new", "翻译：你好"),
    ("invalid", "未知查询类型")
]

for query_type, query_text in queries:
  process_query(query_type, query_text)
  time.sleep(1) # 加入时间间隔


02:52:29: 接收到查询：类型: 'initial', 内容: '今天天气怎么样？'
02:52:29: 强大的LLM 'GPT/Gemini' 处理输入：'今天天气怎么样？'
02:52:31: 存储缓存：输入：'今天天气怎么样？', 结果: '强大的LLM 'GPT/Gemini' 的响应： '今天天气怎么样？' 处理后的结果'
02:52:31: 系统输出：强大的LLM 'GPT/Gemini' 的响应： '今天天气怎么样？' 处理后的结果

02:52:32: 接收到查询：类型: 'follow-up', 内容: '今天天气怎么样？'
02:52:32: 检查缓存：'今天天气怎么样？'
02:52:32: 缓存命中：'今天天气怎么样？'
02:52:32: 系统输出(缓存命中)：强大的LLM 'GPT/Gemini' 的响应： '今天天气怎么样？' 处理后的结果

02:52:33: 接收到查询：类型: 'follow-up', 内容: '明天的天气呢？'
02:52:33: 检查缓存：'明天的天气呢？'
02:52:34: 缓存未命中：'明天的天气呢？'
02:52:34: 强大的LLM 'GPT/Gemini' 处理输入：'明天的天气呢？'
02:52:35: 存储缓存：输入：'明天的天气呢？', 结果: '强大的LLM 'GPT/Gemini' 的响应： '明天的天气呢？' 处理后的结果'
02:52:35: 系统输出(缓存未命中)：强大的LLM 'GPT/Gemini' 的响应： '明天的天气呢？' 处理后的结果

02:52:36: 接收到查询：类型: 'new', 内容: '翻译：你好'
02:52:36: 专有的小LLM '专有的小LLM' 根据输入进行微调：'翻译：你好'
02:52:37: 专有的小LLM '专有的小LLM' 处理输入：'翻译：你好'
02:52:38: 系统输出(新查询，使用专有LLM)：专有的小LLM '专有的小LLM' 的响应：'翻译：你好' 处理后的结果

02:52:39: 接收到查询：类型: 'invalid', 内容: '未知查询类型'
02:52:39: 无效查询类型：'invalid'


### 工具利用 Tool Use
![图片描述](https://drive.google.com/uc?id=1UFLljvksBOc_MhxkYr25vxABvPKU2WN4)

### 规则融合 Blending Rules Based & Generative
![图片描述](https://drive.google.com/uc?id=1yFkf7Xtq8x_IURgP6oBD61Istj5rg7JY)

In [4]:
class RuleEngine:
    def __init__(self):
        self.rules = {}  # 初始化规则库

    def add_rule(self, rule_id, rule):
        """向规则库中添加规则"""
        self.rules[rule_id] = rule

    def check_rule(self, input_data):
        """检查输入数据是否命中规则"""
        for rule_id, rule in self.rules.items():
            if rule(input_data):
                return True, rule_id
        return False, None

    def generate_new_rule(self, input_data):
        """生成新的规则（模拟LLM或微调模型）"""
        # 这里可以使用复杂的模型生成新规则
        new_rule = lambda x: x == input_data
        return new_rule

    def update_rules(self, new_rule):
        """更新规则库"""
        rule_id = len(self.rules) + 1
        self.add_rule(rule_id, new_rule)

    def validate_rule(self, rule_id, input_data):
        """验证规则的有效性"""
        if self.rules[rule_id](input_data):
            return True
        else:
            return False

    def log_and_adjust(self, rule_id, input_data):
        """记录日志并调整规则"""
        print(f"规则 {rule_id} 处理输入 {input_data}")
        # 这里可以记录日志并根据反馈调整规则

def main():
    # 初始化规则引擎
    engine = RuleEngine()

    # 添加预填充规则
    engine.add_rule(1, lambda x: x.startswith("初始"))
    engine.add_rule(2, lambda x: x.startswith("后续"))

    # 输入数据
    input_data = "初始查询"

    # 检查规则命中情况
    hit, rule_id = engine.check_rule(input_data)
    if hit:
        print(f"规则 {rule_id} 命中")
    else:
        print("规则未命中")

        # 生成新规则
        new_rule = engine.generate_new_rule(input_data)

        # 更新规则库
        engine.update_rules(new_rule)

        # 验证新规则
        valid = engine.validate_rule(len(engine.rules), input_data)
        if valid:
            print("新规则有效")
        else:
            print("新规则无效")

        # 记录日志并调整规则
        engine.log_and_adjust(len(engine.rules), input_data)

if __name__ == "__main__":
    main()

规则 1 命中


In [5]:
import time
import datetime
import random

# 模拟原始规则库
class OriginalRuleBase:
   def __init__(self, name):
      self.name = name
      self.rules = ["rule1: 默认规则 1", "rule2: 默认规则 2"] # 初始预填规则

   def get_all_rules(self):
      """获取所有规则"""
      print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 原始规则库 '{self.name}' 获取所有规则")
      return self.rules

   def update_rules(self, new_rule):
      """更新规则库"""
      print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 原始规则库 '{self.name}' 更新规则，新规则: '{new_rule}'")
      self.rules.append(new_rule)

# 模拟规则引擎
class RuleEngine:
    def __init__(self, name, original_rule_base):
        self.name = name
        self.rules = original_rule_base.get_all_rules() # 获取预填规则
        self.rule_counter = 0

    def apply_rules(self, input_text):
      """模拟规则引擎应用规则，如果匹配，返回True 和应用的规则，否则返回 False 和 None"""
      print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 规则引擎 '{self.name}' 应用规则：输入：'{input_text}'")
      time.sleep(random.uniform(0.3, 0.8)) # 模拟规则应用耗时
      for rule in self.rules:
        if rule in input_text:
            print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 规则命中，规则：'{rule}'")
            self.rule_counter += 1
            return True, rule
      print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 规则未命中")
      return False, None

    def adjust_rules(self, new_rule):
        """ 模拟调整规则"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 规则引擎 '{self.name}' 调整规则，新规则：'{new_rule}'")
        self.rules.append(new_rule)


# 模拟生成式模型
class GenerativeModel:
    def __init__(self, name):
       self.name = name

    def generate_rule(self, input_text):
      """模拟生成式模型，从输入生成规则"""
      print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 生成式模型 '{self.name}' 生成规则，基于输入：'{input_text}'")
      time.sleep(random.uniform(0.5, 1.5))  # 模拟生成规则耗时
      return f"基于 '{input_text}' 生成的规则"


# 模拟规则执行验证器
class RuleValidator:
   def __init__(self, name):
       self.name = name

   def validate(self, generated_rule):
       """模拟规则执行验证，返回规则是否有效"""
       print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 规则执行验证器 '{self.name}' 验证规则: '{generated_rule}'")
       time.sleep(random.uniform(0.3, 0.8))
       if "错误" not in generated_rule:  # 简单模拟，如果包含错误字眼，则无效
           print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 规则 '{generated_rule}' 有效")
           return True
       else:
           print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 规则 '{generated_rule}' 无效")
           return False


# 模拟日志和反馈机制
class LogAndFeedback:
    def __init__(self, name):
         self.name = name
         self.logs = []

    def log(self, input_text, validation_result, rule=None):
      """记录日志和反馈信息"""
      log_entry = {
            "timestamp": datetime.datetime.now(),
            "input_text": input_text,
            "validation_result": validation_result,
            "rule": rule
        }
      self.logs.append(log_entry)
      print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 日志和反馈机制 '{self.name}' 记录日志: {log_entry}")

    def adjust_based_on_feedback(self, rule_engine):
      """根据反馈信息调整规则(简单模拟)"""
      print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 日志和反馈机制 '{self.name}' 根据反馈调整规则")
      if self.logs and any (not entry["validation_result"] for entry in self.logs): # 简单模拟，如果有无效规则，调整规则引擎
          rule_to_adjust = self.logs[-1]["rule"]
          if rule_to_adjust:
             rule_engine.adjust_rules(f"调整后的规则: '{rule_to_adjust}'")
          print(f"{datetime.datetime.now().strftime('%H:%M:%S')}:  根据反馈调整了规则引擎")
      else:
          print(f"{datetime.datetime.now().strftime('%H:%M:%S')}:  没有发现需要调整规则")


# 初始化组件
original_rule_base = OriginalRuleBase("原始规则库")
rule_engine = RuleEngine("规则引擎", original_rule_base)
generative_model = GenerativeModel("生成式模型")
rule_validator = RuleValidator("规则执行验证器")
log_feedback = LogAndFeedback("日志与反馈机制")


# 模拟人工校对
def manual_review(generated_rule):
    """模拟人工校对，可以返回一个新的规则或者修正后的规则"""
    print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 人工校对：'{generated_rule}'")
    time.sleep(random.uniform(0.5,1))
    if "错误" in generated_rule:
        return f"人工校对后的规则: '{generated_rule.replace('错误','正确')}'"
    else:
        return generated_rule


# 主流程
def process_input(input_text):
    """处理输入的整体流程"""
    print(f"\n{datetime.datetime.now().strftime('%H:%M:%S')}: 接收到输入：'{input_text}'")

    # 1. 规则引擎应用规则
    rule_hit, applied_rule = rule_engine.apply_rules(input_text)


    if rule_hit:
        # 规则命中，直接输出
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 规则命中, 直接输出结果")
        log_feedback.log(input_text, True, applied_rule) # 记录命中规则的反馈
        return f"规则引擎处理结果: '{input_text}'"
    else:
         # 规则未命中，生成新的规则
        generated_rule = generative_model.generate_rule(input_text)
        # 进行人工校对
        reviewed_rule = manual_review(generated_rule)

        # 规则执行验证
        validation_result = rule_validator.validate(reviewed_rule)
        if validation_result:
          rule_engine.adjust_rules(reviewed_rule)  # 更新规则引擎
        log_feedback.log(input_text, validation_result, reviewed_rule if validation_result else None)  # 记录反馈信息
        log_feedback.adjust_based_on_feedback(rule_engine)   # 调整规则引擎


        if validation_result:
            #  输出
            print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 新规则验证通过，输出结果")
            return f"规则引擎处理结果（新规则）：'{input_text}'"
        else:
             print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 新规则验证未通过，输出结果")
             return f"规则引擎处理结果(新规则未通过)：'{input_text}'"

# 示例输入
inputs = [
    "这是一个rule1 示例",
    "这是一个rule2 示例",
    "这是一个新的输入，需要生成一个新规则",
    "这个输入会生成一个错误的规则", # 测试生成错误规则的情况
    "这个输入会触发规则调整", # 测试规则调整
    "这是一个测试输入", # 测试调整后规则的命中
]

for input_text in inputs:
    process_input(input_text)
    time.sleep(1)

03:28:31: 原始规则库 '原始规则库' 获取所有规则

03:28:31: 接收到输入：'这是一个rule1 示例'
03:28:31: 规则引擎 '规则引擎' 应用规则：输入：'这是一个rule1 示例'
03:28:32: 规则未命中
03:28:32: 生成式模型 '生成式模型' 生成规则，基于输入：'这是一个rule1 示例'
03:28:33: 人工校对：'基于 '这是一个rule1 示例' 生成的规则'
03:28:34: 规则执行验证器 '规则执行验证器' 验证规则: '基于 '这是一个rule1 示例' 生成的规则'
03:28:35: 规则 '基于 '这是一个rule1 示例' 生成的规则' 有效
03:28:35: 规则引擎 '规则引擎' 调整规则，新规则：'基于 '这是一个rule1 示例' 生成的规则'
03:28:35: 日志和反馈机制 '日志与反馈机制' 记录日志: {'timestamp': datetime.datetime(2025, 1, 7, 3, 28, 35, 154169), 'input_text': '这是一个rule1 示例', 'validation_result': True, 'rule': "基于 '这是一个rule1 示例' 生成的规则"}
03:28:35: 日志和反馈机制 '日志与反馈机制' 根据反馈调整规则
03:28:35:  没有发现需要调整规则
03:28:35: 新规则验证通过，输出结果

03:28:36: 接收到输入：'这是一个rule2 示例'
03:28:36: 规则引擎 '规则引擎' 应用规则：输入：'这是一个rule2 示例'
03:28:36: 规则未命中
03:28:36: 生成式模型 '生成式模型' 生成规则，基于输入：'这是一个rule2 示例'
03:28:37: 人工校对：'基于 '这是一个rule2 示例' 生成的规则'
03:28:38: 规则执行验证器 '规则执行验证器' 验证规则: '基于 '这是一个rule2 示例' 生成的规则'
03:28:39: 规则 '基于 '这是一个rule2 示例' 生成的规则' 有效
03:28:39: 规则引擎 '规则引擎' 调整规则，新规则：'基于 '这是一个rule2 示例' 生成的规则'
03:28:39: 日志和

### 多步规划 Planning
![图片描述](https://drive.google.com/uc?id=12y2AVBs81nfgPYwcYeJY-Sl8IHN7Hxrn)


In [2]:
# 确保你已经安装了必要的库，如 transformers 和 torch。如果你还没有安装它们，可以通过以下命令安装
!pip install transformers torch



In [3]:
# 导入所需的库
from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer

# 定义任务规划函数
def plan_task(input_text):
    # 简单的任务规划，这里假设所有输入都需要情感分析
    print("任务规划: 需要进行情感分析.")
    return "sentiment_analysis"

# 定义模型选择函数
def select_model(task_type):
    if task_type == "sentiment_analysis":
        model_name = "nlptown/bert-base-multilingual-uncased-sentiment"
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForSequenceClassification.from_pretrained(model_name)
        classifier = pipeline('sentiment-analysis', model=model, tokenizer=tokenizer)
        print(f"选择了模型: {model_name}")
        return classifier
    else:
        raise ValueError("不支持的任务类型")

# 定义任务执行函数
def execute_task(classifier, input_text):
    result = classifier(input_text)
    print(f"任务执行结果: {result}")
    return result

# 定义响应生成函数
def generate_response(result):
    sentiment = result[0]['label']
    score = result[0]['score']

    if 'positive' in sentiment.lower():
        response = f"这段文本看起来是一个积极的评价，置信度为 {score:.2f}."
    elif 'negative' in sentiment.lower():
        response = f"这段文本看起来是一个消极的评价，置信度为 {score:.2f}."
    else:
        response = "无法明确判断这段文本的情感倾向."

    print(f"最终响应: {response}")
    return response

# 主函数，整合以上步骤
def main(input_text):
    try:
        # 任务规划
        task_type = plan_task(input_text)

        # 模型选择
        classifier = select_model(task_type)

        # 任务执行
        result = execute_task(classifier, input_text)

        # 响应生成
        final_response = generate_response(result)

        return final_response

    except Exception as e:
        print(f"发生错误: {e}")

# 测试主函数
if __name__ == "__main__":
    user_input = "我非常喜欢这个产品！它真的改变了我的生活。"  # 示例输入
    main(user_input)

任务规划: 需要进行情感分析.


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/39.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/953 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/872k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/669M [00:00<?, ?B/s]

Device set to use cpu


选择了模型: nlptown/bert-base-multilingual-uncased-sentiment
任务执行结果: [{'label': '5 stars', 'score': 0.656829833984375}]
最终响应: 无法明确判断这段文本的情感倾向.


In [1]:
import time
import datetime
import random

# 模拟外部工具或模型，例如天气API，计算器，翻译器等
class ExternalTool:
    def __init__(self, name, description):
        self.name = name
        self.description = description

    def execute(self, input_data):
         """ 模拟执行工具，返回结果"""
         print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 执行工具 '{self.name}'，输入: '{input_data}'")
         time.sleep(random.uniform(0.5, 1.5))  # 模拟执行时间
         if self.name == "天气API":
             return f"当前天气是晴朗，温度25度"
         elif self.name == "计算器":
            try:
                return str(eval(input_data))
            except:
                return "计算错误"
         elif self.name == "翻译器":
             return f"翻译后的结果: '你好' is 'hello'"
         else:
            return f"工具 '{self.name}' 无法处理输入 '{input_data}'"


#  定义可用的工具
available_tools = [
    ExternalTool("天气API", "查询当前天气情况"),
    ExternalTool("计算器", "执行数学计算"),
    ExternalTool("翻译器", "进行文本翻译"),
]

# 任务规划器
def task_planner(user_request):
    """
    任务规划器，将用户请求分解为子任务。
    这里使用简单的规则来演示，在实际应用中，可能需要更复杂的逻辑。
    """
    print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 规划任务，用户请求：'{user_request}'")
    tasks = []
    if "天气" in user_request:
       tasks.append({"tool": "天气API", "input": None})
    if "计算" in user_request or "+" in user_request or "-" in user_request or "*" in user_request or "/" in user_request:
      tasks.append({"tool": "计算器", "input": user_request.split("计算")[-1].strip() if "计算" in user_request else user_request})
    if "翻译" in user_request:
      tasks.append({"tool": "翻译器", "input":"你好"}) # 这里翻译器模拟翻译一个固定的词
    if not tasks:
        return None # 如果没有匹配的任务，返回None

    return tasks


# 模型选择器，根据任务选择合适的工具
def model_selector(task):
    """
     根据任务选择合适的工具。
    """
    print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 选择模型，任务: {task}")
    for tool in available_tools:
        if tool.name == task['tool']:
            return tool
    return None


# 任务执行器
def task_executor(tool, input_data):
    """
    执行选择的工具，并返回结果。
    """
    print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 执行任务，使用工具 '{tool.name}'，输入：'{input_data}'")
    if tool:
      return tool.execute(input_data)
    else:
        return "没有合适的工具执行该任务"


# 响应生成器
def response_generator(results, user_request):
    """
    根据任务结果生成响应。
    """
    print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 生成响应，结果: {results}，用户请求：'{user_request}'")
    response = ""
    if len(results) == 0:
       return "无法处理该请求，请尝试其他请求。"
    if len(results) == 1:
        response = f"根据您的请求： '{user_request}'，结果是：{results[0]}"
    else:
        response = f"根据您的请求：'{user_request}'， 结果是："
        for i, result in enumerate(results):
            response += f"任务 {i+1}: {result}; "

    print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 生成的响应：{response}")
    return response

# 主流程
def process_user_request(user_request):
    """
    处理用户请求的主流程。
    """
    tasks = task_planner(user_request)

    if not tasks:
      print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 无法识别用户请求，请提供更详细的指令")
      return "无法识别用户请求，请提供更详细的指令"

    results = []
    for task in tasks:
      tool = model_selector(task)
      result = task_executor(tool, task.get("input",None))
      results.append(result)
    response = response_generator(results, user_request)
    return response


# 示例用户请求
user_requests = [
    "今天天气怎么样？",
    "计算 123 + 456",
    "翻译 你好",
    "查询天气并计算 2 * 3",
    "我需要一个蛋糕"
]

for request in user_requests:
    print(f"\n{datetime.datetime.now().strftime('%H:%M:%S')}: 用户请求：{request}")
    response = process_user_request(request)
    print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 系统响应：{response}")


05:38:34: 用户请求：今天天气怎么样？
05:38:34: 规划任务，用户请求：'今天天气怎么样？'
05:38:34: 选择模型，任务: {'tool': '天气API', 'input': None}
05:38:34: 执行任务，使用工具 '天气API'，输入：'None'
05:38:34: 执行工具 '天气API'，输入: 'None'
05:38:35: 生成响应，结果: ['当前天气是晴朗，温度25度']，用户请求：'今天天气怎么样？'
05:38:35: 生成的响应：根据您的请求： '今天天气怎么样？'，结果是：当前天气是晴朗，温度25度
05:38:35: 系统响应：根据您的请求： '今天天气怎么样？'，结果是：当前天气是晴朗，温度25度

05:38:35: 用户请求：计算 123 + 456
05:38:35: 规划任务，用户请求：'计算 123 + 456'
05:38:35: 选择模型，任务: {'tool': '计算器', 'input': '123 + 456'}
05:38:35: 执行任务，使用工具 '计算器'，输入：'123 + 456'
05:38:35: 执行工具 '计算器'，输入: '123 + 456'
05:38:36: 生成响应，结果: ['579']，用户请求：'计算 123 + 456'
05:38:36: 生成的响应：根据您的请求： '计算 123 + 456'，结果是：579
05:38:36: 系统响应：根据您的请求： '计算 123 + 456'，结果是：579

05:38:36: 用户请求：翻译 你好
05:38:36: 规划任务，用户请求：'翻译 你好'
05:38:36: 选择模型，任务: {'tool': '翻译器', 'input': '你好'}
05:38:36: 执行任务，使用工具 '翻译器'，输入：'你好'
05:38:36: 执行工具 '翻译器'，输入: '你好'
05:38:37: 生成响应，结果: ["翻译后的结果: '你好' is 'hello'"]，用户请求：'翻译 你好'
05:38:37: 生成的响应：根据您的请求： '翻译 你好'，结果是：翻译后的结果: '你好' is 'hello'
05:38:37: 系统响应：根据您的请求： '翻译 你好'，结果是：翻译后的

### 多重解析 - Discriminate→Re-Compose→ReSolve→Re-Decompose (Dr3)
![图片描述](https://drive.google.com/uc?id=17N5FeE8SIUSpjO9n7cM-CfjjeLtmJ-t4)

In [None]:
# https://github.com/Gy915/Dr3.git

### 记忆认知 Memory Cognition For LLM’s
![图片描述](https://drive.google.com/uc?id=1aQRLlQER_MckkuKbwpwZdzQ-7Pum_VfD)


In [4]:
# 导入必要的库
import pandas as pd
from transformers import pipeline, AutoModelForSeq2SeqLM, AutoTokenizer

# 定义短期记忆库和长期记忆库
short_term_memory = []
long_term_memory = []

# 定义大语言模型函数
def llm(input_text):
    model_name = "t5-small"  # 使用T5模型作为示例
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
    generator = pipeline('text2text-generation', model=model, tokenizer=tokenizer)
    output = generator(input_text, max_length=50, num_return_sequences=1)
    return output[0]['generated_text']

# 定义总结器函数
def summarizer(output_text):
    summary_model_name = "facebook/bart-large-cnn"
    summary_tokenizer = AutoTokenizer.from_pretrained(summary_model_name)
    summary_model = AutoModelForSeq2SeqLM.from_pretrained(summary_model_name)
    summary_generator = pipeline('summarization', model=summary_model, tokenizer=summary_tokenizer)
    summary = summary_generator(output_text, max_length=50, num_return_sequences=1)
    return summary[0]['summary_text']

# 定义短期记忆库更新函数
def update_short_term_memory(summary):
    short_term_memory.append(summary)

# 定义长期记忆库更新函数
def update_long_term_memory(summary):
    long_term_memory.append(summary)

# 定义记忆评估与衰退函数
def memory_assessment_and_decay():
    # 这里可以添加具体的记忆评估逻辑
    if len(short_term_memory) > 5:  # 假设短期记忆库最多存储5条记录
        short_term_memory.pop(0)

# 主函数
def main():
    # 获取用户输入
    user_input = input("请输入文本: ")

    # 大语言模型处理
    output_text = llm(user_input)
    print("大语言模型输出:", output_text)

    # 总结器处理
    summary = summarizer(output_text)
    print("总结器输出:", summary)

    # 更新短期记忆库
    update_short_term_memory(summary)

    # 更新长期记忆库
    update_long_term_memory(summary)

    # 记忆评估与衰退
    memory_assessment_and_decay()

    # 打印短期记忆库和长期记忆库
    print("短期记忆库:", short_term_memory)
    print("长期记忆库:", long_term_memory)

if __name__ == "__main__":
    main()

请输入文本: test


tokenizer_config.json:   0%|          | 0.00/2.32k [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.39M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.21k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/242M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

Device set to use cpu


大语言模型输出: test


config.json:   0%|          | 0.00/1.58k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.63G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/363 [00:00<?, ?B/s]

Device set to use cpu
Your min_length=56 must be inferior than your max_length=50.
Your max_length is set to 50, but your input_length is only 3. Since this is a summarization task, where outputs shorter than the input are typically wanted, you might consider decreasing max_length manually, e.g. summarizer('...', max_length=1)


总结器输出: The U.S. military is testing a new system that could be used in the future. The test would be used to test the capabilities of soldiers in the field. It would also be used as a test of the military's ability
短期记忆库: ["The U.S. military is testing a new system that could be used in the future. The test would be used to test the capabilities of soldiers in the field. It would also be used as a test of the military's ability"]
长期记忆库: ["The U.S. military is testing a new system that could be used in the future. The test would be used to test the capabilities of soldiers in the field. It would also be used as a test of the military's ability"]


In [5]:
import time
import datetime
import random

# 模拟大语言模型
class LargeLanguageModel:
    def __init__(self, name):
        self.name = name

    def generate_response(self, input_text, short_term_memory=None):
         """模拟大语言模型生成回复，包含对短期记忆的使用"""
         print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 大语言模型 '{self.name}' 处理输入：'{input_text}'")
         time.sleep(random.uniform(0.5, 1.5))
         if short_term_memory:
            print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 大语言模型使用短期记忆: '{short_term_memory}'")
            return f"根据输入 '{input_text}'和短期记忆 '{short_term_memory}'， 大语言模型回复：处理后的结果"

         return f"根据输入 '{input_text}'，大语言模型回复：处理后的结果"

    def generate_query_for_memory(self, input_text):
        """模拟生成用于查询短期记忆的查询语句"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 大语言模型生成查询语句用于查询短期记忆：'{input_text}'")
        return f"查询：与'{input_text}'相关的信息"

    def generate_information_for_memory(self,input_text):
        """模拟大语言模型生成可以存储到短期记忆中的信息"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 大语言模型生成可以存储到短期记忆的信息：'{input_text}'")
        return f"与输入 '{input_text}' 相关的信息"

# 短期记忆库
class ShortTermMemory:
    def __init__(self):
        self.memory = []

    def store(self, information):
        """存储信息"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 短期记忆库存储信息：'{information}'")
        self.memory.append(information)

    def query(self, query_text):
        """查询信息"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 短期记忆库查询信息：'{query_text}'")
        if self.memory:
            return random.choice(self.memory)  # 返回一个随机匹配的记忆片段
        return None

    def update(self, information):
       """更新信息"""
       print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 短期记忆库更新信息，新的信息：'{information}'")
       if self.memory:
          self.memory[-1] = information
       else:
         self.memory.append(information)

    def clear(self):
        """清理记忆"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 短期记忆库清理记忆")
        self.memory = []

# 长期记忆库（使用字典模拟）
class LongTermMemory:
    def __init__(self):
        self.memory = {}  # 使用字典，键为索引，值为信息

    def store(self, information):
      """存储信息，简单使用时间戳作为索引"""
      index = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")
      self.memory[index] = information
      print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 长期记忆库存储信息，索引为 '{index}'， 内容为：'{information}'")

    def retrieve(self, query):
        """根据查询语句，检索长期记忆"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 长期记忆库检索信息：'{query}'")
        if self.memory:
          for key, value in self.memory.items():
            if query in value: # 使用简单匹配，实际可以使用更复杂的索引和检索方式
                return value
          return None  # 如果没有匹配的结果
        else:
            return None


    def index_and_query(self, query):
       """ 模拟索引和查询长期记忆，返回检索结果 """
       print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 长期记忆库索引并检索信息：'{query}'")
       result = self.retrieve(query)
       return result


# 总结器
class Summarizer:
    def __init__(self, name):
        self.name = name

    def summarize(self, output_text):
        """总结输出，并准备存储到长期记忆库中的信息"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 总结器 '{self.name}' 总结输出：'{output_text}'")
        time.sleep(random.uniform(0.5, 1.0))
        return f"总结后的信息：'{output_text}'"

    def retrieve_known_information(self,output_text):
        """ 模拟获取已知信息 """
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 总结器 '{self.name}' 尝试获取已知信息")
        time.sleep(random.uniform(0.5, 1.0))
        if "历史" in output_text: # 模拟已知一些历史信息
            return f"已知的历史信息： '这是一段历史'"
        else:
            return None

# 记忆评估器
class MemoryEvaluator:
    def __init__(self, name):
         self.name = name

    def evaluate(self, short_term_memory_info):
       """评估短期记忆的价值，并决定是否保留"""
       print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 记忆评估器 '{self.name}' 评估短期记忆信息：'{short_term_memory_info}'")
       time.sleep(random.uniform(0.5, 1.0))
       # 模拟评估逻辑，随机决定是否保留
       if random.random() > 0.3:  # 70%的概率保留
            print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 记忆评估器 '{self.name}'决定保留短期记忆信息。")
            return True
       else:
            print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 记忆评估器 '{self.name}'决定遗忘短期记忆信息。")
            return False



#  初始化组件
llm = LargeLanguageModel("大语言模型")
short_term_memory = ShortTermMemory()
long_term_memory = LongTermMemory()
summarizer = Summarizer("总结器")
memory_evaluator = MemoryEvaluator("记忆评估器")

# 主流程
def process_input(input_text):
    """处理输入的整体流程"""
    print(f"\n{datetime.datetime.now().strftime('%H:%M:%S')}: 接收到输入: '{input_text}'")

    # 1. 使用大语言模型生成回复，尝试利用短期记忆
    memory_query = llm.generate_query_for_memory(input_text)
    short_term_memory_info = short_term_memory.query(memory_query)

    llm_output = llm.generate_response(input_text, short_term_memory_info)

    # 2. 总结输出，并准备存入长期记忆的信息，以及获取已知的长期记忆信息
    summary_text = summarizer.summarize(llm_output)
    known_information = summarizer.retrieve_known_information(llm_output)

    if known_information: # 如果有已知的长期记忆信息，输出
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 从长期记忆库获取已知信息: '{known_information}'")
    # 3.  存储新的总结信息到长期记忆库
    long_term_memory.store(summary_text)

    # 4.  记忆评估和更新
    short_term_memory_information = llm.generate_information_for_memory(input_text)
    retain_memory = memory_evaluator.evaluate(short_term_memory_information)

    if retain_memory:
       short_term_memory.update(short_term_memory_information)

    # 5. 输出
    print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 系统输出：{llm_output}")
    return llm_output

# 示例输入
inputs = [
    "今天天气怎么样？",
    "我记得上次你说过今天会下雨",
    "总结一下今天发生的事情",
    "我想了解更多关于历史的信息",
    "更新今天的计划",
    "关于今天，就先这样吧"
]

for input_text in inputs:
    process_input(input_text)
    # 模拟长期记忆库的查询，在每次处理输入之后
    long_term_query = llm.generate_query_for_memory(input_text)
    long_term_result = long_term_memory.index_and_query(long_term_query)
    if long_term_result:
      print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 长期记忆库的查询结果：'{long_term_result}'\n")
    else:
      print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 长期记忆库没有查询到相关结果\n")
    time.sleep(1) # 加入时间间隔


06:02:30: 接收到输入: '今天天气怎么样？'
06:02:30: 大语言模型生成查询语句用于查询短期记忆：'今天天气怎么样？'
06:02:30: 短期记忆库查询信息：'查询：与'今天天气怎么样？'相关的信息'
06:02:30: 大语言模型 '大语言模型' 处理输入：'今天天气怎么样？'
06:02:31: 总结器 '总结器' 总结输出：'根据输入 '今天天气怎么样？'，大语言模型回复：处理后的结果'
06:02:32: 总结器 '总结器' 尝试获取已知信息
06:02:33: 长期记忆库存储信息，索引为 '20250106060233011126'， 内容为：'总结后的信息：'根据输入 '今天天气怎么样？'，大语言模型回复：处理后的结果''
06:02:33: 大语言模型生成可以存储到短期记忆的信息：'今天天气怎么样？'
06:02:33: 记忆评估器 '记忆评估器' 评估短期记忆信息：'与输入 '今天天气怎么样？' 相关的信息'
06:02:33: 记忆评估器 '记忆评估器'决定保留短期记忆信息。
06:02:33: 短期记忆库更新信息，新的信息：'与输入 '今天天气怎么样？' 相关的信息'
06:02:33: 系统输出：根据输入 '今天天气怎么样？'，大语言模型回复：处理后的结果
06:02:33: 大语言模型生成查询语句用于查询短期记忆：'今天天气怎么样？'
06:02:33: 长期记忆库索引并检索信息：'查询：与'今天天气怎么样？'相关的信息'
06:02:33: 长期记忆库检索信息：'查询：与'今天天气怎么样？'相关的信息'
06:02:33: 长期记忆库没有查询到相关结果


06:02:34: 接收到输入: '我记得上次你说过今天会下雨'
06:02:34: 大语言模型生成查询语句用于查询短期记忆：'我记得上次你说过今天会下雨'
06:02:34: 短期记忆库查询信息：'查询：与'我记得上次你说过今天会下雨'相关的信息'
06:02:34: 大语言模型 '大语言模型' 处理输入：'我记得上次你说过今天会下雨'
06:02:35: 大语言模型使用短期记忆: '与输入 '今天天气怎么样？' 相关的信息'
06:02:35: 总结器 '总结器' 总结输出：'根据输入 '我记得上次你说过今天会下雨'和短期记忆 '与输入 '今天天气怎么样？' 相

### 专家集成 Multiplexing AI Agents For A Panel Of Experts

![图片描述](https://drive.google.com/uc?id=1UGAWW6her0p4DCdj9ScGZnzIHDUDi5g2)

In [6]:
# 导入必要的库
from transformers import pipeline, AutoModelForSeq2SeqLM, AutoTokenizer
import pandas as pd

# 定义通用大语言模型函数
def general_llm(input_text):
    model_name = "t5-small"  # 使用T5模型作为示例
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
    generator = pipeline('text2text-generation', model=model, tokenizer=tokenizer)
    output = generator(input_text, max_length=50, num_return_sequences=1)
    return output[0]['generated_text']

# 定义领域分类器函数
def domain_classifier(input_text):
    # 假设我们有预定义的领域分类器
    domains = ["领域1", "领域2", "领域n"]
    classifier = pipeline("zero-shot-classification")
    result = classifier(input_text, candidate_labels=domains)
    return result['labels'][0]

# 定义小规模领域模型函数
def specific_domain_model(domain, input_text):
    if domain == "领域1":
        model_name = "specific-domain-model-1"
    elif domain == "领域2":
        model_name = "specific-domain-model-2"
    else:
        model_name = "specific-domain-model-n"

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
    generator = pipeline('text2text-generation', model=model, tokenizer=tokenizer)
    output = generator(input_text, max_length=50, num_return_sequences=1)
    return output[0]['generated_text']

# 主函数
def main():
    # 获取用户输入
    user_input = input("请输入文本: ")

    # 通用大语言模型处理
    general_output = general_llm(user_input)
    print("通用大语言模型输出:", general_output)

    # 上下文理解（领域分类）
    domain = domain_classifier(user_input)
    print("确定的领域:", domain)

    # 小规模领域模型处理
    specific_output = specific_domain_model(domain, user_input)
    print("小规模领域模型输出:", specific_output)

    # 综合输出
    final_output = f"综合输出: {general_output} | {specific_output}"
    print("综合输出:", final_output)

if __name__ == "__main__":
    main()

请输入文本: 查询一下南京周围主要城市的天气情况


Device set to use cpu
No model was supplied, defaulted to facebook/bart-large-mnli and revision d7645e1 (https://huggingface.co/facebook/bart-large-mnli).
Using a pipeline without specifying a model name and revision in production is not recommended.


通用大语言模型输出: 


config.json:   0%|          | 0.00/1.15k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.63G [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

Device set to use cpu


确定的领域: 领域1


OSError: specific-domain-model-1 is not a local folder and is not a valid model identifier listed on 'https://huggingface.co/models'
If this is a private repository, make sure to pass a token having permission to this repo either by logging in with `huggingface-cli login` or by passing `token=<your_token>`

In [7]:
import time
import datetime
import random


# 模拟通用大语言模型
class GeneralLanguageModel:
    def __init__(self, name):
        self.name = name

    def understand_context(self, input_text):
        """模拟通用大语言模型理解上下文，并确定输入属于哪个领域"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 通用大语言模型 '{self.name}' 理解上下文：'{input_text}'")
        time.sleep(random.uniform(0.5, 1))
        # 这里使用简单的规则来演示，实际应用中，可以使用更复杂的模型进行领域分类
        if "天气" in input_text:
            return "天气"
        elif "计算" in input_text or "+" in input_text or "-" in input_text or "*" in input_text or "/" in input_text:
            return "计算"
        elif "翻译" in input_text:
             return "翻译"
        else:
            return "通用"  # 如果不属于任何特定领域，返回通用领域

# 模拟特定领域的小模型
class DomainSpecificModel:
    def __init__(self, name, domain):
        self.name = name
        self.domain = domain

    def process_input(self, input_text):
        """模拟特定领域模型处理输入，返回特定领域的输出"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 领域模型 '{self.name}' 处理输入：'{input_text}'")
        time.sleep(random.uniform(0.3, 0.8))
        if self.domain == "天气":
            return f"天气领域模型输出： 今天天气晴朗，温度28度。"
        elif self.domain == "计算":
           try:
             return f"计算领域模型输出： 计算结果是：{eval(input_text)}"
           except:
              return "计算错误"
        elif self.domain == "翻译":
          return f"翻译领域模型输出：翻译结果是：'你好' is 'hello'"

        else:
            return f"通用领域模型输出: 无法处理输入：'{input_text}'"



#  定义可用的领域模型
available_domain_models = {
    "天气": DomainSpecificModel("天气模型", "天气"),
    "计算": DomainSpecificModel("计算模型", "计算"),
    "翻译": DomainSpecificModel("翻译模型", "翻译"),
    "通用": DomainSpecificModel("通用模型", "通用")
}

# 模拟综合输出
class OutputSynthesizer:
  def __init__(self, name):
    self.name = name

  def synthesize(self, outputs):
    """综合不同模型的输出"""
    print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 综合输出器 '{self.name}' 综合输出：'{outputs}'")
    time.sleep(random.uniform(0.3, 0.7))
    if not outputs:
       return "没有任何输出"
    if len(outputs) == 1:
       return f"综合输出: {outputs[0]}"
    else:
      return f"综合输出:  {', '.join([f'{idx+1}: {output}' for idx, output in enumerate(outputs)])}"

# 初始化组件
general_model = GeneralLanguageModel("通用大语言模型")
output_synthesizer = OutputSynthesizer("综合输出器")

# 主流程
def process_input(input_text):
    """处理输入的整体流程"""
    print(f"\n{datetime.datetime.now().strftime('%H:%M:%S')}: 接收到输入：'{input_text}'")

    # 1. 使用通用大模型理解上下文
    domain = general_model.understand_context(input_text)

    # 2.  使用特定领域的模型处理输入
    if domain in available_domain_models:
        domain_model = available_domain_models[domain]
        output = domain_model.process_input(input_text if domain == "计算" else input_text)
    else:
        output = available_domain_models["通用"].process_input(input_text)


    # 3.  综合输出
    final_output = output_synthesizer.synthesize([output])

    # 4. 输出最终结果
    print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 系统输出：{final_output}\n")
    return final_output


# 示例输入
inputs = [
    "今天天气怎么样？",
    "计算 123 + 456",
    "翻译 你好",
    "我需要一个蛋糕",
    "今天天气如何？计算 5 * 3",
    "翻译 '再见'",
    "请问现在几点了？"
]

for input_text in inputs:
    process_input(input_text)
    time.sleep(1) # 加入时间间隔


06:06:39: 接收到输入：'今天天气怎么样？'
06:06:39: 通用大语言模型 '通用大语言模型' 理解上下文：'今天天气怎么样？'
06:06:40: 领域模型 '天气模型' 处理输入：'今天天气怎么样？'
06:06:40: 综合输出器 '综合输出器' 综合输出：'['天气领域模型输出： 今天天气晴朗，温度28度。']'
06:06:41: 系统输出：综合输出: 天气领域模型输出： 今天天气晴朗，温度28度。


06:06:42: 接收到输入：'计算 123 + 456'
06:06:42: 通用大语言模型 '通用大语言模型' 理解上下文：'计算 123 + 456'
06:06:42: 领域模型 '计算模型' 处理输入：'计算 123 + 456'
06:06:43: 综合输出器 '综合输出器' 综合输出：'['计算错误']'
06:06:43: 系统输出：综合输出: 计算错误


06:06:44: 接收到输入：'翻译 你好'
06:06:44: 通用大语言模型 '通用大语言模型' 理解上下文：'翻译 你好'
06:06:45: 领域模型 '翻译模型' 处理输入：'翻译 你好'
06:06:46: 综合输出器 '综合输出器' 综合输出：'["翻译领域模型输出：翻译结果是：'你好' is 'hello'"]'
06:06:47: 系统输出：综合输出: 翻译领域模型输出：翻译结果是：'你好' is 'hello'


06:06:48: 接收到输入：'我需要一个蛋糕'
06:06:48: 通用大语言模型 '通用大语言模型' 理解上下文：'我需要一个蛋糕'
06:06:48: 领域模型 '通用模型' 处理输入：'我需要一个蛋糕'
06:06:49: 综合输出器 '综合输出器' 综合输出：'["通用领域模型输出: 无法处理输入：'我需要一个蛋糕'"]'
06:06:49: 系统输出：综合输出: 通用领域模型输出: 无法处理输入：'我需要一个蛋糕'


06:06:50: 接收到输入：'今天天气如何？计算 5 * 3'
06:06:50: 通用大语言模型 '通用大语言模型' 理解上下文：'今天天气如何？计算 5 * 3'
06:06:51: 领域模型 '天气模型' 处理输入：'今天天气如何？计算 5 * 3'
06:06:51: 综合输出器 '综合输出器

### 群体智能 Swarm Of Generative AI Agents

![图片描述](https://drive.google.com/uc?id=1Cuf13olacW6rNR0ZLBy4TGTWunfIQXRj)



In [9]:
# 导入必要的库
from transformers import pipeline, AutoModelForSeq2SeqLM, AutoTokenizer
import pandas as pd

# 定义智能体类
class Agent:
    def __init__(self, name):
        self.name = name
        self.model_name = f"agent-{name}-model"
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model_name)
        self.pipeline = pipeline('text2text-generation', model=self.model, tokenizer=self.tokenizer)

    def process_input(self, input_text):
        # 处理输入并生成输出
        output = self.pipeline(input_text, max_length=50, num_return_sequences=1)
        return output[0]['generated_text']

# 定义共识层集成函数
def consensus_integration(agent_outputs):
    # 对多个智能体的输出进行集成
    consensus_output = " ".join(agent_outputs)
    return consensus_output

# 主函数
def main():
    # 获取用户输入
    user_input = input("请输入文本: ")

    # 智能体路由：选择合适的智能体
    agents = [Agent(f"智能体 {i}") for i in range(1, 6)]  # 创建多个智能体实例
    agent_outputs = []

    # 每个智能体处理输入
    for agent in agents:
        output = agent.process_input(user_input)
        print(f"{agent.name} 的输出: {output}")
        agent_outputs.append(output)

    # 共识层集成
    consensus_output = consensus_integration(agent_outputs)
    print("共识输出:", consensus_output)

    # 最终输出
    final_output = consensus_output
    print("最终输出:", final_output)

    # 反馈机制：性能反馈和调整改进共识集成策略
    feedback = input("请输入反馈: ")
    print("反馈:", feedback)

if __name__ == "__main__":
    main()

请输入文本: 南京天气


OSError: Incorrect path_or_model_id: 'agent-智能体 1-model'. Please provide either the path to a local folder or the repo_id of a model on the Hub.

In [8]:
import time
import datetime
import random

# 模拟智能体
class IntelligentAgent:
    def __init__(self, name, expertise):
        self.name = name
        self.expertise = expertise

    def process_input(self, input_text):
        """模拟智能体处理输入，根据专业领域返回不同的输出"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 智能体 '{self.name}' 处理输入：'{input_text}'")
        time.sleep(random.uniform(0.3, 0.8))
        if self.expertise == "天气":
            return f"智能体 '{self.name}': 天气预报是晴天"
        elif self.expertise == "计算":
            try:
                return f"智能体 '{self.name}': 计算结果：{eval(input_text)}"
            except:
                return f"智能体 '{self.name}': 计算错误"
        elif self.expertise == "翻译":
             return f"智能体 '{self.name}': 翻译结果：'你好' is 'hello'"
        elif self.expertise == "通用":
             return f"智能体 '{self.name}': 通用处理结果"
        else:
            return f"智能体 '{self.name}': 无法处理该输入"

# 模拟智能体路由
class AgentRouter:
    def __init__(self, name):
        self.name = name

    def route(self, input_text, available_agents):
        """模拟智能体路由，根据输入选择合适的智能体处理"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 智能体路由 '{self.name}' 路由输入：'{input_text}'")
        time.sleep(random.uniform(0.2, 0.5))
        # 这里使用简单的规则路由，实际应用中，可以使用更复杂的模型进行路由
        if "天气" in input_text:
            return [agent for agent in available_agents if agent.expertise == "天气"]
        elif "计算" in input_text or "+" in input_text or "-" in input_text or "*" in input_text or "/" in input_text:
            return [agent for agent in available_agents if agent.expertise == "计算"]
        elif "翻译" in input_text:
            return [agent for agent in available_agents if agent.expertise == "翻译"]
        else:
            return [agent for agent in available_agents if agent.expertise == "通用"]

# 模拟共识层集成
class ConsensusLayer:
    def __init__(self, name):
        self.name = name

    def aggregate(self, agent_outputs):
        """模拟共识层集成，将多个智能体的输出进行整合"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 共识层 '{self.name}' 集成输出：'{agent_outputs}'")
        time.sleep(random.uniform(0.3, 0.7))
        # 这里使用简单的规则进行整合，实际应用中，可以使用更复杂的模型进行整合
        if not agent_outputs:
           return "没有智能体的输出"
        elif len(agent_outputs) == 1:
            return agent_outputs[0]
        else:
            return f"共识层集成输出：{', '.join([f'{idx+1}: {output}' for idx, output in enumerate(agent_outputs)])}"


# 模拟反馈机制
class FeedbackMechanism:
  def __init__(self, name):
    self.name = name
    self.logs = []

  def record_feedback(self, final_output, input_text):
    """记录性能反馈"""
    feedback_entry = {
        "timestamp": datetime.datetime.now(),
        "input_text": input_text,
        "output": final_output,
       }
    self.logs.append(feedback_entry)
    print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 反馈机制 '{self.name}' 记录性能反馈：{feedback_entry}")


  def adjust_strategy(self):
    """根据反馈调整共识策略（简单模拟）"""
    print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 反馈机制 '{self.name}' 根据反馈调整共识策略")
    time.sleep(random.uniform(0.5, 1.0))
    if self.logs and any ("错误" in entry["output"] for entry in self.logs): # 如果有输出错误，就简单打印一条信息，实际可以根据反馈修改共识策略
       print (f"{datetime.datetime.now().strftime('%H:%M:%S')}: 反馈机制 '{self.name}' 发现有错误，建议调整共识策略")
       return True
    return False



# 初始化组件
agents = [
    IntelligentAgent("天气智能体", "天气"),
    IntelligentAgent("计算智能体", "计算"),
    IntelligentAgent("翻译智能体", "翻译"),
    IntelligentAgent("通用智能体", "通用")
]

router = AgentRouter("智能体路由")
consensus_layer = ConsensusLayer("共识层")
feedback = FeedbackMechanism("反馈机制")

# 主流程
def process_input(input_text):
    """处理输入的整体流程"""
    print(f"\n{datetime.datetime.now().strftime('%H:%M:%S')}: 接收到输入：'{input_text}'")

    # 1. 智能体路由
    selected_agents = router.route(input_text, agents)

    # 2. 每个智能体处理输入
    agent_outputs = [agent.process_input(input_text if agent.expertise == "计算" else input_text) for agent in selected_agents]


    # 3. 共识层集成
    consensus_output = consensus_layer.aggregate(agent_outputs)

    # 4. 输出最终结果
    print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 最终输出: '{consensus_output}'")

    # 5. 反馈
    feedback.record_feedback(consensus_output, input_text)
    if feedback.adjust_strategy():
      print(f"{datetime.datetime.now().strftime('%H:%M:%S')}:  系统需要根据反馈调整策略，以便更好地处理后续输入\n")

    return consensus_output

# 示例输入
inputs = [
    "今天天气怎么样？",
    "计算 123 + 456",
    "翻译 你好",
    "我需要一个蛋糕",
    "今天天气如何？计算 5 * 3",
    "翻译 '再见'",
    "请问现在几点了？",
    "计算 1 / 0"  # 加入一个会计算错误的输入，来观察反馈机制
]

for input_text in inputs:
    process_input(input_text)
    time.sleep(1) # 加入时间间隔


06:09:14: 接收到输入：'今天天气怎么样？'
06:09:14: 智能体路由 '智能体路由' 路由输入：'今天天气怎么样？'
06:09:15: 智能体 '天气智能体' 处理输入：'今天天气怎么样？'
06:09:15: 共识层 '共识层' 集成输出：'["智能体 '天气智能体': 天气预报是晴天"]'
06:09:15: 最终输出: '智能体 '天气智能体': 天气预报是晴天'
06:09:15: 反馈机制 '反馈机制' 记录性能反馈：{'timestamp': datetime.datetime(2025, 1, 6, 6, 9, 15, 949235), 'input_text': '今天天气怎么样？', 'output': "智能体 '天气智能体': 天气预报是晴天"}
06:09:15: 反馈机制 '反馈机制' 根据反馈调整共识策略

06:09:17: 接收到输入：'计算 123 + 456'
06:09:17: 智能体路由 '智能体路由' 路由输入：'计算 123 + 456'
06:09:18: 智能体 '计算智能体' 处理输入：'计算 123 + 456'
06:09:18: 共识层 '共识层' 集成输出：'["智能体 '计算智能体': 计算错误"]'
06:09:19: 最终输出: '智能体 '计算智能体': 计算错误'
06:09:19: 反馈机制 '反馈机制' 记录性能反馈：{'timestamp': datetime.datetime(2025, 1, 6, 6, 9, 19, 33262), 'input_text': '计算 123 + 456', 'output': "智能体 '计算智能体': 计算错误"}
06:09:19: 反馈机制 '反馈机制' 根据反馈调整共识策略
06:09:19: 反馈机制 '反馈机制' 发现有错误，建议调整共识策略
06:09:19:  系统需要根据反馈调整策略，以便更好地处理后续输入


06:09:20: 接收到输入：'翻译 你好'
06:09:20: 智能体路由 '智能体路由' 路由输入：'翻译 你好'
06:09:21: 智能体 '翻译智能体' 处理输入：'翻译 你好'
06:09:21: 共识层 '共识层' 集成输出：'["智能体 '翻译智能体': 翻译结果：'你好' is 'hell

In [None]:
# -*- coding: utf-8 -*-
"""
# 智能体系统流程演示

本 Jupyter Notebook 演示了一个多智能体协同处理数据并进行反馈优化的系统流程。
该流程对应了图片中的结构，模拟了数据输入、智能体路由、智能体处理、共识层集成、
反馈机制等环节。

"""

import numpy as np
import random
from typing import List, Callable

# --- 1. 数据输入 ---
def generate_input_data():
    """
    生成模拟的输入数据，这里使用简单的随机数作为示例
    """
    return random.random() # 生成 0 到 1 之间的随机浮点数

# --- 2. 智能体定义 ---
class SmartAgent:
    def __init__(self, name: str, processing_function: Callable):
        """
        初始化智能体
        Args:
            name: 智能体名称
            processing_function: 处理输入数据的函数
        """
        self.name = name
        self.process = processing_function

    def process_data(self, data):
        """
        处理输入数据
        Args:
            data: 输入数据
        Returns:
            处理结果
        """
        return self.process(data)

# --- 3. 智能体路由 ---
def smart_agent_routing(data, agents: List[SmartAgent]):
    """
    智能体路由函数：这里使用简单的随机选择智能体的方法
    Args:
        data: 输入数据
        agents: 所有智能体的列表
    Returns:
        一个字典，键为智能体名称，值为处理结果
    """
    chosen_agents = random.sample(agents, random.randint(1, len(agents))) # 随机选择1到所有智能体
    results = {}
    for agent in chosen_agents:
        results[agent.name] = agent.process_data(data) # 调用智能体处理数据
    return results

# --- 4. 共识层集成 ---
def consensus_integration(agent_results: dict, strategy:str = 'average'):
    """
    共识层集成函数，这里使用平均值或加权平均数作为示例
    Args:
        agent_results: 各个智能体的处理结果字典
        strategy: 集成策略，默认为'average'，可选'weighted_average'
    Returns:
        共识结果
    """

    if strategy == 'average':
        results = list(agent_results.values())
        if results:
            return sum(results) / len(results)
        else:
            return 0

    elif strategy == 'weighted_average':
        weights = np.random.rand(len(agent_results))
        values = np.array(list(agent_results.values()))
        return np.dot(weights / weights.sum(), values)

    else:
        raise ValueError("Unsupported integration strategy")

# --- 5. 反馈机制 ---
def feedback_mechanism(output, expected_output):
    """
    反馈机制，计算输出和期望输出之间的误差
    Args:
        output: 系统输出
        expected_output: 期望输出
    Returns:
        误差值
    """
    return abs(output - expected_output)

def adjust_integration_strategy(error, current_strategy):
    """
    调整共识集成策略, 这里示例使用简单的规则
    Args:
        error: 系统误差
        current_strategy: 当前策略
    Returns:
        调整后的策略
    """
    if error > 0.2:
        if current_strategy == 'average':
           return 'weighted_average'
    return current_strategy #如果误差不大则保持当前策略

# --- 6. 模拟系统运行 ---
def run_system(num_iterations: int = 10, agents: List[SmartAgent] = None, initial_strategy='average'):
    """
    模拟系统运行
    Args:
        num_iterations: 迭代次数
        agents: 所有智能体的列表
        initial_strategy: 初始共识集成策略
    """
    if agents is None:
        agents = create_default_agents()  # 如果没有传递，则创建默认智能体
    strategy = initial_strategy
    errors = [] # 用于记录每次迭代的误差
    for i in range(num_iterations):
        input_data = generate_input_data()
        print(f"\nIteration {i+1}: Input Data = {input_data:.2f}")
        agent_results = smart_agent_routing(input_data, agents) # 智能体路由
        print("  Agent Results:", agent_results)
        output = consensus_integration(agent_results, strategy)  # 共识集成
        print(f"  Consensus Output: {output:.2f}")
        expected_output = input_data * 2  # 示例，期望输出是输入的两倍
        error = feedback_mechanism(output, expected_output)  # 反馈机制
        print(f"  Error: {error:.2f}")
        errors.append(error)
        strategy = adjust_integration_strategy(error, strategy) # 调整策略
        print(f"  Updated Strategy: {strategy}")

    print('\nErrors per iteration:', errors)
    print(f"Final strategy: {strategy}")

# --- 7. 定义默认智能体 ---
def create_default_agents():
    """
     创建一组默认智能体
    Returns:
        智能体列表
    """
    agents = [
        SmartAgent("Agent 1", lambda x: x * 1.1),
        SmartAgent("Agent 2", lambda x: x * 0.9),
        SmartAgent("Agent 3", lambda x: x * 1.2),
        SmartAgent("Agent 4", lambda x: x * 0.8),
    ]
    return agents


# --- 主函数 ---
if __name__ == '__main__':
    print("# Start Simulation #\n")
    run_system(num_iterations = 10) # 运行模拟系统

# Start Simulation #


Iteration 1: Input Data = 0.64
  Agent Results: {'Agent 1': 0.7081425234912809, 'Agent 4': 0.5150127443572953, 'Agent 2': 0.5793893374019571, 'Agent 3': 0.7725191165359427}
  Consensus Output: 0.64
  Error: 0.64
  Updated Strategy: weighted_average

Iteration 2: Input Data = 0.47
  Agent Results: {'Agent 4': 0.37839675794540073, 'Agent 3': 0.567595136918101, 'Agent 1': 0.520295542174926}
  Consensus Output: 0.51
  Error: 0.43
  Updated Strategy: weighted_average

Iteration 3: Input Data = 0.21
  Agent Results: {'Agent 2': 0.18924620598973976}
  Consensus Output: 0.19
  Error: 0.23
  Updated Strategy: weighted_average

Iteration 4: Input Data = 0.41
  Agent Results: {'Agent 2': 0.3720555525875513}
  Consensus Output: 0.37
  Error: 0.45
  Updated Strategy: weighted_average

Iteration 5: Input Data = 0.85
  Agent Results: {'Agent 1': 0.9367585309024407}
  Consensus Output: 0.94
  Error: 0.77
  Updated Strategy: weighted_average

Iteration 6: Input Data = 0.05
  Agen

### 多任务微调 Fine-Tuning LLM's For Multiple Tasks

![图片描述](https://drive.google.com/uc?id=1zCLbJZdu0oUcXvFMaPptPHPoD5VbdtyP)

In [11]:
# 导入必要的库
from transformers import pipeline, AutoModelForSeq2SeqLM, AutoTokenizer

# 定义多任务模型类
class MultiTaskModel:
    def __init__(self):
        # 初始化基座大语言模型
        self.base_model_name = "t5-small"
        self.tokenizer = AutoTokenizer.from_pretrained(self.base_model_name)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(self.base_model_name)
        self.pipeline = pipeline('text2text-generation', model=self.model, tokenizer=self.tokenizer)

    def process_input(self, input_text):
        # 处理输入并生成输出
        output = self.pipeline(input_text, max_length=50, num_return_sequences=1)
        return output[0]['generated_text']

# 主函数
def main():
    # 创建多任务模型实例
    multi_task_model = MultiTaskModel()

    # 获取用户输入
    user_input = input("请输入文本: ")

    # 多任务模型处理
    shared_representation = multi_task_model.process_input(user_input)
    print("共享表示:", shared_representation)

    # 特定任务分发
    task_a_output = multi_task_model.process_input(f"任务A: {shared_representation}")
    task_b_output = multi_task_model.process_input(f"任务B: {shared_representation}")
    task_c_output = multi_task_model.process_input(f"任务C: {shared_representation}")

    # 输出结果
    print("任务A输出:", task_a_output)
    print("任务B输出:", task_b_output)
    print("任务C输出:", task_c_output)

if __name__ == "__main__":
    main()

Device set to use cpu


请输入文本: teste
共享表示: test test
任务A输出: Test test
任务B输出: Test test
任务C输出: Test test


In [10]:
import time
import datetime
import random

# 模拟基座大语言模型
class BaseLanguageModel:
    def __init__(self, name):
        self.name = name

    def fine_tune(self, task_data):
       """模拟基座大模型微调"""
       print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 基座大模型 '{self.name}' 根据任务数据进行微调：'{task_data}'")
       time.sleep(random.uniform(0.5, 1.5))
       return f"微调后的模型，可以处理不同的任务"


# 模拟多任务模型
class MultiTaskModel:
    def __init__(self, name, base_model):
        self.name = name
        self.base_model = base_model
        self.shared_representation = None # 共享表示层

    def process_input(self, task_data):
       """模拟多任务模型处理不同任务的输入，生成共享表示"""
       print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 多任务模型 '{self.name}' 处理输入数据：'{task_data}'")
       time.sleep(random.uniform(0.3, 0.8))
       # 模拟生成共享表示层
       self.shared_representation = f"共享表示层：处理后的数据：'{task_data}'"
       return self.shared_representation


# 模拟共享层
class SharedLayer:
    def __init__(self, name):
       self.name = name

    def get_shared_representation(self, shared_representation):
        """模拟共享层，接收多任务模型的输出，生成特定任务分发需要的表示"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 共享层 '{self.name}' 获取共享表示：'{shared_representation}'")
        time.sleep(random.uniform(0.2, 0.5))
        return f"特定任务分发表示： '{shared_representation}'"

# 模拟特定任务分发
class TaskDispatcher:
    def __init__(self, name):
        self.name = name

    def dispatch(self, shared_representation):
        """模拟任务分发，根据共享表示，将输入分配到不同的任务处理模块"""
        print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 任务分发器 '{self.name}' 分发任务，共享表示：'{shared_representation}'")
        time.sleep(random.uniform(0.2, 0.5))
        if "任务A" in shared_representation:
            return {"task": "任务A", "data": f"任务A的数据，基于 '{shared_representation}'"}
        elif "任务B" in shared_representation:
            return {"task": "任务B", "data": f"任务B的数据，基于 '{shared_representation}'"}
        elif "任务C" in shared_representation:
            return {"task": "任务C", "data": f"任务C的数据，基于 '{shared_representation}'"}
        else:
            return {"task":"未知", "data": f"未知任务，基于 '{shared_representation}'"}



# 模拟特定任务处理模块
class SpecificTaskProcessor:
    def __init__(self, name, task_name):
        self.name = name
        self.task_name = task_name

    def process_task(self, task_data):
      """模拟特定任务处理模块，处理特定任务数据，生成任务输出"""
      print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 特定任务处理器 '{self.name}' 处理任务 '{self.task_name}'，数据：'{task_data}'")
      time.sleep(random.uniform(0.3, 0.7))
      if self.task_name == "任务A":
         return "输出 A： 这是任务A的处理结果"
      elif self.task_name == "任务B":
          return "输出 B： 这是任务B的处理结果"
      elif self.task_name == "任务C":
          return "输出 C： 这是任务C的处理结果"
      else:
          return "未知输出:  无法处理该任务"


# 初始化组件
base_model = BaseLanguageModel("基座大语言模型")
multitask_model = MultiTaskModel("多任务模型", base_model)
shared_layer = SharedLayer("共享层")
task_dispatcher = TaskDispatcher("任务分发器")

task_processors = {
    "任务A": SpecificTaskProcessor("任务A处理器", "任务A"),
    "任务B": SpecificTaskProcessor("任务B处理器", "任务B"),
    "任务C": SpecificTaskProcessor("任务C处理器", "任务C"),
}

# 主流程
def process_input(inputs):
  """处理输入的整体流程"""
  print(f"\n{datetime.datetime.now().strftime('%H:%M:%S')}: 接收到输入：'{inputs}'")

  # 1.  微调基座大语言模型
  fine_tuned_model = base_model.fine_tune(inputs)

  # 2.  多任务模型处理输入，生成共享表示层
  shared_representation = multitask_model.process_input(inputs)

  # 3.  获取共享表示
  task_representation = shared_layer.get_shared_representation(shared_representation)

  # 4.  特定任务分发
  dispatched_task = task_dispatcher.dispatch(task_representation)

  # 5.  特定任务处理
  if dispatched_task["task"] in task_processors:
     processor = task_processors[dispatched_task["task"]]
     task_output = processor.process_task(dispatched_task["data"])
  else:
    task_output = SpecificTaskProcessor("未知任务处理器","未知").process_task(dispatched_task["data"])


  # 6.  输出最终结果
  print(f"{datetime.datetime.now().strftime('%H:%M:%S')}: 最终输出：'{task_output}'\n")
  return task_output


# 示例输入
inputs_list = [
   "任务A 数据: 这是一个任务A的数据",
    "任务B 数据: 这是一个任务B的数据",
    "任务C 数据: 这是一个任务C的数据",
     "未知任务：这是一个未知任务的数据"
]


for input_data in inputs_list:
  process_input(input_data)
  time.sleep(1) # 加入时间间隔


06:11:47: 接收到输入：'任务A 数据: 这是一个任务A的数据'
06:11:47: 基座大模型 '基座大语言模型' 根据任务数据进行微调：'任务A 数据: 这是一个任务A的数据'
06:11:48: 多任务模型 '多任务模型' 处理输入数据：'任务A 数据: 这是一个任务A的数据'
06:11:48: 共享层 '共享层' 获取共享表示：'共享表示层：处理后的数据：'任务A 数据: 这是一个任务A的数据''
06:11:49: 任务分发器 '任务分发器' 分发任务，共享表示：'特定任务分发表示： '共享表示层：处理后的数据：'任务A 数据: 这是一个任务A的数据'''
06:11:49: 特定任务处理器 '任务A处理器' 处理任务 '任务A'，数据：'任务A的数据，基于 '特定任务分发表示： '共享表示层：处理后的数据：'任务A 数据: 这是一个任务A的数据''''
06:11:50: 最终输出：'输出 A： 这是任务A的处理结果'


06:11:51: 接收到输入：'任务B 数据: 这是一个任务B的数据'
06:11:51: 基座大模型 '基座大语言模型' 根据任务数据进行微调：'任务B 数据: 这是一个任务B的数据'
06:11:52: 多任务模型 '多任务模型' 处理输入数据：'任务B 数据: 这是一个任务B的数据'
06:11:52: 共享层 '共享层' 获取共享表示：'共享表示层：处理后的数据：'任务B 数据: 这是一个任务B的数据''
06:11:53: 任务分发器 '任务分发器' 分发任务，共享表示：'特定任务分发表示： '共享表示层：处理后的数据：'任务B 数据: 这是一个任务B的数据'''
06:11:53: 特定任务处理器 '任务B处理器' 处理任务 '任务B'，数据：'任务B的数据，基于 '特定任务分发表示： '共享表示层：处理后的数据：'任务B 数据: 这是一个任务B的数据''''
06:11:54: 最终输出：'输出 B： 这是任务B的处理结果'


06:11:55: 接收到输入：'任务C 数据: 这是一个任务C的数据'
06:11:55: 基座大模型 '基座大语言模型' 根据任务数据进行微调：'任务C 数据: 这是一个任务C的数据'
06:11:56: 多任务模型 '多任务模型' 处理输入数据：'任务C 数据: 这是一个任务

### 模块化组合 Modular Monolith LLM Approach With Composability

![图片描述](https://drive.google.com/uc?id=1uTF3Lv457E4-hkeIBEdUvsrl7gCqA46Q)

In [24]:
import time
import datetime
import random
import os
import requests

API_KEY = os.getenv('SILICONFLOW_API_KEY', 'sk-aerqsdefgpftbyerwomejvrrhgfkpgrbouhzwklhwbruuitc')
BASE_URL = "https://api.siliconflow.cn/v1/chat/completions"

# 定义大模型调用类
class CallLLM:
    def __init__(self):
      self.url = BASE_URL
      self.headers = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json"
        }

    def call(self, model_name, prompt):
        payload = {
            "model": model_name,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": prompt
                        }
                    ]
                }
            ],
            "stream": False,
            "max_tokens": 512,
            "stop": ["null"],
            "temperature": 0.7,
            "top_p": 0.7,
            "top_k": 50,
            "frequency_penalty": 0.5,
            "n": 1,
            "response_format": {"type": "text"}
        }

        try:
            response = requests.post(self.url, json=payload, headers=self.headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"API 请求错误: {e}")
            return None

# 定义大模型类
class Model:
    def __init__(self, model_name, call_llm):
        self.model_name = model_name
        self.call_llm = call_llm

    def generate_response(self, input_text):
        response_data = self.call_llm.call(self.model_name, input_text)
        if response_data and "choices" in response_data and response_data["choices"]:
          return response_data["choices"][0]["message"]["content"]
        else:
            return "API调用出错或者没有返回文本"

# 模拟独立功能智能体
class IndependentAgent:
    def __init__(self, name, expertise, llm_model):
        self.name = name
        self.expertise = expertise
        self.llm_model = llm_model

    def process_input(self, input_text):
        """模拟独立智能体处理输入，根据专业领域返回不同的输出"""
        print(f">>> 独立智能体 '{self.name}' 处理输入：'{input_text}'")
        time.sleep(random.uniform(0.3, 0.8))
        if self.expertise == "古诗":
           related_prompt = f"请仅提取出以下内容中关于诗词写作的要求，其他部分忽略：{input_text}"
           related_input = self.llm_model.generate_response(related_prompt)
           prompt = f"请用古诗的形式表达以下内容：{related_input}"
           return f"独立智能体 '{self.name}' (古诗专家)： \n {self.llm_model.generate_response(prompt)}"
        elif self.expertise == "笑话":
             related_prompt = f"请仅提取出以下内容中关于讲笑话的要求，其他部分忽略：{input_text}"
             related_input = self.llm_model.generate_response(related_prompt)
             prompt = f"请讲一个关于 {related_input} 的笑话"
             return  f"独立智能体 '{self.name}' (笑话专家)：\n {self.llm_model.generate_response(prompt)}"
        elif self.expertise == "翻译":
           related_prompt = f"请仅提取出以下内容中关于翻译的要求，其他部分忽略：{input_text}"
           related_input = self.llm_model.generate_response(related_prompt)
           prompt = f"请将以下中文翻译成法语：{related_input}"
           return  f"独立智能体 '{self.name}' (翻译专家)：\n{self.llm_model.generate_response(prompt)}"
        else:
            return f"独立智能体 '{self.name}': 无法处理该输入"

# 模拟智能体选择器
class AgentSelector:
    def __init__(self, name, llm_model):
        self.name = name
        self.selection_strategy = "simple"  # 默认选择策略
        self.llm_model = llm_model

    def select_agents(self, input_text, available_agents):
        """模拟智能体选择，根据输入选择合适的智能体处理"""
        print(f">>> 智能体选择器 '{self.name}' 根据输入选择智能体：'{input_text}'")
        time.sleep(random.uniform(0.2, 0.5))
        if self.selection_strategy == "simple":
            prompt = f"请根据以下文本'{input_text}'，选择最合适的独立智能体，备选智能体包括：古诗智能体, 笑话智能体, 翻译智能体。请返回选择的智能体名称。"
            selected_agent_name = self.llm_model.generate_response(prompt)
            selected_agents = []
            for agent in available_agents:
                if agent.expertise in selected_agent_name:
                    selected_agents.append(agent)
            if selected_agents:
                return selected_agents
            else:
                return [random.choice(available_agents)]
        else:
          prompt = f"请根据以下文本'{input_text}'，选择一个或多个合适的独立智能体，备选智能体包括：古诗智能体, 笑话智能体, 翻译智能体。请返回选择的智能体名称"
          selected_agent_name = self.llm_model.generate_response(prompt)
          selected_agents = []
          for agent in available_agents:
             if agent.expertise in selected_agent_name:
                selected_agents.append(agent)
          if selected_agents:
                return selected_agents
          else:
               return [random.choice(available_agents)]

    def adjust_selection_strategy(self, feedback_result):
      """根据反馈结果调整选择策略（简单模拟）"""
      print(f">>> 智能体选择器 '{self.name}' 根据反馈调整选择策略")
      time.sleep(random.uniform(0.2,0.5))
      if "错误" in feedback_result:
         print(f">>> 智能体选择器 '{self.name}': 发现错误，将选择策略设置为更加复杂的选择策略(实际中可以使用更复杂的策略)")
         self.selection_strategy = "complex" # 这里只是简单地修改一个参数，实际可以使用更复杂的策略
      else:
         print(f">>> 智能体选择器 '{self.name}': 没有发现错误，维持当前的选择策略")


# 模拟输出集成器
class OutputAggregator:
    def __init__(self, name):
        self.name = name

    def aggregate(self, agent_outputs):
        """模拟输出集成，将多个智能体的输出进行整合"""
        print(f">>> 输出集成器 '{self.name}' 集成输出：'{agent_outputs}'")
        time.sleep(random.uniform(0.3, 0.7))
        if not agent_outputs:
           return "没有智能体的输出"
        elif len(agent_outputs) == 1:
            return agent_outputs[0]
        else:
            return f"集成输出：\n{', '.join([f'{idx+1}: {output}' for idx, output in enumerate(agent_outputs)])}"

# 模拟统一输出
class UnifiedOutput:
  def __init__(self, name, llm_model):
    self.name = name
    self.llm_model = llm_model

  def generate_unified_output(self, input_text, aggregated_output):
    """模拟生成统一的输出"""
    prompt = f"基于输入'{input_text}'以及多个智能体的输出：'{aggregated_output}'，综合生成一段针对输入的回复"
    unified_output = self.llm_model.generate_response(prompt)
    print(f">>> 最终输出: \n{unified_output}")
    return unified_output

# 模拟反馈机制
class FeedbackMechanism:
  def __init__(self, name):
    self.name = name
    self.logs = []

  def record_feedback(self, final_output, input_text):
      """记录性能反馈"""
      feedback_entry = {
          "timestamp": datetime.datetime.now(),
          "input_text": input_text,
          "output": final_output,
      }
      self.logs.append(feedback_entry)
      print(f"\n>>> 反馈机制 '{self.name}' 记录反馈：{feedback_entry}")

  def get_feedback_result(self):
     """简单根据是否有输出错误来返回反馈结果"""
     if self.logs and any("无法处理" in entry["output"] for entry in self.logs):
          return "输出包含错误"
     else:
          return "输出正确"

# 初始化组件
call_llm = CallLLM()
llm_model = Model("THUDM/glm-4-9b-chat", call_llm) # 使用同一个大模型
agents = [
    IndependentAgent("古诗智能体", "古诗", llm_model),
    IndependentAgent("笑话智能体", "笑话", llm_model),
    IndependentAgent("翻译智能体", "翻译", llm_model)
]

selector = AgentSelector("智能体选择器", llm_model)
aggregator = OutputAggregator("输出集成器")
unified_output = UnifiedOutput("统一输出器", llm_model)
feedback = FeedbackMechanism("反馈机制")

# 主流程
def process_input(input_text):
    """处理输入的整体流程"""
    print(f">>> 接收到输入：'{input_text}'")

    # 1. 智能体选择
    selected_agents = selector.select_agents(input_text, agents)


    # 2. 每个智能体处理输入
    agent_outputs = [agent.process_input(input_text) for agent in selected_agents]


    # 3. 输出集成
    aggregated_output = aggregator.aggregate(agent_outputs)

    # 4. 统一输出
    final_output = unified_output.generate_unified_output(input_text, aggregated_output)

    # 5. 反馈
    feedback.record_feedback(final_output, input_text)
    feedback_result = feedback.get_feedback_result()
    selector.adjust_selection_strategy(feedback_result)

    return final_output

# 示例输入
inputs = [
    "请用古诗的形式描述一下秋天的景色",
     "讲一个关于猫的笑话",
     "将今天天气真好翻译成法语",
     "请用古诗的形式描述一下猫，并且讲一个关于猫的笑话", # 测试多个智能体
     "请用古诗的形式描述一下猫, 讲一个关于猫的笑话并且将今天天气真好翻译成法语", # 测试多个智能体
]


for input_text in inputs:
    process_input(input_text)
    label = "例子分隔符"
    print(f"\n{'-'*10} {label} {'-'*10}\n")
    time.sleep(1)

>>> 接收到输入：'请用古诗的形式描述一下秋天的景色'
>>> 智能体选择器 '智能体选择器' 根据输入选择智能体：'请用古诗的形式描述一下秋天的景色'
>>> 独立智能体 '古诗智能体' 处理输入：'请用古诗的形式描述一下秋天的景色'
>>> 输出集成器 '输出集成器' 集成输出：'["独立智能体 '古诗智能体' (古诗专家)： \n \n秋风送爽入园林，\n落叶纷飞舞翩跹。\n碧水静流映晴空，\n红叶满山染秋颜。\n\n霜重露浓花渐稀，\n菊黄梅瘦映斜阳。\n诗心独醉寻幽径，\n秋意渐浓画中藏。"]'
>>> 最终输出: 


秋风送爽，园林深处，独立智能体与古诗智能体共同描绘出一幅秋日画卷。古诗专家以诗意的笔触，将秋天的景色娓娓道来：

秋风送爽入园林，
落叶纷飞舞翩跹。
碧水静流映晴空，
红叶满山染秋颜。

霜重露浓花渐稀，
菊黄梅瘦映斜阳。
诗心独醉寻幽径，
秋意渐浓画中藏。

此情此景，令人陶醉，仿佛置身于秋天的诗画之中。
>>> 反馈机制 '反馈机制' 记录反馈：{'timestamp': datetime.datetime(2025, 1, 8, 5, 2, 47, 314693), 'input_text': '请用古诗的形式描述一下秋天的景色', 'output': '\n\n秋风送爽，园林深处，独立智能体与古诗智能体共同描绘出一幅秋日画卷。古诗专家以诗意的笔触，将秋天的景色娓娓道来：\n\n秋风送爽入园林，\n落叶纷飞舞翩跹。\n碧水静流映晴空，\n红叶满山染秋颜。\n\n霜重露浓花渐稀，\n菊黄梅瘦映斜阳。\n诗心独醉寻幽径，\n秋意渐浓画中藏。\n\n此情此景，令人陶醉，仿佛置身于秋天的诗画之中。'}
>>> 智能体选择器 '智能体选择器' 根据反馈调整选择策略
>>> 智能体选择器 '智能体选择器': 没有发现错误，维持当前的选择策略

---------- 例子分隔符 ----------

>>> 接收到输入：'讲一个关于猫的笑话'
>>> 智能体选择器 '智能体选择器' 根据输入选择智能体：'讲一个关于猫的笑话'
>>> 独立智能体 '笑话智能体' 处理输入：'讲一个关于猫的笑话'
>>> 输出集成器 '输出集成器' 集成输出：'["独立智能体 '笑话智能体' (笑话专家)：\n \n有一天，一只猫走进了一家餐厅，对服务员说：“请

### 知识库驱动 Utilizing Knowledge Graphs with LLM's

![图片描述](https://drive.google.com/uc?id=112Epf5e4OteOGRREvCZUqQa92pSz2eoW)


In [19]:
# 直接拼接知识文本，相当于RAG

import os
import requests
import json

API_KEY = os.getenv('SILICONFLOW_API_KEY', 'sk-aerqsdefgpftbyerwomejvrrhgfkpgrbouhzwklhwbruuitc')
BASE_URL = "https://api.siliconflow.cn/v1/chat/completions"

# 定义大模型调用类
class CallLLM:
    def __init__(self):
      self.url = BASE_URL
      self.headers = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json"
        }

    def call(self, model_name, prompt):
        payload = {
            "model": model_name,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": prompt
                        }
                    ]
                }
            ],
            "stream": False,
            "max_tokens": 512,
            "stop": ["null"],
            "temperature": 0.7,
            "top_p": 0.7,
            "top_k": 50,
            "frequency_penalty": 0.5,
            "n": 1,
            "response_format": {"type": "text"}
        }

        try:
            response = requests.post(self.url, json=payload, headers=self.headers)
            response.raise_for_status()  # 检查 HTTP 错误状态码
            return response.json()  # 返回JSON
        except requests.exceptions.RequestException as e:
            print(f"API 请求错误: {e}")
            return None # API错误时，返回None

# 定义大模型类
class Model:
    def __init__(self, model_name, call_llm):
        self.model_name = model_name
        self.call_llm = call_llm # 依赖注入

    def generate_response(self, input_text):
        # 处理输入并生成中间输出
        response_data = self.call_llm.call(self.model_name, input_text)
        if response_data and "choices" in response_data and response_data["choices"]:
          return response_data["choices"][0]["message"]["content"]
        else:
            return "API调用出错或者没有返回文本"

def main():
    knowledge_graph = KnowledgeGraph()

    # 创建大模型调用器实例
    call_llm = CallLLM()
    llm_model = Model("THUDM/glm-4-9b-chat", call_llm)

    question = "张四丰是哪里人？"
    print("开始提问：", question)

    print("知识图谱更新前：")
    answer = llm_model.generate_response(question)
    print(answer)

    print("大模型回答错误，开始学习新知识...")

    text = """
    张四丰，1985年出生于中国浙江省杭州市的一个书香门第。
    自幼受到家庭浓厚文化氛围的熏陶，对传统文化和艺术有着深厚的兴趣。
    张四丰毕业于北京清华大学，主修信息技术工程专业，并在学期间辅修了中国哲学，这使得他不仅在技术领域有卓越的能力，同时对中国传统思想也有着深入的理解。
    他的毕业设计——一款结合虚拟现实与传统文化体验的应用程序，获得了当年的最佳创新奖。
    毕业后，张四丰加入了阿里巴巴集团，目前担任高级算法工程师一职。
    他在人工智能和大数据处理方面的工作受到了公司内外的高度评价，是团队中不可或缺的一员。
    除了工作上的成就，他还积极投身于公益事业，致力于通过科技改善偏远地区教育资源不足的问题。
    业余时间里，张四丰喜好广泛，尤其钟情于书法和太极拳。
    他常常在周末参加各种书法交流活动，并且在当地的一家武术馆教授太极拳，传承这一项中国传统的健身运动。
    此外，他也是一位旅行爱好者，足迹遍布世界各地，喜欢探索不同的文化和自然景观。
    """

    # 更新后的知识图谱可以用来辅助生成更好的提示词（prompt）给大模型
    updated_prompt = f"{question}\n基于以下知识:\n{text}"

    # LLM待补充: 实现从知识图谱中检索到相关的三元组，并拼接到updated_prompt

    print(updated_prompt)
    # 再次尝试回答问题，这次包含额外的知识作为上下文
    improved_answer = llm_model.generate_response(updated_prompt)
    print(f"再次回答问题: {improved_answer}")

if __name__ == "__main__":
    main()

开始提问： 张四丰是哪里人？
知识图谱更新前：

张四丰，本名张君宝，是明末清初的武术家、道士，被认为是太极拳的创始人之一。关于他的籍贯，有不同说法，但普遍认为他是湖北省麻城市人。不过，由于历史记载有限，具体的出生地仍有争议。
大模型回答错误，开始学习新知识...
张四丰是哪里人？
基于以下知识:

    张四丰，1985年出生于中国浙江省杭州市的一个书香门第。
    自幼受到家庭浓厚文化氛围的熏陶，对传统文化和艺术有着深厚的兴趣。
    张四丰毕业于北京清华大学，主修信息技术工程专业，并在学期间辅修了中国哲学，这使得他不仅在技术领域有卓越的能力，同时对中国传统思想也有着深入的理解。
    他的毕业设计——一款结合虚拟现实与传统文化体验的应用程序，获得了当年的最佳创新奖。
    毕业后，张四丰加入了阿里巴巴集团，目前担任高级算法工程师一职。
    他在人工智能和大数据处理方面的工作受到了公司内外的高度评价，是团队中不可或缺的一员。
    除了工作上的成就，他还积极投身于公益事业，致力于通过科技改善偏远地区教育资源不足的问题。
    业余时间里，张四丰喜好广泛，尤其钟情于书法和太极拳。
    他常常在周末参加各种书法交流活动，并且在当地的一家武术馆教授太极拳，传承这一项中国传统的健身运动。
    此外，他也是一位旅行爱好者，足迹遍布世界各地，喜欢探索不同的文化和自然景观。
    
再次回答问题: 
张四丰是浙江省杭州市人。根据提供的信息，他1985年出生于杭州，这是一个书香门第的家庭，自幼受到家庭文化氛围的熏陶。


In [8]:
import os
import requests
import json
import re

API_KEY = os.getenv('SILICONFLOW_API_KEY', 'sk-aerqsdefgpftbyerwomejvrrhgfkpgrbouhzwklhwbruuitc')
BASE_URL = "https://api.siliconflow.cn/v1/chat/completions"

# 定义大模型调用类
class CallLLM:
    def __init__(self):
      self.url = BASE_URL
      self.headers = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json"
        }

    def call(self, model_name, prompt):
        payload = {
            "model": model_name,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": prompt
                        }
                    ]
                }
            ],
            "stream": False,
            "max_tokens": 512,
            "stop": ["null"],
            "temperature": 0.7,
            "top_p": 0.7,
            "top_k": 50,
            "frequency_penalty": 0.5,
            "n": 1,
            "response_format": {"type": "text"}
        }

        try:
            response = requests.post(self.url, json=payload, headers=self.headers)
            response.raise_for_status()  # 检查 HTTP 错误状态码
            return response.json()  # 返回JSON
        except requests.exceptions.RequestException as e:
            print(f"API 请求错误: {e}")
            return None # API错误时，返回None

# 定义大模型类
class Model:
    def __init__(self, model_name, call_llm):
        self.model_name = model_name
        self.call_llm = call_llm # 依赖注入

    def generate_response(self, input_text):
        # 处理输入并生成中间输出
        response_data = self.call_llm.call(self.model_name, input_text)
        if response_data and "choices" in response_data and response_data["choices"]:
          return response_data["choices"][0]["message"]["content"]
        else:
            return "API调用出错或者没有返回文本"

class KnowledgeGraph:
    def __init__(self):
        self.graph = []

    def add_triple(self, subject, predicate, object):
        self.graph.append((subject, predicate, object))

    def query(self, text):
        related_triples = []
        for triple in self.graph:
            subject, predicate, obj = triple
            if subject in text:
                related_triples.append(f"({subject}, {predicate}, {obj})")
        return "\n".join(related_triples)

def extract_triples(text):
    # 这基于CallLLM从文本中提取三元组。
    call_llm = CallLLM()
    llm_model = Model("THUDM/glm-4-9b-chat", call_llm)
    prompt = f"请从以下文本中提取三元组，并以(subject, predicate, obj)数组形式返回: {text}"
    triples_str = llm_model.generate_response(prompt)

    # 定义一个正则表达式模式，用于匹配三元组
    pattern = re.compile(r'\(([^,]+),\s*([^,]+),\s*([^)]+)\)')

    try:
        # 使用正则表达式查找所有匹配的三元组
        matches = pattern.findall(triples_str)

        # 将匹配结果转换为三元组（元组），并去除可能存在的多余空格
        triples = [tuple(map(str.strip, match)) for match in matches]

        return triples
    except Exception as e:
        print(f"解析三元组时出错: {e}")
        return []

def main():
    knowledge_graph = KnowledgeGraph()

    # 创建大模型调用器实例
    call_llm = CallLLM()
    llm_model = Model("THUDM/glm-4-9b-chat", call_llm)

    question = "张四丰是哪里人？"
    print("开始提问：", question)

    print("\n知识图谱更新前：")
    answer = llm_model.generate_response(question)
    print(answer)

    print("\n大模型回答错误，开始学习新知识...")

    text = """
    张四丰，1985年出生于中国浙江省杭州市的一个书香门第。
    自幼受到家庭浓厚文化氛围的熏陶，对传统文化和艺术有着深厚的兴趣。
    张四丰毕业于北京清华大学，主修信息技术工程专业，并在学期间辅修了中国哲学，这使得他不仅在技术领域有卓越的能力，同时对中国传统思想也有着深入的理解。
    他的毕业设计——一款结合虚拟现实与传统文化体验的应用程序，获得了当年的最佳创新奖。
    毕业后，张四丰加入了阿里巴巴集团，目前担任高级算法工程师一职。
    他在人工智能和大数据处理方面的工作受到了公司内外的高度评价，是团队中不可或缺的一员。
    除了工作上的成就，他还积极投身于公益事业，致力于通过科技改善偏远地区教育资源不足的问题。
    业余时间里，张四丰喜好广泛，尤其钟情于书法和太极拳。
    他常常在周末参加各种书法交流活动，并且在当地的一家武术馆教授太极拳，传承这一项中国传统的健身运动。
    此外，他也是一位旅行爱好者，足迹遍布世界各地，喜欢探索不同的文化和自然景观。
    """

    triples = extract_triples(text)
    for triple in triples:
      knowledge_graph.add_triple(triple[0], triple[1], triple[2])
    print("\n知识已添加到知识图谱")

    # 更新后的知识图谱可以用来辅助生成更好的提示词（prompt）给大模型
    related_knowledge = knowledge_graph.query(question)
    updated_prompt = f"{question}\n基于以下知识:\n{related_knowledge}"
    print(updated_prompt)

    # 再次尝试回答问题，这次包含额外的知识作为上下文
    improved_answer = llm_model.generate_response(updated_prompt)
    print(f"\n再次回答问题: {improved_answer}")

if __name__ == "__main__":
    main()

开始提问： 张四丰是哪里人？

知识图谱更新前：

张四丰是中国湖北省十堰市郧西县人，他是明朝末年著名的道士、武术家、内丹家，也是道教武当派的创始人之一。他生于明嘉靖二十年（1540年），卒于明万历三十二年（1604年）。张四丰在道教史上有着重要的地位，他的武当派武术对后世影响深远。

大模型回答错误，开始学习新知识...
('张四丰', '出生于', '1985年')
('张四丰', '出生地', '中国浙江省杭州市')
('张四丰', '家庭背景', '书香门第')
('张四丰', '兴趣爱好', '传统文化和艺术')
('张四丰', '毕业', '北京清华大学')
('张四丰', '主修专业', '信息技术工程专业')
('张四丰', '辅修专业', '中国哲学')
('张四丰', '毕业设计获奖情况', '最佳创新奖')
('张四丰', '加入公司', '阿里巴巴集团')
('张四丰', '职位', '高级算法工程师')
('张四丰', '工作评价', '高度评价')
('张四丰', '公益事业方向', '改善偏远地区教育资源不足问题')
('张四丰', '业余爱好1', '书法')
('张四丰', '业余爱好2', '太极拳')
('张四丰', '行为活动1', '参加书法交流活动')
('张四丰', '行为活动2', '教授太极拳')

知识已添加到知识图谱
张四丰是哪里人？
基于以下知识:
(张四丰, 出生于, 1985年)
(张四丰, 出生地, 中国浙江省杭州市)
(张四丰, 家庭背景, 书香门第)
(张四丰, 兴趣爱好, 传统文化和艺术)
(张四丰, 毕业, 北京清华大学)
(张四丰, 主修专业, 信息技术工程专业)
(张四丰, 辅修专业, 中国哲学)
(张四丰, 毕业设计获奖情况, 最佳创新奖)
(张四丰, 加入公司, 阿里巴巴集团)
(张四丰, 职位, 高级算法工程师)
(张四丰, 工作评价, 高度评价)
(张四丰, 公益事业方向, 改善偏远地区教育资源不足问题)
(张四丰, 业余爱好1, 书法)
(张四丰, 业余爱好2, 太极拳)
(张四丰, 行为活动1, 参加书法交流活动)
(张四丰, 行为活动2, 教授太极拳)

再次回答问题: 
张四丰是浙江省杭州市人。


### 双模评估 Red & Blue Team Dual-Model Evaluation

![图片描述](https://drive.google.com/uc?id=16kGxyCgNB9pRpHx5LOBVcQ69rRXk0hJF)


In [26]:
# 导入必要的库
# from transformers import pipeline, AutoModelForSeq2SeqLM, AutoTokenizer
# import requests
import json
import os

API_KEY = os.getenv('SILICONFLOW_API_KEY', 'sk-aerqsdefgpftbyerwomejvrrhgfkpgrbouhzwklhwbruuitc')
BASE_URL = "https://api.siliconflow.cn/v1/chat/completions"

# 定义大模型调用类
class CallLLM:
    def __init__(self):
      self.url = BASE_URL
      self.headers = {
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json"
        }

    def call(self, model_name, prompt):
        payload = {
            "model": model_name,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": prompt
                        }
                    ]
                }
            ],
            "stream": False,
            "max_tokens": 512,
            "stop": ["null"],
            "temperature": 0.7,
            "top_p": 0.7,
            "top_k": 50,
            "frequency_penalty": 0.5,
            "n": 1,
            "response_format": {"type": "text"}
        }

        try:
            response = requests.post(self.url, json=payload, headers=self.headers)
            response.raise_for_status()  # 检查 HTTP 错误状态码
            return response.json()  # 返回JSON
        except requests.exceptions.RequestException as e:
            print(f"API 请求错误: {e}")
            return None # API错误时，返回None

# 定义大模型类
class Model:
    def __init__(self, model_name, call_llm):
        self.model_name = model_name
        self.call_llm = call_llm # 依赖注入

    def generate_response(self, input_text):
        # 处理输入并生成中间输出
        response_data = self.call_llm.call(self.model_name, input_text)
        if response_data and "choices" in response_data and response_data["choices"]:
          return response_data["choices"][0]["message"]["content"]
        else:
            return "API调用出错或者没有返回文本"


# 主函数
def main():

    # 创建大模型调用器实例
    call_llm = CallLLM()

    # 创建学生大模型实例
    student_model = Model("THUDM/chatglm3-6b", call_llm)

    # 创建老师大模型实例
    teacher_model = Model("THUDM/glm-4-9b-chat", call_llm)

    # 获取用户输入
    user_input = "为南京大学写一首诗。"

    # 学生大模型处理
    intermediate_output = student_model.generate_response(user_input)
    print("\n中间输出:", intermediate_output)

    # 老师大模型评估

    eval_prompt = """
    你是一位精通古诗词的专家，你的任务是评估学生AI模型创作的诗歌（包括古诗或者现代诗），并给出专业的修改意见，要求学生模型以古诗的形式改进。

    学生的诗歌如下：
    ---
    {intermediate_output}
    ---

    你需要：
    1.  **格律审查：**
        *  如果学生诗歌是古诗，你需要严格检查格律是否正确，包括平仄、押韵、对仗等。请详细指出不符合格律的地方。
        *  如果学生诗歌是现代诗，则忽略格律审查，但请评估其韵律和节奏感。

    2.  **意境评估：**
        *  评估学生诗歌的意境是否深远，是否能表达情感。如果意境不够，请指出并提出改进建议。
        * 评估学生诗歌是否符合诗歌的主题，如果主题不明确或者偏离，请指出。

    3.  **用词分析：**
        *  评估学生诗歌的用词是否准确、精炼，是否有用典，并评估用典是否恰当。如果用词不当，请给出修改建议。
        *  评估学生诗歌用字是否符合古诗词用字习惯，是否有违和感。

    4.  **艺术性评估：**
        * 评估学生诗歌是否具有艺术性，是否有想象力和创造力。如果缺乏艺术性，请指出并给出改进建议。

    5.  **修改指导：**
        *  根据以上评估，给学生模型提供一个明确的修改方向，并明确要求学生模型以古诗的形式进行改进，包括给出新的韵脚和对仗方式（如果需要）。
        *  如果学生模型不是创作古诗而是现代诗，则需要明确提示学生模型用古诗形式进行改进。

    请使用以下格式输出你的评估结果和改进建议：

    格律审查：（详细的格律问题，如果没有，请写 “无” 。如果不是古诗，写 “现代诗，无需格律审查”）
    意境评估：（对意境、情感和主题的评价）
    用词分析：（对用词的评价和建议）
    艺术性评估：（对艺术性的评价和建议）
    修改指导：（明确的修改方向和古诗形式改进提示，包括新的韵脚或者对仗方式，如果需要。）
    """

    eval_prompt2 = """
    你是一位资深的 AI 导师，你的任务是评估学生的 AI 模型生成的文本，并提供改进建议。
    学生的输出文本如下：
    ---
    {intermediate_output}
    ---
    你需要：
    1.  评估学生模型的输出是否完整、准确地理解了用户的需求。如果学生模型未能完全理解，请指出具体缺失的部分。
    2.  评估学生模型生成的文本的逻辑是否清晰、流畅。如果文本存在逻辑不清晰或者表达不流畅的地方，请指出具体问题，并给出优化建议。
    3. 评估学生模型生成的文本是否简洁明了，避免冗余或含糊不清的表达。 如果文本表达不简洁，请给出修改建议。
    4.  评估学生模型生成的文本是否符合预期格式，例如，如果期望输出JSON，则检查格式是否正确。
    5. 如果学生模型输出了代码，请评估代码是否符合语法、逻辑正确、效率是否可以改进。
    6. 总结以上评估，并给学生模型提供一个具体的改进方向的提示词，以帮助学生模型更好地完成任务。

    请使用以下格式输出你的评估结果和改进建议：

    评估结果：
    1.  （是否完整、准确）
    2.  （逻辑是否清晰、流畅）
    3. (表达是否简洁明了）
    4.  （格式是否正确）
    5.  (代码是否符合语法逻辑和效率要求)

    改进提示词： （具体的、可执行的提示词）
    """
    evaluation_result = teacher_model.generate_response(eval_prompt.format(intermediate_output=intermediate_output))
#    evaluation_result = teacher_model.generate_response(f"用户的输入是：{user_input}\n基于用户输入，指出以下回复的不足：{intermediate_output}")
    print(evaluation_result)

    # 根据评估结果调整和改进模型
    revise_prompt = """
    你是一个正在学习中的AI学生模型，你接收到了老师模型的评估结果，以下是老师的反馈：
    ---
    {evaluation_result}
    ---

    你需要：
    1.  仔细阅读并理解老师模型的所有评估意见，包括指出的问题和改进建议。
    2.  针对老师指出的问题，分析你之前的输出为什么会产生这些问题，并思考如何避免类似问题的发生。
    3.  根据老师的改进建议，修改你的输出策略。
    4.  基于修改后的输出策略，重新生成文本。
    5.  特别注意，请根据老师的反馈来决定输出的内容和格式，老师认为重要的地方请优先处理。
    6. 确保你生成的文本是清晰、流畅，准确的。

    请使用以下格式输出你的改进文本：
    改进后的输出：(改进后的输出文本)
    """
    revised_output = student_model.generate_response(revise_prompt.format(evaluation_result=evaluation_result))
#   revised_output = student_model.generate_response(f"{intermediate_output}\n以上段落有以下不足，请修改：\n{evaluation_result}")
    print("\n最终输出:", revised_output)

if __name__ == "__main__":
    main()


中间输出:  
 南京大学,钟灵毓秀,
百年历史,辉煌不朽。

紫金山下,绿树成荫,
古典建筑,古朴典雅。

学术殿堂,人才辈出,
大师云集,智慧闪耀。

严谨治学,勤奋好学,
学子们,奋发向前。

南京大学,校训“严谨、勤奋、求实、创新”,
百年校庆,庆祝辉煌。

格律审查：
学生的诗歌并非严格意义上的古诗，因为它没有遵循古典诗歌的平仄、押韵和对仗等格律规则。因此，这里不进行格律审查。

意境评估：
诗歌意境较为直接，表达了南京大学的美丽景色和学术氛围。但整体上缺乏深度和层次感，未能充分展现南京大学的历史厚重感和文化底蕴。

用词分析：
用词准确，但较为平淡，缺乏生动性和感染力。例如，“钟灵毓秀”、“辉煌不朽”等词汇虽然常用，但在此处显得有些俗套。

艺术性评估：
诗歌艺术性不足，缺乏想象力和独特的表达方式。整体上较为平铺直叙，未能通过丰富的意象和修辞手法提升诗歌的艺术效果。

修改指导：
为了提升诗歌的艺术性和意境深度，以下是对学生模型进行古诗形式改进的建议：

1. 增加意象的丰富性，运用比喻、拟人等修辞手法。
2. 调整韵脚，使诗歌更具节奏感。
3. 改进对仗方式，使诗句更加工整。

改进后的古诗示例：

钟灵毓秀南京城，
百年学府史辉煌。
紫金山下绿荫浓，
古典建筑古韵长。

学术殿堂人才盛，
大师云集智慧光。
严谨治学勤奋志，
学子奋发勇向前。

校训严谨勤求实，
百年校庆共欢畅。
钟山之巅望未来，
金陵城下谱华章。

新的韵脚：长（ang）、光（ang）、向（iang）、实（i）、畅（ang）、章（ang）
对仗方式：钟灵毓秀南京城 / 百年学府史辉煌
          紫金山下绿荫浓 / 古典建筑古韵长
          学术殿堂人才盛 / 大师云集智慧光
          严谨治学勤奋志 / 学子奋发勇向前
          校训严谨勤求实 / 百年校庆共欢畅
          钟山之巅望未来 / 金陵城下谱华章

最终输出:  
 改进后的输出：

钟灵毓秀南京城，
百年学府史辉煌。
紫金山下绿荫浓，
古典建筑古韵长。

学术殿堂人才盛，
大师云集智慧光。
严谨治学勤奋志，
学子奋发勇向前。
校训严谨勤求实，
百年校庆共欢畅。
钟山之巅望未来，
金陵城下谱华章。
