In [None]:
!pip install transformers datasets scikit-learn nltk

In [None]:
# hyperdimensional embeddings

import numpy as np
import logging
from datasets import load_dataset

logging.basicConfig(level=logging.INFO, force=True)
logger = logging.getLogger(__name__)

EMBEDDING_DIM = 1024
SEED = 42
DATASET_NAME = "yuan-yang/MALLS-v0"
SPLIT = "train"

PLACEHOLDER_TO_SYMBOL = {
    "[forall]": "∀",
    "[exists]": "∃",
    "[not]": "¬",
    "[implies]": "→",
    "[and]": "∧",
    "[or]": "∨",
    "[oplus]": "⊕",
    "[iff]": "↔",
}

SYMBOL_TO_PLACEHOLDER = {v: k for k, v in PLACEHOLDER_TO_SYMBOL.items()}

def generate_random_vector(dim, seed):
    """
    Generate a random bipolar vector of dimension 'dim' with elements -1 or 1.
    """
    rng = np.random.default_rng(seed)
    return rng.choice([-1, 1], size=dim)

def main():
    try:
        dataset = load_dataset(DATASET_NAME, split=SPLIT)
        logger.info(f"Loaded dataset '{DATASET_NAME}' with split '{SPLIT}' containing {len(dataset)} examples.")
    except Exception as e:
        logger.error(f"Error loading dataset '{DATASET_NAME}': {e}")
        return

    # Unique symbols
    predicates = set()
    variables = set()
    constants = set()
    operators = set()

    # Parse the FOL to extract unique predicates, variables, constants, and operators
    for idx, example in enumerate(dataset):
        fol = example.get('FOL', '')
        if not fol:
            logger.warning(f"Empty FOL expression at index {idx}. Skipping.")
            continue

        # Replace placeholders with actual symbols
        for ph, sym in PLACEHOLDER_TO_SYMBOL.items():
            fol = fol.replace(ph, sym)

        # Simple parsing
        tokens = fol.replace('(', ' ').replace(')', ' ').replace(',', ' ').split()
        for token in tokens:
            if token in SYMBOL_TO_PLACEHOLDER.values() or token in SYMBOL_TO_PLACEHOLDER:
                operators.add(token)
            elif token.islower():
                variables.add(token)
            elif token[0].isupper() and token[1:].islower():
                predicates.add(token)
            else:
                constants.add(token)

    logger.info(f"Extracted {len(predicates)} predicates, {len(variables)} variables, {len(constants)} constants, and {len(operators)} operators.")

    #---SYMBOL EMBEDDINGS---#

    symbol_embeddings = {}

    # Generate embeddings for operators and quantifiers
    operators_list = list(operators)
    quantifiers = ["∀", "∃"]
    operators_extended = operators_list + quantifiers

    for sym in operators_extended:
        seed = SEED + hash(sym) % 1000
        symbol_embeddings[sym] = generate_random_vector(EMBEDDING_DIM, seed=seed)

    # Generate embeddings for predicates
    for pred in predicates:
        seed = SEED + hash(pred) % 1000
        symbol_embeddings[pred] = generate_random_vector(EMBEDDING_DIM, seed=seed)

    # Generate embeddings for variables
    for var in variables:
        seed = SEED + hash(var) % 1000
        symbol_embeddings[var] = generate_random_vector(EMBEDDING_DIM, seed=seed)

    # Generate embeddings for constants
    for const in constants:
        seed = SEED + hash(const) % 1000
        symbol_embeddings[const] = generate_random_vector(EMBEDDING_DIM, seed=seed)

    # Save symbol embeddings
    os.makedirs("hyperdimensional_embeddings", exist_ok=True)
    np.save("hyperdimensional_embeddings/symbol_embeddings.npy", symbol_embeddings)
    logger.info("Saved 'symbol_embeddings.npy' successfully.")

    #---TYPE EMBEDDINGS---#

    type_embeddings = {}

    # Define types including 'predicate'
    types = ['quantifier', 'variable', 'constant', 'operator', 'predicate']

    for typ in types:
        seed = SEED + hash(typ) % 1000
        type_embeddings[typ] = generate_random_vector(EMBEDDING_DIM, seed=seed)

    np.save("hyperdimensional_embeddings/type_embeddings.npy", type_embeddings)
    logger.info("Saved 'type_embeddings.npy' successfully.")

    with open("hyperdimensional_embeddings/variables.txt", "w") as f:
        for var in variables:
            f.write(f"{var}\n")
    logger.info("Saved 'variables.txt' successfully.")

if __name__ == "__main__":
    main()


In [None]:
# constrained decoding

import torch
from transformers import T5Tokenizer, T5ForConditionalGeneration
from transformers.generation.logits_process import LogitsProcessor, LogitsProcessorList
import numpy as np
import os
import logging

logging.basicConfig(level=logging.INFO, force=True)
logger = logging.getLogger(__name__)

MODEL_REPO = "snewmanaa/flan-t5-NL-FOL-baseline"
MAX_LENGTH = 128
BEAM_SIZE = 5  # Ensure BEAM_SIZE is set to an integer

SYMBOL_TO_PLACEHOLDER = {
    "∀": "[forall]",
    "∃": "[exists]",
    "¬": "[not]",
    "→": "[implies]",
    "∧": "[and]",
    "∨": "[or]",
    "⊕": "[oplus]",
    "↔": "[iff]",
}

PLACEHOLDER_TO_SYMBOL = {v: k for k, v in SYMBOL_TO_PLACEHOLDER.items()}

def load_variables(file_path="hyperdimensional_embeddings/variables.txt"):
    """
    Load variables list from the text file saved in step 1
    """
    try:
        with open(file_path, "r") as f:
            variables = [line.strip() for line in f.readlines()]
        logger.info(f"Loaded {len(variables)} variables from '{file_path}'.")
        return variables
    except Exception as e:
        logger.error(f"Error loading variables from '{file_path}': {e}")
        return []

