<a href="https://colab.research.google.com/github/ckj18/BigDataSecurity/blob/main/TeamProject_MalwareDetection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 드라이브 설정

In [None]:
# Mount Google Drive at /content/drive/ so the following cells can reach files stored there.
from google.colab import drive
drive.mount('/content/drive/') 

In [None]:
cd '/content/drive/Shareddrives/BigDataSecurity'

In [None]:
ls

## 모듈 불러오기

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm
import matplotlib.pyplot as plt
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import transforms, utils, datasets
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils, datasets
import torch.utils.data
import os
import time
# 전처리
import cv2
import numpy as np
from skimage.feature import hog

In [None]:
from PIL import ImageFile
from PIL import Image
# Tell PIL to accept image files whose data is truncated rather than failing on them.
ImageFile.LOAD_TRUNCATED_IMAGES = True # prevent truncate error

## 데이터셋 불러오기


In [None]:
def _resize_and_tensorize():
    """Build one preprocessing pipeline: resize to 224x224, then convert to a tensor."""
    steps = [transforms.Resize((224, 224)), transforms.ToTensor()]
    return transforms.Compose(steps)


# Each split gets its own pipeline object; both splits currently use the same steps.
image_transforms = {split: _resize_and_tensorize() for split in ("train", "test")}

In [None]:
class ImageFolderWithPaths(datasets.ImageFolder):
    """ImageFolder variant whose samples also carry the source file path.

    Later cells unpack every batch as ``(images, labels, paths)`` and re-read
    the files from disk with OpenCV; the stock ``ImageFolder`` only yields
    ``(image, label)`` pairs, so that unpacking would fail.
    """

    def __getitem__(self, index):
        """Return ``(image, class_index, file_path)`` for the sample at ``index``."""
        image, target = super().__getitem__(index)
        file_path = self.samples[index][0]
        return image, target, file_path


train_data = ImageFolderWithPaths(root='./malware/train',
                                  transform=image_transforms['train'])

test_data = ImageFolderWithPaths(root='./malware/val',
                                 transform=image_transforms['test'])

# Class-name -> class-index mapping; later cells use it to turn label indices back into names.
# NOTE(review): assumes './malware/train' and './malware/val' contain the same class folders - confirm.
classes = train_data.class_to_idx

In [None]:
# Split the held-out folder roughly 4:4:3 into validation / test / extra-training parts.
total_size = len(test_data)
val_size = 4 * total_size // 11
test_size = 4 * total_size // 11
# The last part absorbs the rounding remainder: random_split requires the
# lengths to add up to exactly len(dataset), which three truncated sizes do not guarantee.
add_size = total_size - val_size - test_size

valid_data, test_data, add_data = torch.utils.data.random_split(test_data, [val_size, test_size, add_size])
# Fold the extra part into the training set.
train_data = torch.utils.data.ConcatDataset([train_data, add_data])

In [None]:
# All three loaders share the same batch size; only the training loader shuffles.
_loader_options = {"batch_size": 128}
train_loader = torch.utils.data.DataLoader(train_data, shuffle=True, **_loader_options)   # training loader
valid_loader = torch.utils.data.DataLoader(valid_data, shuffle=False, **_loader_options)  # validation loader
test_loader = torch.utils.data.DataLoader(test_data, shuffle=False, **_loader_options)    # test loader

In [None]:
# Report how many samples ended up in each split.
for _split_name, _split in (("training", train_data),
                            ("validation", valid_data),
                            ("testing", test_data)):
    print(f'Number of {_split_name} examples: {len(_split)}')

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# functions to show an image


def imshow(img):
    """Display an image tensor with matplotlib.

    The tensor is converted to a NumPy array and its first axis is moved to
    the last position (axes reordered to 1, 2, 0) before it is drawn.
    """
    reordered = img.numpy().transpose(1, 2, 0)
    plt.imshow(reordered)
    plt.show()


# Fetch one shuffled batch of training images for a visual sanity check.
dataiter = iter(train_loader)
first_batch = next(dataiter)
# Index into the batch instead of tuple-unpacking so this works whether or not
# samples carry extra fields (the same access pattern train()/evaluate() use).
images, labels = first_batch[0], first_batch[1]

batch_size = 16  # not used in this cell; the loader decides the real batch size

