# Pipeline Stage 1: Data Ingestion and Analysis

This notebook is designed to be part of an Elyra pipeline. It handles:
1. Loading multiple XML documents
2. Analyzing each document with the XML Analysis Framework
3. Preparing data for downstream processing
4. Saving results for the next pipeline stage

## Environment Setup

In [1]:
# Install required packages in the pipeline environment
%pip install xml-analysis-framework==1.2.12 --upgrade -q --force-reinstall --no-cache-dir
%pip install pandas --upgrade -q --force-reinstall --no-cache-dir

import xml_analysis_framework as xaf
import json
import pandas as pd
from pathlib import Path
from datetime import datetime
import os

print(f"XML Analysis Framework version: {xaf.__version__}")
print(f"Processing started at: {datetime.now()}")

Note: you may need to restart the kernel to use updated packages.


[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
streamlit 1.45.1 requires packaging<25,>=20, but you have packaging 25.0 which is incompatible.
gradio 5.29.0 requires fastapi<1.0,>=0.115.2, but you have fastapi 0.104.1 which is incompatible.
gradio 5.29.0 requires starlette<1.0,>=0.40.0; sys_platform != "emscripten", but you have starlette 0.27.0 which is incompatible.[0m[31m
[0m

Note: you may need to restart the kernel to use updated packages.


XML Analysis Framework version: 1.2.12
Processing started at: 2025-07-27 21:44:15.670630


## Data Discovery

In [2]:
# Configure input data directory
DATA_DIR = Path("data")
OUTPUT_DIR = Path("pipeline_outputs")
OUTPUT_DIR.mkdir(exist_ok=True)

# Find all XML files to process
xml_files = list(DATA_DIR.glob("*.xml")) + list(DATA_DIR.glob("*.kml"))

print(f"Found {len(xml_files)} XML files to process:")
for file in xml_files:
    print(f"  - {file.name} ({file.stat().st_size / 1024:.1f} KB)")

if not xml_files:
    print("⚠️ No XML files found. Adding sample file...")
    xml_files = [DATA_DIR / "mapbox-example.kml"]
    print(f"Using sample file: {xml_files[0]}")

Found 6 XML files to process:
  - full_export.xml (8.0 KB)
  - spring-boot-example-pom.xml (2.1 KB)
  - ivysettings.xml (3.3 KB)
  - build.xml (16.1 KB)
  - ivy.xml (4.3 KB)
  - mapbox-example.kml (1.1 KB)


## Batch Document Analysis

In [3]:
def analyze_document_for_pipeline(file_path: Path) -> dict:
    """Analyze a single document and prepare for pipeline processing"""
    try:
        # Comprehensive analysis
        result = xaf.analyze(str(file_path))
        enhanced = xaf.analyze_enhanced(str(file_path))
        schema = xaf.analyze_schema(str(file_path))
        
        # Create chunks for vector processing
        chunks = xaf.chunk(str(file_path), strategy="hierarchical")
        
        # Compile pipeline-ready data
        return {
            'file_info': {
                'file_name': file_path.name,
                'file_path': str(file_path),
                'file_size': f"{file_path.stat().st_size:,} bytes ({file_path.stat().st_size / 1024:.1f} KB)",
                'processed_at': datetime.now().isoformat()
            },
            'document_analysis': {
                'document_type': result['document_type'].type_name,
                'confidence': result['document_type'].confidence,
                'handler_used': result['handler_used'],
                'ai_use_cases': enhanced.ai_use_cases,
                'key_findings': enhanced.key_findings,
                'quality_metrics': enhanced.quality_metrics or {},
                'structured_data': enhanced.structured_data
            },
            'schema_info': {
                'total_elements': schema.total_elements,
                'max_depth': schema.max_depth,
                'root_element': schema.root_element,
                'namespaces': schema.namespaces
            },
            'chunks': [
                {
                    'chunk_id': chunk.chunk_id,
                    'content': chunk.content,
                    'element_path': chunk.element_path,
                    'start_line': chunk.start_line,
                    'end_line': chunk.end_line,
                    'elements_included': chunk.elements_included,
                    'metadata': chunk.metadata,
                    'token_estimate': chunk.token_estimate
                }
                for chunk in chunks
            ],
            'processing_stats': {
                'total_chunks': len(chunks),
                'total_tokens': sum(chunk.token_estimate for chunk in chunks),
                'avg_chunk_size': sum(len(chunk.content) for chunk in chunks) / len(chunks) if chunks else 0
            }
        }
    except Exception as e:
        return {
            'file_info': {
                'file_name': file_path.name,
                'file_path': str(file_path),
                'file_size': f"{file_path.stat().st_size:,} bytes ({file_path.stat().st_size / 1024:.1f} KB)",
                'processed_at': datetime.now().isoformat()
            },
            'error': str(e),
            'processing_stats': {'total_chunks': 0, 'total_tokens': 0, 'avg_chunk_size': 0}
        }

# Process all documents
print("🔄 Processing documents...")
processed_documents = []
processing_summary = []

for i, file_path in enumerate(xml_files, 1):
    print(f"\n[{i}/{len(xml_files)}] Processing: {file_path.name}")
    
    doc_analysis = analyze_document_for_pipeline(file_path)
    processed_documents.append(doc_analysis)
    
    # Create summary entry
    if 'error' not in doc_analysis:
        summary_entry = {
            'file_name': doc_analysis['file_info']['file_name'],
            'document_type': doc_analysis['document_analysis']['document_type'],
            'confidence': doc_analysis['document_analysis']['confidence'],
            'total_chunks': doc_analysis['processing_stats']['total_chunks'],
            'total_tokens': doc_analysis['processing_stats']['total_tokens'],
            'status': 'success'
        }
        print(f"  ✅ Success: {summary_entry['document_type']} ({summary_entry['total_chunks']} chunks)")
    else:
        summary_entry = {
            'file_name': doc_analysis['file_info']['file_name'],
            'document_type': 'unknown',
            'confidence': 0.0,
            'total_chunks': 0,
            'total_tokens': 0,
            'status': 'error',
            'error': doc_analysis['error']
        }
        print(f"  ❌ Error: {summary_entry['error']}")
    
    processing_summary.append(summary_entry)

print(f"\n✅ Processed {len(processed_documents)} documents")

🔄 Processing documents...

[1/6] Processing: full_export.xml
File size: 0.0 MB
Using iterative parsing for large file: data/full_export.xml
  ✅ Success: ServiceNow Incident (4 chunks)

[2/6] Processing: spring-boot-example-pom.xml
File size: 0.0 MB
Using iterative parsing for large file: data/spring-boot-example-pom.xml
  ✅ Success: Maven POM (6 chunks)

[3/6] Processing: ivysettings.xml
File size: 0.0 MB
Using iterative parsing for large file: data/ivysettings.xml
  ✅ Success: Ivy Settings (0 chunks)

[4/6] Processing: build.xml
File size: 0.0 MB
Using iterative parsing for large file: data/build.xml
  ✅ Success: Apache Ant Build (4 chunks)

[5/6] Processing: ivy.xml
File size: 0.0 MB
Using iterative parsing for large file: data/ivy.xml
  ✅ Success: Ivy Module Descriptor (0 chunks)

[6/6] Processing: mapbox-example.kml
File size: 0.0 MB
Using iterative parsing for large file: data/mapbox-example.kml
  ✅ Success: KML Geographic Data (0 chunks)

✅ Processed 6 documents


## Processing Summary

In [4]:
# Create summary DataFrame
summary_df = pd.DataFrame(processing_summary)

print("📊 Processing Summary:")
print(summary_df)

# Overall statistics
successful_docs = summary_df[summary_df['status'] == 'success']
total_chunks = successful_docs['total_chunks'].sum()
total_tokens = successful_docs['total_tokens'].sum()

print(f"\n📈 Overall Statistics:")
print(f"  • Successful analyses: {len(successful_docs)}/{len(summary_df)}")
print(f"  • Total chunks generated: {total_chunks:,}")
print(f"  • Total tokens estimated: {total_tokens:,}")
print(f"  • Document types found: {successful_docs['document_type'].nunique()}")

# Document type distribution
if len(successful_docs) > 0:
    print(f"\n📋 Document Types:")
    type_counts = successful_docs['document_type'].value_counts()
    for doc_type, count in type_counts.items():
        print(f"  • {doc_type}: {count} document(s)")

📊 Processing Summary:
                     file_name          document_type  confidence  \
0              full_export.xml    ServiceNow Incident        0.95   
1  spring-boot-example-pom.xml              Maven POM        1.00   
2              ivysettings.xml           Ivy Settings        0.95   
3                    build.xml       Apache Ant Build        0.95   
4                      ivy.xml  Ivy Module Descriptor        0.95   
5           mapbox-example.kml    KML Geographic Data        0.95   

   total_chunks  total_tokens   status  
0             4             4  success  
1             6            39  success  
2             0             0  success  
3             4            23  success  
4             0             0  success  
5             0             0  success  

📈 Overall Statistics:
  • Successful analyses: 6/6
  • Total chunks generated: 14
  • Total tokens estimated: 66
  • Document types found: 6

📋 Document Types:
  • ServiceNow Incident: 1 document(s)
  • Mav

## Save Pipeline Outputs

In [5]:
# Save complete analysis results
analysis_output_file = OUTPUT_DIR / "document_analyses.json"
with open(analysis_output_file, 'w') as f:
    json.dump(processed_documents, f, indent=2, default=str)
    
print(f"💾 Saved detailed analyses to: {analysis_output_file}")

# Save processing summary
summary_output_file = OUTPUT_DIR / "processing_summary.csv"
summary_df.to_csv(summary_output_file, index=False)
print(f"💾 Saved processing summary to: {summary_output_file}")

# Prepare vector-ready chunks for next stage
vector_chunks = []
for doc in processed_documents:
    if 'error' not in doc:
        for chunk in doc['chunks']:
            vector_chunk = {
                'id': f"{doc['file_info']['file_name']}_{chunk['chunk_id']}",
                'content': chunk['content'],
                'metadata': {
                    'source_file': doc['file_info']['file_name'],
                    'document_type': doc['document_analysis']['document_type'],
                    'confidence': doc['document_analysis']['confidence'],
                    'chunk_id': chunk['chunk_id'],
                    'element_path': chunk['element_path'],
                    'token_estimate': chunk['token_estimate'],
                    'ai_use_cases': doc['document_analysis']['ai_use_cases']
                }
            }
            vector_chunks.append(vector_chunk)

vector_output_file = OUTPUT_DIR / "vector_ready_chunks.json"
with open(vector_output_file, 'w') as f:
    json.dump(vector_chunks, f, indent=2)

print(f"💾 Saved {len(vector_chunks)} vector-ready chunks to: {vector_output_file}")

# Helper function to convert pandas/numpy types to JSON-serializable types
def convert_for_json(obj):
    """Convert pandas/numpy types to JSON-serializable types"""
    if hasattr(obj, 'item'):  # numpy/pandas scalar
        return obj.item()
    elif hasattr(obj, 'tolist'):  # numpy array
        return obj.tolist()
    else:
        return obj

# Create pipeline metadata (with type conversion)
pipeline_metadata = {
    'pipeline_stage': 'data_ingestion',
    'processed_at': datetime.now().isoformat(),
    'input_files': [str(f) for f in xml_files],
    'total_documents': len(processed_documents),
    'successful_documents': len(successful_docs),
    'total_chunks': convert_for_json(total_chunks),  # Convert pandas int64
    'total_tokens': convert_for_json(total_tokens),   # Convert pandas int64
    'output_files': {
        'analyses': str(analysis_output_file),
        'summary': str(summary_output_file),
        'vector_chunks': str(vector_output_file)
    }
}

metadata_file = OUTPUT_DIR / "pipeline_metadata.json"
with open(metadata_file, 'w') as f:
    json.dump(pipeline_metadata, f, indent=2)

print(f"💾 Saved pipeline metadata to: {metadata_file}")
print(f"\n🎉 Data ingestion stage completed successfully!")
print(f"Ready for next pipeline stage: Vector Database Population")

💾 Saved detailed analyses to: pipeline_outputs/document_analyses.json
💾 Saved processing summary to: pipeline_outputs/processing_summary.csv
💾 Saved 14 vector-ready chunks to: pipeline_outputs/vector_ready_chunks.json
💾 Saved pipeline metadata to: pipeline_outputs/pipeline_metadata.json

🎉 Data ingestion stage completed successfully!
Ready for next pipeline stage: Vector Database Population
