## 정보보호와 시스템보안
### AI악성코드탐지 프로젝트

### 모두Sun
20152857 정준권   
20152791 강길웅

### 구글드라이브 연동 및 데이터 압축해제 

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!unzip /content/drive/MyDrive/데이터.zip

### 사용 라이브러리 import

In [41]:
import os
import glob
import json
import pprint
import csv
import numpy as np

from lightgbm import LGBMClassifier

from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier

from sklearn.metrics import accuracy_score
from sklearn.pipeline import Pipeline
from sklearn.feature_selection import RFE

import glob

### .csv 작성 함수

In [24]:
def write_label_csv(path, result) :
    with open(path, 'w', newline='') as f:
        wr = csv.writer(f)
        wr.writerow(['file', 'predict'])

        for i in result :
            wr.writerow(i)
        
        f.close()

### 인공지능 관련 함수

In [25]:
SEED = 41

def read_label_csv(path):
    label_table = dict()
    with open(path, "r",encoding="cp949") as f:
        for line in f.readlines()[1:]:
            fname, label = line.strip().split(",")
            label_table[fname] = int(label)
    return label_table

def read_json(path):
    with open(path, "r") as f:
        return json.load(f)

def load_model(**kwargs):
    if kwargs["model"] == "rf":
        return RandomForestClassifier(random_state=kwargs["random_state"], n_jobs=4)
    elif kwargs["model"] == "dt":
        return DecisionTreeClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "lgb":
        return LGBMClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "svm":
        return SVC(random_state=kwargs["random_state"])
    elif kwargs["model"] == "lr":
        return LogisticRegression(random_state=kwargs["random_state"], n_jobs=-1)
    elif kwargs["model"] == "knn":
        return KNeighborsClassifier(n_jobs=-1)
    elif kwargs["model"] == "adaboost":
        return AdaBoostClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "mlp":
        return MLPClassifier(random_state=kwargs["random_state"])
    else:
        print("Unsupported Algorithm")
        return None
    

def train(X_train, y_train, model):
    '''
        머신러닝 모델을 선택하여 학습을 진행하는 함수
	
        :param X_train: 학습할 2차원 리스트 특징벡터
        :param y_train: 학습할 1차원 리스트 레이블 벡터
        :param model: 문자열, 선택할 머신러닝 알고리즘
        :return: 학습된 머신러닝 모델 객체
    '''
    clf = load_model(model=model, random_state=SEED)
    clf.fit(X_train, y_train)
    return clf


def evaluate(X_test, y_test, model):
    '''
        학습된 머신러닝 모델로 검증 데이터를 검증하는 함수
	
        :param X_test: 검증할 2차원 리스트 특징 벡터
        :param y_test: 검증할 1차원 리스트 레이블 벡터
        :param model: 학습된 머신러닝 모델 객체
    '''
    predict = model.predict(X_test)
    print("정확도", model.score(X_test, y_test))



### test 데이터 예측 함수

In [26]:
def get_test_predict(X_test, model) :
    y_predict = model.predict(X_test)
    return y_predict

### 학습 및 검증 파일 로드

In [27]:
label_table = read_label_csv("/content/학습데이터_정답.csv")
label_test_table = read_label_csv("/content/검증데이터_정답.csv")

file_list = glob.glob('/content/EMBER/학습데이터/*.json')
file_test_list = glob.glob('/content/EMBER/검증데이터/*.json')

### 특징벡터 추출
#### PEMINER
PEMINER는 전체 데이터 모두 특징벡터로 이용한다.   
#### EMBER 
EMBER는 histogram 정보, string 정보, general 정보, section의 갯수,section entropy중 가장 큰 값, section size중 가장 큰 크기, datadirectory갯수, byteentropy 정규화 값, header optional값, export 갯수를 특징 벡터로 이용하였다. 

In [30]:
class PeminerParser:
    def __init__(self, path):
        self.report = read_json(path)
        self.vector = []
    
    def process_report(self):
        '''
            전체 데이터 사용        
        '''
        
        self.vector = [value for _, value in sorted(self.report.items(), key=lambda x: x[0])]
        return self.vector
    

