# Store YARA

# Translate YARA Rules to Database Entries with Embeddings

This notebook demonstrates how to translate YARA rules into natural-language interpretations using an OpenAI chat model (`gpt-4o-mini`) and how to generate embeddings for these interpretations with OpenAI embeddings. The interpretations and their embeddings are stored in a PostgreSQL database with the PGVector extension for efficient similarity searches.

## Setup

First, we need to set up the environment by loading necessary libraries and environment variables.

Ensure your OpenAI API key is configured in the .env file as OPENAI_API_KEY.

The code in this notebook performs the following steps on a YARA rules file (for example, yara-rules-core.yar):

- Load Environment Variables: It loads environment variables from a .env file, including the OpenAI API key.

- Extract YARA Rules: It reads the YARA file and extracts the rules using regular expressions. The extracted rules include the rule name, tags, metadata, detection logic, and condition.

- Parse Metadata: It parses the metadata section of each YARA rule to extract key-value pairs.

- Parse Strings: It parses the strings section of each YARA rule, decoding any Base64-encoded strings and adding them to the parsed strings.

- Interpret Rules Using OpenAI: It sends each YARA rule to the OpenAI API to get an interpretation of the rule, broken down into detection logic, condition, explanations of encoded strings, and an overall explanation.

- Store Interpreted Rules in PGVector: It connects to a PostgreSQL database with the PGVector extension and stores the interpreted rules, together with their embeddings, as documents in the database.

In [None]:
import asyncio
import base64
import os
import re
from urllib.parse import quote_plus

import asyncpg
import dotenv
import psycopg2
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_postgres import PGVector
from langchain_postgres.vectorstores import PGVector
from openai import OpenAI, AsyncOpenAI
from tqdm import tqdm


In [None]:
dotenv.load_dotenv()
# Ensure your OpenAI API Key is configured in the .env file as OPENAI_API_KEY


True

In [3]:
# Shared asynchronous OpenAI client; interpret_rule awaits its chat-completions API.
# No key is passed explicitly — presumably it is picked up from the OPENAI_API_KEY
# environment variable loaded from .env above (TODO confirm).
openai_client = AsyncOpenAI()

In [None]:
# PostgreSQL PGVector config.
# Connection settings are read from environment variables (for example from the
# same .env file that holds OPENAI_API_KEY) so real credentials do not have to
# be written into the notebook. The literals below are only fallbacks for a
# local development database; override them via POSTGRES_* variables.
DB_PARAMS = {
    "database": os.getenv("POSTGRES_DB", "malware_kb"),
    "user": os.getenv("POSTGRES_USER", "malware_admin"),
    "password": os.getenv("POSTGRES_PASSWORD", "admin_secure_password"),
    "host": os.getenv("POSTGRES_HOST", "localhost"),
    "port": os.getenv("POSTGRES_PORT", "5432"),
}

# User and password are percent-encoded so characters such as "@", ":" or "/"
# cannot break the URL. The search_path option selects the "malware" schema.
PGVECTOR_CONNECTION_STRING = (
    f"postgresql+psycopg://{quote_plus(DB_PARAMS['user'])}:{quote_plus(DB_PARAMS['password'])}"
    f"@{DB_PARAMS['host']}:{DB_PARAMS['port']}/{DB_PARAMS['database']}"
    "?options=-csearch_path=malware"
)


In [4]:
# Function to parse metadata
def parse_metadata(metadata_section):
    """Parse the body of a YARA ``meta:`` section into a dict.

    Each non-empty line of the form ``key = value`` becomes one entry. Lines
    starting with ``//`` (comments) and lines without a key are skipped.

    Args:
        metadata_section: Raw text of the meta section, one entry per line.

    Returns:
        dict mapping each key to its value as a string. A single pair of
        surrounding double quotes is removed from the value; quotes inside the
        value (including escaped ones such as ``\\"``) are left untouched.
    """
    metadata = {}
    for raw_line in metadata_section.split("\n"):
        line = raw_line.strip()
        if not line or line.startswith("//"):
            continue
        # Split on the first "=" only, so values may themselves contain "=".
        key, separator, value = line.partition("=")
        key = key.strip()
        if not separator or not key:
            continue
        value = value.strip()
        # Remove exactly one pair of enclosing quotes; str.strip('"') would
        # also eat a trailing escaped quote that belongs to the value.
        if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
            value = value[1:-1]
        metadata[key] = value
    return metadata

