# 데이터 셋 만들기

In [None]:
from IPython.display import display
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

import pandas as pd
import numpy as np

import torchvision.transforms as transforms

import os

from matplotlib.pyplot import imshow

from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

import matplotlib.image as img
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, f1_score

torch.manual_seed(0)
torch.cuda.manual_seed(0)
torch.cuda.manual_seed_all(0)

if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [None]:
!pip install -q wandb
import wandb
wandb.login()


[34m[1mwandb[0m: Currently logged in as: [33mvinnyshin[0m. Use [1m`wandb login --relogin`[0m to force relogin


True

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)


Mounted at /content/drive


In [None]:
class CustomDataset(Dataset):
  def __init__(self, imgs, malices, names):
        super().__init__()
        self.imgs = imgs
        self.malices = malices
        self.names = names
        
  def __len__(self):
      return len(self.imgs)
  
  def __getitem__(self,index):
      return self.imgs[index], self.malices[index], self.names[index]

In [None]:
from zipfile import ZipFile
from PIL import Image
import zipfile
import cv2
import numpy as np
import random

def convertPILImageToCV2(pil_image):
  np_image = np.array(pil_image)
  open_cv_image = np.array(pil_image) 

  open_cv_image = open_cv_image[:, :, ::-1].copy() 

  return open_cv_image

benign_path = '/content/drive/MyDrive/Malware Analysis/kisa_dataset/benign_image.zip'
malign_path = '/content/drive/MyDrive/Malware Analysis/kisa_dataset/malign_image.zip'
benign_zip = ZipFile(benign_path,'r')
malign_zip = ZipFile(malign_path,'r')

#benign
benign_info_list = benign_zip.infolist()
benign_name_list = benign_zip.namelist()
malign_info_list = malign_zip.infolist()
malign_name_list = malign_zip.namelist()

benign_set = []
malign_set = []

for i in range(len(benign_info_list)):
  _file = benign_zip.open(benign_info_list[i])
  img = Image.open(_file)
  img = convertPILImageToCV2(img)
  name = benign_name_list[i]
  benign_set.append((img, 0, name))
  

for i in range(len(malign_info_list)):
  _file = malign_zip.open(malign_info_list[i])
  img = Image.open(_file)
  img = convertPILImageToCV2(img)
  name = malign_name_list[i]
  malign_set.append((img, 1, name))
  

#데이터 셔플링
random.shuffle(benign_set)
random.shuffle(malign_set)
# 셔플된 데이터에서 테스트셋, 트레인셋 구성.

num_each_test = 400

raw_test_set = benign_set[:num_each_test] + malign_set[:num_each_test]
random.shuffle(raw_test_set)
raw_train_set = benign_set[num_each_test:] + malign_set[num_each_test:]
random.shuffle(raw_train_set)


## Normalizing

In [None]:
train_meanRGB = [np.mean(x, axis=(1,2)) for x,_, __ in raw_train_set]
train_stdRGB = [np.std(x, axis=(1,2)) for x,_, __ in raw_train_set]

train_meanR = np.mean([m[0] for m in train_meanRGB])
train_meanG = np.mean([m[1] for m in train_meanRGB])
train_meanB = np.mean([m[2] for m in train_meanRGB])

train_stdR = np.mean([s[0] for s in train_stdRGB])
train_stdG = np.mean([s[1] for s in train_stdRGB])
train_stdB = np.mean([s[2] for s in train_stdRGB])


test_meanRGB = [np.mean(x, axis=(1,2)) for x,_, __ in raw_test_set]
test_stdRGB = [np.std(x, axis=(1,2)) for x,_, __ in raw_test_set]

test_meanR = np.mean([m[0] for m in test_meanRGB])
test_meanG = np.mean([m[1] for m in test_meanRGB])
test_meanB = np.mean([m[2] for m in test_meanRGB])

test_stdR = np.mean([s[0] for s in test_stdRGB])
test_stdG = np.mean([s[1] for s in test_stdRGB])
test_stdB = np.mean([s[2] for s in test_stdRGB])

normalized_zero_r = (0-test_meanR)/test_stdR
print(normalized_zero_r)

-0.10809985121652183


In [None]:
train_transform = transforms.Compose([transforms.ToPILImage(),
                                      transforms.ToTensor(),
                                      transforms.Normalize([train_meanR,train_meanG,train_meanB],[train_stdR, train_stdG, train_stdB])
                                      ])
test_transform = transforms.Compose([transforms.ToPILImage(),
                                      transforms.ToTensor(),
                                     transforms.Normalize([test_meanR, test_meanG, test_meanB],[test_stdR, test_stdG, test_stdB])
                                      ])

train_imgs, train_malices, train_names = [],[], []
test_imgs, test_malices, test_names = [],[],[]


for img,mal,name in raw_train_set:
  train_imgs.append(train_transform(img))
  train_malices.append(mal)
  train_names.append(name)
for img,mal,name in raw_test_set:
  test_imgs.append(test_transform(img))
  test_malices.append(mal)
  test_names.append(name)


train_set = CustomDataset(train_imgs, train_malices, train_names)
test_set = CustomDataset(test_imgs, test_malices, test_names)

# 모델 정의

In [None]:
hyperparameter_defaults = dict(
    dataset = "MALWARE",
    gpu = "colab",
    dropout = 0.3,
    layer_1_out_channels = 10,
    layer_2_out_channels = 20,
    layer_3_out_channels = 30,
    layer_4_out_features = 300,
    layer_5_out_features = 150,
    layer_6_out_features = 75,
    batch_size = 256,
    learning_rate = 0.001,
    weight_decay = 1e-5,
    epochs = 80,
    kernel_size = 5,
    )

wandb.init(config=hyperparameter_defaults, project="Malware_analysis")
config = wandb.config

In [None]:
sweep_config = {
  "name" : "deep-layer",
  "method" : "random",
  "metric" : {
      "goal" : "minimize",
      "name" : "valid_loss"
  },
  "parameters" : {
    "epochs": {
      "distribution" : "int_uniform",
      "min" : 10,
      "max" : 200,
    },
    "dropout" : {
      "values" : [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7]
    },
    "dropout2" : {
      "values" : [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7]
    },
    "dropout3" : {
      "values" : [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7]
    },
    "layer_1_out_channels" : {
        "distribution" : "int_uniform",
        "min" : 10,
        "max" : 30,
    },
    "layer_2_out_channels" : {
        "distribution" : "int_uniform",
        "min" : 20,
        "max" : 40,
    },
    "layer_3_out_channels" : {
        "distribution" : "int_uniform",
        "min" : 30,
        "max" : 50,
    },
    "layer_4_out_features" : {
        "distribution" : "int_uniform",
        "min" : 300,
        "max" : 500,
    },
    "layer_5_out_features" : {
        "distribution" : "int_uniform",
        "min" : 100,
        "max" : 300,
    },
    "layer_6_out_features" : {
        "distribution" : "int_uniform",
        "min" : 30,
        "max" : 100,
    },
    "learning_rate" : {
        "distribution" : "log_uniform_values",
        "min": 1e-4,
        "max": 1e-1,
    },
    "weight_decay" : {
        "distribution" : "log_uniform_values",
        "min": 1e-4,
        "max": 1e-1,
    }
  },
  "early_terminate": {
      "type": "hyperband",
      "eta" : 3,
      "min_iter" : 3,
  }
}

sweep_id = wandb.sweep(sweep_config, project="Malware_analysis")

Create sweep with ID: 8tm94kg8
Sweep URL: https://wandb.ai/vinnyshin/Malware_analysis/sweeps/8tm94kg8


In [None]:
class CNN(nn.Module): 
    def __init__(self, config):
      super(CNN, self).__init__()
      self.config = config

      self.image_size = 256

      self.layer1 = nn.Sequential(
          nn.Conv2d(in_channels=3, out_channels=config.layer_1_out_channels, kernel_size=5),
          nn.BatchNorm2d(config.layer_1_out_channels),
          nn.ReLU(inplace=True),
          nn.MaxPool2d(2),
          nn.Dropout(config.dropout)
      )

      self.image_size = int(((self.image_size - 5) + 1) / 2)

      self.layer2 = nn.Sequential(
          nn.Conv2d(in_channels=config.layer_1_out_channels, out_channels=config.layer_2_out_channels, kernel_size=5),
          nn.BatchNorm2d(config.layer_2_out_channels),
          nn.ReLU(inplace=True),
          nn.MaxPool2d(2),
          nn.Dropout(config.dropout)
      )

      self.image_size = int(((self.image_size - 5) + 1) / 2)

      self.layer3 = nn.Sequential(
          nn.Conv2d(in_channels=config.layer_2_out_channels, out_channels=config.layer_3_out_channels, kernel_size=5),
          nn.BatchNorm2d(config.layer_3_out_channels),
          nn.ReLU(inplace=True),
          nn.MaxPool2d(2),
          nn.Dropout(config.dropout)
      )

      self.image_size = int(((self.image_size - 5) + 1) / 2)
      
      self.layer4 = nn.Sequential(
          nn.Linear(in_features= config.layer_3_out_channels * self.image_size * self.image_size, out_features=config.layer_4_out_features, bias=True),
          nn.BatchNorm1d(config.layer_4_out_features),
          nn.ReLU(inplace=True),
          nn.Dropout(config.dropout2)
      )

      self.layer5 = nn.Sequential(
          nn.Linear(in_features= config.layer_4_out_features, out_features=config.layer_5_out_features, bias=True),
          nn.BatchNorm1d(config.layer_5_out_features),
          nn.ReLU(inplace=True),
          nn.Dropout(config.dropout2)
      )

      self.layer6 = nn.Sequential(
          nn.Linear(in_features= config.layer_5_out_features, out_features=config.layer_6_out_features, bias=True),
          # nn.BatchNorm1d(config.layer_6_out_features),
          nn.ReLU(inplace=True),
          nn.Dropout(config.dropout3)
      )

      self.layer7 = nn.Sequential(
          nn.Linear(in_features=config.layer_6_out_features, out_features=2, bias=True),
      )

    def forward(self, x):
      x = self.layer1(x)
      x = self.layer2(x)
      x = self.layer3(x)
      x = x.view(-1, self.config.layer_3_out_channels * self.image_size * self.image_size)
      x = self.layer4(x)
      x = self.layer5(x)
      x = self.layer6(x)
      x = self.layer7(x)
      return x

In [None]:
# torch.cuda.empty_cache()

In [None]:
train_loader = DataLoader(dataset = train_set, batch_size = config.batch_size, shuffle=True, num_workers=0)
test_loader = DataLoader(dataset = test_set, batch_size = config.batch_size, shuffle=False, num_workers=0)

# Training 

In [None]:
def train():
  with wandb.init() as run:
    train_losses = []
    valid_losses = []
    
    config = wandb.config

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    criterion = nn.CrossEntropyLoss()
    
    model = CNN(config).to(device)
    optimizer = torch.optim.Adam(model.parameters(),lr = config.learning_rate, weight_decay=config.weight_decay)

    max_f1 = 0

    for epoch in range(1, config.epochs + 1):
        # keep-track-of-training-and-validation-loss
        train_loss = 0.0
        valid_loss = 0.0
        
        # training-the-model
        model.train()
        
        for data, target,_ in train_loader:
            
            # move-tensors-to-GPU 
            data = data.to(device)
            target = target.to(device)
            
            # clear-the-gradients-of-all-optimized-variables
            optimizer.zero_grad()
            # forward-pass: compute-predicted-outputs-by-passing-inputs-to-the-model
            output = model(data)
            # calculate-the-batch-loss
            loss = criterion(output, target)
            # backward-pass: compute-gradient-of-the-loss-wrt-model-parameters
            loss.backward()
            # perform-a-ingle-optimization-step (parameter-update)
            optimizer.step()
            # update-training-loss
            train_loss += loss.item() * data.size(0)
            
        # validate-the-model
        model.eval()

        correct = 0
        total = 0
        predict = []
        with torch.no_grad():
          for data, target,_ in test_loader:
              
              data = data.to(device)
              target = target.to(device)
              output = model(data)

              loss = criterion(output, target)
              # update-average-validation-loss 
              valid_loss += loss.item() * data.size(0)

              _, predicted = torch.max(output.data, 1)
              predict.extend(predicted.detach().cpu())
              total += target.size(0)
              correct += (predicted == target).sum().item()
          # calculate-average-losses
          train_loss = train_loss/len(train_loader.sampler)
          valid_loss = valid_loss/len(test_loader.sampler)
          train_losses.append(train_loss)
          valid_losses.append(valid_loss)
          # print-training/validation-statistics 
          print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(
              epoch, train_loss, valid_loss))
          f1 = f1_score(test_set.malices, predict)
          if (max_f1 < f1):
            max_f1 = f1
          
          print('f1 score of the model: {}'.format(f1))
          print('Test Accuracy of the model: {} %'.format(100 * correct / total))
          
          wandb.log({
              "train_loss": train_loss,
              "valid_loss": valid_loss,
              "f1_score": f1,
              "Accuracy": 100 * correct / total,
          })

In [14]:
count = 300
wandb.agent(sweep_id, function=train, count=count)

Output hidden; open in https://colab.research.google.com to view.

# Testing

In [15]:
# test-the-model
model.eval()  # it-disables-dropout
predict = []
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels,_ in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        predict.extend(predicted.detach().cpu())
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
          
    print('Test Accuracy of the model: {} %'.format(100 * correct / total))

# Save 
torch.save(model.state_dict(), 'model.ckpt')

NameError: ignored

In [None]:
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

plt.plot(train_losses, label='Training loss')
plt.plot(valid_losses, label='test loss')
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend(frameon=False)

In [None]:
from sklearn.metrics import confusion_matrix, f1_score
t = confusion_matrix(test_set.malices, predict)
f1 = f1_score(test_set.malices, predict)
print(t)

np_predict = np.array(predict)
np_malices = np.array(test_set.malices)


t = np_predict == np_malices # T : 정답이 맞는 경우 
f = np_predict != np_malices # F : 정답에 틀린 경우

# 각 그룹의 베이직 블록 개수, 픽셀 개수, (베이직 블록 - 픽셀 개수), 그룹당 평균, 중간
index_true = np.where(t)[0]
index_false = np.where(f)[0]
benign_block_dir = '/content/drive/MyDrive/Malware Analysis/kisa_dataset/benign_block/'
malign_block_dir = '/content/drive/MyDrive/Malware Analysis/kisa_dataset/malign_block/'

In [None]:
import torch
import math


E = 0.0000001

def truncate(num, n):
    integer = int(num * (10**n))/(10**n)
    return float(integer)

def get_num_pixel(tensor_r, normalized_zero_r):
  pixels = (tensor_r.numpy() - normalized_zero_r) > E 
  cnt = np.count_nonzero(pixels)
  return cnt

true_benign_basic_block_nums = []
true_malign_basic_block_nums = []
true_benign_pixel_nums = []
true_malign_pixel_nums = []

false_benign_basic_block_nums = []
false_malign_basic_block_nums = []
false_benign_pixel_nums = []
false_malign_pixel_nums = []

normalized_zero_r = (0-test_meanR)/test_stdR
normalized_zero_r = truncate(normalized_zero_r,8)

demo = 40 # 최대 400까지 

for idx in index_true[:demo]:
  tensor, malice, name = test_set[idx]
  name = name.replace('.png','.txt')
  path = benign_block_dir+name if malice == 0 else malign_block_dir+name
  # 베이직 블록 개수 세기 위해, 파일을 엽니다. 
  block_file = open(path,'r')
  num_block = len(block_file.readlines())
  num_pixels = get_num_pixel(tensor[0], normalized_zero_r)
  
  # 베이직 블록 개수, 픽셀 개수를 입력합니다. 
  if malice == 0:
    true_benign_basic_block_nums.append(num_block)
    true_benign_pixel_nums.append(num_pixels)
    
  else: 
    true_malign_basic_block_nums.append(num_block)
    true_malign_pixel_nums.append(num_pixels)

print()
for idx in index_false[:demo]:
  tensor, malice, name = test_set[idx]
  name = name.replace('.png','.txt')
  path = benign_block_dir+name if malice == 0 else malign_block_dir+name
  # 베이직 블록 개수 세기 위해, 파일을 엽니다. 
  block_file = open(path,'r')
  num_block = len(block_file.readlines())
  num_pixels = get_num_pixel(tensor[0], normalized_zero_r)
  
  # 베이직 블록 개수, 픽셀 개수를 입력합니다. 
  if malice == 0:
    false_benign_basic_block_nums.append(num_block)
    false_benign_pixel_nums.append(num_pixels)
  else: 
    false_malign_basic_block_nums.append(num_block)
    false_malign_pixel_nums.append(num_pixels)

  

In [None]:
# True_benign
# True_malign
# False_benign
# False_malign
def stat(block_nums, pixel_nums, label):
  min_block = min(block_nums)
  mean_block = np.mean(block_nums)
  min_pixels = min(pixel_nums)
  mean_pixels = np.mean(pixel_nums)
  diff = np.array(block_nums) - np.array(pixel_nums)
  min_diff = min(diff)
  mean_diff = np.mean(diff)

  median_block = np.median(block_nums)
  median_pixels = np.median(pixel_nums)
  
  print(f'====={label}=====')
  print(f'블록 개수 평균 : {mean_block}, 픽셀 개수 평균 : {mean_pixels}, 평균 차이 : {mean_diff} 소실률: {(mean_pixels/mean_block) * 100}%')
  print(f'중간값 블록 : {median_block}, 중간값 픽셀 : {median_pixels}')

stat(true_benign_basic_block_nums, true_benign_pixel_nums, "True benign")
stat(true_malign_basic_block_nums, true_malign_pixel_nums, 'True malign')
stat(false_benign_basic_block_nums, false_benign_pixel_nums, 'False benign')
stat(false_malign_basic_block_nums, false_malign_pixel_nums, 'False malign')

## Visualization 

In [None]:
import torch
import torchvision
import torchvision.transforms as T
from PIL import Image

import matplotlib.pyplot as plt
import torchvision.transforms as transforms


indecies = np.where(t)[0]

cnt = 0

for i in indecies:
  # wandb.log({"fakge": [wandb.Image(i.reshape([256, 256, 3]), mode="RGB") for i in test_set[i][0]]})
  tf = T.ToPILImage()
  img_t = tf(raw_test_set[i][0])
  plt.imshow(img_t)
  plt.show()
  print('ang')
  
  cnt += 1
  if cnt > 20 :
    break


In [None]:
indecies = np.where(f)[0]

cnt = 0

for i in indecies:
  # wandb.log({"fakge": [wandb.Image(i.reshape([256, 256, 3]), mode="RGB") for i in test_set[i][0]]})
  tf = T.ToPILImage()
  img_t = tf(raw_test_set[i][0])
  plt.imshow(img_t)
  plt.show()
  print('ang')
  
  cnt += 1
  if cnt > 20 :
    break
