In [1]:
import json
from lemminflect import getAllInflections
import nltk
from nltk.corpus import framenet as fn
import numpy as np
import os
import pandas as pd
import pickle as pkl
import re
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.tree import DecisionTreeClassifier
import spacy
from ipywidgets.widgets.widget_int import IntProgress

# Make sure the FrameNet 1.7 corpus read through `nltk.corpus.framenet` (imported as `fn`)
# is available locally; nltk reports "already up-to-date" when the package is present.
nltk.download("framenet_v17")

[nltk_data] Downloading package framenet_v17 to
[nltk_data]     /Users/ryanschaefer/nltk_data...
[nltk_data]   Package framenet_v17 is already up-to-date!


True

In [22]:
class LexicalUnitClassifier:
    """Find FrameNet lexical units (LUs) in sentences and choose a frame for each.

    Attributes set by the methods below:
        nlp: spaCy pipeline loaded in `load_framenet`.
        lu_data: dict keyed by LU name (e.g. "take.v"); each value holds the LU's
            "lexemes" ({"lemmas", "consecutive"}) and the "frames" it appears in,
            each frame carrying its annotated example "sentences".
        rules: dict mapping LU name -> {frame name: probability}; used for LUs
            that do not have enough example sentences to train a model.
        models: dict mapping LU name -> fitted sklearn pipeline.
    """

    # Translations from spacy POS tags to framenet suffixes
    POS_MAPPING = {
        'ADJ': 'a', # Adjective
        'ADV': 'adv', # Adverb
        'INTJ': 'intj', # Interjection
        'NOUN': 'n', # Noun
        'PROPN': 'n', # Proper noun
        'VERB': 'v', # Verb
        'ADP': 'prep', # Adposition (preposition and postposition)
        'AUX': 'v', # Auxiliary verb
        'CONJ': 'c', # Conjunction
        'CCONJ': 'c', # Coordinating conjunction
        'SCONJ': 'scon', # Subordinating conjunction
        'DET': 'art', # Determiner (article)
        'NUM': 'num', # Numeral
        'PART': 'part', # Particle
        'PRON': 'pron' # Pronoun
    }

    def __init__(self, reset_framenet = False, pretrained = True, model_directory = "../models/lexical_units"):
        """Load the FrameNet data and, optionally, previously trained models.

        Args:
            reset_framenet: rebuild the FrameNet cache from NLTK instead of reading
                `framenet.json` from `model_directory`.
            pretrained: when True, also load `rules.json` and the `*.pkl` models
                from `model_directory`.
            model_directory: directory holding the cache and the trained models.
        """
        self.lu_data = None
        self.models = None
        self.rules = None
        self.load_framenet(reset_framenet, model_directory)

        if pretrained:
            self.load_trained_models(model_directory)

    def load_framenet(self, reset = False, directory = "../models/lexical_units"):
        """Load relevant framenet data into `self.lu_data`.

        Reads `<directory>/framenet.json` when it exists and `reset` is False;
        otherwise rebuilds the data from the NLTK FrameNet corpus and writes it
        back to that file (creating `directory` if needed).
        """
        self.nlp = spacy.load("en_core_web_sm")
        filename = os.path.join(directory, "framenet.json")

        # Load framenet from file if specified and file exists
        if not reset:
            if os.path.isfile(filename):
                with open(filename) as file:
                    self.lu_data = json.load(file)
                return
            else:
                print("Framenet file not found in directory `{}`. Resetting framenet...".format(directory))

        # Get all lexical units; several LU objects can share one name
        lexical_units = fn.lus()
        lu_names = list(set(map(lambda x: x.name, lexical_units)))
        self.lu_data = { key: { "frames": [] } for key in lu_names }

        # Progress bar (`display` is the notebook's IPython display function)
        progress = IntProgress(0, 0, len(lexical_units))
        display(progress)

        # Iterate through all lexical units
        for lu in lexical_units:
            name = lu.name
            # Add lexemes if not already defined
            if "lexemes" not in self.lu_data[name]:
                self.lu_data[name]["lexemes"] = self._build_lexemes(lu)
            # Store the frame of this LU together with its example sentences
            self.lu_data[name]["frames"].append(self._collect_frame(lu))
            progress.value += 1

        # Save framenet data to file
        os.makedirs(directory, exist_ok = True)
        with open(filename, "w") as file:
            json.dump(self.lu_data, file, indent = 4)

    def _build_lexemes(self, lu):
        """Return the {"lemmas", "consecutive"} entry for one FrameNet LU object."""
        name = lu.name
        lexemes = {
            "lemmas": [ x["name"].lower() for x in lu.lexemes ],
            "consecutive": all(x["breakBefore"] == "false" for x in lu.lexemes)
        }

        # Add words in () or [] to lexemes
        if "(" in name or "[" in name:
            tmp_name = name.replace("[", "(").replace("]", ")")
            bracketed = re.findall(r'\(.*?\)', tmp_name)
            # An unbalanced bracket produces no match; keep the plain lexemes then
            if bracketed:
                # Get all lemmas in tokenized substring
                lemmas = []
                for token in self.nlp(bracketed[0]):
                    lemma = token.lemma_.lower()
                    if lemma not in ["(", ")"]:
                        # Store every inflection of the lemma as "/"-separated alternatives
                        inflections = [ x for vals in getAllInflections(lemma).values() for x in vals ]
                        lemmas.append("/".join(list(set(inflections + [ lemma ]))))

                # If lu contains a /, save the words on either side as the same lexeme
                while "/" in lemmas:
                    index = lemmas.index("/")
                    if 0 < index < len(lemmas) - 1:
                        lemmas[index - 1] = "{}/{}".format(lemmas[index - 1], lemmas[index + 1])
                        del lemmas[index:index + 2]
                    else:
                        # A slash with no word on one side has nothing to merge
                        del lemmas[index]

                # Add lemmas to beginning or end based on where the close bracket is.
                # The POS suffix follows the LAST ".", matching `find_lus`.
                if tmp_name.rfind(".") - tmp_name.index(")") == 1:
                    lexemes["lemmas"] = lexemes["lemmas"] + lemmas
                else:
                    lexemes["lemmas"] = lemmas + lexemes["lemmas"]

        return lexemes

    @staticmethod
    def _collect_frame(lu):
        """Return {"name", "sentences"} for the frame of one FrameNet LU object.

        Only exemplar sentences with a non-empty "Target" layer are kept; each
        keeps its text, target start/end and the frame elements that have a span.
        """
        frame = {
            "name": lu.frame.name,
            "sentences": []
        }
        # Iterate through all sentences that include lu
        for sentence in lu.exemplars:
            entry = {
                "text": sentence.text,
                "fe": []
            }

            # Extract target and frame elements from sentence
            target_found = False
            for aset in sentence.annotationSet:
                for layer in aset.layer:
                    if layer.name == "Target":
                        if len(layer.label) > 0:
                            label = layer.label[0]
                            entry["start"] = label["start"]
                            entry["end"] = label["end"]
                            target_found = True
                    elif layer.name == "FE":
                        for label in layer.label:
                            if "start" in label.keys():
                                entry["fe"].append({
                                    "name": label["name"],
                                    "start": label["start"],
                                    "end": label["end"]
                                })
            # Add sentence if a target was found
            if target_found:
                frame["sentences"].append(entry)
        return frame

    def load_trained_models(self, directory = "../models/lexical_units"):
        """Load `rules.json` and every `*.pkl` model found in `directory`.

        Model files are named after the LU with "." replaced by "_" (see `fit`),
        so the reverse replacement recovers the LU name.
        NOTE(review): an LU name that itself contains "_" would not round-trip.
        """
        self.models = {}
        with open(os.path.join(directory, "rules.json")) as file:
            self.rules = json.load(file)
        for filename in os.listdir(directory):
            # splitext copes with files that have no extension at all
            stem, extension = os.path.splitext(filename)
            if extension == ".pkl":
                # NOTE(security): unpickling executes arbitrary code; only point
                # this at model directories you trust.
                with open(os.path.join(directory, filename), "rb") as file:
                    self.models[stem.replace("_", ".")] = pkl.load(file)

    def pos_tag(self, sentence, doc = None):
        """Return [(lowercased lemma, framenet POS suffix)] for the sentence.

        `doc` is an already parsed sentence; it is created from `sentence` when
        omitted. Tokens whose spaCy POS tag has no entry in `POS_MAPPING` are dropped.
        """
        # Create spacy parser if one has not already been created
        if doc is None:
            doc = self.nlp(sentence)

        # Convert all spacy tags to framenet suffixes in the text
        return [
            (token.lemma_.lower(), self.POS_MAPPING[token.pos_])
            for token in doc
            if token.pos_ in self.POS_MAPPING
        ]

    def annotated_features(self, sentence):
        """Get the features of an annotated sentence for training.

        `sentence` is a dict with "text" and either a target "start" offset or a
        "tokens" list. Returns [location, relation, head], or [None, None, None]
        when no token begins exactly at "start".
        """
        doc = self.nlp(sentence["text"])

        if "start" not in sentence:
            return self.prediction_features(sentence["text"], sentence["tokens"], doc)

        # The target is the first token whose character offset equals the annotated start
        target = next((token for token in doc if token.idx == sentence["start"]), None)
        if target is None:
            return [None, None, None]
        return [target.i / len(doc), target.dep_, target.head.lemma_]

    def prediction_features(self, sentence, tokens, doc = None):
        """Get the features of an unannotated sentence for prediction.

        Features are taken from the first token of the sentence whose lowercased
        lemma equals `tokens[0]`.

        Raises:
            ValueError: if no token of the sentence has that lemma.
        """
        # Return None if no tokens given
        if len(tokens) == 0:
            return [None, None, None]

        # Parse sentence with spacy
        if doc is None:
            doc = self.nlp(sentence)

        # Return features of tokens
        for token in doc:
            if token.lemma_.lower() == tokens[0]:
                return [token.i / len(doc), token.dep_, token.head.lemma_]

        raise ValueError("Tokens '{}' not found in sentence '{}'".format(tokens, sentence))

    @staticmethod
    def _match_consecutive(lexemes, token_pos):
        """Return the first run of adjacent (lemma, pos) pairs matching every lexeme in order, or None."""
        width = len(lexemes)
        for start in range(len(token_pos) - width + 1):
            window = token_pos[start:start + width]
            if all(lemma in options for (lemma, _), options in zip(window, lexemes)):
                return window
        return None

    @staticmethod
    def _match_anywhere(lexemes, token_pos):
        """Return, per lexeme, the first (lemma, pos) pair matching it anywhere, or None if one is missing."""
        matched = []
        for options in lexemes:
            hit = next(((lemma, pos) for lemma, pos in token_pos if lemma in options), None)
            if hit is None:
                return None
            matched.append(hit)
        return matched

    def find_lus(self, sentence, doc = None):
        """Determine all of the lus that are in a sentence.

        Returns a list of (lu name, matched lemmas). An LU is reported when all of
        its lexemes are found, at least one matched token carries the LU's POS
        suffix, and its lemmas are not already covered by an earlier result.
        """
        # Get POS tags
        token_pos = self.pos_tag(sentence, doc)
        possible_lus = []
        for lu, values in self.lu_data.items():
            # Get lu POS tag to match with token tags
            lu_pos = lu.split(".")[-1]
            # Each lexeme is a "/"-separated list of acceptable lemmas
            lexemes = [ word.split("/") for word in values["lexemes"]["lemmas"] ]

            if values["lexemes"]["consecutive"] and len(lexemes) > 1:
                # Multi-word LU: scan every position so that an early partial
                # match does not hide a complete match later in the sentence
                matched = self._match_consecutive(lexemes, token_pos)
            else:
                # 1 or more nonconsecutive lexemes
                matched = self._match_anywhere(lexemes, token_pos)
            if matched is None:
                continue

            tokens = [ lemma for lemma, _ in matched ]
            possible_pos = [ pos for _, pos in matched ]
            if lu_pos not in possible_pos:
                continue

            # Check if all lemmas in lu already in a lu (avoid duplicates)
            already_covered = any(
                all(token in prev_tokens for token in tokens)
                for _, prev_tokens in possible_lus
            )
            if not already_covered:
                possible_lus.append((lu, tokens))
        return possible_lus

    def predict_frame(self, sentence, lu, tokens, doc = None):
        """Predict which of a lexical unit's possible frames is used in a given sentence.

        Samples from the stored probabilities when the LU has a rule, otherwise
        uses the LU's trained pipeline. Returns None when neither exists or when
        no features could be extracted.
        """
        # If probabilities are defined, sample a frame according to them
        if self.rules is not None and lu in self.rules:
            probs = self.rules[lu]
            return str(np.random.choice(list(probs.keys()), p = list(probs.values())))

        # If decision tree is defined, predict with tree
        if self.models is not None and lu in self.models:
            features = self.prediction_features(sentence, tokens, doc)
            if None in features:
                return None
            X = pd.DataFrame([features], columns = ["location", "relation", "head"])
            return str(self.models[lu].predict(X)[0])

        return None

    def fit(self, output_dir = "../models/lexical_units", random_state = None, min_sentences = 10):
        """Train models and probabilities on framenet example sentences.

        For every LU with more than one frame: if any frame has fewer than
        `min_sentences` examples, store frame probabilities in `self.rules`;
        otherwise fit a decision-tree pipeline, pickle it to `output_dir` and
        keep it in `self.models`. The rules are written to `<output_dir>/rules.json`.

        Args:
            output_dir: directory for the `*.pkl` models and `rules.json`
                (created if missing; existing `*.pkl` files are deleted first).
            random_state: forwarded to `DecisionTreeClassifier`.
            min_sentences: minimum examples per frame required to train a model.
        """
        os.makedirs(output_dir, exist_ok = True)
        # Delete old pkl files in output_dir
        for file in os.listdir(output_dir):
            if file.endswith(".pkl"):
                os.remove(os.path.join(output_dir, file))

        self.rules = {}
        self.models = {}
        progress = IntProgress(0, 0, len(self.lu_data))
        display(progress)
        # Iterate through lus
        for lu, values in self.lu_data.items():
            # If lu has more than 1 frame, we need to create probabilities or a model
            frames = values["frames"]
            if len(frames) > 1:
                # Number of example sentences per frame
                frame_counts = np.array([ len(frame["sentences"]) for frame in frames ])

                # Too few examples for some frame: not enough data to train a model
                if frame_counts.min() < min_sentences:
                    # If there are no example sentences, use a uniform distribution
                    if frame_counts.sum() == 0:
                        probs = np.full(len(frame_counts), 1 / len(frame_counts))
                    # Use proportion of available sentences as probabilities of each frame
                    else:
                        probs = frame_counts / frame_counts.sum()
                    # Sum per frame name so a repeated name cannot drop probability mass
                    rule = {}
                    for frame, prob in zip(frames, probs):
                        rule[frame["name"]] = rule.get(frame["name"], 0.0) + float(prob)
                    self.rules[lu] = rule
                # Enough examples for every frame: train a decision tree to predict the frame
                else:
                    # Collect sentences and frame labels to train on
                    sentences = []
                    labels = []
                    for frame in frames:
                        for sentence in frame["sentences"]:
                            sentences.append(sentence)
                            labels.append(frame["name"])
                    # Extract features from annotated example sentences
                    features = [ self.annotated_features(sentence) for sentence in sentences ]
                    X = pd.DataFrame(features, columns = ["location", "relation", "head"])

                    # Pipeline to one-hot-encode categorical features
                    # NOTE(review): only "relation" and "head" are handed to the
                    # transformer; with ColumnTransformer's default `remainder`
                    # the "location" column is presumably dropped - confirm intended.
                    cat_pipeline = Pipeline([
                        ("ohe", OneHotEncoder(handle_unknown = "ignore"))
                    ])
                    col_transformer = ColumnTransformer([
                        ("cat", cat_pipeline, ["relation", "head"])
                    ])
                    pipeline = Pipeline([
                        ("preprocessing", col_transformer),
                        ("model", DecisionTreeClassifier(random_state = random_state))
                    ])

                    # Fit decision tree and store in pickle file
                    pipeline.fit(X, labels)
                    model_path = "{}/{}.pkl".format(output_dir, lu.replace(".", "_"))
                    with open(model_path, "wb") as file:
                        pkl.dump(pipeline, file)
                    self.models[lu] = pipeline
            progress.value += 1

        # Save probabilities to json file
        with open("{}/rules.json".format(output_dir), "w") as file:
            json.dump(self.rules, file, indent = 4)

    def predict(self, sentences, model_dir = None):
        """Predict (lexical unit, frame) pairs for each sentence.

        Args:
            sentences: iterable of sentence strings.
            model_dir: directory to load models from when none are loaded yet;
                the `load_trained_models` default is used when None.

        Returns:
            One list per input sentence, containing (lu name, frame name) tuples.
        """
        # Load models if not already loaded
        if not hasattr(self, "models") or self.models is None:
            if model_dir is None:
                self.load_trained_models()
            else:
                self.load_trained_models(model_dir)

        predictions = []
        # Iterate through sentences to predict
        for sentence in sentences:
            # Parse sentence with spacy once and reuse the parse below
            doc = self.nlp(sentence)
            curr = []
            # Iterate through the lexical units identified in this sentence
            for lu, tokens in self.find_lus(sentence, doc):
                possible_frames = self.lu_data[lu]["frames"]
                # If there is only one frame, assign it to the sentence
                if len(possible_frames) == 1:
                    curr.append((lu, possible_frames[0]["name"]))
                # If there is more than one frame, predict which one to use
                else:
                    frame = self.predict_frame(sentence, lu, tokens, doc)
                    if frame is not None:
                        curr.append((lu, frame))
            # Store predicted frames with the target lexical units for this sentence
            predictions.append(curr)

        return predictions

