In [35]:
import requests
from datasets import load_dataset
from tqdm import tqdm
import time
import json

class DifyDatasetManager:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.dify.ai/v1"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    def create_dataset(self, name: str) -> str:
        """创建新的 Dify 数据集"""
        url = f"{self.base_url}/datasets"
        payload = {
            "name": name,
            "permission": "only_me"
        }
        
        response = requests.post(url, headers=self.headers, json=payload)
        response.raise_for_status()
        return response.json()['id']
    
    def create_document(self, dataset_id: str, title: str, content: str) -> bool:
        """通过文本创建文档"""
        url = f"{self.base_url}/datasets/{dataset_id}/document/create-by-text"
        
        payload = {
            "name": title,
            "text": content,
            "indexing_technique": "high_quality",
            "process_rule": {"mode": "automatic"}
        }
        
        try:
            response = requests.post(url, headers=self.headers, json=payload)
            if response.status_code == 200:
                return True
            else:
                print(f"Error creating document: {response.text}")
                return False
        except Exception as e:
            print(f"Exception during document creation: {str(e)}")
            return False
    
    def retrieve(self, dataset_id: str, query: str, top_k: int = 1) -> list:
        """从数据集中检索文档"""
        url = f"{self.base_url}/datasets/{dataset_id}/retrieve"
        
        payload = {
            "query": query,
            "retrieval_model": {
                "search_method": "semantic_search",
                "reranking_enable": False,
                "top_k": top_k
            }
        }
        
        response = requests.post(url, headers=self.headers, json=payload)
        response.raise_for_status()
        return response.json().get('documents', [])

# 初始化
DIFY_API_KEY = "dataset-vGo3aB5ogicKiIsdWQ5FmBE2"
dify_manager = DifyDatasetManager(DIFY_API_KEY)

# 加载数据集
print("Loading FiQA dataset...")
fiqa_corpus = load_dataset("BeIR/fiqa", "corpus")
fiqa_queries = load_dataset("BeIR/fiqa", "queries")
fiqa_qrels = load_dataset("BeIR/fiqa-qrels")

# 创建新的数据集
dataset_name = "FiQA_Evaluation_Dataset"
print(f"\nCreating dataset: {dataset_name}")
dataset_id = dify_manager.create_dataset(dataset_name)
print(f"Created dataset with ID: {dataset_id}")

# 上传前20个文档
print("\nUploading first 20 documents...")
for idx, doc in enumerate(tqdm(corpus_data[:20])):
    title = f"Document_{idx+1}"
    success = dify_manager.create_document(
        dataset_id=dataset_id,
        title=title,
        content=doc['text']
    )
    time.sleep(1)  # Rate limiting

# 使用前20个查询评估检索效果
print("\nEvaluating retrieval performance...")
results = []

for idx, query in enumerate(tqdm(queries_data[:20])):
    query_text = query['text']
    
    # 获取检索结果
    retrieved_docs = dify_manager.retrieve(
        dataset_id=dataset_id,
        query=query_text,
        top_k=3
    )
    
    results.append({
        'query_id': idx + 1,
        'query': query_text,
        'retrieved_docs': retrieved_docs
    })
    time.sleep(1)  # Rate limiting

# 保存评估结果
print("\nSaving results...")
with open('dify_evaluation_results.json', 'w') as f:
    json.dump(results, f, indent=2)

# 打印示例结果
print("\nSample Results:")
for result in results[:2]:
    print(f"\nQuery: {result['query']}")
    print("Retrieved documents:")
    for idx, doc in enumerate(result['retrieved_docs'], 1):
        print(f"{idx}. Score: {doc.get('score', 'N/A')}")
        print(f"   Text: {doc.get('text', '')[:200]}...")

Loading FiQA dataset...

Creating dataset: FiQA_Evaluation_Dataset


HTTPError: 409 Client Error: CONFLICT for url: https://api.dify.ai/v1/datasets

In [23]:
from datasets import load_dataset
import pandas as pd

