In [1]:
data_path = '../데이터'

# from google.colab import drive
# drive.mount('/content/drive/')
# data_path = '/content'

# !unzip /content/drive/MyDrive/AI-based_Malware_Detection/데이터.zip

In [2]:
import os
import glob
import json
import pprint

import numpy as np

from lightgbm import LGBMClassifier

from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier

from sklearn.metrics import accuracy_score
from sklearn.pipeline import Pipeline
from sklearn.feature_selection import RFE

In [3]:
SEED = 41

# 실제 데이터 중 정답파일에 대해 딕셔너리 형태로 만들어서 
# key를 파일이름으로 만들어주고, 값으로 라벨의 값이 저장되는 딕셔너리 생성
def read_label_csv(path):
    label_table = dict()
    with open(path, "r",encoding='ISO-8859-1') as f:
        for line in f.readlines()[1:]:
            fname, label = line.strip().split(",")
            label_table[fname] = int(label)
    return label_table

# json파일 불러오는 함수
def read_json(path): 
    with open(path, "r") as f:
        return json.load(f)

# 우리가 사용하게 될 머신런닝들을 정의하는 함수 -> 밑에 "학습 및 검증" 부분이다
def load_model(**kwargs):
    if kwargs["model"] == "rf":
        return RandomForestClassifier(random_state=kwargs["random_state"], n_jobs=4)
    elif kwargs["model"] == "dt":
        return DecisionTreeClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "lgb":
        return LGBMClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "svm":
        return SVC(random_state=kwargs["random_state"])
    elif kwargs["model"] == "lr":
        return LogisticRegression(random_state=kwargs["random_state"], n_jobs=-1)
    elif kwargs["model"] == "knn":
        return KNeighborsClassifier(n_jobs=-1)
    elif kwargs["model"] == "adaboost":
        return AdaBoostClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "mlp":
        return MLPClassifier(random_state=kwargs["random_state"])
    else:
        print("Unsupported Algorithm")
        return None
    
# 모델을 정의한것으로 학습데이터로 학습 시키는 함수
def train(X_train, y_train, model): 
    '''
        머신러닝 모델을 선택하여 학습을 진행하는 함수
	
        :param X_train: 학습할 2차원 리스트 특징벡터
        :param y_train: 학습할 1차원 리스트 레이블 벡터
        :param model: 문자열, 선택할 머신러닝 알고리즘
        :return: 학습된 머신러닝 모델 객체
    '''
    clf = load_model(model=model, random_state=SEED)
    clf.fit(X_train, y_train)
    return clf

# 정확도를 선출하는 함수
def evaluate(X_test, y_test, model): 
    '''
        학습된 머신러닝 모델로 검증 데이터를 검증하는 함수
	
        :param X_test: 검증할 2차원 리스트 특징 벡터
        :param y_test: 검증할 1차원 리스트 레이블 벡터
        :param model: 학습된 머신러닝 모델 객체
    '''
    predict = model.predict(X_test)
    print(model, "정확도", model.score(X_test, y_test))


## 레이블 테이블 로드

In [4]:
label_table = read_label_csv(data_path+"/학습데이터_정답.csv")
label_table2 = read_label_csv(data_path+"/검증데이터_정답.csv")

## 특징 벡터 생성 예시
- PEMINER 정보는 모두 수치형 데이터이므로 특별히 가공을 하지 않고 사용 가능
- EMBER, PESTUDIO 정보는 가공해서 사용해야 할 특징들이 있음 (e.g. imports, exports 등의 문자열 정보를 가지는 데이터)
- 수치형 데이터가 아닌 데이터(범주형 데이터)를 어떻게 가공할 지가 관건 >> 인코딩 (e.g. 원핫인코딩, 레이블인코딩 등)

### PEMINER 전체 데이터 사용

In [5]:
class PeminerParser: #모든 데이터
    def __init__(self, path):
        self.report = read_json(path)
        self.vector = []
    
    def process_report(self):
        '''
            전체 데이터 사용        
        '''
        self.vector = [value for _, value in sorted(self.report.items(), key=lambda x: x[0])]
        return self.vector
    

### EMBER 특징 추출

