In [1]:
import spacy
import xml.etree.ElementTree as ET
from spacy.symbols import nsubj, dobj, pobj, iobj, neg, xcomp, VERB
import pandas as pd
import re
import os

nlp=spacy.load('en_core_web_lg')

def merge_trip(df):
    if df.shape[0] > 1:
        return [df.iloc[0].noun, df.iloc[0].verb, df.iloc[1].noun]

## make sure to allign relation name between this data and own data

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
spacy.explain("ner")



In [None]:
def verb_code_dict(pico_path, verb_path):
    """reads coding ontology and verb lists, 
    directly matches verbs to their CAMEO codes and returns this verbs:codes dictionairy.
    verb with codes that cannot be read are printed out as full line of the file"""
    #read PETRARCH Internal Coding Ontology (= pico)
    pico_path = os.path.join(os.getcwd(), pico_path)
    pico_file = open(pico_path, 'r')
    pico_lines = pico_file.readlines()

    #get all 20 codes with their respective code
    main_codes = {}                             #we run one iteration for all the main codes, only main codes contain relation name
    for line in pico_lines:
        line = line.split('#')
        if line[0] == "" or line[0] == "\n":    #only intro comments and empty lines
            continue
        else: 
            code_split = line[0].split(":")     #splits into CAMEO code and related hex
            if len(line) > 1 and code_split[0][2] == "0":      #only main categories have 0 in 3rd idx, [cat_num 0] -> [010]
                main_codes[code_split[0][:2]] = line[-1].replace("\n","")
    
    #map code to code we want to use in the training
    map_codes = {"DiplomaticCoop" : "Engage In Diplomatic Cooperation", 
                "MaterialCoop" : "Engage In Material Cooperation",
                "ProvideAid" : "Provide Aid",
                "Exhibit Force Posture": "Exhibit Military Posture",
                "Use Unconventional Mass Violence" : "Engage In Unconventional Mass Violence"}
    main_codes = {k: (map_codes[v] if v in map_codes else v) for k, v in main_codes.items()}
    
    #read verbs and match their code to the relation extracted in main_codes
    verb_path = os.path.join(os.getcwd(), verb_path)
    verb_file = open(verb_path, 'r')
    verb_lines = verb_file.readlines()
    
    verb_dict = {}
    for line in verb_lines:
        if line[0] == "#":
            continue
        elif line.startswith("---"):    #main verbs have a lead code, which is applied to all very in the section
                                        #unless a separate code is specified for a specific verb in section
            try: cur_main_code = re.split("\[|\]|---", line)[2].replace(":","")[:2]  #we only need main codes which are first two numbers
                                                                                #sometimes code starts with ":", e.g.: ---  OFFEND   [:110]  ---
                                                                                #we just remove those to get the main code
            except:                     #depending on chosen verb dictionairy, there may be main verbs without lead codes
                print("couldn't finde code in: ", line.replace("\n","")) 
                cur_main_code == "--"
            if cur_main_code == "": cur_main_code = "--"
        elif line == "\n":              #skip empty lines
            continue
        elif line[0] == "-" or line[0] == "~" or line[0] == "+" or line[0] == "&": #removes all special structures we cannot use
            continue
        else:
            if len(re.split("\[|\]", line)) > 1:    #verbs with their own code, e.g.: AFFIRM [051] 
                code = re.split("\[|\]", line)[1].replace(":","")[:2]
                if code != "--":
                    if "{" in line:         #conjugated verbs, e.g. "APPLY {APPLYING APPLIED APPLIES } [020]"
                        line_s = re.split("\{|\}", line)    #split at { and }
                        verb_dict[line_s[0]] = main_codes[code] 
                        for word in line_s[1].split():
                            verb_dict[word.lower()] = main_codes[code]
                    else:
                        word = re.split("\[|\]", line)[0]
                        verb_dict[word.lower()] = main_codes[code]
            else:
                if cur_main_code != "--":
                    if "{" in line:         #e.g. "HURRY {HURRIES HURRYING HURRIED }" 
                        line_s = re.split("\{|\}", line)    #split at { and }
                        verb_dict[line_s[0]] = main_codes[cur_main_code]
                        for word in line_s[1].split():
                            verb_dict[word.lower()] = main_codes[cur_main_code]
                    else:                   #only single words with sometimes comments, e.g.: CENSURE  # JON 5/17/95
                        word = line.split("#")[0].rstrip()    #gets part before "#", removes all whitespaces to the right
                        verb_dict[word.lower()] = main_codes[cur_main_code]

    return verb_dict

In [None]:
#version1

