In [1]:
import jsonlines
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
import hashlib

In [2]:
class ReadData():
    def __init__(self, data, num):
        self.data = data
        self.data_list = []
        self.num = num

    def split_data(self):
        with open(self.data, 'r', encoding='utf-8') as f:
            for item in jsonlines.Reader(f):
                self.data_list.append(item)
        datas, labels = [], []
        for data in self.data_list[:self.num]:
            if data['label'] >= 0:
                datas.append(data)
                labels.append(data['label'])
        return datas, labels

In [3]:
def wash(x):
    return [str(i) for i in x]

class FeatureExtractor():
    def __init__(self, texts):
        self.texts = texts

    def byteentropy(self):
        return np.array([text['byteentropy'] for text in self.texts])

    def histogram(self):
        return np.array([text['histogram'] for text in self.texts])

    def hash256(self, items):
        all_hash256 = []
        for item in items:
            hash256 = [0]*256
            for name in wash(item):
                hash_value = int(hashlib.sha256(name.encode()).hexdigest(), 16) % 256
                hash256[hash_value] += 1
            all_hash256.append(hash256)
        return np.array(all_hash256)

    def imports_hash(self):
        return self.hash256([text['imports'] for text in self.texts])

    def header_hash(self):
        return self.hash256([text['header'] for text in self.texts])

    def get_features(self):
        entropy = self.byteentropy()
        histogram = self.histogram()
        imports = self.imports_hash()
        header = self.header_hash()
        features = np.concatenate([entropy, imports, histogram, header], axis=1)
        return features


In [4]:
class MalwareClassifier(nn.Module):
    def __init__(self, input_dim):
        super(MalwareClassifier, self).__init__()
        
        self.cnn = nn.Sequential(
            nn.Conv1d(in_channels=1, out_channels=1, kernel_size=1, stride=1, padding=0),
            nn.Identity() 
        )
        
        self.rnn = nn.LSTM(input_size=input_dim, hidden_size=input_dim, num_layers=1, batch_first=True)
        
        self.model = nn.Sequential(
            nn.Linear(input_dim, 1024),
            nn.BatchNorm1d(1024),
            nn.PReLU(),
            nn.Dropout(0.3),

            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.PReLU(),
            nn.Dropout(0.3),

            nn.Linear(512, 2)
        )

    def forward(self, x):
        x = torch.log10(1 + x)
        
        x = x.unsqueeze(1)
        x = self.cnn(x) 
        x = x.squeeze(1)
        

        x = x.unsqueeze(1)  
        x, _ = self.rnn(x) 
        x = x.squeeze(1)  
        
   
        logits = self.model(x)
        return logits

In [5]:
texts, labels = ReadData('./ember_dataset/train_features_1.jsonl', 5000).split_data()
labels = np.array(labels)

In [6]:
extractor = FeatureExtractor(texts)
features = extractor.get_features()
print("特征维度:", features.shape)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

特征维度: (3185, 1024)


In [7]:
X_train, X_test, y_train, y_test = train_test_split(
        features, labels, test_size=0.2, random_state=42, stratify=labels)

In [8]:
X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train = torch.tensor(y_train, dtype=torch.long).to(device)
X_test = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test = torch.tensor(y_test, dtype=torch.long).to(device)

In [9]:
input_dim = X_train.shape[1]
model = MalwareClassifier(input_dim).to(device)

In [10]:
def weights_init(m):
    if isinstance(m, nn.Conv1d) or isinstance(m, nn.Linear):
        nn.init.xavier_uniform_(m.weight)
        if m.bias is not None:
            nn.init.zeros_(m.bias)
    elif isinstance(m, nn.BatchNorm1d):
        nn.init.constant_(m.weight, 1)
        nn.init.constant_(m.bias, 0)

model.apply(weights_init)

