# Military Capability Gap Analysis with Semantica Context Graphs

## Use case scope
- Capability gap analysis for defense planning under future scenarios.
- End-to-end flow from source documents and ontologies to decision traces and exports.
- Context graph pattern: Scenario -> Mission Thread -> Events -> Systems -> Capabilities -> Gaps -> Decisions -> Outcomes.
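
The pattern above reads as an ordered chain of node types. A minimal sketch, independent of Semantica, of checking that a path of typed nodes respects that order; `PATTERN` and `follows_pattern` are hypothetical names introduced only for illustration, and the type labels follow the seed nodes used later in this notebook:

```python
# Hypothetical helper, not part of Semantica: the pattern as an ordered list of node types.
PATTERN = [
    'scenario', 'mission_thread', 'event', 'system',
    'capability', 'capability_gap', 'decision', 'outcome',
]

def follows_pattern(path_types):
    """Return True if the node types appear in pattern order (stages may be skipped)."""
    positions = [PATTERN.index(t) for t in path_types]  # ValueError on unknown types
    return all(a < b for a, b in zip(positions, positions[1:]))

print(follows_pattern(['scenario', 'mission_thread', 'capability_gap']))
print(follows_pattern(['capability_gap', 'scenario']))
```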

## Questions answered in this notebook
- What capability gaps are present for a given mission thread?
- Which evidence and sources support each gap?
- Which precedents, exceptions, and approvals were used for decisions?
- What multi-hop paths connect scenario context to risk outcomes?

## Semantica features used in this use case
- Ingestion: `FileIngestor`, `WebIngestor`, `OntologyIngestor`
- Parsing: `PDFParser`, `DocumentParser`, optional `DoclingParser`
- Ontology: `ingest_ontology`, `OntologyEvaluator`
- Split: `TextSplitter`, `SemanticChunker`, `StructuralChunker`
- Normalization: `TextNormalizer`, `EntityNormalizer`, `DateNormalizer`, `NumberNormalizer`, `LanguageDetector`, `EncodingHandler`, `TextCleaner`
- Semantic extraction: `NamedEntityRecognizer`, `RelationExtractor`, `EventDetector`, `CoreferenceResolver`, `TripletExtractor`, `SemanticAnalyzer`, `SemanticNetworkExtractor`, `ExtractionValidator`
- KG: `GraphBuilder`, `GraphAnalyzer`, `CentralityCalculator`, `CommunityDetector`, `ConnectivityAnalyzer`, `SimilarityCalculator`, `LinkPredictor`, `NodeEmbedder`, `PathFinder`, `EntityResolver`
- Context and decisions: `ContextGraph`, `AgentContext`, `PolicyEngine`, `Decision`, `Policy`, `PolicyException`, `ApprovalChain`, `Precedent`
- Reasoning: `Reasoner`, `ExplanationGenerator`
- Provenance and governance: `ProvenanceManager`, `VersionManager`
- Export and reporting: `export_json`, `export_graph`, `export_rdf`, `export_csv`, `export_yaml`, `export_lpg`, `ReportGenerator`
- Visualization: `KGVisualizer`

## Expected outputs
- Capability-gap context graph and knowledge graph artifacts.
- Decision trace records with policy, exception, approval, and precedent links.
- Multi-format exports (JSON, RDF, GraphML, CSV, YAML, LPG, report).
- Summary metrics for ingestion, extraction, graph analytics, reasoning, provenance, and export stages.


In [None]:
from pathlib import Path

BASE_DIR = Path.cwd()
USE_CASE_DIR = BASE_DIR / 'cookbook' / 'use_cases' / 'capability_gap_defense'
DATA_DIR = USE_CASE_DIR / 'data'
OUTPUT_DIR = USE_CASE_DIR / 'outputs'
DATA_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

BASE_DIR, DATA_DIR, OUTPUT_DIR

In [None]:
import semantica.ingest as ingest_module

required_files = [
    'rand_competing_without_fighting_2022.pdf',
    'us_navy_it_strategic_plan_fy2023.pdf',
    'prov.ttl',
    'd3fend.ttl',
    'military_capability_gap_ontology.ttl',
    'military_capability_gap_instances.ttl',
]

present_files = sorted([f.name for f in DATA_DIR.glob('*')])
missing_files = [f for f in required_files if f not in present_files]

web_sources = [
    'https://www.rand.org/pubs/research_reports/RRA733-1.html',
    'https://foundationcapital.com/context-graphs/',
]

web_seed_contents = []
for url in web_sources:
    try:
        web_seed_contents.append(ingest_module.ingest_web(url, method='url'))
    except Exception as e:
        print(f'Web ingestion failed for {url}: {e}')

{'present_files': present_files, 'missing_files': missing_files, 'web_seed_docs': len(web_seed_contents)}


In [None]:
import semantica.ingest as ingest_module
from semantica.ingest import FileIngestor, WebIngestor, OntologyIngestor

file_ingestor = FileIngestor()
file_objects = file_ingestor.ingest_directory(DATA_DIR, recursive=False, read_content=False)

# method wrappers via module namespace (no direct function import)
file_objects_via_method = ingest_module.ingest_file(DATA_DIR, method='directory', recursive=False, read_content=False)

web_ingestor = WebIngestor()
web_contents = []
for url in ['https://www.rand.org/pubs/research_reports/RRA733-1.html']:
    try:
        web_contents.append(web_ingestor.ingest_url(url))
    except Exception as e:
        print(f'Web ingestion failed for {url}: {e}')

web_contents_via_method = []
for url in ['https://www.rand.org/pubs/research_reports/RRA733-1.html']:
    try:
        web_contents_via_method.append(ingest_module.ingest_web(url, method='url'))
    except Exception as e:
        print(f'Web method ingestion failed for {url}: {e}')

ontology_ingestor = OntologyIngestor()
ontology_data = ontology_ingestor.ingest_directory(DATA_DIR, recursive=False)
ontology_data_via_method = ingest_module.ingest_ontology(DATA_DIR, method='directory', recursive=False)

{
    'files': len(file_objects),
    'files_via_method': len(file_objects_via_method) if isinstance(file_objects_via_method, list) else 1,
    'web_docs': len(web_contents),
    'web_docs_via_method': len(web_contents_via_method),
    'ontologies': len(ontology_data),
    'ontologies_via_method': len(ontology_data_via_method) if isinstance(ontology_data_via_method, list) else 1,
}


- Modules: `FileIngestor`, `WebIngestor`, `OntologyIngestor`
- Loads local files.
- Fetches web content.
- Ingests ontology files.

## Ontology Ingestion Detail (Schema + Instance Coverage)

- Modules: `ingest_ontology`, `OntologyEvaluator`
- Reads ontology files and basic schema info.
- Runs competency-question coverage check.
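
A rough intuition for competency-question coverage, as a sketch only: how `OntologyEvaluator` actually scores coverage is not shown in this notebook, so the function below is a stand-in that uses naive substring matching. `question_coverage` and the class names are hypothetical.

```python
def question_coverage(questions, vocabulary):
    """Fraction of questions mentioning at least one vocabulary term (naive substring match)."""
    terms = [term.lower() for term in vocabulary]
    if not questions:
        return 0.0
    covered = sum(
        1 for q in questions
        if any(term in q.lower() for term in terms)
    )
    return covered / len(questions)

# Hypothetical class names; a real run would take them from the ingested ontology.
classes = ['Capability', 'Gap', 'System', 'Mission']
questions = [
    'Which systems provide required capabilities?',
    'Which precedents and exceptions affected a decision?',
]
coverage = question_coverage(questions, classes)
print(coverage)
```

A question with no matching term is a hint that the schema may lack a class or property for it (here, precedents and exceptions).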

In [None]:
import semantica.ontology as ontology_module

ontology_files = sorted([p for p in DATA_DIR.glob('*.ttl')])
ontology_details = []
for of in ontology_files:
    try:
        od = ontology_module.ingest_ontology(of, method='file')
        if isinstance(od, list):
            for item in od:
                ontology_details.append({
                    'file': of.name,
                    'classes': len(item.data.get('classes', [])),
                    'properties': len(item.data.get('properties', [])),
                })
        else:
            ontology_details.append({
                'file': of.name,
                'classes': len(od.data.get('classes', [])),
                'properties': len(od.data.get('properties', [])),
            })
    except Exception as e:
        ontology_details.append({'file': of.name, 'error': str(e)})

ontology_details[:10]


## Ontology Coverage Check for Capability Gap Analysis

In [None]:
from semantica.ontology import OntologyEvaluator

ontology_eval = OntologyEvaluator()

# Evaluate first ingested ontology if available (schema adequacy for use-case questions)
ontology_eval_result = None
if ontology_data:
    ontology_eval_result = ontology_eval.evaluate_ontology(
        ontology_data[0].data,
        competency_questions=[
            'What capability gaps are revealed for a mission thread?',
            'Which systems provide required capabilities?',
            'What evidence and provenance support a gap decision?',
            'Which precedents and exceptions affected a decision?'
        ]
    )

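# A bare dict inside an if/else block is not displayed by the notebook;
# build the summary first and leave it as the cell's last expression.
if ontology_eval_result:
    coverage_summary = {
        'coverage_score': ontology_eval_result.coverage_score,
        'completeness_score': ontology_eval_result.completeness_score,
        'gaps': ontology_eval_result.gaps[:5],
    }
else:
    coverage_summary = {'coverage_score': None, 'completeness_score': None, 'gaps': []}

coverage_summary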

In [None]:
import semantica.parse as parse_module
from semantica.parse import PDFParser

pdf_parser = PDFParser()
pdf_docs = []
for pdf_path in sorted(DATA_DIR.glob('*.pdf')):
    try:
        with open(pdf_path, 'rb') as f:
            if f.read(4) != b'%PDF':
                print(f'Skipping non-PDF payload: {pdf_path.name}')
                continue

        parsed = parse_module.parse_pdf(pdf_path, method='default', pages=list(range(0, 12)))
        if not isinstance(parsed, dict) or ('full_text' not in parsed and 'text' not in parsed):
            parsed = pdf_parser.parse(pdf_path, pages=list(range(0, 12)))

        text = parsed.get('full_text', parsed.get('text', ''))
        if text:
            pdf_docs.append({
                'doc_id': pdf_path.stem,
                'source': str(pdf_path),
                'text': text[:50000],
                'metadata': parsed.get('metadata', {}),
            })
    except Exception as e:
        print(f'PDF parse failed for {pdf_path.name}: {e}')

len(pdf_docs)
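
The parsing cell above skips any file whose first four bytes are not `%PDF`. A self-contained sketch of that guard, using temporary files; `looks_like_pdf` is a hypothetical helper name, and the HTML payload is just an example of content that can end up saved with a `.pdf` extension:

```python
import tempfile
from pathlib import Path

def looks_like_pdf(path):
    """Return True if the file starts with the PDF magic bytes b'%PDF'."""
    with open(path, 'rb') as fh:
        return fh.read(4) == b'%PDF'

with tempfile.TemporaryDirectory() as tmp:
    real = Path(tmp) / 'real.pdf'
    fake = Path(tmp) / 'fake.pdf'
    real.write_bytes(b'%PDF-1.7\n')                # minimal header, not a full PDF
    fake.write_bytes(b'<html>error page</html>')   # wrong payload, right extension
    checks = {p.name: looks_like_pdf(p) for p in (real, fake)}

print(checks)
```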


## Multi-Format Parsing: DocumentParser + Optional DoclingParser

- Modules: `PDFParser`, `DocumentParser`, optional `DoclingParser`
- Parses PDF/documents.
- Extracts text and metadata.
- Uses Docling parser if available.

In [None]:
from semantica.parse import DocumentParser
import semantica.parse as parse_module

doc_parser = DocumentParser()
doc_parser_preview = {}

if pdf_docs:
    sample_pdf = Path(pdf_docs[0]['source'])
    try:
        parsed_doc = parse_module.parse_document(sample_pdf, method='default')
        if not isinstance(parsed_doc, dict):
            parsed_doc = doc_parser.parse_document(sample_pdf)
        doc_parser_preview = {
            'source': sample_pdf.name,
            'keys': list(parsed_doc.keys())[:10],
            'text_chars': len(parsed_doc.get('full_text', parsed_doc.get('text', '')) or ''),
        }
    except Exception as e:
        doc_parser_preview = {'source': sample_pdf.name, 'error': str(e)}

docling_preview = {'docling_available': bool(getattr(parse_module, 'DOCLING_AVAILABLE', False))}
if getattr(parse_module, 'DOCLING_AVAILABLE', False) and pdf_docs:
    try:
        docling_parser = parse_module.DoclingParser(export_format='markdown')
        dres = docling_parser.parse(Path(pdf_docs[0]['source']))
        docling_preview['keys'] = list(dres.keys())[:10]
        docling_preview['text_chars'] = len(dres.get('full_text', dres.get('text', '')) or '')
    except Exception as e:
        docling_preview['error'] = str(e)

{'document_parser': doc_parser_preview, 'docling_parser': docling_preview}


In [None]:
import json
from pathlib import Path

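# The same URL is fetched through several code paths above; keep one copy per URL
# so duplicate web documents do not inflate the corpus.
_seen_urls = set()
web_items = []
for w in web_contents + web_contents_via_method + web_seed_contents:
    key = getattr(w, 'url', None) or id(w)
    if key not in _seen_urls:
        _seen_urls.add(key)
        web_items.append(w)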

corpus = (
    [
        {'doc_id': d['doc_id'], 'source': d['source'], 'text': d['text']}
        for d in pdf_docs
    ]
    + [
        {
            'doc_id': f'web_{i}',
            'source': getattr(w, 'url', f'web_source_{i}'),
            'text': (getattr(w, 'content', str(w)) or '')[:30000],
        }
        for i, w in enumerate(web_items)
    ]
    + [
        {
            'doc_id': Path(ont.source_path).stem,
            'source': ont.source_path,
            'text': json.dumps(ont.data, ensure_ascii=True)[:40000],
        }
        for ont in ontology_data
    ]
)

{'corpus_items': len(corpus), 'sample': [c['doc_id'] for c in corpus[:5]]}


## Orchestration-Path Chunking (Decision-Time Context Capture)

- Modules: `TextSplitter`, `PipelineBuilder`
- Splits documents into chunks.
- Defines pipeline steps for ingest, split, extract, graph, export.
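
To see what a chunk size and overlap do to a document, here is a plain sliding-window sketch. It is not the `TextSplitter` implementation (a recursive splitter typically also respects separators); `chunk_text` is a hypothetical helper, and the defaults mirror the values passed to `TextSplitter` in this section.

```python
def chunk_text(text, chunk_size=1800, chunk_overlap=250):
    """Fixed-size character windows; each window starts chunk_size - chunk_overlap after the last."""
    if chunk_overlap >= chunk_size:
        raise ValueError('chunk_overlap must be smaller than chunk_size')
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reaches the end of the text
    return chunks

sample = ''.join(str(i % 10) for i in range(5000))  # 5000 synthetic characters
chunks = chunk_text(sample)
print([len(c) for c in chunks])
```

Consecutive chunks share their last and first 250 characters, so a sentence cut at a boundary still appears whole in one of the two chunks.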

In [None]:
from semantica.split import TextSplitter

splitter = TextSplitter(method='recursive', chunk_size=1800, chunk_overlap=250)

texts = [doc.get('text', '') for doc in corpus]
chunks_by_doc = splitter.split_batch(texts)

chunked_docs = []
for doc, chunks in zip(corpus, chunks_by_doc):
    for idx, ch in enumerate(chunks or []):
        chunked_docs.append({
            'doc_id': f"{doc['doc_id']}::chunk_{idx}",
            'source': doc['source'],
            'text': ch.text if hasattr(ch, 'text') else str(ch),
            'parent_doc_id': doc['doc_id'],
        })

extraction_corpus = chunked_docs if chunked_docs else corpus

{'chunked_docs': len(chunked_docs), 'extraction_docs': len(extraction_corpus)}


## Split Strategies (Semantic / Structural / Entity-Aware)

- Modules: `SemanticChunker`, `StructuralChunker`, `TextSplitter` (`entity_aware`)
- Runs multiple split strategies.
- Compares chunk counts/output.

In [None]:
from semantica.split import SemanticChunker, StructuralChunker

split_strategy_preview = {}
if corpus:
    sample_text = corpus[0]['text'][:12000]
    try:
        semantic_chunker = SemanticChunker(chunk_size=1200, chunk_overlap=200)
        sem_chunks = semantic_chunker.chunk(sample_text)
        split_strategy_preview['semantic_chunks'] = len(sem_chunks)
    except Exception as e:
        split_strategy_preview['semantic_chunks_error'] = str(e)

    try:
        structural_chunker = StructuralChunker(chunk_size=1200, chunk_overlap=150)
        st_chunks = structural_chunker.chunk(sample_text)
        split_strategy_preview['structural_chunks'] = len(st_chunks)
    except Exception as e:
        split_strategy_preview['structural_chunks_error'] = str(e)

    try:
        ea_splitter = TextSplitter(method=['entity_aware', 'recursive'], chunk_size=1200, chunk_overlap=150)
        ea_chunks = ea_splitter.split(sample_text)
        split_strategy_preview['entity_aware_chunks'] = len(ea_chunks)
    except Exception as e:
        split_strategy_preview['entity_aware_chunks_error'] = str(e)

split_strategy_preview

In [None]:
from semantica.pipeline import PipelineBuilder

# Represent execution-path orchestration explicitly (the layer where decision traces should be captured)
pipeline = (
    PipelineBuilder()
    .add_step('ingest_sources', 'ingest', sources=len(corpus))
    .add_step('chunk_context', 'split', method='recursive')
    .add_step('semantic_extract', 'extract', entity_relation_event_triplet=True)
    .add_step('build_context_graph', 'context_graph')
    .add_step('policy_and_trace', 'decision_trace_capture')
    .add_step('export_and_observe', 'export_observability')
    .connect_steps('ingest_sources', 'chunk_context')
    .connect_steps('chunk_context', 'semantic_extract')
    .connect_steps('semantic_extract', 'build_context_graph')
    .connect_steps('build_context_graph', 'policy_and_trace')
    .connect_steps('policy_and_trace', 'export_and_observe')
    .build(name='capability_gap_orchestration_path')
)

{'pipeline': pipeline.name, 'steps': [s.name for s in pipeline.steps]}
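
The `connect_steps` calls above define a dependency chain. As a sketch of how an execution order follows from such connections, the standard-library `graphlib` module can sort them topologically; this illustrates the idea only and makes no claim about how `PipelineBuilder` schedules steps internally.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# (upstream, downstream) pairs mirroring the connect_steps calls in the pipeline cell
connections = [
    ('ingest_sources', 'chunk_context'),
    ('chunk_context', 'semantic_extract'),
    ('semantic_extract', 'build_context_graph'),
    ('build_context_graph', 'policy_and_trace'),
    ('policy_and_trace', 'export_and_observe'),
]

sorter = TopologicalSorter()
for upstream, downstream in connections:
    sorter.add(downstream, upstream)  # downstream depends on upstream

execution_order = list(sorter.static_order())
print(execution_order)
```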

## Normalization Layer (Text, Entity, Date, Number, Language, Encoding)

- Modules: `TextNormalizer`, `EntityNormalizer`, `DateNormalizer`, `NumberNormalizer`, `LanguageDetector`, `EncodingHandler`, `TextCleaner`
- Cleans and normalizes text.
- Normalizes entities, date/time, and numeric values.
- Detects language and handles encoding.
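
For orientation, a standard-library sketch of the kind of canonical values date and number normalization aims for, using the same demo inputs as this section. What `DateNormalizer` and `NumberNormalizer` actually return is not shown here; `parse_percent` and `parse_utc_timestamp` are hypothetical helpers.

```python
from datetime import datetime, timezone

def parse_percent(value):
    """'42.0%' -> 0.42 (as a fraction). Raises ValueError on malformed input."""
    return float(value.strip().rstrip('%')) / 100.0

def parse_utc_timestamp(value):
    """'12 Apr 2028 05:15 UTC' -> timezone-aware datetime (assumes English month abbreviations)."""
    stamp, zone = value.rsplit(' ', 1)
    if zone != 'UTC':
        raise ValueError(f'unsupported zone: {zone}')
    return datetime.strptime(stamp, '%d %b %Y %H:%M').replace(tzinfo=timezone.utc)

demo_fraction = parse_percent('42.0%')
demo_timestamp = parse_utc_timestamp('12 Apr 2028 05:15 UTC')
print(demo_fraction, demo_timestamp.isoformat())
```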

In [None]:
import semantica.normalize as normalize_module
from semantica.normalize import TextNormalizer, EntityNormalizer, DateNormalizer, NumberNormalizer
from semantica.normalize import LanguageDetector, EncodingHandler, TextCleaner

text_normalizer = TextNormalizer()
entity_normalizer = EntityNormalizer()
date_normalizer = DateNormalizer()
number_normalizer = NumberNormalizer()
language_detector = LanguageDetector(default_language='en')
encoding_handler = EncodingHandler()
text_cleaner = TextCleaner()

normalized_extraction_corpus = []
for item in extraction_corpus:
    txt = item.get('text', '')

    cleaned = normalize_module.clean_text(txt, method='default') if txt else ''
    normalized_text = normalize_module.normalize_text(cleaned, method='default') if cleaned else ''

    lang = normalize_module.detect_language(normalized_text, method='default') if normalized_text else 'en'
    _ = normalize_module.handle_encoding(normalized_text, method='default') if normalized_text else normalized_text

    normalized_extraction_corpus.append({
        **item,
        'text': normalized_text,
        'language': lang,
    })

extraction_corpus = normalized_extraction_corpus

demo_date = date_normalizer.normalize_date('12 Apr 2028 05:15 UTC')
demo_num = number_normalizer.normalize_number('42.0%')
demo_entity = entity_normalizer.normalize_entity('ground radar layer', entity_type='System')

{'normalized_docs': len(extraction_corpus), 'demo_date': str(demo_date), 'demo_number': demo_num, 'demo_entity': demo_entity}


In [None]:
from semantica.semantic_extract import NamedEntityRecognizer, RelationExtractor, EventDetector
from semantica.semantic_extract import CoreferenceResolver, TripletExtractor, SemanticAnalyzer
from semantica.semantic_extract import SemanticNetworkExtractor, ExtractionValidator

ner = NamedEntityRecognizer(method='pattern', confidence_threshold=0.2)
relation_extractor = RelationExtractor(method='pattern', confidence_threshold=0.2)
event_detector = EventDetector()
coref_resolver = CoreferenceResolver()
triplet_extractor = TripletExtractor(method='pattern', include_provenance=True)
semantic_analyzer = SemanticAnalyzer()
semantic_network_extractor = SemanticNetworkExtractor()
validator = ExtractionValidator()

# Filter once so that documents and texts stay index-aligned; filtering only the
# texts would shift indices and attach the wrong doc_id further down.
docs_with_text = [item for item in extraction_corpus if item.get('text')]
texts = [item['text'] for item in docs_with_text]
resolved_texts = [coref_resolver.resolve(t) for t in texts]

entities_batch = ner.process_batch(resolved_texts)
triplets_batch = triplet_extractor.process_batch(resolved_texts)
relations_batch = [relation_extractor.extract_relations(t, entities=e) for t, e in zip(resolved_texts, entities_batch)]
events_batch = [event_detector.detect_events(t) for t in resolved_texts]

all_entities = [e for batch in entities_batch for e in batch]
all_relationships = [r for batch in relations_batch for r in batch]
all_events = [ev for batch in events_batch for ev in batch]
all_triplets = [tr for batch in triplets_batch for tr in batch]

_ = validator.validate_entities(all_entities)
_ = validator.validate_relations(all_relationships)

semantic_networks = [
    {
        'doc_id': doc.get('doc_id', f'doc_{i}'),
        'analysis': semantic_analyzer.analyze(resolved_texts[i]),
        'network': semantic_network_extractor.extract(resolved_texts[i], entities=entities_batch[i], relations=relations_batch[i]),
    }
    for i, doc in enumerate(docs_with_text)
]

{
    'entities': len(all_entities),
    'relationships': len(all_relationships),
    'events': len(all_events),
    'triplets': len(all_triplets),
    'semantic_networks': len(semantic_networks),
    'documents_processed': len(resolved_texts),
}


## Data Quality Controls: Deduplication + Conflict Detection

In [None]:
from semantica.kg import EntityResolver
import semantica.conflicts as conflicts_module

entity_dicts = []
for e in all_entities:
    entity_dicts.append({
        'id': str(getattr(e, 'id', getattr(e, 'text', 'unknown'))),
        'name': str(getattr(e, 'text', getattr(e, 'id', 'unknown'))),
        'type': str(getattr(e, 'label', getattr(e, 'type', 'entity'))),
        'metadata': getattr(e, 'metadata', {}) or {}
    })

entity_resolver = EntityResolver(strategy='fuzzy')
resolved_entities = entity_resolver.resolve_entities(entity_dicts[:200]) if entity_dicts else []

conflict_rows = [
    {'id': 'System_GroundRadarLayer', 'coveragePercent': '42', 'type': 'system'},
    {'id': 'System_GroundRadarLayer', 'coveragePercent': '58', 'type': 'system'},
]
conflicts = conflicts_module.detect_conflicts(conflict_rows, method='value', property_name='coveragePercent')
# Assumption: the resolution strategy is selected by name, like method='value' above.
resolved_conflicts = conflicts_module.resolve_conflicts(conflicts, method='voting') if conflicts else []

{
    'entities_before_resolution': len(entity_dicts[:200]),
    'entities_after_resolution': len(resolved_entities),
    'conflicts_detected': len(conflicts),
    'conflicts_resolved': len(resolved_conflicts),
}
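
What a value conflict means for the two demo rows above can be sketched with the standard library alone. This is not the `semantica.conflicts` implementation; `detect_value_conflicts` is a hypothetical helper that groups rows by `id` and flags properties with more than one distinct value.

```python
from collections import defaultdict

def detect_value_conflicts(rows, property_name, key='id'):
    """Group rows by key and report keys whose property takes more than one distinct value."""
    values_by_key = defaultdict(list)
    for row in rows:
        if property_name in row:
            values_by_key[row[key]].append(row[property_name])
    return {k: vals for k, vals in values_by_key.items() if len(set(vals)) > 1}

rows = [
    {'id': 'System_GroundRadarLayer', 'coveragePercent': '42', 'type': 'system'},
    {'id': 'System_GroundRadarLayer', 'coveragePercent': '58', 'type': 'system'},
]
conflicts_found = detect_value_conflicts(rows, 'coveragePercent')
print(conflicts_found)
```

With one report per value, a majority vote has no winner, so a tie-break rule (for example source credibility or recency) is needed before the conflict can be resolved.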


In [None]:
from semantica.kg import GraphBuilder, GraphAnalyzer

graph_builder = GraphBuilder(merge_entities=True, resolve_conflicts=True)
kg = graph_builder.build([{'entities': all_entities, 'relationships': all_relationships}], extract=False)

graph_analyzer = GraphAnalyzer()
kg_analysis = graph_analyzer.analyze_graph(kg)

{
    'kg_entities': len(kg.get('entities', [])),
    'kg_relationships': len(kg.get('relationships', [])),
    'has_analysis': bool(kg_analysis),
}

## KG Analytics (Centrality, Communities, Connectivity, Similarity, Link Prediction)

- Modules: `CentralityCalculator`, `CommunityDetector`, `ConnectivityAnalyzer`, `SimilarityCalculator`, `LinkPredictor`, `NodeEmbedder`
- Runs graph metrics and analytics.
- Calculates centrality, communities, connectivity, similarity, and link predictions.
- Checks node embedding availability.
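
Two of these metrics are easy to reproduce by hand. The sketch below computes cosine similarity for the same two vectors used in this section and degree centrality on a toy graph; it does not call Semantica, and the function names and toy edges are hypothetical.

```python
import math
from collections import Counter

def cosine_similarity(a, b):
    """Dot product divided by the product of the vector norms (0.0 for a zero vector)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def degree_centrality(edges):
    """Degree of each node divided by (n - 1), treating edges as undirected."""
    degree = Counter()
    for source, target in edges:
        degree[source] += 1
        degree[target] += 1
    n = len(degree)
    return {node: d / (n - 1) for node, d in degree.items()} if n > 1 else {}

toy_edges = [('A', 'B'), ('A', 'C'), ('A', 'D')]  # hypothetical star graph
similarity = cosine_similarity([1.0, 0.0, 1.0], [0.8, 0.2, 0.9])
centrality = degree_centrality(toy_edges)
print(round(similarity, 4), centrality)
```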

In [None]:
from semantica.kg import CentralityCalculator, CommunityDetector, ConnectivityAnalyzer
from semantica.kg import SimilarityCalculator, LinkPredictor

extended_kg_analytics = {}

try:
    centrality_calc = CentralityCalculator()
    cent = centrality_calc.calculate_all_centrality(kg)
    extended_kg_analytics['centrality_keys'] = list(cent.keys())[:10]
except Exception as e:
    extended_kg_analytics['centrality_error'] = str(e)

try:
    community_detector = CommunityDetector()
    comm = community_detector.detect_communities(kg, algorithm='louvain')
    extended_kg_analytics['community_count'] = comm.get('num_communities', None) if isinstance(comm, dict) else None
except Exception as e:
    extended_kg_analytics['community_error'] = str(e)

try:
    connectivity_analyzer = ConnectivityAnalyzer()
    conn = connectivity_analyzer.analyze_connectivity(kg)
    extended_kg_analytics['connectivity_keys'] = list(conn.keys())[:10] if isinstance(conn, dict) else []
except Exception as e:
    extended_kg_analytics['connectivity_error'] = str(e)

try:
    sim_calc = SimilarityCalculator(method='cosine')
    extended_kg_analytics['sample_cosine_similarity'] = sim_calc.cosine_similarity([1.0, 0.0, 1.0], [0.8, 0.2, 0.9])
except Exception as e:
    extended_kg_analytics['similarity_error'] = str(e)

try:
    link_predictor = LinkPredictor()
    lp = link_predictor.predict_links(kg, top_k=5)
    extended_kg_analytics['predicted_links'] = len(lp) if hasattr(lp, '__len__') else None
except Exception as e:
    extended_kg_analytics['link_prediction_error'] = str(e)

extended_kg_analytics

In [None]:
from semantica.kg import NodeEmbedder

node_embedding_status = {}
try:
    embedder = NodeEmbedder(method='node2vec', embedding_dimension=32, walk_length=20, num_walks=5)
    node_embedding_status['node2vec_ready'] = True
except Exception as e:
    node_embedding_status['node2vec_ready'] = False
    node_embedding_status['reason'] = str(e)

node_embedding_status

In [None]:
from semantica.context import ContextGraph

context_graph = ContextGraph(advanced_analytics=True, centrality_analysis=True, community_detection=True)

seed_nodes = [
    {'id': 'Scenario_FutureA2AD_2028', 'type': 'scenario', 'properties': {'content': 'Future A2/AD escalation scenario'}},
    {'id': 'MissionThread_ForceProtection', 'type': 'mission_thread', 'properties': {'content': 'Protect forward operating assets under drone saturation'}},
    {'id': 'Event_LowAltitudeSwarmIncursions', 'type': 'event', 'properties': {'content': 'Repeated low-altitude swarm incursions'}},
    {'id': 'System_GroundRadarLayer', 'type': 'system', 'properties': {'content': 'Ground radar surveillance layer'}},
    {'id': 'Capability_LowAltitudeDetection', 'type': 'capability', 'properties': {'content': 'Low altitude detection capability'}},
    {'id': 'Outcome_MissionRiskIncrease', 'type': 'outcome', 'properties': {'content': 'Rising mission risk and delayed response'}},
    {'id': 'Gap_LowAltitudeDetectionCoverage', 'type': 'capability_gap', 'properties': {'content': 'Insufficient low-altitude detection coverage'}},
]

seed_edges = [
    {'source_id': 'Scenario_FutureA2AD_2028', 'target_id': 'MissionThread_ForceProtection', 'type': 'has_mission_thread'},
    {'source_id': 'MissionThread_ForceProtection', 'target_id': 'Event_LowAltitudeSwarmIncursions', 'type': 'includes_event'},
    {'source_id': 'Event_LowAltitudeSwarmIncursions', 'target_id': 'System_GroundRadarLayer', 'type': 'stresses_system'},
    {'source_id': 'System_GroundRadarLayer', 'target_id': 'Capability_LowAltitudeDetection', 'type': 'provides_capability'},
    {'source_id': 'Capability_LowAltitudeDetection', 'target_id': 'Outcome_MissionRiskIncrease', 'type': 'affects_outcome'},
    {'source_id': 'MissionThread_ForceProtection', 'target_id': 'Gap_LowAltitudeDetectionCoverage', 'type': 'reveals_gap'},
]

context_graph.add_nodes(seed_nodes)
context_graph.add_edges(seed_edges)

entity_nodes = [
    {
        'id': str(getattr(ent, 'id', getattr(ent, 'text', f'Entity_{idx}'))),
        'type': str(getattr(ent, 'label', getattr(ent, 'type', 'entity'))),
        'properties': {'content': str(getattr(ent, 'text', getattr(ent, 'id', f'Entity_{idx}')))},
    }
    for idx, ent in enumerate(all_entities[:60])
]
context_graph.add_nodes(entity_nodes)

scenario_edges = [
    {'source_id': 'Scenario_FutureA2AD_2028', 'target_id': n['id'], 'type': 'contextualizes'}
    for n in entity_nodes
]
context_graph.add_edges(scenario_edges)

relation_edges = [
    {
        'source_id': str(getattr(getattr(rel, 'subject', None), 'id', None) or getattr(rel, 'source', 'unknown_source')),
        'target_id': str(getattr(getattr(rel, 'object', None), 'id', None) or getattr(rel, 'target', 'unknown_target')),
        'type': str(getattr(rel, 'predicate', None) or getattr(rel, 'type', 'related_to')),
    }
    for rel in all_relationships[:120]
]
context_graph.add_edges(relation_edges)

context_graph.stats()


In [None]:
from semantica.vector_store import VectorStore
from semantica.context import AgentContext

vector_store = VectorStore(backend='inmemory', dimension=384)
agent_context = AgentContext(
    vector_store=vector_store,
    knowledge_graph=context_graph,
    decision_tracking=True,
    advanced_analytics=True,
    kg_algorithms=True,
    vector_store_features=True,
    graph_expansion=True,
    max_expansion_hops=3,
)

stored = agent_context.store(
    [{'content': c['text'][:2500], 'metadata': {'source': c['source'], 'doc_id': c['doc_id']}} for c in corpus],
    extract_entities=False,
    extract_relationships=False
)

d1 = agent_context.record_decision(
    category='capability_gap_assessment',
    scenario='Future A2/AD mission thread with low-altitude swarm pressure',
    reasoning='Mission requires persistent low-altitude detection, but current radar layer indicates limited valley and urban coverage.',
    outcome='gap_identified_low_altitude_detection',
    confidence=0.93,
    entities=['MissionThread_ForceProtection', 'Capability_LowAltitudeDetection', 'Gap_LowAltitudeDetectionCoverage'],
)

d2 = agent_context.record_decision(
    category='capability_gap_mitigation',
    scenario='Counter low-altitude swarm incursions',
    reasoning='Need layered sensing integration and revised mission doctrine to close detection delay.',
    outcome='recommend_multilayer_sensor_fusion',
    confidence=0.88,
    entities=['System_GroundRadarLayer', 'Gap_LowAltitudeDetectionCoverage'],
)

retrieved = agent_context.retrieve(
    query='Which capability gaps most increase mission risk in this scenario?',
    max_results=8,
    expand_graph=True,
    include_entities=True,
)

{'stored': stored.get('stored_count', 0), 'decisions': [d1, d2], 'retrieved': len(retrieved)}

## Decision Traces: Policies, Exceptions, Approval Chains, Precedents, Cross-System Context

- Modules: `AgentContext`, `ContextGraph`, `PolicyEngine`
- Models: `Decision`, `Policy`, `PolicyException`, `ApprovalChain`, `Precedent`
- Records decisions and policy checks.
- Adds exceptions, approvals, precedents, and cross-system context.

In [None]:
from datetime import datetime
from semantica.context.decision_models import Decision, Policy, PolicyException, ApprovalChain, Precedent
from semantica.context.policy_engine import PolicyEngine

# Policy model aligned to 'policy v3.2 + exception route' pattern from the article
policy_engine = PolicyEngine(context_graph)
renewal_policy = Policy(
    policy_id='POL-CAPGAP-3.2',
    name='Capability Gap Escalation Policy',
    description='Escalate and require approval when mission-critical capability coverage is below threshold.',
    rules={
        'min_confidence': 0.8,
        'required_categories': ['capability_gap_assessment', 'capability_gap_mitigation'],
        'allowed_outcomes': ['gap_identified_low_altitude_detection', 'recommend_multilayer_sensor_fusion', 'escalate_for_exception']
    },
    category='capability_gap_assessment',
    version='3.2',
    created_at=datetime.now(),
    updated_at=datetime.now(),
    metadata={'entities': ['MissionThread_ForceProtection', 'System_GroundRadarLayer']}
)
policy_engine.add_policy(renewal_policy)

# Construct explicit trace artifacts (exception, approval, precedent link)
trace_decision = Decision(
    decision_id='',
    category='capability_gap_assessment',
    scenario='Coverage threshold breach during swarm-pressure mission thread',
    reasoning='Below-threshold low-altitude detection coverage with repeated threat ingress; escalation required.',
    outcome='escalate_for_exception',
    confidence=0.89,
    timestamp=datetime.now(),
    decision_maker='joint_ops_agent',
    metadata={'policy_version': '3.2'}
)

policy_exception = PolicyException(
    exception_id='',
    decision_id=trace_decision.decision_id,
    policy_id='POL-CAPGAP-3.2',
    reason='Emergency force-protection override due to active swarm threat',
    approver='VP_Operations',
    approval_timestamp=datetime.now(),
    justification='Mission-critical risk outweighs standard route latency',
    metadata={'channel': 'slack_dm'}
)

approval_chain = ApprovalChain(
    approval_id='',
    decision_id=trace_decision.decision_id,
    approver='Finance_Controller',
    approval_method='zoom_call',
    approval_context='Approved exceptional spend for layered sensing package',
    timestamp=datetime.now(),
    metadata={'step': 'final_finance_gate'}
)

precedent_link = Precedent(
    precedent_id='',
    source_decision_id=d1,
    similarity_score=0.92,
    relationship_type='similar_scenario',
    metadata={'note': 'Prior low-altitude detection gap precedent'}
)

# Persist trace artifacts into ContextGraph as first-class decision-trace nodes
trace_decision_id = context_graph.record_decision(
    category=trace_decision.category,
    scenario=trace_decision.scenario,
    reasoning=trace_decision.reasoning,
    outcome=trace_decision.outcome,
    confidence=trace_decision.confidence,
    entities=['MissionThread_ForceProtection', 'Gap_LowAltitudeDetectionCoverage'],
    decision_maker=trace_decision.decision_maker,
    metadata={'policy_version': '3.2', 'cross_system_context': {'crm': 'critical_account', 'zendesk': 'open_escalation', 'pagerduty': 'sev1_incidents'}}
)

context_graph.add_node(policy_exception.exception_id, 'policy_exception', policy_exception.reason)
context_graph.add_edge(trace_decision_id, policy_exception.exception_id, 'has_exception')
context_graph.add_node(approval_chain.approval_id, 'approval', approval_chain.approval_context)
context_graph.add_edge(trace_decision_id, approval_chain.approval_id, 'approved_by_chain')
context_graph.add_node(precedent_link.precedent_id, 'precedent', 'precedent linkage')
context_graph.add_edge(trace_decision_id, precedent_link.precedent_id, 'uses_precedent')
context_graph.add_edge(precedent_link.precedent_id, d1, 'points_to_decision')

# Compliance check against policy v3.2
compliant = policy_engine.check_compliance(trace_decision, 'POL-CAPGAP-3.2')

{'trace_decision_id': trace_decision_id, 'policy_compliant': compliant, 'policy_id': renewal_policy.policy_id, 'policy_version': renewal_policy.version}
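
The rule keys in the policy above (`min_confidence`, `required_categories`, `allowed_outcomes`) suggest a check along the following lines. This is one plausible reading, written against plain dicts; how `PolicyEngine.check_compliance` interprets the rules is not shown in this notebook, and the local `check_compliance` function is a hypothetical stand-in.

```python
def check_compliance(decision, rules):
    """Return (compliant, violations) for a decision dict against a rules dict."""
    violations = []
    if decision['confidence'] < rules.get('min_confidence', 0.0):
        violations.append('confidence below min_confidence')
    categories = rules.get('required_categories')
    if categories and decision['category'] not in categories:
        violations.append('category not permitted')
    outcomes = rules.get('allowed_outcomes')
    if outcomes and decision['outcome'] not in outcomes:
        violations.append('outcome not permitted')
    return (not violations, violations)

rules = {
    'min_confidence': 0.8,
    'required_categories': ['capability_gap_assessment', 'capability_gap_mitigation'],
    'allowed_outcomes': ['gap_identified_low_altitude_detection', 'recommend_multilayer_sensor_fusion', 'escalate_for_exception'],
}
decision = {'category': 'capability_gap_assessment', 'outcome': 'escalate_for_exception', 'confidence': 0.89}

print(check_compliance(decision, rules))
print(check_compliance({**decision, 'confidence': 0.5}, rules))
```

Returning the list of violations alongside the boolean keeps the reason for a failed check available to the decision trace.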

In [None]:
# Search precedent and causal impact to convert one-off exceptions into reusable governance
precedents = agent_context.find_precedents(
    scenario='Low-altitude detection shortfall under swarm pressure',
    category='capability_gap_assessment',
    limit=5,
    use_hybrid_search=True
)

impact = context_graph.analyze_decision_impact(trace_decision_id)
insights = context_graph.get_decision_summary()

{
    'precedent_hits': len(precedents),
    'impact_total_influenced': impact.get('total_influenced', 0),
    'decision_total': insights.get('total_decisions', 0),
    'categories': insights.get('categories', {})
}

In [None]:
# Cross-system synthesis snapshot using AgentContext API
try:
    cross_system_snapshot = agent_context.capture_cross_system_inputs(
        systems=['crm', 'ticketing', 'incident_management', 'asset_inventory'],
        entity_id='MissionThread_ForceProtection'
    )
except Exception as e:
    cross_system_snapshot = {'error': str(e)}

cross_system_snapshot

In [None]:
import semantica.context as context_module

# Expand the neighborhood of the scenario node at hop depths 1, 2, and 3
hop_1 = context_graph.get_neighbors('Scenario_FutureA2AD_2028', hops=1)
hop_2 = context_graph.get_neighbors('Scenario_FutureA2AD_2028', hops=2)
hop_3 = context_graph.get_neighbors('Scenario_FutureA2AD_2028', hops=3)

reasoning_paths = []
try:
    mh = context_module.multi_hop_query(
        context_graph,
        start_entity='Scenario_FutureA2AD_2028',
        query='Trace mission-thread to capability-gap path',
        max_hops=3,
    )
    reasoning_paths = mh.get('decisions', []) if isinstance(mh, dict) else []
except Exception as e:
    reasoning_paths = [{'error': str(e)}]

{'hop1': len(hop_1), 'hop2': len(hop_2), 'hop3': len(hop_3), 'multi_hop_results': len(reasoning_paths)}
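
The hop counts above can be reasoned about as a breadth-first expansion from the scenario node. The following is a minimal sketch on a plain adjacency dict, not Semantica's `get_neighbors` implementation. The `neighbors_within` helper, the toy graph, and the `Decision_Example` node are assumptions for illustration.

```python
from collections import deque


def neighbors_within(adjacency, start, hops):
    """Return nodes reachable from `start` in at most `hops` edges, excluding `start`."""
    seen = {start}
    frontier = deque([(start, 0)])
    while frontier:
        node, depth = frontier.popleft()
        if depth == hops:
            continue  # do not expand beyond the hop limit
        for nxt in adjacency.get(node, ()):
            if nxt not in seen:
                seen.add(nxt)
                frontier.append((nxt, depth + 1))
    seen.discard(start)
    return seen


# Toy chain: scenario -> mission thread -> gap -> decision (last node is hypothetical).
toy_graph = {
    'Scenario_FutureA2AD_2028': ['MissionThread_ForceProtection'],
    'MissionThread_ForceProtection': ['Gap_LowAltitudeDetectionCoverage'],
    'Gap_LowAltitudeDetectionCoverage': ['Decision_Example'],
}

toy_hop_counts = {
    h: len(neighbors_within(toy_graph, 'Scenario_FutureA2AD_2028', h)) for h in (1, 2, 3)
}
print(toy_hop_counts)
```

In this sketch the counts are cumulative, so each larger hop limit includes the nodes found at smaller limits.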


In [None]:
from semantica.reasoning import Reasoner, ExplanationGenerator

reasoner = Reasoner()
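# Rule 1: a required capability with insufficient coverage is a capability gap.
# Rule 2: a capability gap under a high threat level elevates outcome risk.
reasoner.add_rule('IF MissionRequires(?m, LowAltitudeDetection) AND CoverageStatus(?m, Insufficient) THEN CapabilityGap(?m, LowAltitudeDetectionGap)')
reasoner.add_rule('IF CapabilityGap(?m, LowAltitudeDetectionGap) AND ThreatLevel(?m, High) THEN OutcomeRisk(?m, Elevated)')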

reasoner.add_fact('MissionRequires(MissionThread_ForceProtection, LowAltitudeDetection)')
reasoner.add_fact('CoverageStatus(MissionThread_ForceProtection, Insufficient)')
reasoner.add_fact('ThreatLevel(MissionThread_ForceProtection, High)')

inferred = reasoner.forward_chain()

# Explain only the last inferred conclusion; stays empty if nothing was inferred
explanation_text = ''
if inferred:
    explanation_generator = ExplanationGenerator()
    explanation = explanation_generator.generate_explanation(inferred[-1])
    explanation_text = explanation.natural_language

{'inferred': [f.conclusion for f in inferred], 'explanation': explanation_text}
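
The two rules above chain: the first derives the capability gap, and the second uses that gap to derive elevated outcome risk. The following is a minimal sketch of naive forward chaining over `(predicate, subject, object)` triples, written in plain Python to show the mechanism. It is not the `Reasoner` implementation; the tuple encoding, the `bind` and `forward_chain_sketch` helpers, and the single-variable restriction are assumptions.

```python
def bind(pattern, value):
    """Substitute `value` for the single supported variable '?m'."""
    return tuple(value if term == '?m' else term for term in pattern)


def forward_chain_sketch(facts, rules):
    """Apply rules repeatedly until no new fact can be derived."""
    known = set(facts)
    derived = []
    changed = True
    while changed:
        changed = False
        subjects = sorted({subject for _pred, subject, _obj in known})
        for premises, conclusion in rules:
            for subject in subjects:
                if all(bind(p, subject) in known for p in premises):
                    new_fact = bind(conclusion, subject)
                    if new_fact not in known:
                        known.add(new_fact)
                        derived.append(new_fact)
                        changed = True
    return derived


sketch_rules = [
    ([('MissionRequires', '?m', 'LowAltitudeDetection'),
      ('CoverageStatus', '?m', 'Insufficient')],
     ('CapabilityGap', '?m', 'LowAltitudeDetectionGap')),
    ([('CapabilityGap', '?m', 'LowAltitudeDetectionGap'),
      ('ThreatLevel', '?m', 'High')],
     ('OutcomeRisk', '?m', 'Elevated')),
]
sketch_facts = [
    ('MissionRequires', 'MissionThread_ForceProtection', 'LowAltitudeDetection'),
    ('CoverageStatus', 'MissionThread_ForceProtection', 'Insufficient'),
    ('ThreatLevel', 'MissionThread_ForceProtection', 'High'),
]

sketch_derived = forward_chain_sketch(sketch_facts, sketch_rules)
print(sketch_derived)
```

The loop repeats until a full pass adds nothing, which is what lets the second rule fire on a fact produced by the first.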

## Versioned Decision Governance (Policy / Ontology Change Tracking)

In [None]:
from semantica.change_management import VersionManager

version_manager = VersionManager(base_uri='https://example.org/mcg')

v1 = version_manager.create_version(
    '3.1',
    ontology={'uri': 'https://example.org/mcg', 'classes': [], 'properties': []},
    changes=['Initial capability-gap decision policy baseline'],
    metadata={'structure': {'classes': ['Scenario', 'MissionThread', 'CapabilityGap'], 'properties': ['revealsGap']}}
)

v2 = version_manager.create_version(
    '3.2',
    ontology={'uri': 'https://example.org/mcg', 'classes': [], 'properties': []},
    changes=['Added explicit policy exception and approval-chain trace constructs'],
    metadata={'structure': {'classes': ['Scenario', 'MissionThread', 'CapabilityGap', 'PolicyException', 'ApprovalChain'], 'properties': ['revealsGap', 'has_exception', 'approved_by_chain']}}
)

version_diff = version_manager.compare_versions('3.1', '3.2')
{'latest_version': version_manager.latest_version, 'classes_added': version_diff.get('classes_added', []), 'properties_added': version_diff.get('properties_added', [])}
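
The `classes_added` and `properties_added` keys read above amount to a set difference between the two version structures. The following is a minimal sketch using the `structure` metadata from the two versions. It is not the `VersionManager.compare_versions` implementation, and whether that method reads `metadata['structure']` or the `ontology` argument is not confirmed here. The `diff_structures` helper is hypothetical.

```python
def diff_structures(old, new):
    """List classes and properties present in `new` but not in `old`."""
    return {
        'classes_added': sorted(set(new['classes']) - set(old['classes'])),
        'properties_added': sorted(set(new['properties']) - set(old['properties'])),
    }


# Structures copied from the metadata of versions 3.1 and 3.2 above.
structure_v31 = {
    'classes': ['Scenario', 'MissionThread', 'CapabilityGap'],
    'properties': ['revealsGap'],
}
structure_v32 = {
    'classes': ['Scenario', 'MissionThread', 'CapabilityGap', 'PolicyException', 'ApprovalChain'],
    'properties': ['revealsGap', 'has_exception', 'approved_by_chain'],
}

sketch_diff = diff_structures(structure_v31, structure_v32)
print(sketch_diff)
```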

In [None]:
from semantica.provenance import ProvenanceManager

provenance_db = OUTPUT_DIR / 'capability_gap_provenance.db'
prov = ProvenanceManager(storage_path=str(provenance_db))

for c in corpus:
    prov.track_entity(entity_id=f"source::{c['doc_id']}", source=c['source'], metadata={'document_type': 'corpus_source'})

# Track provenance for the first 80 extracted entities only
for ent in all_entities[:80]:
    ent_id = str(getattr(ent, 'id', getattr(ent, 'text', 'unknown_entity')))
    src_doc = (getattr(ent, 'metadata', {}) or {}).get('source_doc', 'unknown_source')
    prov.track_entity(
        entity_id=f"entity::{ent_id}",
        source=src_doc,
        metadata={'entity_text': str(getattr(ent, 'text', ent_id)), 'entity_type': str(getattr(ent, 'label', 'entity'))}
    )

# Track provenance for the first 120 extracted relationships only
for i, rel in enumerate(all_relationships[:120]):
    src_doc = (getattr(rel, 'metadata', {}) or {}).get('source_doc', 'unknown_source')
    prov.track_relationship(relationship_id=f'rel::{i}', source=src_doc, metadata={'relation_type': str(getattr(rel, 'predicate', getattr(rel, 'type', 'related_to')))})

{'stats': prov.get_statistics(), 'lineage_sample': prov.get_lineage('entity::MissionThread_ForceProtection')}
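
The tracking pattern above records one entry per source, entity, and relationship, then queries statistics and lineage by ID. The following is a minimal sketch of that pattern with the standard-library `sqlite3` module and an in-memory database. The table schema, column names, and sample rows are assumptions for illustration and do not reflect how `ProvenanceManager` stores data.

```python
import sqlite3

# In-memory database so the sketch leaves no file behind.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE provenance (record_id TEXT, source TEXT, record_type TEXT)')

# Hypothetical rows mirroring the three ID prefixes used in the notebook.
sketch_rows = [
    ('source::doc-001', 'hypothetical_report.pdf', 'corpus_source'),
    ('entity::MissionThread_ForceProtection', 'doc-001', 'entity'),
    ('rel::0', 'doc-001', 'relationship'),
]
conn.executemany('INSERT INTO provenance VALUES (?, ?, ?)', sketch_rows)

# Per-type counts, comparable in spirit to a statistics call.
sketch_stats = dict(
    conn.execute('SELECT record_type, COUNT(*) FROM provenance GROUP BY record_type')
)

# Sources recorded for one entity ID, comparable in spirit to a lineage lookup.
sketch_lineage = conn.execute(
    'SELECT source FROM provenance WHERE record_id = ?',
    ('entity::MissionThread_ForceProtection',),
).fetchall()
conn.close()

print(sketch_stats, sketch_lineage)
```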

## Observability-Style Monitoring for Agent Decisions

In [None]:
# Operational monitoring proxies using Semantica-native analytics outputs
context_insights = agent_context.get_context_insights()

decision_quality_monitor = {
    'decision_count': context_insights.get('decision_tracking', {}).get('total_decisions', 0),
    'graph_nodes': context_insights.get('knowledge_graph', {}).get('node_count', 0),
    'graph_edges': context_insights.get('knowledge_graph', {}).get('edge_count', 0),
    'provenance_entries': prov.get_statistics().get('total_entries', 0),
}

decision_quality_monitor

In [None]:
import semantica.export as export_module

kg_json_path = OUTPUT_DIR / 'capability_gap_kg.json'
context_json_path = OUTPUT_DIR / 'capability_gap_context_graph.json'
context_graphml_path = OUTPUT_DIR / 'capability_gap_context_graph.graphml'
kg_rdf_path = OUTPUT_DIR / 'capability_gap_kg.ttl'
kg_csv_base = OUTPUT_DIR / 'capability_gap_kg'

export_module.export_json(kg, kg_json_path, format='json')
export_module.export_json(context_graph.to_dict(), context_json_path, format='json')
export_module.export_graph(context_graph.to_dict(), context_graphml_path, format='graphml')
export_module.export_rdf(kg, kg_rdf_path, format='turtle')
export_module.export_csv({'entities': kg.get('entities', []), 'relationships': kg.get('relationships', [])}, kg_csv_base)

[str(kg_json_path), str(context_json_path), str(context_graphml_path), str(kg_rdf_path)]


## Export Layer (YAML, LPG, Report Generator)

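- Exports used in this cell: `export_yaml`, `export_lpg`, `ReportGenerator` (the previous cell covers `export_json`, `export_graph`, `export_rdf`, `export_csv`).
- Writes the context graph to YAML and the knowledge graph to a Cypher script.
- Generates a Markdown report file (`capability_gap_analysis_report.md`); each export is wrapped in `try`/`except`, so a failure is recorded in `extra_exports` instead of stopping the notebook.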

In [None]:
import semantica.export as export_module

extra_exports = {}

try:
    yaml_path = OUTPUT_DIR / 'capability_gap_context_graph.yaml'
    export_module.export_yaml(context_graph.to_dict(), yaml_path)
    extra_exports['yaml'] = str(yaml_path)
except Exception as e:
    extra_exports['yaml_error'] = str(e)

try:
    lpg_path = OUTPUT_DIR / 'capability_gap_kg.cypher'
    export_module.export_lpg(kg, lpg_path, method='cypher')
    extra_exports['lpg'] = str(lpg_path)
except Exception as e:
    extra_exports['lpg_error'] = str(e)

try:
    report_data = {
        'title': 'Military Capability Gap Analysis - End-to-End Report',
        'summary': {
            'corpus_items': len(corpus),
            'extraction_items': len(extraction_corpus),
            'entities': len(all_entities),
            'relationships': len(all_relationships),
            'decisions': context_graph.get_decision_summary().get('total_decisions', 0),
        },
        'metrics': {
            'kg_entities': len(kg.get('entities', [])),
            'kg_relationships': len(kg.get('relationships', [])),
            'context_nodes': context_graph.stats().get('node_count', 0),
            'context_edges': context_graph.stats().get('edge_count', 0),
        },
        'analysis': {'kg_analysis': kg_analysis}
    }
    report_path = OUTPUT_DIR / 'capability_gap_analysis_report.md'
    generator = export_module.ReportGenerator(format='markdown', include_charts=False)
    generator.generate_report(report_data, report_path, format='markdown')
    extra_exports['report'] = str(report_path)
except Exception as e:
    extra_exports['report_error'] = str(e)

extra_exports
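
The LPG export above writes the knowledge graph as a Cypher script. The following is a minimal sketch of how entities and relationships can be turned into Cypher statements, to show the shape of such an output. It is not the `export_lpg` implementation. The `to_cypher_sketch` helper, the dict keys (`id`, `type`, `source`, `target`), and the sample data are assumptions, and the sketch does not escape quotes in values.

```python
def to_cypher_sketch(entities, relationships):
    """Build MERGE statements for nodes, then for relationships between them."""
    lines = []
    for e in entities:
        lines.append(f"MERGE (n:{e['type']} {{id: '{e['id']}'}});")
    for r in relationships:
        lines.append(
            f"MATCH (a {{id: '{r['source']}'}}), (b {{id: '{r['target']}'}}) "
            f"MERGE (a)-[:{r['type']}]->(b);"
        )
    return lines


# Hypothetical sample using node names from this notebook.
sketch_entities = [
    {'id': 'MissionThread_ForceProtection', 'type': 'MissionThread'},
    {'id': 'Gap_LowAltitudeDetectionCoverage', 'type': 'CapabilityGap'},
]
sketch_relationships = [
    {'source': 'MissionThread_ForceProtection',
     'target': 'Gap_LowAltitudeDetectionCoverage',
     'type': 'revealsGap'},
]

sketch_cypher = to_cypher_sketch(sketch_entities, sketch_relationships)
print('\n'.join(sketch_cypher))
```

Using `MERGE` rather than `CREATE` keeps the script idempotent, so re-running it does not duplicate nodes or edges.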


In [None]:
from semantica.visualization import KGVisualizer

viz = KGVisualizer(layout='force', color_scheme='default')
kg_html_path = OUTPUT_DIR / 'capability_gap_kg_network.html'

try:
    viz.visualize_network(kg, output='html', file_path=kg_html_path)
    viz_result = str(kg_html_path)
except Exception as e:
    viz_result = f'Visualization skipped: {e}'

viz_result

In [None]:
summary = {
    'corpus_items': len(corpus),
    'entities_extracted': len(all_entities),
    'relationships_extracted': len(all_relationships),
    'events_detected': len(all_events),
    'triplets_extracted': len(all_triplets),
    'kg_entities': len(kg.get('entities', [])),
    'kg_relationships': len(kg.get('relationships', [])),
    'context_graph_stats': context_graph.stats(),
    'reasoning_inferred_rules': [f.conclusion for f in inferred],
    'output_dir': str(OUTPUT_DIR),
    'extended_kg_analytics': extended_kg_analytics if 'extended_kg_analytics' in globals() else {},
    'extra_exports': extra_exports if 'extra_exports' in globals() else {},
    'ontology_details': ontology_details if 'ontology_details' in globals() else [],
}
summary

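- Builds the final summary dictionary.
- Shows counts and output paths from all pipeline stages; the optional keys (`extended_kg_analytics`, `extra_exports`, `ontology_details`) fall back to empty values if their cells were not run.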