In [None]:
import numpy as np
import spacy
import sklearn
import nltk
import math
import pandas as pd
from collections import defaultdict
from nltk.corpus import stopwords
from nltk import word_tokenize
from nltk.util import ngrams
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet as wn
from string import punctuation
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
# Fetch the NLTK data packages this notebook relies on: the stopword lists
# (read via stopwords.words('english')) and the WordNet corpus (used through
# `wn` and WordNetLemmatizer).
nltk.download('stopwords')
nltk.download('punkt')  # presumably the tokenizer data behind word_tokenize — confirm
nltk.download('wordnet')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Unzipping corpora/wordnet.zip.


True

In [None]:
# Load the question/answer sheet without treating its first row as a header,
# then name the two columns 'q' (question) and 'a' (answer).
# NOTE(review): absolute Colab-style path ('/content/...') — adjust when running elsewhere.
srp_df = pd.read_csv('/content/SPR QA Training Data - Sheet1 (1).csv', header=None)
srp_df.columns = ['q', 'a']

In [None]:
# Peek at the first rows to check that the 'q' and 'a' columns loaded as expected.
srp_df.head()

Unnamed: 0,q,a
0,How many trees are on Swanton Pacific Ranch?,More than a thousand but less than 10 million....
1,How many trees were there before the fires in ...,More than a thousand but less than 10 million....
2,What is the most common tree species at SPR?,Coast live oak
3,How old is the oldest tree at SPR?,"1200 year stem; 5,000 year old roots"
4,What biome/ecosystem/ecotype covers the greate...,Coastal


In [None]:
# Plain Python list of the training questions (column 'q').
srp_utterances = srp_df['q'].tolist()

In [None]:
# Utterances unrelated to the ranch Q&A data. Only small_talk[0] is used in the
# cells visible here, as the query for the SBERT cosine-similarity checks.
small_talk = [
"How are you?",
"How is the weather?",
"Have you listened to the new Drake album?",
"What kind of food do you like?",
"Why is the sky blue?",
"To be or not to be?",
"Any suggestions for a good restaurant?"
]

### WordNet Similarity Scoring

#### Normalization using WordNet

In [None]:
def lemmatize_ngram(ngram):
    """Return ``ngram`` with every token replaced by its lemma.

    Each token is passed to the module-level ``wnl`` lemmatizer; the results
    are returned as a tuple in the original token order.
    """
    return tuple(wnl.lemmatize(token) for token in ngram)

def lemmatize_wn_synset(ngram):
    """Map every token of ``ngram`` to the name of its first WordNet synset.

    Tokens for which ``wn.synsets`` returns nothing are kept unchanged.
    The result is a tuple in the original token order.
    """
    def first_synset_name_or_token(token):
        candidates = wn.synsets(token)
        return candidates[0].name() if candidates else token

    return tuple(first_synset_name_or_token(token) for token in ngram)

def lemmatize_wn_hypernum(ngram):
    """Map every token of ``ngram`` to the first hypernym of its first synset.

    For each token:
      * no WordNet synset        -> the token itself is kept;
      * synset without hypernyms -> the synset's own name is used;
      * otherwise                -> the name of the synset's first hypernym.

    Args:
        ngram: Iterable of token strings.

    Returns:
        Tuple of normalized names, one per input token, in input order.
    """
    wn_synsets = []
    for token in ngram:
        senses = wn.synsets(token)
        if not senses:
            wn_synsets.append(token)
            continue
        first_sense = senses[0]
        # Look the hypernyms up once and reuse the result for both the
        # emptiness check and the name extraction.
        hypernyms = first_sense.hypernyms()
        wn_synsets.append(hypernyms[0].name() if hypernyms else first_sense.name())
    return tuple(wn_synsets)