def postprocess(decoded_texts):
    """
    Replace placeholders with original symbols
    """
    restored_texts = []
    for text in decoded_texts:
        for ph, sym in PLACEHOLDER_TO_SYMBOL.items():
            text = text.replace(ph, sym)
        restored_texts.append(text)
    return restored_texts

class QuantifierFollowProcessor(LogitsProcessor):
    """
    Custom LogitsProcessor to enforce that quantifiers are followed by variables
    """
    def __init__(self, quantifier_ids, variable_ids, device):
        super().__init__()
        self.quantifier_ids = quantifier_ids
        self.variable_ids = variable_ids
        self.device = device

    def __call__(self, input_ids, scores):
        # Last generated
        last_token_ids = input_ids[:, -1]

        # Create a mask where the last token is a quantifier
        is_quantifier = torch.zeros_like(last_token_ids, dtype=torch.bool)
        for q_id in self.quantifier_ids:
            is_quantifier = is_quantifier | (last_token_ids == q_id)

        # For each sequence where the last token is a quantifier,
        # set all logits to -inf except for variable_ids to prevent
        # ouputing anything other than a variable
        for i, flag in enumerate(is_quantifier):
            if flag:
                scores[i] = torch.full_like(scores[i], -float("inf"))
                # Allow only variable_ids
                scores[i, self.variable_ids] = 0.0

        return scores

def main():
    try:
        tokenizer = T5Tokenizer.from_pretrained(MODEL_REPO)
        model = T5ForConditionalGeneration.from_pretrained(MODEL_REPO)
        logger.info(f"Loaded model '{MODEL_REPO}' successfully.")
    except Exception as e:
        logger.error(f"Error loading model '{MODEL_REPO}': {e}")
        return

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model.to(device)
    logger.info(f"Model moved to {device}.")

    quantifiers = ["[forall]", "[exists]"]

    variables = load_variables()
    if not variables:
        logger.error("No variables loaded. Make sure you've run hyperdimensional_embeddings")
        return

    # Convert tokens to IDs
    quantifier_ids = tokenizer.convert_tokens_to_ids(quantifiers)
    variable_ids = tokenizer.convert_tokens_to_ids(variables)

    logger.debug(f"Quantifier IDs: {quantifier_ids}")
    logger.debug(f"Variable IDs: {variable_ids}")

    # Initialize custom logits processor
    custom_processor = QuantifierFollowProcessor(quantifier_ids, variable_ids, device)
    custom_processors = LogitsProcessorList([custom_processor])

    #---EXAMPLE INPUT---#
    nl_sentence = "Every student loves some course."
    input_text = "Translate English to FOL: " + nl_sentence
    input_ids = tokenizer.encode(input_text, return_tensors="pt").to(device)
    logger.info(f"Input Text: {input_text}")
    logger.debug(f"Input IDs: {input_ids}")

    logger.info("Generating FOL with constrained decoding...")
    try:
        with torch.no_grad():
            output_ids = model.generate(
                input_ids=input_ids,
                max_length=MAX_LENGTH,
                num_beams=BEAM_SIZE,
                logits_processor=custom_processors,
                early_stopping=True,
                return_dict_in_generate=False,
                output_scores=False,
                output_hidden_states=False
            )
    except Exception as e:
        logger.error(f"Error during generation: {e}")
        return

    # Decode and postprocess
    try:
        decoded_output = tokenizer.decode(output_ids[0], skip_special_tokens=True)
        final_output = postprocess([decoded_output])[0]
        logger.info(f"Generated FOL: {final_output}")
        print("Generated FOL:", final_output)
    except Exception as e:
        logger.error(f"Error during decoding: {e}")

if __name__ == "__main__":
    main()


In [30]:
# ranking with hyperdimensional embeddings

import torch
from transformers import T5Tokenizer, T5ForConditionalGeneration
from transformers.generation.logits_process import LogitsProcessor, LogitsProcessorList
import numpy as np
import os
import logging
import random

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

MODEL_REPO = "snewmanaa/flan-t5-NL-FOL-baseline"
MAX_LENGTH = 128
EMBEDDING_DIM = 1024
BEAM_SIZE = 5

SYMBOL_TO_PLACEHOLDER = {
    "∀": "[forall]",
    "∃": "[exists]",
    "¬": "[not]",
    "→": "[implies]",
    "∧": "[and]",
    "∨": "[or]",
    "⊕": "[oplus]",
    "↔": "[iff]",
}

PLACEHOLDER_TO_SYMBOL = {v: k for k, v in SYMBOL_TO_PLACEHOLDER.items()}

def load_embeddings(symbol_embeddings_path="hyperdimensional_embeddings/symbol_embeddings.npy",
                   type_embeddings_path="hyperdimensional_embeddings/type_embeddings.npy"):
    """
    Load hyperdimensional embeddings from .npy files from step 1
    """
    try:
        symbol_embeddings = np.load(symbol_embeddings_path, allow_pickle=True).item()
        type_embeddings = np.load(type_embeddings_path, allow_pickle=True).item()
        logger.info("Loaded hyperdimensional embeddings successfully.")
        return symbol_embeddings, type_embeddings
    except Exception as e:
        logger.error(f"Error loading embeddings: {e}")
        return None, None

def load_variables(file_path="hyperdimensional_embeddings/variables.txt"):
    """
    Load variables list from the text file saved in step 1
    """
    try:
        with open(file_path, "r") as f:
            variables = [line.strip() for line in f.readlines()]
        logger.info(f"Loaded {len(variables)} variables from '{file_path}'.")
        return variables
    except Exception as e:
        logger.error(f"Error loading variables from '{file_path}': {e}")
        return []

def postprocess(decoded_texts):
    """
    Replace placeholders with original symbols
    """
    restored_texts = []
    for text in decoded_texts:
        for ph, sym in PLACEHOLDER_TO_SYMBOL.items():
            text = text.replace(ph, sym)
        restored_texts.append(text)
    return restored_texts