In [23]:
# Rebuild the FrameNet cache from NLTK. Saved models are not loaded here because
# the next cell retrains them with fit(); loading would fail on a fresh checkout
# where rules.json does not exist yet.
model = LexicalUnitClassifier(reset_framenet = True, pretrained = False)

IntProgress(value=0, max=13572)

In [24]:
# Pass a fixed seed through to the decision trees so retraining is reproducible
model.fit(random_state = 42)

IntProgress(value=0, max=10462)

In [25]:
df = pd.read_csv("../datasets/lexical_unit_sentences.csv")

# DataFrame.info() prints its report and returns None, so call it directly;
# wrapping it in display() also rendered a stray "None" in the output
df.info()
display(df.describe())
df

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 204022 entries, 0 to 204021
Data columns (total 4 columns):
 #   Column          Non-Null Count   Dtype 
---  ------          --------------   ----- 
 0   Lexical Unit    204022 non-null  object
 1   Frame           204022 non-null  object
 2   Sentence        200751 non-null  object
 3   Sentence Count  204022 non-null  int64 
dtypes: int64(1), object(3)
memory usage: 6.2+ MB


None

Unnamed: 0,Sentence Count
count,204022.0
mean,47.642269
std,52.379088
min,0.0
25%,19.0
50%,33.0
75%,59.0
max,547.0


