In [39]:
import pandas as pd
import numpy as np
import os
from io import DEFAULT_BUFFER_SIZE
from scipy.stats import entropy
from tqdm.auto import tqdm
import pefile
import PySimpleGUI as sg


from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score

from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.ensemble import RandomForestClassifier

from sklearn.neural_network import MLPClassifier

from sklearn.svm import SVC

In [40]:
# Global configuration: everything a reader might want to tune lives here.

# Seed handed to every model as `random_state` so runs are repeatable
seed = 42
# Path to original train.csv
csv_input_train_path = 'train.csv'
# Path to original test.csv
csv_input_test_path = 'test.csv'
# Path to training data
csv_train_data_path = 'train-data.csv'
# Path to testing data
csv_test_data_path = 'test-data.csv'
# Path to training labels
csv_train_labels_path = 'train-labels.csv'
# Path to testing labels
csv_test_labels_path = 'test-labels.csv'
# Path to training samples directory.
# Built with os.path.join so the separator matches the host OS; a hard-coded
# backslash only resolves on Windows (where this still yields '.\\train').
dir_train = os.path.join('.', 'train')
# Path to testing samples directory
dir_test = os.path.join('.', 'test')

In [41]:
def _load_sample_index(csv_path, samples_dir):
    """Read one index CSV and normalise it for the rest of the notebook.

    Steps, in order:
      * rename `id` -> `path` and `list` -> `malicious`
      * turn `malicious` into a bool (True where the value is 'Blacklist')
      * turn each file name into a path under `samples_dir`
      * sort by `path` and renumber the rows from 0
    """
    frame = pd.read_csv(csv_path).rename(columns={'id': 'path', 'list': 'malicious'})
    frame['malicious'] = frame['malicious'].eq('Blacklist')
    frame['path'] = frame['path'].apply(lambda name: os.path.join(samples_dir, str(name)))
    return frame.sort_values(by='path').reset_index(drop=True)


# Build the train and test indexes with the same recipe
df_train = _load_sample_index(csv_input_train_path, dir_train)
df_test = _load_sample_index(csv_input_test_path, dir_test)

# Preview training data
df_train.head()

Unnamed: 0,path,md5,sha1,sha256,total,positives,malicious,filetype,submitted,user_id,length,entropy
0,.\train\1,0000e1cccca9ca2ae5d1dfb124686920,7704b8f6a9538f9bc2366696cf85eeec1f15e224,e1bde0b326a91688e8718086d06d1377394f2cb7feddec...,64,42,True,exe,40:22.2,1,67328,7.26516
1,.\train\10,000a1a3e8204261b7408ff51ffa5b440,fb4a5e7801cf6d0f293b6051569e325c46271309,9d5fc038b67068fa5fba1bbb34b938db83b26638b4e22f...,52,23,True,exe,40:24.5,1,773024,7.935332
2,.\train\100,013539d21eb2a5db8d0554affb3541b8,7d03a5db7166724ce98840ecb925bdab488e41e6,321d5a6d4f78f64b579f4182d18857e342da95c80dd30c...,51,46,True,exe,23:36.9,1,237766,5.571339
3,.\train\1000,00a1a53ae5ed39cbc04df7bd5bb3f9d1,249042d8478015cefdcd114a2009db14c2c20d48,3a3d86e16b03f98d10e3bd28d69f8486b868ce107b3332...,54,51,True,exe,10:37.1,1,57856,7.860059
4,.\train\1001,00a1abf0d976822bd049a72194af7440,cc96a187b2eca3f1b05fe0ed4c2ed7d005c3cb5a,e61e81190ca872eead63880a86a76f5c12563d51bd62d3...,51,40,True,exe,10:37.2,1,288256,7.016225


In [42]:
# Labels: keep only the sample path and its target flag
_label_columns = ['path', 'malicious']
df_train_labels = df_train.loc[:, _label_columns].copy()
df_test_labels = df_test.loc[:, _label_columns].copy()

# Persist the labels right away so later stages can reload them from disk
df_train_labels.to_csv(csv_train_labels_path, index=False)
df_test_labels.to_csv(csv_test_labels_path, index=False)

# Input data: start from the `path` column alone; feature columns are added later
df_train_data = df_train.loc[:, ['path']].copy()
df_test_data = df_test.loc[:, ['path']].copy()

# Preview training data
df_train_data.head()

Unnamed: 0,path
0,.\train\1
1,.\train\10
2,.\train\100
3,.\train\1000
4,.\train\1001


In [43]:
# Use magic numbers to find each sample's file type.
# `magic` is imported here, at its only point of use in this notebook.
import magic


def _mime_type(path):
    """Return what `magic.from_file` reports for `path` when asked for a MIME type."""
    return magic.from_file(path, mime=True)


