<a href="https://colab.research.google.com/github/parkminjin99/18_AI_project/blob/main/18_AI_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [20]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!unzip /content/drive/My\ Drive/데이터.zip

In [3]:
import os
import glob
import json
import pprint

import numpy as np

from lightgbm import LGBMClassifier

from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier

from sklearn.metrics import accuracy_score
from sklearn.pipeline import Pipeline
from sklearn.feature_selection import RFE

In [6]:
def read_label_csv(path):
    label_table = dict()
    with open(path, "r", encoding = "cp949") as f: #csv파일을 읽는다
        for line in f.readlines()[1:]:
            fname, label = line.strip().split(",")
            label_table[fname] = int(label)
    return label_table

def read_json(path):
    with open(path, "r") as f:  #json파일을 읽는다 
        return json.load(f)

In [26]:
learn_label_table = read_label_csv("학습데이터_정답.csv")
verify_label_table = read_label_csv("검증데이터_정답.csv")

특징 벡터 생성 예시

*   PEMINER 정보는 모두 수치형 데이터이므로 특별히 가공을 하지 않고 사용 가능
*   EMBER, PESTUDIO 정보는 가공해서 사용해야 할 특징들이 있음 (e.g. imports, exports 등의 문자열 정보를 가지는 데이터)
*   수치형 데이터가 아닌 데이터(범주형 데이터)를 어떻게 가공할 지가 관건 >> 인코딩 (e.g. 원핫인코딩, 레이블인코딩 등)


In [7]:
ember_path = "EMBER/학습데이터/000c4ae5e00a1d4de991a9decf9ecbac59ed5582f5972f05b48bc1a1fe57338a.json"
peminer_path = "PEMINER/학습데이터/000c4ae5e00a1d4de991a9decf9ecbac59ed5582f5972f05b48bc1a1fe57338a.json"
pestudio_path = "PESTUDIO/학습데이터/bda2ab110f80f6fa79b3d650b6ae4565d8e840d7ba5b69e38c2874f63cc423a8.json"

ember_result = read_json(ember_path)
peminer_result = read_json(peminer_path)
pestudio_result = read_json(pestudio_path)

In [None]:
pprint.pprint(ember_result)

In [None]:
pprint.pprint(peminer_result)

In [None]:
pprint.pprint(pestudio_result,depth = 2)

In [9]:
class PeminerParser:
    def __init__(self, path):
        self.report = read_json(path)
        self.vector = []
    
    def process_report(self):
        '''
            전체 데이터 사용        
        '''
        self.vector = [value for _, value in sorted(self.report.items(), key=lambda x: x[0])]
        return self.vector

In [10]:
class EmberParser:
    '''
        예제에서 사용하지 않은 특징도 사용하여 벡터화 할 것을 권장
    '''
    def __init__(self, path):
        self.report = read_json(path)
        self.vector = []
    
    def get_histogram_info(self):
        histogram = np.array(self.report["histogram"])
        total = histogram.sum()
        vector = histogram / total
        return vector.tolist()
    
    def get_string_info(self):
        strings = self.report["strings"]

        hist_divisor = float(strings['printables']) if strings['printables'] > 0 else 1.0
        vector = [
            strings['numstrings'], 
            strings['avlength'], 
            strings['printables'],
            strings['entropy'], 
            strings['paths'], 
            strings['urls'],
            strings['registry'], 
            strings['MZ']
        ]
        vector += (np.asarray(strings['printabledist']) / hist_divisor).tolist()
        return vector
    
    def get_general_file_info(self):
        general = self.report["general"]
        vector = [
            general['size'], general['vsize'], general['has_debug'], general['exports'], general['imports'],
            general['has_relocations'], general['has_resources'], general['has_signature'], general['has_tls'],
            general['symbols']
        ]
        return vector

    def process_report(self):
        vector = []
        vector += self.get_general_file_info()
        vector += self.get_histogram_info()
        vector += self.get_string_info()
        '''
            특징 추가
        '''
        return vector

