# Task: `Natural Language Processing`

Given an audio transcription of a turret command instruction, return a JSON object corresponding to the specified target’s description, heading, and the tool to be deployed against it.

**For Advanced teams**, the transcript will be a turret instruction in natural language.

**For Novice teams**, the transcript will follow a relatively structured format with the turret operator explicitly reading out the turret, heading, and tool, though the order may vary.

_Insert Code Here_

In [1]:
# !pip install -q seqeval spacy

In [2]:
import os
from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer
import accelerate
import torch
import json
from datasets import Dataset
from sklearn.model_selection import train_test_split
import spacy
import numpy as np
from seqeval.metrics import classification_report

In [3]:
cur_dir = os.getcwd()
nlp_dir = os.path.dirname(cur_dir)
til_dir = os.path.dirname(nlp_dir)
home_dir = os.path.dirname(til_dir)
test_dir = os.path.join(home_dir, 'novice')
audio_dir = os.path.join(test_dir, 'audio')

til_dir

'/home/jupyter/til-24-base'

## Fine Tuning bert-base-uncased

In [4]:
label_list = ["O", "B-TARGET", "I-TARGET", "B-HEADING", "I-HEADING", "B-TOOL", "I-TOOL"]
id2label = {i: label for i, label in enumerate(label_list)}
label2id = {label: i for i, label in enumerate(label_list)}

In [29]:
import re

def clean_transcript(transcript):
    # Convert to lowercase
    transcript = transcript.lower()
    
    # Remove punctuation except hyphens
    cleaned_transcript = re.sub(r'[^\w\s-]', '', transcript)

    # Remove extra whitespace
    cleaned_transcript = re.sub(r'\s+', ' ', cleaned_transcript).strip()
    
    return cleaned_transcript

def word_to_num(word):
    """Convert number words to digits if possible, and handle variations."""
    word = word.replace(',', '')  # Remove commas
    num_dict = {
        'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4',
        'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9'
    }
    
    # Regular expression patterns for numbers
    pattern_actual = r'^(zero|one|two|three|four|five|six|seven|eight|nine)$'
    pattern_variations = rf'^{pattern_actual}(er|s)?$'
    
    match = re.match(pattern_variations, word)
    
    if match:
        actual = match.group(1)
        return num_dict[actual]
    
    return word

def read_jsonl(file_path):
    tokens = []
    labels = []
    label_map = {
        "target": "TARGET",
        "heading": "HEADING",
        "tool": "TOOL"
    }

    with open(file_path, "r") as f:
        for idx, line in enumerate(f):
            data = json.loads(line)
            transcript = clean_transcript(data["transcript"]).split()
            transcript = [word_to_num(word) for word in transcript]
            token_list = []
            label_list = ["O"] * len(transcript)

            for key in ["target", "heading", "tool"]:
                if key in data:
                    entity = clean_transcript(data[key]).split()
                    if key == "heading":
                        entity = list(data[key])
                    entity_len = len(entity)
                    for i in range(len(transcript)):
                        if transcript[i:i + entity_len] == entity:
                            # if key == "heading" and idx < 5: # DEBUG
                            #     print(entity)
                            label_list[i] = f"B-{label_map[key].upper()}"
                            for j in range(1, entity_len):
                                label_list[i + j] = f"I-{label_map[key].upper()}"
                            break

            tokens.append(transcript)
            labels.append(label_list)

    return tokens, labels

tokens, labels = read_jsonl(os.path.join(test_dir, "nlp.jsonl"))

# Split data into train, validation, and test sets
train_tokens, temp_tokens, train_labels, temp_labels = train_test_split(tokens, labels, test_size=0.4, random_state=42)
val_tokens, test_tokens, val_labels, test_labels = train_test_split(temp_tokens, temp_labels, test_size=0.5, random_state=42)

# Create datasets
train_dataset = Dataset.from_dict({"tokens": train_tokens, "ner_tags": train_labels})
val_dataset = Dataset.from_dict({"tokens": val_tokens, "ner_tags": val_labels})
test_dataset = Dataset.from_dict({"tokens": test_tokens, "ner_tags": test_labels})

Longest token sequence: ['heading', 'is', '2', '6', '0', 'target', 'is', 'black', 'white', 'and', 'yellow', 'commercial', 'aircraft', 'tool', 'to', 'deploy', 'is', 'surface-to-air', 'missiles']


In [6]:
print(train_dataset[0]['tokens'])
print(train_dataset[0]['ner_tags'])

