In [199]:
import numpy as np
import pandas as pd
import jsonlines

import tensorflow as tf
tf.config.run_functions_eagerly(True)

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Embedding, LSTM, Dense, Flatten, Concatenate, ZeroPadding1D, Layer
from tensorflow.keras.losses import binary_crossentropy, categorical_crossentropy
from tensorflow.keras.preprocessing.text import Tokenizer
from keras import ops
from tensorflow import keras
from tensorflow.keras.saving import register_keras_serializable
import tensorflow_probability as tfp

from sklearn.preprocessing import label_binarize

import nltk
nltk.download("stopwords")
import string
from nltk.corpus import stopwords
from nltk.translate.bleu_score import sentence_bleu

from collections import Counter
import matplotlib.pyplot as plt

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\sh2482\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


ImportError: DLL load failed while importing _path: The specified module could not be found.

In [7]:
domain_weight = 0.3
@keras.saving.register_keras_serializable()
def generator_loss(y_true, y_pred):
    """
    Calculates a custom loss function incorporating uncertainty lexicon.

    Args:
        y_true: The ground truth labels.
        y_pred: The predicted labels.

    Returns:
        The combined loss value.
    """
    text_loss = categorical_crossentropy(y_true, y_pred)

    # Extract uncertainty words embeddings from the embedding layer
    uncertainty_word_embeddings = tf.nn.embedding_lookup(generator.get_layer('embedding').weights[0], finance_token)

    # Compute similarity between predicted embeddings and uncertainty word embeddings
    predicted_word_embeddings = tf.nn.embedding_lookup(generator.get_layer('embedding').weights[0], tf.argmax(y_pred, axis=-1))
    similarity = tf.matmul(predicted_word_embeddings, uncertainty_word_embeddings, transpose_b=True)

    # Calculate uncertainty loss based on similarity
    uncertainty_loss = tf.reduce_mean(tf.reduce_max(similarity, axis=-1))

    # Adjust the weights of the text loss and uncertainty loss
    combined_loss = text_loss + uncertainty_loss * domain_weight

    return combined_loss


In [8]:
@keras.saving.register_keras_serializable()
def discriminator_loss(real_output, fake_output):
    """
    Calculates loss function to distinguish between real and fake text.

    Args:
        real_output: The real text.
        fake_output: The generated tex.

    Returns:
        The loss value.
    """
    real_loss = binary_crossentropy(tf.ones_like(real_output), real_output)
    fake_loss = binary_crossentropy(tf.zeros_like(fake_output), fake_output)
    label_loss = binary_crossentropy(real_output, fake_output)
    return real_loss + fake_loss + label_loss

In [25]:
@keras.saving.register_keras_serializable()
class Generator(keras.Model):
    def __init__(self, vocab_size, embedding_dim, latent_dim, label_dim, **kwargs):
        super(Generator, self).__init__(**kwargs)
        self.embedding = Embedding(vocab_size, embedding_dim, name="embedding")
        self.lstm = LSTM(256, return_sequences=True)
        self.dense = Dense(vocab_size, activation="softmax")
        self.label_embedding = Dense(embedding_dim)
    
    def call(self, inputs, training=None):
        noise, label = inputs
        noise = tf.reshape(noise, (-1, latent_dim)) 
        label_embedding = self.label_embedding(label)
        embedded_noise = self.embedding(noise)
        label_embedding_repeated = tf.tile(tf.expand_dims(label_embedding, 1), [1, tf.shape(embedded_noise)[1], 1])
        combined_input = keras.layers.concatenate([embedded_noise, label_embedding_repeated])
        output = self.lstm(combined_input)
        output = self.dense(output)
        return output

In [10]:
@keras.saving.register_keras_serializable()
class Discriminator(keras.Model):
  def __init__(self, vocab_size, embedding_dim, label_dim, **kwargs):
    super(Discriminator, self).__init__(**kwargs)
    self.embedding = Embedding(vocab_size, embedding_dim)
    self.lstm = LSTM(256)
    self.label_embedding = Dense(embedding_dim)
    self.dense = Dense(1, activation="sigmoid")

  def call(self, inputs, training=None):
    text, label = inputs
    label_embedding = self.label_embedding(label)
    label_embedding_reshaped = tf.expand_dims(label_embedding, axis=1) 
    label_embedding_reshaped = tf.tile(label_embedding_reshaped, [1, max_len, 1]) 
    embedded_text = self.embedding(text)
    combined_input = keras.layers.concatenate([embedded_text, label_embedding_reshaped])
    output = self.lstm(combined_input)
    output = self.dense(output)
    return output


### Process Data