# doc = nlp(text2)
# verbs = []
# dict = {}
# for possible_verb in doc:
#     if possible_verb.pos == VERB:
#         if neg in [child.dep for child in possible_verb.children]: continue
#         else: 
#             for chunk in doc.noun_chunks:
#                 if chunk.root.head.idx == possible_verb.idx:
#                     verbs.append([possible_verb.idx, possible_verb, chunk.text, chunk.root.dep_])
#                     if possible_verb.idx in dict.keys(): dict[possible_verb.idx] += 1
#                     else: dict[possible_verb.idx] = 1
        

# trip_idx = [key for key in dict if dict[key] > 1]
# verbs, trip_idx

In [17]:
#version2 - for text2: technically want join is an xcomp so the child entity of want should be treated as child entity of join
def get_triples(sentence, verb_dict):
    """create triplet structure for training from text input, 
    verb_dict needs to be loaded before,
    spacy model needs to be initialized before """
    doc = nlp(sentence)
    verbs = []
    dict = {}

    for possible_verb in doc:
        if possible_verb.pos == VERB:
            if neg in [child.dep for child in possible_verb.children]: continue
            else: 
                for possible_subject in possible_verb.children: 
                    if possible_subject.dep == xcomp:   #subj / obj of composed verb should also be subj / obj of main verb
                        main_verb = possible_subject
                        main_idx = possible_subject.idx
                        for chunk in doc.noun_chunks:
                            if chunk.root.head.idx == possible_verb.idx:
                                verbs.append([main_idx, main_verb.lemma_, chunk.text, chunk.root.dep_])
                                if main_idx in dict.keys(): dict[main_idx] += 1
                                else: dict[main_idx] = 1

                for chunk in doc.noun_chunks:
                    if chunk.root.head.idx == possible_verb.idx:
                        verbs.append([possible_verb.idx, possible_verb.lemma_, chunk.text, chunk.root.dep_])
                        if possible_verb.idx in dict.keys(): dict[possible_verb.idx] += 1
                        else: dict[possible_verb.idx] = 1
    
    trip_idx = [key for key in dict if dict[key] > 1]

    #priority for subj-relation-obj triplets
    mapper = {"nsubj":1,"dobj":2, "pobj":2, "iobj":2}

    #create df from verbs extracted 
    df = pd.DataFrame(verbs, columns = ["idx", "verb", "noun", "noun_type"])
    df["noun_type"] = df.noun_type.map(mapper)  #turn noun_types into priority 

    #create groups that resolve around same word
    gb = df.groupby('idx')    
    #only keep groups if verb idx was identified as potential triplet before, sort by priority for structure
    df_l = [gb.get_group(x).sort_values("noun_type") for x in gb.groups if gb.get_group(x).idx.iloc[0] in dict]
    matches = [merge_trip(group) for group in df_l if not merge_trip(group) == None] #get groups into triplet structure
    
    #turn matches into triples by only keeping those with coded verbs, return code instead of verb
    triples = [[f"<triplet>{match[0]}<subj>{match[2]}<obj>{verb_dict[match[1]]}"] for match in matches if match[1].lower() in verb_dict]

    return triples

In [6]:
text = "According to a poll by Kyodo news agency released Saturday, 78 of 100 people surveyed opposed the military action in Iraq."
text2 = "I want to join together the feelings of each of us as individuals who oppose the war."
text3 = "I am getting hold of you"
text4 = "Russia ends ties with EU, US, Australia"

In [7]:
verb_dict = verb_code_dict("dictionaries/PETR.Internal.Coding.Ontology.txt", "dictionaries/newdict.txt")

#CAMEO.2.0.txt = 1451 words with relations
#CAMEO.2.0_unsorted.txt = 1452 words with relations
#CAMEO.verbpatterns.150430.txt = 1514 words with relations
#newdict.txt = 1522 words with relations
len(verb_dict)

couldn't finde code in:  --- DEFEND  ###
couldn't finde code in:  --- REVOKE_   ###
couldn't finde code in:  --- SEND   ###
couldn't finde code in:  --- COLLAPSE  ###


1522

In [19]:
for sent in [text, text2, text3, text4]:
    print(get_triples(sent, verb_dict = verb_dict))

[['<triplet>78 of 100 people<subj>the military action<obj>Disapprove']]
[['<triplet>I<subj>the feelings<obj>Consult'], ['<triplet>who<subj>the war<obj>Disapprove']]
[]
[]


In [8]:
import os
import sys
os.getcwd()

'c:\\Users\\svawe\\Thesis_RelationExtraction_PoliticsNews\\soft_data\\src\\add_labels'

