In [None]:
from __future__ import print_function, division

from keras.layers import Input, Dense, Activation
from keras.layers.merge import Maximum, Concatenate
# from keras.layers import Maximum, Concatenate
from keras.models import Model #Keras의 Model 클래스를 가져온다, 이 클래스는 Keras 텐서를 인스턴스화하는 데 사용

from sklearn.ensemble import RandomForestClassifier #RandomForest Classifier를 가져옴.(앙상블 학습방법) 
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split
import tensorflow as  tf
#import tensorflow.compat.v1 as tf
#tf.compat.v1.disable_eager_execution() # work with tensorflow 2.0
from tensorflow.keras.optimizers import Adam
from model import Model as Classifier #사용자가 정의한 모듈에서 model을 가져와 Classifier로 별칭을 지정
import pickle #pickle 모듈은 python 객체 구조의 직렬화 및 역직렬화를 위한 이진 프로토콜을 구현
from sklearn.datasets import load_svmlight_file #libsvm 형식의 데이터셋을 로드하는데 사용할 수 있는 함수를 가져온다.


import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import pandas as pd
import numpy as np
import seaborn as sns
sns.set(style = "white")

The test data is a list of 3435 malware examples, each with 3514 API features.

In [None]:
seed_dict = pickle.load(open('feat_dict.pickle', 'rb'), encoding='latin1') #pickle 파일 로드,
features = []
sha1 = []
for key in seed_dict:
    seed_dict[key] = seed_dict[key].toarray()[0] 
    features.append(seed_dict[key])
    sha1.append(key)
feed_feat = np.stack(features)

The training dataset contains 13190 samples, each with 3514 API features. The first 6896 are malware example, and the last 6294 are benign examples.

In [None]:
# Train MalGAN and plot result

# models = ['adv_keep_twocls', 'adv_del_twocls']
#models = ['baseline_adv_delete_one', 'baseline_adv_insert_one', 'baseline_adv_delete_two', \
          #'baseline_adv_insert_rootallbutone', 'baseline_adv_combine_two']
models = ['baseline_checkpoint', 'robust_delete_one', 'robust_insert_one', 'robust_delete_two', \
          'robust_insert_allbutone', 'robust_monotonic', 'robust_combine_two_v2_e18', 'robust_combine_three_e17']

dataframes = []
for model in models:
    malgan = MalGAN(model)
    df = malgan.train(epochs = 50, batch_size= 128)
    dataframes.append(df)

data = pd.concat(dataframes, axis = 0).reset_index(drop=True)
plt.figure()

g = sns.lineplot(x = 'L0', y = 'ERA', data=data, hue='model')
plt.xlabel("$L_0$")
g.set(yticks = [0.00, 0.25, 0.50, 0.75, 1.00])
g.xaxis.set_major_locator(ticker.FixedLocator([10, 200, 500, 1000, 2000, 3514]))
handles, labels = g.get_legend_handles_labels()
g.legend(handles=handles, labels=labels)
fig = g.get_figure()
fig.savefig("result.png")
plt.show()

