# NSAI Level 1: Neuro-Symbolic Intent Classification

**2-Layer Architecture:**
- **Layer A (Statistical)**: TF-IDF + Logistic Regression for intent prediction
- **Layer B (Neuro-Symbolic)**: Deterministic rules with strict precedence that gate or override statistical output

**Dataset**: utterance, intent (4 classes: investigate, execution, summarization, out_of_scope)

**Output**: Structured, explainable decisions with signals + triggered rules + final intent

**Method**: Pure statistical learning + symbolic rule precedence (no LLMs, no timestamps)

## Part 1: Statistical Model Training

In [16]:
# Imports
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix

In [17]:
# Load & Validate Data
df = pd.read_csv('../data/intents_base.csv')

# Assert columns
assert set(df.columns) == {'utterance', 'intent'}, f"Expected columns {{utterance, intent}}, got {set(df.columns)}"

# Normalize intent labels
df['intent'] = df['intent'].str.lower().str.strip()

# Show class distribution
print("Class Distribution:")
print(df['intent'].value_counts())
print(f"\nTotal: {len(df)} records")

Class Distribution:
intent
out_of_scope     169
execution        150
investigate      149
summarization    146
Name: count, dtype: int64

Total: 614 records


In [18]:
# Train/Test Split
X = df['utterance']
y = df['intent']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, 
    test_size=0.2, 
    random_state=42, 
    stratify=y
)

print(f"Train: {len(X_train)} | Test: {len(X_test)}")

Train: 491 | Test: 123


In [19]:
# Level-1 Model
pipeline = Pipeline([
    ('tfidf', TfidfVectorizer(
        max_features=5000,
        ngram_range=(1, 2),
        stop_words='english'
    )),
    ('classifier', LogisticRegression(
        solver='lbfgs',
        random_state=42,
        max_iter=1000
    ))
])

# Fit
pipeline.fit(X_train, y_train)
print("Model trained")

Model trained


In [20]:
# Evaluation
y_pred = pipeline.predict(X_test)

print("Classification Report:")
print(classification_report(y_test, y_pred))

print("\nConfusion Matrix:")
print(confusion_matrix(y_test, y_pred, labels=['investigate', 'execution', 'summarization', 'out_of_scope']))
print("\nLabels: ['investigate', 'execution', 'summarization', 'out_of_scope']")

Classification Report:
               precision    recall  f1-score   support

    execution       0.80      0.93      0.86        30
  investigate       0.92      0.73      0.81        30
 out_of_scope       0.94      1.00      0.97        34
summarization       1.00      0.97      0.98        29

     accuracy                           0.91       123
    macro avg       0.92      0.91      0.91       123
 weighted avg       0.92      0.91      0.91       123


Confusion Matrix:
[[22  7  0  1]
 [ 1 28  0  1]
 [ 1  0 28  0]
 [ 0  0  0 34]]

Labels: ['investigate', 'execution', 'summarization', 'out_of_scope']


In [21]:
# Token-Level Evidence
def predict_with_evidence(text, top_k=5):
    """Predict intent and extract top contributing tokens"""
    # Predict
    pred_intent = pipeline.predict([text])[0]
    proba = pipeline.predict_proba([text])[0]
    confidence = float(np.max(proba))
    
    # Get class index
    classes = pipeline.named_steps['classifier'].classes_
    class_idx = np.where(classes == pred_intent)[0][0]
    
    # Get coefficients for predicted class
    coef = pipeline.named_steps['classifier'].coef_[class_idx]
    
    # Get feature names
    feature_names = pipeline.named_steps['tfidf'].get_feature_names_out()
    
    # Transform text to get active features
    tfidf_vec = pipeline.named_steps['tfidf'].transform([text])
    active_features = tfidf_vec.toarray()[0]
    
    # Calculate contributions
    contributions = active_features * coef
    
    # Get top k
    top_indices = np.argsort(contributions)[-top_k:][::-1]
    
    evidence = []
    for idx in top_indices:
        if active_features[idx] > 0:
            evidence.append({
                'token': feature_names[idx],
                'weight': float(contributions[idx])
            })
    
    return {
        'predicted_intent': pred_intent,
        'confidence': confidence,
        'evidence_tokens': evidence
    }

print("Evidence extraction function defined")

Evidence extraction function defined


In [22]:
# Sanity Checks
test_cases = [
    "why is server cpu high",
    "restart nginx on host123",
    "summarize the incident from yesterday"
]

for utterance in test_cases:
    result = predict_with_evidence(utterance)
    print(f"\nUtterance: {utterance}")
    print(f"Predicted: {result['predicted_intent']}")
    print(f"Confidence: {result['confidence']:.4f}")
    print(f"Evidence: {result['evidence_tokens']}")


