In [None]:
import numpy as np
import pandas as pd
import json
from pathlib import Path
import re
import spacy

### load variable mapping

In [None]:
# Load the code -> label lookup tables, one dict per encoded column.
# `parse_int` is intentionally not passed: it must be a callable and only
# applies to integer *values*, so `parse_int=True` would raise a TypeError on
# the first integer in the file. JSON object keys are always read as strings.
with Path("variable_mapping.json").open("r") as f:
    mapping = json.load(f)

In [None]:
# JSON object keys always arrive as strings, so rebuild each column's lookup
# table with integer codes that can be matched against the encoded data.
for column, code_labels in list(mapping.items()):
    mapping[column] = dict((int(code), label) for code, label in code_labels.items())

### load primary data

In [None]:
# Read the primary data. body_part_2 and diagnosis_2 can be null, so they are
# loaded with pandas' nullable integer dtype instead of being coerced to float.
df = pd.read_csv(
    filepath_or_buffer="corrected_narrative_primary2.csv",
    dtype=dict(body_part_2="Int64", diagnosis_2="Int64"),
)
df.head()

### replace numeric values with corresponding strings

In [None]:
# Build a decoded copy of `df`: every column named in `mapping` has its values
# looked up in that column's table; all other columns are carried over as-is.
decoded_df = df.assign(
    **{column: df[column].map(lookup) for column, lookup in mapping.items()}
)

In [None]:
# Preview the first three decoded rows.
decoded_df.head(3)

In [None]:
# Sanity check on the decoding: a code with no entry in its lookup table maps
# to NaN, so the per-column missing-value counts must be unchanged.
assert decoded_df.isnull().sum().eq(df.isnull().sum()).all()

In [None]:
# Persist the decoded data. With to_csv's defaults the DataFrame index is
# written as well, as an unnamed first column.
decoded_df.to_csv("decoded_primary_data3.csv")

In [None]:
# Draw 5,000 distinct rows (no replacement); the fixed seed makes the sample repeatable.
random_sample = decoded_df.sample(n=5000, replace=False, random_state=42)

In [None]:
# Keep only the first two columns by position. Later cells read the narrative
# text from this frame both by name ('narrative') and as column position 1.
random_sample_sub = random_sample.iloc[:,0:2]

In [None]:
# Preview the two-column sample.
random_sample_sub.head()

# NLP

In [None]:
# Show the installed spaCy version so the run can be reproduced.
print(spacy.__version__)

# My Model

Corrected data

In [None]:
# Load the annotation table the model is built from. Later cells read its
# 'key_entry', 'span' and 'label' columns.
fall_in = pd.read_csv('double_corrected_narrative_primary3.csv')

In [None]:
# Preview the annotation table.
fall_in.head()

In [None]:
# Number of distinct key_entry values (a missing value, if present, counts as one).
fall_in['key_entry'].nunique(dropna=False)

In [None]:
# Keep the columns at positions 1-4 (position 0 is dropped), then show the last rows.
# NOTE(review): later cells address these columns both by name ('key_entry',
# 'span', 'label') and by position — confirm the column order matches.
fall = fall_in.iloc[:,1:5]
fall.tail(3)

In [None]:
# Drop exact duplicate rows. `fall` is a column slice of `fall_in`, so an
# in-place drop can trigger pandas' SettingWithCopyWarning; rebinding the name
# to the de-duplicated result avoids that and keeps the cell safe to re-run.
fall = fall.drop_duplicates()
len(fall)

In [None]:
# Rows whose 'span' is missing; they are set aside here and added back after
# the label filter below.
fall_nas = fall[fall['span'].isna()]
fall_nas.head()

In [None]:
# Number of rows without a span.
len(fall_nas)

In [None]:
# Every distinct value in the 'label' column (includes NaN if any label is missing).
all_labels = fall.label.unique()
all_labels

In [None]:
# Keep only the rows whose label is one of the labels the model is built with.
# NOTE: this also drops every text whose annotations use other labels, which
# discards a lot of useful text.
# TODO: change this so those texts are kept.
labels_to_use = ['SO', 'WT', 'SF', 'TRS', 'STR', 'SU', 'CHR', 'OBJ', 'SHW', 'RCH', 'LAD', 'WF']
fall_sub = fall[fall['label'].isin(labels_to_use)]
len(fall_sub)

In [None]:
# Add back the rows that have no span (they were set aside in fall_nas).
# NOTE(review): a row with a missing span but a label in labels_to_use would be
# present in both frames and therefore appear twice here — confirm none exist.
fall_all = pd.concat((fall_sub, fall_nas))
fall_all.tail(3)

In [None]:
# Shuffle the rows: sampling the whole frame without replacement is a random
# permutation, and the fixed seed keeps the order repeatable.
fall_all_random = fall_all.sample(frac=1, random_state=99)
fall_all_random.tail(3)

In [None]:
# Distinct key_entry values in shuffled order of first appearance; each one is
# later passed to the tokenizer as the text of one doc.
key_list = fall_all_random.key_entry.unique()
len(key_list)