# train_data is the ConcatDataset built above; its first member is the original
# training ImageFolder, which owns the class-name -> index mapping.
classes = train_data.datasets[0].class_to_idx
# Invert the mapping once rather than rebuilding and scanning lists per label.
index_to_class = {index: name for name, index in classes.items()}

# show images
imshow(torchvision.utils.make_grid(images))
# print labels
print()
labels = labels.tolist()
print(' '.join(index_to_class[j] for j in labels))

## EDA 및 전처리

In [None]:
# Load the raw image file behind every training sample and preprocess it
# (CLAHE contrast adjustment followed by a wavelet decompose/reconstruct round trip).
import pywt  # imported here because this cell runs before the later cell that imports pywt

batch_size = 32  # batch size setting (not used below; batches come from train_loader)


def _as_object_array(nested):
    """Wrap a list of per-batch lists in a 1-D object array.

    A plain np.array() call on nested lists whose lengths differ (for example
    a shorter final batch) is rejected by recent NumPy releases.
    """
    wrapped = np.empty(len(nested), dtype=object)
    for position, item in enumerate(nested):
        wrapped[position] = item
    return wrapped


# These settings do not change per image, so build them once outside the loops.
clahe = cv2.createCLAHE(clipLimit=0.02, tileGridSize=(4,4))
wavelet = 'db5'  # Daubechies family
level = 2  # Number of decomposition levels

images = []
CLAHE_images = []
WT_images = []

# NOTE(review): requires the loader to yield (images, labels, paths) batches - confirm the dataset provides paths.
for batch_images, labels, paths in train_loader:
    batch_images_processed = []
    batch_CLAHE_images = []
    batch_WT_images = []

    for path in paths:
        image = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread reports failure by returning None; fail with the offending path instead.
            raise FileNotFoundError(f'Could not read image: {path}')

        # CLAHE
        CLAHE_image = clahe.apply(image)

        # Wavelet transform
        coeffs = pywt.wavedec2(CLAHE_image, wavelet, level=level)

        # Reconstruction
        WT_image = pywt.waverec2(coeffs, wavelet)

        batch_images_processed.append(image)
        batch_CLAHE_images.append(CLAHE_image)
        batch_WT_images.append(WT_image)

    images.append(batch_images_processed)
    CLAHE_images.append(batch_CLAHE_images)
    WT_images.append(batch_WT_images)

# Keep the batch -> image nesting; each element is the list of images of one batch.
images = _as_object_array(images)
CLAHE_images = _as_object_array(CLAHE_images)
WT_images = _as_object_array(WT_images)

### Contrast 조정 ( CLAHE ) 시각화

In [None]:
import cv2
import numpy as np
from matplotlib import pyplot as plt

# Read the file at `path` (whatever an earlier cell last assigned) as grayscale.
img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)

# CLAHE with a contrast limit of 0.02 and a 4x4 tile grid
clahe = cv2.createCLAHE(clipLimit=0.02, tileGridSize=(4,4))
img2 = clahe.apply(img)

# Side-by-side comparison
fig, axes = plt.subplots(1, 2, figsize=(20, 10))
for _axis, _picture, _caption in zip(axes, (img, img2), ('Before', 'After')):
    _axis.imshow(_picture, cmap='gray')
    _axis.set_title(_caption)
    _axis.axis('off')

plt.tight_layout()
plt.show()

###  영상 압축( Wavelet Transform ) 시각화

In [None]:
import pywt
import numpy as np
import cv2
import matplotlib.pyplot as plt

# Load the image at `path` as grayscale
image = cv2.imread(path, cv2.IMREAD_GRAYSCALE)

# Wavelet transform: 2-level decomposition with a Daubechies wavelet
wavelet = 'db5'  # Daubechies family
level = 2  # Number of decomposition levels
coeffs = pywt.wavedec2(image, wavelet, level=level)

# Rebuild the image from the coefficients
reconstructed_image = pywt.waverec2(coeffs, wavelet)

# Show the original next to the reconstruction
fig, axes = plt.subplots(1, 2, figsize=(20, 10))
_panels = ((image, 'Original Image'), (reconstructed_image, 'Reconstructed Image'))
for _axis, (_picture, _caption) in zip(axes, _panels):
    _axis.imshow(_picture, cmap='gray')
    _axis.set_title(_caption)
    _axis.axis('off')