# Function to parse and interpret encoded strings
def parse_strings(strings_section):
    """Parse the body of a YARA ``strings:`` section into a list of lines.

    Only lines that start with ``$`` and contain ``=`` are kept. When the
    value begins with a double-quoted literal that ``is_base64`` accepts, the
    line is rewritten as ``<id> = "<literal>"<modifiers> (decoded: "<text>")``
    so the decoded text is visible next to the encoded form. Every other
    string definition (hex strings, regexes, plain text) is returned unchanged.

    Args:
        strings_section: Raw text of the strings section, one definition per line.

    Returns:
        list of str, one entry per kept string definition, in input order.
    """
    parsed_strings = []
    for raw_line in strings_section.split("\n"):
        line = raw_line.strip()
        if not line.startswith("$"):
            continue
        key, separator, value = line.partition("=")
        if not separator:
            continue
        key = key.strip()

        # Separate the quoted literal from anything after the closing quote
        # (modifiers such as `ascii wide`); testing the whole remainder would
        # make every literal with modifiers fail the Base64 check.
        literal = re.match(r'"((?:[^"\\]|\\.)*)"(.*)$', value.strip())
        encoded = literal.group(1) if literal else ""
        if not encoded or not is_base64(encoded):
            parsed_strings.append(line)
            continue

        modifiers = literal.group(2)
        try:
            decoded_value = base64.b64decode(encoded).decode("utf-8", errors="ignore")
        except ValueError:
            parsed_strings.append(f"{key} = \"{encoded}\"{modifiers} (decoded: [failed])")
        else:
            parsed_strings.append(f"{key} = \"{encoded}\"{modifiers} (decoded: \"{decoded_value}\")")
    return parsed_strings

# Function to check if a string is Base64
def is_base64(s):
    """Return True if ``s`` is a non-empty, canonically encoded Base64 string.

    The check decodes ``s`` strictly and re-encodes the result; only an exact
    round trip counts. The empty string and non-``str`` inputs return False.

    Note: this is a syntactic check only. Ordinary alphanumeric words whose
    length is a multiple of four (e.g. ``"test"``) also round-trip and are
    therefore reported as Base64.

    Args:
        s: Candidate string.

    Returns:
        bool
    """
    if not isinstance(s, str) or not s:
        return False
    try:
        # validate=True rejects characters outside the Base64 alphabet
        # instead of silently discarding them before decoding.
        return base64.b64encode(base64.b64decode(s, validate=True)).decode() == s
    except (ValueError, TypeError):
        return False



In [5]:
async def interpret_rule(rule, semaphore):
    """Ask the OpenAI chat model for a plain-language interpretation of one rule.

    Args:
        rule: dict with the keys "name", "metadata", "detection_logic" and
            "condition"; each value is interpolated into the prompt.
        semaphore: async context manager held for the duration of the request;
            callers use it to limit the number of concurrent API calls.

    Returns:
        The content of the first choice of the model's reply, or a string of
        the form "Error: <message>" if the request or the response handling
        raises.
    """
    async with semaphore:
        user_prompt = f"""
        Interpret the following YARA rule by breaking it down into:
        - **Detection Logic**: Explain in a sentence.
        - **Condition**: Explain what must be met for detection.
        - **Explanation of Encoded Strings (if any)**: Decode and explain.
        - **Overall Explanation**: How does the YARA Rule detect malware?

        YARA Rule:
        Rule Name: {rule['name']}
        Metadata: {rule['metadata']}
        Detection Logic: {rule['detection_logic']}
        Condition: {rule['condition']}
        """
        chat_messages = [
            {"role": "system", "content": "You are a cybersecurity analyst."},
            {"role": "user", "content": user_prompt},
        ]

        try:
            completion = await openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=chat_messages,
                max_tokens=512,
            )
            first_choice = completion.choices[0]
            return first_choice.message.content
        except Exception as exc:
            # Best effort: a failure is reported as text instead of being raised.
            return f"Error: {str(exc)}"


