In [1]:
import struct
import hashlib

from morpheus.utils import token_quality

def hash64(value: int) -> int:
    """Hash an integer to 64 bits.

    The value is encoded as 4 little-endian bytes, hashed with MD5, and the
    first 8 digest bytes are read back as a little-endian unsigned integer.
    ``int.to_bytes`` raises ``OverflowError`` if the value is negative or does
    not fit in 4 bytes.
    """
    encoded = value.to_bytes(4, 'little')
    digest = hashlib.md5(encoded).digest()
    # '<Q' reads exactly the first 8 bytes as a little-endian unsigned 64-bit int.
    return struct.unpack_from('<Q', digest)[0]

class RuleFingerprint:
    """Compact fingerprint of the distinct 4-byte values observed for a rule.

    State is a 64-bit bloom filter (two bits per value) plus the XOR of the
    64-bit hashes of every value the bloom filter judged to be new. The
    serialized form (see ``to_hex``) is 16 bytes: ``xor_hash`` followed by
    ``bloom``, both little-endian. The counters are NOT part of the
    serialized form.
    """
    # optimize memory usage by avoiding one __dict__ for each instance
    __slots__ = ('bloom', 'xor_hash', 'num_unique', 'num_observations')
    MAX_DISTINCT_OBSERVATIONS = 10

    # Value stored in num_unique once more than MAX_DISTINCT_OBSERVATIONS
    # distinct values have been seen.
    _OVERFLOW_MARKER = 0xFFFF
    # Sentinel serializations for "nothing observed" and "too many distinct values".
    _EMPTY_HEX = '00000000000000000000000000000000'
    _OVERFLOW_HEX = 'FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF'

    def __init__(self):
        self.bloom = 0      # 64-bit bloom filter
        self.xor_hash = 0   # XOR of unique value hashes
        self.num_unique = 0      # Unique count (or _OVERFLOW_MARKER)
        self.num_observations = 0  # Observations that passed the quality filter

    def update(self, value: bytes, token_quality_threshold: float = 0.5):
        """Add one 4-byte observation.

        Values whose ``token_quality`` score is below
        ``token_quality_threshold`` are ignored entirely. A value counts as
        new only if at least one of its two bloom bits is not yet set, so a
        genuinely new value whose bits are already occupied is not counted.

        Raises:
            AssertionError: if ``value`` is not exactly 4 bytes long.
        """
        assert len(value) == 4, "Value must be 4 bytes"

        if token_quality(value) < token_quality_threshold:
            return

        # Previously this counter was declared but never incremented.
        self.num_observations += 1

        val = struct.unpack('<I', value)[0]
        h = hash64(val)

        # Two bloom positions: one from the low 32 bits, one from the high 32 bits.
        bits = (1 << (h % 64)) | (1 << ((h >> 32) % 64))
        is_new = (self.bloom & bits) != bits

        if is_new:
            self.bloom |= bits
            self.xor_hash ^= h
            self.num_unique += 1

            # If we exceed the max distinct observations, set count to overflow marker
            if self.num_unique > self.MAX_DISTINCT_OBSERVATIONS:
                self.num_unique = self._OVERFLOW_MARKER

    def to_hex(self) -> str:
        """Return the 16-byte fingerprint as a 32-character hex string.

        Returns all zeros when nothing was recorded and all (uppercase) F's
        when more than ``MAX_DISTINCT_OBSERVATIONS`` distinct values were
        seen; otherwise ``xor_hash`` then ``bloom`` as little-endian uint64.
        """
        if self.num_unique == 0:
            return self._EMPTY_HEX
        if self.num_unique > self.MAX_DISTINCT_OBSERVATIONS:
            return self._OVERFLOW_HEX
        # Only the two 64-bit fields fit in 16 bytes; the count is not serialized.
        return struct.pack('<QQ', self.xor_hash, self.bloom).hex()

    def from_hex(self, hex_str: str):
        """Load ``xor_hash`` and ``bloom`` from a 32-character hex string.

        The serialized form has no count, so ``num_unique`` is rebuilt as a
        lower bound: every recorded value sets at most two bloom bits, hence
        at least ``ceil(popcount(bloom) / 2)`` values were recorded. This maps
        the all-zero string to 0 and the all-F string to the overflow marker,
        so ``to_hex()`` reproduces whatever ``to_hex()`` emitted.
        ``num_observations`` is left untouched.

        Raises:
            AssertionError: if ``hex_str`` is not 32 characters long.
            ValueError: if ``hex_str`` is not valid hexadecimal.
        """
        assert len(hex_str) == 32, "Hex string must be 32 characters"
        data = bytes.fromhex(hex_str)
        # 16 bytes hold exactly two uint64 fields ('<QQH' would need 18 bytes).
        self.xor_hash, self.bloom = struct.unpack('<QQ', data)

        set_bits = bin(self.bloom).count('1')
        self.num_unique = (set_bits + 1) // 2

        if self.num_unique > self.MAX_DISTINCT_OBSERVATIONS:
            self.num_unique = self._OVERFLOW_MARKER