plt.tight_layout()
plt.show()


### HOG

In [None]:
# Compute one HOG descriptor per wavelet-reconstructed image.
# Named hog_descriptor so the skimage `hog` function imported at the top of the notebook is not shadowed.
hog_descriptor = cv2.HOGDescriptor()
window_size = hog_descriptor.winSize  # (width, height) of the descriptor window

per_batch_features = []

for batch_images in WT_images:
    batch_hog_features = []

    for image in batch_images:
        # The wavelet reconstruction is floating point; HOGDescriptor.compute expects 8-bit input.
        image_8bit = np.clip(np.rint(image), 0, 255).astype(np.uint8)
        # Resize to exactly one descriptor window so every image yields a vector of the same length.
        image_8bit = cv2.resize(image_8bit, window_size)
        hog_feature = hog_descriptor.compute(image_8bit)  # Compute HOG features
        batch_hog_features.append(hog_feature.flatten())

    if batch_hog_features:
        per_batch_features.append(np.array(batch_hog_features))

# Stack all batches into one (n_images, n_features) matrix, the 2-D layout
# the t-SNE / UMAP cells pass to fit_transform.
if per_batch_features:
    hog_features = np.concatenate(per_batch_features, axis=0)
else:
    hog_features = np.empty((0, hog_descriptor.getDescriptorSize()), dtype=np.float32)

### GIST

In [None]:
pip install python-gist

In [None]:
pip install gists.py

In [None]:
import gist

### SIFT

In [None]:
from matplotlib import pyplot as plt

In [None]:
# SIFT visualization: detect keypoints on the grayscale image and draw them.
img = cv2.imread(path)
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

sift = cv2.SIFT_create()
kp = sift.detect(gray, None)

# drawKeypoints renders the keypoints on top of `gray`; the result replaces `img`.
img = cv2.drawKeypoints(gray, kp, img)

plt.figure(figsize=(10, 5))

plt.subplot(131)
plt.imshow(img, cmap='gray')
plt.title('SIFT')

plt.tight_layout()
plt.show()

In [None]:
# Detect keypoints on every image and keep only the descriptor part of each
# (keypoints, descriptor) result.
# NOTE(review): `enhanced_images` is not assigned in any cell visible here - confirm where it comes from.
descriptors = [sift.detectAndCompute(image, None)[1] for image in enhanced_images]

### T-SNE

In [None]:
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

In [None]:
# Turn every label index in `labels` back into its class name.
# Invert the name -> index mapping once instead of rebuilding two lists and
# scanning them for every single label.
index_to_class = {}
for class_name, class_index in classes.items():
    # setdefault keeps the first name should an index ever appear twice.
    index_to_class.setdefault(class_index, class_name)
# int() accepts both plain ints and 0-d tensors as labels.
class_labels = [index_to_class[int(i)] for i in labels]

# Colour palette, one entry per class
class_colors = ['red', 'blue', 'green', 'orange', 'purple', 'yellow', 'cyan', 'magenta', 'lime', 'pink',
                'lightblue', 'brown', 'gray', 'olive', 'teal', 'navy', 'salmon', 'gold', 'lightgreen', 'lavender',
                'skyblue', 'tan', 'coral', 'orchid', 'darkgreen', 'silver']

In [None]:
# Build and fit the t-SNE model on the SIFT descriptors.
# Images for which SIFT returned no descriptor (None) are dropped, then the
# remaining descriptor matrices are stacked row-wise.
usable_descriptors = [d for d in descriptors if d is not None]
all_descriptors = np.concatenate(usable_descriptors, axis=0)

# Reduce the stacked descriptor matrix to two dimensions.
tsne_settings = dict(n_components=2, perplexity=10, learning_rate=200, random_state=42)
tsne = TSNE(**tsne_settings)
embedded_features = tsne.fit_transform(all_descriptors)

In [None]:
# Plot the t-SNE result (SIFT features), one colour per class.
# NOTE(review): embedded_features has one row per stacked SIFT descriptor while
# class_labels is derived from `labels`; confirm the row indices line up.
plt.figure(figsize=(10, 10))
label_array = np.asarray(class_labels)
# sorted() pins the class -> colour pairing and the legend order; the iteration
# order of a bare set of strings can differ from one interpreter session to the next.
for label, color in zip(sorted(set(class_labels)), class_colors):
    indices = np.flatnonzero(label_array == label)
    plt.scatter(embedded_features[indices, 0], embedded_features[indices, 1], label=label, color=color)