In [6]:
# Function to parse metadata
def parse_metadata(metadata_section):
    """Parse the body of a YARA ``meta:`` section into a dict.

    NOTE(review): this cell redefines ``parse_metadata``, which is already
    defined in the earlier helper cell; the later definition is the one in
    effect after a top-to-bottom run. Keep both copies in sync or delete one.

    Each non-empty line of the form ``key = value`` becomes one entry. Lines
    starting with ``//`` (comments) and lines without a key are skipped.

    Args:
        metadata_section: Raw text of the meta section, one entry per line.

    Returns:
        dict mapping each key to its value as a string. A single pair of
        surrounding double quotes is removed from the value; quotes inside the
        value (including escaped ones such as ``\\"``) are left untouched.
    """
    metadata = {}
    for raw_line in metadata_section.split("\n"):
        line = raw_line.strip()
        if not line or line.startswith("//"):
            continue
        # Split on the first "=" only, so values may themselves contain "=".
        key, separator, value = line.partition("=")
        key = key.strip()
        if not separator or not key:
            continue
        value = value.strip()
        # Remove exactly one pair of enclosing quotes; str.strip('"') would
        # also eat a trailing escaped quote that belongs to the value.
        if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
            value = value[1:-1]
        metadata[key] = value
    return metadata


In [7]:
# Function to extract YARA rules
def extract_yara_rules(file_path):
    """Read a YARA rules file and return one dict per matched rule.

    A rule is matched when it has ``meta:``, ``strings:`` and ``condition:``
    sections in that order; the ``: TAG ...`` list after the rule name is
    optional. Rules that lack a meta or strings section are not matched by
    this pattern.

    Args:
        file_path: Path of the YARA file. It is read as UTF-8; bytes that are
            not valid UTF-8 are replaced instead of raising.

    Returns:
        list of dicts with the keys:
            "name": rule identifier,
            "tags": tag list as written in the rule ("" when absent),
            "metadata": result of ``parse_metadata`` on the meta section,
            "detection_logic": result of ``parse_strings`` on the strings section,
            "condition": condition text, stripped.
    """
    # Explicit encoding: the platform default may not be UTF-8.
    with open(file_path, "r", encoding="utf-8", errors="replace") as file:
        content = file.read()

    # Regular expression to match YARA rules
    yara_pattern = re.compile(
        r"\brule\s+(\w+)"                  # rule name
        r"(?:\s*:\s*([\w\s,]*?))?"         # optional tag list after ":"
        r"\s*{\s*meta:\s*(.*?)\s*strings:\s*(.*?)\s*condition:\s*(.*?)}",
        re.DOTALL,
    )

    rules = []
    for match in yara_pattern.finditer(content):
        rule_name, tags, metadata, detection_logic, condition = match.groups()
        rules.append({
            "name": rule_name.strip(),
            # The tag group is None when the rule declares no tags.
            "tags": (tags or "").strip(),
            "metadata": parse_metadata(metadata),
            "detection_logic": parse_strings(detection_logic),
            "condition": condition.strip()
        })

    return rules


