Copyright (c) 2024 Gabor Seljan. All rights reserved.

Licensed under the MIT License.

# SeedClass

SeedClass is an experimental machine learning project that uses binary classification to predict whether fuzzing a given seed file is likely to uncover a vulnerability.

In [None]:
import os
import heapq
import config
import numpy as np
import pandas as pd
import seaborn as sns
import plotly.express as px
import matplotlib.pyplot as plt

from tensorflow.keras import Input
from tensorflow.keras.metrics import AUC, Recall, Precision
from tensorflow.keras.metrics import TruePositives, TrueNegatives
from tensorflow.keras.metrics import FalsePositives, FalseNegatives
from tensorflow.keras.regularizers import L1, L2
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping, TensorBoard

from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split

from shutil import copy
from utils import load_data
from utils import plot_prc

# Import only the plotting helpers that match the configured training mode:
# the cross-validation variants are called with a list of histories, the
# plain variants with a single history.
if config.VALIDATION:
    from utils import plot_cross_val_essential_metrics
    from utils import plot_cross_val_confusion_metrics
else:
    from utils import plot_essential_metrics
    from utils import plot_confusion_metrics

In [None]:
# Load both sample directories into one dictionary, passing the negative
# label for 'data/clean' and the positive label for 'data/crash'.
dd = {}
for folder, label in (
    ('data/clean', config.LABEL_NEGATIVE),
    ('data/crash', config.LABEL_POSITIVE),
):
    dd = load_data(dd, folder, label)

In [None]:
# One row per dictionary key; every column name gets an 'f' prefix.
df = pd.DataFrame.from_dict(dd, orient='index')
df = df.add_prefix('f')
df.info()
# Summary statistics with one row per column.
df.describe().T

In [None]:
# Count how often each value of column 'f0' occurs (the column used as the target below).
sns.countplot(data=df, x='f0')

In [None]:
# Pairwise correlations between all columns, then a sorted bar chart of
# each column's correlation with 'f0'.
corr = df.corr()
# Positional slice: skips the first entry ('f0' against itself) and the last one.
# NOTE(review): dropping the last column as well looks unintended - confirm.
f0_corr = corr['f0'].iloc[1:-1]
f0_corr.sort_values().plot.bar()

In [None]:
# Heatmap of the full correlation matrix.
sns.heatmap(data=corr)

In [None]:
# List the column pairs whose absolute correlation exceeds 0.5, leaving out
# entries whose absolute correlation is exactly 1.
strength = corr.abs()
mask = (strength > 0.5) & (strength != 1)
corr.where(mask).stack().sort_values()

In [None]:
# Column 'f0' is the target vector; all remaining columns form the feature matrix.
y = df['f0'].to_numpy()
X = df.drop(columns='f0').to_numpy()

In [None]:
# Shuffle and hold out a test split; the seed keeps the partition reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=config.TEST_SIZE,
    random_state=config.SEED,
    shuffle=True,
)

In [None]:
# Rescale every feature into [0, config.FEATURE_RANGE_MAX].
# The scaler is fitted on the training split only and then merely applied to
# the test split, so no statistics of the test data leak into training.
# The transformer already returns NumPy arrays, so no DataFrame round trip
# is needed.
scaler = MinMaxScaler(feature_range=(0, config.FEATURE_RANGE_MAX))
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

In [None]:
# Fit a full PCA on the training data to find how many components are needed
# to retain 95% of the variance. Only the fitted attributes are used, so the
# data is fitted but not transformed.
pca = PCA()
pca.fit(X_train)
cumsum = np.cumsum(pca.explained_variance_ratio_)
# argmax yields the first index whose cumulative ratio reaches 0.95;
# adding 1 turns that zero-based index into a component count.
d = np.argmax(cumsum >= 0.95) + 1
print(f'Min {d} dimensions required for 95% variance')

In [None]:
# Interactive 3-D scatter of the training data projected onto three principal
# components, with marker colors taken from the training labels.
N = 3
pca = PCA(n_components=N)
X_pca = pca.fit_transform(X_train)
X_pca_df = pd.DataFrame(X_pca, columns=['PC1', 'PC2', 'PC3'])

fig = px.scatter_3d(X_pca_df, x='PC1', y='PC2', z='PC3', size_max=18, opacity=0.7)
fig.update_traces(marker={'color': y_train})
fig.update_layout(margin={'l': 0, 'r': 0, 'b': 0, 't': 0})
fig.show()

# Share of the total variance captured by the three components, in percent.
explained_pct = sum(pca.explained_variance_ratio_) * 100
print(f'Variance: {explained_pct:.2f}%')