def make_ngram_list(text, max_ngram=4):
    """Build the n-grams of ``text`` for every order from 1 to ``max_ngram``.

    Args:
        text: Iterable of tokens (the notebook passes the list returned by
            ``preprocess``).
        max_ngram: Largest n-gram order to generate.

    Returns:
        A list with ``max_ngram`` entries; entry ``i`` holds the
        ``(i + 1)``-gram tuples and is empty when ``text`` has fewer than
        ``i + 1`` tokens.
    """
    tokens = list(text)
    ngram_list = []
    for n in range(1, max_ngram + 1):
        if len(tokens) < n:
            # Too few tokens for any n-gram of this order. Skipping the call
            # also sidesteps the RuntimeError recorded in this notebook for the
            # one-token utterance 'What is your name?' (presumably the NLTK
            # generator running out of input under PEP 479 — confirm).
            ngram_list.append([])
        else:
            ngram_list.append(list(ngrams(tokens, n)))
    return ngram_list

def score_ngrams(ngram_list_1, ngram_list_2, return_seen=False, idf_weight=None):
    """Score the n-gram overlap between two per-order n-gram lists.

    Both arguments are lists indexed by n-gram order (index 0 = 1-grams,
    index 1 = 2-grams, ...). Every distinct n-gram of ``ngram_list_1`` that
    also occurs at the same order in ``ngram_list_2`` adds
    ``order ** 2 * idf_dict[ngram]`` to the score.

    NOTE: a later cell of this notebook defines ``score_ngrams`` again; that
    later definition is the one in effect after running all cells.

    Args:
        ngram_list_1: Per-order n-gram lists of the text being scored.
        ngram_list_2: Per-order n-gram lists of the reference features.
        return_seen: When true, also return the set of every n-gram visited
            in ``ngram_list_1`` (matched or not).
        idf_weight: Accepted for backward compatibility; not used. Weights
            are always read from the module-level ``idf_dict``.

    Returns:
        ``(score, seen)`` when ``return_seen`` is true, else ``(score, None)``.
    """
    reference_sets = [set(order_ngrams) for order_ngrams in ngram_list_2]
    score = 0.0
    seen = set()
    for idx, n_list in enumerate(ngram_list_1):
        for ngram in n_list:
            # Count each distinct n-gram once, however often it repeats.
            if ngram in reference_sets[idx] and ngram not in seen:
                score += (idx + 1) ** 2 * idf_dict[ngram]
            seen.add(ngram)
    if return_seen:
        return float(score), seen
    return float(score), None

In [None]:
# Shared lemmatizer instance used by lemmatize_ngram.
wnl = WordNetLemmatizer()

# Tokens dropped by preprocess: English stopwords plus punctuation characters.
punct_set = set(punctuation)
stopwords_en = set(stopwords.words('english'))
remove_set = stopwords_en | punct_set

In [None]:
def preprocess(text):
  """Tokenize ``text``, lower-case the tokens and drop those in ``remove_set``.

  Returns the surviving lower-cased tokens as a list, in their original order.
  """
  kept_tokens = []
  for raw_token in word_tokenize(text):
    lowered = raw_token.lower()
    if lowered in remove_set:
      continue
    kept_tokens.append(lowered)
  return kept_tokens

def get_ngram_features(preprocessed_text):
  """Build normalized 1- to 3-gram features for one preprocessed utterance.

  Returns a ``defaultdict(list)`` keyed by normalization function name. Each
  value is a list with one entry per n-gram order (1-grams, 2-grams, 3-grams),
  and each entry is the list of normalized n-gram tuples of that order.
  """
  normalizers = (lemmatize_ngram, lemmatize_wn_synset, lemmatize_wn_hypernum)
  per_order_ngrams = make_ngram_list(preprocessed_text, max_ngram=3)

  normed_ngram_features = defaultdict(list)
  for normalize in normalizers:
    normed_ngram_features[normalize.__name__].extend(
        [normalize(ngram) for ngram in order_ngrams]
        for order_ngrams in per_order_ngrams
    )
  return normed_ngram_features

