## 1. IMPORT LIBRARIES

In [1]:
import sys
import os
import pandas

## 2. CLEAN GADGET

### Processes code lines by:

- Replacing user-defined function names with symbolic names like `FUN1, FUN2`, etc.
- Replacing user-defined variable names with symbolic names like `VAR1, VAR2`, etc.
- Leaving keywords, built-in functions, and standard arguments (`argc, argv`) untouched.
- Skipping over string and character literals, comments, and non-ASCII characters to focus only on code identifiers.

In [2]:
import re

# Immutable set of keywords up to C11 and C++17 standards.
# These are reserved words in C/C++ that should not be renamed or replaced.
keywords = frozenset({
    '__asm', '__builtin', '__cdecl', '__declspec', '__except', '__export', '__far16', '__far32',
    '__fastcall', '__finally', '__import', '__inline', '__int16', '__int32', '__int64', '__int8',
    '__leave', '__optlink', '__packed', '__pascal', '__stdcall', '__system', '__thread', '__try',
    '__unaligned', '_asm', '_Builtin', '_Cdecl', '_declspec', '_except', '_Export', '_Far16',
    '_Far32', '_Fastcall', '_finally', '_Import', '_inline', '_int16', '_int32', '_int64',
    '_int8', '_leave', '_Optlink', '_Packed', '_Pascal', '_stdcall', '_System', '_try', 'alignas',
    'alignof', 'and', 'and_eq', 'asm', 'auto', 'bitand', 'bitor', 'bool', 'break', 'case',
    'catch', 'char', 'char16_t', 'char32_t', 'class', 'compl', 'const', 'const_cast', 'constexpr',
    'continue', 'decltype', 'default', 'delete', 'do', 'double', 'dynamic_cast', 'else', 'enum',
    'explicit', 'export', 'extern', 'false', 'final', 'float', 'for', 'friend', 'goto', 'if',
    'inline', 'int', 'long', 'mutable', 'namespace', 'new', 'noexcept', 'not', 'not_eq', 'nullptr',
    'operator', 'or', 'or_eq', 'override', 'private', 'protected', 'public', 'register',
    'reinterpret_cast', 'return', 'short', 'signed', 'sizeof', 'static', 'static_assert',
    'static_cast', 'struct', 'switch', 'template', 'this', 'thread_local', 'throw', 'true', 'try',
    'typedef', 'typeid', 'typename', 'union', 'unsigned', 'using', 'virtual', 'void', 'volatile',
    'wchar_t', 'while', 'xor', 'xor_eq', 'NULL'
})

# Known, non-user-defined function names that shouldn't be replaced.
main_set = frozenset({'main'})

# Common arguments in the 'main' function that should not be renamed.
main_args = frozenset({'argc', 'argv'})

