# In [None]:  (notebook cell marker left over from the export)
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Embedding, LSTM, Bidirectional, Dense, TimeDistributed
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import LabelEncoder
from tensorflow_addons.text.crf import crf_log_likelihood, viterbi_decode
# Load and preprocess data
def load_text(file_path):
    """Return the entire contents of the text file at *file_path*.

    The file is decoded explicitly as UTF-8. Without an ``encoding``
    argument ``open`` falls back to the platform's locale encoding, so the
    same transcript could decode differently (or fail to decode) depending
    on the machine the script runs on.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()
def preprocess_data(text, entities):
    """Split *text* on whitespace and tag every resulting token.

    Args:
        text: Raw text to tokenize.
        entities: Mapping from an exact token string to its tag.

    Returns:
        A ``(tokens, labels)`` pair of equal-length lists. A token that is
        not a key of *entities* is tagged ``'O'``.
    """
    # Plain whitespace split: punctuation stays attached to its word, so only
    # tokens that match a key of `entities` exactly receive an entity tag.
    words = text.split()
    tags = [entities.get(word, 'O') for word in words]
    return words, tags
# Define CRF Layer
class CRFLayer(tf.keras.layers.Layer):
    """Identity layer that owns the CRF transition matrix.

    The forward pass returns its input unchanged; the layer exists so that
    the ``(num_tags, num_tags)`` transition weights are tracked (and trained)
    as part of the surrounding Keras model. The CRF loss itself is computed
    separately through :meth:`get_loss`.

    Args:
        num_tags: Number of distinct tags; sets the size of the transition
            matrix.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, num_tags, **kwargs):
        super().__init__(**kwargs)
        self.num_tags = num_tags
        # The layer is an identity on its input, so an incoming mask can be
        # carried through unchanged instead of being rejected or dropped.
        self.supports_masking = True

    def build(self, input_shape):
        """Create the trainable tag-to-tag transition matrix."""
        self.transitions = self.add_weight(
            name="transitions",
            shape=(self.num_tags, self.num_tags),
            initializer="glorot_uniform",
            trainable=True,
        )
        super().build(input_shape)

    def call(self, logits):
        """Return *logits* unchanged."""
        return logits

    def get_loss(self, logits, labels, sequence_lengths):
        """Return the negative mean CRF log-likelihood of *labels*.

        Args:
            logits: Unary scores passed to ``crf_log_likelihood``
                (presumably ``(batch, time, num_tags)`` -- confirm with caller).
            labels: Gold tag indices for the same sequences.
            sequence_lengths: Per-sequence true lengths; cast to ``int32``
                before use.

        Returns:
            A scalar tensor: minus the mean of the per-sequence
            log-likelihoods.
        """
        sequence_lengths = tf.cast(sequence_lengths, tf.int32)
        log_likelihood, _ = crf_log_likelihood(
            logits,
            labels,
            sequence_lengths,
            self.transitions
        )
        return -tf.reduce_mean(log_likelihood)

    def get_config(self):
        """Return the layer config, including ``num_tags``.

        Needed so the layer can be re-created from its config (model
        saving/cloning); the constructor has a required ``num_tags``
        argument that the base config does not record.
        """
        config = super().get_config()
        config.update({"num_tags": self.num_tags})
        return config
# Decode CRF predictions
def crf_decode(logits, sequence_lengths, transitions):
    """Decode each sequence of logits with the Viterbi algorithm.

    Args:
        logits: Iterable of per-sequence score matrices; each one is sliced
            to its true length before decoding.
        sequence_lengths: Iterable of true lengths, parallel to *logits*.
        transitions: Tag-to-tag transition matrix. Anything ``np.asarray``
            accepts is allowed (for example a NumPy array or a
            ``tf.Variable``).

    Returns:
        A list with one decoded tag-index path per input sequence. A sequence
        whose length is zero or negative yields an empty path.
    """
    # viterbi_decode operates on NumPy data; convert the transition matrix
    # once here instead of relying on implicit conversion at every timestep.
    transition_matrix = np.asarray(transitions)
    decoded_sequences = []
    for logit, seq_len in zip(logits, sequence_lengths):
        seq_len = int(seq_len)
        if seq_len <= 0:
            # Nothing to decode; viterbi_decode would index into an empty
            # score matrix and raise.
            decoded_sequences.append([])
            continue
        viterbi_path, _ = viterbi_decode(logit[:seq_len], transition_matrix)
        decoded_sequences.append(viterbi_path)
    return decoded_sequences
# Load and process data
def load_and_process_data(file_path, entities_example, max_len=50):
    """Load a text file and turn it into one padded training example.

    The whole file is treated as a single sequence: it is tokenized and
    tagged by ``preprocess_data``, tokens and labels are integer-encoded, and
    both id sequences are padded/truncated to *max_len*.

    Args:
        file_path: Path of the text file to read.
        entities_example: Mapping from token to entity tag.
        max_len: Fixed length of the returned id sequences.

    Returns:
        ``(X, Y, tokens, label_encoder)`` where ``X`` and ``Y`` are
        ``(1, max_len)`` integer arrays of token ids and label ids, ``tokens``
        is the full (untruncated) token list, and ``label_encoder`` is the
        fitted label encoder.

    NOTE: both encoders are fitted from scratch on every call, so ids
    produced by two different calls (e.g. train vs. test files) do not refer
    to the same vocabulary.
    """
    text = load_text(file_path)
    tokens, labels = preprocess_data(text, entities_example)

    # Encode tokens. LabelEncoder ids start at 0, but 0 is also the padding
    # value and is what the model (mask_zero=True) and the sequence-length
    # computation treat as "no token". Shift real tokens to 1..V so that id 0
    # is reserved for padding only.
    word_encoder = LabelEncoder()
    encoded_tokens = word_encoder.fit_transform(tokens) + 1

    # Label ids keep their 0-based encoding; padded label positions lie
    # beyond the sequence length derived from X.
    label_encoder = LabelEncoder()
    encoded_labels = label_encoder.fit_transform(labels)

    # Pad at the end and, when the text is longer than max_len, also cut at
    # the end (the default cuts the beginning), so position i of X/Y always
    # corresponds to tokens[i].
    X = tf.keras.preprocessing.sequence.pad_sequences(
        [encoded_tokens], maxlen=max_len, padding='post', truncating='post'
    )
    Y = tf.keras.preprocessing.sequence.pad_sequences(
        [encoded_labels], maxlen=max_len, padding='post', truncating='post'
    )
    return X, Y, tokens, label_encoder