In [6]:
# 미리 사용된 특징들 같은경우 
class EmberParser:
    '''
        예제에서 사용하지 않은 특징도 사용하여 벡터화 할 것을 권장
    '''
    def __init__(self, path):
        self.report = read_json(path)
        self.vector = []
    
    # histogram 특징 추출
    def get_histogram_info(self):
        histogram = np.array(self.report["histogram"])
        total = histogram.sum()
        vector = histogram / total # 평균값으로 특징 가공
        return vector.tolist()
    
    # strings 특징 추출
    # 문자열 수, 평균 길이, 문자 histogram, URLS 등과 같은 다양한 패턴과 일치하는 문자열의 수
    def get_string_info(self):
        strings = self.report["strings"]

        # printables의 크기가 0이상이면 실수형태로 저장
        hist_divisor = float(strings['printables']) if strings['printables'] > 0 else 1.0
        vector = [
            strings['numstrings'], 
            strings['avlength'], 
            strings['printables'],
            strings['entropy'], 
            # strings['paths'], 
            # strings['urls'],
            strings['registry'], 
            strings['MZ']
        ]
        # printabledist의 평균값을 리스트로 저장
        vector += (np.asarray(strings['printabledist']) / hist_divisor).tolist()
        return vector
    
    # general 특징 추출
    # import, export, symbol의 수와 파일의 relocation, resources, signature 등이 있는지 여부에 대한 수 
    def get_general_file_info(self):
        general = self.report["general"]
        vector = [
            general['size'], 
            general['vsize'], 
            general['has_debug'], 
            general['exports'], 
            general['imports'],
            general['has_relocations'], 
            general['has_resources'], 
            general['has_signature'], 
            general['has_tls'],
            general['symbols']
        ]
        return vector

    # 특징 추가
    def get_histogram_max(self):
        histogram = np.array(self.report["histogram"])
        vector = [np.max(histogram)] # 최대값으로 특징 가공
        return vector

    def get_byteentropy_info(self):
        byteentropy = np.array(self.report["byteentropy"])
        total = byteentropy.sum()
        vector = byteentropy / total
        return vector.tolist()
        
    def get_byteentropy_max(self):
        byteentropy = np.array(self.report["byteentropy"])
        vector = [np.max(byteentropy)]
        return vector

    def get_byteennum(self):
        byteentropy = np.array(self.report["byteentropy"])
        vector = [len(byteentropy)]
        return vector

    #byteentropy 특징 벡터 추가
    def get_byteentropy(self): 
        byteentropy = np.array(self.report["byteentropy"])
        vector = byteentropy
        return vector.tolist()

    def get_string_max(self):
        strings = np.array(self.report["strings"]['printabledist'])
        vector = [np.max(strings)]
        return vector

    # 파일에 컴파일된 시스템에 대한 세부정보이다.
    # 링커, 이미지, 운영체제 버전 등등을 제공한다.
    def get_header_info(self): #header 추가
        header = self.report['header']
        vector = [
            header['coff']['timestamp'],
            header['coff']['machine'],
            header['coff']['characteristics'],
            header['optional']['subsystem'],
            header['optional']['dll_characteristics'],
            header['optional']['magic'],
            header['optional']['major_image_version'],
            header['optional']['minor_image_version'],
            header['optional']['major_linker_version'],
            header['optional']['minor_linker_version'],
            header['optional']['major_operating_system_version'],
            header['optional']['minor_operating_system_version'],
            header['optional']['major_subsystem_version'],
            header['optional']['minor_subsystem_version'],
            header['optional']['sizeof_code'],
            header['optional']['sizeof_headers'],
            header['optional']['sizeof_heap_commit'],
        ]
        return vector

    def get_sizeof_code(self):
        header = np.array(self.report["header"]["optional"]["sizeof_code"])
        vector = [float(header)]
        return vector

    # section에 대한 정보들 
    # 섹션의 이름, 크기, 엔트로피 및 각 섹션에 대해 주어진 기타 정보를 가진 모든 섹션의 목록
    def get_section_number(self):
        section = self.report["section"]
        vector = [len(section)]
        return vector

    def get_sectionsize_min(self):
        section = np.array(self.report["section"]["sections"])
        size=[999999999]
        for sections in section:
            size += sections["size"]
        vector = [min(size)]
        return vector

    def get_section_size(self):
        section = self.report["section"]['sections']
        size = [-1]
        for sections in section:
            size += [sections["size"]]
        vector = [max(size)]
        return vector
        
    def get_sectionvsize_max(self):
        section = np.array(self.report["section"]["sections"])
        vsize=[999999999]
        for sections in section:
            vsize += [sections["vsize"]]
        vector = [min(vsize)]
        return vector

    def get_entropy_max(self):
        section = np.array(self.report["section"]["sections"])
        entropy=[999999999]
        for sections in section:
            entropy += [sections["entropy"]]
        vector = [max(entropy)]
        return vector

    def get_bigentropy_len(self):
        section = np.array(self.report["section"]["sections"])
        cnt = 0
        for sections in section:
            if sections["entropy"] >= 6:
                cnt += 1
        vector = [cnt]
        return vector

    def get_section_info(self):
        section = self.report['section']['sections']
        vector = []
        for sections in section:
            vector += [
                sections['size'],
                sections['entropy'],
                sections['vsize'],
            ]
        return vector

    def numimport(self):
        imports = self.report["imports"]
        vector = [len(imports)]
        return vector

    def numexport(self):
        exports = self.report["exports"]
        vector = [len(exports)]
        return vector

    def get_datadirnum(self):
        datadirectories = np.array(self.report["datadirectories"])
        vector = [len(datadirectories)]
        return vector

    def get_datadirsize_max(self):
        datadirectories = np.array(self.report["datadirectories"])
        size=[-1]
        for data in datadirectories:
            size.append(data["size"])
        vector = [max(size)]
        return vector

    # datadirectories size 특징 추가
    def get_datadirectories(self): 
        datadirectories = self.report["datadirectories"]
        vector = []
        for data in datadirectories:
            # vector += data["size"], data["virtual_address"]
            vector += [data["size"] if data["size"]>100 else 0]
            vector += [data["virtual_address"] if data["virtual_address"]>100000 else 0]
        return vector


    def process_report(self):
        vector = []
        # 선택하지 않은 feature
        # vector += self.get_histogram_info()
        # vector += self.get_string_info()
        # vector += self.get_byteentropy_info()
        # vector += self.get_byteentropy_max()
        # vector += self.get_sizeof_code()
        # vector += self.get_section_size()
        # vector += self.get_sectionvsize_max()
        # vector += self.numimport()       
        # vector += self.numexport()
        # vector += self.get_datadirsize_max()

        # 최종 선택 feature
        vector += self.get_general_file_info()
        vector += self.get_histogram_max() 
        vector += self.get_byteennum()
        vector += self.get_byteentropy() 
        vector += self.get_string_max()   
        vector += self.get_section_number()
        vector += self.get_entropy_max() 
        vector += self.get_bigentropy_len()
        vector += self.get_datadirnum()

        return vector