In [None]:
class MalGAN():
    def __init__(self, model_name):
        #API 피처의 차원 수와 노이즈 벡터의 차원 수를  설정
        self.apifeature_dims = 3514
        self.z_dims = 100   # noise appended at the end of example
        self.model_name = model_name

        self.hide_layers = 256
        self.generator_layers = [self.apifeature_dims+self.z_dims, self.hide_layers, self.apifeature_dims]
        self.substitute_detector_layers = [self.apifeature_dims, self.hide_layers, 1]
        #사전에 학습된 모델을 로드하여 Black-box 판별자를 구성합니다.
        self.blackbox, self.sess = self.build_blackbox_detector(self.model_name) 
        self.optimizer = Adam(lr=0.001)
        
        # Build and compile the substitute_detector
        # 판별자 구성: 생성자가 생성한 악성코드 샘플을 입력으로 받아 이 샘플이 진짜인지 가짜인지를 판별하는 역할
        self.substitute_detector = self.build_substitute_detector()
        self.substitute_detector.compile(loss='binary_crossentropy', optimizer=self.optimizer, metrics=['accuracy'])
        
        
        # Build the generator
        #생성자는 실제 데이터와 노이즈를 입력으로 받아 새로운 데이터를 생성하는 역할
        self.generator = self.build_generator()

        # The generator takes malware and noise as input and generates adversarial malware examples
        #생성자에 입력으로 들어갈 실제 악성 코드 피처와 노이즈에 대한 입력 레이어를 정의, 이 두 입력 레이어를 input이라는 리스트에 저장
        example = Input(shape=(self.apifeature_dims,))
        noise = Input(shape=(self.z_dims,))
        input = [example, noise]
        malware_examples = self.generator(input)

        # For the combined model we will only train the generator
        #GAN 훈련과정에서 생성자만 훈련하기 위해 판별자의 가중치를 고정
        self.substitute_detector.trainable = False

        # The discriminator takes generated images as input and determines validity
        #판별자는 생성자가 생성한 새로운 악성코드 샘플을 입력으로 받아 그 샘플이 진짜 악성코드인지 아닌지 판별
        validity = self.substitute_detector(malware_examples)

        # The combined model  (stacked generator and substitute_detector)
        self.combined = Model(input, validity)
        self.combined.compile(loss='binary_crossentropy', optimizer=self.optimizer)
        
        
    def build_generator(self):
        #생성자는 실제 데이터와 노이즈를 입력으로 받아 새로운 데이터를 생성하는 역할
        #각각의 코드는 생성자에 입력될 실제 악성코드 피처와 노이즈에 대한 입력 레이어를 정의
        example = Input(shape=(self.apifeature_dims,))
        noise = Input(shape=(self.z_dims,))
        x = Concatenate(axis=1)([example, noise]) # example과 noise를 연결하여 하나의 벡터로 만든다. 이렇게 만들어진 벡터가 생성자의 입력으로 사용
        for dim in self.generator_layers[1:]:
            x = Dense(dim)(x)
            x = Activation(activation='sigmoid')(x)
        x = Maximum()([example, x])
        #입력 레이어(example and noise) 출력 레이어(x)를 이용해 케라스 모델을 생성
        generator = Model([example, noise], x, name='generator')
        generator.summary()
        return generator
     # 판별자 구성: 생성자가 생성한 악성코드 샘플을 입력으로 받아 이 샘플이 진짜인지 가짜인지를 판별하는 역할
    def build_substitute_detector(self):

        input = Input(shape=(self.substitute_detector_layers[0],))
        x = input #입력레이어를 x에 저장
        for dim in self.substitute_detector_layers[1:]:
            x = Dense(dim)(x)
            x = Activation(activation='sigmoid')(x)
        substitute_detector = Model(input, x, name='substitute_detector') # 입력 레이어와 출력레이어를 이용해 케라스 모델을 생성 이 모델이 바로 판별자
        substitute_detector.summary()
        return substitute_detector    
    

    def build_blackbox_detector(self,model_name):
        #저장된 모델의 경로를 설정 .ckpt확장자 
        PATH = "adv_trained/{}.ckpt".format(model_name)
        # Clear the current graph in each run, to avoid variable duplication
        tf.reset_default_graph()
        model = Classifier() #외부에서 임포트 된 클래스
        saver = tf.train.Saver() #모델의 변수를 저장하거나 불러오는 데 사용
        sess = tf.Session() #텐서플로우의 연산을  실행
        sess.run(tf.global_variables_initializer()) #전역 변수 지역 변수 초기화
        sess.run(tf.local_variables_initializer())
                    
        saver.restore(sess, PATH)#특정 세션에서 저장된 모델 변수를 불러옴
        print ("load model from:", PATH) # 불러온 모델 경로 출력
        
        return model, sess
        
    def train(self, epochs, batch_size):
        
        model = self.blackbox
        sess = self.sess
        
                
        # Load test dataset (all malware)
        #테스트 데이터셋 feat_dict.pickle 파일에서 로드: 이 데이터셋에는 malware 예시들만 포함되어 있음
        seed_dict = pickle.load(open('feat_dict.pickle', 'rb'), encoding='latin1')
        features = [] #dense한 벡터로 변환 되어 리스트에 저장
        sha1 = [] #각 예시의 hash 값이 리스트에 저장
        dist_dict = {} # [key]: hash [value]: L0 distance
        for key in seed_dict:
            seed_dict[key] = seed_dict[key].toarray()[0]
            features.append(seed_dict[key])
            sha1.append(key)
        feed_feat = np.stack(features) #모든 malware 예시들을 하나의 numpy array로 쌓아 놓음
        xtest_mal, ytest_mal = feed_feat, np.ones(len(feed_feat))# xtest_mal 이 array를 복사, ytest_mal은 각 malware 예시에 대응하는 레이블을 담음
        

        # Load training dataset
        #train 파일 로드: 이 데이터셋은 benign예시와 malware 예시 모두 포함
        train_x, train_y = load_svmlight_file("train_data.libsvm",
                                       n_features=3514,
                                       multilabel=False, 
                                       zero_based=False,
                                       query_id=False)
        
        train_x = train_x.toarray()
        xtrain_ben = train_x[6896:] #benign 예시
        ytrain_ben = train_y[6896:] #benign 레이블
        xtrain_mal = train_x[0:6896] #malware 예시              
        ytrain_mal = train_y[0:6896] #malware 레이블
        
        # Since the training dataset is unbalanced, we randomly choose sample from benign dataset
        # and add them to the end to make up the gap
        #학습 데이터셋은 benign 예시들이 malware예시들보다 적다. 따라서 benign 예시들 중 일부를 임의로 선택하여 데이터셋에 추가. 균형 맞추기
        idx = np.random.randint(0, xtrain_ben.shape[0], 6896 - 6294) #benign 데이터셋에서 랜덤하게 선택할 인덱스를 생성. 선택할 개수는 현재의 malware 개수와 benign 개수 차이 만큼
        add_on = xtrain_ben[idx] #생성한 인덱스를 사용하여 benign 데이터에서 해당 인덱스에 해당하는 예시를 선택
        add_on_label = ytrain_ben[idx] # 선택된 bengin예시 레이블 선택
        xtrain_ben = np.concatenate((xtrain_ben, add_on), axis=0) #기존 benign 데이터셋에 선택된 benign 예시를 추가
        ytrain_ben = np.concatenate((ytrain_ben, add_on_label), axis = 0) #기존의 benign 레이블에 benign 레이블 추가

        Test_TPR = []
        d_loss_list, g_loss_list = [], []
        
        #각 epoch에 대한 substitute_detector와 generator의 훈련을 수행
        #루프는 주어진 에폭 수만큼 모델 훈련을 반복한다.
        for epoch in range(epochs): 
            
            # Each epoch goes through all the data in the training set
            start = 0                
                
            for step in range(xtrain_mal.shape[0] // batch_size): 
                
                # ---------------------
                #  Train substitute_detector
                # ---------------------
                #xmal_batch와xben_batch는 각각 악성 및 정상 데이터의 배치를 가져온다. noise는 생성자에 입력으로 사용되는 랜덤 노이즈를 생성
                xmal_batch = xtrain_mal[start : start + batch_size]  
                noise = np.random.uniform(0, 1, (batch_size, self.z_dims))

                xben_batch = xtrain_ben[start : start + batch_size]
                start = start + batch_size
                
                # predict using blackbox detector        
                #blackbox 모델을 사용해 benign 배치에 대한 예측을 수행      
                yben_batch = sess.run(model.y_pred,\
                    feed_dict={model.x_input:xben_batch})

                # Generate a batch of new malware examples
                #생성자는 gen_examples를 생성하는데 사용되며, 악성데이터 배치와 노이즈를 입력으로 사용
                gen_examples = self.generator.predict([xmal_batch, noise])
                #생성된 악성 예제에 대한 라벨을 생성: 라벨은 blackbox모델을 사용해 생성
                ymal_batch = sess.run(model.y_pred,\
                                      feed_dict={model.x_input:np.ones(gen_examples.shape)*(gen_examples > 0.5)})
                
                # Train the substitute_detector
                #substitute_decoder는 실제 benign 데이터와 생성된 악성 예쩨에 대해 훈련. 손실 계산
                d_loss_real = self.substitute_detector.train_on_batch(xben_batch, yben_batch)
                d_loss_fake = self.substitute_detector.train_on_batch(gen_examples, ymal_batch)
                d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)
                

                # ---------------------
                #  Train Generator
                # ---------------------
                #생성자는 악성 데이터 배치와 노이즈를 입력으로 사용해 훈련
                #이는 생성자가 substitute_detector를 속이도록 만드는데 악성예제를 0으로 분류
                noise = np.random.uniform(0, 1, (batch_size, self.z_dims))
                g_loss = self.combined.train_on_batch([xmal_batch,noise], np.zeros((batch_size, 1)))

            
            # After each epoch, Evaluate evasion performance on the test dataset
            # try different noise for 3 times
            for j in range(3): # 새로운 노이즈를 사용하여 악성예제를 생성하고, 그 성능을 평가하는 과정을 세 번 반복
                noise = np.random.uniform(0, 1, (xtest_mal.shape[0], self.z_dims))
                gen_examples = self.generator.predict([xtest_mal, noise]) #테스트 세트의 악성 데이터와 생성된 노이즈를 사용하여 생성자로 부터 얻은 악성예제
                #얼마나 잘 분류하는지를 측정    
                TPR = sess.run(model.accuracy,\
                        feed_dict={model.x_input:np.ones(gen_examples.shape)*(gen_examples > 0.5), model.y_input: np.ones(gen_examples.shape[0],)})
            
                Test_TPR.append(TPR) 
                #생성된 예제를 이진 형태로 변환. 생성된 예제의 값이 0보다 크면 1 
                transformed_to_bin = np.ones(gen_examples.shape)*(gen_examples > 0.5)
                #blackbox model을 사용하여 변환된 악성 예제에 대한 예측 라벨을 생성
                pred_y_label = sess.run(model.y_pred,\
                                     feed_dict={model.x_input:np.ones(gen_examples.shape)*(gen_examples > 0.5)})
            
            
            
                # remove successfully evaded malware examples from xtest_mal
                #생성된 악성 예제가 blackbox 피하는데 성공한 경우, 실제로는 악성이지만 blackbox 모델이 정상으로 분류한 경우 테스트 세트에서 제거
                #해당 예제와 원본 예제간의 LO 거리를 계산하여 저장
                #LO거리는 두 벡터간의 다른 두 요소의 수를 나타내므로, 원본 악성 예제와 생성된 악성 예제간의 차이를 나타냄
                i = 0
                while i  < pred_y_label.shape[0]:
                    if pred_y_label[i] == 0: # should be 1 but predict 0
                    #print(sha1[i], xtrain_mal[i])
                    # calculate L0 distance and put to dictionary
                        L0 = np.sum(transformed_to_bin[i]) - np.sum(xtest_mal[i]) #insertion only
                        dist_dict[sha1[i]] = L0  # [key]: hash [value]: L0 distance
                        xtest_mal = np.delete(xtest_mal, i, 0)
                        pred_y_label = np.delete(pred_y_label, i, 0)
                        sha1 = sha1[:i] + sha1[i+1:]
                    else:
                        i += 1
                # 현재 테스트 세트에 남아있는 악성 예제의 수를 출력합니다. 
                print("remaining malware examples:", xtest_mal.shape[0])
                if xtest_mal.shape[0] == 0:
                    break #successful evade all

        
            # Print and record the progress
            print("[MalGAN] epoch(%d) [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch+1, d_loss[0], 100*d_loss[1], g_loss))
            print("[Classifier] Test TPR on remaining test data: %f" % (Test_TPR[-1]))
            d_loss_list.append(d_loss[0])
            g_loss_list.append(g_loss)
            if xtest_mal.shape[0] == 0:
                break #successful evade all
            

        sess.close()
        
        # AFTER all epochs
        # Plot the progress --> loss
        # d_loss_df = pd.DataFrame(dict(epoch =np.arange(1, len(d_loss_list)+1),\
        #                               dataset = "d loss",\
        #                               loss = np.asarray(d_loss_list, dtype=np.float32)))
        # g_loss_df = pd.DataFrame(dict(epoch =np.arange(1, len(g_loss_list)+1),\
        #                               dataset = "g loss",\
        #                               loss = np.asarray(g_loss_list, dtype=np.float32)))

        # loss_df = pd.concat([d_loss_df,g_loss_df], axis=0).reset_index(drop=True)      
        # plt.figure()
        # loss_plot = sns.lineplot(x = 'epoch', y = 'loss', hue ='dataset', data = loss_df)
        # handles, labels = loss_plot.get_legend_handles_labels()
        # loss_plot.legend(handles=handles, labels=labels)
        # plt.show()
        # fig = loss_plot.get_figure()
        # fig.savefig("{}_loss".format(self.model_name))  # save loss plot
        
        # get corresponding dataframe for different models
        #ERA는 성공적으로 피해진 악성 예제의 비율을 나타내는 지표
        ERA = []
        success_num = 0
        # Calculate ERA for each L0 distance
        #LO거리가 i인 각 악성예제에 대해, 해당 예제가 Black Box 모델을 피하는데 성공했다면 success_num을 증가
        #LO거리는 원래의 악성예제와 생성된 악성 예제 간에 변화된 피처의 수
        #남아있는 악성예제의 수를 전체 악성 예제 수로 나눈다.
        for i in range(3515): # 0 - 3514 features
            for key in dist_dict:
                if dist_dict[key] == i:
                    success_num += 1
            ERA.append((3435 - success_num) / 3435)
        
        # report ERA if not completely evaded
        #만약 모든 악성예제가 피하지 못하였다면 이를 출력하여 리포팅
        if ERA[-1] != 0:
            print("{} is not completely evaded after {} epochs. ERA = {}".format(self.model_name, epochs, ERA[-1]))
            
        #이 부분은 모델의 이름에 따라 그래프의 이름을 지정    
        if self.model_name == 'baseline_checkpoint': curve_name = 'Baseline'
        if self.model_name == "baseline_adv_delete_one": curve_name = 'Adv Retrain A'
        if self.model_name == "robust_delete_one": curve_name = 'Robust A' 
        if self.model_name == "baseline_adv_insert_one": curve_name = 'Adv Retrain B'
        if self.model_name == "robust_insert_one": curve_name = 'Robust B'
        if self.model_name == "baseline_adv_delete_two": curve_name = 'Adv Retrain C'
        if self.model_name == "robust_delete_two": curve_name = 'Robust C'
        if self.model_name == "baseline_adv_insert_rootallbutone": curve_name = 'Adv Retrain D'
        if self.model_name == "adv_keep_twocls": curve_name = 'Ensemble D Base Learner'
        if self.model_name == "robust_monotonic": curve_name = 'Robust E'
        if self.model_name == "baseline_adv_combine_two": curve_name = 'Adv Retrain A+B'
        if self.model_name == "adv_del_twocls": curve_name = 'Ensemble A+B Base Learner'
        if self.model_name == "robust_combine_two_v2_e18": curve_name = 'Robust A+B'
        if self.model_name == "robust_insert_allbutone": curve_name = 'Robust D'
        if self.model_name == "robust_combine_three_e17": curve_name = 'Robust A+B+E'
        #각 LO거리에 대한 ERA를 포함하는 데이터 프레임을 만들고 반환(ERA, 모델이름, 그리고 LO 거리를 각각 포함)
        model_df = pd.DataFrame(dict(ERA = np.asarray(ERA, dtype=np.float32),\
                                     model = curve_name, L0 = np.arange(3515)))
        return model_df

In [None]:
# Train MalGAN and plot result

# models = ['adv_keep_twocls', 'adv_del_twocls']
#models = ['baseline_adv_delete_one', 'baseline_adv_insert_one', 'baseline_adv_delete_two', \
          #'baseline_adv_insert_rootallbutone', 'baseline_adv_combine_two']
#여러 모델을 대상으로 MalGAN을 학습시키고 그 결과를 그래프로 표시하는 과정
models = ['baseline_checkpoint', 'robust_delete_one', 'robust_insert_one', 'robust_delete_two', \
          'robust_insert_allbutone', 'robust_monotonic', 'robust_combine_two_v2_e18', 'robust_combine_three_e17']
#각 모델에 대해 MalGAN을 생성하고 학습을 수행
dataframes = []
for model in models:
    malgan = MalGAN(model)
    df = malgan.train(epochs = 50, batch_size= 128)
    dataframes.append(df)

data = pd.concat(dataframes, axis = 0).reset_index(drop=True)
plt.figure()

g = sns.lineplot(x = 'L0', y = 'ERA', data=data, hue='model')
plt.xlabel("$L_0$")
g.set(yticks = [0.00, 0.25, 0.50, 0.75, 1.00])
g.xaxis.set_major_locator(ticker.FixedLocator([10, 200, 500, 1000, 2000, 3514]))
handles, labels = g.get_legend_handles_labels()
g.legend(handles=handles, labels=labels)
fig = g.get_figure()
fig.savefig("result.png")
plt.show()