In [11]:
class PestudioParser:
    '''
        사용할 특징을 선택하여 벡터화 할 것을 권장
    '''
    
    def __init__(self, path):
        self.report = read_json(path)
        self.vector = []
    
    def process_report(self):
        pass

학습데이터 구성


In [None]:
import os
import queue

peminer_data = []

def get_subdir(path):
    global peminer_data
    try:  # 검색이 허가되지 않은 디렉토리 접근에 관한 예외처리
        dirfiles = os.listdir(path)  # path에 해당하는 디렉토리 dirfiles에 추가
    except PermissionError:
        return []

    subdir_list = []
    for each in dirfiles:
        if each.endswith(".json"):  # .txt로 끝나는 파일은 따로 all_txt리스트에 저장
            peminer_data.append(path + each)
        full_name = path + each  # path와 each(디렉토리)
        if os.path.isdir(full_name):
            subdir_list.append(full_name + "/")
    return subdir_list


dir_queue = queue.Queue()
# /Library/ 아래의 모든 하위 디렉토리 찾기
dir_queue.put("PEMINER/학습데이터/")  # 검색하고자 하는 디렉토리를 줄 앞에 세움

while not dir_queue.empty():  # 큐가 비어있는 상태인지 확인
    dir_name = dir_queue.get()  # 큐에서 빼낸 dir_name
    subdir_names = get_subdir(dir_name)  # dir_name의 하위 디렉토리를 subdir_names에 저장
    for each in subdir_names:  # subdir_names리스트에 있는 디렉토리들을 다시 큐에 넣음
        dir_queue.put(each)

for dir in peminer_data:
    print(dir)

In [None]:
import os
import queue

ember_data = []

def get_subdir(path):
    global ember_data
    try:  # 검색이 허가되지 않은 디렉토리 접근에 관한 예외처리
        dirfiles = os.listdir(path)  # path에 해당하는 디렉토리 dirfiles에 추가
    except PermissionError:
        return []

    subdir_list = []
    for each in dirfiles:
        if each.endswith(".json"):  # .txt로 끝나는 파일은 따로 all_txt리스트에 저장
            ember_data.append(path + each)
        full_name = path + each  # path와 each(디렉토리)
        if os.path.isdir(full_name):
            subdir_list.append(full_name + "/")
    return subdir_list


dir_queue = queue.Queue()
# /Library/ 아래의 모든 하위 디렉토리 찾기
dir_queue.put("EMBER/학습데이터/")  # 검색하고자 하는 디렉토리를 줄 앞에 세움

while not dir_queue.empty():  # 큐가 비어있는 상태인지 확인
    dir_name = dir_queue.get()  # 큐에서 빼낸 dir_name
    subdir_names = get_subdir(dir_name)  # dir_name의 하위 디렉토리를 subdir_names에 저장
    for each in subdir_names:  # subdir_names리스트에 있는 디렉토리들을 다시 큐에 넣음
        dir_queue.put(each)

for dir in ember_data:
    print(dir)

In [45]:
# 데이터의 특징 벡터 모음(2차원 리스트) : X
# 데이터의 레이블 모음(1차원 리스트) : y
X, y = [], []

for path in peminer_data:
  feature_vector = []
  temp, fdata, fname = path.strip().split("/")
  fname = fname.replace(".json","")
  label = learn_label_table[fname]
  for data in ["PEMINER", "EMBER"]:
        path = f"{data}/{fdata}/{fname}.json"
        if data == "PEMINER":
            feature_vector += PeminerParser(path).process_report()
        else:
            feature_vector += EmberParser(path).process_report()
  X.append(feature_vector)
  y.append(label)

for path in ember_data:
  feature_vector = []
  temp, fdata, fname = path.strip().split("/")
  fname = fname.replace(".json","")
  label = learn_label_table[fname]
  for data in ["PEMINER", "EMBER"]:
        path = f"{data}/{fdata}/{fname}.json"
        if data == "PEMINER":
            feature_vector += PeminerParser(path).process_report()
        else:
            feature_vector += EmberParser(path).process_report()
  X.append(feature_vector)
  y.append(label)