In [None]:
# Inspect all annotation rows belonging to the first key.
fall_all_random[fall_all_random.key_entry == key_list[0]]

In [None]:
nlp = spacy.blank("en")
# special_case = [{Token.ORTH: "CLOSED-HEAD"}]
# nlp.tokenizer.add_special_case("CLOSED-HEAD", special_case)

from spacy.tokens import Span


def _find_token_span(doc, text, span_text, label):
    """Return a Span over the tokens that exactly cover `span_text`, or None.

    Parameters:
        doc: the tokenized form of `text`.
        text: the string `doc` was created from.
        span_text: the annotated substring to locate; only its first
            occurrence in `text` is considered.
        label: label to attach to the returned Span.

    Returns:
        A Span when the substring starts exactly at a token start and ends
        exactly at a token end; None when the substring is absent or does not
        line up with token boundaries.
    """
    span_start_char = text.find(span_text)
    if span_start_char < 0:
        return None
    span_end_char = span_start_char + len(span_text)

    # Translate the character offsets into token indices.
    start_token = None
    end_token = None
    for token in doc:
        if token.idx == span_start_char:
            start_token = token.i
        if token.idx + len(token.text) == span_end_char:
            end_token = token.i
            break
    if start_token is None or end_token is None:
        return None
    # Span's end index is exclusive, hence the + 1.
    return Span(doc, start_token, end_token + 1, label)


# keeping span token lengths to appropriately set config
token_lengths = []
label_list = []

docs = []  # this will hold the processed strings and spans
for k in key_list:
    doc = nlp(k)
    temp_df = fall_all_random[fall_all_random.key_entry == k]

    span_list = []
    for row_pos in range(len(temp_df)):
        # Positional access as before: position 1 is read as the span text and
        # position 2 as its label.
        span_text = temp_df.iloc[row_pos, 1]
        temp_label = temp_df.iloc[row_pos, 2]

        # Rows with no annotation contribute no span; skipping them also keeps
        # a NaN from reaching str.find when a key mixes empty and annotated rows.
        if pd.isna(span_text) or pd.isna(temp_label):
            continue

        span = _find_token_span(doc, k, span_text, temp_label)
        if span is None:
            print(k, "span=", span_text, "couldn't find tokens")
            continue

        span_list.append(span)
        token_lengths.append(span.end - span.start)
        label_list.append(temp_label)

    # Every doc gets an "sc" group (possibly empty) and is appended exactly
    # once, so no text is duplicated in `docs`.
    doc.spans["sc"] = span_list
    docs.append(doc)

In [None]:
# Number of docs collected in `docs`.
len(docs)

In [None]:
# Spot-check the first five docs together with their span groups.
for doc in docs[0:5]:
    print(doc, doc.spans)

In [None]:
# Number of spans that were successfully aligned to tokens.
len(token_lengths)

In [None]:
# Shortest, longest and median span length, measured in tokens.
print(np.min(token_lengths), np.max(token_lengths), np.median(token_lengths))

In [None]:
# 5th and 95th percentiles of span length in tokens.
np.quantile(token_lengths, q =[0.05,0.95])

In [None]:
# Doc at index 800: the first doc that falls into the dev slice (docs[800:]).
docs[800]

In [None]:
# Doc at index 799: the last doc that falls into the training slice (docs[0:800]).
docs[799]

Make training and dev (validation) sets with the docs

In [None]:
from spacy.tokens import DocBin

In [None]:
# Number of leading docs written to the training file; the rest go to the dev file.
# Kept in one place so the two slices cannot drift apart.
TRAIN_SIZE = 800

# An empty dev file would only surface later as a confusing training failure.
assert len(docs) > TRAIN_SIZE, "not enough docs to leave any for the dev set"

doc_bin = DocBin(docs=docs[:TRAIN_SIZE])
doc_bin.to_disk("./train_231001_1117.spacy")

doc_bin = DocBin(docs=docs[TRAIN_SIZE:])
doc_bin.to_disk("./dev_231001_1117.spacy")

Generate a base training config for the span categorizer by running this in a terminal:

```
python -m spacy init config ./config.cfg --lang en --pipeline spancat
```

## Using model

In [None]:
# Load the trained pipeline from disk.
# NOTE(review): presumably the "model-best" directory written by spaCy training — confirm.
nlp_spancat = spacy.load("outputs_231001_1117/model-best")

In [None]:
# List the components of the loaded pipeline.
nlp_spancat.pipeline

In [None]:
# Run the model on one sampled text (row position 2102, column position 1)
# and show the text that was processed.
test_text = random_sample_sub.iloc[2102,1]
doc = nlp_spancat(test_text)
doc.text

In [None]:
# Span groups attached to the doc by the pipeline.
doc.spans

In [None]:
# One line per span in the "sc" group: label, start and end token indices, text.
for span in doc.spans["sc"]:
    print(span.label_, span.start, span.end, span.text)

In [None]:
# Preview the sample the model is applied to next.
random_sample_sub.head()