Utterance: why is server cpu high
Predicted: investigate
Confidence: 0.5534
Evidence: [{'token': 'high', 'weight': 0.3994956699594776}, {'token': 'cpu', 'weight': 0.37353200156682}, {'token': 'cpu high', 'weight': 0.12062450706017938}, {'token': 'server', 'weight': 0.1170763458242249}]

Utterance: restart nginx on host123
Predicted: execution
Confidence: 0.5797
Evidence: [{'token': 'restart', 'weight': 0.6980231193486794}, {'token': 'restart nginx', 'weight': 0.09781387842338801}, {'token': 'nginx host123', 'weight': 0.09781387842338801}, {'token': 'nginx', 'weight': 0.09781387842338801}, {'token': 'host123', 'weight': 0.05029071292704667}]

Utterance: summarize the incident from yesterday
Predicted: summarization
Confidence: 0.7562
Evidence: [{'token': 'summarize', 'weight': 1.3179584121997834}, {'token': 'incident', 'weight': 0.37610919040396457}, {'token': 'summarize incident', 'weight': 0.20255519935166402}, {'token': 'yesterday', 'weight': 0.08769134009940947}]


## Part 2: NSAI Symbolic Decision Layer

In [None]:
# Configuration Constants
CONFIG = {
    'BASE_MIN_CONF': 0.60,           # Minimum confidence for accepting prediction
    'MIN_MARGIN': 0.10,              # Minimum margin between top-2 predictions
    'EXECUTION_MIN_CONF': 0.85,      # Higher bar for execution intent
    'MIN_TOKENS_OUT_OF_SCOPE': 3,    # Token count threshold for out_of_scope
    'RANDOM_STATE': 42,              # For reproducibility
    'TEST_SIZE': 0.2                 # Train/test split ratio
}

# Explicit Rule Priority (highest to lowest)
RULE_PRIORITY = ["R1", "R4", "R2", "R3"]


print("Configuration loaded:")print(f"\nRule Priority: {RULE_PRIORITY}")

for key, val in CONFIG.items():    print(f"  {key} = {val}")

Configuration loaded:
  BASE_MIN_CONF = 0.6
  MIN_MARGIN = 0.1
  EXECUTION_MIN_CONF = 0.85
  MIN_TOKENS_OUT_OF_SCOPE = 3
  RANDOM_STATE = 42
  TEST_SIZE = 0.2


In [24]:
# Signal Extraction
def extract_signals(text, top_k=5):
    """Extract all signals from the model for a given utterance"""
    # Get probabilities
    proba = pipeline.predict_proba([text])[0]
    classes = pipeline.named_steps['classifier'].classes_
    
    # Sort by probability
    sorted_indices = np.argsort(proba)[::-1]
    max_confidence = float(proba[sorted_indices[0]])
    second_best_confidence = float(proba[sorted_indices[1]])
    margin = max_confidence - second_best_confidence
    
    # Get TF-IDF representation
    tfidf_vec = pipeline.named_steps['tfidf'].transform([text])
    active_features = tfidf_vec.toarray()[0]
    feature_names = pipeline.named_steps['tfidf'].get_feature_names_out()
    
    # Count meaningful tokens (non-zero TF-IDF)
    meaningful_tokens = np.sum(active_features > 0)
    
    # Extract top contributing tokens per intent
    top_tokens_per_intent = {}
    for intent_idx, intent in enumerate(classes):
        coef = pipeline.named_steps['classifier'].coef_[intent_idx]
        contributions = active_features * coef
        top_indices = np.argsort(contributions)[-top_k:][::-1]
        
        tokens = []
        for idx in top_indices:
            if active_features[idx] > 0:
                tokens.append({
                    'token': feature_names[idx],
                    'weight': float(contributions[idx])
                })
        top_tokens_per_intent[intent] = tokens
    
    # Build signals dictionary
    signals = {
        'probabilities': {intent: float(proba[i]) for i, intent in enumerate(classes)},
        'max_confidence': max_confidence,
        'second_best_confidence': second_best_confidence,
        'margin': margin,
        'predicted_intent': classes[sorted_indices[0]],
        'meaningful_tokens': int(meaningful_tokens),
        'top_tokens_per_intent': top_tokens_per_intent
    }
    
    return signals

print("Signal extraction function defined")

Signal extraction function defined