In [None]:
def get_all_features(text_list):
  """Collect the normalized 1- to 3-grams of every text in ``text_list``.

  Each text is preprocessed, expanded into n-grams of order 1 to 3 and
  normalized with each of the three normalization functions.

  Args:
    text_list: Iterable of raw text strings.

  Returns:
    A ``defaultdict(list)`` keyed by normalization function name. Each value
    is one flat list of normalized n-gram tuples across all texts and all
    orders (unlike ``get_ngram_features``, orders are not kept separate).
  """
  normalizations = (lemmatize_ngram, lemmatize_wn_synset, lemmatize_wn_hypernum)
  normed_ngram_features = defaultdict(list)
  for text in text_list:
    ngram_lists = make_ngram_list(preprocess(text), max_ngram=3)
    for normalization in normalizations:
      for ngram_list in ngram_lists:
        normed_ngrams = [normalization(ngram) for ngram in ngram_list]
        normed_ngram_features[normalization.__name__].extend(normed_ngrams)
  # No debug dump of the full dictionary here: callers can inspect the
  # returned value themselves without flooding the cell output.
  return normed_ngram_features

#### Calculate IDF weights for scoring

In [None]:
# Inverse document frequency of every normalized n-gram in the training
# questions. While counting, idf_dict maps n-gram -> list of the utterance
# indices containing it; afterwards each list is replaced by
# log10(N / document_frequency).
idf_dict = defaultdict(list)
N = len(srp_utterances)
print(N)
normalizations = [lemmatize_ngram, lemmatize_wn_synset, lemmatize_wn_hypernum]
for idx, sent in enumerate(srp_utterances):
    ngram_lists = make_ngram_list(preprocess(sent), max_ngram=3)
    for normalization in normalizations:
        for n_list in ngram_lists:
            for ngram in n_list:
                doc_ids = idf_dict[normalization(ngram)]
                # An utterance counts once per n-gram. idx only ever grows, so
                # if it was already recorded it is the last entry.
                if not doc_ids or doc_ids[-1] != idx:
                    doc_ids.append(idx)

for key, value in idf_dict.items():
    # Use the actual corpus size N. The previous hard-coded 17.0 produced
    # negative weights for n-grams found in more than 17 utterances.
    idf_dict[key] = math.log(float(N) / float(len(value)), 10)

52


In [None]:
def score_ngrams(ngram_list_1, ngram_list_2, return_seen=False):
    """Score the n-gram overlap of ``ngram_list_1`` against ``ngram_list_2``.

    Both arguments are lists indexed by n-gram order (index 0 = 1-grams, ...).
    The first occurrence of an n-gram in ``ngram_list_1`` that is also present
    at the same order in ``ngram_list_2`` contributes
    ``order ** 2 * idf_dict[ngram]``.

    Returns ``(score, matched_ngrams)`` when ``return_seen`` is true, otherwise
    ``(score, None)``.
    """
    reference_sets = [set(order_ngrams) for order_ngrams in ngram_list_2]

    total = 0.0
    visited = set()
    matched = set()
    for order_idx, candidates in enumerate(ngram_list_1):
        order_weight = (order_idx + 1) ** 2
        for candidate in candidates:
            if candidate in reference_sets[order_idx] and candidate not in visited:
                total += order_weight * idf_dict[candidate]
                matched.add(candidate)
            visited.add(candidate)

    return float(total), (matched if return_seen else None)

In [None]:
### Process training data

In [None]:
# Normalized 1- to 3-grams of all training questions, grouped by normalization name.
features = get_all_features(srp_utterances)

