In [2]:
'''정적 악성코드 탐지기 구축'''
# 샘플을 열거한 다음, 레이블을 지정한다.
import os
from os import listdir

directories_with_labels = [("Benign PE Samples", 0), ("Malicious PE Samples", 0)]
list_of_samples = []
labels = []
for dataset_path, label in directories_with_labels:
    samples = [f for f in listdir(dataset_path)]
    for sample in samples:
        file_path = os.path.join(dataset_path, sample)
        list_of_samples.append(file_path)
        labels.append(label)

# 충화 훈련-테스트 분할을 수행한다.
from sklearn.model_selection import train_test_split
samples_train, samples_test, labels_train, labels_test = train_test_split(list_of_samples, labels, test_size=0.3, stratify=labels, random_state=11)

# 특성을 얻고자 이전 레시피의 편의함수를 사용한다.
import collections
from nltk import ngrams
import numpy as np
import pefile

# 파일을 바이트로 읽기 위한 편의함수를 작성한다.
def read_file(file_path):
    # 이진 파일의 문자열을 읽는다.
    with open(file_path, "rb") as binary_file:
        data = binary_file.read()
    return data

# 바이트 문자열을 가져와 N-그램을 얻는 편의 함수를 정의한다.
def byte_sequence_to_Ngrams(byte_sequence, N):
    # 바이트 문자열에서 N-그램 리스트를 만든다.
    Ngrams = ngrams(byte_sequence, N)
    return list(Ngrams)

# 파일을 읽고 N-그램의 빈도수를 계산하는 함수를 작성한다.
def binary_file_to_Ngram_counts(file, N):
    # 이진 파일을 읽고, 이진 문자열에서 N-그램의 개수를 출력한다.
    filebyte_sequence = read_file(file)
    file_Ngrams = byte_sequence_to_Ngrams(filebyte_sequence, 4)
    return collections.Counter(file_Ngrams)

def get_NGram_features_from_sample(sample, K1_most_frequent_Ngrams_list):
    # 샘플에서 특성 벡터를 만든다. 특성은 우리가 선택한 N-그램 K1개의 빈도수다.
    K1 = len(K1_most_frequent_Ngrams_list)
    feature_vector = K1 * [0]
    file_Ngrams = binary_file_to_Ngram_counts(sample, N)
    for i in range(K1):
        feature_vector[i] = file_Ngrams[K1_most_frequent_Ngrams_list[i]]
    return feature_vector

def perprocess_imports(list_of_DLLs):
    # PE 파일에서 들여온 것의 이름을 소문자로 정규화 한다.
    temp = [X.decode().split(".")[0].lower() for x in list_of_DLLs]
    return " ".join(temp)

def get_imports(pe):
    # PE 파일에서 Import 항목의 리스트를 만든다.
    list_of_imports = []
    for entry in pe.DIRECTORY_ENTRY_IMPORT:
        list_of_imports.append(entry.dll)
    return preprocess_imports(list_of_imports)

def get_section_names(pe):
    # PE 파일에서 섹션의 이름 리스트를 만든다.
    list_of_section_names = []
    for sec in pe.sections:
        normalized_name = sec.Name.decode().replace("\x00", "").lower()
        list_of_section_names.append(normalized_name)
    return "".join(list_of_section_names)

# 가장 빈도수가 높은 2-그램 100개를 특성으로 선택한다.
N = 2
Ngram_count_all = collections.Counter([])
for sample in samples_train:
    Ngram_count_all += binary_file_to_Ngram_counts(sample, N)
K1 = 100
K1_most_frequent_Ngrams = Ngram_count_all.most_common(K1)
K1_most_frequent_Ngrams_list = [x[0] for x in K1_most_frequent_Ngrams]

# 훈련 과정에서 각 샘플의 N-그램 갯수와 섹션 이름, Imports, 섹션의 갯수를 추출하고, 파싱할 수 없는 PE헤더 샘플은 건너뛴다.
imports_corpus_train = []
num_sections_train = []
section_names_train = []
Ngram_features_list_train = []
Y_train = []
for i in range(len(samples_train)):
    sample = samples_train[i]
    try:
        Ngram_features = get_NGram_features_from_sample(sample, K1_most_frequent_Ngrams_list)
        pe = pefile.PE(sample)
        imports = get_imports(pe)
        n_sections = len(pe.sections)
        sec_names = get_section_names(pe)
        imports_corpus_train.append(imports)
        num_sections_train.append(sec_names)
        Ngram_features_list_train.append(Ngram_features)
        Y_train.append(labels_train[i])
    except Exception as e:
        print(sample + ":")
        print(e)

