In [None]:
import spacy
import json
import random
import re
import pandas as pd
import numpy as np

from copy import deepcopy
from sklearn import model_selection
from spacy.tokenizer import Tokenizer
from spacy.lang.en import English
from spacy.symbols import ORTH
from spacy.symbols import NORM
from sklearn.feature_extraction import DictVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import SVC
from sklearn.model_selection import RandomizedSearchCV
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import classification_report, confusion_matrix
from sklearn import tree
import matplotlib.pyplot as plt
import random
import plotly.express as px
import sys 
import fasttext
sys.path.append('./luima_sbd')
import luima_sbd.sbd_utils as luima
from spacy.language import Language
random.seed(42)

In [None]:
def plot_confusion_matrix(y_true, y_pred, classes,file_name,
                          title=None,
                          cmap=plt.cm.Blues):
    """Draw an annotated confusion-matrix heatmap and save it as a JPEG.

    Parameters
    ----------
    y_true, y_pred : array-like
        Gold and predicted labels, forwarded to ``confusion_matrix``.
    classes : list
        Class labels. They are used both as tick labels and as the row/column
        order of the matrix, so every caller should pass the label values
        themselves (the notebook passes ``list(clf.classes_)``).
    file_name : str
        Base name of the image written to ``./figures/for_report/``.
    title : str, optional
        Title shown above the plot.
    cmap : matplotlib colormap, optional
        Colormap used for the heatmap.

    Returns
    -------
    The matplotlib axes holding the plot.
    """
    import os  # local import: only needed to create the output directory

    # Pin the matrix layout to `classes`; without `labels` the matrix is built
    # from the labels present in the data, which can be fewer than `classes`
    # and would then be paired with the wrong tick labels.
    cm = confusion_matrix(y_true, y_pred, labels=classes)
    fig, ax = plt.subplots(figsize=(8, 8))
    im = ax.imshow(cm, interpolation='nearest', cmap=cmap)
    ax.figure.colorbar(im, ax=ax)
    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           # ... and label them with the respective list entries
           xticklabels=classes, yticklabels=classes,
           title=title,
           ylabel='True label',
           xlabel='Predicted label')
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
             rotation_mode="anchor")
    # Cells darker than half the maximum count get white text for contrast.
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, format(cm[i, j], 'd'),
                    ha="center", va="center",
                    color="white" if cm[i, j] > thresh else "black")
    fig.tight_layout()
    # savefig does not create missing directories, so make sure it exists.
    os.makedirs("./figures/for_report", exist_ok=True)
    fig.savefig(f"./figures/for_report/{file_name}.jpg")
    return ax

In [None]:
def top_tfidf_features(row, features, top_n=15):
    ''' Get top n tfidf values in row and return them with their corresponding feature names.

    row      -- 1-D array of tfidf values, one per feature.
    features -- sequence of feature names, indexable by the positions of `row`.
    top_n    -- maximum number of (feature, value) pairs to return.

    Returns a DataFrame with columns 'feature' and 'tfidf', ordered from the
    highest value to the lowest. When nothing is selected (empty row or
    top_n == 0) an empty DataFrame with the same two columns is returned.
    '''
    topn_ids = np.argsort(row)[::-1][:top_n]
    top_feats = [(features[i], row[i]) for i in topn_ids]
    # Naming the columns in the constructor (instead of assigning df.columns
    # afterwards) avoids a length-mismatch error when top_feats is empty.
    return pd.DataFrame(top_feats, columns=['feature', 'tfidf'])


def top_features_in_doc(Xtr, features, row_id, top_n=15):
    ''' Rank the tfidf features of one document, i.e. one row of Xtr.

    The row is taken with Xtr[row_id]; anything that is not exactly a numpy
    ndarray is densified through its .toarray() method before being squeezed
    and handed to top_tfidf_features.
    '''
    doc_row = Xtr[row_id]
    dense_row = doc_row if type(doc_row) is np.ndarray else doc_row.toarray()
    return top_tfidf_features(np.squeeze(dense_row), features, top_n)