In [None]:
# Static 3-D scatter of the same three-component PCA projection.
N = 3
pca = PCA(n_components=N)
X_pca = pca.fit_transform(X_train)

fig = plt.figure(figsize=(8.4, 6.8))
ax = fig.add_subplot(1, 1, 1, projection='3d')
# Walks the components in groups of three; with N = 3 this runs exactly once.
for start in range(0, N - 2, 3):
    first, second, third = (X_pca[:, start + offset] for offset in range(3))
    # The second component goes on the x axis and the first on the y axis.
    ax.scatter(second, first, third, c=y_train)
plt.title(f'{N} Component PCA')
plt.show()

explained_pct = sum(pca.explained_variance_ratio_) * 100
print(f'Variance: {explained_pct:.2f}%')

In [None]:
# 2-D scatter of the training data projected onto two principal components.
N = 2
pca = PCA(n_components=N)
X_pca = pca.fit_transform(X_train)

plt.figure(figsize=(6.4, 4.8))
# Walks the components in pairs; with N = 2 this runs exactly once.
for start in range(0, N - 1, 2):
    horizontal = X_pca[:, start]
    vertical = X_pca[:, start + 1]
    plt.scatter(horizontal, vertical, c=y_train)
plt.title(f'{N} Component PCA')
plt.show()

explained_pct = sum(pca.explained_variance_ratio_) * 100
print(f'Variance: {explained_pct:.2f}%')

In [None]:
# 2-D t-SNE embedding of the training data, colored by training label.
# NOTE(review): TSNE is created without a random_state, so the layout may
# differ between runs - confirm whether that matters here.
N = 2
tsne = TSNE(n_components=N)
X_tsne = tsne.fit_transform(X_train)

x_coords, y_coords = X_tsne[:, 0], X_tsne[:, 1]
plt.scatter(x_coords, y_coords, c=y_train)
plt.title(f'{N} Component TSNE')
plt.show()

In [None]:
# Feed-forward binary classifier: two 256-unit ReLU layers, each followed by
# 50% dropout, and a single sigmoid output unit.
# The input width is read from the training matrix rather than hard-coded,
# so the model always matches the number of feature columns.
n_features = X_train.shape[1]

model = Sequential()
model.add(Input(shape=(n_features,)))
for _ in range(2):
    # A fresh regularizer instance per layer; disabled entirely when
    # config.REGULARIZATION is falsy.
    model.add(Dense(256, activation='relu', kernel_regularizer=L2(0.0001) if config.REGULARIZATION else None))
    model.add(Dropout(0.5))
model.add(Dense(1, activation='sigmoid'))

In [None]:
# Metrics evaluated at the configured decision threshold. Their order matters:
# the evaluation cell further down reads results by position.
thresholded_metrics = [
    metric_cls(thresholds=config.THRESHOLD, name=metric_name)
    for metric_cls, metric_name in (
        (Recall, 'recall'),
        (Precision, 'precision'),
        (TruePositives, 'tp'),
        (TrueNegatives, 'tn'),
        (FalsePositives, 'fp'),
        (FalseNegatives, 'fn'),
    )
]

model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=[
        'accuracy',
        *thresholded_metrics,
        AUC(name='auc'),
        AUC(name='prc', curve='PR'),
    ],
)

In [None]:
# Print the layer-by-layer summary of the model built above.
model.summary()

In [None]:
# Train the model: either k-fold cross-validation over the training split
# (one History per fold collected in `histories`) or a single fit validated
# against the held-out split (`history`).
def _fit(x_fit, y_fit, x_val, y_val):
    """Fit the shared `model` once and return the resulting History object."""
    return model.fit(
        x=x_fit,
        y=y_fit,
        batch_size=config.BATCH_SIZE,
        epochs=config.EPOCHS,
        verbose=config.VERBOSE,
        validation_data=(x_val, y_val),
        # New callback instances for every fit: stop as soon as val_loss stops
        # improving and restore the best weights seen.
        callbacks=[
            EarlyStopping(monitor='val_loss', mode='min', restore_best_weights=True),
            TensorBoard(log_dir=config.LOG_DIR),
        ],
    )


if config.VALIDATION:
    histories = []
    # Seeded like train_test_split so the fold assignment is reproducible.
    kfold = KFold(n_splits=config.N_SPLITS, shuffle=True, random_state=config.SEED)
    # NOTE(review): the same model instance is fitted on every fold, so weights
    # learned on earlier folds carry over into later ones - confirm intended.
    for train_idx, val_idx in kfold.split(X_train, y_train):
        history = _fit(X_train[train_idx], y_train[train_idx], X_train[val_idx], y_train[val_idx])
        histories.append(history)