Unnamed: 0,Lexical Unit,Frame,Sentence,Sentence Count
0,(can't) help.v,Self_control,"` Not if I can help it . """,11
1,(can't) help.v,Self_control,"And now she took a better look at him , Folly ...",11
2,(can't) help.v,Self_control,"` I could n't help feeling that … well , in yo...",11
3,(can't) help.v,Self_control,"Yet , looking into those liquid dark eyes , Fr...",11
4,(can't) help.v,Self_control,She could n't help the tinge of pink that floo...,11
...,...,...,...,...
204017,zone.n,Locale,Dubai 10-28 ( FP ) - Dubai 's Crown Prince She...,32
204018,zone.n,Locale,A Turbo Cat ferry makes a one - hour trip ( 7 ...,32
204019,zone.n,Locale,"Macau , now the Chinese Special Economic Zone ...",32
204020,zonk out.v,Fall_asleep,,0


In [26]:
# Classify one example sentence and show its (lexical unit, frame) pairs
sentences = [df["Sentence"][1]]
predicted_frames = model.predict(sentences)

for text, frames in zip(sentences, predicted_frames):
    print(text)
    display(frames)
    print()

And now she took a better look at him , Folly could n't help noticing the strong , muscular lines of the broad back under that white shirt .


[("(can't) help.v", 'Self_control'),
 ('take.v', 'Removing'),
 ('muscular.a', 'Body_description_holistic'),
 ('line.n', 'Roadways'),
 ('strong.a', 'Judgment_of_intensity'),
 ('now.adv', 'Temporal_collocation'),
 ('at.prep', 'Spatial_co-location'),
 ('broad.a', 'Dimension'),
 ('notice.v', 'Becoming_aware'),
 ('look.n', 'Perception_active'),
 ('under.prep', 'Non-gradable_proximity'),
 ('shirt.n', 'Clothing'),
 ('of.prep', 'Partitive'),
 ('white.a', 'Color')]




In [27]:
# Classify the first example sentence recorded for "yelp.v"
yelp_sentences = df[df["Lexical Unit"] == "yelp.v"].reset_index(drop = True)["Sentence"]
sentences = [yelp_sentences[0]]
predicted_frames = model.predict(sentences)

for text, frames in zip(sentences, predicted_frames):
    print(text)
    display(frames)
    print()

Face red , chest puffed with indignation , young John would yelp : ` I assure you quite categorically that I never touched the ball . 


[('never.adv', 'Negation'),
 ('young.a', 'Age'),
 ('would.v', 'Likelihood'),
 ('face.v', 'Confronting_problem'),
 ('touch.v', 'Impact'),
 ('assure.v', 'Telling'),
 ('with.prep', 'Accompaniment'),
 ('chest.n', 'Body_parts'),
 ('ball.n', 'Shapes'),
 ('yelp.v', 'Communication_noise'),
 ('puff.v', 'Ingest_substance')]




In [None]:
# All dataset rows recorded for the lexical unit "yelp.v"
df.loc[df["Lexical Unit"] == "yelp.v"]

Unnamed: 0,Lexical Unit,Frame,Sentence,Sentence Count
203663,yelp.v,Communication_noise,"Face red , chest puffed with indignation , you...",11
203664,yelp.v,Communication_noise,"He leaned backwards , digging in his heels , y...",11
203665,yelp.v,Communication_noise,"` Ouch , "" she yelped .",11
203666,yelp.v,Communication_noise,"` Christ , it 's one them white buggers , "" ye...",11
203667,yelp.v,Communication_noise,"` Christ , he got rid of the blanket , "" yelpe...",11
203668,yelp.v,Communication_noise,"` You 've got to be kidding ! "" yelped Margare...",11
203669,yelp.v,Communication_noise,"` Yeah ! "" yelps Paul , furiously .",11
203670,yelp.v,Communication_noise,"` It is not , "" yelped Auguste , wishing Egber...",11
203671,yelp.v,Communication_noise,"Have you prepared them ? "" yelped Auguste .",11
203672,yelp.v,Communication_noise,"` I did n't mean to do that , "" she yelped Yan...",11