def top_mean_features(Xtr, features, grp_ids=None, min_tfidf=0.1, top_n=25):
    ''' Return the top n features that on average are most important amongst documents in rows
        identified by indices in grp_ids.

    Xtr       -- document/feature matrix (numpy array, or an object with .toarray()).
    features  -- sequence of feature names aligned with the columns of Xtr.
    grp_ids   -- row selector passed to Xtr[...]; None or an empty selector
                 means "use every row".
    min_tfidf -- values below this threshold are zeroed before averaging.
    top_n     -- number of features to return.

    Xtr itself is never modified.
    '''
    # `if grp_ids:` is ambiguous for NumPy index arrays (it raises for more
    # than one element and tests the element itself for exactly one), so
    # arrays are checked by size; every other selector keeps plain truthiness.
    if grp_ids is None:
        use_subset = False
    elif isinstance(grp_ids, np.ndarray):
        use_subset = grp_ids.size > 0
    else:
        use_subset = bool(grp_ids)

    D = Xtr[grp_ids] if use_subset else Xtr
    if type(D) is not np.ndarray:
        D = D.toarray()
    else:
        # D may be Xtr itself or a view of it; copy so the thresholding below
        # does not write into the caller's matrix.
        D = D.copy()
    D[D < min_tfidf] = 0
    tfidf_means = np.mean(D, axis=0)
    return top_tfidf_features(tfidf_means, features, top_n)


def top_features_by_class(Xtr, y, features, min_tfidf=0.1, top_n=25):
    ''' Compute, for every distinct label in y, the top_n features by mean tfidf
        over the documents carrying that label.

        Returns a dict mapping each label to the DataFrame produced by
        top_mean_features for that label's rows. '''
    per_class = {}
    for cls in np.unique(y):
        member_rows = np.where(y==cls)
        ranked = top_mean_features(Xtr, features, member_rows,
                                   min_tfidf=min_tfidf, top_n=top_n)
        # Tag the frame with the class it summarises.
        ranked.label = cls
        per_class[cls] = ranked
    return per_class


def span_top_tfidf(spans_txt, spans_tfidf, features, index):
    ''' Print the text of span `index`, then its top tfidf features. '''
    span_text = spans_txt[index]
    print('span text:\n' + span_text + '\n')
    ranking = top_features_in_doc(spans_tfidf, features, index)
    print(ranking)

In [None]:
corpus_fpath = './ldsi_s2021/ldsi_bva_sentence_corpus_v1.json'
# Context managers make sure the file handles are closed after reading.
with open(corpus_fpath) as corpus_file:
    data = json.load(corpus_file)


def _read_id_list(path):
    """Return the non-empty lines of `path` as a list of document names.

    Empty lines (typically produced by a trailing newline) are dropped so an
    empty string can never be sampled later as a document id.
    """
    with open(path, 'r') as id_file:
        return [line for line in id_file.read().split("\n") if line]


# Kept as lists (not sets) because random.sample is applied to them later.
affirmed = _read_id_list('./ldsi_s2021/affirmed_ids.txt')
denied = _read_id_list('./ldsi_s2021/denied_ids.txt')
remanded = _read_id_list('./ldsi_s2021/remanded_ids.txt')
# print(len(affirmed), len(denied), len(remanded))
annotations = data['annotations']
# Lookup tables between ids, names and full records for documents and types.
documents_by_id = {d['_id']: d for d in data['documents']}
types_by_id = {t['_id']: t for t in data['types']}
type_ids_by_name = {t['name']: t['_id'] for t in data['types']}
type_names_by_id = {t['_id']: t['name'] for t in data['types']}
doc_id_by_name = {d['name']: d['_id'] for d in data['documents']}
doc_name_by_id = {d['_id']: d['name'] for d in data['documents']}