### PESTUDIO 특징 추출

In [7]:
class PestudioParser:
    def __init__(self, path):
        try:
          self.report = read_json(path)
        except:
          self.report = None
        self.vector = []
    '''
        사용할 특징을 선택하여 벡터화 할 것을 권장
    '''
    def overview_entropy(self):
        try:
            image = self.report["image"]["overview"]["entropy"]
            vector = [float(image)]
        except:
            vector = [-1]
        return vector

    def get_indicators_info(self):
        try:
            image = self.report["image"]["indicators"]["indicator"]
            cnt = 0
            for images in image:
                if(images["@severity"] == "1" or images["@severity"] == "2"):
                    cnt += 1
            vector = [cnt]
        except:
            vector = [-1]
        return vector

    #indicators 필드의 severity 검사. 1인경우 위험도가 가장 높음
    #severity의 평균적인 값을 구함
    def get_indicators_ave(self):
        try:
            image = self.report['image']['indicators']['indicator']
            avg = 0
            for images in image:
                avg += int(images['@severity'])
            vector = [avg/len(image)]
        except:
            vector = [0]
        return vector


    def numsection(self):
        try:
            image = self.report["image"]["sections"]["section"]
            vector = [(len(image))]
        except:
            vector = [-1]
        return vector

    def blacklist(self):
        try:
            image = self.report["image"]["sections"]["section"]
            cnt = 0
            for images in image:
                if images["@blacklisted"] == "x":
                    cnt += 1
            vector = [cnt]
        except:
            vector = [-1]
        return vector

    def libraries_blacklist(self):
        try:
            image = self.report["image"]["libraries"]["library"]
            cnt = 0
            for images in image:
                if images["@blacklist"] == "x":
                    cnt += 1
            vector = [cnt]
        except:
            vector = [-1]
        return vector

    # 악성코드에서 자주 사용하는 API에 대한 blacklist를 제공하고, 그 리스트에 따라 API가 분류된다.
    def import_blacklist(self):
        try:
            image = self.report["image"]["imports"]["import"]
            cnt = 0
            for images in image:
                if images["@blacklist"] == "x":
                    cnt += 1
            vector = [cnt]
        except:
            vector = [-1]
        return vector

    def size(self):
        try:
            image = self.report["image"]["resources"]["instance"]
            size = []
            for images in image:
                size.append(int(images['@size']))
            vector = [max(size)]
        except:
            vector = [-1]
        return vector     

    def entropy(self):
        try:
            image = self.report["image"]["resources"]["instance"]
            entropy = []
            for images in image:
                entropy.append(int(images['@entropy']))
            vector = [max(entropy)]
        except:
            vector = [-1]
        return vector   

    def get_debug(self):
        try:
            image = self.report["image"]["debug"]  
            vector = [0 if image == 'n/a' else 1]
        except:
            vector = [-1]
        return vector

    def get_string_leng(self):
        try:
            image = self.report["image"]['strings']['ascii']['string']
            vector = [len(image)]
        except:
            vector = [-1]
        return vector

    def string_size_max(self):
        try:
            image = self.report["image"]["strings"]["string"]
            size = [-1]
            for images in image:
                size.append(int(images['@size']))
            vector = [max(size)]
        except:
            vector = [-1]
        return vector

    # tls callback을 이용한 악성코드가 존재한다
    # 악성 코드를 파일의 이 부분에 저장하여 애플리케이션이 프로세스를 시작하기 전에 악용하는 경우가 있음
    def get_tls_callbacks(self):
        try:
            image = self.report["image"]["tls-callbacks"]
            vector = [0 if image == 'n/a' else 1]
        except:
            vector = [-1]
        return vector

    # 인증서가 있다면 신뢰할 수 있기에 검사함
    def get_certificate(self):
        try:
            image = self.report["image"]['certificate']
            vector = [0 if image == 'n/a' else 1]
        except:
            vector = [-1]
        return vector

    def get_overlay(self):
        try:
            image = self.report["image"]["overlay"]
            vector = [0 if image == 'n/a' else 1]
        except:
            vector = [-1]
        return vector   

    # checksum이 0인지 여부를 체크함
    def checksum_compare(self):
        try:
            image = self.report['image']['optional-header']['file-checksum']
            if image == '0x00000000':
                vector = [0]
            else:
                vector = [2]
        except:
            vector = [1]
        return vector

    # section 필드를 검사함
    # txt 쓰기 권한이 있는 경우 악성 코드일 가능성이 있다.
    def get_section(self):
        try:
            section = self.report['image']['sections']['section'][0]
            if section['@writable'] == "x": 
                vector = [1]
            else:
                vector = [0]
        except:
            vector = [0]
        return vector

    

    # string 필드에 network가 많이 나오면 악성코드 가능성 증가
    def get_string_network(self):
        try:
            image = self.report['image']['strings']['ascii']['string']
            cnt = 0
            for images in image:
                if images['@group'] == 'network':
                    cnt+=1
            vector = [cnt]
        except:
            vector = [0]
        return vector

    def process_report(self):
        vector = []
        # 선택하지 않은 feature
        # vector += self.get_indicators_info()
        # vector += self.numsection()
        # vector += self.blacklist()
        # vector += self.libraries_blacklist()
        # vector += self.get_debug()
        # vector += self.get_certificate()
        # vector += self.checksum_compare()
        # vector += self.get_section()

        # 최종 선택 feature
        vector += self.overview_entropy()
        vector += self.import_blacklist()
        vector += self.size()
        vector += self.entropy()
        vector += self.get_string_leng()
        vector += self.string_size_max()
        vector += self.get_tls_callbacks()
        vector += self.get_overlay()
        vector += self.get_indicators_ave()
        vector += self.get_string_network()
        
        return vector