def encode_fol(fol, symbol_embeddings, type_embeddings):
    """
    Encode a FOL string into a hyperdimensional vector
    """
    tokens = fol.replace('(', ' ').replace(')', ' ').replace(',', ' ').split()
    encoded = np.zeros(EMBEDDING_DIM)
    for token in tokens:
        if token in symbol_embeddings:
            symbol_vec = symbol_embeddings[token]
            if token in ['[forall]', '[exists]']:
                type_vec = type_embeddings['quantifier']
            elif token in ['[not]', '[implies]', '[and]', '[or]']:
                type_vec = type_embeddings['operator']
            elif token.islower():
                type_vec = type_embeddings['variable']
            elif token.isupper():
                type_vec = type_embeddings['predicate']
            else:
                type_vec = type_embeddings['constant']
            # Combine symbol and type vectors
            combined = symbol_vec * type_vec
            encoded += combined
    return encoded

def cosine_similarity(vec1, vec2):
    norm1 = np.linalg.norm(vec1)
    norm2 = np.linalg.norm(vec2)
    if norm1 == 0 or norm2 == 0:
        return -1  # Minimal similarity
    return np.dot(vec1, vec2) / (norm1 * norm2)

def main():
    try:
        tokenizer = T5Tokenizer.from_pretrained(MODEL_REPO)
        model = T5ForConditionalGeneration.from_pretrained(MODEL_REPO)
        logger.info(f"Loaded model '{MODEL_REPO}' successfully.")
    except Exception as e:
        logger.error(f"Error loading model '{MODEL_REPO}': {e}")
        return

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model.to(device)
    logger.info(f"Model moved to {device}.")

    # Load hyperdimensional embeddings
    symbol_embeddings, type_embeddings = load_embeddings()
    if symbol_embeddings is None or type_embeddings is None:
        logger.error("Failed to load embeddings. Exiting.")
        return

    # Load variables
    variables = load_variables()
    if not variables:
        logger.error("No variables loaded. Exiting.")
        return

    variable_ids = tokenizer.convert_tokens_to_ids(variables)
    logger.debug(f"Variable IDs: {variable_ids}")

    quantifiers = ["[forall]", "[exists]"]
    quantifier_ids = tokenizer.convert_tokens_to_ids(quantifiers)
    logger.debug(f"Quantifier IDs: {quantifier_ids}")

    #---EXAMPLES---#
    test_cases = [
        "Every student loves some course.",
        "No teacher dislikes any subject.",
        "Some professor teaches all classes.",
        "All humans are mortal.",
        "There exists a student who aces every test."
    ]

    for nl_sentence in test_cases:
        logger.info(f"\nProcessing NL Input: {nl_sentence}")
        input_text = "Translate English to FOL: " + nl_sentence
        input_ids = tokenizer.encode(input_text, return_tensors="pt").to(device)
        logger.info(f"Input Text: {input_text}")

        logger.info("Generating beam outputs...")
        try:
            with torch.no_grad():
                outputs = model.generate(
                    input_ids=input_ids,
                    max_length=MAX_LENGTH,
                    num_beams=BEAM_SIZE,
                    early_stopping=False,  # For beam search
                    return_dict_in_generate=True,
                    output_scores=False,
                    output_hidden_states=False
                )
            beam_outputs = outputs.sequences
            decoded_outputs = tokenizer.batch_decode(beam_outputs, skip_special_tokens=True)
            decoded_outputs = postprocess(decoded_outputs)
            logger.info(f"Generated {len(decoded_outputs)} beam hypotheses.")
        except Exception as e:
            logger.error(f"Error during generation: {e}")
            continue

        logger.info("Encoding generated FOL statements into hyperdimensional vectors...")
        encoded_generated = [encode_fol(fol, symbol_embeddings, type_embeddings) 
                             for fol in decoded_outputs]

        templates = [
                "∀x (Predicate(x) → Predicate(x))",
                "∃x (Predicate(x) ∧ Predicate(x))",
                "∀x (Predicate(x) ↔ Predicate(x))",
                "∃x (Predicate(x) → Predicate(x))",
                "∀x ∃y (Predicate(x, y))",
                "∀x (Predicate1(x) → ∃y Predicate2(y))",
                "∃x ∀y (Predicate(x) → Predicate(y))",
                "∀x (Predicate(x) ∨ Predicate(x))",
                "∃x (¬Predicate(x))",
                "∀x ∀y (Predicate(x, y) → Predicate(y, x))",
                "∀x ∃y (Predicate1(x) ∧ Predicate2(y))",
                "∃x ∃y (Predicate1(x) ∨ Predicate2(y))",
                "∀x (¬Predicate(x))",
                "∃x (Predicate(x) → ∃y Predicate(y))",
                "∀x ∀y ∃z (Predicate(x, y, z))"
            ]

        reference_embeddings = [encode_fol(template, symbol_embeddings, type_embeddings) for template in templates]

        # Compute similarity of each hypothesis with all templates and select
        # the hypothesis with the highest similarity
        logger.info("Calculating similarities with templatic references...")
        best_similarity = -1
        best_fol = None
        for fol, gen_vec in zip(decoded_outputs, encoded_generated):
            sims = [cosine_similarity(gen_vec, ref_vec) for ref_vec in reference_embeddings]
            max_sim = max(sims)
            if max_sim > best_similarity:
                best_similarity = max_sim
                best_fol = fol

        logger.info("Ranking completed.")

        print("\n---")
        print(f"NL Input: {nl_sentence}")
        print(f"Standard Decoding FOL: {decoded_outputs[0]}")
        print(f"Constrained Decoding FOL: {decoded_outputs[0]}")
        print(f"Hyperdimensional Ranking FOL: {best_fol}")
        print("All Beam Hypotheses and Similarities:")
        for fol, gen_vec in zip(decoded_outputs, encoded_generated):
            sims = [cosine_similarity(gen_vec, ref_vec) for ref_vec in reference_embeddings]
            max_sim = max(sims)
            print(f"Similarity: {max_sim:.4f}, FOL: {fol}")
        print("---\n")

if __name__ == "__main__":
    main()