['heading', 'is', '1', '6', '5', 'target', 'is', 'grey', 'and', 'purple', 'drone', 'tool', 'to', 'deploy', 'is', 'anti-air', 'artillery']
['O', 'O', 'B-HEADING', 'I-HEADING', 'I-HEADING', 'O', 'O', 'B-TARGET', 'I-TARGET', 'I-TARGET', 'I-TARGET', 'O', 'O', 'O', 'O', 'B-TOOL', 'I-TOOL']


In [7]:
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

def tokenize_and_align_labels(examples):
    tokenized_inputs = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True, padding="max_length")
    labels = []

    for i, label in enumerate(examples["ner_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []

        for word_idx in word_ids:
            if word_idx is None:
                label_ids.append(-100)  # Special token like [CLS], [SEP], etc.
            elif word_idx != previous_word_idx:
                label_ids.append(label2id[label[word_idx]])
            else:
                label_ids.append(-100)  # Set padding tokens' label to -100
            previous_word_idx = word_idx

        labels.append(label_ids)

    # Ensure all label sequences are the same length
    max_len = len(tokenized_inputs["input_ids"][0])
    for label_ids in labels:
        if len(label_ids) < max_len:
            label_ids.extend([-100] * (max_len - len(label_ids)))

    tokenized_inputs["labels"] = labels
    tokenized_inputs["word_ids"] = [tokenized_inputs.word_ids(batch_index=i) for i in range(len(examples["tokens"]))]

    return tokenized_inputs

# Tokenize datasets
tokenized_train_dataset = train_dataset.map(tokenize_and_align_labels, batched=True)
tokenized_val_dataset = val_dataset.map(tokenize_and_align_labels, batched=True)
tokenized_test_dataset = test_dataset.map(tokenize_and_align_labels, batched=True)



Map:   0%|          | 0/2100 [00:00<?, ? examples/s]

Map:   0%|          | 0/700 [00:00<?, ? examples/s]

Map:   0%|          | 0/700 [00:00<?, ? examples/s]

In [8]:
# first_element = tokenized_train_dataset[0]
# print(first_element)

# def print_word_ids(tokenized_inputs, idx):
#     word_ids = tokenized_inputs.word_ids(batch_index=idx)
#     tokens = tokenized_inputs.tokens(batch_index=idx)
#     print(f"Tokens: {tokens}")
#     print(f"Word IDs: {word_ids}")

# for i in range(len(tokenized_test_dataset)):
#     tokenized_example = tokenizer(tokenized_test_dataset[i]['tokens'], truncation=True, is_split_into_words=True, padding="max_length")
#     print(f"\nExample {i}:")
#     print_word_ids(tokenized_example, 0)

# print(labels)

### Original Model

In [9]:
model = AutoModelForTokenClassification.from_pretrained("bert-base-uncased", num_labels=len(label_list))
model.gradient_checkpointing_enable() # using gradient checkpointing to reduce memory usage by trading off compute time

Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


### Training

In [10]:
# Set up the training arguments
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    logging_strategy="steps",  # Enable logging
    logging_steps=50,  # Log every 50 steps
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
    report_to="none",
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train_dataset,
    eval_dataset=tokenized_val_dataset,
    tokenizer=tokenizer,
)

# Train the model
trainer.train()

trainer.save_model("./models/nlp_4")
tokenizer.save_pretrained("./models/nlp_4")

Epoch,Training Loss,Validation Loss
1,0.0053,0.00107
2,0.0017,0.000586
3,0.0014,0.000501


('./models/nlp_4/tokenizer_config.json',
 './models/nlp_4/special_tokens_map.json',
 './models/nlp_4/vocab.txt',
 './models/nlp_4/added_tokens.json',
 './models/nlp_4/tokenizer.json')

### Loading Model

In [11]:
# Load the model and tokenizer
model = AutoModelForTokenClassification.from_pretrained("./models/nlp_4")
tokenizer = AutoTokenizer.from_pretrained("./models/nlp_4")

training_args = TrainingArguments(
    output_dir='./results',
    per_device_eval_batch_size=8,
)

# Define the Trainer for evaluation
trainer = Trainer(
    model=model,
    args=training_args,
    tokenizer=tokenizer,
)

In [12]:
# Evaluate the model on the test set
predictions, label_ids, _ = trainer.predict(tokenized_test_dataset)

In [13]:
# USE SPACY IF MORE THAN 2 PREDICTIONS
from sklearn.metrics.pairwise import cosine_similarity

nlp = spacy.load("en_core_web_sm")

def get_mean_embedding_spacy(text):
    doc = nlp(text)
    return np.mean([token.vector for token in doc if token.has_vector], axis=0)

# Representative vectors for descriptive and weapon concepts
descriptive_vector = get_mean_embedding_spacy("colorful vibrant bright vivid") # TODO since data is mostly color describing targets
weapon_vector = get_mean_embedding_spacy("gun rifle knife bomb missile grenade weapon")

def cosine_sim(vec1, vec2):
    return cosine_similarity([vec1], [vec2])[0][0]

def select_best_entity(entities, representative_vector_fn, concept_vector):
    best_entity = None
    best_similarity = -1
    for entity in entities:
        entity_vector = representative_vector_fn(entity)
        similarity = cosine_sim(entity_vector, concept_vector)
        if similarity > best_similarity:
            best_similarity = similarity
            best_entity = entity
    return best_entity

In [15]:
def align_predictions(predictions, label_ids, tokenized_inputs):
    preds = np.argmax(predictions, axis=2)

    # Initialize lists to hold labels, entities, and tokens
    label_list = [[] for _ in range(len(label_ids))]
    pred_list = [[] for _ in range(len(label_ids))]
    entity_list = [[] for _ in range(len(label_ids))]
    token_list = [[] for _ in range(len(label_ids))]

    for i in range(len(label_ids)):
        current_entity = []
        current_label = None
        word_ids = tokenized_inputs["word_ids"][i]  # Get word IDs for the current example
        tokens = tokenizer.convert_ids_to_tokens(tokenized_inputs["input_ids"][i])  # Get tokens for the current example

        entity_types = {}

        for j, word_idx in enumerate(word_ids):
            # Skip special tokens and padding
            if word_idx is None or label_ids[i][j] == -100:
                continue

            true_label = id2label[label_ids[i][j]]
            pred_label = id2label[preds[i][j]]

            label_list[i].append(true_label)
            pred_list[i].append(pred_label)
            token_list[i].append(tokens[j])

            # Extract entities based on predicted labels
            if pred_label.startswith("B-"):
                if current_entity and current_label:
                    if current_label not in entity_types:
                        entity_types[current_label] = []
                    entity_types[current_label].append(" ".join(current_entity))
                current_entity = [tokens[j]]
                current_label = pred_label[2:]
            elif pred_label.startswith("I-") and current_label == pred_label[2:]:
                current_entity.append(tokens[j])
            else:
                if current_entity and current_label:
                    if current_label not in entity_types:
                        entity_types[current_label] = []
                    entity_types[current_label].append(" ".join(current_entity))
                current_entity = []
                current_label = None

        if current_entity and current_label:
            if current_label not in entity_types:
                entity_types[current_label] = []
            entity_types[current_label].append(" ".join(current_entity))

        for entity_type, entities in entity_types.items():
            if entity_type == "TARGET":
                best_entity = sorted(entities, key=lambda x: cosine_sim(get_mean_embedding_spacy(x), descriptive_vector), reverse=True)[0]
            elif entity_type == "TOOL":
                best_entity = sorted(entities, key=lambda x: cosine_sim(get_mean_embedding_spacy(x), weapon_vector), reverse=True)[0]
            else:
                best_entity = entities[0]
            entity_list[i].append((entity_type, best_entity))

    return token_list, pred_list, label_list, entity_list

tokens, pred_labels, true_labels, entities = align_predictions(predictions, label_ids, tokenized_test_dataset)

In [16]:
zipped_results = list(zip(tokens, pred_labels, true_labels, entities))

# Print the zipped results for inspection
for token, pred, true, entity in zipped_results[:20]:
    print("Tokens:", token)
    print("Predicted Labels: ", pred)
    print("True Labels: ", true)
    print("Entities: ", entity)
    print("\n")

Tokens: ['heading', 'is', '2', '0', '0', 'target', 'is', 'silver', 'and', 'black', 'drone', 'tool', 'to', 'deploy', 'is', 'drone', 'catcher']
Predicted Labels:  ['O', 'O', 'B-HEADING', 'I-HEADING', 'I-HEADING', 'O', 'O', 'B-TARGET', 'I-TARGET', 'I-TARGET', 'I-TARGET', 'O', 'O', 'O', 'O', 'B-TOOL', 'I-TOOL']
True Labels:  ['O', 'O', 'B-HEADING', 'I-HEADING', 'I-HEADING', 'O', 'O', 'B-TARGET', 'I-TARGET', 'I-TARGET', 'I-TARGET', 'O', 'O', 'O', 'O', 'B-TOOL', 'I-TOOL']
Entities:  [('HEADING', '2 0 0'), ('TARGET', 'silver and black drone'), ('TOOL', 'drone catcher')]


Tokens: ['heading', 'is', '2', '2', '0', 'target', 'is', 'white', 'cargo', 'aircraft', 'tool', 'to', 'deploy', 'is', 'electromagnetic', 'pulse']
Predicted Labels:  ['O', 'O', 'B-HEADING', 'I-HEADING', 'I-HEADING', 'O', 'O', 'B-TARGET', 'I-TARGET', 'I-TARGET', 'O', 'O', 'O', 'O', 'B-TOOL', 'I-TOOL']
True Labels:  ['O', 'O', 'B-HEADING', 'I-HEADING', 'I-HEADING', 'O', 'O', 'B-TARGET', 'I-TARGET', 'I-TARGET', 'O', 'O', 'O', 'O'

In [17]:
print(classification_report(true_labels, pred_labels))

              precision    recall  f1-score   support

     HEADING       1.00      1.00      1.00       637
      TARGET       1.00      1.00      1.00       700
        TOOL       1.00      1.00      1.00       700

   micro avg       1.00      1.00      1.00      2037
   macro avg       1.00      1.00      1.00      2037
weighted avg       1.00      1.00      1.00      2037



In [26]:
def predict_on_random_sentence(sentence, model):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    sentence = clean_transcript(sentence)
    tokens = sentence.split()
    tokens = [word_to_num(word) for word in tokens]
    inputs = tokenizer(tokens, is_split_into_words=True, return_tensors="pt", padding="max_length", truncation=True).to(device)
    model.to(device)
    
    with torch.no_grad():
        outputs = trainer.model(**inputs)
    predictions = outputs.logits.cpu().numpy()
    preds = np.argmax(predictions, axis=2)
    word_ids = inputs.word_ids(batch_index=0)

    pred_labels = [id2label[pred] if word_idx is not None else 'O' for pred, word_idx in zip(preds[0], word_ids)]
    tokens = tokenizer.convert_ids_to_tokens(inputs['input_ids'][0])

    entity_list = []
    entity_types = {}
    current_entity = []
    current_label = None

    for token, label in zip(tokens, pred_labels):
        if label.startswith("B-"):
            if current_entity and current_label:
                if current_label not in entity_types:
                    entity_types[current_label] = []
                entity_types[current_label].append(" ".join(current_entity))
            current_entity = [token]
            current_label = label[2:]
        elif label.startswith("I-") and current_label == label[2:]:
            current_entity.append(token)
        else:
            if current_entity and current_label:
                if current_label not in entity_types:
                    entity_types[current_label] = []
                entity_types[current_label].append(" ".join(current_entity))
            current_entity = []
            current_label = None

    if current_entity and current_label:
        if current_label not in entity_types:
            entity_types[current_label] = []
        entity_types[current_label].append(" ".join(current_entity))

    for entity_type, entities in entity_types.items():
        if entity_type == "TARGET":
            best_entity = sorted(entities, key=lambda x: cosine_sim(get_mean_embedding_spacy(x), descriptive_vector), reverse=True)[0]
        elif entity_type == "TOOL":
            best_entity = sorted(entities, key=lambda x: cosine_sim(get_mean_embedding_spacy(x), weapon_vector), reverse=True)[0]
        else:
            best_entity = entities[0]
        entity_list.append((entity_type, best_entity))

    return tokens, pred_labels, entity_list

In [31]:
random_sentence = "Heading is one six seven, target is a large red and white cargo ship with black stripes, tool to deploy is radar surveillance system with advanced tracking capabilities and high-resolution imaging for detailed monitoring."
tokens, pred_labels, entity_list = predict_on_random_sentence(random_sentence, trainer.model)
print("Tokens:", tokens)
print("Predicted Labels:", pred_labels)
print("Entities:", entity_list)

Tokens: ['[CLS]', 'heading', 'is', '1', '6', '7', 'target', 'is', 'a', 'large', 'red', 'and', 'white', 'cargo', 'ship', 'with', 'black', 'stripes', 'tool', 'to', 'deploy', 'is', 'radar', 'surveillance', 'system', 'with', 'advanced', 'tracking', 'capabilities', 'and', 'high', '-', 'resolution', 'imaging', 'for', 'detailed', 'monitoring', '[SEP]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PA

TO IMPROVE