plt.title('T-SNE Visualization')
plt.xlabel('Dimension 1')
plt.ylabel('Dimension 2')
plt.legend()
plt.show()

In [None]:
# Build and fit the t-SNE model on the HOG features, reducing them to two dimensions.
tsne_settings = dict(n_components=2, perplexity=20, learning_rate=200, random_state=42)
tsne = TSNE(**tsne_settings)
tsne_result = tsne.fit_transform(hog_features)

In [None]:
# Plot the t-SNE result (HOG features), one colour per class.
# NOTE(review): assumes row i of tsne_result belongs to class_labels[i] - confirm.
plt.figure(figsize=(10, 10))
label_array = np.asarray(class_labels)
# sorted() pins the class -> colour pairing and the legend order; the iteration
# order of a bare set of strings can differ from one interpreter session to the next.
for label, color in zip(sorted(set(class_labels)), class_colors):
    indices = np.flatnonzero(label_array == label)
    plt.scatter(tsne_result[indices, 0], tsne_result[indices, 1], label=label, color=color)
plt.title('T-SNE Visualization')
plt.xlabel('Dimension 1')
plt.ylabel('Dimension 2')
plt.legend()
plt.show()

### UMAP

In [None]:
!pip install umap-learn

In [None]:
import umap
import matplotlib.pyplot as plt

In [None]:
# Turn every label index in `labels` back into its class name.
# Invert the name -> index mapping once instead of rebuilding two lists and
# scanning them for every single label.
index_to_class = {}
for class_name, class_index in classes.items():
    # setdefault keeps the first name should an index ever appear twice.
    index_to_class.setdefault(class_index, class_name)
# int() accepts both plain ints and 0-d tensors as labels.
class_labels = [index_to_class[int(i)] for i in labels]

# Colour palette, one entry per class
class_colors = ['red', 'blue', 'green', 'orange', 'purple', 'yellow', 'cyan', 'magenta', 'lime', 'pink',
                'lightblue', 'brown', 'gray', 'olive', 'teal', 'navy', 'salmon', 'gold', 'lightgreen', 'lavender',
                'skyblue', 'tan', 'coral', 'orchid', 'darkgreen', 'silver']

In [None]:
# Build and fit the UMAP model on the SIFT descriptors.
umap_model = umap.UMAP(n_components=2, learning_rate=200, random_state=42)
# Fit on the stacked SIFT descriptors themselves, not on embedded_features:
# that array is already the 2-D t-SNE output, so UMAP would only re-embed
# another projection. This mirrors the HOG cell, which fits on the raw features.
umap_result = umap_model.fit_transform(all_descriptors)

In [None]:
# Plot the UMAP result (SIFT features), one colour per class.
# NOTE(review): assumes row i of umap_result belongs to class_labels[i] - confirm.
plt.figure(figsize=(10, 10))
label_array = np.asarray(class_labels)
# sorted() pins the class -> colour pairing and the legend order; the iteration
# order of a bare set of strings can differ from one interpreter session to the next.
for label, color in zip(sorted(set(class_labels)), class_colors):
    indices = np.flatnonzero(label_array == label)
    plt.scatter(umap_result[indices, 0], umap_result[indices, 1], label=label, color=color)
plt.title('UMAP Visualization')
plt.xlabel('Dimension 1')
plt.ylabel('Dimension 2')
# The points are coloured by discrete class and each scatter call passes label=,
# so a legend (as in the t-SNE plots) is the matching key; a colorbar does not
# describe these per-class colours.
plt.legend()
plt.show()

In [None]:
# Build and fit the UMAP model on the HOG features, reducing them to two dimensions.
umap_settings = dict(n_components=2, learning_rate=200, random_state=42)
umap_model = umap.UMAP(**umap_settings)
umap_result = umap_model.fit_transform(hog_features)

