Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions crates/nvisy-engine/src/graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
//! - [`RetryExt`]: automatic retry with configurable backoff.
//! - [`TimeoutExt`]: wall-clock deadline enforcement for pipeline phases.

mod petgraph;
mod retry;
mod timeout;
mod petgraph_ext;
mod retry_ext;
mod timeout_ext;

pub(crate) use self::petgraph::GraphExt;
pub(crate) use self::petgraph_ext::GraphExt;
#[allow(unused_imports)] // wired when operations gain internal retry
pub(crate) use self::retry::RetryExt;
pub(crate) use self::timeout::TimeoutExt;
pub(crate) use self::retry_ext::RetryExt;
pub(crate) use self::timeout_ext::TimeoutExt;
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::collections::HashSet;
use nvisy_ontology::entity::{Entities, Entity, RefinementMethod};
use nvisy_ontology::workflow::{DeduplicationStrategy, GroupingCriteria};

use super::grouping::GroupEntities;
use super::group_entities::GroupEntities;
use super::span_size::SpanSize;
use crate::operation::Document;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,10 @@ use std::mem;
use nvisy_ontology::entity::{Entities, Entity, EntityKind, Overlap};
use nvisy_ontology::workflow::GroupingCriteria;

use super::group_key::GroupKey;
use crate::operation::Document;

const TARGET: &str = "nvisy_engine::op::deduplication::grouping";

/// Hash key for the first grouping phase.
///
/// For [`Strict`] and [`Narrowing`]/[`Widening`] this stores the exact
/// value; for [`Normalized`] it stores the lowercased, trimmed form.
///
/// [`Strict`]: GroupingCriteria::Strict
/// [`Narrowing`]: GroupingCriteria::Narrowing
/// [`Widening`]: GroupingCriteria::Widening
/// [`Normalized`]: GroupingCriteria::Normalized
#[derive(Hash, PartialEq, Eq)]
struct GroupKey {
kind: EntityKind,
value: String,
}

impl GroupKey {
async fn new(entity: &Entity, criteria: GroupingCriteria, document: &Document) -> Self {
// Entities without a text value (e.g. image bounding boxes)
// get a unique sentinel so they don't all bucket together.
// They will still be grouped by location overlap in phase 2.
let value = match document.value_at(&entity.location).await {
Some(v) => criteria.bucket_value(&v),
None => entity.id.to_string(),
};
Self {
kind: entity.entity_kind,
value,
}
}
}
const TARGET: &str = "nvisy_engine::op::deduplication::group_entities";

