### 1. 把刚刚做的 embeddings 的文件上传到 elasticsearch

In [4]:
import json
import os
from typing import List, Dict, Any
from elasticsearch import Elasticsearch
import logging
from dotenv import load_dotenv
from tqdm import tqdm
import traceback

# Set up logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

class ElasticsearchUploader:
    def __init__(self, 
                 es_host: str = 'localhost',
                 es_port: int = 9200,
                 index_name: str = 'ped_literature_3_files_01_01_2025'):
        """Initialize Elasticsearch connection."""
        self.es = Elasticsearch(
            hosts=[f"http://{es_host}:{es_port}"],
            basic_auth=('elastic', os.getenv('ES_PASSWORD', 'Lyx19930115'))
        )
        self.index_name = index_name

    def create_es_mapping(self):
        """Create Elasticsearch index with appropriate mapping."""
        mapping = {
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 1
            },
            "mappings": {
                "properties": {
                    "source": {
                        "properties": {
                            "title": {"type": "text"},
                            "page_number": {"type": "integer"}
                        }
                    },
                    "original_text": {"type": "text"},
                    "embedding": {
                        "type": "dense_vector",
                        "dims": 1536,
                        "similarity": "cosine"
                    },
                    "relationships": {
                        "type": "nested",
                        "properties": {
                            "subject": {"type": "keyword"},
                            "predicate": {"type": "keyword"},
                            "object": {"type": "keyword"}
                        }
                    }
                }
            }
        }
        
        try:
            # Delete index if it exists
            if self.es.indices.exists(index=self.index_name):
                logger.info(f"Deleting existing index '{self.index_name}'")
                self.es.indices.delete(index=self.index_name)
            
            # Create new index
            self.es.indices.create(index=self.index_name, body=mapping)
            logger.info(f"Created index '{self.index_name}' with mapping")
            
        except Exception as e:
            logger.error(f"Error creating index mapping: {str(e)}")
            raise

    def bulk_index_documents(self, documents: List[Dict[str, Any]], batch_size: int = 100):
        """Index documents in bulk to Elasticsearch."""
        try:
            total_batches = (len(documents) + batch_size - 1) // batch_size
            
            for i in tqdm(range(0, len(documents), batch_size), desc="Indexing documents", total=total_batches):
                batch = documents[i:i + batch_size]
                bulk_data = []
                
                for doc in batch:
                    # Add index action
                    bulk_data.extend([
                        {"index": {"_index": self.index_name}},
                        doc
                    ])
                
                if bulk_data:
                    response = self.es.bulk(operations=bulk_data, refresh=True)
                    
                    # Check for errors
                    if response.get('errors', False):
                        logger.warning(f"Some errors occurred in batch {i//batch_size + 1}")
                        for item in response['items']:
                            if 'error' in item.get('index', {}):
                                logger.error(f"Error: {item['index']['error']}")
                    else:
                        logger.debug(f"Successfully indexed batch {i//batch_size + 1}")
            
            logger.info(f"Successfully indexed {len(documents)} documents")
            
        except Exception as e:
            logger.error(f"Error during bulk indexing: {str(e)}")
            raise

    def verify_index(self):
        """Verify the index exists and return basic stats."""
        try:
            stats = self.es.indices.stats(index=self.index_name)
            doc_count = stats['indices'][self.index_name]['total']['docs']['count']
            
            # Get a sample document to verify structure
            sample = self.es.search(
                index=self.index_name,
                size=1,
                query={"match_all": {}}
            )
            if sample['hits']['hits']:
                rel_count = len(sample['hits']['hits'][0]['_source'].get('relationships', []))
                logger.info(f"Sample document contains {rel_count} relationships")
            
            logger.info(f"Index '{self.index_name}' contains {doc_count} documents")
            return doc_count
        except Exception as e:
            logger.error(f"Error verifying index: {str(e)}")
            raise

    def process_embeddings_file(self, input_path: str, batch_size: int = 100):
        """Process the embeddings file and upload to Elasticsearch."""
        try:
            # Load embeddings file
            logger.info(f"Loading embeddings from {input_path}")
            with open(input_path, 'r', encoding='utf-8') as f:
                documents = json.load(f)
            
            logger.info(f"Loaded {len(documents)} source documents")
            total_relationships = sum(len(doc.get('relationships', [])) for doc in documents)
            logger.info(f"Total relationships across all documents: {total_relationships}")
            
            # Create/recreate index with mapping
            self.create_es_mapping()
            
            # Upload documents
            self.bulk_index_documents(documents, batch_size)
            
            # Verify upload
            final_count = self.verify_index()
            
            return {
                "success": True,
                "documents_indexed": final_count,
                "total_relationships": total_relationships,
                "index_name": self.index_name
            }
            
        except Exception as e:
            logger.error(f"Error processing embeddings file: {str(e)}")
            raise

    def delete_index(self):
        """Delete the specified index if it exists."""
        try:
            if self.es.indices.exists(index=self.index_name):
                self.es.indices.delete(index=self.index_name)
                logger.info(f"Successfully deleted index '{self.index_name}'")
            else:
                logger.info(f"Index '{self.index_name}' does not exist")
        except Exception as e:
            logger.error(f"Error deleting index: {str(e)}")
            raise

