# Imports

In [1]:
import os
import pickle
from torch import nn
import torch
from openai import OpenAI
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModel

# File Path Declaration

In [2]:
project_base_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.getcwd())))))
project_base_path

'/home/ANONYMOUS/projects/FALCON'

In [3]:
saved_v1_generated_data_path = os.path.join(project_base_path, "data/generation/yara/yara-rules_v1.pkl")
saved_v1_generated_data_path

'/home/ANONYMOUS/projects/FALCON/data/generation/yara/yara-rules_v1.pkl'

In [4]:
saved_v2_generated_data_path = os.path.join(project_base_path, "data/generation/yara/yara-rules_v2.pkl")
saved_v2_generated_data_path

'/home/ANONYMOUS/projects/FALCON/data/generation/yara/yara-rules_v2.pkl'

In [5]:
generated_rule_dir_path = os.path.join(project_base_path, "results/LLM_Rule_GtRule/quantitative/yara/gpt_4o")
generated_rule_dir_path

'/home/ANONYMOUS/projects/FALCON/results/LLM_Rule_GtRule/quantitative/yara/gpt_4o'

# Environment

In [6]:
open_ai_key = "OPENAI_KEY"
client = OpenAI(api_key=open_ai_key)
open_ai_model_name = "gpt-4o"
SEED = 42
DEVICE = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")
MAX_LEN = 512

In [13]:
def get_open_ai_response(prompt: str, model_name: str = open_ai_model_name) -> str:

  # Set the client with API key
  client = OpenAI(
    api_key=open_ai_key,  # This is the default and can be omitted
  )

  try:
      # Call the OpenAI API
      chat_completion = client.chat.completions.create(
          messages=[{"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": prompt}],
          model=model_name
      )

      # Extract and return the assistant's reply
      return chat_completion.choices[0].message.content

  except Exception as e:
      return f"@@$$## Error communicating with OpenAI API: {str(e)}"

In [14]:
get_open_ai_response("Hello, which gpt version are you?")

"Hello! I am based on OpenAI's GPT-4.0 architecture. How can I assist you today?"

# Helper Functions

In [15]:
def load_from_pickle(file_path) -> dict:
    """
    Loads data from a pickle file.

    :param file_path: Path to the pickle file
    :return: Loaded data
    """
    try:
        with open(file_path, 'rb') as file:
            return pickle.load(file)
    except Exception as e:
        print(f"Error loading data from pickle: {e}")
        return None

In [16]:
def get_first_n_elements(dictionary: dict, n: int) -> dict:
    """
    Get the first n elements of a dictionary.

    :param dictionary: The input dictionary
    :param n: The number of elements to retrieve
    :return: A dictionary with the first n elements
    """
    return dict(list(dictionary.items())[:n])

In [18]:
def save_string_as_txt(directory_path, file_name, content):
    """
    Saves a given string as a .txt file in the specified directory.

    Args:
        directory_path (str): Path to the directory where the file should be saved.
        file_name (str): Desired name of the file (with or without .txt extension).
        content (str): The string content to be written to the file.

    Returns:
        str: Full path to the saved file if successful, otherwise an empty string.
    """
    if not file_name.lower().endswith('.txt'):
        file_name += '.txt'
    
    file_path = os.path.join(directory_path, file_name)

    try:
        os.makedirs(directory_path, exist_ok=True)  # Create directory if it doesn't exist
        with open(file_path, 'w', encoding='utf-8') as file:
            file.write(content)
        return file_path
    except PermissionError:
        print(f"Error: Permission denied to write to '{file_path}'.")
    except Exception as e:
        print(f"An error occurred while saving the file: {e}")

    return ""

# Data Generation

In [19]:
# Load the data back from the pickle file
loaded_v1_data = load_from_pickle(saved_v1_generated_data_path)
print(len(loaded_v1_data.keys()))

4588


In [20]:
yara_cti_sample_dict = get_first_n_elements(loaded_v1_data, 10)

In [21]:
# Load the data back from the pickle file
loaded_v2_data = load_from_pickle(saved_v2_generated_data_path)
print(len(loaded_v2_data.keys()))

4587


In [22]:
yara_cti_sample_dict