MalwareClassifier(
  (cnn): Sequential(
    (0): Conv1d(1, 1, kernel_size=(1,), stride=(1,))
    (1): Identity()
  )
  (rnn): LSTM(1024, 1024, batch_first=True)
  (model): Sequential(
    (0): Linear(in_features=1024, out_features=1024, bias=True)
    (1): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): PReLU(num_parameters=1)
    (3): Dropout(p=0.3, inplace=False)
    (4): Linear(in_features=1024, out_features=512, bias=True)
    (5): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): PReLU(num_parameters=1)
    (7): Dropout(p=0.3, inplace=False)
    (8): Linear(in_features=512, out_features=2, bias=True)
  )
)

In [11]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-5)
epochs = 100
batch_size = 256


In [None]:
model.train()
for epoch in range(epochs):
    permutation = torch.randperm(X_train.size()[0])
    epoch_loss = 0
    for i in range(0, X_train.size()[0], batch_size):
        indices = permutation[i:i+batch_size]
        batch_x, batch_y = X_train[indices], y_train[indices]

        optimizer.zero_grad()
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    avg_loss = epoch_loss / (X_train.size()[0] // batch_size)
    print(f'Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}')

Epoch [1/100], Loss: 1.1780
Epoch [2/100], Loss: 0.9537
Epoch [3/100], Loss: 0.8077
Epoch [4/100], Loss: 0.7266
Epoch [5/100], Loss: 0.6851
Epoch [6/100], Loss: 0.6240
Epoch [7/100], Loss: 0.5828
Epoch [8/100], Loss: 0.5488
Epoch [9/100], Loss: 0.5262
Epoch [10/100], Loss: 0.4820
Epoch [11/100], Loss: 0.4879
Epoch [12/100], Loss: 0.4311
Epoch [13/100], Loss: 0.4117
Epoch [14/100], Loss: 0.4051
Epoch [15/100], Loss: 0.3677
Epoch [16/100], Loss: 0.3631
Epoch [17/100], Loss: 0.3524
Epoch [18/100], Loss: 0.3232
Epoch [19/100], Loss: 0.3235
Epoch [20/100], Loss: 0.3091
Epoch [21/100], Loss: 0.2836
Epoch [22/100], Loss: 0.2902
Epoch [23/100], Loss: 0.2745
Epoch [24/100], Loss: 0.2696
Epoch [25/100], Loss: 0.2517
Epoch [26/100], Loss: 0.2535
Epoch [27/100], Loss: 0.2442
Epoch [28/100], Loss: 0.2462
Epoch [29/100], Loss: 0.2450
Epoch [30/100], Loss: 0.2184
Epoch [31/100], Loss: 0.2172
Epoch [32/100], Loss: 0.2248
Epoch [33/100], Loss: 0.2244
Epoch [34/100], Loss: 0.2050
Epoch [35/100], Loss: 0

In [None]:
model.eval()
with torch.no_grad():
    # 直接在 GPU 上进行推理
    outputs = model(X_test)  # X_test 已经在 GPU 上
    _, predicted = torch.max(outputs, 1)
    
    # 将结果移回 CPU 并转换为 numpy 数组
    y_test_cpu = y_test.cpu().numpy()
    predicted_cpu = predicted.cpu().numpy()
    
    # 计算准确率和分类报告
    accuracy = accuracy_score(y_test_cpu, predicted_cpu)
    print(f'Test Accuracy: {accuracy:.4f}')
    print(classification_report(y_test_cpu, predicted_cpu, digits=4))


Test Accuracy: 0.9451
              precision    recall  f1-score   support

           0     0.9279    0.9156    0.9217       225
           1     0.9542    0.9612    0.9577       412

    accuracy                         0.9451       637
   macro avg     0.9411    0.9384    0.9397       637
weighted avg     0.9449    0.9451    0.9450       637



In [None]:
model_cpu = model.to('cpu')

# 保存模型
torch.save(model_cpu.state_dict(), 'malware_classifier_cpu.pth')