In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [367]:
#Hyperparameters 

threat_path = '.../nuclear_threat.csv'

by_column = "text" #choose a column on which the data will be trained: "text", "text-medium", "text-short"
side = 'w' # w -- West, r -- Russia
assume = ('W', 'E') #must be a tuple of (label_to_change, suggested_label_instead). "W" -- Warning, "E" -- Escalatory, "D" -- De-escalatory

to_expand = True #if True -- trained sentence-wise
n_epochs = 8 #no. of epochs that model will be trained on
test_on = by_column

#Expand Data

In [368]:
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import string

# Download necessary NLTK resources
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')

def preprocess_text(text):
    # Tokenize the text into words
    tokens = word_tokenize(text.lower())
    
    # Remove punctuation
    tokens = [token for token in tokens if token not in string.punctuation]
    
    # Remove stop words
    stop_words = set(stopwords.words('english')).difference({"not", "no"})
    tokens = [token for token in tokens if token not in stop_words]
    
    # Lemmatize the tokens
    lemmatizer = WordNetLemmatizer()
    tokens = [lemmatizer.lemmatize(token) for token in tokens]
    
    return " ".join(tokens)

# Example usage
text = "Hello, how not are you doing today? I hope everything is going well!"
preprocessed_text = preprocess_text(text)
print(preprocessed_text)


hello not today hope everything going well


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [369]:
def expand(data, type, side=None, binary_assumption=None, sent_level = to_expand):
  expanded_data = pd.DataFrame(columns=data.columns)

  if sent_level:
    #if sent_level is True, then we will split the text in a given cell
    #into sentences and split one row into multiple rows of corresponding values
    
    for index, row in data.iterrows():
        text = row[type]
        label = row["label"]
        
        # Split the text into sentences
        sentences = text.split(".")
        
        # Create a new row for each sentence
        for sentence in sentences:
            if sentence.strip() != "":
                sentence = preprocess_text(sentence)
                expanded_data = pd.concat([expanded_data, pd.DataFrame({**row, type: sentence.strip()}, index=[0])], ignore_index=True)
  else:
    expanded_data = data

  if side:
    expanded_data = expanded_data[expanded_data['side'] == side]
  if binary_assumption:
    prev_label, new_label = binary_assumption[0], binary_assumption[1]
    expanded_data.loc[expanded_data['label'] == prev_label, 'label'] = new_label
  return expanded_data

# Text Classification with SpaCy

In [370]:
from tkinter.constants import N
import pandas as pd

nuclear_threat = pd.read_csv(threat_path)
nuclear_threat = expand(nuclear_threat, by_column, side, binary_assumption=assume, sent_level=to_expand)
len(nuclear_threat)

3482

In [371]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Splitting the DataFrame into train and test sets
nuclear_threat_train, nuclear_threat_test = train_test_split(nuclear_threat, test_size=0.1, random_state=40)

print("Train DataFrame:")
print(len(nuclear_threat_train))
print("\nTest DataFrame:")
print(len(nuclear_threat_test))
nuclear_threat_test

Train DataFrame:
3133

Test DataFrame:
349


Unnamed: 0,date,text,text-medium,text-short,side,label,comment
6696,2022-08-16,mil/about/fact-sheets/article/454593/air-force...,The US military apparently returned to its nor...,US test launch of the Minuteman III ICBM.\n,w,E,
2854,2022-03-11,thing ’ veto pen,"On the same day, Biden also stressed that whil...","Biden: NATO will be defended, but a direct co...",w,D,
4501,2022-04-19,not exaggeration,"On April 19, Pentagon spokesperson Kirby furth...",Kirby: Washington is monitoring Russian rheto...,w,D,
3049,2022-03-16,combined massive support ukrainian armed force...,In the press conference following the NATO def...,NATO decides not to send peacekeeping troops t...,w,D,
3005,2022-03-16,marilu lucrezio rai tv comment italy 's role c...,In the press conference following the NATO def...,NATO decides not to send peacekeeping troops t...,w,D,
...,...,...,...,...,...,...,...
6460,2022-07-20,” u military ’ responsibility “ provide presid...,"Against this background, US military officials...",Brown: If Putin uses nuclear weapons in Ukrai...,w,D,video; copied from medium
3953,2022-04-01,"even lower end hypersonic range – 3,800 mile p...","Nevertheless, the US military took no chances ...",Pentagon cancels Minuteman III test.\n,w,D,
3607,2022-03-23,president no ’ not thanking,"Echoing earlier statements from US officials, ...",Stoltenberg: „Any use of nuclear weapons will ...,w,E,
5390,2022-04-27,,"In an April 27 briefing, Pentagon spokesperson...",Kirby: United States do not want “to see Russi...,w,E,