In [27]:
text6 = "The Berlin School of Economics and Law is situated in Berlin Schöneberg"
from spacy import displacy
from spacy.tokens import Span
from pathlib import Path
doc = nlp(text6)

doc.spans["sc"] = [
    Span(doc, 1, 7, "subj"), 
    Span(doc, 11, 12, "obj"),
]

svg = displacy.serve(doc,style = "span", options = {"colors" : {"subj" :"#6c6c6c", "obj": "#6c6c6c"}})

#svg = displacy.render(nlp(text5),style = "dep", jupyter=True, minify = True)
# output_path = Path(r"C:\Users\svawe\Thesis_RelationExtraction_PoliticsNews\docs\generated_img\spans.svg")
# output_path.open("w", encoding="utf-8").write(svg)


Using the 'span' visualizer
Serving on http://0.0.0.0:5000 ...

Shutting down server on port 5000.


In [39]:
displacy.render(ents, style = "ent", manual = True, jupyter = True)

In [38]:
ents = {
    "text": "The Berlin School of Economics and Law is situated in Berlin Schöneberg",
    "ents": [{"start": 0, "end": 39, "label": "subj"},{"start": 54, "end": 71, "label": "obj"}],
    "title": None
}

In [28]:
" ".join([])

''

In [60]:
displacy.render(rels, style = "dep", manual = True, jupyter = True, options = {"offset_x":200, "distance":100})

In [59]:
rels = {
    "words": [
        {"text": "The Berlin School of Economics and Law", "tag": "SUBJ"},
        {"text": "is", "tag": " "},
        {"text": "situated", "tag": " "},
        {"text": "in", "tag": " "},
        {"text": "Berlin Schöneberg", "tag": "OBJ"}
    ],
    "arcs": [
        {"start": 0, "end": 4, "label": "situated_in", "dir": "right"},
    ]
}

In [9]:
doc = nlp(text5)
verbs = []
dict = {}


for possible_verb in doc:
    if possible_verb.pos == VERB:
        if neg in [child.dep for child in possible_verb.children]: continue
        else: 
            for possible_subject in possible_verb.children: 
                if possible_subject.dep == xcomp:   #subj / obj of composed verb should also be subj / obj of main verb
                    print("went to xcomp on ", possible_subject.text)
                    main_verb = possible_subject
                    main_idx = possible_subject.idx
                    print("new main verb is ", main_verb)
                    for token in doc.ents:
                        if token.label_ in ["GPE", "NORP", "EVENTS", "FAC", "LAW", "ORG", "PERSON"]:
                            if token.root.dep_ == "poss":
                                if token.root.head.head.idx == possible_verb.idx:
                                    verbs.append([main_idx, main_verb.lemma_, token.text, token.root.head.dep_])
                                    if main_idx in dict.keys(): dict[main_idx] += 1
                                    else: dict[main_idx] = 1
                            else:
                                if token.root.head.idx == possible_verb.idx:
                                    print(token.root.head.text)
                                    print(token.text)
                                    print(token.root.dep_)
                                    verbs.append([possible_verb.idx, possible_verb.lemma_, token.text, token.root.dep_])
                                    if possible_verb.idx in dict.keys(): dict[possible_verb.idx] += 1
                                    else: dict[possible_verb.idx] = 1

            for token in doc.ents:
                if token.label_ in ["GPE", "NORP", "EVENTS", "FAC", "LAW", "ORG", "PERSON"]:
                    if token.root.dep_ == "poss":
                        if token.root.head.head.idx == possible_verb.idx:
                            verbs.append([possible_verb.idx, possible_verb.lemma_, token.text, token.root.head.dep_])
                            if possible_verb.idx in dict.keys(): dict[possible_verb.idx] += 1
                            else: dict[possible_verb.idx] = 1
                    else:
                        if token.root.head.idx == possible_verb.idx:
                            verbs.append([possible_verb.idx, possible_verb.lemma_, token.text, token.root.dep_])
                            if possible_verb.idx in dict.keys(): dict[possible_verb.idx] += 1
                            else: dict[possible_verb.idx] = 1

went to xcomp on  trying
new main verb is  trying
began
a Revolutionary Court
nsubj


In [26]:
doc = nlp(text5)
verbs = []
dict = {}