## 학습데이터 구성
- 특징 벡터 구성은 2차원이 되어야함 e.g.  [vector_1, vector_2, ..., vector_n]

- 각 벡터는 1차원 리스트, 벡터 크기는 모두 같아야함

In [8]:
# 데이터의 특징 벡터 모음(2차원 리스트) : X
# 데이터의 레이블 모음(1차원 리스트) : y
# 실제 데이터를 가지고온다
X, y = [], []

path_dir = f"{data_path}/PEMINER/학습데이터"

file_list = os.listdir(path_dir)

for i in range(len(file_list)):
    feature_vector = []
    for data in ["PEMINER","EMBER","PESTUDIO"]:
        path = f"{data_path}/{data}/학습데이터/{file_list[i]}"
        label = label_table[file_list[i].split('.')[0]]
        if data == "PEMINER": 
            feature_vector += PeminerParser(path).process_report()
        elif data == "EMBER":
            feature_vector += EmberParser(path).process_report()
        elif data == "PESTUDIO":
            feature_vector += PestudioParser(path).process_report()
            
    X.append(feature_vector)
    y.append(label)

print(np.asarray(X).shape, np.asarray(y).shape)

(20000, 471) (20000,)


### 검증데이터 구성

In [9]:
X_test, y_test = [], []