In [None]:
# Plot the UMAP result (HOG features), one colour per class.
# NOTE(review): assumes row i of umap_result belongs to class_labels[i] - confirm.
plt.figure(figsize=(10, 10))
label_array = np.asarray(class_labels)
# sorted() pins the class -> colour pairing and the legend order; the iteration
# order of a bare set of strings can differ from one interpreter session to the next.
for label, color in zip(sorted(set(class_labels)), class_colors):
    indices = np.flatnonzero(label_array == label)
    plt.scatter(umap_result[indices, 0], umap_result[indices, 1], label=label, color=color)
plt.title('UMAP Visualization')
plt.xlabel('Dimension 1')
plt.ylabel('Dimension 2')
# The points are coloured by discrete class and each scatter call passes label=,
# so a legend (as in the t-SNE plots) is the matching key; a colorbar does not
# describe these per-class colours.
plt.legend()
plt.show()

## 모델 구성

## Machine Learning

In [None]:
# Random Forest baseline trained on flattened pixel values.
# These two names are not imported anywhere else in the notebook.
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score


def _loader_to_arrays(loader):
    """Collect a whole DataLoader as ``(features, labels)`` NumPy arrays.

    Every image is flattened to a 1-D vector. Batches are indexed rather than
    tuple-unpacked so that any extra per-sample fields are simply ignored.
    """
    feature_chunks = []
    label_chunks = []
    for batch in loader:
        batch_images, batch_labels = batch[0], batch[1]
        flat_images = batch_images.view(batch_images.size(0), -1)
        feature_chunks.append(flat_images.numpy())
        label_chunks.append(batch_labels.numpy())
    return np.concatenate(feature_chunks, axis=0), np.concatenate(label_chunks, axis=0)


# Prepare training data
train_images, train_labels = _loader_to_arrays(train_loader)

# Prepare test data
test_images, test_labels = _loader_to_arrays(test_loader)

# Create and fit the Random Forest model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(train_images, train_labels)

# Predict
predictions = model.predict(test_images)

# Evaluate accuracy
accuracy = accuracy_score(test_labels, predictions)
print("Accuracy:", accuracy)

## Deep Learning

