## AI 악성코드 탐지 프로젝트

팀명: 사망

팀원: 임준범(20182010)

소감: 결과물이 매우 부족하지만 그래도 튜토리얼 코드를 어느정도 이해하고 csv 출력까지는 성공할 수 있었습니다. 제 실력의 부족함을 실감했고 AI 악성코드 탐지가 어떻게 돌아가는지 이해하는 데 도움이 되었습니다.


In [None]:
!pip install pefile
!pip install numpy
!pip install sklearn
!pip install lightgbm
!pip install tqdm
!pip install pandas
!pip install graphviz

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!unzip /content/drive/MyDrive/데이터.zip

In [None]:
import os
import glob
import json
import pprint
import csv

import numpy as np

from lightgbm import LGBMClassifier

from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier

from sklearn.metrics import accuracy_score
from sklearn.pipeline import Pipeline
from sklearn.feature_selection import RFE

In [None]:
SEED = 41

def read_label_csv(path):
    label_table = dict()
    with open(path, "r", encoding="cp949") as f:
        for line in f.readlines()[1:]:
            fname, label = line.strip().split(",")
            label_table[fname] = int(label)
    return label_table

def read_json(path):
    with open(path, "r") as f:
        return json.load(f)

def load_model(**kwargs):
    if kwargs["model"] == "rf":
        return RandomForestClassifier(random_state=kwargs["random_state"], n_jobs=4)
    elif kwargs["model"] == "dt":
        return DecisionTreeClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "lgb":
        return LGBMClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "svm":
        return SVC(random_state=kwargs["random_state"])
    elif kwargs["model"] == "lr":
        return LogisticRegression(random_state=kwargs["random_state"], n_jobs=-1)
    elif kwargs["model"] == "knn":
        return KNeighborsClassifier(n_jobs=-1)
    elif kwargs["model"] == "adaboost":
        return AdaBoostClassifier(random_state=kwargs["random_state"])
    elif kwargs["model"] == "mlp":
        return MLPClassifier(random_state=kwargs["random_state"])
    else:
        print("Unsupported Algorithm")
        return None
    

def train(X_train, y_train, model):
    '''
        머신러닝 모델을 선택하여 학습을 진행하는 함수
	
        :param X_train: 학습할 2차원 리스트 특징벡터
        :param y_train: 학습할 1차원 리스트 레이블 벡터
        :param model: 문자열, 선택할 머신러닝 알고리즘
        :return: 학습된 머신러닝 모델 객체
    '''
    clf = load_model(model=model, random_state=SEED)
    clf.fit(X_train, y_train)
    return clf


def evaluate(X_test, y_test, model):
    '''
        학습된 머신러닝 모델로 검증 데이터를 검증하는 함수
	
        :param X_test: 검증할 2차원 리스트 특징 벡터
        :param y_test: 검증할 1차원 리스트 레이블 벡터
        :param model: 학습된 머신러닝 모델 객체
    '''
    predict = model.predict(X_test)
    print("정확도", model.score(X_test, y_test))


## 레이블 테이블 로드

In [None]:
label_1 = read_label_csv("학습데이터_정답.csv")
label_2 = read_label_csv("검증데이터_정답.csv")

## 특징 벡터 생성

In [None]:
class PeminerParser:
    def __init__(self, path):
        self.report = read_json(path)
        self.vector = []
    
    def process_report(self):  
        self.vector = [value for _, value in sorted(self.report.items(), key=lambda x: x[0])]
        return self.vector
    

class EmberParser:
    def __init__(self, path):
        self.report = read_json(path)
        self.vector = []
    
    def get_histogram_info(self):
        histogram = np.array(self.report["histogram"])
        total = histogram.sum()
        vector = histogram / total
        return vector.tolist()
    
    def get_string_info(self):
        strings = self.report["strings"]

        hist_divisor = float(strings['printables']) if strings['printables'] > 0 else 1.0
        vector = [
            strings['numstrings'], 
            strings['avlength'], 
            strings['printables'],
            strings['entropy'], 
            strings['paths'], 
            strings['urls'],
            strings['registry'], 
            strings['MZ']
        ]
        vector += (np.asarray(strings['printabledist']) / hist_divisor).tolist()
        return vector
    
    def get_general_file_info(self):
        general = self.report["general"]
        vector = [
            general['size'], general['vsize'], general['has_debug'], general['exports'], general['imports'],
            general['has_relocations'], general['has_resources'], general['has_signature'], general['has_tls'],
            general['symbols']
        ]
        return vector

    def process_report(self):
        vector = []
        vector += self.get_general_file_info()
        vector += self.get_histogram_info()
        vector += self.get_string_info()
        return vector
    

## 학습데이터 구성

In [None]:
# 데이터의 특징 벡터 모음(2차원 리스트) : X
# 데이터의 레이블 모음(1차원 리스트) : y
X1, y1 = [], []
files = os.listdir("EMBER/학습데이터") + os.listdir("PEMINER/학습데이터")
for fname in files:
    feature_vector = []
    label = label_1[fname.replace('.json','')]
    for data in ["PEMINER/학습데이터", "EMBER/학습데이터"]:
        path = f"{data}/{fname}"
        if data == "PEMINER/학습데이터":
            feature_vector += PeminerParser(path).process_report()
        else:
            feature_vector += EmberParser(path).process_report()
        
    X1.append(feature_vector)
    y1.append(label)

np.asarray(X1).shape, np.asarray(y1).shape

## 검증데이터 구성

In [None]:
X2, y2 = [], []
files = os.listdir("EMBER/검증데이터")
for fname in files:
    feature_vector = []
    label = label_2[fname.replace('.json','')]
    for data in ["PEMINER/검증데이터",  "EMBER/검증데이터"]:
        path = f"{data}/{fname}"
        if data == "PEMINER/검증데이터":
            feature_vector += PeminerParser(path).process_report()
        else:
            feature_vector += EmberParser(path).process_report()
        
    X2.append(feature_vector)
    y2.append(label)

np.asarray(X2).shape, np.asarray(y2).shape

## 학습 및 검증

In [None]:
# 학습
models = []
for model in ["lgb"]:
    clf = train(X1, y1, model)
    models.append(clf)


# 검증
# 실제 검증 시에는 제공한 검증데이터를 검증에 사용해야 함
for model in models:
    evaluate(X2, y2, model)

## 테스트데이터 생성


In [None]:
X3 = []
Tfiles = os.listdir("EMBER/테스트데이터")
for fname in Tfiles:
    feature_vector = []
    for data in ["PEMINER/테스트데이터", "EMBER/테스트데이터"]:
        path = f"{data}/{fname}"
        if data == "PEMINER/테스트데이터":
            feature_vector += PeminerParser(path).process_report()
        else:
            feature_vector += EmberParser(path).process_report()
        
    X3.append(feature_vector)

np.asarray(X3).shape

## csv로 출력

In [None]:
def csv_result(X, model):   
    predicts = model.predict(X)
    with open('predict.csv', 'w', encoding='cp949') as f:
        wr = csv.writer(f)
        wr.writerow(['file','predict'])
        for i in range(len(X)):
            wr.writerow([Tfiles[i].replace('.json',''), predicts[i]])
            #print([Tfiles[i].replace('.json',''),predicts[i]])

In [None]:
csv_result(X3, models[0])