# Function to process and anonymize a C/C++ code snippet.
# Input: gadget (list of strings), where each string is a line of code.
# Output: cleaned_gadget (list of strings) with function/variable names replaced by symbolic names.
def clean_gadget(gadget):
    # Maps user-defined function names to anonymized symbols (e.g., FUN1, FUN2).
    fun_symbols = {}

    # Maps user-defined variable names to anonymized symbols (e.g., VAR1, VAR2).
    var_symbols = {}

    # Counters to generate unique symbolic names.
    fun_count = 1
    var_count = 1

    # Regex to detect if a line is ending a multi-line comment.
    rx_comment = re.compile(r'\*/\s*$')

    # Regex to find candidate function names (words followed by an opening parenthesis).
    rx_fun = re.compile(r'\b([_A-Za-z]\w*)\b(?=\s*\()')

    # Regex to find candidate variable names.
    # Matches identifiers not immediately followed by a '('.
    rx_var = re.compile(r'\b([_A-Za-z]\w*)\b(?:(?=\s*\w+\()|(?!\s*\w+))(?!\s*\()')

    # List to store the cleaned code lines.
    cleaned_gadget = []

    # Process each line of the input gadget.
    for line in gadget:
        # Skip lines that end multi-line comments.
        if rx_comment.search(line) is None:

            # Step 1: Clean the line of literals and non-ASCII characters.

            # Remove string literals (content inside double quotes) to avoid replacing names inside them.
            nostrlit_line = re.sub(r'".*?"', '""', line)

            # Remove character literals (content inside single quotes).
            nocharlit_line = re.sub(r"'.*?'", "''", nostrlit_line)

            # Remove non-ASCII characters to ensure processing of clean ASCII text.
            ascii_line = re.sub(r'[^\x00-\x7f]', r'', nocharlit_line)

            # Step 2: Extract potential function and variable names.

            # Find all function-like identifiers (names followed by '(').
            user_fun = rx_fun.findall(ascii_line)

            # Find all variable-like identifiers (names not followed by '(').
            user_var = rx_var.findall(ascii_line)

            # Step 3: Replace user-defined function names with symbolic names.
            for fun_name in user_fun:
                # Skip if it's 'main' or a reserved keyword.
                if fun_name not in main_set and fun_name not in keywords:

                    # If the function isn't already mapped, assign it a new symbol.
                    if fun_name not in fun_symbols:
                        fun_symbols[fun_name] = 'FUN' + str(fun_count)
                        fun_count += 1

                    # Replace function calls with the symbolic name (positive lookahead ensures it's only a function call).
                    ascii_line = re.sub(
                        r'\b(' + re.escape(fun_name) + r')\b(?=\s*\()',
                        fun_symbols[fun_name],
                        ascii_line
                    )

            # Step 4: Replace user-defined variable names with symbolic names.
            for var_name in user_var:
                # Skip if it's a reserved keyword or a common 'main' argument.
                if var_name not in keywords and var_name not in main_args:

                    # If the variable isn't already mapped, assign it a new symbol.
                    if var_name not in var_symbols:
                        var_symbols[var_name] = 'VAR' + str(var_count)
                        var_count += 1

                    # Replace variables with the symbolic name.
                    # Uses lookaheads to ensure it's not a function name.
                    ascii_line = re.sub(
                        r'\b(' + re.escape(var_name) + r')\b(?:(?=\s*\w+\()|(?!\s*\w+))(?!\s*\()',
                        var_symbols[var_name],
                        ascii_line
                    )

            # Append the processed line to the result list.
            cleaned_gadget.append(ascii_line)

    # Return the fully cleaned and anonymized code.
    return cleaned_gadget


### FUNCTION TEST

In [3]:
test_gadget = ['int counter = 0', 'counter = counter + 1;']

test_gadget2 = ['int sum(int a, int b) {return a + b;}', 'int result = sum(5, 10);']

test_gadget3 = ['function(File file, Buffer buff)', 'this is a comment test */']

print(clean_gadget(test_gadget))
print(clean_gadget(test_gadget2))
print(clean_gadget(test_gadget3))

['int VAR1 = 0', 'VAR1 = VAR1 + 1;']
['int FUN1(int VAR1, int VAR2) {return VAR1 + VAR2;}', 'int VAR3 = FUN1(5, 10);']
['FUN1(File VAR1, Buffer VAR2)']


## 3. PARSE FILE