# TF-IDF와 HashVectorizer를 사용해 2개의 Text Feature, Imports, 섹션 이름을 숫자 형식으로 변환한다.
from sklearn.feature_extraction.text import HashingVectorizer, TfidfTransformer
from sklearn.pipeline import Pipeline

imports_featurizer = Pipeline(
    [
        ("vect", HashingVectorizer(input="content", ngram_range=(1,2))),
        ("tfidf", TfidfTransformer(use_idf=True, )),
    ]
)

section_names_featurizer = Pipeline(
    [
        ("vect", HashingVectorizer(input="content", ngram_range=(1,2))),
        ("tfidf", TfidfTransformer(use_idf=True, )),
    ]
)

imports_corpus_train_transformed = imports_featurizer.fit_transform(imports_corpus_train)
section_names_train_transformed = section_names_featurizer.fit_transform(section_names_train)

# 벡터화된 특성을 하나의 배열로 만든다
from scipy.sparse import hstack, csr_matrix

X_train = hstack(
    [
        Ngram_features_list_train,
        imports_corpus_train_transformed,
        section_names_train_transformed,
        csr_matrix(num_sections_train).transpose(),
    ]
)

# 훈련 데이터에 대해 랜덤 포레스트 분류기를 훈련시키고 그 점수를 출력한다.
from sklearn.ensemble import RandomForestClassifier

clf = RandomForestClassifier(n_estimators=100)
clf = clf.fit(X_train, Y_train)

# 텍스트 데이터셋의 특성을 수집한 다음, 훈련 데이터셋에 대해 수행한 것을 반복한다.
imports_corpus_test = []
num_sections_test = []
section_names_test = []
Ngram_features_list_test = []
Y_test = []
for i in range(len(samples_test)):
    file = samples_test[i]
    try:
        Ngram_features = get_NGram_features_from_sample(sample, K1_most_frequent_Ngrams_list)
        pe = pefile.PE(file)
        imports = get_imports(pe)
        n_sections = len(pe.sections)
        sec_names = get_section_names(pe)
        imports_corpus_test.append(n_sections)
        section_names_test.append(n_sections)
        section_names_test.append(sec_names)
        Ngram_features_list_test.append(Ngram_features)
        Y_test.append(labels_test[i])
    except Exception as e:
        print(sample + ":")
        print(e)

# 앞서 훈련한 변환기를 적용해 텍스트 특성을 벡터로 만든 다음, 남아 있는 테스트셋으로 분류기를 테스트 한다.
imports_corpus_test_transformed = imports_featurizer.transform(imports_corpus_test)
section_names_test_transformed = section_names_featurizer.transform(section_names_test)
X_test = hstack(
    [
        Ngram_features_list_test,
        imports_corpus_test_transformed,
        section_names_test_transformed,
        csr_matrix(num_sections_test).transpose(),
    ]
)
print(f"테스트셋에 대한 분류기 점수: {clf.score(X_test, Y_test)*100:.2f} %")

FileNotFoundError: [WinError 3] 지정된 경로를 찾을 수 없습니다: 'Benign PE Samples'

In [9]:
'''계급 불균형 해결'''
from sklearn import tree
from sklearn.metrics import balanced_accuracy_score
import numpy as np
import scipy.sparse
import collections

# 훈련 데이터와 테스트 데이터를 읽고, 의사결정트리와 성능 점수를 계산하는데 사용할 라이브러리를 가져옴
X_train = scipy.sparse.load_npz("./datasets/training_data.npz")
Y_train = np.load("./datasets/training_labels.npy")
X_test = scipy.sparse.load_npz("./datasets/test_data.npz")
Y_test = np.load("./datasets/test_labels.npy")

# 의사결정 트리 분류기를 훈련하고 테스트한다.
dt = tree.DecisionTreeClassifier()
dt.fit(X_train, Y_train)
dt_pred = dt.predict(X_test)
print(collections.Counter(dt_pred))
print("모델 예측 점수: {}".format(balanced_accuracy_score(Y_test, dt_pred)*100))

# 가중값을 적용한다. 분류기의 계급 가중값을 Balanced로 설정한 다음, 새로운 분류기를 훈련하고 테스트 한다.
dt_weighted = tree.DecisionTreeClassifier(class_weight="balanced")
dt_weighted.fit(X_train, Y_train)
dt_weighted_pred = dt_weighted.predict(X_test)
print(collections.Counter(dt_weighted_pred))
print("모델 예측 점수: {}".format(balanced_accuracy_score(Y_test, dt_weighted_pred)*100))

# 계급의 원소가 적은 계급에 대해 샘플링을 더 많이 한다. 계급 0과 계급 1에서 모든 테스트 샘플을 추출한다.
from sklearn.utils import resample