def make_span_data(documents_by_id, types_by_id, annotations, decision_ids=None):
    """Build one record per annotation describing the annotated text span.

    Parameters
    ----------
    documents_by_id : dict
        Maps a document id to a record with at least 'plainText' and 'name'.
    types_by_id : dict
        Maps a type id to a record with at least 'name'.
    annotations : iterable of dict
        Each annotation provides 'start', 'end', 'document' and 'type'.
    decision_ids : dict, optional
        Maps a decision label to the collection of document names with that
        outcome. Defaults to the notebook-level ``affirmed``, ``denied`` and
        ``remanded`` lists. If a name appears under several labels, the first
        label in the mapping wins.

    Returns
    -------
    list of dict
        Keys: 'txt', 'document', 'type', 'start', 'start_normalized', 'end',
        'name', 'decisions'. 'decisions' is None for a document that appears
        in none of the decision collections.
    """
    if decision_ids is None:
        decision_ids = {'affirmed': affirmed,
                        'denied': denied,
                        'remanded': remanded}

    # Build the name -> decision lookup once instead of scanning the id lists
    # for every annotation; setdefault keeps the first matching label.
    decision_by_name = {}
    for decision, names in decision_ids.items():
        for name in names:
            decision_by_name.setdefault(name, decision)

    span_data = []
    for a in annotations:
        document = documents_by_id[a['document']]
        document_txt = document['plainText']
        document_name = document['name']
        start, end = a['start'], a['end']
        sd = {'txt': document_txt[start:end],
              'document': a['document'],
              'type': types_by_id[a['type']]['name'],
              'start': start,
              # Guard against an empty document to avoid dividing by zero.
              'start_normalized': start / len(document_txt) if document_txt else 0.0,
              'end': end,
              'name': document_name,
              # .get() yields None for unknown documents rather than reusing
              # the previous annotation's decision or raising a NameError.
              'decisions': decision_by_name.get(document_name)}
        span_data.append(sd)
    return span_data

# One record per annotation, plus parallel lists of sentence types and case outcomes.
spans = make_span_data(documents_by_id, types_by_id, annotations)
span_labels = [span['type'] for span in spans]
span_decisions = [span['decisions'] for span in spans]

In [None]:
# Re-seed so the document sample below is the same on every run of this cell.
random.seed(42)
# Six documents per outcome: the first three go to test, the last three to dev.
aff = random.sample(affirmed, 6)
den = random.sample(denied, 6)
rem = random.sample(remanded, 6)
test_affirm, dev_affirm = aff[:3], aff[3:6]
test_denied, dev_denied = den[:3], den[3:6]
test_remanded, dev_remanded = rem[:3], rem[3:6]

test_ids = [*test_affirm, *test_denied, *test_remanded]
dev_ids = [*dev_affirm, *dev_denied, *dev_remanded]

# Route every span to the split its source document belongs to; documents
# that were not sampled above form the training split.
test_spans, dev_spans, train_spans = [], [], []
for s in spans:
    if s['name'] in test_ids:
        bucket = test_spans
    elif s['name'] in dev_ids:
        bucket = dev_spans
    else:
        bucket = train_spans
    bucket.append(s)

unique_files = pd.DataFrame(train_spans)['name'].unique()

In [None]:
# Raw sentence text of each split, in the same order as the span lists.
train_spans_txt = [span['txt'] for span in train_spans]
test_spans_txt = [span['txt'] for span in test_spans]
dev_spans_txt = [span['txt'] for span in dev_spans]

In [None]:
# Load a fastText model from a local .bin file; add_word_vec queries it for word vectors.
model = fasttext.load_model("result/wordEmbeddingsModel.bin")

In [None]:
# Quick look at the model: nearest neighbours of the word "veteran".
model.get_nearest_neighbors("veteran")

In [None]:
# Shared spaCy pipeline used by spacy_tokenize.
nlp = spacy.load("en_core_web_sm")

# Tokenizer special cases: keep these two strings as single tokens
# (spacy_tokenize has a dedicated branch for exactly these two texts).
nlp.tokenizer.add_special_case('Vet. App.', [{ORTH: 'Vet. App.'}])
nlp.tokenizer.add_special_case('Fed. Cir.', [{ORTH: 'Fed. Cir.'}])