In [4]:
def parse_file(filename):
    """
    Opens and reads the specified gadget file line by line.
    Groups lines into individual gadgets, ignoring the first "index" line of each gadget.
    Cleans each gadget using `clean_gadget()`, which anonymizes variable and function names.
    Yields a tuple for each gadget: (cleaned_gadget_lines, gadget_label)
    """
    with open(filename, "r", encoding="utf8") as file:
        gadget = []       # Stores code lines for the current gadget
        gadget_val = 0    # Stores the vulnerability label (0 or 1) for the current gadget
        
        for line in file:
            stripped = line.strip()   # Remove leading/trailing whitespace
            
            if not stripped:
                # Skip empty lines
                continue

            # Check for end-of-gadget delimiter (a line of dashes)
            if "-" * 33 in line and gadget: 
                # Yield the current gadget and its label after cleaning it
                yield clean_gadget(gadget), gadget_val
                # Reset for the next gadget
                gadget = []

            # Check if the line starts with a digit (could be the label or code)
            elif stripped.split()[0].isdigit():
                if gadget:
                    # If it's just a number, treat it as the vulnerability label (e.g., "1" or "0")
                    if stripped.isdigit():
                        gadget_val = int(stripped)
                    else:
                        # Otherwise, it's a code line that happens to start with a number
                        gadget.append(stripped)
            else:
                # Regular code line, add it to the current gadget
                gadget.append(stripped)

In [5]:
filename = "cwe119_cgd.txt"
parse_file(filename)

<generator object parse_file at 0x0000024AD7779140>

In [6]:
base = os.path.splitext(os.path.basename(filename))[0]
vector_filename = base + "_gadget_vectors.pkl" # Example: 'cwe399_cgd_gadget_vectors.pkl'
vector_length = 50

## 4. VECTORIZE GADGET

### Define a GadgetVectorizer class, which:

- Tokenizes C/C++ code snippets (gadgets).
- Trains a Word2Vec model on these tokenized gadgets.
- Generates vector representations of gadgets using the trained Word2Vec embeddings.

In [7]:
import re
import sys

import warnings
warnings.filterwarnings("ignore")

from gensim.models import Word2Vec
import numpy

# =======================
# Operator Sets for Tokenization
# =======================

# Operators with 3 characters 
operators3 = {'<<=', '>>='}
# Operators with 2 characters
operators2 = {
    '->', '++', '--', 
    '!~', '<<', '>>', '<=', '>=', 
    '==', '!=', '&&', '||', '+=', 
    '-=', '*=', '/=', '%=', '&=', '^=', '|='
    }
# Operators with 1 character
operators1 = { 
    '(', ')', '[', ']', '.', 
    '+', '-', '*', '&', '/', 
    '%', '<', '>', '^', '|', 
    '=', ',', '?', ':' , ';',
    '{', '}'
    }