{'rule MSIETabularActivex\n{\n        meta:\n                ref = "CVE-2010-0805"\n                impact = 7\n                hide = true\n                author = "@d3t0n4t0r"\n        strings:\n                $cve20100805_1 = "333C7BC4-460F-11D0-BC04-0080C7055A83" nocase fullword\n                $cve20100805_2 = "DataURL" nocase fullword\n                $cve20100805_3 = "true"\n        condition:\n                ($cve20100805_1 and $cve20100805_3) or (all of them)\n}': 'Rule Name\n  MSIETabularActivex\n\nDescription\n  This YARA rule detects a specific vulnerability (CVE-2010-0805) associated with an ActiveX control. The rule targets potentially malicious strings that could be used in exploit attempts related to this vulnerability.\n\nReference\n  CVE-2010-0805\n\nIndicators / String Matches\n  This rule matches the following strings:\n\n  String ID\tPattern\tNotes\n  $cve20100805_1\t"333C7BC4-460F-11D0-BC04-0080C7055A83"\tActiveX control CLSID\n  $cve20100805_2\t"DataURL"\tPos

In [23]:
yaras, ctis = zip(*yara_cti_sample_dict.items())
yaras = list(yaras)
ctis = list(ctis)

In [24]:
len(yaras), len(ctis)

(10, 10)

In [25]:
def format_cti_yara_data_to_training_data(data: list[dict]) -> list[tuple]:
    """
    Format the CTI YARA data into training data.

    :param data: The data to format
    :return: Formatted training data
    """
    training_data = []
    for dataset in data:
        for key, value in dataset.items():
            training_data.append((key, value))
    return training_data

In [26]:
# Sample Dataset Format (list of (anchor, positive) sentence pairs)
full_dataset = format_cti_yara_data_to_training_data([loaded_v1_data, loaded_v2_data])
print(len(full_dataset))

9175


In [27]:
def remove_10_test_samples(training_data: list[tuple], test_pairs: dict) -> list[tuple]:
    # Extract all test keys and values into sets for quick lookup
    test_keys = set(test_pairs.keys())
    test_values = set(test_pairs.values())
    
    # Filter training data
    filtered_data = [(key, value) for key, value in training_data if key not in test_keys and value not in test_values]
    
    return filtered_data

In [28]:
# Sample Dataset Format (list of (anchor, positive) sentence pairs)
full_dataset = remove_10_test_samples(full_dataset, yara_cti_sample_dict)
print(len(full_dataset))

9155


In [29]:
# Split into training and testing sets (80% train, 20% test)
train_pairs, test_pairs = train_test_split(full_dataset, test_size=0.1, random_state=SEED)

In [30]:
len(test_pairs)

916

# Dataset Class

In [31]:
# Custom Dataset
class ContrastiveDataset(Dataset):
    def __init__(self, data, tokenizer):
        self.data = data
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        anchor, positive = self.data[idx]
        encoded = self.tokenizer([anchor, positive], padding="max_length", truncation=True,
                                 max_length=MAX_LEN, return_tensors="pt")
        return {
            "input_ids_a": encoded["input_ids"][0],
            "attention_mask_a": encoded["attention_mask"][0],
            "input_ids_b": encoded["input_ids"][1],
            "attention_mask_b": encoded["attention_mask"][1],
        }

In [32]:
# Bi-Encoder Model
class SentenceEncoder(nn.Module):
    def __init__(self, model_name):
        super().__init__()
        self.encoder = AutoModel.from_pretrained(model_name)

    def forward(self, input_ids, attention_mask):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        embeddings = outputs.last_hidden_state[:, 0]  # CLS token
        return nn.functional.normalize(embeddings, p=2, dim=1)  # Normalize for cosine similarity


# Evaluation Functions

In [33]:
def generate_rule_from_cti_prompt(input_cti: str) -> str:

  rule_generation_prompt = f"""

    You are a cybersecurity expert tasked with performing YARA rule generation for a given Cyber Threat Intelligence (CTI).
    There is a sample task input and output provided below.
    
    Sample CTI Input and corresponding YARA Output:

    CTI Input:
        
      Rule Name
        APT30_Sample_2

      Description
        This YARA rule is designed to detect a specific malware sample associated with the APT30 threat group, as documented in a report by FireEye. The binary appears to be masquerading as a legitimate Microsoft Word-related executable, with embedded strings referencing ForZRLnkWordDlg.EXE, suggesting impersonation of Microsoft Office components.

      Reference
        FireEye APT30 Report
        Full Report: https://www2.fireeye.com/rs/fireye/images/rpt-apt30.pdf

      Indicators / String Matches
        This rule matches the following wide (Unicode) strings:

        String ID	Pattern	Notes
        $s0	"ForZRLnkWordDlg.EXE"	Executable filename
        $s1	"ForZRLnkWordDlg Microsoft "	Vendor impersonation
        $s9	"ForZRLnkWordDlg 1.0 "	Fake version info
        $s11	"ForZRLnkWordDlg"	Generic name
        $s12	" (C) 2011"	Fake copyright year
        
        All string patterns use fullword and wide modifiers, meaning they match exact full Unicode words.

      Condition Logic
        The rule triggers if:

        The file size is less than 100KB.

        The file has a valid DOS MZ header (uint16(0) == 0x5A4D).

        All of the specified string patterns ($s0 through $s12) are found.

      Known File Hash
        SHA1: 0359ffbef6a752ee1a54447b26e272f4a5a35167

      Rule UUID
        821a2de9-48c4-58d8-acc4-1e25025ab5cf

      
    YARA Output:
        
      rule APT30_Sample_2 {{
        meta:
          description = "FireEye APT30 Report Sample"
          license = "Detection Rule License 1.1 https://github.com/Neo23x0/signature-base/blob/master/LICENSE"
          author = "Florian Roth (Nextron Systems)"
          reference = "https://www2.fireeye.com/rs/fireye/images/rpt-apt30.pdf"
          date = "2015/04/13"
          hash = "0359ffbef6a752ee1a54447b26e272f4a5a35167"
          id = "821a2de9-48c4-58d8-acc4-1e25025ab5cf"
        strings:
          $s0 = "ForZRLnkWordDlg.EXE" fullword wide
          $s1 = "ForZRLnkWordDlg Microsoft " fullword wide
          $s9 = "ForZRLnkWordDlg 1.0 " fullword wide
          $s11 = "ForZRLnkWordDlg" fullword wide
          $s12 = " (C) 2011" fullword wide
        condition:
          filesize < 100KB and uint16(0) == 0x5A4D and all of them
      }}


    Generate YARA from the provided CTI. Do not include anything that is not provided.
    Do not print anything like sure here is the CTI or anything else. Only print the CTI. 

    CTI Input: 
    
      {input_cti}

    YARA Output:

  """

  return rule_generation_prompt

# Test Code

In [34]:
test_ctis = [i[1] for i in test_pairs]
gt_rules = [i[0] for i in test_pairs]

In [35]:
len(test_ctis), len(gt_rules)

(916, 916)

In [36]:
prompt = generate_rule_from_cti_prompt(test_ctis[0])
print(prompt)



    You are a cybersecurity expert tasked with performing YARA rule generation for a given Cyber Threat Intelligence (CTI).
    There is a sample task input and output provided below.
    
    Sample CTI Input and corresponding YARA Output:

    CTI Input:
        
      Rule Name
        APT30_Sample_2

      Description
        This YARA rule is designed to detect a specific malware sample associated with the APT30 threat group, as documented in a report by FireEye. The binary appears to be masquerading as a legitimate Microsoft Word-related executable, with embedded strings referencing ForZRLnkWordDlg.EXE, suggesting impersonation of Microsoft Office components.

      Reference
        FireEye APT30 Report
        Full Report: https://www2.fireeye.com/rs/fireye/images/rpt-apt30.pdf

      Indicators / String Matches
        This rule matches the following wide (Unicode) strings:

        String ID	Pattern	Notes
        $s0	"ForZRLnkWordDlg.EXE"	Executable filename
        $s1	"Fo

In [37]:
test_rule = get_open_ai_response(prompt)
print(test_rule)

```yara
rule FSO_s_remview_2 {
  meta:
    description = "Detection of specific webshell file 'remview.php' targeting malicious PHP code patterns."
    hash = "b4a09911a5b23e00b55abe546ded691c"
  strings:
    $s0 = "<xmp>$out</"
    $s1 = ".mm(\"Eval PHP code\")."
  condition:
    all of them
}
```


# Generate Rule from CTI

In [38]:
generated_rules = []

inference_counter = 0
for cti in tqdm(test_ctis, "Generating YARA rules from CTIs..."):
  prompt = generate_rule_from_cti_prompt(cti)
  rule = get_open_ai_response(prompt)
  generated_rules.append(rule)
  file_name = f"quantitative_eval_yara_rule_{inference_counter}.txt"
  save_string_as_txt(generated_rule_dir_path, file_name, rule)
  inference_counter += 1

Generating YARA rules from CTIs...: 100%|██████████| 916/916 [42:44<00:00,  2.80s/it]  


In [39]:
len(generated_rules)

916

# CTI-Rule Semantic Evaluation

In [40]:
def compute_dot_product_matrix_batched(model, tokenizer, test_yaras, test_ctis, batch_size=64):
    # Tokenize yaras once (since all CTIs will be compared to them)
    tokenized_yaras = tokenizer(test_yaras, return_tensors="pt", padding=True, max_length=MAX_LEN, truncation=True)
    input_ids_yaras = tokenized_yaras["input_ids"].to(DEVICE)
    attention_mask_yaras = tokenized_yaras["attention_mask"].to(DEVICE)

    with torch.no_grad():
        emb_yaras = model(input_ids_yaras, attention_mask_yaras)  # (802, dim)
        emb_yaras = emb_yaras.detach()

    # Prepare output tensor for all dot products
    num_ctis = len(test_ctis)
    dot_product_matrix = []

    for i in range(0, num_ctis, batch_size):
        batch_ctis = test_ctis[i:i + batch_size]
        tokenized_ctis = tokenizer(batch_ctis, return_tensors="pt", padding=True, max_length=MAX_LEN, truncation=True)
        input_ids_ctis = tokenized_ctis["input_ids"].to(DEVICE)
        attention_mask_ctis = tokenized_ctis["attention_mask"].to(DEVICE)

        with torch.no_grad():
            emb_ctis = model(input_ids_ctis, attention_mask_ctis)  # (B, dim)
            dot_product_batch = torch.matmul(emb_ctis, emb_yaras.T)  # (B, 802)
            dot_product_matrix.append(dot_product_batch.cpu())

        # Cleanup
        del input_ids_ctis, attention_mask_ctis, emb_ctis, dot_product_batch
        torch.cuda.empty_cache()

    # Concatenate batches into full matrix
    dot_product_matrix = torch.cat(dot_product_matrix, dim=0)  # (802, 802)
    return dot_product_matrix


In [41]:
RUN = 1
FINE_TUNED_MODEL_NAME = "all-mpnet-base-v2"
MODEL_NAME = f"/data/common/models/sentence-transformers/{FINE_TUNED_MODEL_NAME}"
FINE_TUNED_MODEL_STATE_NAME = f"contrastive_encoder_r{RUN}.pt"
MODEL_SAVE_PATH = os.path.join(project_base_path, f"script/fine_tuning/bi-encoder/snort/{FINE_TUNED_MODEL_NAME}/{FINE_TUNED_MODEL_STATE_NAME}")


# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

# Load model
model = SentenceEncoder(MODEL_NAME).to(DEVICE)
model.load_state_dict(torch.load(MODEL_SAVE_PATH, map_location=DEVICE))

<All keys matched successfully>

In [42]:
dot_product_matrix_test = compute_dot_product_matrix_batched(
    model=model,
    tokenizer=tokenizer,
    test_yaras=generated_rules,
    test_ctis=gt_rules,
    batch_size=256
)

In [43]:
import numpy as np

def extract_sigmoid_diagonal(dot_product_matrix: torch.Tensor):
    """
    Extracts principal diagonal from a dot-product matrix,
    applies sigmoid to each value, and returns the sigmoid list.
    Also prints the mean of sigmoid values.

    Args:
        dot_product_matrix (torch.Tensor): A square matrix of shape [N x N]

    Returns:
        List[float]: Sigmoid values of diagonal entries
    """
    assert dot_product_matrix.shape[0] == dot_product_matrix.shape[1], "Matrix must be square."

    # Step 1: Extract diagonal
    diag_values = dot_product_matrix.diag()  # shape: (N,)

    # Step 2: Apply sigmoid
    sigmoid_values = torch.sigmoid(diag_values)

    # Step 3: Convert to list and compute mean
    sigmoid_list = sigmoid_values.tolist()
    mean_value = torch.mean(sigmoid_values).item()

    # Output
    print(f"Mean of sigmoid(diagonal values): {mean_value:.4f}")
    return sigmoid_list

In [44]:
sigmoid_diagonal_scores = extract_sigmoid_diagonal(dot_product_matrix_test)

Mean of sigmoid(diagonal values): 0.6710


# Ragas 

In [45]:
open_ai_key = "OPENAI_KEY"
os.environ['OPENAI_API_KEY'] = open_ai_key

In [46]:
import numpy as np
from langchain.embeddings import OpenAIEmbeddings

def compute_dot_product_matrix_openai(test_yaras, test_ctis, batch_size=50):
    embedder = OpenAIEmbeddings()  # Uses text-embedding-ada-002 by default

    # Step 1: Get embeddings for all yara rules
    yara_embeddings = []
    for i in range(0, len(test_yaras), batch_size):
        batch = test_yaras[i:i + batch_size]
        yara_embeddings.extend(embedder.embed_documents(batch))  # List of vectors

    yara_embeddings = np.array(yara_embeddings)  # Shape: (N, D)
    yara_embeddings_norm = np.linalg.norm(yara_embeddings, axis=1, keepdims=True)

    # Step 2: Compute batched dot products with CTIs
    dot_product_matrix = []

    for i in range(0, len(test_ctis), batch_size):
        batch = test_ctis[i:i + batch_size]
        cti_embeddings = embedder.embed_documents(batch)
        cti_embeddings = np.array(cti_embeddings)
        cti_embeddings_norm = np.linalg.norm(cti_embeddings, axis=1, keepdims=True)

        # Normalize and compute dot product
        sim_matrix = np.dot(cti_embeddings, yara_embeddings.T) / (
            cti_embeddings_norm @ yara_embeddings_norm.T
        )
        dot_product_matrix.append(sim_matrix)

    dot_product_matrix = np.vstack(dot_product_matrix)  # Final shape: (len(test_ctis), len(test_yaras))
    return dot_product_matrix

In [47]:
def extract_diagonal(dot_product_matrix: torch.Tensor):
    """
    Extracts principal diagonal from a dot-product matrix,
    applies sigmoid to each value, and returns the sigmoid list.
    Also prints the mean of sigmoid values.

    Args:
        dot_product_matrix (torch.Tensor): A square matrix of shape [N x N]

    Returns:
        List[float]: Sigmoid values of diagonal entries
    """
    assert dot_product_matrix.shape[0] == dot_product_matrix.shape[1], "Matrix must be square."

    # Step 1: Extract diagonal
    diag_values = dot_product_matrix.diag()  # shape: (N,)

    # Step 3: Convert to list and compute mean
    diag_list = diag_values.tolist()
    mean_value = torch.mean(diag_values).item()

    # Output
    print(f"Mean of sigmoid(diagonal values): {mean_value:.4f}")
    return diag_list

In [48]:
dot_product_matrix_test = compute_dot_product_matrix_openai(
    test_yaras=generated_rules,
    test_ctis=gt_rules,
    batch_size=50
)

  embedder = OpenAIEmbeddings()  # Uses text-embedding-ada-002 by default


In [49]:
diagonal_scores_openai = extract_diagonal(torch.tensor(dot_product_matrix_test))

Mean of sigmoid(diagonal values): 0.9196


# Bert

In [50]:
from bert_score import score

def compute_bert_scores(cti_list, generated_rules, lang="en", model_type="bert-base-uncased"):
    """
    Computes BERTScore between CTI descriptions and generated YARA rules.

    Args:
        cti_list (List[str]): List of CTI strings.
        generated_rules (List[str]): Corresponding generated YARA rule strings.
        lang (str): Language (default = "en").
        model_type (str): BERT model to use (default = DeBERTa-MNLI).

    Returns:
        dict: Precision, Recall, F1 scores (averaged) and all individual F1s.
    """
    assert len(cti_list) == len(generated_rules), "Mismatch in CTI and rule count."

    P, R, F1 = score(generated_rules, cti_list, lang=lang, model_type=model_type, verbose=True)

    mean_p = P.mean().item()
    mean_r = R.mean().item()
    mean_f1 = F1.mean().item()

    print(f"\nBERTScore Results:\nPrecision: {mean_p:.4f} | Recall: {mean_r:.4f} | F1: {mean_f1:.4f}")
    return {
        "precision": mean_p,
        "recall": mean_r,
        "f1": mean_f1,
        "f1_scores": F1.tolist()  # optional: return individual F1s
    }


In [51]:
bert_score_results = compute_bert_scores(gt_rules, generated_rules)

calculating scores...
computing bert embedding.


  0%|          | 0/28 [00:00<?, ?it/s]

computing greedy matching.


  0%|          | 0/15 [00:00<?, ?it/s]

done in 7.43 seconds, 123.25 sentences/sec

BERTScore Results:
Precision: 0.8329 | Recall: 0.8741 | F1: 0.8514