np.asarray(X).shape, np.asarray(y).shape

((40000, 558), (40000,))

In [42]:
X, y = [], []
for fname_ in ["학습데이터/000c4ae5e00a1d4de991a9decf9ecbac59ed5582f5972f05b48bc1a1fe57338a", "학습데이터/00ed7bc707559e6e63818b2bba0ac6b338ba17d95aea6f0838cbdc40cb9acd94"]:
    feature_vector = []
    fdata, fname = fname_.strip().split("/")
    label = learn_label_table[fname]
    for data in ["PEMINER", "EMBER"]:
        path = f"{data}/{fname_}.json"
        if data == "PEMINER":
            feature_vector += PeminerParser(path).process_report()
        else:
            feature_vector += EmberParser(path).process_report()
    X.append(feature_vector)
    y.append(label)

np.asarray(X).shape, np.asarray(y).shape

((2, 558), (2,))

검증데이터 구성

In [None]:
import os
import queue

peminer_veri_data = []

def get_subdir(path):
    global peminer_veri_data
    try:  # 검색이 허가되지 않은 디렉토리 접근에 관한 예외처리
        dirfiles = os.listdir(path)  # path에 해당하는 디렉토리 dirfiles에 추가
    except PermissionError:
        return []

    subdir_list = []
    for each in dirfiles:
        if each.endswith(".json"):  # .txt로 끝나는 파일은 따로 all_txt리스트에 저장
            peminer_veri_data.append(path + each)
        full_name = path + each  # path와 each(디렉토리)
        if os.path.isdir(full_name):
            subdir_list.append(full_name + "/")
    return subdir_list


dir_queue = queue.Queue()
# /Library/ 아래의 모든 하위 디렉토리 찾기
dir_queue.put("PEMINER/검증데이터/")  # 검색하고자 하는 디렉토리를 줄 앞에 세움

while not dir_queue.empty():  # 큐가 비어있는 상태인지 확인
    dir_name = dir_queue.get()  # 큐에서 빼낸 dir_name
    subdir_names = get_subdir(dir_name)  # dir_name의 하위 디렉토리를 subdir_names에 저장
    for each in subdir_names:  # subdir_names리스트에 있는 디렉토리들을 다시 큐에 넣음
        dir_queue.put(each)

for dir in peminer_veri_data:
    print(dir)

In [None]:
import os
import queue

ember_veri_data = []

def get_subdir(path):
    global ember_veri_data
    try:  # 검색이 허가되지 않은 디렉토리 접근에 관한 예외처리
        dirfiles = os.listdir(path)  # path에 해당하는 디렉토리 dirfiles에 추가
    except PermissionError:
        return []

    subdir_list = []
    for each in dirfiles:
        if each.endswith(".json"):  # .txt로 끝나는 파일은 따로 all_txt리스트에 저장
            ember_veri_data.append(path + each)
        full_name = path + each  # path와 each(디렉토리)
        if os.path.isdir(full_name):
            subdir_list.append(full_name + "/")
    return subdir_list


dir_queue = queue.Queue()
# /Library/ 아래의 모든 하위 디렉토리 찾기
dir_queue.put("EMBER/검증데이터/")  # 검색하고자 하는 디렉토리를 줄 앞에 세움

while not dir_queue.empty():  # 큐가 비어있는 상태인지 확인
    dir_name = dir_queue.get()  # 큐에서 빼낸 dir_name
    subdir_names = get_subdir(dir_name)  # dir_name의 하위 디렉토리를 subdir_names에 저장
    for each in subdir_names:  # subdir_names리스트에 있는 디렉토리들을 다시 큐에 넣음
        dir_queue.put(each)

for dir in ember_veri_data:
    print(dir)

In [55]:
# 데이터의 특징 벡터 모음(2차원 리스트) : X
# 데이터의 레이블 모음(1차원 리스트) : y
A, b = [], []