# NOTE: the column is called `extension`, but it stores the value returned with mime=True
df_train['extension'] = df_train['path'].apply(_mime_type)
df_test['extension'] = df_test['path'].apply(_mime_type)

FileNotFoundError: [Errno 2] No such file or directory: '.\\train\\1'

In [None]:
# Sanity-check the sample index: row counts, missing paths and unique paths.
# The column subscripts use double quotes because reusing the f-string's own
# quote character inside a replacement field is a SyntaxError before Python 3.12.
print(f'# train rows:               {len(df_train_data)}')
print(f'# test rows:                {len(df_test_data)}')
print(f'# NaN train path values:    {df_train_data["path"].isna().sum()}')
print(f'# NaN test path values:     {df_test_data["path"].isna().sum()}')
print(f'# unique train path values: {df_train_data["path"].nunique()}')
print(f'# unique test path values:  {df_test_data["path"].nunique()}')

In [None]:
def get_entropy(path):
    """Return the Shannon entropy, in bits per byte, of the file at `path`.

    The file is streamed in `DEFAULT_BUFFER_SIZE` chunks, so memory use does
    not grow with file size. The result lies between 0.0 (every byte
    identical, or an empty file) and 8.0 (all 256 byte values equally likely).

    Parameters
    ----------
    path : str or path-like
        File to measure.

    Returns
    -------
    float
        Entropy of the byte-value distribution; 0.0 for an empty file.

    Raises
    ------
    OSError
        If the file cannot be opened or read.
    """
    byte_counts = np.zeros(256, dtype=np.uint64)
    with open(path, 'rb') as file:
        while chunk := file.read(DEFAULT_BUFFER_SIZE):
            # bincount yields a signed integer array; cast so the in-place add stays uint64
            byte_counts += np.bincount(np.frombuffer(chunk, dtype=np.uint8), minlength=256).astype(np.uint64)
    # Normalise by the bytes actually read rather than a separate stat() call,
    # which can disagree with the stream (file modified meanwhile, or a
    # special file whose reported size is 0).
    total_bytes = int(byte_counts.sum())
    if total_bytes == 0:
        return 0.0
    return float(entropy(byte_counts / total_bytes, base=2))

In [None]:
# Store the Shannon entropies of the files
tqdm.pandas()
df_train_data['entropy'] = df_train_data['path'].progress_apply(get_entropy)
df_test_data['entropy'] = df_test_data['path'].progress_apply(get_entropy)

# Preview training data
df_train_data.head()

In [None]:
def parse_pe(path):
    """Parse the file at `path` with pefile and return its dump as a dict.

    The result of `dump_dict()` is trimmed before it is returned:
      * the 'LOAD_CONFIG' and 'TLS' entries are removed;
      * 'Parsing Warnings' is replaced by a bool: True if the dump contained
        that key, False otherwise;
      * every remaining entry whose value is a list is removed.

    Parameters
    ----------
    path : str
        Path of the file to parse.

    Returns
    -------
    dict
        The trimmed dump. If anything goes wrong the error is printed and
        ``{'Parsing Warnings': True}`` is returned, so one bad sample does not
        abort a whole batch run.
    """
    pe = None
    try:
        pe = pefile.PE(path)
        d = pe.dump_dict()
        d.pop('LOAD_CONFIG', None)
        d.pop('TLS', None)
        # Collapse the warnings entry to a flag *before* filtering lists,
        # so the flag itself survives the filter below.
        d['Parsing Warnings'] = 'Parsing Warnings' in d
        d = {key: value for key, value in d.items() if not isinstance(value, list)}
    except Exception as e:
        # Deliberately broad: this is a best-effort parse over many samples
        print(f'Error processing {path}: {e}')
        d = {'Parsing Warnings': True}
    finally:
        # Release the handle pefile holds on the sample; without this every
        # parsed file stays open until garbage collection.
        if pe is not None:
            pe.close()

    return d

In [None]:
# Make copies of the dataframes because the next operation takes a long time
_temp_df_train_data = df_train_data.copy()
_temp_df_test_data = df_test_data.copy()

# Get the portable executable file information
# This excludes LOAD_CONFIG and TLS data and columns whose values are lists
_temp_train = _temp_df_train_data['path'].progress_apply(parse_pe)
_temp_test = _temp_df_test_data['path'].progress_apply(parse_pe)

In [None]:
# Add the portable executable file information to the data
df_train_data = pd.concat([_temp_df_train_data, pd.json_normalize(_temp_train)], axis=1)
df_test_data = pd.concat([_temp_df_test_data, pd.json_normalize(_temp_test)], axis=1)

# Replace all NaN values with 0
df_train_data = df_train_data.fillna(0)
df_test_data = df_test_data.fillna(0)