In [9]:
async def interpret_all_rules(rules, max_concurrent_requests=70, start_range=0, end_range=100):
    """Interpret a slice of YARA rules concurrently and return the results.

    Args:
        rules: list of rule dicts (each with "name" and "metadata", optionally
            "tags") as accepted by ``interpret_rule``.
        max_concurrent_requests: upper bound on simultaneous ``interpret_rule``
            calls, enforced through a shared semaphore.
        start_range: index of the first rule to interpret (inclusive).
        end_range: index one past the last rule to interpret; normal slice
            semantics apply, so values beyond ``len(rules)`` are clamped.

    Returns:
        list of dicts in the same order as ``rules[start_range:end_range]``,
        each with the keys "rule_name", "tags", "metadata" and
        "interpreted_text".
    """
    semaphore = asyncio.Semaphore(max_concurrent_requests)

    async def process_rule(rule):
        # interpret_rule acquires the semaphore itself. Acquiring it here as
        # well would take two permits per rule, halving the real concurrency
        # and deadlocking outright when max_concurrent_requests is 1.
        interpreted_text = await interpret_rule(rule, semaphore)
        return {
            "rule_name": rule["name"],
            "tags": rule.get("tags", []),
            "metadata": rule["metadata"],
            "interpreted_text": interpreted_text
        }

    # Run all tasks concurrently; gather preserves the input order.
    tasks = [process_rule(rule) for rule in rules[start_range:end_range]]
    interpreted_results = await asyncio.gather(*tasks)

    return interpreted_results


In [10]:
def process_interpretation(interpreted_result):
    """Flatten a multi-section interpretation into one line per section.

    The text is split on "\\n###". Each piece is stripped, every ".\\n" in it
    is replaced by a space and every remaining newline by ":", and the pieces
    are concatenated with a trailing newline after each one.

    Args:
        interpreted_result: interpretation text to flatten.

    Returns:
        str with one newline-terminated line per piece.
    """
    flattened_sections = []
    for section in interpreted_result.split("\n###"):
        single_line = section.strip().replace(".\n", " ").replace("\n", ":")
        flattened_sections.append(single_line + "\n")
    return "".join(flattened_sections)
       

In [11]:
# Initialize OpenAI Embeddings
# The API key is read from the OPENAI_API_KEY environment variable; os.getenv
# returns None when the variable is unset. store_in_pgvector passes this
# object to the vector store.
embeddings = OpenAIEmbeddings(api_key=os.getenv("OPENAI_API_KEY"))

In [None]:

def store_in_pgvector(docs):
    """Add documents to the "malware.yara_rules2" PGVector collection.

    A new PGVector store is created on every call from the module-level
    PGVECTOR_CONNECTION_STRING and ``embeddings`` objects. ``add_documents``
    is called without ``await``, i.e. as a regular synchronous call.

    Args:
        docs: documents to add; passed to ``add_documents`` unchanged.

    Returns:
        None
    """
    store_options = {
        "connection": PGVECTOR_CONNECTION_STRING,
        "embeddings": embeddings,
        "collection_name": "malware.yara_rules2",
        "use_jsonb": True,
    }
    PGVector(**store_options).add_documents(docs)

    




In [None]:
# Change this to the path of your YARA file. The path is built with
# os.path.join so it works on every platform and contains no backslash
# escape sequences ("\Y" and "\s" are invalid escapes in a normal string).
yara_file_path = os.path.join("..", "YaraForge", "sample-yara-rules-core.yar")
extracted_rules = extract_yara_rules(yara_file_path)


In [50]:
start=6200
batch = 21

interpreted_results = await interpret_all_rules(extracted_rules, start_range=start, end_range=start+batch)
interpreted_documents =[]
print(f"starting interpretation {len(interpreted_results)} rules from {start} to {start+batch}")
for result in interpreted_results:
    
    metadata={
        "rule_name": result["rule_name"],
        "tags": result["tags"],
        "description": result["metadata"].get("description"),
        'author': result["metadata"].get("author"),
        'id': result["metadata"].get("id"),
        'os': result["metadata"].get("os"),
    }
    content = process_interpretation(result["interpreted_text"])    
    interpreted_documents.append(Document(page_content=content, metadata=metadata))
    
store_in_pgvector(interpreted_documents)
print(f"Stored {len(interpreted_documents)} documents in PGVector from {start} to {start+batch}")

starting interpretation 20 rules from 6200 to 6221
Stored 20 documents in PGVector from 6200 to 6221


In [49]:
# Total number of rules extracted from the YARA file; the batch cell slices
# extracted_rules with start/batch, so this is the upper bound for that window.
len(extracted_rules)

6220

In [None]:
# Generate connection string
# Use malware schema in PostgreSQL connection string