In [29]:
# NSAI Decision Rules with Strict Precedence
def apply_decision_rules(signals):
    """Apply deterministic rules with strict precedence hierarchy
    
    Intent = what user wants (always from model)
    Decision State = what system decides to do
    
    Rule Categories:
    - quality: input sufficiency (R1)
    - safety: execution controls (R4)
    - ambiguity: confidence/margin (R2, R3)
    
    Precedence follows RULE_PRIORITY: R1 > R4 > R2 > R3
    """
    triggered_rules = []
    predicted_intent = signals['predicted_intent']  # Always keep model prediction
    decision_state = 'accepted'
    decision_reason = 'model_prediction'
    
    # R1: QUALITY GATE (highest priority)
    if signals['meaningful_tokens'] < CONFIG['MIN_TOKENS_OUT_OF_SCOPE']:
        triggered_rules.append({
            'rule_id': 'R1',
            'category': 'quality',
            'priority': 100,
            'condition': f"meaningful_tokens < {CONFIG['MIN_TOKENS_OUT_OF_SCOPE']}",
            'signal_value': signals['meaningful_tokens']
        })
        decision_state = 'blocked'
        decision_reason = 'insufficient_tokens'
        # R1 overrides everything - return immediately
        return {
            'triggered_rules': triggered_rules,
            'predicted_intent': predicted_intent,
            'decision_state': decision_state,
            'decision_reason': decision_reason
        }
    
    # R4: SAFETY GATE
    if signals['predicted_intent'] == 'execution' and signals['max_confidence'] < CONFIG['EXECUTION_MIN_CONF']:
        triggered_rules.append({
            'rule_id': 'R4',
            'category': 'safety',
            'priority': 90,
            'condition': f"predicted_intent==execution AND max_confidence < {CONFIG['EXECUTION_MIN_CONF']}",
            'signal_value': signals['max_confidence']
        })
        # Block execution if confidence insufficient
        if signals['max_confidence'] >= CONFIG['BASE_MIN_CONF']:
            decision_state = 'blocked'
            decision_reason = 'execution_safety_block'
        else:
            decision_state = 'needs_clarification'
            decision_reason = 'execution_low_confidence'
        # R4 overrides R2 and R3 - return immediately
        return {
            'triggered_rules': triggered_rules,
            'predicted_intent': predicted_intent,
            'decision_state': decision_state,
            'decision_reason': decision_reason
        }
    
    # R2 & R3: AMBIGUITY GATES
    ambiguity_detected = False
    
    # R2: Low confidence
    if signals['max_confidence'] < CONFIG['BASE_MIN_CONF']:
        triggered_rules.append({
            'rule_id': 'R2',
            'category': 'ambiguity',
            'priority': 50,
            'condition': f"max_confidence < {CONFIG['BASE_MIN_CONF']}",
            'signal_value': signals['max_confidence']
        })
        ambiguity_detected = True
    
    # R3: Low margin
    if signals['margin'] < CONFIG['MIN_MARGIN']:
        triggered_rules.append({
            'rule_id': 'R3',
            'category': 'ambiguity',
            'priority': 40,
            'condition': f"margin < {CONFIG['MIN_MARGIN']}",
            'signal_value': signals['margin']
        })
        ambiguity_detected = True
    
    if ambiguity_detected:
        decision_state = 'needs_clarification'
        decision_reason = 'ambiguous_prediction'
    
    return {
        'triggered_rules': triggered_rules,
        'predicted_intent': predicted_intent,
        'decision_state': decision_state,
        'decision_reason': decision_reason
    }

print("Decision rules with strict precedence defined")

Decision rules with strict precedence defined


In [26]:
# Signal Voting Aggregation (Transparent Scoring)
def weighted_voting(signals):
    """Compute vote scores for transparency
    
    NOTE: Votes are for explainability only.
    Rules have final authority and override votes.
    """
    scores = {intent: 0.0 for intent in signals['probabilities'].keys()}
    vote_log = []
    
    # Vote 1: Base probability scores
    for intent, prob in signals['probabilities'].items():
        scores[intent] += prob * 1.0
        vote_log.append({
            'vote_id': 'V1',
            'source': 'model_probability',
            'intent': intent,
            'contribution': prob * 1.0
        })
    
    # Vote 2: Confidence bonus for high-confidence predictions
    if signals['max_confidence'] > CONFIG['EXECUTION_MIN_CONF']:
        bonus = 0.3
        scores[signals['predicted_intent']] += bonus
        vote_log.append({
            'vote_id': 'V2',
            'source': 'high_confidence_bonus',
            'intent': signals['predicted_intent'],
            'contribution': bonus
        })
    
    # Vote 3: Margin bonus
    if signals['margin'] > 0.20:
        bonus = 0.2
        scores[signals['predicted_intent']] += bonus
        vote_log.append({
            'vote_id': 'V3',
            'source': 'high_margin_bonus',
            'intent': signals['predicted_intent'],
            'contribution': bonus
        })
    
    return {
        'score_per_intent': scores,
        'vote_log': vote_log,
        'vote_winner': max(scores, key=scores.get)
    }