"""
A class for tokenizing code gadgets, training a Word2Vec model, and generating
fixed-size vector representations of the gadgets.

Primary Functions:
- Tokenize individual code lines and gadgets.
- Buffer gadgets and train Word2Vec embeddings.
- Convert tokenized gadgets into 2D vector matrices.

Each gadget is treated as a sequence of tokens, and the final vector
representation is a matrix of size (50 x vector_length).
"""
class GadgetVectorizer:

    def __init__(self, vector_length):
        """
        Initialize the GadgetVectorizer.
        """
        self.gadgets = []             # List to store tokenized gadgets for Word2Vec training.
        self.vector_length = vector_length  # Dimension of each token vector.
        self.forward_slices = 0       # Count of gadgets vectorized in forward direction.
        self.backward_slices = 0      # Count of gadgets vectorized in backward direction.

    @staticmethod
    def tokenize(line):
        """
        Tokenize a single line of C/C++ code.
        Splits the line into tokens including identifiers, keywords, and operators.
        Preserves the original order of tokens.
        """
        tmp, w = [], []  # tmp: list of finalized tokens, w: characters building current word
        i = 0

        while i < len(line):
            if line[i] == ' ':
                # End of word; finalize and add a space token
                tmp.append(''.join(w))
                tmp.append(line[i])
                w = []
                i += 1
            # Check for three-character operators (e.g., <<=)
            elif line[i:i+3] in operators3:
                tmp.append(''.join(w))
                tmp.append(line[i:i+3])
                w = []
                i += 3
            # Check for two-character operators (e.g., ++, ==)
            elif line[i:i+2] in operators2:
                tmp.append(''.join(w))
                tmp.append(line[i:i+2])
                w = []
                i += 2
            # Check for one-character operators (e.g., +, -, ;)
            elif line[i] in operators1:
                tmp.append(''.join(w))
                tmp.append(line[i])
                w = []
                i += 1
            else:
                # Part of a word/identifier; collect characters
                w.append(line[i])
                i += 1

        # Filter out empty strings and space tokens
        res = list(filter(lambda c: c != '', tmp))
        return list(filter(lambda c: c != ' ', res))

    @staticmethod
    def tokenize_gadget(gadget):
        """
        Tokenize an entire gadget (list of code lines).

        For each line:
            - Tokenize the line.
            - Concatenate all tokens into a single list.
            - Check if any function calls (tokens starting with 'FUN') exist.
        """
        tokenized = []
        function_regex = re.compile(r'FUN(\d)+')  # Matches tokens like FUN1, FUN2, etc.
        backwards_slice = False

        for line in gadget:
            tokens = GadgetVectorizer.tokenize(line)
            tokenized += tokens

            # If a function token exists in this line, set backwards_slice to True
            if any(function_regex.match(token) for token in tokens):
                backwards_slice = True
            else:
                backwards_slice = False

        return tokenized, backwards_slice

    def add_gadget(self, gadget):
        """
        Add a tokenized gadget to the training buffer.

        Updates forward or backward slice counters depending on the presence of function tokens.
        """
        tokenized_gadget, backwards_slice = GadgetVectorizer.tokenize_gadget(gadget)
        self.gadgets.append(tokenized_gadget)
        if backwards_slice:
            self.backward_slices += 1
        else:
            self.forward_slices += 1

    def vectorize(self, gadget):
        """
        Generate a 2D vector representation of a gadget.

        - Tokenizes the gadget.
        - Creates a matrix of size (50 x vector_length).
        - Fills it with token embeddings from the Word2Vec model.
        - Uses forward or backward slicing to populate vectors.
        """
        tokenized_gadget, backwards_slice = GadgetVectorizer.tokenize_gadget(gadget)
        vectors = numpy.zeros(shape=(50, self.vector_length))

        num_tokens = min(len(tokenized_gadget), 50)

        if backwards_slice:
            # Populate the matrix from the bottom up (reverse order)
            for i in range(num_tokens):
                token_index = len(tokenized_gadget) - 1 - i
                vectors[49 - i] = self.embeddings[tokenized_gadget[token_index]]
        else:
            # Populate the matrix from the top down (forward order)
            for i in range(num_tokens):
                vectors[i] = self.embeddings[tokenized_gadget[i]]

        return vectors

    def train_model(self):
        """
        Train the Word2Vec model on the buffered tokenized gadgets.

        - Uses skip-gram model (`sg=1`) for learning embeddings.
        - Sets `min_count=1` to ensure every token has an embedding.
        - After training, keeps only the word vectors (embeddings).
        - Frees memory by deleting the model and raw gadget data.
        """
        # Train the Word2Vec model on all gadgets
        model = Word2Vec(
            sentences=self.gadgets,
            min_count=1,
            vector_size=self.vector_length,
            sg=1  # Skip-gram model
        )

        # Save the learned word vectors
        self.embeddings = model.wv

        # Clean up to save memory
        del model
        del self.gadgets