fiqa_corpus = load_dataset("BeIR/fiqa", "corpus")
fiqa_queries = load_dataset("BeIR/fiqa", "queries")
fiqa_qrels = load_dataset("BeIR/fiqa-qrels")

Using the latest cached version of the dataset since BeIR/fiqa couldn't be found on the Hugging Face Hub
Found the latest cached dataset configuration 'corpus' at /Users/yingxuanli/.cache/huggingface/datasets/BeIR___fiqa/corpus/0.0.0/e31855201132ea2a257d7df77c828d7c02427521 (last modified on Sun Mar  9 16:03:04 2025).


README.md:   0%|          | 0.00/14.0k [00:00<?, ?B/s]

train.tsv:   0%|          | 0.00/210k [00:00<?, ?B/s]

dev.tsv:   0%|          | 0.00/18.3k [00:00<?, ?B/s]

test.tsv:   0%|          | 0.00/25.3k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/14166 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/1238 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/1706 [00:00<?, ? examples/s]

In [33]:
import requests
from datasets import load_dataset
from tqdm import tqdm
import time
from datetime import datetime

class DifyDatasetManager:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.dify.ai/v1"
        # Add Content-Type header
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    def upload_document(self, dataset_id: str, title: str, content: str) -> bool:
        """上传单个文档到数据集"""
        # 使用正确的API端点
        url = f"{self.base_url}/datasets/{dataset_id}/documents"
        
        payload = {
            "name": title,
            "text": content
        }
        
        try:
            response = requests.post(url, headers=self.headers, json=payload)
            if response.status_code == 200:
                return True
            else:
                print(f"Error uploading document: {response.text}")
                print(f"Status code: {response.status_code}")
                return False
        except Exception as e:
            print(f"Exception during upload: {str(e)}")
            return False

def prepare_documents_file(corpus_data):
    """准备文档文件"""
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f:
        for doc in corpus_data:
            title = doc.get('title', '')
            text = doc.get('text', '')
            f.write(f"# {title}\n{text}\n###\n")
        return f.name

# 初始化
DIFY_API_KEY = "dataset-vGo3aB5ogicKiIsdWQ5FmBE2"
dify_manager = DifyDatasetManager(DIFY_API_KEY)

# 加载 FiQA 数据集
print("Loading FiQA dataset...")
fiqa_corpus = load_dataset("BeIR/fiqa", "corpus")

# 检查数据集结构
print("\nDataset structure:")
for split, dataset in fiqa_corpus.items():
    print(f"Split '{split}' contains {len(dataset)} documents")
    print(f"Features: {list(dataset.features.keys())}")

# 创建新的数据集
dataset_name = "FiQA_Financial_Dataset"
print(f"\nCreating dataset: {dataset_name}")
try:
    dataset_id = dify_manager.create_dataset(dataset_name)
    print(f"Created dataset with ID: {dataset_id}")
except Exception as e:
    print(f"Failed to create dataset: {e}")
    raise

# 上传所有文档
print("\nUploading documents...")
success_count = 0
total_docs = sum(len(dataset) for dataset in fiqa_corpus.values())

# 遍历所有分片中的文档
for split, dataset in fiqa_corpus.items():
    print(f"\nProcessing split: {split}")
    
    for idx, doc in enumerate(tqdm(dataset)):
        if idx % 10 == 0:  # Add delay every 10 requests to avoid rate limits
            time.sleep(1)
        
        # 安全地获取文档字段
        doc_id = doc['_id']
        doc_title = doc.get('title', '')  # 使用 get() 方法提供默认值
        doc_text = doc.get('text', '')
        
        title = f"Document_{split}_{doc_id}"
        
        success = dify_manager.upload_document(
            dataset_id=dataset_id,
            title=title,
            content=doc_text
        )
        
        if success:
            success_count += 1
            if success_count % 50 == 0:
                print(f"\nProgress: {success_count}/{total_docs} documents uploaded")

