In [1]:
import spacy

from spacy.matcher import Matcher, PhraseMatcher
from spacy.pipeline import EntityRuler

# High-Level Process
- Process document
- Identify targets
- Define modifiers (KB)
- Mark items (modifiers)
- Apply modifiers (build edges)
- Add negated attributes

# Define classes similar to pyConText

In [2]:
class ItemData:
    """One context rule: a literal phrase plus the metadata attached to it.

    Attributes:
        literal: The phrase text of the rule.
        category: The label assigned to the rule (e.g. "DEFINITE_NEGATED_EXISTENCE").
        regex: Optional regular expression; stored as given.
        rule: The direction/behaviour keyword, stored lower-cased.
    """

    def __init__(self, literal, category, regex=None, rule="bidirectional"):
        self.literal, self.category, self.regex = literal, category, regex
        # Normalise case once so "Forward" and "forward" are the same rule.
        self.rule = rule.lower()

    def __repr__(self):
        fields = (self.literal, self.category, self.regex, self.rule)
        return "ItemData: [{}, {}, {}, {}]".format(*fields)

In [3]:
class TagObject:
    """A modifier phrase matched in a document, together with the scope it governs.

    Args:
        match_id: Key identifying the rule that produced the match.
        start: Index of the first token of the match.
        end: Index one past the last token of the match.
        doc: The document the match was found in.
        _modifier_item_mapping: Mapping from ``match_id`` to the rule object
            (exposing ``rule`` and ``category``) that produced the match.

    Raises:
        KeyError: If ``match_id`` is not a key of ``_modifier_item_mapping``.
    """

    def __init__(self, match_id, start, end, doc, _modifier_item_mapping):
        self.match_id = match_id
        self.start = start
        self.end = end
        self.doc = doc
        self.item_data = _modifier_item_mapping[match_id]

        # Token offsets of the half-open scope [start, end); filled in by set_scope().
        self._scope_start = None
        self._scope_end = None
        self.set_scope()

    @property
    def span(self):
        """The slice of ``self.doc`` covered by this match."""
        return self.doc[self.start: self.end]

    @property
    def rule(self):
        """The rule keyword of the underlying item data."""
        return self.item_data.rule

    @property
    def category(self):
        """The category of the underlying item data."""
        return self.item_data.category

    @property
    def scope(self):
        """The slice of ``self.doc`` currently covered by this modifier's scope."""
        return self.doc[self._scope_start: self._scope_end]

    def set_scope(self):
        """Initialise the scope from the rule, within the sentence of the first matched token.

        * "forward":  ``[self.end, sentence.end)``
        * "backward": ``[sentence.start, self.start)``
        * any other rule: the whole sentence, ``[sentence.start, sentence.end)``
        """
        sent = self.doc[self.start].sent
        rule = self.rule.lower()

        if rule == "forward":
            self._scope_start, self._scope_end = self.end, sent.end
        elif rule == "backward":
            self._scope_start, self._scope_end = sent.start, self.start
        else:
            self._scope_start, self._scope_end = sent.start, sent.end

    def update_scope(self, span):
        """Replace the scope with the token boundaries of ``span``."""
        self._scope_start, self._scope_end = span.start, span.end

    def limit_scope(self, other):
        """Shrink this modifier's scope so it does not extend past ``other``.

        The scope is only limited when both tags are in the same sentence, this
        tag is not itself a "terminate" tag, and ``other`` either has the rule
        "terminate" or has the same rule as this tag.

        NOTE(review): the earlier description of this method spoke of the same
        *category*; the check has always compared *rules* and is kept that way
        here -- confirm which one is intended.

        Returns:
            True if the scope of ``self`` was changed, otherwise False.
        """
        if self.span.sent != other.span.sent:
            return False
        own_rule = self.rule.lower()
        if own_rule == "terminate":
            return False
        if other.rule.lower() not in ("terminate", own_rule):
            return False

        before = (self._scope_start, self._scope_end)

        # Two independent checks (not if/elif): a "bidirectional" scope extends
        # both ways and must be limited on whichever side ``other`` lies.
        if own_rule in ("forward", "bidirectional") and other > self:
            self._scope_end = min(self._scope_end, other.start)
        if own_rule in ("backward", "bidirectional") and other < self:
            self._scope_start = max(self._scope_start, other.end)

        return before != (self._scope_start, self._scope_end)

    def modifies(self, target):
        """Return True if ``target`` overlaps this modifier's scope.

        Both the scope and ``target`` are half-open token ranges
        (``end`` is one past the last token), so they overlap exactly when
        each one starts before the other ends. "terminate" tags and empty
        scopes never modify anything.
        """
        if self.rule.lower() == "terminate":
            return False
        if self._scope_start >= self._scope_end:
            return False
        return target.start < self._scope_end and target.end > self._scope_start

    # Ordering is delegated to the underlying spans.
    def __gt__(self, other): return self.span > other.span
    def __ge__(self, other): return self.span >= other.span
    def __lt__(self, other): return self.span < other.span
    def __le__(self, other): return self.span <= other.span

    def __repr__(self):
        return f"<TagObject> [{self.span}, {self.category}]"