print("Transparent voting function defined")

Transparent voting function defined


In [30]:
# Demo: Explainable Inference
import json

test_utterances = [
    "restart nginx on host123",
    "why is server cpu high",
    "hello",
    "summarize yesterday's incidents"
]

for utterance in test_utterances:
    print(f"\n{'='*80}")
    print(f"UTTERANCE: {utterance}")
    print('='*80)
    
    # Layer A: Extract signals from statistical model
    signals = extract_signals(utterance)
    
    # Layer B: Apply neuro-symbolic rules
    rules_output = apply_decision_rules(signals)
    
    # Transparent voting (for explainability only)
    voting_output = weighted_voting(signals)
    
    # Structured output
    output = {
        'utterance': utterance,
        'predicted_intent': rules_output['predicted_intent'],
        'signals': {
            'max_confidence': signals['max_confidence'],
            'margin': signals['margin'],
            'meaningful_tokens': signals['meaningful_tokens'],
            'probabilities': signals['probabilities']
        },
        'triggered_rules': [
            {
                'rule_id': r['rule_id'],
                'category': r['category'],
                'priority': r['priority'],
                'condition': r['condition'],
                'value': r['signal_value']
            } for r in rules_output['triggered_rules']
        ],
        'decision_state': rules_output['decision_state'],
        'decision_reason': rules_output['decision_reason']
    }
    
    # Print structured output
    print(f"\nSTRUCTURED OUTPUT:")
    print(json.dumps(output, indent=2))
    
    # Print detailed breakdown
    print(f"\n{'─'*80}")
    print(f"LAYER A — STATISTICAL MODEL:")
    print(f"  predicted_intent: {signals['predicted_intent']}")
    print(f"  max_confidence: {signals['max_confidence']:.4f}")
    print(f"  margin: {signals['margin']:.4f}")
    print(f"  meaningful_tokens: {signals['meaningful_tokens']}")
    
    print(f"\nLAYER B — NEURO-SYMBOLIC RULES:")
    if output['triggered_rules']:
        print(f"  Triggered: {len(output['triggered_rules'])} rule(s)")
        for rule in sorted(output['triggered_rules'], key=lambda r: r['priority'], reverse=True):
            print(f"    [P{rule['priority']}] {rule['rule_id']}: {rule['condition']} (value={rule['value']:.4f})")
    else:
        print(f"  No rules triggered - accepting model prediction")
    
    print(f"\nTRANSPARENT VOTING (for reference):")
    print(f"  vote_winner: {voting_output['vote_winner']}")
    for intent, score in sorted(voting_output['score_per_intent'].items(), key=lambda x: x[1], reverse=True):
        print(f"    {intent}: {score:.4f}")
    
    print(f"\n{'─'*80}")
    print(f"FINAL DECISION:")
    print(f"  predicted_intent: {output['predicted_intent']}")
    print(f"  decision_state: {output['decision_state']}")
    print(f"  decision_reason: {output['decision_reason']}")
    
    if rules_output['triggered_rules']:
        highest_priority_rule = sorted(rules_output['triggered_rules'], key=lambda r: r['priority'], reverse=True)[0]
        print(f"  ↳ Decision controlled by {highest_priority_rule['rule_id']} ({highest_priority_rule['category']})")
    else:
        print(f"  ↳ Model prediction accepted")


UTTERANCE: restart nginx on host123

STRUCTURED OUTPUT:
{
  "utterance": "restart nginx on host123",
  "predicted_intent": "execution",
  "signals": {
    "max_confidence": 0.5796926800149172,
    "margin": 0.41430265968025004,
    "meaningful_tokens": 5,
    "probabilities": {
      "execution": 0.5796926800149172,
      "investigate": 0.1494487335143245,
      "out_of_scope": 0.16539002033466724,
      "summarization": 0.10546856613609105
    }
  },
  "triggered_rules": [
    {
      "rule_id": "R4",
      "category": "safety",
      "priority": 90,
      "condition": "predicted_intent==execution AND max_confidence < 0.85",
      "value": 0.5796926800149172
    }
  ],
  "decision_state": "needs_clarification",
  "decision_reason": "execution_low_confidence"
}

────────────────────────────────────────────────────────────────────────────────
LAYER A — STATISTICAL MODEL:
  predicted_intent: execution
  max_confidence: 0.5797
  margin: 0.4143
  meaningful_tokens: 5

LAYER B — NEURO-SYMB