# 打印最终结果
print("\nUpload Summary:")
print(f"Dataset ID: {dataset_id}")
print(f"Total documents: {total_docs}")
print(f"Successfully uploaded: {success_count}")
print(f"Failed uploads: {total_docs - success_count}")

Loading FiQA dataset...

Dataset structure:
Split 'corpus' contains 57638 documents
Features: ['_id', 'title', 'text']

Creating dataset: FiQA_Financial_Dataset
Failed to create dataset: 'DifyDatasetManager' object has no attribute 'create_dataset'


AttributeError: 'DifyDatasetManager' object has no attribute 'create_dataset'

In [None]:
import requests
import json
from typing import Dict, List
import pandas as pd
from tqdm import tqdm

class DifyRetriever:
    def __init__(self, api_key: str, dataset_id: str):
        self.api_key = api_key
        self.dataset_id = dataset_id
        self.base_url = "https://api.dify.ai/v1"
        
    def retrieve(self, query: str, top_k: int = 10) -> List[Dict]:
        """从 Dify 知识库检索文档"""
        url = f"{self.base_url}/datasets/{self.dataset_id}/retrieve"
        
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        payload = {
            "query": query,
            "retrieval_model": {
                "search_method": "semantic_search",
                "reranking_enable": True,
                "top_k": top_k
            }
        }
        
        response = requests.post(url, headers=headers, json=payload)
        response.raise_for_status()
        return response.json().get("data", [])

# 加载 FiQA 数据集的查询
from datasets import load_dataset
fiqa_queries = load_dataset("BeIR/fiqa", "queries")

# 转换为更易处理的格式
queries = {
    row['_id']: row['text'] 
    for row in fiqa_queries['train']
}

# Dify API 配置
DIFY_API_KEY = "your_api_key_here"
DATASET_ID = "your_dataset_id_here"
retriever = DifyRetriever(DIFY_API_KEY, DATASET_ID)

# 执行检索评估
def evaluate_retrieval(queries: Dict[str, str], top_k: int = 10):
    results = {}
    for query_id, query_text in tqdm(queries.items(), desc="Evaluating queries"):
        try:
            retrieved_docs = retriever.retrieve(query_text, top_k=top_k)
            results[query_id] = {
                'query': query_text,
                'retrieved': retrieved_docs
            }
        except Exception as e:
            print(f"Error processing query {query_id}: {e}")
    return results

# 运行评估
print("开始评估...")
evaluation_results = evaluate_retrieval(queries, top_k=10)

# 显示评估结果
print("\n评估统计:")
print(f"总查询数: {len(queries)}")
print(f"成功检索数: {len(evaluation_results)}")

# 显示样例结果
sample_query_id = list(evaluation_results.keys())[0]
print("\n样例查询结果:")
print(f"Query: {queries[sample_query_id]}")
print("Top retrieved documents:")
for i, doc in enumerate(evaluation_results[sample_query_id]['retrieved'][:3], 1):
    print(f"\n{i}. Score: {doc.get('score', 'N/A')}")
    print(f"Text: {doc.get('text', '')[:200]}...")

In [10]:

# Convert corpus to dictionary format
corpus = {}
for idx, (doc_id, title, text) in enumerate(zip(
    fiqa_corpus['test']['_id'],  # Using 'test' instead of 'train'
    fiqa_corpus['test']['title'],
    fiqa_corpus['test']['text']
)):
    corpus[doc_id] = {
        "title": title,
        "text": text
    }

# Convert queries to dictionary format
queries = {}
for query_id, query_text in zip(
    fiqa_queries['test']['_id'],
    fiqa_queries['test']['text']
):
    queries[query_id] = query_text

# Print sample data
print("Sample Document:")
sample_doc_id = list(corpus.keys())[0]
print(f"\nDocument ID: {sample_doc_id}")
print(f"Title: {corpus[sample_doc_id]['title']}")
print(f"Text: {corpus[sample_doc_id]['text'][:200]}...")

