In [None]:
import sys
sys.path.append(r"C:\Users\siyam\OneDrive\Desktop\Legal DS\project\venv\Lib\site-packages")
import spacy
import json
import random
import re
import pandas as pd
import numpy as np
from copy import deepcopy
from sklearn import model_selection
from spacy.tokenizer import Tokenizer
from spacy.lang.en import English
from spacy.symbols import ORTH
from sklearn.feature_extraction import DictVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import classification_report, confusion_matrix
from sklearn import tree
import matplotlib.pyplot as plt

## Some boilerplate code for nicer looking confusion matrices


abridged from [scikit-learn example code](https://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html)

In [None]:
def plot_confusion_matrix(y_true, y_pred, classes,
                          title=None,
                          cmap=plt.cm.Blues):
    cm = confusion_matrix(y_true, y_pred)
    fig, ax = plt.subplots(figsize=(8, 8))
    im = ax.imshow(cm, interpolation='nearest', cmap=cmap)
    ax.figure.colorbar(im, ax=ax)
    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           # ... and label them with the respective list entries
           xticklabels=classes, yticklabels=classes,
           title=title,
           ylabel='True label',
           xlabel='Predicted label')
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
             rotation_mode="anchor")
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, format(cm[i, j], 'd'),
                    ha="center", va="center",
                    color="white" if cm[i, j] > thresh else "black")
    fig.tight_layout()
    return ax

## Some helper code to analyze TFIDF data