# Define entities dictionary for NER classification
# Exact-match lookup from a token to its BIO entity tag; any token that is
# not listed here is tagged 'O' during preprocessing.
entities_example = {
    # events
    'discovery': 'B-EVENT',
    # locations
    'Italian': 'B-LOCATION',
    # organisations
    'Wired': 'B-ORG',
    # persons (first token B-, continuation token I-)
    'Aimee': 'B-PERSON',
    'Mullins': 'I-PERSON',
}
# Complete tag inventory: 'O' followed by the B-/I- pair of each entity type.
label_list = ['O'] + [
    f'{prefix}-{entity_type}'
    for entity_type in ('PERSON', 'ORG', 'LOCATION', 'EVENT')
    for prefix in ('B', 'I')
]
# Load and process data
# Every file becomes a single sequence padded/truncated to max_len ids.
max_len = 1028
train_split = load_and_process_data(
    "transcription_train.txt", entities_example, max_len
)
X_train, Y_train, train_tokens, label_encoder = train_split
# NOTE(review): the loader fits fresh encoders on every call, so the test
# ids below come from a different vocabulary than the training ids; the test
# split's label encoder is discarded.
test_split = load_and_process_data(
    "transcription_test_AimeeMullins_1249s.txt", entities_example, max_len
)
X_test, Y_test, test_tokens, _ = test_split
# Define model
# Size the embedding table from the ids that actually occur instead of a
# fixed 10000: an id >= input_dim is an out-of-range embedding lookup.
vocab_size = int(max(X_train.max(), X_test.max())) + 1
# Use exactly the tag ids the label encoder knows. With more model tags than
# encoder classes, a decoded id could be one that
# label_encoder.inverse_transform has never seen, which raises ValueError.
num_tags = len(label_encoder.classes_)
input_layer = Input(shape=(max_len,))
# mask_zero=True: id 0 is treated as padding by the downstream layers.
embedding_layer = Embedding(input_dim=vocab_size, output_dim=1028, mask_zero=True)(input_layer)
bi_lstm_layer = Bidirectional(LSTM(1028, return_sequences=True))(embedding_layer)
# Per-timestep unary scores, one per tag.
dense_layer = TimeDistributed(Dense(num_tags))(bi_lstm_layer)
# CRF Layer (pass-through; holds the transition matrix used by the CRF loss)
crf_layer = CRFLayer(num_tags=num_tags)
logits = crf_layer(dense_layer)
# Build model
model = Model(inputs=input_layer, outputs=logits)
# Compile with dummy loss (manual loss handling below); compile is only used
# here to attach the optimizer that the custom training step applies.
model.compile(optimizer=Adam(learning_rate=0.001), loss="sparse_categorical_crossentropy")
# Compute sequence lengths
# Number of non-padding (non-zero) ids per training sequence. Kept as a
# module-level value; train_step derives the lengths of its own batch.
sequence_lengths = tf.reduce_sum(tf.cast(tf.not_equal(X_train, 0), tf.int32), axis=-1)
# Training step with custom CRF loss
@tf.function
def train_step(x, y):
    """Run one optimisation step on the batch ``(x, y)`` and return its loss.

    Args:
        x: Batch of token-id sequences; id 0 marks padding.
        y: Batch of label-id sequences aligned with *x*.

    Returns:
        The scalar CRF loss (negative mean log-likelihood) for this batch.
    """
    # Lengths must describe the batch being trained on, not a fixed global
    # computed from X_train, otherwise any other batch is scored with the
    # wrong sequence lengths.
    batch_lengths = tf.reduce_sum(tf.cast(tf.not_equal(x, 0), tf.int32), axis=-1)
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss = crf_layer.get_loss(logits, y, batch_lengths)
    gradients = tape.gradient(loss, model.trainable_variables)
    model.optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss
# Training loop
# Each epoch is a single full-batch step over the training data.
epochs = 10
for epoch in range(epochs):
    print(f"Epoch {epoch + 1}/{epochs}")
    loss = train_step(X_train, Y_train)
    print(f"Loss: {loss.numpy()}")
# Decoding predictions
# True length of each test sequence = number of non-padding (non-zero) ids.
sequence_lengths_test = tf.reduce_sum(tf.cast(tf.not_equal(X_test, 0), tf.int32), axis=-1).numpy()
predictions = model.predict(X_test)
# The Viterbi decoder works on NumPy data, so hand it the transition matrix
# as a NumPy array rather than as a tf.Variable.
decoded_predictions = crf_decode(
    predictions, sequence_lengths_test, crf_layer.transitions.numpy()
)
# Map predictions back to labels
decoded_labels = [label_encoder.inverse_transform(pred) for pred in decoded_predictions]
# Display tokens and predicted NER labels
# zip stops at the shorter of the two, so tokens beyond the decoded length
# are simply not printed.
for token, label in zip(test_tokens, decoded_labels[0]):
    print(f"Token: {token}, Predicted Label: {label}")
 
# (notebook UI residue from the export: "has context menu")