print("\nSample Query:")
sample_query_id = list(queries.keys())[0]
print(f"\nQuery ID: {sample_query_id}")
print(f"Query: {queries[sample_query_id]}")

# Print statistics
print("\nDataset Statistics:")
print(f"Number of documents: {len(corpus)}")
print(f"Number of queries: {len(queries)}")



KeyError: 'test'

In [None]:
#开源的企业知识库评测数据集
#query和知识库中召回答案的相似度和排序
#调用dify的知识库查询接口
#调用灵搭的知识库查询接口

In [2]:
import pandas as pd
from typing import List, Dict
import numpy as np
from sklearn.metrics import ndcg_score

class RetrievalEvaluator:
    def __init__(self, queries: List[str], documents: List[str], relevance: Dict):
        self.queries = queries
        self.documents = documents
        self.relevance = relevance
        
    def evaluate_system(self, retrieval_system, k=10):
        """评估检索系统"""
        metrics = {
            'precision@k': [],
            'recall@k': [],
            'ndcg@k': []
        }
        
        for query_id, query in enumerate(self.queries):
            # 获取系统返回结果
            retrieved_docs = retrieval_system.search(query, top_k=k)
            
            # 计算评估指标
            true_relevance = self.get_relevance(query_id)
            metrics['precision@k'].append(self.precision_at_k(retrieved_docs, true_relevance, k))
            metrics['recall@k'].append(self.recall_at_k(retrieved_docs, true_relevance, k))
            metrics['ndcg@k'].append(self.ndcg_at_k(retrieved_docs, true_relevance, k))
            
        return {metric: np.mean(values) for metric, values in metrics.items()}

In [5]:
import requests
from evaluation_data import RetrievalEvaluator

class DifyRetriever:
    def __init__(self, api_key):
        self.api_key = api_key
        self.api_url = "https://api.dify.ai/v1/query"
        
    def search(self, query: str, top_k: int = 10):
        headers = {"Authorization": f"Bearer {self.api_key}"}
        response = requests.post(
            self.api_url,
            json={"query": query, "top_k": top_k},
            headers=headers
        )
        return response.json()["documents"]

class EnterpriseRetriever:
    def __init__(self, knowledge_base_path):
        self.kb_path = knowledge_base_path
        # 初始化您的企业知识库检索系统
        
    def search(self, query: str, top_k: int = 10):
        # 实现您的检索逻辑
        pass

# 使用示例
def main():
    # 加载测试数据
    evaluator = RetrievalEvaluator(
        queries=load_test_queries(),
        documents=load_test_documents(),
        relevance=load_relevance_judgments()
    )
    
    # 初始化检索系统
    dify_retriever = DifyRetriever(api_key="dataset-vGo3aB5ogicKiIsdWQ5FmBE2")
    enterprise_retriever = EnterpriseRetriever("path_to_kb")
    
    # 评估两个系统
    dify_results = evaluator.evaluate_system(dify_retriever)
    enterprise_results = evaluator.evaluate_system(enterprise_retriever)
    
    # 输出对比结果
    print("Dify System Results:", dify_results)
    print("Enterprise System Results:", enterprise_results)

ModuleNotFoundError: No module named 'evaluation_data'

In [None]:
def AgentQuery(queryJson):
    host = "https://uat.agentspro.cn/"
    url = host + "openapi/agent/chat/completions/v1"

    # 请求头通常包含一些如内容类型、认证信息等
    headers = {
        "token":"ff19bd0cabcf4dcd83898066c1be4eba.ZG5SR1e3CqcXYSMUKIvHPkcVan0SKPMQ"
    }

    # 请求体是发送到服务器的数据，以下是一个JSON格式的示例
    payload = {
        "agentId":"ff19bd0cabcf4dcd83898066c1be4eba",
        "userChatInput": queryJson
    }  

    # 使用requests库发送POST请求
    response = requests.post(url, json=payload, headers=headers)

    # 打印出响应内容
    json_data = response.text
    data = json.loads(json_data)
    content = data['choices'][-1]['content']
    return(content)