for path in peminer_veri_data:
  feature_vector = []
  temp, fdata, fname = path.strip().split("/")
  fname = fname.replace(".json","")
  label = verify_label_table[fname]
  for data in ["PEMINER", "EMBER"]:
        path = f"{data}/{fdata}/{fname}.json"
        if data == "PEMINER":
            feature_vector += PeminerParser(path).process_report()
        else:
            feature_vector += EmberParser(path).process_report()
  A.append(feature_vector)
  b.append(label)

for path in ember_veri_data:
  feature_vector = []
  temp, fdata, fname = path.strip().split("/")
  fname = fname.replace(".json","")
  label = verify_label_table[fname]
  for data in ["PEMINER", "EMBER"]:
        path = f"{data}/{fdata}/{fname}.json"
        if data == "PEMINER":
            feature_vector += PeminerParser(path).process_report()
        else:
            feature_vector += EmberParser(path).process_report()
  A.append(feature_vector)
  b.append(label)

np.asarray(A).shape, np.asarray(b).shape

((20000, 558), (20000,))

학습 및 검증

In [48]:
SEED = 41

def load_model(**kwargs):
    if kwargs["model"] == "rf":
        return RandomForestClassifier(random_state=kwargs["random_state"], n_jobs=4)
    elif kwargs["model"] == "dt": # decision tree
        return DecisionTreeClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "lgb":
        return LGBMClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "svm":
        return SVC(random_state=kwargs["random_state"])
    elif kwargs["model"] == "lr":
        return LogisticRegression(random_state=kwargs["random_state"], n_jobs=-1)
    elif kwargs["model"] == "knn":
        return KNeighborsClassifier(n_jobs=-1)
    elif kwargs["model"] == "adaboost":
        return AdaBoostClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "mlp":
        return MLPClassifier(random_state=kwargs["random_state"])
    else:
        print("Unsupported Algorithm")
        return None

def train(X_train, y_train, model):
    '''
        머신러닝 모델을 선택하여 학습을 진행하는 함수
	
        :param X_train: 학습할 2차원 리스트 특징벡터
        :param y_train: 학습할 1차원 리스트 레이블 벡터
        :param model: 문자열, 선택할 머신러닝 알고리즘
        :return: 학습된 머신러닝 모델 객체
    '''
    clf = load_model(model=model, random_state=SEED)
    clf.fit(X_train, y_train)
    return clf

def evaluate(X_test, y_test, model):
    '''
        학습된 머신러닝 모델로 검증 데이터를 검증하는 함수
	
        :param X_test: 검증할 2차원 리스트 특징 벡터
        :param y_test: 검증할 1차원 리스트 레이블 벡터
        :param model: 학습된 머신러닝 모델 객체
    '''
    predict = model.predict(X_test)
    print("정확도", model.score(X_test, y_test))


In [56]:
# 학습
models = []
for model in ["rf", "lgb"]:
    clf = train(X, y, model)
    models.append(clf)

# 검증
# 실제 검증 시에는 제공한 검증데이터를 검증에 사용해야 함
for model in models:
    evaluate(A, b, model)

정확도 0.9436
정확도 0.9523


앙상블 예제

In [57]:
def ensemble_result(X, y, models):
    '''
        학습된 모델들의 결과를 앙상블하는 함수
	
        :param X: 검증할 2차원 리스트 특징 벡터
        :param y: 검증할 1차원 리스트 레이블 벡터
        :param models: 1개 이상의 학습된 머신러닝 모델 객체를 가지는 1차원 리스트
    '''
    
    # Soft Voting
    # https://devkor.tistory.com/entry/Soft-Voting-%EA%B3%BC-Hard-Voting
    predicts = []
    for i in range(len(X)):
        probs = []
        for model in models:
            prob = model.predict_proba(X)[i][1]
            probs.append(prob)
        predict = 1 if np.mean(probs) >= 0.5 else 0
        predicts.append(predict)
        
    print("정확도", accuracy_score(y, predicts))

In [None]:
ensemble_result(A, b, models)