In [8]:
"""
Processes a gadget file and returns a DataFrame of vectorized gadgets and their labels.

Workflow:
---------
1. Parse the gadget file using `parse_file()` to extract individual gadgets and their vulnerability labels.
2. Store each gadget (code + label) in a list.
3. Add each gadget to the `GadgetVectorizer` to build a training corpus.
4. After all gadgets are collected, train a Word2Vec model on the tokens.
5. Re-iterate over the gadgets and convert each into a fixed-size vector matrix.
6. Store each vector and its corresponding label into a list.
7. Convert the list into a Pandas DataFrame with two columns: "vector" and "val".
"""
def get_vectors_df(filename, vector_length=100):
    gadgets = []  # List to hold all gadgets with their labels
    count = 0
    vectorizer = GadgetVectorizer(vector_length)  # Initialize vectorizer with embedding dimension

    # First pass: parse and collect gadgets, add them to the vectorizer
    for gadget, val in parse_file(filename):
        count += 1
        print("Collecting gadgets...", count, end="\r")
        vectorizer.add_gadget(gadget)  # Tokenize and store gadget for training
        row = {"gadget": gadget, "val": val}  # Store raw gadget + label
        gadgets.append(row)

    # Print slicing mode stats
    print('Found {} forward slices and {} backward slices'
          .format(vectorizer.forward_slices, vectorizer.backward_slices))
    print()

    # Train Word2Vec model on all tokenized gadgets
    print("Training model...", end="\r")
    vectorizer.train_model()
    print()

    vectors = []  # Final list to store vectorized gadgets
    count = 0

    # Second pass: convert each gadget to a vector
    for gadget in gadgets:
        count += 1
        print("Processing gadgets...", count, end="\r")
        vector = vectorizer.vectorize(gadget["gadget"])  # Get (50 x vector_length) matrix
        row = {"vector": vector, "val": gadget["val"]}  # Store vector + label
        vectors.append(row)

    print()

    # Convert to DataFrame 
    df = pandas.DataFrame(vectors)
    return df

In [9]:
# Check if the preprocessed vector data already exists as a pickle file
if os.path.exists(vector_filename):
    # Load the DataFrame from the cached pickle file 
    df = pandas.read_pickle(vector_filename)
else:
    # If not cached, generate the vectors from the raw gadget file
    df = get_vectors_df(filename, vector_length)
    # Save the generated DataFrame as a pickle file for future reuse
    df.to_pickle(vector_filename)

## 5. BLSTM

In [11]:
from __future__ import print_function
import warnings
import os
import random
import numpy as np
import tensorflow as tf

from keras.utils import to_categorical
from keras.models import Sequential
from keras.layers import Dense, Dropout, LSTM, Bidirectional, LeakyReLU, ReLU
from keras.optimizers import Adamax

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.utils import compute_class_weight

warnings.filterwarnings("ignore")


def set_seed(seed=41, deterministic=True):
    os.environ['PYTHONHASHSEED'] = str(seed)
    os.environ['TF_CUDNN_DETERMINISTIC'] = '1'
    if deterministic:
        os.environ['TF_DETERMINISTIC_OPS'] = '1'
        tf.config.experimental.enable_op_determinism()
    tf.config.set_visible_devices([], 'GPU')
    print("[INFO] TensorFlow is running on CPU only.")
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)
    tf.config.threading.set_intra_op_parallelism_threads(1)
    tf.config.threading.set_inter_op_parallelism_threads(1)
    print(f"[INFO] Reproducibility seed set to {seed}")