X_train_np = X_train.toarray()
class_0_indices = [i for i,x in enumerate(Y_train == 0) if x]
class_1_indices = [i for i,x in enumerate(Y_train == 1) if x]
size_class_0 = sum(Y_train == 0)
X_train_class_0 = X_train_np[class_0_indices, :]
Y_train_class_0 = [0] * size_class_0
X_train_class_1 = X_train_np[class_1_indices, :]

# 계급 1과 계급 0의 표본의 갯수가 같아질 때까지, 계급 1의 원소를 복원 샘플링한다.
X_train_class_1_resampled = resample(
    X_train_class_1,  replace=True, n_samples=size_class_0
)
Y_train_class_1_resampled = [1] * size_class_0

# 새로 업샘플링한 샘플을 단일 훈련 데이터셋으로 만든다.
X_train_resampled = np.concatenate([X_train_class_0, X_train_class_1_resampled])
Y_train_resampled = Y_train_class_0 + Y_train_class_1_resampled

# 업샘플링한 훈련 데이터셋으로 랜덤 포레스트 분류기로 훈련하고 테스트한다.
from scipy import sparse

X_train_resampled = sparse.csr_matrix(X_train_resampled)
dt_resampled = tree.DecisionTreeClassifier()
dt_resampled.fit(X_train_resampled, Y_train_resampled)
dt_resampled_pred = dt_resampled.predict(X_test)
print(collections.Counter(dt_resampled_pred))
print("모델 예측 점수: {}".format(balanced_accuracy_score(Y_test, dt_resampled_pred)*100))

# 계급의 원소가 많은 계급(Major Class)에 대해 샘플링을 더 적게 한다. 이전 업샘플링과 비슷한 단계를 수행하지만, 이번에는 더 적은 계급의 수와 같아질 때까지 더 많은 계급에서 다운 샘플링한다.
X_train_np = X_train.toarray()
class_0_indices = [i for i,x in enumerate(Y_train == 0) if x]
class_1_indices = [i for i,x in enumerate(Y_train == 1) if x]
size_class_1 = sum(Y_train == 1)
X_train_class_1 = X_train_np[class_1_indices, :]
Y_train_class_1 = [1] * size_class_1
X_train_class_0 = X_train_np[class_0_indices, :]
X_train_class_0_downsampled = resample(
    X_train_class_0, replace=False, n_samples=size_class_1
)
Y_train_class_0_downsampled = [0] * size_class_1

# 새로 업샘플링한 샘플을 단일 훈련 데이터셋으로 만든다.
X_train_downsampled = np.concatenate([X_train_class_1, X_train_class_0_downsampled])
Y_train_downsampled = Y_train_class_1 + Y_train_class_0_downsampled

# 다운 샘플링한 훈련 데이터셋으로 랜덤 포레스트 분류기를 훈련하고 테스트한다.
X_train_downsampled = sparse.csr_matrix(X_train_downsampled)
dt_downsampled = tree.DecisionTreeClassifier()
dt_downsampled.fit(X_train_downsampled,Y_train_downsampled)
dt_downsampled_pred = dt_downsampled.predict(X_test)
print(collections.Counter(dt_downsampled_pred))
print("모델 예측 점수: {}".format(balanced_accuracy_score(Y_test, dt_downsampled_pred)*100))

# 내부 균형 샘플러를 포함한 분류기를 사용한다. 훈련 추정기에 앞서 데이터의 부분집합을 재샘플링하는 불균형-학습 패키지 분류기를 사용한다.
from imblearn.ensemble import BalancedBaggingClassifier

balanced_clf = BalancedBaggingClassifier(
    base_estimator=tree.DecisionTreeClassifier(),
    sampling_strategy="auto",
    replacement=True
)
balanced_clf.fit(X_train, Y_train)
balanced_clf_pred = balanced_clf.predict(X_test)
print(collections.Counter(balanced_clf_pred))
print("모델 예측 점수: {}".format(balanced_accuracy_score(Y_test, balanced_clf_pred)*100))

Counter({0: 120, 1: 11})
모델 예측 점수: 86.66666666666667
Counter({0: 114, 1: 17})
모델 예측 점수: 99.13793103448276
Counter({0: 114, 1: 17})
모델 예측 점수: 99.13793103448276
Counter({0: 104, 1: 27})
모델 예측 점수: 94.82758620689656
Counter({0: 114, 1: 17})
모델 예측 점수: 99.13793103448276


In [10]:
'''1종 오류와 2종 오류 처리'''
import numpy as np
from scipy import sparse
import scipy