class EmberParser:
    '''
        예제에서 사용하지 않은 특징도 사용하여 벡터화 할 것을 권장
    '''
    def __init__(self, path):
        self.report = read_json(path)
        self.vector = []
    
    def get_histogram_info(self):
        histogram = np.array(self.report["histogram"])
        total = histogram.sum()
        vector = histogram / total
        return vector.tolist()
    
    def get_string_info(self):
        strings = self.report["strings"]

        hist_divisor = float(strings['printables']) if strings['printables'] > 0 else 1.0
        vector = [
            strings['numstrings'], 
            strings['avlength'], 
            strings['printables'],
            strings['entropy'], 
            strings['paths'], 
            strings['urls'],
            strings['registry'], 
            strings['MZ']
        ]
        vector += (np.asarray(strings['printabledist']) / hist_divisor).tolist()
        return vector
    
    def get_header_optional_info(self):
      header = self.report["header"]
      optional = header['optional']
      vector = [
          optional['major_image_version'],
          optional['minor_image_version'],
          optional['major_linker_version'],
          optional['minor_linker_version'],
          optional['major_operating_system_version'],
          optional['minor_operating_system_version'],
          optional['major_subsystem_version'],
          optional['minor_subsystem_version'],
          optional['sizeof_code'],
          optional['sizeof_headers'],
          optional['sizeof_heap_commit']
      ]
      return vector
    
    def get_section_number(self):
      section = self.report["section"]
      vector = [len(section)]
      return vector

    def get_section_entropy(self):
      section = self.report["section"]
      section = section['sections']
      if len(section) == 0 : 
        vector = [-11]
        return vector
      else : 
        entropy = 0
        for sections in section:
          if entropy < float(sections['entropy']):
            entropy = float(sections['entropy'])
        vector = [entropy]
        return vector

    def get_section_size(self):
      section = self.report["section"]
      section = section['sections']
      if len(section) == 0 : 
        vector = [-11]
        return vector
      else :
        size = 0
        for sections in section:
          if size < float(sections['size']) :
            size = float(sections['size'])
        vector = [size]
        return vector

    def get_datadirectories_number(self):
      data = self.report["datadirectories"]
      vector = [len(data)]
      return vector

    def get_general_file_info(self):
        general = self.report["general"]
        vector = [
            general['size'], general['vsize'], general['has_debug'], general['exports'], general['imports'],
            general['has_relocations'], general['has_resources'], general['has_signature'], general['has_tls'],
            general['symbols']
        ]
        return vector
    
    def get_byteentropy(self):
      byteentropy = np.array(self.report["byteentropy"])
      total = byteentropy.sum()
      vector = byteentropy / total
      return vector.tolist()
      
    def get_export_number(self):
      export = self.report["exports"]
      vector = [len(export)]
      return vector

    def process_report(self):
        vector = []
        vector += self.get_general_file_info()
        vector += self.get_histogram_info()
        vector += self.get_string_info()
        vector += self.get_byteentropy()
        vector += self.get_header_optional_info()
        vector += self.get_section_number()
        vector += self.get_section_entropy()
        vector += self.get_section_size()
        vector += self.get_datadirectories_number()
        vector += self.get_export_number()

        '''
            특징 추가
            
        '''
        return vector
    
class PestudioParser:
    '''
        사용할 특징을 선택하여 벡터화 할 것을 권장
    '''
    
    def __init__(self, path):
        self.report = read_json(path)
        self.vector = []
    
    def process_report(self):
        pass

### 학습데이터 구성


In [31]:
# 데이터의 특징 벡터 모음(2차원 리스트) : X
# 데이터의 레이블 모음(1차원 리스트) : y
X, y = [], []

for fname in file_list:
    feature_vector = []
    fname = fname.split("/")[4]
    fname = fname.split(".")[0]
    label = label_table[fname]
    for data in ["PEMINER", "EMBER"]:
        path = f"{data}/학습데이터/{fname}.json"
        if data == "PEMINER":
            feature_vector += PeminerParser(path).process_report()
        else:
            feature_vector += EmberParser(path).process_report()
    X.append(feature_vector)
    y.append(label)