else:
    # NOTE(review): the test split doubles as validation data here, so early
    # stopping is tuned on the data later used for evaluation - confirm intended.
    history = _fit(X_train, y_train, X_test, y_test)

In [None]:
# Metric names handed to the plotting helpers: the headline metrics and the
# four confusion-matrix counts.
confusions = ['tp', 'tn', 'fp', 'fn']
essentials = ['loss', 'prc', 'precision', 'recall']

if not config.VALIDATION:
    # Single training run: one history object.
    plot_essential_metrics(essentials, history)
    plot_confusion_metrics(confusions, history)
else:
    # Cross-validation: one history per fold.
    plot_cross_val_essential_metrics(essentials, histories)
    plot_cross_val_confusion_metrics(confusions, histories)

In [None]:
# The positions read below follow the order used in compile(): index 0 is the
# loss, 2 is recall, 3 is precision and 9 is the PR-curve AUC ('prc').
scores = model.evaluate(X_test, y_test, verbose=config.VERBOSE)
loss_pct, recall_pct, precision_pct, prc_pct = (scores[pos] * 100 for pos in (0, 2, 3, 9))
print(f'Loss: {loss_pct:.2f}% - PRC: {prc_pct:.2f}% - Precision: {precision_pct:.2f}% - Recall: {recall_pct:.2f}%')

In [None]:
# Turn the model outputs for the test split into 0/1 predictions using the
# configured threshold, then plot the resulting confusion matrix.
test_scores = model.predict(X_test)
y_pred = (test_scores > config.THRESHOLD).astype('int32')
cm = confusion_matrix(y_test, y_pred)
ConfusionMatrixDisplay(cm).plot()

In [None]:
# Model outputs for both splits (train first, then test), followed by one
# precision-recall curve per split.
y_train_pred, y_test_pred = (
    model.predict(split, verbose=config.VERBOSE) for split in (X_train, X_test)
)
plot_prc('Train Baseline', y_train, y_train_pred, color='C0')
plot_prc('Test Baseline', y_test, y_test_pred, linestyle='--', color='C1')

In [None]:
# Start from an empty dictionary and load the files under 'data/check'
# (no label argument is passed for these).
dd = load_data({}, 'data/check')

In [None]:
# Same frame layout as for the training data: one row per key, 'f'-prefixed columns.
df = pd.DataFrame.from_dict(dd, orient='index')
df = df.add_prefix('f')
df.shape

In [None]:
# Prepare the candidate files like the training features: drop column 'f0'
# and apply the scaler that was fitted on the training split (transform only,
# no re-fitting). The result stays a NumPy array, matching X_train / X_test.
X_pred = df.drop('f0', axis=1).values
X_pred = scaler.transform(X_pred)

In [None]:
# Score every candidate once and apply the configured decision threshold.
probs = model.predict(X_pred, verbose=config.VERBOSE)
preds = (probs > config.THRESHOLD).astype('int32')
indices = []

# Automatically adjust threshold to always find best candidates
if not preds.any():
    # Number of candidates to keep; matches the cap of 20 applied when the
    # selected files are collected.
    max_candidates = 20

    flats = probs.ravel()
    # np.unique sorts the distinct scores ascending and reports the position
    # of the first occurrence of each; reverse both to get best-first order.
    uniques, first_seen = np.unique(flats, return_index=True)
    # One extra score is kept on purpose: it becomes the strict lower bound,
    # so exactly the `max_candidates` highest distinct scores lie above it.
    best = uniques[::-1][:max_candidates + 1]
    # Plain Python ints in a list, so later truthiness and membership checks
    # behave like they do for the empty default above.
    indices = first_seen[::-1][:max_candidates + 1].tolist()

    # With no candidates at all there is no score to derive a threshold from.
    if best.size:
        preds = (probs > best.min()).astype('int32')

In [None]:
# Collect the index labels of up to 20 rows flagged as positive. When the
# fallback produced explicit row positions in `indices`, only those rows count.
lst = []

for position, flag in np.ndenumerate(preds):
    row = position[0]
    accepted = flag == 1 and (not indices or row in indices)
    if accepted:
        print(f'{row}')
        lst.append(df.index[row])
    if len(lst) == 20:
        break

print(f'Selected {len(lst)} files out of {len(X_pred)} total')

In [None]:
# Print the names of the selected files, one per line.
try:
    for filename in lst:
        print(f'{filename}')
except Exception as exc:
    # Best-effort reporting: catch ordinary errors only (a bare except would
    # also swallow KeyboardInterrupt / SystemExit) and show what went wrong
    # instead of hiding the cause.
    print('Something, somewhere went terribly wrong!')
    print(f'{type(exc).__name__}: {exc}')