INFO:__main__:Loaded model 'snewmanaa/flan-t5-NL-FOL-baseline' successfully.
INFO:__main__:Model moved to cuda.
INFO:__main__:Loaded hyperdimensional embeddings successfully.
INFO:__main__:Loaded 620 variables from 'hyperdimensional_embeddings/variables.txt'.
INFO:__main__:Variable IDs: [2, 2, 2, 26, 4250, 7325, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 19747, 2, 2, 2, 22288, 2, 2, 2, 2, 2, 2, 2, 15974, 2, 107, 2, 2, 20779, 2, 2256, 2, 2, 20113, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3009, 2, 5842, 2, 2, 2, 2, 2, 2, 63, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1135, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 29, 2, 28188, 2, 2, 2, 2, 14700, 20309, 2, 2, 2, 2857, 2, 2, 2, 2, 2, 2, 2, 2, 2, 7149, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 5556, 2, 6075, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2242, 2, 2, 26809, 2, 2, 6633, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2138, 2, 2, 2, 2, 2, 5165, 2, 2, 2, 2, 8115, 2, 2, 2, 2, 2, 2, 2, 6779, 2, 2, 2, 2, 2, 2, 2, 2, 15727, 2, 28491, 2, 2, 2, 2, 2, 2, 2, 24671, 2, 2, 2


---
NL Input: Every student loves some course.
Standard Decoding FOL: ∀x (Student(x) → LovesCourses(x))
Constrained Decoding FOL: ∀x (Student(x) → LovesCourses(x))
Hyperdimensional Ranking FOL: ∀x (Student(x) → LovesCourses(x))
All Beam Hypotheses and Similarities:
Similarity: 0.9291, FOL: ∀x (Student(x) → LovesCourses(x))
---


INFO:__main__:Generated 1 beam hypotheses.
INFO:__main__:Encoding generated FOL statements into hyperdimensional vectors...
INFO:__main__:Calculating similarities with templatic references...
INFO:__main__:Ranking completed.
INFO:__main__:
Processing NL Input: Some professor teaches all classes.
INFO:__main__:Input Text: Translate English to FOL: Some professor teaches all classes.
INFO:__main__:Generating beam outputs...



---
NL Input: No teacher dislikes any subject.
Standard Decoding FOL: ∀x (Teacher(x) → ¬DislikesSubject(x))
Constrained Decoding FOL: ∀x (Teacher(x) → ¬DislikesSubject(x))
Hyperdimensional Ranking FOL: ∀x (Teacher(x) → ¬DislikesSubject(x))
All Beam Hypotheses and Similarities:
Similarity: 0.9282, FOL: ∀x (Teacher(x) → ¬DislikesSubject(x))
---


INFO:__main__:Generated 1 beam hypotheses.
INFO:__main__:Encoding generated FOL statements into hyperdimensional vectors...
INFO:__main__:Calculating similarities with templatic references...
INFO:__main__:Ranking completed.
INFO:__main__:
Processing NL Input: All humans are mortal.
INFO:__main__:Input Text: Translate English to FOL: All humans are mortal.
INFO:__main__:Generating beam outputs...



---
NL Input: Some professor teaches all classes.
Standard Decoding FOL: ∃x (Professor(x) ∧ TeachesClasses(x))
Constrained Decoding FOL: ∃x (Professor(x) ∧ TeachesClasses(x))
Hyperdimensional Ranking FOL: ∃x (Professor(x) ∧ TeachesClasses(x))
All Beam Hypotheses and Similarities:
Similarity: 0.9269, FOL: ∃x (Professor(x) ∧ TeachesClasses(x))
---


INFO:__main__:Generated 1 beam hypotheses.
INFO:__main__:Encoding generated FOL statements into hyperdimensional vectors...
INFO:__main__:Calculating similarities with templatic references...
INFO:__main__:Ranking completed.
INFO:__main__:
Processing NL Input: There exists a student who aces every test.
INFO:__main__:Input Text: Translate English to FOL: There exists a student who aces every test.
INFO:__main__:Generating beam outputs...



---
NL Input: All humans are mortal.
Standard Decoding FOL: ∀x (Human(x) → Mortal(x))
Constrained Decoding FOL: ∀x (Human(x) → Mortal(x))
Hyperdimensional Ranking FOL: ∀x (Human(x) → Mortal(x))
All Beam Hypotheses and Similarities:
Similarity: 0.9314, FOL: ∀x (Human(x) → Mortal(x))
---


INFO:__main__:Generated 1 beam hypotheses.
INFO:__main__:Encoding generated FOL statements into hyperdimensional vectors...
INFO:__main__:Calculating similarities with templatic references...
INFO:__main__:Ranking completed.



---
NL Input: There exists a student who aces every test.
Standard Decoding FOL: ∃x (Student(x) ∧ AcesTest(x))
Constrained Decoding FOL: ∃x (Student(x) ∧ AcesTest(x))
Hyperdimensional Ranking FOL: ∃x (Student(x) ∧ AcesTest(x))
All Beam Hypotheses and Similarities:
Similarity: 0.9255, FOL: ∃x (Student(x) ∧ AcesTest(x))
---


In [31]:
import torch
from transformers import T5Tokenizer, T5ForConditionalGeneration
from transformers.generation.logits_process import LogitsProcessor, LogitsProcessorList
import numpy as np
import os
import logging
import random
from datasets import load_dataset
from sklearn.metrics import accuracy_score
from nltk.translate.bleu_score import sentence_bleu
from nltk.translate.bleu_score import SmoothingFunction

def compare_methods(subset_size=None, subset_fraction=None, num_qualitative=5):
    """
    Compare different decoding methods for NL to FOL translation.

    Parameters:
    - subset_size (int): Number of test examples to evaluate.
    - subset_fraction (float): Fraction of test examples to evaluate (e.g., 0.1 for 10%).
    - num_qualitative (int): Number of examples to log for qualitative analysis.
    """
    # Configuration constants
    MODEL_REPO = "snewmanaa/flan-t5-NL-FOL-baseline"
    MAX_LENGTH = 128
    EMBEDDING_DIM = 1024
    BEAM_SIZE = 5
    TEST_SPLIT = "test"

    SYMBOL_TO_PLACEHOLDER = {
        "∀": "[forall]",
        "∃": "[exists]",
        "¬": "[not]",
        "→": "[implies]",
        "∧": "[and]",
        "∨": "[or]",
        "⊕": "[oplus]",
        "↔": "[iff]",
    }

    PLACEHOLDER_TO_SYMBOL = {v: k for k, v in SYMBOL_TO_PLACEHOLDER.items()}

    def load_embeddings(symbol_embeddings_path="hyperdimensional_embeddings/symbol_embeddings.npy",
                       type_embeddings_path="hyperdimensional_embeddings/type_embeddings.npy"):
        """
        Load hyperdimensional embeddings from .npy files.
        """
        try:
            symbol_embeddings = np.load(symbol_embeddings_path, allow_pickle=True).item()
            type_embeddings = np.load(type_embeddings_path, allow_pickle=True).item()
            logger.info("Loaded hyperdimensional embeddings successfully.")
            return symbol_embeddings, type_embeddings
        except Exception as e:
            logger.error(f"Error loading embeddings: {e}")
            return None, None

    def load_variables(file_path="hyperdimensional_embeddings/variables.txt"):
        """
        Load variables list from a text file.
        """
        try:
            with open(file_path, "r") as f:
                variables = [line.strip() for line in f.readlines()]
            logger.info(f"Loaded {len(variables)} variables from '{file_path}'.")
            return variables
        except Exception as e:
            logger.error(f"Error loading variables from '{file_path}': {e}")
            return []

    def load_test_set(dataset_name="yuan-yang/MALLS-v0", split=TEST_SPLIT, 
                      subset_size=None, subset_fraction=None):
        """
        Load the test set from the dataset, optionally loading a subset (for time)
        """
        try:
            dataset = load_dataset(dataset_name, split=split)
            logger.info(f"Loaded test set '{split}' with {len(dataset)} examples.")

            if subset_size:
                subset_size = min(subset_size, len(dataset))
                dataset = dataset.select(range(subset_size))
                logger.info(f"Selected subset of size {subset_size} for evaluation.")
            elif subset_fraction:
                subset_size = int(len(dataset) * subset_fraction)
                subset_size = min(subset_size, len(dataset))
                dataset = dataset.select(range(subset_size))
                logger.info(f"Selected subset of size {subset_size} ({subset_fraction*100}%) for evaluation.")

            return dataset
        except Exception as e:
            logger.error(f"Error loading test set '{split}': {e}")
            return None

    def postprocess(decoded_texts):
        """
        Replace placeholders with original symbols
        """
        restored_texts = []
        for text in decoded_texts:
            for ph, sym in PLACEHOLDER_TO_SYMBOL.items():
                text = text.replace(ph, sym)
            restored_texts.append(text)
        return restored_texts

    def encode_fol(fol, symbol_embeddings, type_embeddings):
        """
        Encode a FOL string into a hyperdimensional vector
        """
        tokens = fol.replace('(', ' ').replace(')', ' ').replace(',', ' ').split()
        encoded = np.zeros(EMBEDDING_DIM)
        for token in tokens:
            if token in symbol_embeddings:
                symbol_vec = symbol_embeddings[token]
                if token in ['[forall]', '[exists]']:
                    type_vec = type_embeddings.get('quantifier', np.zeros(EMBEDDING_DIM))
                elif token in ['[not]', '[implies]', '[and]', '[or]']:
                    type_vec = type_embeddings.get('operator', np.zeros(EMBEDDING_DIM))
                elif token.islower():
                    type_vec = type_embeddings.get('variable', np.zeros(EMBEDDING_DIM))
                elif token.isupper():
                    type_vec = type_embeddings.get('predicate', np.zeros(EMBEDDING_DIM))
                else:
                    type_vec = type_embeddings.get('constant', np.zeros(EMBEDDING_DIM))
                # Combine symbol and type vectors
                combined = symbol_vec * type_vec
                encoded += combined
        return encoded

    def cosine_similarity(vec1, vec2):
        """
        Compute cosine similarity between two vectors
        """
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        if norm1 == 0 or norm2 == 0:
            return -1  # Minimal similarity
        return np.dot(vec1, vec2) / (norm1 * norm2)

    def calculate_metrics(gold, predictions):
        """
        Calculate evaluation metrics
        """
        # Filter out any None or empty string predictions
        filtered_gold = []
        filtered_preds = []
        for g, p in zip(gold, predictions):
            if p:
                filtered_gold.append(g)
                filtered_preds.append(p)

        if not filtered_gold:
            logger.warning("No valid predictions to calculate metrics.")
            return 0.0, 0.0, 0.0, []

        # Exact Match
        exact_matches = [1 if g == p else 0 for g, p in zip(filtered_gold, filtered_preds)]
        accuracy = accuracy_score(filtered_gold, filtered_preds)

        # BLEU Score
        smoothie = SmoothingFunction().method4
        bleu_scores = [sentence_bleu([g.split()], p.split(), smoothing_function=smoothie) 
                       for g, p in zip(filtered_gold, filtered_preds)]
        avg_bleu = np.mean(bleu_scores)

        return accuracy, avg_bleu, exact_matches

    class QuantifierFollowProcessor(LogitsProcessor):
        """
        Custom LogitsProcessor to enforce that quantifiers are followed by variables
        """

        def __init__(self, quantifier_ids, variable_ids, device):
            super().__init__()
            self.quantifier_ids = quantifier_ids
            self.variable_ids = variable_ids
            self.device = device

        def __call__(self, input_ids, scores):
            # Get the last generated token for each sequence in the batch
            last_token_ids = input_ids[:, -1]

            # Create a mask where the last token is a quantifier
            is_quantifier = torch.zeros_like(last_token_ids, dtype=torch.bool)
            for q_id in self.quantifier_ids:
                is_quantifier = is_quantifier | (last_token_ids == q_id)

            # For each sequence where the last token is a quantifier,
            # set all logits to -inf except for variable_ids
            for i, flag in enumerate(is_quantifier):
                if flag:
                    # Set all logits to -inf
                    scores[i] = torch.full_like(scores[i], -float("inf"))
                    # Allow only variable_ids by setting their logits to 0
                    scores[i, self.variable_ids] = 0.0

            return scores

    def standard_decoding(model, tokenizer, input_ids):
        """
        Standard decoding without constraints or ranking
        """
        logger.debug("Performing standard decoding...")
        try:
            with torch.no_grad():
                output_ids = model.generate(
                    input_ids=input_ids,
                    max_length=MAX_LENGTH,
                    num_beams=BEAM_SIZE,
                    early_stopping=True,
                    return_dict_in_generate=False,
                    output_scores=False,
                    output_hidden_states=False
                )
            decoded = tokenizer.decode(output_ids[0], skip_special_tokens=True)
            final_output = postprocess([decoded])[0]
            logger.debug(f"Standard Decoding FOL: {final_output}")
            return final_output
        except Exception as e:
            logger.error(f"Error during standard decoding: {e}")
            return ""  # Return empty string instead of None

    def constrained_decoding(model, tokenizer, input_ids, quantifier_ids, 
                             variable_ids, device):
        """
        Decoding with rule-based constraints
        """
        logger.debug("Performing constrained decoding...")
        try:
            # Initialize custom logits processor
            custom_processor = QuantifierFollowProcessor(quantifier_ids, variable_ids, device)
            custom_processors = LogitsProcessorList([custom_processor])

            with torch.no_grad():
                output_ids = model.generate(
                    input_ids=input_ids,
                    max_length=MAX_LENGTH,
                    num_beams=BEAM_SIZE,
                    logits_processor=custom_processors,
                    early_stopping=True,
                    return_dict_in_generate=False,
                    output_scores=False,
                    output_hidden_states=False
                )
            decoded = tokenizer.decode(output_ids[0], skip_special_tokens=True)
            final_output = postprocess([decoded])[0]
            logger.debug(f"Constrained Decoding FOL: {final_output}")
            return final_output
        except Exception as e:
            logger.error(f"Error during constrained decoding: {e}")
            return ""  # Return empty string instead of None

    def hyperdimensional_ranking_decoding(model, tokenizer, input_ids, 
                                          symbol_embeddings, type_embeddings, 
                                          print_beams=False, nl_sentence="", 
                                          gold_fol=""):
        """
        Decoding with hyperdimensional embeddings ranking using templatic references.
        Optionally print beam hypotheses and similarities
        """
        logger.debug("Performing hyperdimensional embedding ranking decoding...")
        try:
            with torch.no_grad():
                outputs = model.generate(
                    input_ids=input_ids,
                    max_length=MAX_LENGTH,
                    num_beams=BEAM_SIZE,
                    early_stopping=False,  # Allow beam search to explore all beams
                    return_dict_in_generate=True,
                    output_scores=False,
                    output_hidden_states=False,
                    num_return_sequences=BEAM_SIZE  # All beam hypotheses are returned
                )
            beam_outputs = outputs.sequences
            decoded_outputs = tokenizer.batch_decode(beam_outputs, skip_special_tokens=True)
            decoded_outputs = postprocess(decoded_outputs)
            logger.debug(f"Generated {len(decoded_outputs)} beam hypotheses")

            # Encode the generated FOL statements
            logger.debug("Encoding generated FOL statements into hyperdimensional vectors...")
            encoded_generated = [encode_fol(fol, symbol_embeddings, type_embeddings) for fol in decoded_outputs]

            # Define templatic reference encodings
            templates = [
                "∀x (Predicate(x) → Predicate(x))",
                "∃x (Predicate(x) ∧ Predicate(x))",
                "∀x (Predicate(x) ↔ Predicate(x))",
                "∃x (Predicate(x) → Predicate(x))",
                "∀x ∃y (Predicate(x, y))",
                "∀x (Predicate1(x) → ∃y Predicate2(y))",
                "∃x ∀y (Predicate(x) → Predicate(y))",
                "∀x (Predicate(x) ∨ Predicate(x))",
                "∃x (¬Predicate(x))",
                "∀x ∀y (Predicate(x, y) → Predicate(y, x))",
                "∀x ∃y (Predicate1(x) ∧ Predicate2(y))",
                "∃x ∃y (Predicate1(x) ∨ Predicate2(y))",
                "∀x (¬Predicate(x))",
                "∃x (Predicate(x) → ∃y Predicate(y))",
                "∀x ∀y ∃z (Predicate(x, y, z))"
            ]
            reference_embeddings = [encode_fol(template, symbol_embeddings, type_embeddings) for template in templates]

            # Compute similarity of each hypothesis with all templates and select the highest
            logger.debug("Calculating similarities with templatic references...")
            best_similarity = -1
            best_fol = ""
            for fol, gen_vec in zip(decoded_outputs, encoded_generated):
                # Compute similarity with each template and take the maximum
                sims = [cosine_similarity(gen_vec, ref_vec) for ref_vec in reference_embeddings]
                max_sim = max(sims)
                if max_sim > best_similarity:
                    best_similarity = max_sim
                    best_fol = fol

            logger.debug("Ranking completed")

            # Display ranked hypotheses only if requested
            if print_beams:
                print("\n--- Qualitative Analysis ---")
                print(f"Reference NL: {nl_sentence}")
                print(f"Gold FOL: {gold_fol}")
                print(f"Hyperdimensional Ranking FOL: {best_fol}")
                print("All Beam Hypotheses and Similarities:")
                for fol, gen_vec in zip(decoded_outputs, encoded_generated):
                    # Compute similarity with each template and take the maximum
                    sims = [cosine_similarity(gen_vec, ref_vec) for ref_vec in reference_embeddings]
                    max_sim = max(sims)
                    print(f"Similarity: {max_sim:.4f}, FOL: {fol}")
                print("---\n")

            return best_fol
        except Exception as e:
            logger.error(f"Error during hyperdimensional embedding ranking decoding: {e}")
            return ""  # Return empty string instead of None

    # Load tokenizer and model
    try:
        tokenizer = T5Tokenizer.from_pretrained(MODEL_REPO)
        model = T5ForConditionalGeneration.from_pretrained(MODEL_REPO)
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        model.to(device)
        logger.info(f"Loaded model and moved to {device}.")
    except Exception as e:
        logger.error(f"Error loading model: {e}")
        return

    # Load hyperdimensional embeddings
    symbol_embeddings, type_embeddings = load_embeddings()
    if symbol_embeddings is None or type_embeddings is None:
        logger.error("Failed to load embeddings. Exiting.")
        return

    # Load variables
    variables = load_variables()
    if not variables:
        logger.error("No variables loaded. Exiting.")
        return

    # Convert variables to IDs
    variable_ids = tokenizer.convert_tokens_to_ids(variables)
    logger.info(f"Variable IDs: {variable_ids}")

    # Define quantifier tokens
    quantifiers = ["[forall]", "[exists]"]
    quantifier_ids = tokenizer.convert_tokens_to_ids(quantifiers)
    logger.info(f"Quantifier IDs: {quantifier_ids}")

    # Load test set with optional subset
    test_set = load_test_set(subset_size=subset_size, subset_fraction=subset_fraction)
    if test_set is None:
        logger.error("No test set loaded. Exiting.")
        return

    # Initialize lists for metrics
    gold_fols = []
    standard_preds = []
    constrained_preds = []
    ranked_preds = []

    # Select unique random examples for qualitative analysis
    total_examples = len(test_set)
    num_random = min(num_qualitative, total_examples)
    if total_examples < num_qualitative:
        logger.warning(f"Test set contains only {total_examples} examples.")
        num_random = total_examples
    random_examples = set(random.sample(range(total_examples), num_random))

    for idx, example in enumerate(test_set):
        nl_sentence = example.get('NL', '')
        gold_fol = example.get('FOL', '')

        if not isinstance(nl_sentence, str) or not isinstance(gold_fol, str):
            logger.warning(f"Missing or invalid NL or FOL at index {idx}. Skipping.")
            continue

        input_text = "Translate English to FOL: " + nl_sentence
        input_ids = tokenizer.encode(input_text, return_tensors="pt").to(device)
        logger.info(f"\nProcessing NL Input: {nl_sentence}")

        # Standard Decoding
        standard_fol = standard_decoding(model, tokenizer, input_ids, device)
        standard_preds.append(standard_fol if standard_fol else "")
        gold_fols.append(gold_fol)

        # Constrained Decoding
        constrained_fol = constrained_decoding(model, tokenizer, input_ids, quantifier_ids, variable_ids, device)
        constrained_preds.append(constrained_fol if constrained_fol else "")

        # Hyperdimensional Embedding Ranking Decoding
        # Print beams only for selected random examples
        print_beams = idx in random_examples
        ranked_fol = hyperdimensional_ranking_decoding(
            model, tokenizer, input_ids, symbol_embeddings, type_embeddings, device,
            print_beams=print_beams,
            nl_sentence=nl_sentence,
            gold_fol=gold_fol
        )
        ranked_preds.append(ranked_fol if ranked_fol else "")

    # Calculate Metrics
    logger.info("Calculating evaluation metrics...")
    accuracy_std, avg_bleu_std, exact_matches_std = calculate_metrics(gold_fols, standard_preds)
    accuracy_constrained, avg_bleu_constrained, exact_matches_constrained = calculate_metrics(gold_fols, constrained_preds)
    accuracy_ranked, avg_bleu_ranked, exact_matches_ranked = calculate_metrics(gold_fols, ranked_preds)

    # Print Metrics
    print("\n=== Evaluation Metrics ===")
    print(f"Standard Decoding - Accuracy: {accuracy_std:.4f}, Avg BLEU: {avg_bleu_std:.4f}")
    print(f"Constrained Decoding - Accuracy: {accuracy_constrained:.4f}, Avg BLEU: {avg_bleu_constrained:.4f}")
    print(f"Hyperdimensional Ranking Decoding - Accuracy: {accuracy_ranked:.4f}, Avg BLEU: {avg_bleu_ranked:.4f}")
    print("==========================\n")


In [32]:
compare_methods(subset_size=200, num_qualitative=3)


INFO:__main__:Loaded model and moved to cuda.
INFO:__main__:Loaded hyperdimensional embeddings successfully.
INFO:__main__:Loaded 620 variables from 'hyperdimensional_embeddings/variables.txt'.
INFO:__main__:Variable IDs: [2, 2, 2, 26, 4250, 7325, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 19747, 2, 2, 2, 22288, 2, 2, 2, 2, 2, 2, 2, 15974, 2, 107, 2, 2, 20779, 2, 2256, 2, 2, 20113, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3009, 2, 5842, 2, 2, 2, 2, 2, 2, 63, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1135, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 29, 2, 28188, 2, 2, 2, 2, 14700, 20309, 2, 2, 2, 2857, 2, 2, 2, 2, 2, 2, 2, 2, 2, 7149, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 5556, 2, 6075, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2242, 2, 2, 26809, 2, 2, 6633, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2138, 2, 2, 2, 2, 2, 5165, 2, 2, 2, 2, 8115, 2, 2, 2, 2, 2, 2, 2, 6779, 2, 2, 2, 2, 2, 2, 2, 2, 15727, 2, 28491, 2, 2, 2, 2, 2, 2, 2, 24671, 2, 2, 2, 20316, 2, 2, 2, 2, 17396, 2, 2, 2, 2, 2, 40, 2, 2, 2, 10279, 2, 


--- Qualitative Analysis ---
Reference NL: A sport that is played between two teams on a field with a ball and the objective is to score points by getting the ball into the opposing team's goal is a goal-based sport.
Gold FOL: ∀x (Sport(x) ∧ PlayedBetweenTwoTeams(x) ∧ PlayedOnField(x) ∧ PlayedWithBall(x) ∧ ObjectiveToScorePointsByGettingBallIntoOpposingTeamsGoal(x) → GoalBasedSport(x))
Hyperdimensional Ranking FOL: ∀x (Sport(x) ∧ PlayedBetweenTwoTeams(x) ∧ FieldWithBall(x) ∧ ObjectiveIsToScorePoints(x) ∧ GetsBallIntoOpposingTeamGoal(x) → GoalBasedSport(x))
All Beam Hypotheses and Similarities:
Similarity: 0.8652, FOL: ∀x (Sport(x) ∧ PlayedBetweenTwoTeams(x) ∧ FieldWithBall(x) ∧ ObjectiveToScorePoints(x) ∧ GetsBallIntoOpposingTeamGoal(x) → GoalBasedSport(x))
Similarity: 0.8736, FOL: ∀x (Sport(x) ∧ PlayedBetweenTwoTeams(x) ∧ FieldWithBall(x) ∧ ObjectiveIsToScorePoints(x) ∧ GetsBallIntoOpposingTeamGoal(x) → GoalBasedSport(x))
Similarity: 0.8729, FOL: ∀x (Sport(x) ∧ PlayedBetweenTwoTeams(

INFO:__main__:
Processing NL Input: A detective solves crimes by gathering evidence and analyzing clues.
INFO:__main__:
Processing NL Input: Some paintings are abstract and created by famous artists.
INFO:__main__:
Processing NL Input: An instrument is either a string instrument or a wind instrument, but not both.
INFO:__main__:
Processing NL Input: Mammals give birth to live young and nurse their offspring with milk produced by the mother.
INFO:__main__:
Processing NL Input: A vaccine that is effective provides immunity, reduces the risk of infection, and often contributes to herd immunity in a population.
INFO:__main__:
Processing NL Input: A satellite orbits Earth, transmitting and receiving data, enabling services such as weather forecasting, global positioning, and communication.
INFO:__main__:
Processing NL Input: A flower blooms in spring if it requires a specific amount of sunlight and warmth for its growth and development.
INFO:__main__:
Processing NL Input: A tree loses its l


--- Qualitative Analysis ---
Reference NL: A building has either an elevator or stairs, but not both.
Gold FOL: ∀x (Building(x) → ((HasElevator(x) ⊕ HasStairs(x))))
Hyperdimensional Ranking FOL: ∀x (Building(x) → (HasElevator(x)  HasStoves(x)))
All Beam Hypotheses and Similarities:
Similarity: 0.8810, FOL: ∀x (Building(x) → (Elevator(x)  Stairs(x)))
Similarity: 0.9075, FOL: ∀x (Building(x) → (HasElevator(x)  HasStoves(x)))
Similarity: 0.8810, FOL: ∀x (Building(x) → ((Elevator(x)  Stairs(x))))
Similarity: 0.9075, FOL: ∀x (Building(x) → ((HasElevator(x)  HasStoves(x))))
Similarity: 0.8733, FOL: ∀x (Building(x) → (Exercise(x)  Stairs(x)))
---


INFO:__main__:
Processing NL Input: Trees absorb carbon dioxide, while cars emit carbon dioxide, and factories release carbon dioxide as a byproduct.
INFO:__main__:
Processing NL Input: A person who designs and creates video games is called a game developer.
INFO:__main__:
Processing NL Input: An apple is red or green, but not both.
INFO:__main__:
Processing NL Input: Organisms that can produce their own food through photosynthesis, provide energy for other organisms, and do not consume other organisms are primary producers.
INFO:__main__:
Processing NL Input: A device that converts thermal energy into electrical energy by using a temperature difference is a thermoelectric generator.
INFO:__main__:
Processing NL Input: A dessert that is made from milk, sugar, and flavorings and is frozen is ice cream.
INFO:__main__:
Processing NL Input: Birds migrate when the season changes and the temperature becomes unsuitable for their survival.
INFO:__main__:
Processing NL Input: A food product is 


--- Qualitative Analysis ---
Reference NL: There exist shapes that are neither squares nor circles nor triangles.
Gold FOL: ∃x (Shape(x) ∧ ¬(Square(x) ∨ Circle(x) ∨ Triangle(x)))
Hyperdimensional Ranking FOL: ∃x (Shape(x) ∧ ¬Squares(x) ∧ ¬Crowns(x) ∧ ¬Triangles(x))
All Beam Hypotheses and Similarities:
Similarity: 0.9160, FOL: ∃x (Shape(x) ∧ ¬Square(x) ∧ ¬Crown(x) ∧ ¬Triangle(x))
Similarity: 0.9160, FOL: ∃x (Shape(x) ∧ ¬Square(x) ∧ ¬Crowd(x) ∧ ¬Triangle(x))
Similarity: 0.7914, FOL: ∃x (Shape(x) ∧ ¬(Square(x) ∨ Circle(x) ∨ Triangle(x)))
Similarity: 0.9471, FOL: ∃x (Shape(x) ∧ ¬Squares(x) ∧ ¬Crowns(x) ∧ ¬Triangles(x))
Similarity: 0.8094, FOL: ∃x (Shape(x) ∧ ¬Square(x) ∨ Circle(x) ∨ Triangle(x))
---


INFO:__main__:
Processing NL Input: Rain occurs when clouds release water droplets.
INFO:__main__:
Processing NL Input: Chemists study chemical reactions, while biologists examine living organisms.
INFO:__main__:
Processing NL Input: A liquid turns into a gas if it reaches its boiling point.
INFO:__main__:
Processing NL Input: A smartphone is water-resistant if it has a protective casing and sealed ports.
INFO:__main__:
Processing NL Input: A computer can have a desktop, laptop, or tablet form factor, but not more than one form factor.
INFO:__main__:
Processing NL Input: A musical instrument is considered versatile if it can produce a wide range of tones and is suitable for various musical styles.
INFO:__main__:
Processing NL Input: A dancer performs on a stage.
INFO:__main__:
Processing NL Input: A planet orbits a star only when it has a stable elliptical path, and it is not a moon.
INFO:__main__:
Processing NL Input: A material is considered elastic if it returns to its original shap


=== Evaluation Metrics ===
Standard Decoding - Accuracy: 0.1750, F1 Score: 0.1750, Avg BLEU: 0.3916
Constrained Decoding - Accuracy: 0.1750, F1 Score: 0.1750, Avg BLEU: 0.3867
Hyperdimensional Ranking Decoding - Accuracy: 0.0700, F1 Score: 0.0700, Avg BLEU: 0.3037