def spacy_tokenize(txt):
    """Tokenize `txt` with the shared spaCy pipeline and normalise the tokens.

    Rules, applied per token in this order:
      * a token followed by a PART token containing an apostrophe is merged
        with it (non-word characters stripped, lower-cased) and the particle
        is consumed, so it is not emitted a second time;
      * punctuation is dropped;
      * the special-cased tokens 'Vet. App.' / 'Fed. Cir.' are kept as their
        lower-cased lemma;
      * tokens that neither start with a letter nor are pure digits are kept
        (lower-cased lemma) only when they are upper-case, otherwise dropped;
      * numbers become the placeholder '<NUM{length}>';
      * everything else becomes its lemma with non-word characters removed,
        lower-cased.

    Returns a list of strings.
    """
    # Disable the parser only while it is still active instead of on every
    # call; repeated disabling is redundant and can fail on some spaCy versions.
    if 'parser' in nlp.pipe_names:
        nlp.disable_pipes('parser')
    tokens = list(nlp(txt))
    clean_tokens = []
    last = len(tokens) - 1
    skip_next = False  # set when the following token was merged into this one
    for i, t in enumerate(tokens):
        if skip_next:
            # Reassigning the loop index of a `for` has no effect, so the
            # skip has to be carried explicitly across iterations.
            skip_next = False
            continue
        # Look one token ahead; the last token has no successor.
        t1 = tokens[i + 1] if i != last else t
        if t1 != t and t1.pos_ == 'PART' and re.search(r'\'', t1.text):
            scrap = t.text + t1.text
            scrap = re.sub(r'\W', '', scrap).lower()
            clean_tokens.append(scrap)
            skip_next = True
        elif t.pos_ == 'PUNCT':
            pass
        elif t.text in ('Vet. App.', 'Fed. Cir.'):
            clean_tokens.append(t.lemma_.lower())
        elif not t.text[0].isalpha() and not t.is_digit:
            if t.is_upper:
                clean_tokens.append(t.lemma_.lower())
        elif t.pos_ == 'NUM':
            clean_tokens.append(f'<NUM{len(t)}>')
        else:
            lem = re.sub(r'\W', '', t.lemma_)
            clean_tokens.append(lem.lower())
    return clean_tokens

In [None]:
def spans_add_spacy_tokens(spans):
    """Tokenize each span's 'txt' and store the result on the span dict.

    Adds 'tokens_spacy' (the token list from spacy_tokenize) and
    'tokens_number' (its length) to every span in place; returns nothing.
    """
    for span in spans:
        span_tokens = spacy_tokenize(span['txt'])
        span.update(tokens_spacy=span_tokens, tokens_number=len(span_tokens))

        

    
# Tokenize all three splits; each span dict gains 'tokens_spacy' and 'tokens_number'.
spans_add_spacy_tokens(train_spans)
spans_add_spacy_tokens(test_spans)
spans_add_spacy_tokens(dev_spans)

In [None]:
def add_word_vec(spans, embedding_model=None):
    """Attach a mean word-embedding vector to every span, in place.

    Parameters
    ----------
    spans : iterable of dict
        Each span must already carry 'tokens_spacy' (a list of token strings).
    embedding_model : optional
        Object exposing ``get_dimension()`` and ``get_word_vector(word)``.
        Defaults to the notebook-level fastText ``model``.

    Each span receives a 'word_vec' entry: the average of its tokens' vectors,
    or a zero vector when the span has no tokens. The vector length comes from
    the model rather than being hard-coded.
    """
    if embedding_model is None:
        embedding_model = model
    dim = embedding_model.get_dimension()
    for s in spans:
        tokens = s['tokens_spacy']
        # Accumulate into a fresh zero array so the sum does not stay in the
        # dtype of the model's own vectors.
        sum_vec = np.zeros(dim)
        for word in tokens:
            sum_vec = np.add(embedding_model.get_word_vector(word), sum_vec)
        # Only divide when there is something to average; empty spans keep zeros.
        s['word_vec'] = sum_vec / len(tokens) if tokens else sum_vec