path_dir = f"{data_path}/PEMINER/검증데이터"
file_list = os.listdir(path_dir)

for i in range(len(file_list)):
    feature_vector = []
    for data in ["PEMINER","EMBER","PESTUDIO"]:
        path = f"{data_path}/{data}/검증데이터/{file_list[i]}"
        label = label_table2[file_list[i].split('.')[0]]
        if data == "PEMINER": 
            feature_vector += PeminerParser(path).process_report()
        elif data == "EMBER":
            feature_vector += EmberParser(path).process_report()
        elif data == "PESTUDIO":
            feature_vector += PestudioParser(path).process_report()
    
    X_test.append(feature_vector)
    y_test.append(label)
    
print(np.asarray(X_test).shape, np.asarray(y_test).shape)

(10000, 471) (10000,)


## 학습 및 검증

In [10]:
# 학습 : 피마이너랑 엠버를 학습시켜서 학습된 특징벡터들을 models에 넣는다 
models = []
for model in ["rf", "lgb"]:
    clf = train(X, y, model)
    models.append(clf)

# 검증 피마이너랑 엠버를 각각 검증 
# 실제 검증 시에는 제공한 검증데이터를 검증에 사용해야 함
for model in models:
    evaluate(X_test, y_test, model)

RandomForestClassifier(n_jobs=4, random_state=41) 정확도 0.9505
LGBMClassifier(random_state=41) 정확도 0.9542


In [11]:
def ensemble_result(X, y, models):
    '''
        학습된 모델들의 결과를 앙상블하는 함수
        
        :param X: 검증할 2차원 리스트 특징 벡터
        :param y: 검증할 1차원 리스트 레이블 벡터
        :param models: 1개 이상의 학습된 머신러닝 모델 객체를 가지는 1차원 리스트
    '''
    predicts = []
    for model in models:
        prob = [result for _, result in model.predict_proba(X)]
        predicts.append(prob)
    
    predict = np.mean(predicts, axis=0)
    predict = [1 if x >= 0.5 else 0 for x in predict]
        
    print("앙상블 후 정확도", accuracy_score(y, predict))

In [12]:
ensemble_result(X_test, y_test, models)

앙상블 후 정확도 0.957


### 테스트 데이터 구성

In [13]:
test_vector = []
test_label = []

path_dir = f"{data_path}/PEMINER/테스트데이터"

file_list = os.listdir(path_dir)

for i in range(len(file_list)):
    feature_vector = []
    for data in ["PEMINER","EMBER","PESTUDIO"]:
        path = f"{data_path}/{data}/테스트데이터/{file_list[i]}"
        label = file_list[i].split('.')[0]
        if data == "PEMINER": 
            feature_vector += PeminerParser(path).process_report()
        elif data == "EMBER":
            feature_vector += EmberParser(path).process_report()
        elif data == "PESTUDIO":
            feature_vector += PestudioParser(path).process_report()
    test_vector.append(feature_vector)
    test_label.append(label)

print(np.asarray(test_vector).shape)

(10000, 471)


### 테스트 데이터 학습 및 앙상블

In [14]:
def ensemble_data(X, models):
    '''
        학습된 모델들의 결과를 앙상블하는 함수
          
        :param X: 검증할 2차원 리스트 특징 벡터
        :param models: 1개 이상의 학습된 머신러닝 모델 객체를 가지는 1차원 리스트
    '''

    predicts = []
    for model in models:
        prob = [result for _, result in model.predict_proba(X)]
        predicts.append(prob)
    
    predict = np.mean(predicts, axis=0)
    predict = [ 1 if x >= 0.5 else 0 for x in predict ]
        
    return predict
  
result = ensemble_data(test_vector, models)

### 테스트 데이터 예측 : CSV 파일 생성

In [15]:
import csv

predict_test = []

for i in range(len(test_label)):
    predict_test.append([test_label[i], result[i]])

path = "./result.csv"

with open(path, 'w', newline='') as f:
    w = csv.writer(f)
    w.writerow(['file', 'predict'])
    for i in predict_test :
        w.writerow(i)
        
    f.close()