def main():
    """Main function to upload embeddings to Elasticsearch."""
    try:
        uploader = ElasticsearchUploader()
        
        # Verify Elasticsearch connection
        if not uploader.es.ping():
            raise ConnectionError("Could not connect to Elasticsearch")
            
        input_path = r"D:\Dropbox\29. Ampelos\24_PED\PED_PITT_Aaron\backend\PDFs_Share\pdf_json_output\scd_entities_relationships_total_deduplicated_with_embeddings.json"
        
        # Process and upload
        result = uploader.process_embeddings_file(input_path)
        logger.info(f"Processing complete: {result}")
        return result
    
    except Exception as e:
        logger.error(f"Error in main: {str(e)}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        raise

if __name__ == "__main__":
    main()

2025-01-03 10:39:20,934 - DEBUG - Starting new HTTP connection (1): localhost:9200
2025-01-03 10:39:20,964 - DEBUG - http://localhost:9200 "HEAD / HTTP/1.1" 200 0
2025-01-03 10:39:20,965 - INFO - HEAD http://localhost:9200/ [status:200 duration:0.031s]
2025-01-03 10:39:20,966 - INFO - Loading embeddings from D:\Dropbox\29. Ampelos\24_PED\PED_PITT_Aaron\backend\PDFs_Share\pdf_json_output\scd_entities_relationships_total_deduplicated_with_embeddings.json
2025-01-03 10:39:24,022 - INFO - Loaded 2131 source documents
2025-01-03 10:39:24,023 - INFO - Total relationships across all documents: 0
2025-01-03 10:39:24,027 - DEBUG - http://localhost:9200 "HEAD /ped_literature_3_files_01_01_2025 HTTP/1.1" 404 0
2025-01-03 10:39:24,028 - INFO - HEAD http://localhost:9200/ped_literature_3_files_01_01_2025 [status:404 duration:0.004s]
2025-01-03 10:39:24,795 - DEBUG - http://localhost:9200 "PUT /ped_literature_3_files_01_01_2025 HTTP/1.1" 200 0
2025-01-03 10:39:24,796 - INFO - PUT http://localhost:92

### 3. 利用刚刚做的带有 embeddings 的文件进行查询

In [5]:
import json
import os
from typing import List, Dict, Any
from elasticsearch import Elasticsearch
from openai import OpenAI
from dotenv import load_dotenv
import logging
from tqdm import tqdm
import traceback

# Set up logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

class ElasticsearchQuerier:
    def __init__(self,
                 es_host: str = 'localhost',
                 es_port: int = 9200,
                 index_name: str = 'ped_literature_3_files_01_01_2025'):
        """Initialize Elasticsearch connection and OpenAI client."""
        try:
            self.es = Elasticsearch(
                hosts=[f"http://{es_host}:{es_port}"],
                basic_auth=('elastic', os.getenv('ES_PASSWORD', 'Lyx19930115'))
            )
            if not self.es.ping():
                raise ConnectionError("Could not connect to Elasticsearch")
            
            self.openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
            if not os.getenv("OPENAI_API_KEY"):
                raise ValueError("OpenAI API key not found in environment variables")
                
            self.index_name = index_name
            logger.info(f"Successfully initialized ElasticsearchQuerier with index: {index_name}")
            
        except Exception as e:
            logger.error(f"Error initializing ElasticsearchQuerier: {str(e)}")
            raise

    def get_embedding(self, text: str) -> list:
        """Get embedding for a query text."""
        try:
            response = self.openai_client.embeddings.create(
                model="text-embedding-ada-002",
                input=text
            )
            logger.debug(f"Successfully generated embedding for text: {text[:50]}...")
            return response.data[0].embedding
            
        except Exception as e:
            logger.error(f"Error generating embedding: {str(e)}")
            raise

    def get_first_n_words(self, text: str, n: int = 20) -> str:
        """Get first n words from a text."""
        try:
            words = text.split()
            return ' '.join(words[:n]) + ('...' if len(words) > n else '')
        except Exception as e:
            logger.error(f"Error processing text preview: {str(e)}")
            return text[:100] + '...'  # Fallback to character-based truncation

    def keyword_search(self, query_text: str, top_k: int = 3) -> List[Dict[str, Any]]:
        """Basic keyword search focused only on original_text field."""
        try:
            query = {
                "size": top_k,
                "query": {
                    "match": {
                        "original_text": {
                            "query": query_text,
                            "operator": "or"
                        }
                    }
                }
            }
            
            response = self.es.search(index=self.index_name, body=query)
            logger.debug(f"Keyword search completed for query: {query_text}")
            return self._process_results(response)
            
        except Exception as e:
            logger.error(f"Error in keyword search: {str(e)}")
            raise

    def semantic_search(self, query_text: str, top_k: int = 3) -> List[Dict[str, Any]]:
        """Enhanced semantic search with error handling."""
        try:
            query_embedding = self.get_embedding(query_text)
            
            query = {
                "size": top_k,
                "query": {
                    "script_score": {
                        "query": {"match_all": {}},
                        "script": {
                            "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                            "params": {"query_vector": query_embedding}
                        }
                    }
                }
            }
            
            response = self.es.search(index=self.index_name, body=query)
            logger.debug(f"Semantic search completed for query: {query_text}")
            return self._process_results(response)
            
        except Exception as e:
            logger.error(f"Error in semantic search: {str(e)}")
            raise

    def hybrid_search(self, query_text: str, top_k: int = 3) -> List[Dict[str, Any]]:
        """Enhanced hybrid search combining original_text keyword search with semantic search."""
        try:
            query_embedding = self.get_embedding(query_text)
            
            query = {
                "size": top_k,
                "query": {
                    "script_score": {
                        "query": {
                            "match": {
                                "original_text": {
                                    "query": query_text,
                                    "operator": "or"
                                }
                            }
                        },
                        "script": {
                            "source": "cosineSimilarity(params.query_vector, 'embedding') * params.semantic_weight + _score * params.keyword_weight",
                            "params": {
                                "query_vector": query_embedding,
                                "semantic_weight": 0.7,
                                "keyword_weight": 0.3
                            }
                        }
                    }
                }
            }
            
            response = self.es.search(index=self.index_name, body=query)
            logger.debug(f"Hybrid search completed for query: {query_text}")
            return self._process_results(response)
            
        except Exception as e:
            logger.error(f"Error in hybrid search: {str(e)}")
            raise

    def _process_results(self, response: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Process search results grouping by source with relationships."""
        try:
            # Dictionary to group results by source
            grouped_results = {}
            
            for hit in response['hits']['hits']:
                source = hit['_source']
                source_info = source.get('source', {'title': 'Unknown', 'page_number': 0})
                source_key = f"{source_info.get('title', 'Unknown')}_{source_info.get('page_number', 0)}"
                
                # If this source isn't in our grouped results yet, initialize it
                if source_key not in grouped_results:
                    grouped_results[source_key] = {
                        'source': source_info,
                        'text_preview': self.get_first_n_words(source.get('original_text', '')),
                        'score': round(hit['_score'], 3),
                        'relationships': []
                    }
                
                # Add relationships for this source
                relationships = source.get('relationships', [])
                if relationships:
                    for rel in relationships:
                        relationship = {
                            'subject': rel.get('subject', 'N/A'),
                            'predicate': rel.get('predicate', 'N/A'),
                            'object': rel.get('object', 'N/A')
                        }
                        if relationship not in grouped_results[source_key]['relationships']:
                            grouped_results[source_key]['relationships'].append(relationship)
                else:
                    # Handle documents without relationships
                    relationship = {
                        'subject': 'N/A',
                        'predicate': 'N/A',
                        'object': 'N/A'
                    }
                    if relationship not in grouped_results[source_key]['relationships']:
                        grouped_results[source_key]['relationships'].append(relationship)
            
            # Convert grouped results to list format
            results = []
            for source_data in grouped_results.values():
                results.append(source_data)
                
            return results
            
        except Exception as e:
            logger.error(f"Error processing search results: {str(e)}")
            raise

def print_search_results(search_type: str, query: str, results: List[Dict[str, Any]]):
        """Enhanced results printing with source grouping."""
        try:
            print(f"\n{search_type} Search Results for: '{query}'")
            print("=" * 80)
            
            if not results:
                print("No results found.")
                return
                
            for i, result in enumerate(results, 1):
                print(f"\nResult {i} (Score: {result['score']}):")
                print(f"Source: {result['source'].get('title', 'Unknown')}, "
                    f"Page {result['source'].get('page_number', 'N/A')}")
                print(f"Text Preview: {result['text_preview']}")
                print("\nRelationships:")
                for j, rel in enumerate(result['relationships'], 1):
                    print(f"  {j}. {rel['subject']} → {rel['predicate']} → {rel['object']}")
                print("-" * 80)
                
        except Exception as e:
            logger.error(f"Error printing search results: {str(e)}")
            print("Error occurred while displaying results.")

def main():
    """Enhanced main function with proper error handling."""
    try:
        load_dotenv()
        querier = ElasticsearchQuerier()
        
        query = "What are the treatments for sickle cell pain crisis?"
        
        # Run all three search types with error handling
        for search_type in ['keyword', 'semantic', 'hybrid']:
            try:
                results = getattr(querier, f"{search_type}_search")(query)
                print_search_results(search_type.capitalize(), query, results)
            except Exception as e:
                logger.error(f"Error in {search_type} search: {str(e)}")
                print(f"Error occurred during {search_type} search.")
                
    except Exception as e:
        logger.error(f"Error in main: {str(e)}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        raise

if __name__ == "__main__":
    main()

2025-01-03 10:41:04,916 - DEBUG - Starting new HTTP connection (1): localhost:9200
2025-01-03 10:41:04,924 - DEBUG - http://localhost:9200 "HEAD / HTTP/1.1" 200 0
2025-01-03 10:41:04,926 - INFO - HEAD http://localhost:9200/ [status:200 duration:0.010s]
2025-01-03 10:41:05,574 - INFO - Successfully initialized ElasticsearchQuerier with index: ped_literature_3_files_01_01_2025
2025-01-03 10:41:05,613 - DEBUG - http://localhost:9200 "POST /ped_literature_3_files_01_01_2025/_search HTTP/1.1" 200 None
2025-01-03 10:41:05,616 - INFO - POST http://localhost:9200/ped_literature_3_files_01_01_2025/_search [status:200 duration:0.035s]
2025-01-03 10:41:05,623 - DEBUG - Keyword search completed for query: What are the treatments for sickle cell pain crisis?
2025-01-03 10:41:05,630 - DEBUG - Request options: {'method': 'post', 'url': '/embeddings', 'files': None, 'post_parser': <function Embeddings.create.<locals>.parser at 0x0000020F0D507B00>, 'json_data': {'input': 'What are the treatments for si


Keyword Search Results for: 'What are the treatments for sickle cell pain crisis?'

Result 1 (Score: 21.634):
Source: Caring for Your Baby and Young Child  Birth to Age 5 (Tanya Altmann American Academy of Pediatrics) (Z-Library), Page 661
Text Preview: 638 638 CHRonIC He ALtH Cond ItIons A nd dI seAses disorders in the SCD complex have similar symptoms, such...

Relationships:
  1. N/A → N/A → N/A
--------------------------------------------------------------------------------

Result 2 (Score: 20.148):
Source: Caring for Your Baby and Young Child  Birth to Age 5 (Tanya Altmann American Academy of Pediatrics) (Z-Library), Page 662
Text Preview: 639 639 sICKL e Ce LL dI seAse Treatment If your child has SCD, she should be diagnosed as early...

Relationships:
  1. N/A → N/A → N/A
--------------------------------------------------------------------------------

Result 3 (Score: 16.418):
Source: Bright Futures Guidelines for Health... (Z-Library), Page 397
Text Preview: Table 2 reviews 

2025-01-03 10:41:05,877 - DEBUG - receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', [(b'Date', b'Fri, 03 Jan 2025 15:41:05 GMT'), (b'Content-Type', b'application/json'), (b'Transfer-Encoding', b'chunked'), (b'Connection', b'keep-alive'), (b'access-control-allow-origin', b'*'), (b'access-control-expose-headers', b'X-Request-ID'), (b'openai-model', b'text-embedding-ada-002'), (b'openai-organization', b'user-pztuwilaiwjvfjkapn4ghvrs'), (b'openai-processing-ms', b'66'), (b'openai-version', b'2020-10-01'), (b'strict-transport-security', b'max-age=31536000; includeSubDomains; preload'), (b'x-ratelimit-limit-requests', b'10000'), (b'x-ratelimit-limit-tokens', b'5000000'), (b'x-ratelimit-remaining-requests', b'9999'), (b'x-ratelimit-remaining-tokens', b'4999986'), (b'x-ratelimit-reset-requests', b'6ms'), (b'x-ratelimit-reset-tokens', b'0s'), (b'x-request-id', b'req_a3479a83a8a15b08e014c8e3bcd4df7c'), (b'CF-Cache-Status', b'DYNAMIC'), (b'Set-Cookie', b'__cf_bm=yDfKX7qgHmz


Semantic Search Results for: 'What are the treatments for sickle cell pain crisis?'

Result 1 (Score: 1.873):
Source: Caring for Your Baby and Young Child  Birth to Age 5 (Tanya Altmann American Academy of Pediatrics) (Z-Library), Page 662
Text Preview: 639 639 sICKL e Ce LL dI seAse Treatment If your child has SCD, she should be diagnosed as early...

Relationships:
  1. N/A → N/A → N/A
--------------------------------------------------------------------------------

Result 2 (Score: 1.822):
Source: Bright Futures Guidelines for Health... (Z-Library), Page 397
Text Preview: Table 2 reviews the 3 types of heat-related illness as well as exertional sickling. Ensuring Adequate Nutrition To perform optimally...

Relationships:
  1. N/A → N/A → N/A
--------------------------------------------------------------------------------

Result 3 (Score: 1.804):
Source: Caring for Your Baby and Young Child  Birth to Age 5 (Tanya Altmann American Academy of Pediatrics) (Z-Library), Page 661
Text Pr

2025-01-03 10:41:06,139 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-01-03 10:41:06,141 - DEBUG - receive_response_body.started request=<Request [b'POST']>
2025-01-03 10:41:06,179 - DEBUG - receive_response_body.complete
2025-01-03 10:41:06,181 - DEBUG - response_closed.started
2025-01-03 10:41:06,183 - DEBUG - response_closed.complete
2025-01-03 10:41:06,185 - DEBUG - HTTP Response: POST https://api.openai.com/v1/embeddings "200 OK" Headers({'date': 'Fri, 03 Jan 2025 15:41:06 GMT', 'content-type': 'application/json', 'transfer-encoding': 'chunked', 'connection': 'keep-alive', 'access-control-allow-origin': '*', 'access-control-expose-headers': 'X-Request-ID', 'openai-model': 'text-embedding-ada-002', 'openai-organization': 'user-pztuwilaiwjvfjkapn4ghvrs', 'openai-processing-ms': '75', 'openai-version': '2020-10-01', 'strict-transport-security': 'max-age=31536000; includeSubDomains; preload', 'x-ratelimit-limit-requests': '10000', 'x-ratelimit


Hybrid Search Results for: 'What are the treatments for sickle cell pain crisis?'

Result 1 (Score: 7.053):
Source: Caring for Your Baby and Young Child  Birth to Age 5 (Tanya Altmann American Academy of Pediatrics) (Z-Library), Page 661
Text Preview: 638 638 CHRonIC He ALtH Cond ItIons A nd dI seAses disorders in the SCD complex have similar symptoms, such...

Relationships:
  1. N/A → N/A → N/A
--------------------------------------------------------------------------------

Result 2 (Score: 6.655):
Source: Caring for Your Baby and Young Child  Birth to Age 5 (Tanya Altmann American Academy of Pediatrics) (Z-Library), Page 662
Text Preview: 639 639 sICKL e Ce LL dI seAse Treatment If your child has SCD, she should be diagnosed as early...

Relationships:
  1. N/A → N/A → N/A
--------------------------------------------------------------------------------

Result 3 (Score: 5.501):
Source: Bright Futures Guidelines for Health... (Z-Library), Page 397
Text Preview: Table 2 reviews the 