In [None]:
import base64
import os
import logging
from glob import glob
from collections import defaultdict

from morpheus.grammar import Grammar

def seeds_to_prefixes(seeds):
    """Return the first four items of every seed that has at least four.

    Seeds shorter than four items are skipped; input order is preserved.
    """
    prefixes = []
    for seed in seeds:
        if len(seed) < 4:
            continue
        prefixes.append(seed[:4])
    return prefixes

# Maps a fingerprint hex string to the set of (grammar name, non-terminal,
# encoding) triples whose seed prefixes produced that fingerprint. The encoding
# is None for raw seeds and "base64" for base64-encoded seeds.
clusters = defaultdict(set)

# glob() returns paths in arbitrary order; sort so runs (and logs) are reproducible.
for grammar_path in sorted(glob("/shellphish/libs/nautilus/grammars/reference/*.py")):
    # File name without directory and without the final ".py" extension.
    grammar_name = os.path.splitext(os.path.basename(grammar_path))[0]
    # if "@CORPUS" in grammar_name:
    #     continue
    # These two grammars are skipped on purpose (reason not recorded here).
    if grammar_name in ("TNEF", "LZ4"):
        continue

    logging.info(f"Processing grammar: {grammar_name}")
    try:
        grammar = Grammar.from_file(grammar_path)

        all_nts = {rule.nt for rule in grammar.rules if rule.is_composable()} - {"ANYRULE"}
        # all_nts = {"START"}

        for nt in all_nts:
            fp = RuleFingerprint()
            fp_base64 = RuleFingerprint()
            for seed in grammar.seed_iterator(nt=nt, n=100):
                # Fingerprint the first 4 bytes, zero-padded when the seed is shorter.
                prefix = seed[:4]
                fp.update(prefix.ljust(4, b'\x00'))

                # Same for the base64 encoding of the whole seed.
                base64_prefix = base64.b64encode(seed)[:4]
                fp_base64.update(base64_prefix.ljust(4, b'\x00'))

                # Once both fingerprints have overflowed, more seeds cannot change them.
                if (fp.num_unique > RuleFingerprint.MAX_DISTINCT_OBSERVATIONS
                        and fp_base64.num_unique > RuleFingerprint.MAX_DISTINCT_OBSERVATIONS):
                    break

            clusters[fp.to_hex()].add((grammar_name, nt, None))
            clusters[fp_base64.to_hex()].add((grammar_name, nt, "base64"))
    except Exception:
        # Catch Exception rather than everything so Ctrl-C / kernel interrupt
        # still stops the loop; a failing grammar is logged and skipped.
        logging.exception(f"Failed to load grammar: {grammar_name}")
        continue

In [3]:
import json
import sys

# Convert every cluster from a set to a sorted list so the JSON written below is
# identical from run to run (the iteration order of a set of string tuples is
# not stable across interpreter runs). The third tuple element is None or
# "base64"; None is mapped to "" in the sort key because None and str cannot
# be compared with each other.
filtered_clusters = {
    fingerprint: sorted(members, key=lambda m: (m[0], m[1], m[2] or ""))
    for fingerprint, members in clusters.items()
}

# Drop the two sentinel fingerprints: "too many distinct values" (all F) and
# "nothing observed" (all 0).
for sentinel in ("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", "00000000000000000000000000000000"):
    filtered_clusters.pop(sentinel, None)
# filtered_clusters = {k: v for k, v in filtered_clusters.items() if len(v) > 1}
# filtered_clusters = {k: sorted(v) for k, v in filtered_clusters.items() if len({p[0] for p in v}) > 1}

# Print the size of filtered_clusters in kilobytes.
# NOTE: sys.getsizeof is shallow - it measures the dict object itself and does
# not include the keys and lists it refers to.
size_in_bytes = sys.getsizeof(filtered_clusters)
size_in_kilobytes = size_in_bytes / (1024)
print(f"Filtered clusters size: {size_in_kilobytes:.2f} KB")

# write to ../reference_fingerprints.json
with open("../reference_fingerprints.json", "w") as f:
    json.dump(filtered_clusters, f, indent=4, sort_keys=True)

Filtered clusters size: 72.09 KB


In [None]:
from morpheus.grammar import Grammar
# Load one reference grammar and print every composition it yields.
grammar = Grammar.from_file("/shellphish/libs/nautilus/grammars/reference/IBOOKS.py")

# Use a distinct loop variable: reusing `grammar` here would leave it bound to
# the last composition after the loop instead of the grammar loaded above.
for composition in grammar.iter_compositions():
    print(composition)