Credit for code goes to [buhrmann.github.io](https://buhrmann.github.io/tfidf-analysis.html)

In [None]:
def top_tfidf_features(row, features, top_n=15):
    ''' Get top n tfidf values in row and return them with their corresponding feature names.'''
    topn_ids = np.argsort(row)[::-1][:top_n]
    top_feats = [(features[i], row[i]) for i in topn_ids]
    df = pd.DataFrame(top_feats)
    df.columns = ['feature', 'tfidf']
    return df


def top_features_in_doc(Xtr, features, row_id, top_n=15):
    ''' Top tfidf features in specific document (matrix row) '''
    xtr_row = Xtr[row_id]
    if type(xtr_row) is not np.ndarray:
        xtr_row = xtr_row.toarray()
    row = np.squeeze(xtr_row)
    return top_tfidf_features(row, features, top_n)


def top_mean_features(Xtr, features, grp_ids=None, min_tfidf=0.1, top_n=25):
    ''' Return the top n features that on average are most important amongst documents in rows
        indentified by indices in grp_ids. '''
    if grp_ids:
        D = Xtr[grp_ids]
    else:
        D = Xtr
    if type(D) is not np.ndarray:
        D = D.toarray()
    D[D < min_tfidf] = 0
    tfidf_means = np.mean(D, axis=0)
    return top_tfidf_features(tfidf_means, features, top_n)


def top_features_by_class(Xtr, y, features, min_tfidf=0.1, top_n=25):
    ''' Return a list of dfs, where each df holds top_n features and their mean tfidf value
        calculated across documents with the same class label. '''
    dfs = {}
    labels = np.unique(y)
    for label in labels:
        ids = np.where(y==label)
        feats_df = top_mean_features(Xtr, features, ids, min_tfidf=min_tfidf, top_n=top_n)
        feats_df.label = label
        dfs[label] = feats_df
    return dfs


def span_top_tfidf(spans_txt, spans_tfidf, features, index):
    print('span text:\n'+spans_txt[index]+'\n')
    print(top_features_in_doc(spans_tfidf, features, index))

# Load Data

In [None]:
# corpus_fpath = '../annotation_data/ldsi_w21_final_annotations_v1.json'
corpus_fpath = "../Data/ldsi_w2021-20220221T223611Z-001/ldsi_w2021/ldsi_w21_curated_annotations_v2.json"
data = json.load(open(corpus_fpath))

In [None]:
data.keys()

Define some convenience shorthands and dictionaries:

In [None]:
annotations = data['annotations']
documents_by_id = {d['_id']: d for d in data['documents']}
types_by_id = {t['_id']: t for t in data['types']}
type_ids_by_name = {t['name']: t['_id'] for t in data['types']}
type_names_by_id = {t['_id']: t['name'] for t in data['types']}
doc_id_by_name = {d['name']: d['_id'] for d in data['documents']}
doc_name_by_id = {d['_id']: d['name'] for d in data['documents']}

# Data Structures

Examine data structures so we know how to work with them

In [None]:
annotations[0]

In [None]:
list(types_by_id.items())[0]

In [None]:
list(documents_by_id.items())[0]

# Very Basic Data Survey

In [None]:
len(documents_by_id)

In [None]:
len(annotations)

In [None]:
doc_lengths = [len(d['plainText']) for d in documents_by_id.values()]
plt.hist(doc_lengths, bins=50)
plt.show()

In [None]:
doc_num_annos = [len([a for a in annotations if a['document'] == doc_id])
                 for doc_id in documents_by_id]
doc_num_annos = [n for n in doc_num_annos if n > 0]
plt.hist(doc_num_annos, bins=100)
plt.show()

# Create Corpus

In [None]:
# get all sentences assuming every annotation is a sentence
def make_span_data(documents_by_id, types_by_id, annotations):
    span_data = []
    for a in annotations:
        start = a['start']
        end = a['end']
        document_txt = documents_by_id[a['document']]['plainText']
        atype = a['type']
        sd = {'txt': document_txt[start:end],
              'document': a['document'],
              'type': types_by_id[atype]['name'],
              'start': a['start'],
              'start_normalized': a['start'] / len(document_txt),
              'end': a['end']}
        span_data.append(sd)
    return span_data

In [None]:
spans = make_span_data(documents_by_id, types_by_id, annotations)
span_labels = [s['type'] for s in spans]

In [None]:
spans[0]

Sample train and test set while stratifying across types

In [None]:
train_spans, test_spans = model_selection.train_test_split(spans,
                                                           test_size=.2,
                                                           random_state=42,
                                                           stratify=span_labels)
train_spans_txt = [s['txt'] for s in train_spans]
test_spans_txt = [s['txt'] for s in test_spans]

In [None]:
print(len(train_spans))
print(len(test_spans))

In [None]:
train_spans[0]

In [None]:
train_spans_txt[0]

In [None]:
print(f'train: {len(train_spans)}; test: {len(test_spans)}')

In [None]:
random.choice(train_spans)

# Some Tough Examples

In [None]:
example_basic_1 = 'In sum, as the preponderance of the evidence is against the Veteran\'s claim, his appeal must be denied.'
example_cit_1 = 'Smith v. Gober, 14 Vet. App. 227 (2000), aff\'d 281 F.3d 1384 (Fed. Cir. 2002); Dela Cruz v. Principi, 15 Vet. App. 143 (2001); see also Quartuccio v. Principi, 16 Vet. App. 183 (2002).'
example_rule_1 = '"To establish a right to compensation for a present disability, a Veteran must show: "(1) the existence of a present disability; (2) in-service incurrence or aggravation of a disease or injury; and (3) a causal relationship between the present disability and the disease or injury incurred or aggravated during service"-the so-called "nexus" requirement."'
example_mixed_1 = 'In Dingess v. Nicholson, 19 Vet. App. 473 (2006), the U.S. Court of Appeals for Veterans Claims held that, upon receipt of an application for a service-connection claim, 38 U.S.C.A. � 5103(a) and 38 C.F.R. � 3.159(b) require VA to provide the claimant with notice that a disability rating and an effective date for the award of benefits will be assigned if service connection is awarded. '

# Manual Tokenization Example

In [None]:
def tokenize(txt):
    dirty_tokens = re.split(' +', txt)  # split words
    # remove all non-alphanumerics
    clean_tokens = [re.sub(r'\W', '', t).lower() 
                    for t in dirty_tokens]
    if '' in clean_tokens:  # remove empty tokens
        clean_tokens.remove('')
    return clean_tokens


def tokenize_spans(spans):
    for s in spans:
        s['tokens_manual'] = tokenize(s['txt'])
        
        
def build_vocabulary(spans):
    vocab_counts = {}
    for sd in spans:
        for t in tokenize(sd['txt']):
            if t in vocab_counts:
                vocab_counts[t] += 1
            else:
                vocab_counts[t] = 1
    return vocab_counts

We can use this basic tokenizer to do some surface statistics

In [None]:
tokenize(example_cit_1)

In [None]:
tokenize_spans(train_spans)

In [None]:
vocab_counts_manual = build_vocabulary(train_spans)
unique_tokens_manual = [token for token, count in vocab_counts_manual.items() 
                        if count == 1]

In [None]:
print(len(vocab_counts_manual))
print(vocab_counts_manual['veteran'])
print(len(unique_tokens_manual))

In [None]:
span_lengths_manual = [len(s['tokens_manual']) for s in train_spans]
plt.hist(span_lengths_manual, bins=30)
plt.show()

In [None]:
vocab_df = pd.DataFrame([{'token': t, 'count': c} for t, c in vocab_counts_manual.items()])
vocab_df = vocab_df.set_index(['token'])
vocab_df.sort_values('count', ascending=False)[:30].plot.bar()
plt.show()

In [None]:
min_freq = 6
max_freq = 100
feature_names_manual = sorted(token for token, count in vocab_counts_manual.items() 
                       if min_freq <= count <= max_freq)
print(f'number of thresholded with {min_freq} < n < {max_freq}: {len(feature_names_manual)}')

# Basic TFIDF vectorization with sklearn

We are using sklear's [TFIDF vectorizer](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.TfidfVectorizer.html#sklearn.feature_extraction.text.TfidfVectorizer.fit) here, because it is faster than doing it all by ourselves and has useful default parameters

In [None]:
vectorizer = TfidfVectorizer(min_df=5)
vectorizer = vectorizer.fit(train_spans_txt)
tfidf_features_skl = vectorizer.get_feature_names()

In [None]:
train_tfidf_skl = vectorizer.transform(train_spans_txt).toarray()
test_tfidf_skl = vectorizer.transform(test_spans_txt).toarray()
train_spans_labels = np.array([s['type'] for s in train_spans])
test_spans_labels = np.array([s['type'] for s in test_spans])

... and we get numpy arrays

In [None]:
train_tfidf_skl.shape

Examine the top TFIDF values of tokens in a sentence

In [None]:
span_top_tfidf(train_spans_txt, 
               train_tfidf_skl,
               tfidf_features_skl,
               random.randint(0, len(train_spans)))

Examine features with highest average TFIDF score per class

In [None]:
dfs = top_features_by_class(train_tfidf_skl, 
                            train_spans_labels,
                            tfidf_features_skl)

In [None]:
dfs.keys()

In [None]:
dfs['Citation']

## Train & Evaluate Model

Models:  
[Gaussian Naive Bayes](https://scikit-learn.org/stable/modules/generated/sklearn.naive_bayes.GaussianNB.html)  
[Support Vector Machine Classifier](https://scikit-learn.org/stable/modules/generated/sklearn.svm.SVC.html)  
[Decision Tree](https://scikit-learn.org/stable/modules/generated/sklearn.tree.DecisionTreeClassifier.html)  
[Random Forest](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html)

In [None]:
#clf = SVC(gamma='auto')
#clf = RandomForestClassifier(n_estimators=100, max_depth=12)
#clf = tree.DecisionTreeClassifier(max_depth=8)
clf_skl = GaussianNB()
clf_skl = clf_skl.fit(train_tfidf_skl, train_spans_labels)

In [None]:
print('TRAIN:\n'+classification_report(train_spans_labels, 
                                       clf_skl.predict(train_tfidf_skl)))
print('TEST:\n'+classification_report(test_spans_labels,
                                      clf_skl.predict(test_tfidf_skl)))

# Build Feature Vector using Spacy
More information: [Spacy](https://spacy.io)

## Tokenization

In [None]:
# basic English pipeline provided by spacy
nlp = spacy.load("en_core_web_sm")

More information on customizing tokenizers [at the Spacy docs](https://spacy.io/usage/linguistic-features#tokenization)  
Note that you may not alter the text, but you can use it to suppress or force tokenization.

In [None]:
nlp.tokenizer.add_special_case('Vet. App.', [{ORTH: 'Vet. App.'}])

In [None]:
#test_txt = example_basic_1
#test_txt = example_cit_1
test_txt = example_mixed_1
print(test_txt)
doc = nlp(test_txt)
print(f'{len(list(doc.sents))} sentences:')
print('===')
for sent in doc.sents:
    print(sent)
print('===')
for t in doc:
    print(f'{t.text} | {t.lemma_} | {t.pos_}')

In [None]:
def spacy_tokenize(txt):
    doc = nlp(txt)
    tokens = list(doc)
    clean_tokens = []
    for t in tokens:
        if t.pos_ == 'PUNCT':
            pass
        elif t.pos_ == 'NUM':
            clean_tokens.append(f'<NUM{len(t)}>')
        else:
            clean_tokens.append(t.lemma_)
    return clean_tokens

def spans_add_spacy_tokens(spans):
    for s in spans:
        s['tokens_spacy'] = spacy_tokenize(s['txt'])

In [None]:
train_spans[0]

In [None]:
# will take a moment
spans_add_spacy_tokens(train_spans)
spans_add_spacy_tokens(test_spans)

In [None]:
random.choice(train_spans)

## Vectorization

In [None]:
# suboptimal: tokenizer gets called twice
spacy_tfidf_vectorizer = TfidfVectorizer(tokenizer=spacy_tokenize,
                                         min_df=3,
                                         ngram_range=(1,1))
spacy_tfidf_vectorizer = spacy_tfidf_vectorizer.fit(train_spans_txt)
tfidf_features_spacy = spacy_tfidf_vectorizer.get_feature_names()

### Small walkthrough of how to extend/compose feature vectors

In [None]:
# numpy feature vector extension
train_tfidf_spacy = spacy_tfidf_vectorizer.transform(train_spans_txt).toarray()
print(train_tfidf_spacy.shape)
train_starts_normalized = np.array([s['start_normalized'] for s in train_spans])
print(train_starts_normalized.shape)
print(np.expand_dims(train_starts_normalized, axis=1).shape)
ext = np.concatenate((train_tfidf_spacy, 
                      np.expand_dims(train_starts_normalized, axis=1)), axis=1)
print(ext.shape)

### Examine highest average TFIDF features by class

In [None]:
dfs = top_features_by_class(train_tfidf_spacy, train_spans_labels, tfidf_features_spacy)

In [None]:
dfs['Citation']

In [None]:
def make_feature_vectors_and_labels(spans, vectorizer):
    # function takes long to execute
    # note: we un-sparse the matrix here to be able to manipulate it
    tfidf = spacy_tfidf_vectorizer.transform([s['txt'] for s in spans]).toarray()
    starts_normalized = np.array([s['start_normalized'] for s in spans])
    num_tokens = np.array([len(s['tokens_spacy']) for s in spans])
    y = np.array([s['type'] for s in spans])
    X = np.concatenate((tfidf, np.expand_dims(starts_normalized, axis=1)), axis=1)
    return X, y

In [None]:
train_X, train_y = make_feature_vectors_and_labels(train_spans, spacy_tfidf_vectorizer)
test_X, test_y = make_feature_vectors_and_labels(test_spans, spacy_tfidf_vectorizer)

In [None]:
print(f'{train_X.shape} {train_y.shape}')
print(f'{test_X.shape} {test_y.shape}')

In [None]:
#clf = GaussianNB()
clf = tree.DecisionTreeClassifier(max_depth=12)
clf = clf.fit(train_X, train_y)

In [None]:
print('TRAIN:\n'+classification_report(train_spans_labels, clf.predict(train_X)))
print('TEST:\n'+classification_report(test_spans_labels, clf.predict(test_X)))

In [None]:
plot_confusion_matrix(test_spans_labels, clf.predict(test_X), classes=list(clf.classes_),
                      title='Confusion matrix for training data')
plt.show()

In [None]:
type_names_by_id

In [None]:
def prediction_errors(clf, eval_spans, vectorizer, 
                      select_true_label=None, 
                      select_pred_label=None):
    eval_X, eval_y = make_feature_vectors_and_labels(eval_spans, vectorizer)
    eval_spans_txt = [s['txt'] for s in eval_spans]
    eval_spans_labels = [s['type'] for s in eval_spans]
    pred_y = clf.predict(eval_X)
    for i in range(len(eval_spans)):
        true_label = eval_spans_labels[i]
        pred_label = pred_y[i]
        if true_label != pred_label:
            if select_true_label and true_label != select_true_label: continue
            if select_pred_label and pred_label != select_pred_label: continue
            doc_name = documents_by_id[eval_spans[i]['document']]['name']
            print('sentence # '+str(i)+' / case '+doc_name+' / @'+str(eval_spans[i]['start']))
            print('pred: '+pred_label+' / true: '+true_label)
            print(eval_spans[i]['txt'])
            print()

In [None]:
prediction_errors(clf,
                  random.sample(train_spans, 100),
                  spacy_tfidf_vectorizer,
                  select_pred_label='Evidence')