for possible_verb in doc:
    if possible_verb.pos == VERB:
        if neg in [child.dep for child in possible_verb.children]: continue
        else: 
            for possible_subject in possible_verb.children: 
                if possible_subject.dep == xcomp:   #subj / obj of composed verb should also be subj / obj of main verb
                    main_verb = possible_subject
                    main_idx = possible_subject.idx
                    
                    for chunk in doc.noun_chunks:
                        if chunk.root.dep_ == "poss":
                            if chunk.root.head.head.idx == possible_verb.idx:
                                verbs.append([main_idx, main_verb.lemma_, chunk.text, chunk.root.head.dep_])
                                if main_idx in dict.keys(): dict[main_idx] += 1
                                else: dict[main_idx] = 1
                        else:
                            if chunk.root.head.idx == possible_verb.idx:
                                verbs.append([main_idx, main_verb.lemma_, chunk.text, chunk.root.dep_])
                                if possible_verb.idx in dict.keys(): dict[possible_verb.idx] += 1
                                else: dict[possible_verb.idx] = 1

            for chunk in doc.noun_chunks:       #for normal verbs, check chunks directly
    #                 if chunk.root.head.idx == possible_verb.idx:
                if chunk.root.head.dep_ == "poss":
                    if chunk.root.head.head.idx == possible_verb.idx:
                        verbs.append([possible_verb.idx, possible_verb.lemma_, chunk.text, chunk.root.head.dep_])
                        if possible_verb.idx in dict.keys(): dict[possible_verb.idx] += 1
                        else: dict[possible_verb.idx] = 1
                else:
                    if chunk.root.head.idx == possible_verb.idx:
                        verbs.append([possible_verb.idx, possible_verb.lemma_, chunk.text, chunk.root.dep_])
                        if possible_verb.idx in dict.keys(): dict[possible_verb.idx] += 1
                        else: dict[possible_verb.idx] = 1

In [27]:
verbs

[[49, 'try', 'a Revolutionary Court', 'nsubj'],
 [43, 'begin', 'a Revolutionary Court', 'nsubj'],
 [49, 'try', 'five people', 'dobj'],
 [84, 'carry', 'that', 'nsubj'],
 [84, 'carry', 'the death penalty', 'dobj'],
 [132, 'report', 'state news agency Irna', 'nsubj']]

In [10]:
from typing import Union

In [16]:
Union(chunk_types[0], ["GPE", "NORP", "EVENTS", "FAC", "LAW", "ORG", "PERSON"])

TypeError: Cannot instantiate typing.Union

In [26]:
intersection = set(chunk_types[3]) & set(["GPE", "NORP", "EVENTS", "FAC", "LAW", "ORG", "PERSON"])
intersection == set()

True

In [28]:
doc = nlp(text5)
chunk_types = []
for chunk in doc.noun_chunks:
    if  set([word.ent_type_ for word in chunk]) & set(["GPE", "NORP", "EVENTS", "FAC", "LAW", "ORG", "PERSON"]) != set():
        print(chunk)

Olaf Scholz
Beijing
German chancellor
the German economy’s unsustainable dependence
China
Olaf Scholz
German executives


In [14]:
chunk_types

[['DATE', 'DATE'],
 ['PERSON', 'PERSON'],
 ['GPE'],
 ['', 'ORDINAL', ''],
 ['NORP', ''],
 ['', '', ''],
 ['', 'NORP', '', '', '', ''],
 ['GPE'],
 ['PERSON', 'PERSON'],
 ['', '', ''],
 ['NORP', '']]

In [30]:
text5 = "Germany takes part in Russia-Ukraine Conflict"
from spacy import displacy
svg = displacy.render(nlp(text5),style = "dep", jupyter=True, options = {"distance": 90}, minify = False)
for chunk in nlp(text5).noun_chunks:
    print(chunk.text, chunk.root.dep_, chunk.root.head.head.text, chunk.root.head.dep_)

#svg = displacy.render(nlp(text5),style = "dep", jupyter=True, minify = True)
# output_path = Path(r"C:\Users\svawe\Thesis_RelationExtraction_PoliticsNews\docs\generated_img\dep_tree.svg")
# output_path.open("w", encoding="utf-8").write(svg)

Germany nsubj takes ROOT
part dobj takes ROOT
Russia-Ukraine Conflict pobj takes prep


In [24]:
for chunk in nlp(text5).noun_chunks:
    print(chunk.text, chunk.root.dep_, chunk.root.head.dep_, chunk.root.head.head.text)

Monday pobj prep began
a Revolutionary Court nsubj ccomp reported
Tehran pobj prep Court
five people dobj xcomp began
charges pobj prep trying
that nsubj relcl charges
the death penalty dobj relcl charges
state news agency Irna nsubj ROOT reported


In [10]:
for token in doc.ents:
    print(token, token.label_)

Monday DATE
a Revolutionary Court ORG
Tehran GPE
five CARDINAL
Irna ORG