In [12]:
def preprocess(text):
    """
    Preprocesses text string by removing capitalization, punctuation, and stop words.

    Args:
        text: The text string to be preprocessed.

    Returns:
        The preprocessed text string.
    """

    text = text.lower()
    text = "".join(c for c in text if c not in string.punctuation)
    stop_words = stopwords.words("english")
    text = " ".join([word for word in text.split() if word not in stop_words])
    text = start_token + " " + text + " " + stop_token
    return text

In [124]:
def tokenize(text_main, text_finance, max_len):
    """
    Tokenizes preprocessed text string.

    Args:
        text: The preprocessed text string.
        max_len: The maximum length for tokenized sequences

    Returns:
        A list of integers representing the tokenized text sequence.
    """
    all_text = text_main + text_finance
    
    tokenizer = Tokenizer(num_words=vocab_size)
    tokenizer.fit_on_texts(all_text)
    vocab = tokenizer.word_index
    
    main_sequences = [tokenizer.texts_to_sequences([line])[0] for line in text_main] 
    for i in range(len(main_sequences)):
        if len(main_sequences[i]) > max_len:
            main_sequences[i] = main_sequences[i][:max_len]
        else:
            main_sequences[i] = tf.keras.utils.pad_sequences([main_sequences[i]], max_len, padding="post")[0]

    finance_tokens = [tokenizer.texts_to_sequences([word])[0] for word in text_finance]
    
    return main_sequences, finance_tokens, vocab, tokenizer

In [125]:
file = "data/aspect-level-certainty.jsonl"

annotate_science = []

with jsonlines.open(file) as f:
    for line in f.iter():
        annotate_science.append(line)

finance = pd.read_csv("data/10K_sentiment_list.csv")
uncertain_finance = finance[finance["Uncertainty"] != 0]["Word"].values

# Restructure annotated data from scientific set
label_codes = {"NotPresent": 0, "Certain": 1, "Uncertain": 2}
dims = {"Number": None, "Extent": None, "Probability": None, "Condition": None, "Suggestion": None, "Framing": None}
real_text = [x["finding"] for x in annotate_science]
real_labels = [[label_codes[x["aspect-level-certainty"][dim]] for dim in dims] for x in annotate_science]

latent_dim = 100
vocab_size = 10000
embedding_dim = 128
num_label_dims = 6
label_dim = 3
label_values = [0,1,2]
max_len = 64
start_token = "<START>"
start_index = 2
stop_token = "<STOP>"
stop_index = 1

real_text_process = [preprocess(x) for x in real_text]
finance_text_process = [preprocess(x) for x in uncertain_finance]

real_text_token, finance_token, vocab, tokenizer = tokenize(real_text_process, finance_text_process, max_len)
vocab_search = {value: key for key, value in vocab.items()}

In [64]:
def map_to_latent_space(token_embedding, latent_dim):
    """
    Map a token embedding to the latent space using a neural network.
    
    Args:
    - token_embedding (tf.Tensor): The token embedding from the embedding space.
    - latent_dim (int): The dimension of the latent space.
    
    Returns:
    - noise_vector (tf.Tensor): The noise vector in the latent space.
    """
    # Define a neural network to map token embedding to latent space
    mapping_network = tf.keras.Sequential([
        tf.keras.layers.Dense(512, activation='relu', input_shape=(token_embedding.shape[-1],)),
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.Dense(latent_dim)
    ])
    
    # Map token embedding to latent space
    token_embedding = tf.squeeze(token_embedding, axis=1)
    noise_vector = mapping_network(token_embedding)
    return noise_vector

In [139]:
def generate(generator, label, max_length, temperature=0.5):
    """
    Generates a sequence using the provided generator model and parameters.
    Args:
        generator: A trained Generator model instance.
        noise: A tensor representing random noise (shape: (batch_size, latent_dim)).
        label: A tensor representing the label conditioning the generation.
        start_token: The integer value representing the token to start the sequence generation.
        max_length: The maximum length of the sequence to generate.
        temperature (optional): A scalar value controlling the randomness of the generated sequence (defaults to 1.0).
    Returns:
        A list containing the generated sequence (excluding the start token).
    """
    generated_sequence = []
    # Initialize generated sequence with start token
    start_embedding = generator.embedding(tf.expand_dims([start_index], 0))
    start_noise = map_to_latent_space(start_embedding, latent_dim)
    
      # Get initial predictions (logits)
    predictions = generator.predict([start_noise, label])[:, -1:, :]  # Take only last token logits
      # Loop for generating sequence
    for _ in range(max_length):
        # Sample a new token based on categorical distribution
        probs = tfp.distributions.Categorical(logits=predictions[0] / temperature).sample().numpy()[0]  
        generated_token = int(probs)

        while generated_token >= len(vocab_search):
            probs = tfp.distributions.Categorical(logits=predictions[0] / temperature).sample().numpy()[0]  
            generated_token = int(probs)
        
        # Stop if end token is generated or max length is reached
        if generated_token == stop_index or len(generated_sequence) >= max_length + 1:
            break
            

        generated_sequence.append(generated_token)
    
        next_token_embedding = generator.embedding(tf.expand_dims([generated_token], 0))
    
        next_token_noise = map_to_latent_space(next_token_embedding, latent_dim)
    
        # Get predictions for the next step based on the embedding and label
        predictions = generator.predict([next_token_noise, label])[:, -1:, :]
    
      # Return generated sequence excluding start token
    return generated_sequence[1:]