# Add a 'word_vec' entry to every span of each split (the span dicts are modified in place).
add_word_vec(train_spans)
add_word_vec(dev_spans)
add_word_vec(test_spans)

In [None]:
# Training spans as a DataFrame; the second line displays the spans that ended up with zero tokens.
df=pd.DataFrame(train_spans)
df[df.tokens_number==0]

In [None]:
# Mean and standard deviation of the token count over the training spans;
# make_feature_vectors_and_labels uses them to standardise the token-count feature.
train_mean=df.tokens_number.mean()
train_std=df.tokens_number.std()
print(train_mean, train_std)

In [None]:
# TF-IDF over unigrams produced by spacy_tokenize; terms must occur in at
# least 3 training spans to enter the vocabulary.
spacy_tfidf_vectorizer = TfidfVectorizer(tokenizer=spacy_tokenize,
                                         min_df=3,
                                         ngram_range=(1,1))
spacy_tfidf_vectorizer = spacy_tfidf_vectorizer.fit(train_spans_txt)

# get_feature_names() no longer exists in recent scikit-learn releases; use
# get_feature_names_out() when available and fall back for older versions.
# The result is materialised as a list either way.
if hasattr(spacy_tfidf_vectorizer, 'get_feature_names_out'):
    tfidf_features_spacy = list(spacy_tfidf_vectorizer.get_feature_names_out())
else:
    tfidf_features_spacy = spacy_tfidf_vectorizer.get_feature_names()

In [None]:
# file_name="spacy_tfidf_vectorizer"
# dump(spacy_tfidf_vectorizer, f'{file_name}.joblib')

In [None]:
# Dense TF-IDF matrices for each split, computed in the order train, test, dev.
train_tfidf_spacy, test_tfidf_spacy, dev_tfidf_spacy = (
    spacy_tfidf_vectorizer.transform(texts).toarray()
    for texts in (train_spans_txt, test_spans_txt, dev_spans_txt)
)
# Sentence-type labels aligned with the rows of the matrices above.
train_spans_labels, test_spans_labels, dev_spans_labels = (
    np.array([span['type'] for span in split])
    for split in (train_spans, test_spans, dev_spans)
)


In [None]:
# Inspect a random training span. randrange excludes the upper bound, whereas
# randint(0, len(...)) could return len(train_spans) and index past the end.
span_top_tfidf(train_spans_txt,
               train_tfidf_spacy,
               tfidf_features_spacy,
               random.randrange(len(train_spans)))

In [None]:
# Top mean-TF-IDF features per sentence type on the training split; the bare `dfs` displays the result.
dfs = top_features_by_class(train_tfidf_spacy, 
                            train_spans_labels,
                            tfidf_features_spacy)
dfs

In [None]:
# TFIDF BLOCK
def make_feature_vectors_and_labels(spans, vectorizer):
    """Build the TF-IDF feature matrix and label array for `spans`.

    Parameters
    ----------
    spans : list of dict
        Spans carrying 'txt', 'start_normalized', 'tokens_number' and 'type'.
    vectorizer :
        Fitted vectorizer whose ``transform`` output supports ``.toarray()``.

    Returns
    -------
    (X, y) where X is the dense TF-IDF matrix with two extra columns appended
    (normalised start offset, and token count standardised with the
    notebook-level ``train_mean`` / ``train_std``), and y holds span types.
    """
    # function takes long to execute
    # note: we un-sparse the matrix here to be able to manipulate it
    # Use the vectorizer that was passed in rather than reaching for the
    # notebook-level one, so the parameter is actually honoured.
    tfidf = vectorizer.transform([s['txt'] for s in spans]).toarray()
    starts_normalized = np.array([s['start_normalized'] for s in spans])
    num_tokens_norm = np.array([((s['tokens_number']-train_mean)/train_std) for s in spans])
    y = np.array([s['type'] for s in spans])
    print(tfidf.shape, starts_normalized.shape, num_tokens_norm.shape)
    # The two 1-D features are turned into single columns before concatenation.
    X = np.concatenate((tfidf,
                        np.expand_dims(starts_normalized, axis=1),
                        np.expand_dims(num_tokens_norm, axis=1)), axis=1)
    return X, y

