In [None]:
import sys, os
sys.path.append(os.path.abspath('..'))
from utils.clean_opcode_from_file import process_file
from utils.pre_process import clean_tokens

Use this cell if you want to load a dataset from an online source (such as Hugging Face).

In [None]:
from datasets import load_dataset

# 'YOUR_REMOTE_DATASET_NAME' is a placeholder: replace it with the identifier of
# the dataset to load before running this cell.
# A later cell iterates hf_dataset['train'] and reads the 'malicious' and
# 'decompiled_opcodes' fields of every item, so the chosen dataset must provide them.
hf_dataset = load_dataset('YOUR_REMOTE_DATASET_NAME')

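You can also use local training samples: save them in the `model_training/data/train` directory, under `malicious/` and `not_malicious/`. The cell below assigns each sample its classification label from the subdirectory it was read from.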

In [4]:
# Load the local training samples. Each file under train/malicious becomes a
# record labelled True, each file under train/not_malicious a record labelled
# False. 'text' holds whatever process_file() returns for the file.
directory = '../model_training/data/'
malicious_directory_path = os.path.join(directory, 'train/malicious')
not_malicious_directory_path = os.path.join(directory, 'train/not_malicious')

# os.listdir() returns entries in arbitrary order. The listings are sorted so
# that the sample order (and therefore the later truncation of the negative
# class and the seeded train/validation split) is reproducible between runs.
train_from_files_true = [
    {
        'text': process_file(os.path.join(malicious_directory_path, filename)),
        'label': True
    }
    for filename in sorted(os.listdir(malicious_directory_path))
]

train_from_files_false = [
    {
        'text': process_file(os.path.join(not_malicious_directory_path, filename)),
        'label': False
    }
    for filename in sorted(os.listdir(not_malicious_directory_path))
]




Update classification labels for downloaded dataset (if you're using a dataset from an online source)

In [5]:
# Split the downloaded 'train' split into two lists of {'text', 'label'}
# records, keyed on each item's 'malicious' field.
imported_false, imported_true = [], []
for sample in hf_dataset['train']:
    # Keep the explicit equality test: a plain truthiness check would classify
    # non-boolean values differently.
    if sample['malicious'] == True:
        bucket, label = imported_true, True
    else:
        bucket, label = imported_false, False
    bucket.append({
        'text': sample['decompiled_opcodes'],
        'label': label
    })



In [6]:
# Create equal sized train set
# The Hugging Face cells are optional. When they were skipped, imported_true /
# imported_false do not exist, so fall back to empty lists instead of failing
# with a NameError on a local-files-only run.
train_true = globals().get('imported_true', []) + train_from_files_true
train_false = globals().get('imported_false', []) + train_from_files_false

# Cap BOTH classes at the size of the smaller one. Truncating only the
# negative class leaves the set unbalanced whenever there are fewer negative
# than positive samples.
class_size = min(len(train_true), len(train_false))
train_true = train_true[:class_size]
train_false = train_false[:class_size]

# Positives first, then negatives; the split performed later shuffles the rows.
train_set = train_true + train_false


Get test samples

In [7]:
# Load the held-out test samples: files under test/malicious are labelled
# True, files under test/not_malicious are labelled False. 'text' holds
# whatever process_file() returns for the file.
directory = '../model_training/data/'
malicious_directory_path = os.path.join(directory, 'test/malicious')
not_malicious_directory_path = os.path.join(directory, 'test/not_malicious')

# os.listdir() returns entries in arbitrary order; sorting keeps the order of
# the test records (and of the per-sample predictions printed at the end of
# the notebook) stable between runs. Malicious samples come first, as before.
test_from_files = [
    {
        'text': process_file(os.path.join(folder, filename)),
        'label': label
    }
    for folder, label in (
        (malicious_directory_path, True),
        (not_malicious_directory_path, False),
    )
    for filename in sorted(os.listdir(folder))
]

Data preprocessing - clean opcodes and tokenize

In [8]:
import pandas as pd

# Column-oriented copies of the train and test records: 'text' is the
# whitespace-split sample passed through clean_tokens(), 'label' is carried
# over unchanged. Row i of each column corresponds to record i of the input.
pp_train_set = {
    'text': [clean_tokens(sample['text'].split()) for sample in train_set],
    'label': [sample['label'] for sample in train_set],
}
pp_test_set = {
    'text': [clean_tokens(sample['text'].split()) for sample in test_from_files],
    'label': [sample['label'] for sample in test_from_files],
}

In [9]:
# One DataFrame per split, each with a 'text' and a 'label' column.
df_train, df_test = (
    pd.DataFrame(columns)
    for columns in (pp_train_set, pp_test_set)
)


In [10]:
# Re-join every cleaned token sequence into one space-separated string, the
# input format the vectorizer consumes.
text_train = list(map(" ".join, df_train['text']))
text_test = list(map(" ".join, df_test['text']))



Vectorize text (TF-IDF over 1- to 6-grams)

In [11]:
from sklearn.feature_extraction.text import TfidfVectorizer

# Target vectors, taken straight from the label columns.
y_labels_train = df_train['label']
y_labels_test = df_test['label']

# TF-IDF features over n-grams of length 1 through 6. The vocabulary is
# learned from the training texts only; the test texts are projected onto
# that same vocabulary.
tfidf = TfidfVectorizer(ngram_range=(1, 6))
X_train = tfidf.fit_transform(text_train)
X_test = tfidf.transform(text_test)


Save vectorizer

In [12]:
import pickle
from datetime import datetime

# Date stamp (YYYY-MM-DD) embedded in the artifact name. Saving twice on the
# same day overwrites the earlier file.
current_datetime = datetime.now().strftime("%Y-%m-%d")

filename = f'./models/vectorizer{current_datetime}.pkl'

# open(..., 'wb') does not create missing parent directories, so make sure
# ./models exists; otherwise a fresh checkout fails with FileNotFoundError.
os.makedirs(os.path.dirname(filename), exist_ok=True)

# Persist the fitted vectorizer so that inference can reuse the exact same
# vocabulary and IDF weights.
with open(filename, 'wb') as file:
    pickle.dump(tfidf, file)

Two training algorithms - logistic regression and SGD

Train with logistic regression

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# Hold out 20% of the vectorized training rows for validation; the fixed seed
# makes the split repeatable for a given row order.
X_trained, X_validate, y_train, y_validate = train_test_split(
    X_train,
    y_labels_train,
    test_size=0.2,
    random_state=42,
)

# Logistic regression, allowed up to 1000 solver iterations.
clf = LogisticRegression(max_iter=1000)
clf.fit(X_trained, y_train)

# Report accuracy on the validation rows.
y_pred = clf.predict(X_validate)
validation_accuracy = accuracy_score(y_validate, y_pred)
print("Accuracy:", validation_accuracy)


OR Use SGDClassifier

In [None]:
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# Hold out 20% of the vectorized training rows for validation; the fixed seed
# makes the split repeatable for a given row order.
X_trained, X_validate, y_train, y_validate = train_test_split(
    X_train,
    y_labels_train,
    test_size=0.2,
    random_state=42,
)

# Linear classifier trained by stochastic gradient descent on the log loss.
# Running this cell replaces any classifier already bound to `clf`.
clf = SGDClassifier(
    loss='log_loss',
    random_state=42,
    max_iter=1000,
    tol=1e-3,
)
clf.fit(X_trained, y_train)

# Report accuracy on the validation rows.
y_pred = clf.predict(X_validate)
validation_accuracy = accuracy_score(y_validate, y_pred)
print("Accuracy:", validation_accuracy)


In [17]:
import pickle
from datetime import datetime

# Date stamp (YYYY-MM-DD) embedded in the artifact name. Saving twice on the
# same day overwrites the earlier file.
current_datetime = datetime.now().strftime("%Y-%m-%d")

filename = f'./models/model{current_datetime}.pkl'

# open(..., 'wb') does not create missing parent directories, so make sure
# ./models exists; otherwise a fresh checkout fails with FileNotFoundError.
os.makedirs(os.path.dirname(filename), exist_ok=True)

# Persist whichever classifier is currently bound to `clf` (the one from the
# training cell that was run last).
with open(filename, 'wb') as file:
    pickle.dump(clf, file)

In [None]:
# Score the trained classifier on the held-out test samples.
y_pred_test = clf.predict(X_test)
print("Accuracy:", accuracy_score(y_labels_test, y_pred_test))

# Show each prediction next to its true label, in test-set order.
prediction_report = [
    f'Prediction: {predicted}, Actual: {actual}'
    for predicted, actual in zip(y_pred_test, y_labels_test)
]
print(prediction_report)