### Metric

In [176]:
def calculate_probability_distribution(tokens):
    flat_tokens = [item for sublist in tokens for item in sublist]
    # Count the occurrences of each token
    token_counts = Counter(flat_tokens)
    total_tokens = sum(token_counts.values())
    # Convert counts to probabilities
    probabilities = {token: count / total_tokens for token, count in token_counts.items()}
    return probabilities

In [165]:
def kl_divergence(p, q, smoothing = 1e-9):
    """
    Calculate the Kullback-Leibler Divergence between two probability distributions given as dictionaries of tokens
    and their probabilities.

    Args:
    - p (dict): Dictionary representing the probability distribution p.
    - q (dict): Dictionary representing the probability distribution q.

    Returns:
    - kl_div (float): Kullback-Leibler Divergence.
    """
    kl_div = 0
    all_tokens = set(p.keys()) | set(q.keys())

    for token in all_tokens:
        p_prob = p.get(token, 0) + smoothing
        q_prob = q.get(token, 0) + smoothing
        kl_div += p_prob * np.log(p_prob / q_prob)

    return kl_div

### Test Number Uncertainty Model

In [168]:
number_generator = Generator(vocab_size, embedding_dim, latent_dim, label_dim)
number_discriminator = Discriminator(vocab_size, embedding_dim, label_dim)
number_generator.load_weights("model_weights/generator_Number.weights.h5")

In [None]:
test = 30
label_uncertain = label_binarize([label_codes["Uncertain"]], classes=[0,1,2])
label_certain = label_binarize([label_codes["Certain"]], classes=[0,1,2])


number_results_uncertain = [generate(number_generator, label_uncertain, max_len, temperature = 0.5) for i in range(test)]
number_results_certain = [generate(number_generator, label_certain, max_len, temperature = 0.5) for i in range(test)]

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 304ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 267ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 241ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 249ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 230ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 238ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 253ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 254ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 259ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 252ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 239ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 267ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 254ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m 

In [179]:
original = calculate_probability_distribution(real_text_token)
generated_number_certain = calculate_probability_distribution(number_results_certain)
generated_number_uncertain = calculate_probability_distribution(number_results_uncertain)

In [180]:
kl_div = kl_divergence(original, generated_number_certain)
print("KL Divergence - Original and Number (Certain):", kl_div)

KL Divergence - Original and Number (Certain): 17.687713121949034


In [181]:
kl_div = kl_divergence(original, generated_number_uncertain)
print("KL Divergence - Original and Number (Uncertain):", kl_div)

KL Divergence - Original and Number (Uncertain): 7.52185765111166


In [182]:
kl_div = kl_divergence(generated_number_certain, generated_number_uncertain)
print("KL Divergence - Number Certain and Uncertain:", kl_div)

KL Divergence - Number Certain and Uncertain: 10.769277623302795


### Test Extent Uncertainty Model

In [202]:
extent_generator = Generator(vocab_size, embedding_dim, latent_dim, label_dim)
extent_discriminator = Discriminator(vocab_size, embedding_dim, label_dim)
extent_generator.load_weights("model_weights/generator_Extent.weights.h5")

In [None]:
test = 30

extent_results_uncertain = [generate(number_generator, label_uncertain, max_len, temperature = 0.5) for i in range(test)]
extent_results_certain = [generate(number_generator, label_certain, max_len, temperature = 0.5) for i in range(test)]

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 331ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 321ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 258ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 268ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 253ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 252ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 257ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 269ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 261ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 260ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 269ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 322ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 272ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m 

In [200]:
generated_extent_certain = calculate_probability_distribution(extent_results_certain)
generated_extent_uncertain = calculate_probability_distribution(extent_results_uncertain)

In [None]:
kl_div = kl_divergence(original, generated_extent_certain)
print("KL Divergence - Original and Extent (Certain):", kl_div)

In [None]:
kl_div = kl_divergence(original, generated_extent_certain)
print("KL Divergence - Original and Extent (Certain):", kl_div)