# word_embd

In [None]:
# Word_EMBD BLOCK
def make_feature_vectors_and_labels(spans, vectorizer):
    """Word-embedding variant: build the feature matrix and labels for `spans`.

    Each row is the span's 'word_vec' followed by its normalised start offset
    and its token count standardised with the notebook-level ``train_mean`` /
    ``train_std``. `vectorizer` is accepted but not used by this variant.
    Returns (X, y) with y holding the span types.
    """
    position = np.array([span['start_normalized'] for span in spans])
    length_z = np.array([(span['tokens_number'] - train_mean) / train_std
                         for span in spans])
    embeddings = np.array([span['word_vec'] for span in spans])
    labels = np.array([span['type'] for span in spans])
    print(position.shape, length_z.shape, embeddings.shape)
    # Scalar features become single columns next to the embedding columns.
    columns = (embeddings,
               position[:, np.newaxis],
               length_z[:, np.newaxis])
    return np.concatenate(columns, axis=1), labels
# tfidf

In [None]:
# Build features and labels for each split with whichever
# make_feature_vectors_and_labels definition was executed last (two cells
# define that name); the training call is timed.
%time train_X, train_y = make_feature_vectors_and_labels(train_spans, spacy_tfidf_vectorizer)
dev_X, dev_y = make_feature_vectors_and_labels(dev_spans, spacy_tfidf_vectorizer)
test_X, test_y = make_feature_vectors_and_labels(test_spans, spacy_tfidf_vectorizer)

In [None]:
# Print the feature-matrix and label-array shapes of each split.
print(f'{train_X.shape} {train_y.shape}')
print(f'{dev_X.shape} {dev_y.shape}')
print(f'{test_X.shape} {test_y.shape}')

# Linear Support Vector Machine Classifier (Linear Model)

In [None]:
from sklearn.svm import LinearSVC

param_distributions = {'n_estimators': np.random.randint(1, 5),
                       'max_depth': np.random.randint(5, 10)}
model_ident="Linear Support Vector Machine"
clf = LinearSVC(random_state=0, tol=1e-4)
%time clf = clf.fit(train_X, train_y)
print(f'{model_ident}\nTRAIN:\n'+classification_report(train_spans_labels, clf.predict(train_X)))
print(f'{model_ident}\nDEV:\n'+classification_report(dev_spans_labels, clf.predict(dev_X)))
plot_confusion_matrix(dev_spans_labels, clf.predict(dev_X), classes=list(clf.classes_),file_name=model_ident,
                      title=f'Confusion matrix for {model_ident}')
plt.show()

# Logistic Regression

In [None]:
from sklearn.linear_model import LogisticRegression
model_ident="Logistic Regression"
clf = LogisticRegression(random_state=0)
clf = clf.fit(train_X, train_y)
# Predict the dev split once and reuse it for both the report and the plot.
dev_pred = clf.predict(dev_X)
print('TRAIN:\n'+classification_report(train_spans_labels, clf.predict(train_X)))
print('DEV:\n'+classification_report(dev_spans_labels, dev_pred))
plot_confusion_matrix(dev_spans_labels, dev_pred, classes=list(clf.classes_),file_name=model_ident,
                      title=f'Confusion matrix for {model_ident}')
plt.show()

# Radial kernel SVM