# Exclude columns with 2 or fewer unique values
_cols = df_train_data.columns[df_train_data.nunique() <= 2].tolist()
df_train_data = df_train_data.drop(columns=_cols)
df_test_data = df_test_data.drop(columns=_cols)

# Exclude columns that end with 'Offset' (this removes a lot of useless data)
_cols = [_col for _col in df_train_data.columns if _col.endswith('Offset')]
df_train_data = df_train_data.drop(columns=_cols)
df_test_data = df_test_data.drop(columns=_cols)

In [None]:
# Preview the first rows of the training data as it stands at this point
df_train_data.head()

In [None]:
# Let pandas print up to 500 rows so the full dtype listing below is not truncated
pd.options.display.max_rows = 500

# Some of the columns' data types are numerical, but many are strings (object)
# One option is to one-hot encode the columns whose data types are not numeric
# https://en.wikipedia.org/wiki/One-hot#Machine_learning_and_statistics
df_train_data.dtypes

In [None]:
# Or... we can just drop non-numeric data for simplicity.
# Collect the *labels* of the non-numeric columns (all but `path`) as a plain
# list; handing a whole DataFrame to drop() only works through implicit
# iteration over its column names and copies data that is about to be discarded.
_cols = df_train_data.select_dtypes(exclude=np.number).columns.drop('path').tolist()
df_train_data = df_train_data.drop(columns=_cols)
df_test_data = df_test_data.drop(columns=_cols)

# All of the columns should be numeric except for `path`
df_train_data.dtypes

In [None]:
# Checkpoint progress
df_train_data.to_csv(csv_train_data_path, index=False)
df_test_data.to_csv(csv_test_data_path, index=False)

*** IMPORTANT ***

The cells above generate the CSV files needed for training and testing. After running them once, you do not need to run them again: start from the next cell, which reloads the saved files.

In [None]:
# Load save state
df_train_data = pd.read_csv(csv_train_data_path)
df_test_data = pd.read_csv(csv_test_data_path)

df_train_labels = pd.read_csv(csv_train_labels_path)
df_test_labels = pd.read_csv(csv_test_labels_path)

In [None]:
# Z-score normalize all the data (except for `path`, which isn't numeric)
scaler = StandardScaler()
x_train = scaler.fit_transform(df_train_data.drop(columns=['path']))
x_test = scaler.transform(df_test_data.drop(columns=['path']))

# Preview the input data
x_train[:5]

In [None]:
# Get the training labels
y_train = np.array(df_train_labels['malicious'])
y_test = np.array(df_test_labels['malicious'])

# Verify that the training and testing datasets are balanced
print(df_train_labels['malicious'].value_counts(), end='\n\n')
print(df_test_labels['malicious'].value_counts())

In [None]:
# Candidate classifiers keyed by the name printed in the report.
# Each one receives the shared seed as its random_state.
models = {
    'Adaptive Boosting Classifier': AdaBoostClassifier(random_state=seed),
    'GBM': GradientBoostingClassifier(random_state=seed),
    'Histogram-based GBM': HistGradientBoostingClassifier(random_state=seed),
    'Random Forest Classifier': RandomForestClassifier(random_state=seed),
    'MLP': MLPClassifier(max_iter=1000, random_state=seed),
    'C-SVC': SVC(random_state=seed),
}

# Fit each model on the training set, predict the test set, report accuracy
for name in models:
    model = models[name].fit(x_train, y_train)
    y_pred = model.predict(x_test)
    print(f'{name}: {accuracy_score(y_test, y_pred)}')

In [None]:
def scan_file(filename):
    """Placeholder scanner: currently returns `filename` unchanged.

    TODO: load the file, give it to the trained model, and return whether it
    is malware or not.
    """
    return filename

In [None]:
# Window layout: a prompt, a path input with a file browser, a result area, an Exit button
layout = [[sg.Text('What file do you want to scan?')],
          [sg.Input(key='-IN-', enable_events=True), sg.FileBrowse(target='-IN-', initial_folder='Downloads')],
          [sg.Text(key='-OUT-', size=(50,10), text='')],
          [sg.Button('Exit')]]
window = sg.Window('File Scan', layout)

# Event loop. The try/finally guarantees the window is closed even if
# scan_file (or anything else in the loop) raises; otherwise it would be left open.
try:
    while True:
        event, values = window.read()
        # Stop when the window is closed or Exit is pressed
        if event in (sg.WIN_CLOSED, 'Exit'):
            break
        # An event from the input field: scan the path it now holds and show the result
        if event == '-IN-':
            filename = values[event]
            output = scan_file(filename)
            window['-OUT-'].update(output)
finally:
    window.close()