In [None]:
class Net(nn.Module):
    """Small CNN classifier: two conv + max-pool stages followed by three linear layers.

    Args:
        input_size: ``(height, width)`` of the 3-channel input images. Defaults
            to ``(224, 224)``, the size the notebook's transforms resize to.
        num_classes: Number of output logits. Defaults to 26.

    The input width of ``fc1`` is measured from ``input_size`` rather than
    hard-coded: the previous constant (58320) does not match what the
    convolutional stack produces for 224x224 inputs (10 * 53 * 53 = 28090),
    so the first linear layer rejected every batch.
    """

    def __init__(self, input_size=(224, 224), num_classes=26):
        super().__init__()

        self.conv1 = nn.Conv2d(3, 5, 3)  # in_channels, out_channels, kernel_size
        self.pool = nn.MaxPool2d(3, 2)  # kernel_size, stride
        self.conv2 = nn.Conv2d(5, 10, 3)

        # Push one dummy image through the conv stack to learn the flattened feature size.
        with torch.no_grad():
            probe = torch.zeros(1, 3, *input_size)
            flat_features = self._extract_features(probe).shape[1]

        self.fc1 = nn.Linear(flat_features, 160)  # in_features, out_features
        self.fc2 = nn.Linear(160, 120)
        self.fc3 = nn.Linear(120, num_classes)

    def _extract_features(self, x):
        """Apply both conv/ReLU/pool stages and flatten everything except the batch axis."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        return torch.flatten(x, 1)

    def forward(self, x):
        """Return raw class logits (no softmax) for a batch of images."""
        x = self._extract_features(x)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [None]:
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
# (In Colab, pick a GPU under "Change runtime type".)
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Create the network and move it to the selected device in one step.
model = Net().to(device)

In [None]:
# Loss function (can be swapped for another criterion).
criterion = nn.CrossEntropyLoss()
# Adam optimizer with weight decay; can be swapped for another optimizer such as SGD with momentum.
optimizer = optim.Adam(params=model.parameters(), lr=1e-3, weight_decay=1e-4)

In [None]:
def calculate_topk_accuracy(y_pred, y, k = 5):
    """Return the fraction of samples whose highest-scoring class equals the target.

    The top ``k`` predictions per sample are computed, but only the first
    (best) one is compared with ``y``, so the value is top-1 accuracy
    whatever ``k`` is. ``k`` must still not exceed the number of scores per
    sample, because it is passed to ``topk``.

    Args:
        y_pred: Score tensor with one row per sample and one column per class.
        y: Target class indices, one per sample.
        k: Number of top predictions requested from ``topk``.

    Returns:
        A float tensor of shape ``(1,)`` holding the accuracy of the batch.
    """
    with torch.no_grad():
        sample_count = y.shape[0]
        ranked_classes = y_pred.topk(k, 1)[1]
        best_guess = ranked_classes[:, 0]
        hits = best_guess.eq(y.view(-1)).float()
        return hits.sum(0, keepdim = True) / sample_count

In [None]:
# Print the inputs and targets of the first training batch only.
_first_batch = next(iter(train_loader), None)
if _first_batch is not None:
    print(_first_batch[0])
    print(_first_batch[1])

In [None]:
def train(model, iterator, optimizer, criterion, device):
    """Run one training pass over ``iterator`` and update ``model`` in place.

    Args:
        model: Network to optimise; switched to training mode first.
        iterator: Sized iterable of batches whose element 0 is the input
            tensor and element 1 the target tensor.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss function called as ``criterion(prediction, target)``.
        device: Device the inputs and targets are moved to.

    Returns:
        ``(mean_loss, mean_accuracy)``, each averaged over the number of batches.
    """
    model.train()

    loss_total = 0
    acc_total = 0

    for batch in iterator:
        inputs = batch[0].to(device)
        targets = batch[1].to(device)

        # Clear gradients left over from the previous step.
        optimizer.zero_grad()

        outputs = model(inputs)
        batch_loss = criterion(outputs, targets)
        batch_acc = calculate_topk_accuracy(outputs, targets)

        batch_loss.backward()
        optimizer.step()

        loss_total += batch_loss.item()
        acc_total += batch_acc.item()

    batch_count = len(iterator)
    return loss_total / batch_count, acc_total / batch_count

In [None]:
def evaluate(model, iterator, criterion, device):
    """Measure loss and accuracy of ``model`` over ``iterator`` without updating it.

    Args:
        model: Network to evaluate; switched to evaluation mode first.
        iterator: Sized iterable of batches whose element 0 is the input
            tensor and element 1 the target tensor.
        criterion: Loss function called as ``criterion(prediction, target)``.
        device: Device the inputs and targets are moved to.

    Returns:
        ``(mean_loss, mean_accuracy)``, each averaged over the number of batches.
    """
    model.eval()

    loss_total = 0
    acc_total = 0

    # No gradients are needed while evaluating.
    with torch.no_grad():
        for batch in iterator:
            inputs = batch[0].to(device)
            targets = batch[1].to(device)

            outputs = model(inputs)

            loss_total += criterion(outputs, targets).item()
            acc_total += calculate_topk_accuracy(outputs, targets).item()

    batch_count = len(iterator)
    return loss_total / batch_count, acc_total / batch_count

In [None]:
def epoch_time(start_time, end_time):
    """Split the interval between two timestamps (in seconds) into minutes and seconds.

    Returns:
        ``(minutes, seconds)`` as integers; fractional seconds are truncated.
    """
    duration = end_time - start_time
    whole_minutes = int(duration / 60)
    leftover_seconds = int(duration - 60 * whole_minutes)
    return whole_minutes, leftover_seconds

In [None]:
import time

In [None]:
# Train for a fixed number of epochs and keep the weights with the lowest validation loss.
best_valid_loss = float('inf')
EPOCHS = 10
for epoch in range(EPOCHS):

    start_time = time.monotonic()

    train_loss, train_acc = train(model, train_loader, optimizer, criterion, device)
    # Validate on the validation split. Using the test split here would let
    # checkpoint selection see the data meant for the final evaluation.
    valid_loss, valid_acc = evaluate(model, valid_loader, criterion, device)

    # Save a checkpoint whenever the validation loss improves.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut5-model.pt')

    end_time = time.monotonic()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:6.2f}%')
    print(f'\tValid Loss: {valid_loss:.3f} | Valid Acc: {valid_acc*100:6.2f}%')

## 모델 학습

## 모델 평가