/// Extension trait that groups entities for deduplication.
pub(super) trait GroupEntities {
Expand Down
41 changes: 41 additions & 0 deletions crates/nvisy-engine/src/operation/deduplication/group_key.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//! Hash key for the first grouping phase of deduplication.

use nvisy_ontology::entity::{Entity, EntityKind};
use nvisy_ontology::workflow::GroupingCriteria;

use crate::operation::Document;

/// Hash key for the first grouping phase.
///
/// Two entities land in the same first-phase bucket when both their
/// kind and their bucketed value compare equal.
///
/// For [`Strict`] and [`Narrowing`]/[`Widening`] this stores the exact
/// value; for [`Normalized`] it stores the lowercased, trimmed form.
///
/// [`Strict`]: GroupingCriteria::Strict
/// [`Narrowing`]: GroupingCriteria::Narrowing
/// [`Widening`]: GroupingCriteria::Widening
/// [`Normalized`]: GroupingCriteria::Normalized
#[derive(Hash, PartialEq, Eq)]
pub(super) struct GroupKey {
/// Kind of the entity, copied from its `entity_kind` field.
pub(super) kind: EntityKind,
/// Bucketed form of the document value at the entity's location, or
/// the entity's id rendered as a string when the document yields no
/// value for that location.
pub(super) value: String,
}

impl GroupKey {
    /// Build the first-phase bucket key for `entity`.
    ///
    /// Looks up the document value at the entity's location and passes
    /// it through `criteria.bucket_value`. When the document has no
    /// value for that location, the entity's own id (as a string) is
    /// used instead.
    pub(super) async fn new(
        entity: &Entity,
        criteria: GroupingCriteria,
        document: &Document,
    ) -> Self {
        let kind = entity.entity_kind;
        let located = document.value_at(&entity.location).await;

        // Entities without a text value (e.g. image bounding boxes)
        // fall back to their id as a unique sentinel, which keeps them
        // from all sharing one bucket. Phase 2 still groups them by
        // location overlap.
        let value = located.map_or_else(
            || entity.id.to_string(),
            |text| criteria.bucket_value(&text),
        );

        Self { kind, value }
    }
}
40 changes: 21 additions & 19 deletions crates/nvisy-engine/src/operation/deduplication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,25 @@
//! [`DeduplicationStrategy`]: nvisy_ontology::workflow::DeduplicationStrategy
//! [`ConflictResolution`]: nvisy_ontology::workflow::ConflictResolution

mod calibration;
mod conflict;
mod grouping;
mod calibrate_entities;
mod fuse_entities;
mod group_entities;
mod group_key;
mod resolve_conflicts;
pub(crate) mod span_size;
mod strategy;

use std::mem;

use nvisy_core::Result;
use nvisy_ontology::entity::Entities;
use nvisy_ontology::workflow::{
CalibrationMap, ConflictResolution, Deduplication, DeduplicationStrategy, GroupingCriteria,
CalibrationMap, ConflictResolution, Deduplication as DeduplicationConfig,
DeduplicationStrategy, GroupingCriteria,
};

use self::calibration::CalibrationExt;
use self::conflict::ConflictResolutionExt;
use self::strategy::DeduplicationStrategyExt;
use self::calibrate_entities::CalibrationExt;
use self::fuse_entities::DeduplicationStrategyExt;
use self::resolve_conflicts::ConflictResolutionExt;
use crate::operation::{DocumentEnvelope, Operation};

const TARGET: &str = "nvisy_engine::op::deduplication";
Expand All @@ -46,17 +48,17 @@ const TARGET: &str = "nvisy_engine::op::deduplication";
/// threshold filtering operation.
///
/// Created from the [`DeduplicationConfig`] graph node configuration.
pub struct DeduplicationOp {
pub struct Deduplication {
grouping: GroupingCriteria,
strategy: DeduplicationStrategy,
calibration: CalibrationMap,
confidence_threshold: Option<f64>,
conflict_resolution: ConflictResolution,
}

impl DeduplicationOp {
/// Create from a [`Deduplication`] graph node config.
pub fn new(cfg: &Deduplication) -> Self {
impl Deduplication {
/// Create from a [`DeduplicationConfig`] graph node config.
pub fn new(cfg: &DeduplicationConfig) -> Self {
tracing::debug!(
target: TARGET,
grouping = ?cfg.grouping,
Expand Down Expand Up @@ -118,7 +120,7 @@ impl DeduplicationOp {
}
}

impl Operation for DeduplicationOp {
impl Operation for Deduplication {
async fn execute(&self, envelope: &mut DocumentEnvelope) -> Result<()> {
if !envelope.audit.entities.is_empty() {
tracing::debug!(
Expand Down Expand Up @@ -460,11 +462,11 @@ mod tests {
#[tokio::test]
async fn confidence_threshold_filters() {
let doc = Document::from_text("John......Jane").await;
let cfg = Deduplication {
let cfg = DeduplicationConfig {
confidence_threshold: Some(0.85),
..Default::default()
};
let op = DeduplicationOp::new(&cfg);
let op = Deduplication::new(&cfg);
let entities: Entities = vec![
Entity::test_builder(0, 4).test_build(),
Entity::test_builder(10, 14)
Expand All @@ -481,11 +483,11 @@ mod tests {
#[tokio::test]
async fn full_pipeline() {
let doc = Document::from_text(TEST_TEXT).await;
let cfg = Deduplication {
let cfg = DeduplicationConfig {
strategy: DeduplicationStrategy::MaxConfidence,
..Default::default()
};
let op = DeduplicationOp::new(&cfg);
let op = Deduplication::new(&cfg);
let entities: Entities = vec![
Entity::test_builder(0, 4).with_confidence(0.7).test_build(),
Entity::test_builder(0, 4).with_confidence(0.8).test_build(),
Expand All @@ -507,8 +509,8 @@ mod tests {
#[tokio::test]
async fn empty_input() {
let doc = Document::from_text("").await;
let cfg = Deduplication::default();
let op = DeduplicationOp::new(&cfg);
let cfg = DeduplicationConfig::default();
let op = Deduplication::new(&cfg);
let result = op.deduplicate(Entities::new(), &doc).await;
assert!(result.is_empty());
}
Expand Down
38 changes: 12 additions & 26 deletions crates/nvisy-engine/src/operation/detection/entity_recognition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,25 @@
use nvisy_codec::Span;
use nvisy_codec::handler::TextData;
use nvisy_core::{Error, ErrorKind, Result};
use nvisy_ontology::entity::{Entity, TextLocation};
use nvisy_ontology::entity::{Entities, TextLocation};
use nvisy_ontology::workflow::NerDetection;
use nvisy_provider::agent::{DetectionConfig, NerAgent};
use nvisy_provider::http::HttpClient;

use super::rebase_entities::RebaseEntities;
use crate::operation::{DocumentEnvelope, Operation};
use crate::pipeline::RuntimeConfig;

const TARGET: &str = "nvisy_engine::op::entity_recognition";

/// NER-based entity recognition. Wraps an [`NerAgent`] which manages
/// coreference state internally between successive text spans.
pub struct EntityRecognitionOp {
pub struct EntityRecognition {
agent: NerAgent,
config: DetectionConfig,
}

impl EntityRecognitionOp {
impl EntityRecognition {
/// Build from graph config and runtime dependencies.
pub async fn new(
cfg: &NerDetection,
Expand All @@ -47,43 +48,28 @@ impl EntityRecognitionOp {
Ok(Self { agent, config })
}

async fn detect(&self, spans: &[Span<TextLocation, TextData>]) -> Result<Vec<Entity>> {
async fn detect(&self, spans: &[Span<TextLocation, TextData>]) -> Result<Entities> {
tracing::debug!(target: TARGET, span_count = spans.len(), "running NER");
let mut entities = Vec::new();
let mut entities = Entities::new();

for span in spans {
let detected = self
let detected: Entities = self
.agent
.detect_entities(span.data.as_str(), &self.config)
.await
.map_err(|e| Error::runtime(e.to_string(), "ner-agent", e.is_retryable()))?;
.map_err(|e| Error::runtime(e.to_string(), "ner-agent", e.is_retryable()))?
.into();

for mut entity in detected {
// Adjust entity's text location offsets to be relative
// to the document (not the span) by adding the span's
// start offset.
if let nvisy_ontology::entity::Location::Text(ref mut elem) = entity.location {
elem.start_offset += span.location.start_offset;
elem.end_offset += span.location.start_offset;
}

entities.push(entity);
}
entities.extend(detected.rebase_offsets(span));
}

Ok(entities)
}
}

impl Operation for EntityRecognitionOp {
impl Operation for EntityRecognition {
async fn execute(&self, envelope: &mut DocumentEnvelope) -> Result<()> {
let locations = envelope.document.collect_text_locations().await;
let mut spans: Vec<Span<TextLocation, TextData>> = Vec::with_capacity(locations.len());
for located in locations {
if let Some(data) = envelope.document.read_text(&located.location).await {
spans.push(Span::from_located(located, data));
}
}
let spans = envelope.document.collect_text_spans().await;
if !spans.is_empty() {
let detected = self.detect(&spans).await?;
tracing::debug!(
Expand Down
16 changes: 10 additions & 6 deletions crates/nvisy-engine/src/operation/detection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
//! Detection operations: NER (language model) and pattern matching.
//! Entity recognition operations: NER (language-model) and pattern
//! matching.
//!
//! Both methods detect entities in extracted text. They are logically
//! independent and run sequentially per document within the detection
//! phase.
//! Both methods detect entities in extracted text and run sequentially
//! within the detection phase. They share the [`RebaseEntities`]
//! extension trait for shifting per-span offsets onto document
//! coordinates.

mod entity_recognition;
mod pattern_engine;
mod pattern_recognition;
mod rebase_entities;

pub(crate) use self::entity_recognition::EntityRecognitionOp;
pub(crate) use self::pattern_recognition::PatternRecognitionOp;
pub(crate) use self::entity_recognition::EntityRecognition;
pub(crate) use self::pattern_recognition::PatternRecognition;
57 changes: 57 additions & 0 deletions crates/nvisy-engine/src/operation/detection/pattern_engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//! [`PatternEngineRef`]: thin wrapper that holds either a borrowed reference
//! to the global [`PatternEngine`] singleton or an owned engine built
//! from custom config, exposed uniformly via [`Deref`].
//!
//! [`PatternEngine`]: nvisy_pattern::PatternEngine

use std::ops::Deref;

use nvisy_ontology::workflow::PatternDetection;
use nvisy_pattern::PatternEngine;

/// Holds either a borrowed reference to the global singleton or an
/// owned engine built from custom config.
///
/// [`PatternEngineRef::new`] picks the variant from a
/// [`PatternDetection`] config. Both variants dereference to the
/// underlying [`PatternEngine`], so callers can use the wrapper without
/// matching on it.
pub(super) enum PatternEngineRef {
/// The process-wide [`PatternEngine::instance`] singleton, used
/// when the [`PatternDetection`] config carries default settings
/// (no custom patterns, no custom threshold) so every run can
/// share the same compiled regex/dictionary automata.
Shared(&'static PatternEngine),
/// A freshly compiled engine carrying this run's custom
/// configuration. Owned because no other run will see the same
/// pattern set / threshold combination.
Owned(PatternEngine),
}

impl PatternEngineRef {
    /// Turn a [`PatternDetection`] config into an engine handle.
    ///
    /// A config with no pattern names and no confidence threshold maps
    /// to [`PatternEngineRef::Shared`], borrowing
    /// [`PatternEngine::instance`]. Any other config produces a
    /// [`PatternEngineRef::Owned`] engine built with only the settings
    /// the config actually provides.
    ///
    /// # Panics
    ///
    /// Panics with "pattern engine must compile" if building the custom
    /// engine fails.
    pub(super) fn new(cfg: &PatternDetection) -> Self {
        let has_patterns = !cfg.patterns.is_empty();
        let threshold = cfg.confidence_threshold;

        // Default-shaped config: reuse the shared singleton.
        if !has_patterns && threshold.is_none() {
            return Self::Shared(PatternEngine::instance());
        }

        let mut builder = PatternEngine::builder();
        if has_patterns {
            // The builder takes borrowed names, so view the owned
            // strings as `&str` without cloning them.
            let names: Vec<&str> = cfg.patterns.iter().map(String::as_str).collect();
            builder = builder.with_patterns(&names);
        }
        if let Some(min_confidence) = threshold {
            builder = builder.with_confidence_threshold(min_confidence);
        }

        let engine = builder.build().expect("pattern engine must compile");
        Self::Owned(engine)
    }
}

/// Exposes the wrapped [`PatternEngine`] regardless of whether it is
/// the shared singleton or an owned instance.
impl Deref for PatternEngineRef {
    type Target = PatternEngine;

    fn deref(&self) -> &Self::Target {
        match *self {
            // The `'static` reference is `Copy`; hand it out directly.
            Self::Shared(engine) => engine,
            // Borrow the owned engine for as long as `self` is borrowed.
            Self::Owned(ref engine) => engine,
        }
    }
}
Loading