In [None]:
from sklearn.svm import SVC
clf = SVC(kernel='rbf', random_state=0)
%time clf = clf.fit(train_X, train_y)
model_ident="Radial Kernet SVM"
print('TRAIN:\n'+classification_report(train_spans_labels, clf.predict(train_X)))
print('DEV:\n'+classification_report(dev_spans_labels, clf.predict(dev_X)))
plot_confusion_matrix(dev_spans_labels, clf.predict(dev_X), classes=list(clf.classes_),file_name=model_ident,
                      title=f'Confusion matrix for {model_ident}')
plt.show()

# Sigmoid kernel SVM

In [None]:
model_ident="Sigmoid Kernel SVM"
clf = SVC(kernel='sigmoid', random_state=0)
clf = clf.fit(train_X, train_y)
# Predict the dev split once and reuse it for both the report and the plot.
dev_pred = clf.predict(dev_X)
print('TRAIN:\n'+classification_report(train_spans_labels, clf.predict(train_X)))
print('DEV:\n'+classification_report(dev_spans_labels, dev_pred))
plot_confusion_matrix(dev_spans_labels, dev_pred, classes=list(clf.classes_),file_name=model_ident,
                      title=f'Confusion matrix for {model_ident}')
plt.show()

# Decision Trees

In [None]:
clf = tree.DecisionTreeClassifier(max_depth=13, random_state=0)
%time clf = clf.fit(train_X, train_y)
model_ident = "Decision Trees"
print('TRAIN:\n'+classification_report(train_spans_labels, clf.predict(train_X)))
print('DEV:\n'+classification_report(dev_spans_labels, clf.predict(dev_X)))
plot_confusion_matrix(dev_spans_labels, clf.predict(dev_X), classes=list(clf.classes_),file_name=model_ident,
                      title=f'Confusion matrix for {model_ident}')
plt.show()
clf.get_params()

# Random Forests

In [None]:
from sklearn.ensemble import RandomForestClassifier
from joblib import dump, load
file_name="random_forest"
clf = RandomForestClassifier(max_depth=10, random_state=0, n_estimators=20)
%time clf = clf.fit(train_X, train_y)
model_ident = "Random Forests"
print('TRAIN:\n'+classification_report(train_spans_labels, clf.predict(train_X)))
print('DEV:\n'+classification_report(dev_spans_labels, clf.predict(dev_X)))
plot_confusion_matrix(dev_spans_labels, clf.predict(dev_X), classes=list(clf.classes_),file_name=model_ident,
                      title=f'Confusion matrix for {model_ident}')
plt.show()

# dump(clf, f'{file_name}.joblib')

# Best Model

In [None]:
# best model TFIDF
from sklearn.linear_model import LogisticRegression
# NOTE(review): this cell is labelled as the TF-IDF model, so train_X/dev_X/test_X
# are presumably expected to come from the TF-IDF feature function - confirm
# which definition was run before executing.
model_ident="Logistic Regression on\nTFIDF Featurization"
clf = LogisticRegression(random_state=0)
clf = clf.fit(train_X, train_y)
# Predict the dev split once and reuse it for both the report and the plot.
dev_pred = clf.predict(dev_X)
print('TRAIN:\n'+classification_report(train_spans_labels, clf.predict(train_X)))
print('DEV:\n'+classification_report(dev_spans_labels, dev_pred))
print('TEST:\n'+classification_report(test_spans_labels, clf.predict(test_X)))
# The title keeps its line break, but the file name must not contain one:
# collapse all whitespace runs to single spaces for the saved figure.
plot_confusion_matrix(dev_spans_labels, dev_pred, classes=list(clf.classes_),
                      file_name=' '.join(model_ident.split()),
                      title=f'Confusion matrix for {model_ident}')
plt.show()

