In [3]:
import json
import os
from getpass import getpass

from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport

# Variables
variables = {"dcm_id": "d427fe94-fc61-4269-8584-78556a36758c"}

# Read the query from the .graphql file
with open("graphql/GetCompleteTaxonomyHierarchy.graphql", encoding="utf-8") as f:
    query = f.read()

# Endpoint and credentials come from the environment so no secret is stored in the
# notebook, its saved outputs, or its version history. SECURITY: never hard-code the
# bearer token here; any token that was ever committed in this notebook should be revoked.
GRAPHQL_URL = os.environ.get("GRAPHQL_URL", "https://app-uat.quinsights.tech/graphql")
GRAPHQL_TOKEN = os.environ.get("GRAPHQL_API_TOKEN") or getpass("GraphQL bearer token: ")

# Configure the GraphQL transport
transport = RequestsHTTPTransport(
    url=GRAPHQL_URL,
    headers={"Authorization": f"Bearer {GRAPHQL_TOKEN}"},
    use_json=True,
)

# Initialize client
client = Client(transport=transport, fetch_schema_from_transport=True)

# Execute the query and show the raw response
result = client.execute(gql(query), variable_values=variables)
print(json.dumps(result, indent=2))

{
  "taxonomy_items": [
    {
      "id": "7608df3e-839b-4e53-bcc7-1296e91986bf",
      "status": "published",
      "taxonomy_item_name": "travel_insurance_product",
      "category": "product_type",
      "data_type": "string",
      "description": "Virtual root category for all travel insurance product segments",
      "unit": null,
      "aliases": [
        "travel_insurance",
        "insurance_product",
        "product_root"
      ],
      "examples": [
        "Reiseversicherung",
        "Assurance voyage"
      ],
      "parent_relationships": [
        {
          "id": "04831613-87e6-4d71-ae15-4f20b93b4bb3",
          "related_taxonomy_item": {
            "id": "4ec1fb50-56e9-4976-b3ad-ae5cfd863a09",
            "taxonomy_item_name": "luggage_travel_delay",
            "category": "segment_type",
            "data_type": "string",
            "description": "Coverage for luggage loss, damage, and travel delays",
            "unit": null,
            "llm_instruction": "Id

In [4]:
# Shared system-prompt preamble; segment-specific instructions are appended to it when
# each segment's prompt is built. Adjacent string literals are used (instead of an
# indented triple-quoted string) so source indentation and trailing spaces do not leak
# into the text sent to the model.
base_system_prompt = (
    "You are an expert for analysing insurance documents.\n"
    "Your job is to extract relevant information from the document in a structured json format.\n"
    "You will receive a markdown version of the insurance document and analyse the document "
    "for the following information:"
)

In [None]:
# Step 1: Extract segment information from GraphQL response
import json
import os
from pathlib import Path

# Load the GraphQL response (JSON is UTF-8 by specification, so read it as such)
with open('example_gql_response.json', 'r', encoding='utf-8') as f:
    gql_response = json.load(f)

# Extract segment_type items linked to the product_type root item(s).
# In the example response, the segments appear under the root's
# `parent_relationships[*].related_taxonomy_item`.
# GraphQL returns null for empty/absent fields, so None is normalised to an empty
# list/string instead of crashing the loop or putting "None" into the prompts.
segments = []
for item in gql_response.get('taxonomy_items') or []:
    if item.get('category') != 'product_type':
        continue
    for parent_rel in item.get('parent_relationships') or []:
        segment_item = parent_rel.get('related_taxonomy_item') or {}
        if segment_item.get('category') != 'segment_type':
            continue
        segments.append({
            'id': segment_item['id'],
            'name': segment_item['taxonomy_item_name'],
            'description': segment_item.get('description') or '',
            'aliases': segment_item.get('aliases') or [],
            'examples': segment_item.get('examples') or [],
            'llm_instruction': segment_item.get('llm_instruction') or '',
        })

print("Found segment_type items:")
for i, segment in enumerate(segments, 1):
    print(f"{i}. {segment['name']}")
    print(f"   Description: {segment['description']}")
    print(f"   Aliases: {segment['aliases']}")
    print(f"   Examples: {segment['examples']}")
    print()

In [None]:
# Step 2: Load all markdown documents
def load_markdown_documents(directory):
    """Load every ``*.md`` file in *directory* into a dict keyed by file stem.

    Files are read in sorted order, so the dict order (and therefore the order in
    which later cells process documents) is deterministic regardless of filesystem.

    Args:
        directory: path (str or Path) of the folder containing the markdown files.

    Returns:
        dict mapping file stem (e.g. ``"axa"``) to the file's UTF-8 text. Empty if the
        directory does not exist or contains no markdown files.
    """
    directory = Path(directory)
    if not directory.is_dir():
        print(f"Warning: markdown directory not found: {directory}")
        return {}

    loaded = {}
    for md_file in sorted(directory.glob('*.md')):
        content = md_file.read_text(encoding='utf-8')
        loaded[md_file.stem] = content
        print(f"Loaded {md_file.stem}: {len(content)} characters")
    return loaded


markdown_dir = Path('travel-insurance/markdown')
documents = load_markdown_documents(markdown_dir)

print(f"\nTotal documents loaded: {len(documents)}")
print(f"Companies: {list(documents.keys())}")

In [None]:
# Step 3: Load the structured output schema
# NOTE(review): in the cells shown here, output_schema is only displayed; the extraction
# itself relies on the hand-written SegmentSection Pydantic model, so keep the two in sync.
with open('segment_structured_output.json', 'r', encoding='utf-8') as f:
    output_schema = json.load(f)

print("Structured output schema:")
print(json.dumps(output_schema, indent=2))

In [None]:
# Step 4: Set up OpenAI and LangChain with structured output
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from pydantic import BaseModel, Field
from typing import List, Dict, Any
import os

# Set up OpenAI API key (make sure to set OPENAI_API_KEY environment variable)
# os.environ["OPENAI_API_KEY"] = "your-api-key-here"  # Uncomment and set your API key

# Define Pydantic model based on the JSON schema
# NOTE(review): this model is maintained by hand next to segment_structured_output.json;
# confirm the two stay in sync when either changes.
# NOTE(review): with_structured_output() presumably forwards this class's docstring, field
# order and Field descriptions to the model as the output schema, so rewording them may
# change model behaviour — verify against the LangChain docs before editing.
class SegmentSection(BaseModel):
    """Schema for structured extraction of a segment section from a document."""
    # Where the segment is discussed (e.g. section number or heading); the segment prompt asks for "N/A" when not covered.
    section_reference: str = Field(description="Reference or identifier for the section.")
    # Text of the relevant section; the segment prompt asks for "N/A" when not covered.
    full_text_part: str = Field(description="Full text of the section part.")
    # Model-written summary of the coverage, or a brief explanation when not covered.
    llm_summary: str = Field(description="LLM-generated summary of the section.")
    # The segment prompt instructs the model to echo the segment's taxonomy name here.
    segment_name: str = Field(description="Name of the segment discussed in the section.")
    # True when the model judged the segment to be covered by the document.
    is_included: bool = Field(description="Indicates if the segment is included.")

# Chat model used for every segment analysis. temperature=0 minimises sampling variance;
# swap the model name (e.g. for gpt-4) if needed. The structured-output wrapper makes each
# call return data shaped by the SegmentSection model.
_chat_model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm = _chat_model.with_structured_output(SegmentSection)

print("OpenAI model initialized with structured output")

In [None]:
# Step 5: Create prompt template combining base system prompt with segment-specific instructions
def create_segment_prompt(segment_info):
    """Create a chat prompt template for a specific segment.

    The base system prompt and the segment metadata (LLM instruction, description,
    aliases, examples) are baked into the system message. ``ChatPromptTemplate``
    parses that message as an f-string-style template, so any literal ``{`` / ``}``
    coming from the taxonomy data (for example a JSON snippet inside an LLM
    instruction) is escaped; otherwise it would be treated as a template variable
    and invoking the chain would fail with a missing-variable error.

    Args:
        segment_info: dict with the keys 'name', 'description', 'aliases',
            'examples' and 'llm_instruction' (as built in Step 1).

    Returns:
        ChatPromptTemplate whose only input variable is ``document_text``.
    """

    def literal(value):
        # Double the braces so the template formatter renders them verbatim.
        return str(value).replace("{", "{{").replace("}", "}}")

    segment_name = literal(segment_info['name'])
    system_message = f"""{literal(base_system_prompt)}

Specific Instructions for this segment:
{literal(segment_info['llm_instruction'])}

You are analyzing the document for the presence of the segment: "{segment_name}"
Description: {literal(segment_info['description'])}
Aliases: {literal(segment_info['aliases'])}
Examples: {literal(segment_info['examples'])}

Analyze the provided insurance document and determine if this segment is covered. If you find coverage for this segment:
- Extract the relevant section reference (e.g., section number, heading)
- Include the full text of the relevant section
- Provide a clear summary of what is covered
- Set is_included to true

If the segment is not covered:
- Set is_included to false
- Provide a brief explanation in the summary
- Use "N/A" for section_reference and full_text_part

Always set segment_name to: "{segment_name}"
"""

    # The human message is the only place a real template variable is used.
    return ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("human", "Document to analyze:\n\n{document_text}"),
    ])

# Test the prompt creation on the first extracted segment (which segment that is depends
# on the GraphQL data). Fail with a clear message rather than a bare IndexError when
# Step 1 extracted nothing.
if not segments:
    raise ValueError("No segment_type items were extracted in Step 1; check the GraphQL response.")
test_segment = segments[0]
test_prompt = create_segment_prompt(test_segment)
print(f"Created prompt template for segment: {test_segment['name']}")

In [None]:
# Step 6: Create analysis chains for each segment
def create_analysis_chain(segment_info):
    """Return the runnable ``prompt | llm`` for one segment.

    The segment's prompt template is piped into the structured-output model, so the
    chain is invoked with ``{"document_text": ...}``.
    """
    return create_segment_prompt(segment_info) | llm

# One chain per segment, keyed by segment name (a repeated name overwrites the earlier chain).
segment_chains = {}
for segment in segments:
    chain_key = segment['name']
    segment_chains[chain_key] = create_analysis_chain(segment)
    print(f"Created analysis chain for: {chain_key}")

print(f"\nTotal segment chains created: {len(segment_chains)}")

In [None]:
# Step 7: Create parallel processing function for analyzing one document across all segments
def analyze_document_for_all_segments(document_text, company_name):
    """Build (but do not run) a parallel analysis of one document across all segments.

    Args:
        document_text: markdown text of the insurance document.
        company_name: identifier of the document; not used when building the
            runnable, kept for call-site compatibility.

    Returns:
        Tuple ``(parallel_analysis, input_data)``: a ``RunnableParallel`` with one
        branch per entry of ``segment_chains`` (keyed by segment name), and the single
        input mapping ``{"document_text": document_text}`` that every branch receives.
    """
    parallel_analysis = RunnableParallel(dict(segment_chains))
    return parallel_analysis, {"document_text": document_text}

# Test the parallel setup on one document. Prefer "axa" (the document used in the usage
# examples) but fall back to the first loaded document, so Restart & Run All does not
# fail with a KeyError when axa.md is not present.
if not documents:
    raise ValueError("No markdown documents were loaded in Step 2; check markdown_dir.")
test_company = "axa" if "axa" in documents else next(iter(documents))
test_document = documents[test_company]
test_parallel, test_input = analyze_document_for_all_segments(test_document, test_company)

print(f"Created parallel analysis setup for document: {test_company}")
print(f"Will analyze {len(segment_chains)} segments in parallel")
print(f"Segments to analyze: {list(segment_chains.keys())}")
print(f"Input structure: {list(test_input.keys())}")  # Should show ['document_text']

In [None]:
# Step 8: Execute analysis for a specific document
async def analyze_single_document(company_name):
    """Analyze one loaded document for every segment concurrently.

    Args:
        company_name: key into ``documents`` (the markdown file stem, e.g. ``"axa"``).

    Returns:
        ``None`` if ``company_name`` is not a loaded document;
        ``{"error": message}`` if the parallel LLM invocation raises;
        otherwise a dict mapping segment name -> structured result (presumably a
        ``SegmentSection`` instance per segment).
    """
    if company_name not in documents:
        print(f"Error: Company '{company_name}' not found in documents.")
        print(f"Available companies: {list(documents.keys())}")
        return None

    document_text = documents[company_name]

    print(f"\n=== Analyzing document: {company_name}.md ===")
    print(f"Document length: {len(document_text)} characters")
    print(f"Analyzing {len(segment_chains)} segments in parallel...")

    # Create parallel analysis for all segments on this document
    parallel_analysis, input_data = analyze_document_for_all_segments(document_text, company_name)

    try:
        # Every segment chain runs concurrently on the same input mapping.
        results = await parallel_analysis.ainvoke(input_data)
    except Exception as e:
        # Best-effort: report and return an error marker so a batch run can move on.
        print(f"Error analyzing document {company_name}: {e}")
        return {"error": str(e)}

    # Summarise outside the try block: a result without `is_included` must not turn
    # an otherwise successful run into an error marker and discard the results.
    print(f"\n=== Results Summary for {company_name} ===")
    for segment_name, result in results.items():
        included = getattr(result, "is_included", None)
        if included is None:
            status = "? UNPARSED"
        else:
            status = "✓ INCLUDED" if included else "✗ NOT FOUND"
        print(f"  {segment_name}: {status}")

    return results

# Function to analyze all documents (one by one, with segments in parallel for each)
async def analyze_all_documents():
    """Run ``analyze_single_document`` for every loaded document, one after another.

    Documents are processed sequentially; each document's segments run concurrently
    inside ``analyze_single_document``. Returns a dict mapping company name to that
    call's return value (results dict, error marker, or None).
    """
    collected = {}
    for name in documents:
        print(f"\n{'='*80}")
        collected[name] = await analyze_single_document(name)
    return collected

# Note: make sure OPENAI_API_KEY is set in the environment before running the analysis.
print("Analysis functions ready!")
print("Usage examples:")
print("  # Analyze a specific document:")
print("  results = await analyze_single_document('axa')")
print("  # Analyze all documents:")
print("  all_results = await analyze_all_documents()")

In [None]:
# Step 9: Helper functions for result analysis and export (updated for new structure)
def export_single_document_results(results, company_name, filename=None):
    """Export results for a single document to a JSON file.

    Args:
        results: return value of ``analyze_single_document``: a dict of segment
            name -> structured result, an ``{"error": ...}`` marker, or ``None``
            (unknown company).
        company_name: used as the top-level JSON key and in the default filename.
        filename: output path; defaults to ``"<company_name>_segment_analysis.json"``.

    The written file contains ``{company_name: <serialized results>}``.
    """
    if filename is None:
        filename = f"{company_name}_segment_analysis.json"

    if results is None or "error" in results:
        # Nothing to convert: unknown company (None) or an error marker.
        serializable_results = results
    else:
        serializable_results = {}
        for segment_name, result in results.items():
            if hasattr(result, 'model_dump'):  # Pydantic v2 model
                serializable_results[segment_name] = result.model_dump()
            elif hasattr(result, 'dict'):  # Pydantic v1 model (.dict() is deprecated in v2)
                serializable_results[segment_name] = result.dict()
            else:
                serializable_results[segment_name] = result

    with open(filename, 'w', encoding='utf-8') as f:
        json.dump({company_name: serializable_results}, f, indent=2, ensure_ascii=False)

    print(f"Results for {company_name} exported to {filename}")

def export_all_results(all_results, filename="all_documents_segment_analysis.json"):
    """Export results for all documents to a single JSON file.

    Args:
        all_results: dict company name -> return value of ``analyze_single_document``
            (results dict, ``{"error": ...}`` marker, or ``None``).
        filename: output path.

    Structured results are converted to plain dicts; ``None`` and error markers are
    written unchanged.
    """
    serializable_results = {}
    for company_name, company_results in all_results.items():
        if company_results is None or "error" in company_results:
            serializable_results[company_name] = company_results
            continue
        converted = {}
        for segment_name, result in company_results.items():
            if hasattr(result, 'model_dump'):  # Pydantic v2 model
                converted[segment_name] = result.model_dump()
            elif hasattr(result, 'dict'):  # Pydantic v1 model (.dict() is deprecated in v2)
                converted[segment_name] = result.dict()
            else:
                converted[segment_name] = result
        serializable_results[company_name] = converted

    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(serializable_results, f, indent=2, ensure_ascii=False)

    print(f"All results exported to {filename}")

def create_summary_matrix_new_structure(all_results):
    """Build a company x segment table of ``is_included`` flags.

    Companies whose results are ``None`` or contain an ``"error"`` key are skipped.
    A segment result without an ``is_included`` attribute counts as ``False``.

    Returns:
        A pandas DataFrame indexed by company (one column per segment), or ``None``
        when no company has usable results.
    """
    import pandas as pd

    usable = {
        company: seg_results
        for company, seg_results in all_results.items()
        if seg_results is not None and "error" not in seg_results
    }

    rows = []
    for company, seg_results in usable.items():
        row = {"company": company}
        row.update(
            (seg_name, getattr(seg_result, 'is_included', False))
            for seg_name, seg_result in seg_results.items()
        )
        rows.append(row)

    if not rows:
        return None
    return pd.DataFrame(rows).set_index('company')

def print_single_document_results(results, company_name):
    """Pretty-print the per-segment results for one document.

    Prints a banner with the upper-cased company name, followed by one of:
    "No results available" (``results is None``), the error message (an
    ``"error"`` key is present), or one section per segment. Segment results that
    expose ``is_included`` are shown field by field (with the first 300 characters
    of the full text when included); anything else is printed as-is.
    """
    rule = '=' * 60
    print(f"\n{rule}")
    print(f"DOCUMENT: {company_name.upper()}")
    print(rule)

    if results is None:
        print("No results available")
        return

    if "error" in results:
        print(f"ERROR: {results['error']}")
        return

    for segment_name, result in results.items():
        print(f"\n--- SEGMENT: {segment_name.upper()} ---")
        if not hasattr(result, 'is_included'):
            print(f"Result: {result}")
            continue
        verdict = '✓ YES' if result.is_included else '✗ NO'
        print(f"Included: {verdict}")
        print(f"Section Reference: {result.section_reference}")
        print(f"Summary: {result.llm_summary}")
        if result.is_included and result.full_text_part != "N/A":
            excerpt = result.full_text_part[:300]
            print(f"Full Text (first 300 chars): {excerpt}...")

print("Updated helper functions ready for single document and batch analysis")

## Usage Instructions

To run the segment analysis:

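1. **Set your OpenAI API key**: preferably export `OPENAI_API_KEY` in your shell before starting Jupyter. If you must set it from the notebook, prompt for it instead of pasting the key into a cell, where it would end up in the saved notebook:
   ```python
   import os
   from getpass import getpass
   if "OPENAI_API_KEY" not in os.environ:
       os.environ["OPENAI_API_KEY"] = getpass("OpenAI API key: ")
   ```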

2. **Analyze a specific document** (e.g., axa.md):
   ```python
   # This will analyze all segments in parallel for the specified document
   results = await analyze_single_document('axa')
   ```

3. **Analyze all documents** (one by one, with segments in parallel for each):
   ```python
   all_results = await analyze_all_documents()
   ```

4. **Analyze the results**:
   ```python
   # For single document results
   print_single_document_results(results, 'axa')
   export_single_document_results(results, 'axa')
   
   # For all documents results
   summary_df = create_summary_matrix_new_structure(all_results)
   print(summary_df)
   export_all_results(all_results)
   ```

## What this updated implementation does:

- **Analyzes one document at a time** (you specify which company/document)
- **Parallelizes segment analysis** for that specific document (all segments analyzed simultaneously)
- **Extracts every `segment_type` item** linked to the `product_type` root in your GraphQL response (in the example response: `luggage_travel_delay` and `home_assistance`)
- **Uses structured output** to enforce the JSON schema you provided
- **Combines LLM instructions** from GraphQL with your base system prompt
- **Returns structured results** with section references, full text, and summaries per segment

## Example workflow:
1. `results = await analyze_single_document('axa')` - Analyzes axa.md for all extracted segments in parallel
2. `results = await analyze_single_document('allianz')` - Analyzes allianz.md for all extracted segments in parallel
3. etc.

The parallel processing runs the analysis for all extracted segments (e.g., `luggage_travel_delay` and `home_assistance` in the example response) simultaneously for the single document you're analyzing.