# Wrap up in a Component

In [4]:
from spacy.tokens import Doc, Span

In [5]:
# Register a custom `is_negated` attribute on spans (accessed as `span._.is_negated`), default False.
# force=True overwrites any earlier registration of the same name.
Span.set_extension("is_negated", default=False, force=True)

In [6]:
# Register a custom `context_edges` attribute on documents (accessed as `doc._.context_edges`),
# defaulting to an empty tuple. force=True overwrites any earlier registration of the same name.
Doc.set_extension("context_edges", default=(), force=True)

In [7]:
class ConTextComponent:
    """Pipeline component that links modifier phrases to the entities of a document.

    For every document it
      1. finds the modifier phrases defined by ``item_data``,
      2. limits the scope of each modifier against the others,
      3. stores (target, modifier) pairs in ``doc._.context_edges``, and
      4. sets ``_.is_negated`` on every target that has at least one edge.

    Args:
        item_data: Sequence of rule objects exposing ``literal``, ``category`` and ``rule``.
        nlp: The language object whose vocab and tokenizer are used to build the patterns.
    """
    name = "context"

    def __init__(self, item_data, nlp):
        self.item_data = item_data
        self.nlp = nlp

        # Maps the hash of each pattern key to the rule that defined it.
        self._modifier_item_mapping = dict()
        self.phrase_matcher = PhraseMatcher(nlp.vocab)
        for i, item in enumerate(item_data):
            key = str(i)
            # The matcher reports matches by the hash of the key, so the same
            # hash is used to look the rule up again later.
            uid = self.nlp.vocab.strings[key]
            # Phrase patterns only need tokenization; make_doc avoids running
            # the rest of the pipeline on every literal.
            self.phrase_matcher.add(key, None, nlp.make_doc(item.literal))
            self._modifier_item_mapping[uid] = item

    def update_scopes(self, marked_modifiers):
        """Let every pair of modifiers limit each other's scope (in place)."""
        for i, first in enumerate(marked_modifiers):
            for second in marked_modifiers[i + 1:]:
                # TODO: Add modifier -> modifier edges
                first.limit_scope(second)
                second.limit_scope(first)

    def apply_modifiers(self, marked_targets, marked_modifiers):
        """Return a list of (target, modifier) pairs for every modifier that modifies a target."""
        return [
            (target, modifier)
            for target in marked_targets
            for modifier in marked_modifiers
            if modifier.modifies(target)
        ]

    def _set_negation(self, edges):
        """Set ``_.is_negated`` on every target that appears in ``edges``.

        A target is negated when at least one of its modifiers has the
        category "definite_negated_existence" (case-insensitive). This is
        done in two passes so the result does not depend on edge order:
        a non-negating modifier listed after a negating one must not
        reset the flag.
        """
        for target, _ in edges:
            target._.is_negated = False
        for target, modifier in edges:
            if modifier.category.lower() == "definite_negated_existence":
                target._.is_negated = True

    def __call__(self, doc):
        """Annotate ``doc`` with context edges and negation flags, then return it."""
        targets = doc.ents
        marked_modifiers = [
            TagObject(match_id, start, end, doc, self._modifier_item_mapping)
            for match_id, start, end in self.phrase_matcher(doc)
        ]

        self.update_scopes(marked_modifiers)
        edges = self.apply_modifiers(targets, marked_modifiers)
        doc._.context_edges = edges

        # Add negation attributes to entities
        self._set_negation(edges)
        return doc

# Add to model and process documents

In [8]:
# Load the small English model and drop its "ner" pipe.
nlp = spacy.load("en_core_web_sm")
# Assigning to `_` keeps the value returned by remove_pipe out of the cell output.
_ = nlp.remove_pipe("ner")

In [9]:
# The modifier rules: two negation triggers (one looking forward, one looking
# backward) and one terminator.
item_data = [
    ItemData("no evidence of", category="DEFINITE_NEGATED_EXISTENCE", rule="forward"),
    ItemData("but", category="TERMINATE", rule="terminate"),
    ItemData("is ruled out", category="DEFINITE_NEGATED_EXISTENCE", rule="backward"),
]