**Test model on a subset of the samples**

In [None]:
random_sample_sub2 = random_sample_sub.iloc[2000:3500,]

# Collect one record per predicted span (or a single "NA" record when a text
# has no spans) and build the DataFrame once at the end. Growing a DataFrame
# with pd.concat inside the loop is quadratic and gives every row index 0.
output_columns = ['text', 'span_label', 'span_text']
output_rows = []

for text in random_sample_sub2['narrative']:
    doc = nlp_spancat(text)
    spans = doc.spans["sc"]

    if len(spans) == 0:
        output_rows.append([text, "NA", "NA"])
    else:
        output_rows.extend([text, span.label_, span.text] for span in spans)

output_df = pd.DataFrame(output_rows, columns=output_columns)

In [None]:
# Save the subset predictions (the DataFrame index is written as the first column).
output_df.to_csv("predictions_strikes.csv")

## Run model on all samples

In [None]:
# Collect one record per predicted span (or a single "NA" record when a text
# has no spans) and build the DataFrame once at the end. Growing a DataFrame
# with pd.concat inside the loop is quadratic and gives every row index 0.
output_columns = ['cpsc_case_number', 'text', 'span_label', 'span_text']
output_rows = []

# Select the two needed columns by name and iterate as plain tuples, which
# avoids building a Series per row the way iterrows does.
case_texts = decoded_df[['cpsc_case_number', 'narrative']]
for cpsc, text in case_texts.itertuples(index=False, name=None):
    doc = nlp_spancat(text)
    spans = doc.spans["sc"]

    if len(spans) == 0:
        output_rows.append([cpsc, text, "NA", "NA"])
    else:
        output_rows.extend([cpsc, text, span.label_, span.text] for span in spans)

output_df = pd.DataFrame(output_rows, columns=output_columns)

In [None]:
# Preview the long-format predictions (one row per predicted span).
output_df.head()

In [None]:
# Total number of prediction rows, including the "NA" rows for texts without spans.
len(output_df)

In [None]:
# Save the full-set predictions (the DataFrame index is written as the first column).
output_df.to_csv("predictions_full_set.csv")

In [None]:
# Pivot the DataFrame
def tuple_aggregator(series):
    """Collect every value of `series`, in order, into a single tuple.

    Used as a pivot aggregation so that a cell can hold several span texts.
    """
    collected = tuple(series)
    return collected

# Reshape to one row per (cpsc_case_number, text) and one column per span
# label; each cell holds a tuple of that case's span texts for the label, or
# NaN when the label was not predicted. Texts without any span were recorded
# with the label "NA", so they end up in a column named "NA".
pivot_output_df = output_df.pivot_table(index=['cpsc_case_number', 'text'], columns='span_label', values='span_text', aggfunc=tuple_aggregator).reset_index()
# Drop the leftover 'span_label' name on the column axis.
pivot_output_df.columns.name = None 
pivot_output_df.head()

In [None]:
# Work on a copy so the pivoted frame keeps its span texts.
binary_df = pivot_output_df.copy()


def _to_indicator(cell):
    """Return 0 for the string 'NA' or a missing value, and 1 for anything else."""
    if cell == 'NA' or pd.isna(cell):
        return 0
    return 1


# Turn every label column into a 0/1 indicator. The first two columns
# (cpsc_case_number and text) identify the case and are left untouched.
binary_df.iloc[:, 2:] = binary_df.iloc[:, 2:].map(_to_indicator)
binary_df.head()


In [None]:
# Attach the decoded case data to the label indicators; the left join keeps
# every row of binary_df.
combo_df = binary_df.merge(decoded_df, how="left", on="cpsc_case_number")

Gather rows for specific strike types

In [None]:
# Per-label masks: True where the case is flagged for that label.
has_so = combo_df['SO'] == 1
has_sf = combo_df['SF'] == 1
has_su = combo_df['SU'] == 1

# Each subset keeps the cases flagged for exactly one of the three labels.
struck_object = combo_df[has_so & ~has_sf & ~has_su]
struck_floor = combo_df[~has_so & has_sf & ~has_su]
struck_unknown = combo_df[~has_so & ~has_sf & has_su]

In [None]:
# Subset sizes, printed in the order: floor, object, unknown.
print(len(struck_floor), len(struck_object), len(struck_unknown))

Look at frequencies of disposition and diagnosis by strike type

In [None]:
# Share of each disposition among the struck_floor cases (normalize=True gives proportions).
struck_floor['disposition'].value_counts(normalize=True)

In [None]:
# Share of each disposition among the struck_unknown cases (normalize=True gives proportions).
struck_unknown['disposition'].value_counts(normalize=True)

In [None]:
# Raw count of each diagnosis among the struck_object cases.
struck_object['diagnosis'].value_counts()

In [None]:
# Raw count of each diagnosis among the struck_floor cases.
struck_floor['diagnosis'].value_counts()

In [None]:
# Raw count of each diagnosis among the struck_unknown cases.
struck_unknown['diagnosis'].value_counts()