In [None]:
# best model Word_Embd
# NOTE(review): this cell is labelled as the word-embedding model, so
# train_X/dev_X/test_X are presumably expected to come from the word-embedding
# feature function - confirm which definition was run before executing.
model_ident="Radial Kernel SVM on\n Word Embedding Featurization"
clf = SVC(kernel='rbf', random_state=0)
clf = clf.fit(train_X, train_y)
# Predict the dev split once and reuse it for both the report and the plot.
dev_pred = clf.predict(dev_X)
print('TRAIN:\n'+classification_report(train_spans_labels, clf.predict(train_X)))
print('DEV:\n'+classification_report(dev_spans_labels, dev_pred))
print('TEST:\n'+classification_report(test_spans_labels, clf.predict(test_X)))
# The title keeps its line break, but the file name must not contain one:
# collapse all whitespace runs to single spaces for the saved figure.
plot_confusion_matrix(dev_spans_labels, dev_pred, classes=list(clf.classes_),
                      file_name=' '.join(model_ident.split()),
                      title=f'Confusion matrix for {model_ident}')
plt.show()


In [None]:
def prediction_errors(clf, eval_spans, vectorizer, 
                      select_true_label=None, 
                      select_pred_label=None):
    """Print every span in `eval_spans` that `clf` misclassifies.

    Parameters
    ----------
    clf :
        Fitted classifier exposing ``predict``.
    eval_spans : list of dict
        Spans to evaluate; features are built with
        make_feature_vectors_and_labels.
    vectorizer :
        Forwarded to make_feature_vectors_and_labels.
    select_true_label : optional
        When set, only errors whose true label equals this value are printed.
    select_pred_label : optional
        When set, only errors whose predicted label equals this value are printed.

    Prints, for each selected error, the span index, source case name, start
    offset, predicted/true labels and the span text. Returns nothing.
    """
    if not eval_spans:
        # Nothing to evaluate; avoid building features from an empty list.
        return
    # The label array returned alongside the features is not needed here; the
    # true label is read directly from each span.
    eval_X, _ = make_feature_vectors_and_labels(eval_spans, vectorizer)
    pred_y = clf.predict(eval_X)
    for i, (span, pred_label) in enumerate(zip(eval_spans, pred_y)):
        true_label = span['type']
        if true_label == pred_label:
            continue
        if select_true_label and true_label != select_true_label:
            continue
        if select_pred_label and pred_label != select_pred_label:
            continue
        doc_name = documents_by_id[span['document']]['name']
        print('sentence # '+str(i)+' / case '+doc_name+' / @'+str(span['start']))
        print('pred: '+pred_label+' / true: '+true_label)
        print(span['txt'])
        print()
    
    

In [None]:
# Persist whichever classifier is currently bound to `clf` using joblib
# (`dump` is imported in the Random Forests cell, so that cell must have run).
model_file="Radial_Kernel_SVM_WordEmb"
dump(clf, f'{model_file}.joblib')

In [None]:
# Misclassified spans among 100 randomly sampled training spans, restricted to
# those whose true label is 'PolicyBasedReasoning'.
prediction_errors(clf,
                  random.sample(train_spans, 100),
                  spacy_tfidf_vectorizer,
                  select_true_label='PolicyBasedReasoning')

In [None]:
# Misclassified spans among 3000 randomly sampled training spans that were
# predicted as 'LegalRule'.
# NOTE(review): random.sample raises ValueError if train_spans has fewer than 3000 items.
prediction_errors(clf,
                  random.sample(train_spans, 3000),
                  spacy_tfidf_vectorizer,
                  select_pred_label='LegalRule')

In [None]:
# Misclassified spans among 5000 randomly sampled training spans that were
# predicted as 'LegalPolicy'.
# NOTE(review): an earlier cell filters on 'PolicyBasedReasoning'; confirm that
# 'LegalPolicy' is an actual type name in the corpus, otherwise nothing prints.
prediction_errors(clf,
                  random.sample(train_spans, 5000),
                  spacy_tfidf_vectorizer,
                  select_pred_label='LegalPolicy')

In [None]:
# Misclassified spans among 5000 randomly sampled training spans that were
# predicted as 'RemandInstructions'.
prediction_errors(clf,
                  random.sample(train_spans, 5000),
                  spacy_tfidf_vectorizer,
                  select_pred_label='RemandInstructions')