In [10]:
# Build the context component from the rules above, using the vocab of `nlp`.
context = ConTextComponent(item_data, nlp)

In [11]:
# Rule-based NER: an EntityRuler that labels a fixed vocabulary as CONDITION.
ruler = EntityRuler(nlp, overwrite_ents=True)

# Terms matched on the exact lower-cased token text.
literal_terms = ("pneumonia", "asthma", "angina", "chf", "pe")
token_patterns = [[{"lower": term}] for term in literal_terms]
# Any token whose lower-cased text matches the regex "clot" (e.g. "clotting").
token_patterns.append([{"lower": {"REGEX": "clot"}}])

patterns = [
    {"label": "CONDITION", "pattern": token_pattern}
    for token_pattern in token_patterns
]

ruler.add_patterns(patterns)

In [12]:
# Add the entity ruler to the pipeline.
nlp.add_pipe(ruler)

In [13]:
# Add the context component after the ruler: it reads doc.ents, which the ruler fills in.
nlp.add_pipe(context)

In [14]:
# Example sentences to run through the pipeline.
sents = [
    "There is no evidence of pneumonia or asthma but there is angina.",
    "There is CHF.",
    "PE is ruled out.",
    "There is clotting but pe is ruled out.",
]

In [15]:
# Run every sentence through the pipeline and keep the processed documents in a list.
docs = list(nlp.pipe(sents))

In [21]:
# For each document, print its text followed by one line per entity with its negation flag.
for doc in docs:
    print(doc)
    for ent in doc.ents:
        print("-\t", ent, ent._.is_negated)
    print()  # blank line between documents

There is no evidence of pneumonia or asthma but there is angina.
-	 pneumonia True
-	 asthma True
-	 angina False

There is CHF.
-	 CHF False

PE is ruled out.
-	 PE True

There is clotting but pe is ruled out.
-	 clotting False
-	 pe True



In [22]:
# Print the (target, modifier) pairs stored on each document.
for doc in docs:
    print(doc._.context_edges)

[(pneumonia, <TagObject> [no evidence of, DEFINITE_NEGATED_EXISTENCE]), (asthma, <TagObject> [no evidence of, DEFINITE_NEGATED_EXISTENCE])]
[]
[(PE, <TagObject> [is ruled out, DEFINITE_NEGATED_EXISTENCE])]
[(pe, <TagObject> [is ruled out, DEFINITE_NEGATED_EXISTENCE])]


# Visualize results

In [23]:
from spacy import displacy

### Entities

In [24]:
def visualize_targets(doc):
    """Render the entities of ``doc`` and the modifiers linked to them with displaCy.

    Every entity in ``doc.ents`` is shown with its own label. Every modifier
    that appears in ``doc._.context_edges`` is shown once, labelled with its
    category, even when it is linked to several targets.
    """
    highlighted = [
        {"start": ent.start_char, "end": ent.end_char, "label": ent.label_}
        for ent in doc.ents
    ]

    # A modifier shared by several targets occurs in several edges; draw it once.
    seen_starts = set()
    for _target, tag in doc._.context_edges:
        start_char = tag.span.start_char
        if start_char in seen_starts:
            continue
        seen_starts.add(start_char)
        highlighted.append(
            {"start": start_char, "end": tag.span.end_char, "label": tag.category}
        )

    # Order the highlights by where they start in the text.
    highlighted.sort(key=lambda item: item["start"])

    palette = {"CONDITION": "orange", "DEFINITE_NEGATED_EXISTENCE": "#a2bde8"}
    displacy.render(
        [{"text": doc.text, "ents": highlighted}],
        style="ent",
        manual=True,
        options={"colors": palette},
    )

In [25]:
# Show the entities and linked modifiers of the first example document.
visualize_targets(docs[0])

### Relationships

In [26]:
def visualize_modifiers(doc):
    """Render the negation edges of ``doc`` as arcs between tokens with displaCy.

    Only edges whose modifier category is "definite_negated_existence"
    (case-insensitive) are drawn. Each arc joins the first token of the
    target and the first token of the modifier.
    """
    words = [{"text": token.text, "tag": ""} for token in doc]

    arcs = []
    for target, modifier in doc._.context_edges:
        if modifier.category.lower() != "definite_negated_existence":
            continue
        # displaCy wants the smaller token index as "start", whichever end it is.
        left, right = sorted((target.start, modifier.start))
        direction = "right" if target > modifier.span else "left"
        arcs.append(
            {"start": left, "end": right, "label": modifier.category, "dir": direction}
        )

    displacy.render({"words": words, "arcs": arcs}, manual=True)

In [27]:
# Show the negation arcs of the first example document.
visualize_modifiers(docs[0])