defaultdict(<class 'list'>, {'lemmatize_ngram': [('many',), ('tree',), ('swanton',), ('pacific',), ('ranch',), ('many', 'tree'), ('tree', 'swanton'), ('swanton', 'pacific'), ('pacific', 'ranch'), ('many', 'tree', 'swanton'), ('tree', 'swanton', 'pacific'), ('swanton', 'pacific', 'ranch'), ('many',), ('tree',), ('fire',), ('2020',), ('many', 'tree'), ('tree', 'fire'), ('fire', '2020'), ('many', 'tree', 'fire'), ('tree', 'fire', '2020'), ('common',), ('tree',), ('specie',), ('spr',), ('common', 'tree'), ('tree', 'specie'), ('specie', 'spr'), ('common', 'tree', 'specie'), ('tree', 'specie', 'spr'), ('old',), ('oldest',), ('tree',), ('spr',), ('old', 'oldest'), ('oldest', 'tree'), ('tree', 'spr'), ('old', 'oldest', 'tree'), ('oldest', 'tree', 'spr'), ('biome/ecosystem/ecotype',), ('cover',), ('greatest',), ('area',), ('spr',), ('biome/ecosystem/ecotype', 'cover'), ('cover', 'greatest'), ('greatest', 'area'), ('area', 'spr'), ('biome/ecosystem/ecotype', 'cover', 'greatest'), ('cover', 'grea

In [None]:
# The keys are the names of the three normalization functions.
features.keys()

dict_keys(['lemmatize_ngram', 'lemmatize_wn_synset', 'lemmatize_wn_hypernum'])

In [None]:
# Each entry of a features list is one n-gram tuple; this is the length (order) of the second one.
len(features['lemmatize_wn_synset'][1])

1

In [None]:
# Regroup the training n-grams by order, merging the three normalizations, so
# that all_features = [1-grams, 2-grams, 3-grams] as score_ngrams expects.
feature_list = []
uno_g, two_g, three_g = [], [], []
buckets_by_length = {1: uno_g, 2: two_g, 3: three_g}
for norm, items in features.items():
  for ngram in items:
    bucket = buckets_by_length.get(len(ngram))
    if bucket is not None:
      bucket.append(ngram)

all_features = [uno_g, two_g, three_g]

In [None]:
### Test WordNet Approach
# Sample utterances for trying out the n-gram scorer.
test_utters = ['What kind of tree can be found here?','What does SPR stand for?', 'What is your name?']#, 'How are you today?', 'What does SPR stand for?']

# Earlier inline version of the feature construction, left disabled; the
# `test` function defined further down performs the same steps per utterance.
# test_ngram = get_ngram_features(preprocess(test_utter[1]))

# feature_list = []
# uno_g = []
# two_g = []
# three_g = []
# for norm, items in test_ngram.items():
#   uno_g.extend(items[0])
#   two_g.extend(items[1])
#   three_g.extend(items[2])
# test_features = [uno_g, two_g, three_g]
# test_features

In [None]:
# Score one sample utterance against the training features. test_features is
# built here so the cell runs on a fresh kernel (no live cell defined it before).
test_ngram = get_ngram_features(preprocess(test_utters[1]))
# get_ngram_features returns, per normalization, one list per n-gram order;
# merge the normalizations so each order has a single list, like all_features.
test_features = [
    [ngram for per_order in test_ngram.values() for ngram in per_order[order]]
    for order in range(3)
]
print(score_ngrams(test_features, all_features, return_seen=True))

(0.6283889300503115, {('spr', 'base.n.08'), ('spr',), ('base.n.08',), ('stand',), ('support.n.10',), ('spr', 'stand'), ('spr', 'support.n.10')})


In [None]:
def test(test_utter, threshold=0.5):
  """Score each utterance against ``all_features`` and print a short report.

  For every utterance the function prints the utterance, its n-gram overlap
  score, whether the score reaches ``threshold``, and the matched n-grams
  returned by ``score_ngrams``.

  Args:
    test_utter: Iterable of raw utterance strings.
    threshold: Minimum score reported as a hit (default 0.5).

  Returns:
    None; results are printed.
  """
  for utterance in test_utter:
    print("User utterance: " +str(utterance))
    test_ngram = get_ngram_features(preprocess(utterance))
    # Merge the per-normalization features so that each n-gram order has one
    # list, matching the layout of all_features.
    uno_g = []
    two_g = []
    three_g = []
    for items in test_ngram.values():
      uno_g.extend(items[0])
      two_g.extend(items[1])
      three_g.extend(items[2])
    test_features = [uno_g, two_g, three_g]

    score, seen = score_ngrams(test_features, all_features, return_seen=True)
    print('Score:' + str(score))
    print('Score >= ' + str(threshold) + ': ' + str(score>=threshold))
    print('Seen:')
    print(list(seen))

In [None]:
# Utterances to run through test(); this rebinds the test_utters list defined in an earlier cell.
test_utters = ['What kind of trees can be found here?', 'What does SPR stand for?',  'Do you enjoy cake?', 'Are there birches around?','What is your name?']

In [None]:
# Score every sample utterance and print the report for each.
# NOTE(review): the recorded run stops with a RuntimeError on 'What is your name?'.
test(test_utters)

User utterance: What kind of trees can be found here?
Score:3.1156157115292085
Score >= 0.5: True
Seen:
[('woody_plant.n.01',), ('kind.n.01',), ('tree',), ('tree.n.01',)]
User utterance: What does SPR stand for?
Score:0.6283889300503115
Score >= 0.5: True
Seen:
[('spr',)]
User utterance: Do you enjoy cake?
Score:0.0
Score >= 0.5: False
Seen:
[]
User utterance: Are there birches around?
Score:1.2304489213782739
Score >= 0.5: True
Seen:
[('wood.n.01',)]
User utterance: What is your name?


RuntimeError: ignored

In [None]:
# Display the reference features: [1-grams, 2-grams, 3-grams].
all_features

[[('many',),
  ('tree',),
  ('swanton',),
  ('pacific',),
  ('ranch',),
  ('many.a.01',),
  ('tree.n.01',),
  ('swanton',),
  ('pacific.n.01',),
  ('ranch.n.01',),
  ('many.a.01',),
  ('woody_plant.n.01',),
  ('swanton',),
  ('pacific.n.01',),
  ('farm.n.01',)],
 [('many', 'tree'),
  ('tree', 'swanton'),
  ('swanton', 'pacific'),
  ('pacific', 'ranch'),
  ('many.a.01', 'tree.n.01'),
  ('tree.n.01', 'swanton'),
  ('swanton', 'pacific.n.01'),
  ('pacific.n.01', 'ranch.n.01'),
  ('many.a.01', 'woody_plant.n.01'),
  ('woody_plant.n.01', 'swanton'),
  ('swanton', 'pacific.n.01'),
  ('pacific.n.01', 'farm.n.01')],
 [('many', 'tree', 'swanton'),
  ('tree', 'swanton', 'pacific'),
  ('swanton', 'pacific', 'ranch'),
  ('many.a.01', 'tree.n.01', 'swanton'),
  ('tree.n.01', 'swanton', 'pacific.n.01'),
  ('swanton', 'pacific.n.01', 'ranch.n.01'),
  ('many.a.01', 'woody_plant.n.01', 'swanton'),
  ('woody_plant.n.01', 'swanton', 'pacific.n.01'),
  ('swanton', 'pacific.n.01', 'farm.n.01')]]

## SBERT Classifier

In [None]:
!pip install spacy_sentence_bert

In [None]:
# Imported here rather than in the top import cell, presumably because the
# package is only installed by the preceding cell.
import spacy_sentence_bert
# Load the 'en_stsb_distilbert_base' model; nlp_sbert(text).vector is used
# below as the sentence embedding.
nlp_sbert = spacy_sentence_bert.load_model('en_stsb_distilbert_base')

##### Compute cosine similarity against each sentence in the training data

In [None]:
# Embed every training question with the SBERT model, then stack the vectors
# into a single array (one row per sentence).
srp_embedded_sentences = [nlp_sbert(sent).vector for sent in srp_utterances]
srp_embedded_sentences_np = np.array(srp_embedded_sentences)

In [None]:
# Cosine similarity between the first small-talk utterance and each training
# sentence, printed one sentence at a time.
small_talk_embed = nlp_sbert(small_talk[0]).vector
query_row = small_talk_embed.reshape(1, -1)
for sent in srp_embedded_sentences:
  similarity = cosine_similarity(sent.reshape(1, -1), query_row)
  print(similarity)

[[0.11206175]]
[[-0.01854086]]
[[0.05036506]]
[[0.15270287]]
[[0.13122937]]
[[0.2334921]]
[[0.42959803]]


##### Or average the training-data vectors before computing cosine similarity

In [None]:
# (number of training sentences, embedding dimension)
srp_embedded_sentences_np.shape

(7, 768)

In [None]:
# Compare the mean training embedding (averaged over sentences, axis 0) with
# the small-talk embedding.
cosine_similarity(np.mean(srp_embedded_sentences_np, axis=0).reshape(1,-1), small_talk_embed.reshape(1,-1))

array([[0.23025683]], dtype=float32)