<a href="https://colab.research.google.com/github/tahsin599/MachineLearning/blob/main/malware.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from imblearn.over_sampling import SMOTE
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import random
import os

# Set seeds for reproducibility
def seed_all(seed=42):
    """Seed the Python, NumPy and PyTorch random number generators.

    Args:
        seed: Integer seed passed to every generator (default 42).
    """
    # Order is stdlib first, then NumPy, then PyTorch.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)
    # The CUDA generators are only seeded when a GPU is actually available.
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

# Seed every RNG once, up front, using the default seed (42).
seed_all()

class CNN1DImproved(nn.Module):
    """Two-stage 1D CNN followed by a small fully connected classifier.

    Each convolutional stage is Conv1d(kernel_size=3) -> BatchNorm1d -> ReLU
    -> MaxPool1d(2). The flattened feature map then goes through
    Linear(128) -> ReLU -> Dropout(0.5) -> Linear(num_classes).

    Args:
        input_dim: Length of the (single-channel) input sequence.
        num_classes: Number of output logits.
    """

    def __init__(self, input_dim, num_classes):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv1d(1, 32, kernel_size=3),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.MaxPool1d(2)
        )
        self.conv2 = nn.Sequential(
            nn.Conv1d(32, 64, kernel_size=3),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.MaxPool1d(2)
        )
        # The size of the first Linear layer depends on input_dim, so it is
        # measured by pushing a dummy sample through the conv stack.
        self.flatten_dim = self._get_flatten_dim(input_dim)
        self.fc_layers = nn.Sequential(
            nn.Linear(self.flatten_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )

    def _get_flatten_dim(self, input_dim):
        """Return the flattened feature count the conv stack yields.

        The probe runs in eval mode under ``torch.no_grad()`` so the dummy
        all-zero sample neither updates the BatchNorm running statistics nor
        builds an autograd graph. The previous train/eval mode is restored
        afterwards.
        """
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                probe = torch.zeros(1, 1, input_dim)
                probe = self.conv2(self.conv1(probe))
        finally:
            self.train(was_training)
        return probe.view(1, -1).size(1)

    def forward(self, x):
        """Compute class logits.

        Args:
            x: Tensor of shape (batch, 1, input_dim).

        Returns:
            Tensor of shape (batch, num_classes) with unnormalised logits.
        """
        x = self.conv1(x)
        x = self.conv2(x)
        # Collapse channel and length axes into one feature vector per sample.
        x = x.view(x.size(0), -1)
        return self.fc_layers(x)

def apply_smote(X, y):
    """Oversample (X, y) with SMOTE using a fixed random_state of 42.

    Args:
        X: Feature matrix.
        y: Labels aligned with the rows of X.

    Returns:
        Whatever ``SMOTE.fit_resample`` returns for (X, y), i.e. the
        resampled features and labels.
    """
    return SMOTE(random_state=42).fit_resample(X, y)

def load_npz_with_cnn_pytorch(path_npz='mh100_perm_intent.npz', output_file='head_output.txt', cnn_report_file='cnn_report.txt'):
    """Train and evaluate the 1D CNN on a dataset stored in an ``.npz`` archive.

    Pipeline: load the archive, write the first rows of the assembled table
    to ``output_file``, make a stratified 80/20 train/test split, oversample
    the training part with SMOTE, train for at most 30 epochs, reload the
    best checkpoint and write a classification report for the test part to
    ``cnn_report_file``. Learning-rate reduction and early stopping both
    track the average *training* loss; there is no validation split.

    Args:
        path_npz: Path to an archive holding the arrays ``features``,
            ``sha256``, ``pacote``, ``label`` and ``column_names``.
        output_file: Text file that receives ``df.head()``.
        cnn_report_file: Text file that receives the classification report.

    Side effects:
        Writes ``output_file``, ``cnn_report_file`` and the checkpoint
        ``best_cnn_model.pt`` (relative to the working directory) and prints
        one loss line per epoch.
    """
    # NOTE(security): allow_pickle=True can execute arbitrary code while
    # unpickling object arrays -- only load archives from a trusted source.
    # The context manager closes the underlying archive file as soon as the
    # arrays have been read, so no file handle is leaked.
    with np.load(path_npz, allow_pickle=True) as loaded:
        features = loaded['features']
        sha256 = loaded['sha256']
        pacote = loaded['pacote']
        labels = loaded['label']
        columns_names = loaded['column_names']

    # Skip the first two column names and the last one; presumably these are
    # the SHA256, package and label columns -- verify against the archive.
    feature_names = columns_names[2:-1]
    df = pd.DataFrame(features, columns=feature_names)
    df.insert(0, 'SHA256', sha256)
    df.insert(1, 'PACOTE', pacote)
    df['label'] = labels

    # Explicit encoding keeps the dump independent of the platform default.
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(df.head().to_string(index=False))

    X = df[feature_names].astype(np.uint8).values
    y = df['label'].astype(np.uint8)

    # Split first, then oversample only the training part so that no
    # synthetic sample ends up in the test set.
    X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.2, random_state=42)
    X_train_res, y_train_res = apply_smote(X_train, y_train)

    # Scale by the uint8 maximum.
    X_train_res = X_train_res / 255.0
    X_test = X_test / 255.0

    # unsqueeze(1) adds the single input channel expected by Conv1d.
    X_train_tensor = torch.tensor(X_train_res, dtype=torch.float32).unsqueeze(1)
    y_train_tensor = torch.tensor(np.array(y_train_res), dtype=torch.long)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32).unsqueeze(1)
    y_test_tensor = torch.tensor(np.array(y_test), dtype=torch.long)

    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

    model = CNN1DImproved(X_train_res.shape[1], len(np.unique(y)))
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    # Halve the learning rate after 2 epochs without training-loss improvement.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=2, factor=0.5)

    best_loss = float('inf')
    patience_counter = 0
    early_patience = 5

    model.train()
    for epoch in range(30):
        total_loss = 0
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(train_loader)
        scheduler.step(avg_loss)

        print(f"Epoch {epoch+1} - Avg Loss: {avg_loss:.4f}")

        # Checkpoint on every improvement; stop after `early_patience`
        # consecutive epochs without one.
        if avg_loss < best_loss:
            best_loss = avg_loss
            torch.save(model.state_dict(), 'best_cnn_model.pt')
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= early_patience:
                print("Early stopping triggered.")
                break

    # Evaluate the best checkpoint rather than the weights of the last epoch.
    model.load_state_dict(torch.load('best_cnn_model.pt'))
    model.eval()
    with torch.no_grad():
        outputs = model(X_test_tensor)
        _, y_pred = torch.max(outputs, 1)

    report = classification_report(y_test_tensor.numpy(), y_pred.numpy(), digits=4)
    with open(cnn_report_file, 'w', encoding='utf-8') as f:
        f.write("CNN Classification Report (Improved PyTorch)\n")
        f.write(report)

# Run the whole pipeline with the default archive path and output file names.
load_npz_with_cnn_pytorch()


Epoch 1 - Avg Loss: 0.2901
Epoch 2 - Avg Loss: 0.2391
Epoch 3 - Avg Loss: 0.2233