# Building a Bag of Words model


In [372]:
import spacy
# Create an empty model
nlp = spacy.blank("en")

In [373]:
if assume:
  #binary classification
  # Add the TextCategorizer to the empty model
  textcat = nlp.add_pipe("textcat")
  # Add labels to text classifier
  textcat.add_label("E")
  textcat.add_label("D")
else:
  #multiple labels
  textcat = nlp.add_pipe("textcat_multilabel")
  textcat.add_label("E")
  textcat.add_label("D")
  textcat.add_label("W")

# Training a Text Categorizer Model

In [374]:
train_texts = nuclear_threat_train[by_column].values
if assume:
  train_labels = [{'cats': {'E': label == 'E',
                            'D': label == 'D'}} 
                  for label in nuclear_threat_train['label']]
else:
  train_labels = [{'cats': {'E': label == 'E',
                          'D': label == 'D', 'W':label == 'W'}} 
                for label in nuclear_threat_train['label']]

Then we combine the texts and labels into a single list.

In [375]:
train_data = list(zip(train_texts, train_labels))
train_data[:10]

[("'re seeing today roughly consistent saw yesterday",
  {'cats': {'E': False, 'D': True}}),
 ('“ understandable concern ” polish proposal',
  {'cats': {'E': False, 'D': True}}),
 ("'s lot plate", {'cats': {'E': False, 'D': True}}),
 ('think forgiven wanting make sure get stuff use quickly effectively field',
  {'cats': {'E': False, 'D': True}}),
 ('condemn sham “ trial ” resulting application death sentence member ukrainian armed force',
  {'cats': {'E': True, 'D': False}}),
 ("think 's also important add n't lot rhetoric following initial salvo early day",
  {'cats': {'E': False, 'D': True}}),
 ("clearly n't appear interested 're still fighting donbas south",
  {'cats': {'E': True, 'D': False}}),
 ('welcome effort increase cooperation investigation taking place ukraine intensify police judicial cooperation ukraine well relevant organisation',
  {'cats': {'E': True, 'D': False}}),
 ('russian influence start working neighbour country moldova georgia also impact western balkan',
  {'cat

In [376]:
from spacy.util import minibatch
from spacy.training.example import Example

spacy.util.fix_random_seed(1)
optimizer = nlp.begin_training()

# Create the batch generator with batch size = 8
batches = minibatch(train_data, size=8)
# Iterate through minibatches
for batch in batches:
    # Each batch is a list of (text, label) 
    for text, labels in batch:
        doc = nlp.make_doc(text)
        example = Example.from_dict(doc, labels)
        nlp.update([example], sgd=optimizer)

This is just one training loop (or epoch) through the data. The model will typically need multiple epochs. Use another loop for more epochs, and optionally re-shuffle the training data at the begining of each loop. 

In [377]:
import random

random.seed(1)
spacy.util.fix_random_seed(1)
optimizer = nlp.begin_training()

losses = {}
for epoch in range(n_epochs):
    random.shuffle(train_data)
    # Create the batch generator with batch size = 8
    batches = minibatch(train_data, size=8)
    # Iterate through minibatches
    for batch in batches:
        for text, labels in batch:
            doc = nlp.make_doc(text)
            example = Example.from_dict(doc, labels)
            nlp.update([example], sgd=optimizer, losses=losses)
    print(losses)

{'textcat': 852.7124040069709}
{'textcat': 1488.756337341253}
{'textcat': 1921.5418028594784}
{'textcat': 2206.393144192325}
{'textcat': 2411.3974209487005}
{'textcat': 2578.143257811827}
{'textcat': 2716.080651251046}
{'textcat': 2831.941919103612}


# Making Predictions

In [378]:
texts = nuclear_threat_test[test_on]
docs = [nlp.tokenizer(text) for text in texts]

# Use textcat to get the scores for each doc
if assume:
  textcat = nlp.get_pipe('textcat')
else:
  textcat = nlp.get_pipe('textcat_multilabel')
scores = textcat.predict(docs)

The scores are used to predict a single class or label by choosing the label with the highest probability. You get the index of the highest probability with `scores.argmax`, then use the index to get the label string from `textcat.labels`.

In [379]:
# From the scores, find the label with the highest score/probability
predicted_labels = scores.argmax(axis=1)
predicted_labels = [textcat.labels[label] for label in predicted_labels]

Examples

In [380]:
# texts = ["russia has a right for defensive nuclear alert"]
# docs = [nlp.tokenizer(text) for text in texts]
    
# # Use textcat to get the scores for each doc
# textcat = nlp.get_pipe('textcat')
# scores = textcat.predict(docs)
# # From the scores, find the label with the highest score/probability
# predicted_labels = scores.argmax(axis=1)
# predicted_labels = [textcat.labels[label] for label in predicted_labels]
# predicted_labels

#Evaluation. Confusion matrix

In [381]:
labels_test = list(nuclear_threat_test['label'])
len(labels_test)

349

In [382]:
tp = 0
tn = 0
fp = 0
fn = 0

for i in range(len(labels_test)):
  print(40*"-")
  print("Event ", i, predicted_labels[i], labels_test[i])
  if predicted_labels[i] == "E" and labels_test[i] == "E":
    tp += 1
    print("TP")
  if predicted_labels[i] != "E" and labels_test[i] != "E":
    tn += 1
    print("TN")
  if predicted_labels[i] == "E" and labels_test[i] != "E":
    fp += 1
    print("FP")
  if predicted_labels[i] != "E" and labels_test[i] == "E":
    fn += 1
    print("FN")
  print(docs[i])
tp, tn, fp, fn

----------------------------------------
Event  0 D E
FN
mil/about/fact-sheets/article/454593/air-force-global-strike-command-air-forces-strategic-air/
----------------------------------------
Event  1 D D
TN
thing ’ veto pen
----------------------------------------
Event  2 D D
TN
not exaggeration
----------------------------------------
Event  3 D D
TN
combined massive support ukrainian armed force receiving actually received nato allied country many year forced president putin change stance realise quick victory blitzkrieg taking control kiev within day actually failed
----------------------------------------
Event  4 D D
TN
marilu lucrezio rai tv comment italy 's role current crisis happy birthday today nato secretary general thank
----------------------------------------
Event  5 D D
TN
thought could divide democrat republican home
----------------------------------------
Event  6 E E
TP
supposed hour ago european union meeting — speak
----------------------------------------
Even

(80, 155, 57, 57)

In [383]:
def calculate_f1_score(tp, tn, fp, fn):
    if tp+fp == 0:
      precision = 0.0
    else:
      precision = tp / (tp + fp)
    
    if tp+fn == 0:
      recall = 0.0
    else:
      recall = tp / (tp + fn)
    if precision + recall == 0:
      f1_score = 0.0
    else:
      f1_score = 2 * (precision * recall) / (precision + recall)
    return f1_score

def calculate_accuracy(tp, tn, fp, fn):
    accuracy = (tp + tn) / (tp + tn + fp + fn)
    return accuracy

print("F1: ", calculate_f1_score(tp, tn, fp, fn))
print("Accuracy: ", calculate_accuracy(tp, tn, fp, fn))

F1:  0.583941605839416
Accuracy:  0.673352435530086


#Top words associated with each class

In [384]:
import pandas as pd
import re

df = nuclear_threat
# Assuming 'df' is your dataframe and 'text_column' is the column containing the text data
text_data = ' '.join(df['text'].astype(str).tolist())

# Tokenize the text into individual words
tokens = nltk.word_tokenize(text_data)

# Generate ngrams from the tokens
bigrams = nltk.ngrams(tokens, 2)

# Get unique bigrams using a set
unique_bigrams = set(bigrams)

# Convert the set back to a list
unique_bigrams_list = list(unique_bigrams)
unique_bigrams_list = [" ".join(b) for b in unique_bigrams]

texts = unique_bigrams_list
docs = [nlp.tokenizer(text) for text in texts]

# Use textcat to get the scores for each doc
if assume:
  textcat = nlp.get_pipe('textcat')
else:
  textcat = nlp.get_pipe('textcat_multilabel')
scores = textcat.predict(docs)

dictionary = {k: v[0] for k, v in zip(texts, scores)}

sorted_dict = dict(sorted(dictionary.items(), key=lambda x: x[1],reverse=True))

top_k = 20
i = 0
for key, value in sorted_dict.items():
    print(key, value)
    i+=1
    if i == top_k:
      break

zelensky participate 1.0
straight answer 1.0
really answer 1.0
invade surely 0.9999999
union demonstrate 0.9999999
answer think 0.9999999
test series 0.9999999
protect refugee 0.9999999
production certain 0.9999999
month demonstrate 0.9999999
ukraine destroyed 0.9999999
border express 0.9999999
ukraine record 0.9999999
legislative process 0.99999976
significant timely 0.99999976
redeployment yet 0.99999976
say answer 0.99999976
southern border 0.99999976
debate public 0.99999976
integration refugee 0.99999976