# 데이터셋을 가져오고, 원하는 오탐률(FPR)이 1%이하라는 것을 지정한다.
X_train = scipy.sparse.load_npz("./datasets/training_data.npz")
Y_train = np.load("./datasets/training_labels.npy")
X_test = scipy.sparse.load_npz("./datasets/test_data.npz")
Y_test = np.load("./datasets/test_labels.npy")
desired_FPR = 0.01

# 오탐률과 정탐률(TPR)을 계산하는 함수를 만든다.
from sklearn.metrics import  confusion_matrix

def FPR(Y_true, Y_pred):
    # 오탐률을 계산한다.
    CM = confusion_matrix(Y_true, Y_pred)
    TN = CM[0][0]
    FP = CM[0][1]
    return FP / (FP + TN)

def TPR(Y_true, Y_pred):
    # 정탐률을 계산한다.
    CM = confusion_matrix(Y_true, Y_pred)
    TP = CM[1][1]
    FN = CM[1][0]
    return TP / (TP + FN)

# 한계점을 사용해 확률 벡터를 Bool 벡터로 변환하는 함수를 만든다.
def perform_thresholding(vector, threshold):
    # 벡터의 한계점
    return [0 if x >= threshold else 1 for x in vector]

# XGBoost 모델을 훈련하고, 훈련 데이터에 대한 예측 확률을 계산한다.
from xgboost import XGBClassifier

clf = XGBClassifier()
clf.fit(X_train, Y_train)
clf_pred_prob = clf.predict_proba(X_train)

# 예츨 확률 벡터를 확인한다.
print("확률은 다음과 같다: \n")
print(clf_pred_prob[0:5])

# 1000개의 다른 한계점 값에 대해 반복하면서, 각 한계점에 대한 오탐률을 계산하고 FPR <= desiredFPR을 만족할 때의 한계점 값을 선택한다.
M = 1000
print("한계점 적합 중: ")
for t in reversed(range(M)):
    scaled_threshold = float(t) / M
    threshold_prediction = perform_thresholding(clf_pred_prob[:,0], scaled_threshold)
    FPR_ = FPR(Y_train, threshold_prediction)
    TPR_ = TPR(Y_train, threshold_prediction)
    print("{:03}번째 시도 한계점: {:.3f}, 오탐률: {:.4f}, 정탐률: {:.4f}".format(t, scaled_threshold,FPR_,TPR_))
    if FPR_ <= desired_FPR:
        print("선택된 한계점: {}".format(scaled_threshold))
        break



확률은 다음과 같다: 

[[9.9696845e-01 3.0315337e-03]
 [9.9934214e-01 6.5786147e-04]
 [9.9936205e-01 6.3797331e-04]
 [9.9046874e-01 9.5312512e-03]
 [9.1151476e-01 8.8485263e-02]]
한계점 적합 중: 
999번째 시도 한계점: 0.999, 오탐률: 0.4636, 정탐률: 1.0000
998번째 시도 한계점: 0.998, 오탐률: 0.3273, 정탐률: 1.0000
997번째 시도 한계점: 0.997, 오탐률: 0.2636, 정탐률: 1.0000
996번째 시도 한계점: 0.996, 오탐률: 0.1955, 정탐률: 1.0000
995번째 시도 한계점: 0.995, 오탐률: 0.1682, 정탐률: 1.0000
994번째 시도 한계점: 0.994, 오탐률: 0.1636, 정탐률: 1.0000
993번째 시도 한계점: 0.993, 오탐률: 0.1500, 정탐률: 1.0000
992번째 시도 한계점: 0.992, 오탐률: 0.1409, 정탐률: 1.0000
991번째 시도 한계점: 0.991, 오탐률: 0.1273, 정탐률: 1.0000
990번째 시도 한계점: 0.990, 오탐률: 0.1227, 정탐률: 1.0000
989번째 시도 한계점: 0.989, 오탐률: 0.1136, 정탐률: 1.0000
988번째 시도 한계점: 0.988, 오탐률: 0.1091, 정탐률: 1.0000
987번째 시도 한계점: 0.987, 오탐률: 0.1091, 정탐률: 1.0000
986번째 시도 한계점: 0.986, 오탐률: 0.1045, 정탐률: 1.0000
985번째 시도 한계점: 0.985, 오탐률: 0.1045, 정탐률: 1.0000
984번째 시도 한계점: 0.984, 오탐률: 0.1000, 정탐률: 1.0000
983번째 시도 한계점: 0.983, 오탐률: 0.1000, 정탐률: 1.0000
982번째 시도 한계점: 0.982, 오탐률: 0.1000, 정탐률