In [None]:
# Grammar source text passed to Grammar.from_string below. It declares START as
# {BASE64_DATA}, gives BASE64_DATA three alternatives (TXT / HTML / PNG) and binds
# each alternative to a literal bytes payload. The ctx.external / ctx.script
# lines are prefixed with '#' inside the text - presumably so the grammar loader
# ignores them; confirm against Grammar.from_string.
grammar_str = """
######################################################################
# Helper Functions
######################################################################

def artiphishell_base64_encode(data: bytes) -> bytes:
    import base64
    return base64.b64encode(data)

######################################################################
# Grammar Rules
######################################################################

ctx.rule('START', '{BASE64_DATA}')
ctx.rule('BASE64_DATA', '{BASE64_DATA_TXT}')
ctx.rule('BASE64_DATA', '{BASE64_DATA_HTML}')
ctx.rule('BASE64_DATA', '{BASE64_DATA_PNG}')
ctx.rule('BASE64_DATA_TXT', b'SGVsbG8gV29ybGQ=')
ctx.rule('BASE64_DATA_HTML', b'PGh0bWw+PGJvZHk+SGVsbG8hPC9ib2R5PjwvaHRtbD4=')
ctx.rule('BASE64_DATA_PNG', b'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mNk+A8AAQUBAScY42YAAAAASUVORK5CYII=')
# ctx.external('ARTIPHISHELL_NON_ENCODED_PNG', 'PNG', 'START')
# ctx.script('BASE64_DATA_PNG', ['ARTIPHISHELL_NON_ENCODED_PNG'], lambda data, encode=artiphishell_base64_encode: encode(data))
# ctx.external('ARTIPHISHELL_NON_ENCODED_PNG@CORPUS', 'PNG@CORPUS', 'START')
# ctx.script('BASE64_DATA_PNG', ['ARTIPHISHELL_NON_ENCODED_PNG@CORPUS'], lambda data, encode=artiphishell_base64_encode: encode(data))
"""

from morpheus.grammar import Grammar
# Rebinds the notebook-level `grammar`; later cells that use `grammar` see this one.
grammar = Grammar.from_string(grammar_str)
# Cell output: the seeds produced for START, materialized as a list
# (n=10 presumably caps how many seeds are generated - confirm).
list(grammar.seed_iterator(nt='START', n=10))

In [None]:
# For every replacement set yielded by the grammar, print one list of
# (internal non-terminal, external grammar, external non-terminal, encoding)
# tuples. The meaning of the argument 20 is defined by
# iter_composition_replacements and is not visible here.
for replacements in grammar.iter_composition_replacements(20):
    print([
        (rule.nt, ext_grammar, ext_nt, enc)
        for rule, ext_grammar, ext_nt, enc in replacements
    ])

In [None]:
import base64
import os
import logging
from glob import glob
from collections import defaultdict

from morpheus.grammar import Grammar

# Maps each composable non-terminal to the set of rule hex digests seen for it
# across all reference grammars.
REFERENCE_RULEHASHES = defaultdict(set)

# glob() returns paths in arbitrary order; sort so runs (and logs) are reproducible.
for grammar_path in sorted(glob("/shellphish/libs/nautilus/grammars/reference/*.py")):
    # File name without directory and without the final ".py" extension.
    grammar_name = os.path.splitext(os.path.basename(grammar_path))[0]

    logging.info(f"Processing grammar: {grammar_name}")
    try:
        grammar = Grammar._from_file(grammar_name, grammar_path)

        for rule in grammar.rules:
            if not rule.is_composable():
                continue

            REFERENCE_RULEHASHES[rule.nt].add(rule.hexdigest)
    except Exception:
        # Catch Exception rather than everything so Ctrl-C / kernel interrupt
        # still stops the loop; a failing grammar is logged and skipped.
        logging.exception(f"Failed to load grammar: {grammar_name}")
        continue

In [17]:
import json
import sys

# Replace each per-non-terminal collection of digests with a sorted list so it
# can be serialized as JSON.
REFERENCE_RULEHASHES = {
    nt: sorted(digests) for nt, digests in REFERENCE_RULEHASHES.items()
}

# Report the size of the mapping in kilobytes.
# NOTE: sys.getsizeof is shallow - it measures the dict object itself and does
# not include the keys and lists it refers to.
size_in_bytes = sys.getsizeof(REFERENCE_RULEHASHES)
size_in_kilobytes = size_in_bytes / 1024
print(f"Filtered clusters size: {size_in_kilobytes:.2f} KB")

# write to ../reference_rulehashes.json
with open("../reference_rulehashes.json", "w") as f:
    json.dump(REFERENCE_RULEHASHES, f, indent=4, sort_keys=True)

Filtered clusters size: 144.09 KB