class BLSTM:
    def __init__(self, data, name="blstm_final", seed=41):
        self.seed = seed
        self.name = name
        set_seed(seed)

        vectors = np.stack(data.iloc[:, 0].values)
        labels = data.iloc[:, 1].values

        # Train/Test split
        X_train_raw, X_test, y_train_raw, y_test = train_test_split(
            vectors, labels, test_size=0.2, stratify=labels, random_state=seed
        )

        # Balance training set only
        pos_idxs = np.where(y_train_raw == 1)[0]
        neg_idxs = np.where(y_train_raw == 0)[0]
        rng = np.random.default_rng(seed=seed)
        undersampled_neg_idxs = rng.choice(neg_idxs, len(pos_idxs), replace=False)
        balanced_idxs = np.concatenate([pos_idxs, undersampled_neg_idxs])

        self.X_train = X_train_raw[balanced_idxs]
        self.y_train = to_categorical(y_train_raw[balanced_idxs])
        self.X_test = X_test
        self.y_test = to_categorical(y_test)

        # Class weights from unbalanced training data
        classes = np.unique(y_train_raw)
        weights = compute_class_weight(class_weight='balanced', classes=classes, y=y_train_raw)
        self.class_weight = dict(zip(classes, weights))

        # Static hyperparameters
        self.best_params = {
            'lstm_units': 256,
            'dense_units_1': 256,
            'dense_units_2': 256,
            'dropout_rate': 0.3219216042267068,
            'learning_rate': 0.002628016920729174,
            'activation': 'LeakyReLU',
            'batch_size': 64,
            'epochs': 20
        }

        self.model = self.build_model_with_params(self.best_params)
        self.train_final_model()

    def build_model_with_params(self, params):
        model = Sequential()
        model.add(Bidirectional(LSTM(params['lstm_units']), input_shape=(self.X_train.shape[1], self.X_train.shape[2])))
        model.add(Dense(params['dense_units_1']))
        model.add(LeakyReLU() if params['activation'] == 'LeakyReLU' else ReLU())
        model.add(Dropout(params['dropout_rate']))
        model.add(Dense(params['dense_units_2']))
        model.add(LeakyReLU() if params['activation'] == 'LeakyReLU' else ReLU())
        model.add(Dropout(params['dropout_rate']))
        model.add(Dense(2, activation='softmax'))

        optimizer = Adamax(learning_rate=params['learning_rate'])
        model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
        return model

    def train_final_model(self):
        print(f"[INFO] Training on {len(self.X_train)} samples for {self.best_params['epochs']} epochs...")
        self.model.fit(
            self.X_train,
            self.y_train,
            batch_size=self.best_params['batch_size'],
            epochs=self.best_params['epochs'],
            class_weight=self.class_weight,
            verbose=1
        )
        self.model.save_weights(self.name + "_best_model.weights.h5")

    def test(self):
        self.model.load_weights(self.name + "_best_model.weights.h5")
        batch_size = self.best_params['batch_size']
        results = self.model.evaluate(self.X_test, self.y_test, batch_size=batch_size, verbose=1)
        print(f"Test Accuracy: {results[1]:.4f}")

        predictions = self.model.predict(self.X_test, batch_size=batch_size).round()
        tn, fp, fn, tp = confusion_matrix(
            np.argmax(self.y_test, axis=1),
            np.argmax(predictions, axis=1)
        ).ravel()

        print(f'False positive rate: {fp / (fp + tn):.4f}')
        print(f'False negative rate: {fn / (fn + tp):.4f}')
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        f1_score = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        print(f'True positive rate (Recall): {recall:.4f}')
        print(f'Precision: {precision:.4f}')
        print(f'F1 score: {f1_score:.4f}')

In [12]:
blstm = BLSTM(df, name=base)
blstm.test()

[INFO] TensorFlow is running on CPU only.
[INFO] Reproducibility seed set to 41
[INFO] Training on 16704 samples for 20 epochs...
Epoch 1/20
[1m261/261[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m44s[0m 156ms/step - accuracy: 0.5764 - loss: 0.6854
Epoch 2/20
[1m261/261[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m42s[0m 159ms/step - accuracy: 0.7046 - loss: 0.5527
Epoch 3/20
[1m261/261[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 156ms/step - accuracy: 0.7237 - loss: 0.5294
Epoch 4/20
[1m261/261[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 155ms/step - accuracy: 0.7402 - loss: 0.5064
Epoch 5/20
[1m261/261[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 155ms/step - accuracy: 0.7510 - loss: 0.4850
Epoch 6/20
[1m261/261[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 155ms/step - accuracy: 0.7681 - loss: 0.4550
Epoch 7/20
[1m261/261[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 157ms/step - accuracy: 0.7898 - loss: 0.4154
Epoch 8/2