np.asarray(X).shape, np.asarray(y).shape

((20000, 830), (20000,))

### 검증데이터 구성

In [32]:
# 데이터의 특징 벡터 모음(2차원 리스트) : Xt
# 데이터의 레이블 모음(1차원 리스트) : yt
Xt, yt = [], []

for fname in file_test_list:
    feature_vector = []
    fname = fname.split("/")[4]
    fname = fname.split(".")[0]
    label = label_test_table[fname]
    for data in ["PEMINER", "EMBER"]:
        path = f"{data}/검증데이터/{fname}.json"
        if data == "PEMINER":
            feature_vector += PeminerParser(path).process_report()
        else:
            feature_vector += EmberParser(path).process_report()
    Xt.append(feature_vector)
    yt.append(label)

np.asarray(Xt).shape, np.asarray(yt).shape

((10000, 830), (10000,))

### 학습 및 검증

In [33]:
# 학습
models = []
for model in ["lgb","rf","knn"]:
    clf = train(X, y, model)
    models.append(clf)

# 검증
# 실제 검증 시에는 제공한 검증데이터를 검증에 사용해야 함
for model in models:
    evaluate(Xt, yt, model)

정확도 0.9525
정확도 0.9434
정확도 0.9042


### 앙상블

In [34]:
def ensemble_result(X, y, models):
    '''
        학습된 모델들의 결과를 앙상블하는 함수
	
        :param X: 검증할 2차원 리스트 특징 벡터
        :param y: 검증할 1차원 리스트 레이블 벡터
        :param models: 1개 이상의 학습된 머신러닝 모델 객체를 가지는 1차원 리스트
    '''
    
    # Soft Voting
    # https://devkor.tistory.com/entry/Soft-Voting-%EA%B3%BC-Hard-Voting
    predicts = []
    for model in models:
        prob = [result for _, result in model.predict_proba(X)]
        predicts.append(prob)
    
    predict = np.mean(predicts, axis=0)
    predict = [1 if x >= 0.5 else 0 for x in predict]
        
    print("정확도", accuracy_score(y, predict))
def ensemble_models(X, models) :
    '''
        학습된 모델들의 결과를 앙상블하는 함수

        :param X: 검증할 2차원 리스트 특징 벡터
        :param models: 1개 이상의 학습된 머신러닝 모델 객체를 가지는 1차원 리스트
    '''

    predicts = []
    for model in models :
        prob = [result for _, result in model.predict_proba(X)]
        predicts.append(prob)
    
    predict = np.mean(predicts, axis=0)
    predict = [1 if x >= 0.5 else 0 for x in predict]

    return predict

In [36]:
ensemble_result(Xt, yt, models)

정확도 0.9534


### 테스트 데이터 로드

In [37]:
test_list = glob.glob('/content/EMBER/테스트데이터/*.json')
test_label = []

test_result = []

### 테스트 데이터 추출

In [38]:
# 데이터의 특징 벡터 모음(2차원 리스트) : X_test
# 데이터의 예측결과 모음(1차원 리스트) : y_test
X_test, y_test = [], []

for fname in test_list:
    feature_vector = []
    fname = fname.split("/")[4]
    fname = fname.split(".")[0]
    for data in ["PEMINER", "EMBER"]:
        path = f"{data}/테스트데이터/{fname}.json"
        if data == "PEMINER":
            feature_vector += PeminerParser(path).process_report()
        else:
            feature_vector += EmberParser(path).process_report()
    X_test.append(feature_vector)
    test_label.append(fname)

np.asarray(X_test).shape

(10000, 830)

### 테스트 데이터 예측

In [39]:
y_test = ensemble_models(X_test, models)

for i in range(10000) :
    test_result.append([test_label[i], y_test[i]])


### 테스트 예측 데이터 .csv 작성

In [42]:
write_path = '/content/drive/MyDrive/predict.csv'

write_label_csv(write_path, test_result)