In [1]:
import os
import zipfile
import numpy as np
import json  # <-- Add this import

# 1) Paths and sizes
SRC_NPZ   = './dataset/4k_1/train.npz'
DST_DIR   = './dataset/audio_only'
DST_X_NPY = os.path.join(DST_DIR, 'train_x.npy')
DST_Y_NPY = os.path.join(DST_DIR, 'train_y.npy')
DST_NPZ   = os.path.join(DST_DIR, 'train.npz')
FRAGMENT_SIZE = 4096    # bytes per fragment, i.e. the row width of x
COPY_CHUNK    = 16384   # rows copied per step (about 64 MiB at 4096 bytes per row)
os.makedirs(DST_DIR, exist_ok=True)

# 2) Load only the labels (small) to find indices
with np.load(SRC_NPZ) as data:
    y = data['y']
print(f"Total samples in train.npz: {len(y)}")

# 3) Class names to keep, mapped to their integer IDs.
#    The names must match the scenario-"1" list in classes.json exactly,
#    otherwise list.index raises ValueError.
label_names = [ 'aiff', 'flac', 'm4a', 'mp3', 'ogg', 'wav', 'wma']
with open('classes.json') as f:
    all_labels = json.load(f)['1']
archive_ids = [ all_labels.index(name) for name in label_names ]
print("Archive label IDs:", archive_ids)

# 4) Compute which indices to keep
mask = np.isin(y, archive_ids)
count = int(mask.sum())
print(f"Keeping {count} archive‐type fragments out of {len(y)} total.")

# 5) Extract the 'x' member of the .npz to disk so it can be memory-mapped
#    (this only needs to happen once; later runs reuse the extracted file)
MEMMAP_X = './dataset/4k_1/train_x.npy'
if not os.path.exists(MEMMAP_X):
    print("→ Extracting x.npy from train.npz to disk…")
    with zipfile.ZipFile(SRC_NPZ) as zf:
        candidates = [m for m in zf.namelist() if m.startswith('x') and m.endswith('.npy')]
        if not candidates:
            raise KeyError(f"no 'x*.npy' member found in {SRC_NPZ}")
        member = candidates[0]
        zf.extract(member, os.path.dirname(MEMMAP_X))
    os.replace(os.path.join(os.path.dirname(MEMMAP_X), member), MEMMAP_X)

# 6) Open x via np.load(mmap_mode='r').
#    The extracted member is a complete .npy file, header included. A raw
#    np.memmap at offset 0 would treat the header bytes as fragment data and
#    shift every row; np.load parses the header and maps only the payload.
x = np.load(MEMMAP_X, mmap_mode='r')
if x.shape != (len(y), FRAGMENT_SIZE) or x.dtype != np.uint8:
    raise ValueError(
        f"unexpected x array in {MEMMAP_X}: shape={x.shape}, dtype={x.dtype}; "
        f"expected ({len(y)}, {FRAGMENT_SIZE}) uint8"
    )

# 7) Allocate the output as a real .npy file (with header) backed by a memmap,
#    so train_x.npy can later be opened with np.load.
out_x = np.lib.format.open_memmap(DST_X_NPY,
                                  mode='w+',
                                  dtype=np.uint8,
                                  shape=(count, FRAGMENT_SIZE))
out_y = y[mask]

# 8) Copy the kept rows in bounded chunks instead of one Python iteration per row
kept_rows = np.flatnonzero(mask)
for start in range(0, count, COPY_CHUNK):
    rows = kept_rows[start:start + COPY_CHUNK]
    out_x[start:start + len(rows)] = x[rows]
print("→ Copied all audio fragments.")

# 9) Flush to disk
out_x.flush()
np.save(DST_Y_NPY, out_y)

# 10) (Optional) bundle into a single .npz for convenience
np.savez(DST_NPZ, x=out_x, y=out_y)
print(f"✅ audio only train set written to {DST_NPZ}")


Total samples in train.npz: 6144000
Archive label IDs: [64, 65, 66, 67, 68, 69, 70]
Keeping 573394 archive‐type fragments out of 6144000 total.
→ Copied all audio fragments.
✅ audio only train set written to ./dataset/audio_only\train.npz


In [2]:
import os
import zipfile
import numpy as np
import json  # <-- Add this import

# 1) Class names to keep. They are looked up in the scenario-"1" name list of
#    classes.json to obtain their integer label IDs, so the spelling must
#    match that list exactly (list.index raises ValueError otherwise).
label_names = ['aiff', 'flac', 'm4a', 'mp3', 'ogg', 'wav', 'wma']
with open('classes.json') as f:
    scenarios = json.load(f)
    all_labels = scenarios['1']
archive_ids = list(map(all_labels.index, label_names))
print("Archive label IDs:", archive_ids)


# 2) Function to extract subsets for val and test
def extract_subset(split):
    """Write the subset of one dataset split whose labels are in ``archive_ids``.

    Reads ``./dataset/4k_1/{split}.npz`` and writes ``{split}_x.npy``,
    ``{split}_y.npy`` and ``{split}.npz`` into ``./dataset/audio_only``.
    Labels are copied unchanged (original IDs, not remapped).

    Args:
        split: Split name used to build the file names, e.g. ``'val'``.

    Raises:
        KeyError: if the source archive has no ``x*.npy`` member.
        ValueError: if the extracted ``x`` array is not ``(len(y), 4096)`` uint8.

    Relies on the module-level ``archive_ids`` list.
    """
    FRAGMENT_SIZE = 4096   # bytes per fragment, i.e. the row width of x
    COPY_CHUNK = 16384     # rows copied per step (about 64 MiB)

    SRC_NPZ = f'./dataset/4k_1/{split}.npz'
    DST_DIR = './dataset/audio_only'
    DST_X_NPY = os.path.join(DST_DIR, f'{split}_x.npy')
    DST_Y_NPY = os.path.join(DST_DIR, f'{split}_y.npy')
    DST_NPZ = os.path.join(DST_DIR, f'{split}.npz')
    os.makedirs(DST_DIR, exist_ok=True)

    # 3) Load only the labels (small) to find indices
    with np.load(SRC_NPZ) as data:
        y = data['y']
    print(f"Total samples in {split}.npz: {len(y)}")

    # 4) Compute which indices to keep
    mask = np.isin(y, archive_ids)
    count = int(mask.sum())
    print(f"Keeping {count} audio‐type fragments out of {len(y)} total.")

    # 5) Extract the 'x' member to disk so it can be memory-mapped (only once)
    MEMMAP_X = f'./dataset/4k_1/{split}_x.npy'
    if not os.path.exists(MEMMAP_X):
        print(f"→ Extracting {split}_x.npy from {split}.npz to disk…")
        with zipfile.ZipFile(SRC_NPZ) as zf:
            candidates = [m for m in zf.namelist() if m.startswith('x') and m.endswith('.npy')]
            if not candidates:
                raise KeyError(f"no 'x*.npy' member found in {SRC_NPZ}")
            member = candidates[0]
            zf.extract(member, os.path.dirname(MEMMAP_X))
        os.replace(os.path.join(os.path.dirname(MEMMAP_X), member), MEMMAP_X)

    # 6) The extracted member is a complete .npy file, header included. A raw
    #    np.memmap at offset 0 would read the header as fragment bytes and
    #    shift every row; np.load parses the header and maps only the payload.
    x = np.load(MEMMAP_X, mmap_mode='r')
    if x.shape != (len(y), FRAGMENT_SIZE) or x.dtype != np.uint8:
        raise ValueError(
            f"unexpected x array in {MEMMAP_X}: shape={x.shape}, dtype={x.dtype}; "
            f"expected ({len(y)}, {FRAGMENT_SIZE}) uint8"
        )

    # 7) Allocate the output as a real .npy file (with header) backed by a
    #    memmap, so {split}_x.npy can later be opened with np.load.
    out_x = np.lib.format.open_memmap(DST_X_NPY,
                                      mode='w+',
                                      dtype=np.uint8,
                                      shape=(count, FRAGMENT_SIZE))
    out_y = y[mask]

    # 8) Copy the kept rows in bounded chunks rather than one row per iteration
    kept_rows = np.flatnonzero(mask)
    for start in range(0, count, COPY_CHUNK):
        rows = kept_rows[start:start + COPY_CHUNK]
        out_x[start:start + len(rows)] = x[rows]
    print(f"→ Copied all audio fragments from {split}.")

    # 9) Flush to disk
    out_x.flush()
    np.save(DST_Y_NPY, out_y)

    # 10) (Optional) bundle into a single .npz for convenience
    np.savez(DST_NPZ, x=out_x, y=out_y)
    print(f"✅ audio-only {split} set written to {DST_NPZ}")


# 11) Run for val and test.
#     The train split is handled by the stand-alone script in the first cell.
for split in ['val', 'test']:
    extract_subset(split)


Archive label IDs: [64, 65, 66, 67, 68, 69, 70]
Total samples in val.npz: 768000
Keeping 71797 audio‐type fragments out of 768000 total.
→ Copied all audio fragments from val.
✅ audio-only val set written to ./dataset/audio_only\val.npz
Total samples in test.npz: 768000
Keeping 71609 audio‐type fragments out of 768000 total.
→ Copied all audio fragments from test.
✅ audio-only test set written to ./dataset/audio_only\test.npz


In [3]:
import os
import numpy as np

# Class names kept in the audio-only dataset. The position in this list is the
# remapped label id (0..6) returned by load(); the names must match
# classes.json exactly.

label_names = [ 'aiff', 'flac', 'm4a', 'mp3', 'ogg', 'wav', 'wma']

def load(split='train', data_dir='./dataset/audio_only'):
    """Load one audio-only split and remap its labels to ``0..len(label_names)-1``.

    Args:
        split: Split name; ``{data_dir}/{split}.npz`` is read.
        data_dir: Directory holding the ``.npz`` files.

    Returns:
        ``(x, y_remapped, label_names)`` where ``x`` is the ``'x'`` array from
        the file, ``y_remapped`` is an int64 array of new label ids, and
        ``label_names`` is the module-level name list.

    Raises:
        ValueError: if the split is empty or contains a label id that is not
            one of ``label_names``.
        AssertionError: if the split does not contain both the first and the
            last class (same sanity check as before).
    """
    # Imported here because this cell is often run without the preprocessing
    # cells that import json at the top of the notebook.
    import json

    # 1) load raw x, y. Members of an .npz are read fully into memory
    #    (mmap_mode only applies to plain .npy files), so the archive can be
    #    closed as soon as both arrays are read.
    with np.load(os.path.join(data_dir, f"{split}.npz")) as npz:
        x, y = npz['x'], npz['y']

    if y.size == 0:
        raise ValueError(f"split {split!r} contains no samples")

    # 2) build a lookup table from original IDs to new IDs (-1 = not an audio class)
    with open('classes.json') as f:
        all_labels = json.load(f)['1']
    archive_ids = [ all_labels.index(name) for name in label_names ]
    lut = np.full(len(all_labels), -1, dtype=np.int64)
    lut[archive_ids] = np.arange(len(archive_ids))

    # 3) remap y, rejecting ids the table does not know about
    if int(y.max()) >= len(all_labels):
        raise ValueError(f"label id {int(y.max())} is outside classes.json")
    y_remapped = lut[y]
    if (y_remapped < 0).any():
        unexpected = np.unique(y[y_remapped < 0]).tolist()
        raise ValueError(f"split {split!r} contains non-audio label ids: {unexpected}")

    # 4) sanity check
    assert y_remapped.min() == 0 and y_remapped.max() == len(label_names)-1

    return x, y_remapped, label_names


# Example usage:
# x_train, y_train, labels = load('train')
# x_val,   y_val,   _      = load('val')
# x_test,  y_test,  _      = load('test')


In [4]:
# Training split: fragment array, remapped labels, and class names.
x, y, labels = load('train')

In [5]:
import torch

# Build the training DataLoader. Each intermediate name is deleted as soon as
# the next object holds a reference, leaving only `train_loader` in the
# notebook namespace.
# NOTE(review): uint8 holds only 0-255, so the model's PAD index (256) can
# never occur in this tensor.
x_tensor = torch.tensor(x, dtype=torch.uint8)  # fragment bytes as a uint8 tensor
del x
y_tensor = torch.tensor(y, dtype=torch.uint8)
del y
train_dataset = torch.utils.data.TensorDataset(x_tensor, y_tensor)
print(torch.min(y_tensor), torch.max(y_tensor))  # label range check; the captured output shows 0 and 6
del x_tensor
del y_tensor
# Shuffled batches of 64, loaded by 6 worker processes.
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True,num_workers=6)
del train_dataset

tensor(0, dtype=torch.uint8) tensor(6, dtype=torch.uint8)


In [6]:
labels

['aiff', 'flac', 'm4a', 'mp3', 'ogg', 'wav', 'wma']

In [7]:
# Test split; rebinds the notebook-level x, y and labels.
x, y, labels = load('test')

In [8]:
import torch

# Build the test DataLoader the same way as the training one, but with
# batch_size=32 and no shuffling.
# NOTE(review): uint8 holds only 0-255, so the model's PAD index (256) can
# never occur in this tensor.
x_tensor = torch.tensor(x, dtype=torch.uint8)  # fragment bytes as a uint8 tensor
del x
y_tensor = torch.tensor(y, dtype=torch.uint8)
del y
test_dataset = torch.utils.data.TensorDataset(x_tensor, y_tensor)
del x_tensor
del y_tensor
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False,num_workers=6)
del test_dataset

In [9]:
# Validation split; rebinds the notebook-level x, y and labels.
x, y, labels = load('val')

In [10]:
import torch

# Build the validation DataLoader the same way as the test one
# (batch_size=32, no shuffling).
# NOTE(review): uint8 holds only 0-255, so the model's PAD index (256) can
# never occur in this tensor.
x_tensor = torch.tensor(x, dtype=torch.uint8)  # fragment bytes as a uint8 tensor
del x
y_tensor = torch.tensor(y, dtype=torch.uint8)
del y
val_dataset = torch.utils.data.TensorDataset(x_tensor, y_tensor)
del x_tensor
del y_tensor
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=False,num_workers=6)
del val_dataset

In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Attention(nn.Module):
    """Pool a sequence of hidden states into one vector with learned weights.

    A single linear layer scores every time step; the scores are turned into
    weights with a softmax over the sequence axis and used for a weighted sum.
    """

    def __init__(self, hidden_dim):
        super().__init__()
        # Maps each hidden state to one scalar score.
        self.attn = nn.Linear(hidden_dim, 1)

    def forward(self, gru_output):
        """Return the weighted sum over dim 1 of ``gru_output`` (B, L, H) -> (B, H)."""
        scores = self.attn(gru_output)           # (B, L, 1)
        weights = torch.softmax(scores, dim=1)   # normalised over the L axis
        weighted = weights * gru_output          # broadcast over H
        return weighted.sum(dim=1)

class CNN_GRU_Attn_Classifier(nn.Module):
    """Byte-sequence classifier: embedding -> 3 conv stages -> BiGRU -> attention -> MLP.

    Input is a (B, L) tensor of integer token ids in [0, 256]; output is a
    (B, num_classes) tensor of unnormalised class scores.
    """

    def __init__(self, num_classes):
        super().__init__()

        self.embedding_dim = 64
        self.vocab_size = 257  # byte values 0-255 plus one PAD id (256)

        # Token embedding; id 256 is the padding index.
        self.embedding = nn.Embedding(
            num_embeddings=self.vocab_size,
            embedding_dim=self.embedding_dim,
            padding_idx=256
        )

        # Three conv + batch-norm stages: 64 -> 64 -> 128 -> 256 channels.
        self.conv1 = nn.Conv1d(self.embedding_dim, 64, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm1d(64)

        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm1d(128)

        self.conv3 = nn.Conv1d(128, 256, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm1d(256)

        # Shared by all stages: halves the sequence length each time it is applied.
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        self.dropout = nn.Dropout(0.3)

        # Single-layer bidirectional GRU over the conv features.
        self.gru = nn.GRU(input_size=256, hidden_size=128, num_layers=1,
                          batch_first=True, bidirectional=True)

        # Attention pooling over the GRU outputs (2 * 128 features per step).
        self.attention = Attention(hidden_dim=128 * 2)

        # Classification head.
        self.fc1 = nn.Linear(128 * 2, 256)
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        # (B, L) ids -> (B, L, D) embeddings -> (B, D, L) for Conv1d.
        feats = self.embedding(x).transpose(1, 2)

        # conv -> batch norm -> GELU -> max-pool, three times.
        stages = ((self.conv1, self.bn1), (self.conv2, self.bn2), (self.conv3, self.bn3))
        for conv, norm in stages:
            feats = self.pool(F.gelu(norm(conv(feats))))   # ends at (B, 256, L_out)

        # Back to (B, L_out, 256) for the batch-first GRU.
        seq_out, _ = self.gru(feats.transpose(1, 2))       # (B, L_out, 2*128)

        pooled = self.attention(seq_out)                   # (B, 2*128)

        hidden = self.dropout(F.gelu(self.fc1(pooled)))
        return self.fc2(hidden)




In [12]:
import torch
# Use the GPU when one is available, otherwise the CPU. This value is also
# captured as the default `device` argument of train_model / test_model.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [13]:
print(labels)

['aiff', 'flac', 'm4a', 'mp3', 'ogg', 'wav', 'wma']


In [14]:
device

device(type='cuda')

In [15]:
# One output per audio class (7 names in `labels`).
model = CNN_GRU_Attn_Classifier(num_classes=7).to(device)


In [16]:
# Loss, optimizer and LR schedule.
# The scheduler multiplies the learning rate by 0.5 when the value passed to
# scheduler.step() stops decreasing (mode='min', patience=1). In this notebook
# it is stepped from test_model with the evaluation loss.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=1, factor=0.5)



In [17]:
import requests
from pathlib import Path 

# Download helper functions from Learn PyTorch repo (if not already downloaded)
# NOTE(review): this fetches and then imports code from the `main` branch of a
# third-party repository; consider pinning a commit or vendoring the file.
if Path("helper_functions.py").is_file():
  print("helper_functions.py already exists, skipping download")
else:
  print("Downloading helper_functions.py")
  # Note: you need the "raw" GitHub URL for this to work
  request = requests.get(
      "https://raw.githubusercontent.com/mrdbourke/pytorch-deep-learning/main/helper_functions.py",
      timeout=30,
  )
  # Fail here on 404/5xx instead of saving an error page as a .py file, which
  # would be skipped by the is_file() check above on every later run.
  request.raise_for_status()
  with open("helper_functions.py", "wb") as f:
    f.write(request.content)
from helper_functions import accuracy_fn

helper_functions.py already exists, skipping download


In [18]:
from tqdm import tqdm
import torch

def train_model(model, train_loader, criterion, optimizer, num_epochs=10, device=device):
    """Train ``model`` for ``num_epochs`` passes over ``train_loader``.

    Args:
        model: Module to optimise; called as ``model(X)`` on int64 inputs.
        train_loader: Iterable of ``(X, y)`` batches.
        criterion: Loss called as ``criterion(predictions, y)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.
        device: Device each batch is moved to before the forward pass.

    Returns:
        List with the mean batch loss of every epoch, in order.

    Raises:
        ValueError: if ``train_loader`` yields no batches.
    """
    epoch_losses = []

    for epoch in range(num_epochs):
        # Set every epoch so training mode is restored if an evaluation
        # (model.eval()) ran between calls or epochs.
        model.train()
        total_loss = 0.0
        num_batches = 0

        # Wrap the DataLoader in tqdm for a progress bar
        batch_iter = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", unit="batch")
        for num_batches, (X, y) in enumerate(batch_iter, start=1):
            # send to device and cast to long only per-batch
            X = X.to(device).long()
            y = y.to(device).long()

            # forward / backward
            y_pred = model(X)
            loss = criterion(y_pred, y)
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            total_loss += loss.item()

            # Divide by our own batch counter: tqdm only refreshes its `.n`
            # attribute when it redraws, so it lags the number of batches
            # already summed into total_loss.
            batch_iter.set_postfix(loss=total_loss / num_batches)

        if num_batches == 0:
            raise ValueError("train_loader produced no batches")

        avg_loss = total_loss / num_batches
        epoch_losses.append(avg_loss)
        print(f"→ Epoch {epoch+1} complete. Avg Loss: {avg_loss:.4f}")

    return epoch_losses


In [19]:
def test_model(model, test_loader, criterion, device=device, step_scheduler=True):
    """Evaluate ``model`` on ``test_loader`` and print loss and accuracy.

    Loss and accuracy are averaged over samples, not over batches, so a short
    final batch does not get the same weight as a full one.

    Args:
        model: Module to evaluate; switched to eval mode.
        test_loader: Iterable of ``(X, y)`` batches.
        criterion: Loss called as ``criterion(predictions, y)``; assumed to
            return the mean over the batch (the ``nn.CrossEntropyLoss``
            default used in this notebook).
        device: Device each batch is moved to.
        step_scheduler: When True (default, the original behaviour) the
            module-level ``scheduler`` is stepped with the evaluation loss.
            Pass False for evaluations that must not affect the learning-rate
            schedule, e.g. a final run on the test set.

    Returns:
        ``(loss, accuracy_percent)`` as Python floats.

    Raises:
        ValueError: if ``test_loader`` yields no samples.
    """
    model.eval()  # Set model to evaluation mode
    loss_sum, num_correct, num_seen = 0.0, 0, 0

    with torch.inference_mode():
        for X, y in test_loader:
            X, y = X.to(device).long(), y.to(device).long()

            test_pred = model(X)  # Forward pass
            batch_size = y.size(0)

            # Undo the per-batch mean so the final division is per sample.
            loss_sum += criterion(test_pred, y).item() * batch_size
            num_correct += (test_pred.argmax(dim=1) == y).sum().item()
            num_seen += batch_size

    if num_seen == 0:
        raise ValueError("test_loader produced no samples")

    test_loss = loss_sum / num_seen
    test_acc = 100.0 * num_correct / num_seen

    print(f"Test Loss: {test_loss:.4f}, Accuracy: {test_acc:.2f}%")
    if step_scheduler:
        scheduler.step(test_loss)
    return test_loss, test_acc



In [20]:


# One training epoch followed by an evaluation on the validation loader.
# test_model also steps the module-level LR scheduler with the reported loss.
train_model(model, train_loader, criterion, optimizer, num_epochs=1)
test_model(model, val_loader, criterion)

 

Epoch 1/1: 100%|██████████| 8960/8960 [25:33<00:00,  5.84batch/s, loss=0.0528]


→ Epoch 1 complete. Avg Loss: 0.0528
Test Loss: 0.0224, Accuracy: 99.33%


In [None]:


# Another epoch with the same calls as the previous cell; `model`, `optimizer`
# and `scheduler` carry their state over from that run.
train_model(model, train_loader, criterion, optimizer, num_epochs=1)
test_model(model, val_loader, criterion)

 

Epoch 1/1:   0%|          | 0/8960 [00:00<?, ?batch/s]

In [21]:
# torch.save does not create missing directories, so make sure the target exists.
os.makedirs('./models', exist_ok=True)
torch.save(model.state_dict(),'./models/FFTaudio.pth')

In [None]:
# Reload the checkpoint written by the save cell above. The previous path
# pointed at './models/FFTexecutables.pth', which is not the file this notebook
# saves. map_location lets a GPU-saved checkpoint load on a CPU-only machine.
model.load_state_dict(torch.load('./models/FFTaudio.pth', map_location=device, weights_only=True))

In [None]:
import torch
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
import numpy as np

def plot_confusion_and_most_confused(model, dataloader, classes, device='cuda'):
    """Plot a row-normalised confusion matrix and print the most confused pairs.

    Args:
        model: Classifier called as ``model(X)`` on int64 inputs; its argmax
            over dim 1 is taken as the predicted class index.
        dataloader: Iterable of ``(X, y)`` batches with integer labels that
            index into ``classes``.
        classes: Class names in label-index order; used for the tick labels
            and the printed pairs.
        device: Device each batch is moved to.

    Prints up to five ``true -> predicted`` pairs with the highest
    off-diagonal counts. Returns nothing.
    """
    model.eval()
    pred_batches = []
    label_batches = []

    with torch.no_grad():
        for X, y in dataloader:
            X = X.to(device).to(torch.long)
            y = y.to(device).to(torch.long)

            outputs = model(X)
            preds = torch.argmax(outputs, dim=1)

            pred_batches.append(preds.cpu())
            label_batches.append(y.cpu())

    # Concatenate all batches
    all_preds = torch.cat(pred_batches).numpy()
    all_labels = torch.cat(label_batches).numpy()

    # Fix the matrix to one row/column per entry of `classes`, even for
    # classes that never occur in this loader; otherwise the matrix can be
    # smaller than len(classes) and the tick labels and loops below misalign.
    cm = confusion_matrix(all_labels, all_preds, labels=list(range(len(classes))))

    # Row-normalise; rows without any true samples stay at 0 instead of NaN.
    row_totals = cm.sum(axis=1, keepdims=True)
    cm_normalized = np.divide(cm, row_totals,
                              out=np.zeros(cm.shape, dtype=float),
                              where=row_totals != 0)

    # Plot confusion matrix, labelled with the `classes` argument (the
    # previous version read an unrelated global, `new_classes`).
    plt.figure(figsize=(12, 10))
    sns.heatmap(cm_normalized, annot=True, fmt='.2f', cmap='Blues',
                xticklabels=classes, yticklabels=classes)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Normalized Confusion Matrix')
    plt.show()

    # Find most confused class pairs (excluding correct predictions)
    cm_copy = cm.copy()
    np.fill_diagonal(cm_copy, 0)  # Ignore diagonal (correct predictions)

    confused_pairs = []
    for true_idx in range(len(classes)):
        for pred_idx in range(len(classes)):
            if cm_copy[true_idx, pred_idx] > 0:
                confused_pairs.append((cm_copy[true_idx, pred_idx], classes[true_idx], classes[pred_idx]))

    confused_pairs.sort(reverse=True)

    print("\nTop 5 Most Confused Class Pairs:")
    for count, true_class, pred_class in confused_pairs[:5]:
        print(f"True '{true_class}' → Predicted '{pred_class}': {count} times")



In [None]:
# Class names in label-index order for the 7-class audio model trained above
# (the same names load() returns). The previous cell listed 13 archive formats
# and passed `new_classes`, which is not defined until a later cell.
classes = ['aiff', 'flac', 'm4a', 'mp3', 'ogg', 'wav', 'wma']


plot_confusion_and_most_confused(model, val_loader, classes, device='cuda' if torch.cuda.is_available() else 'cpu')


In [None]:
# Class names before merging.
# NOTE(review): these are archive/package formats, while the model defined
# earlier in this notebook has 7 audio classes; confirm this cell belongs here.
classes = ['apk', 'jar', 'msi', 'dmg', '7z', 'bz2', 'deb', 'gz', 'pkg', 'rar', 'rpm', 'xz', 'zip']

# Merging rules, written group by group: every member name maps to its group name.
merge_map = {
    member: group
    for group, members in (
        ('compressed', ('xz', '7z', 'deb')),
        ('archive', ('bz2', 'dmg')),
    )
    for member in members
}

# Class list after merging: each original name replaced by its group (if any),
# de-duplicated and sorted.
new_classes = sorted({merge_map.get(cls, cls) for cls in classes})
print("New Classes:", new_classes)

# Function to map old labels to new ones
def remap_labels(old_labels):
    """Return a list with every label replaced by its merged name, if it has one."""
    remapped = []
    for label in old_labels:
        remapped.append(merge_map.get(label, label))
    return remapped

# Example usage
# Suppose `y_true` and `y_pred` are your true and predicted labels
# y_true_new = remap_labels(y_true)
# y_pred_new = remap_labels(y_pred)

# Update your model's num_classes accordingly:
# num_classes = len(new_classes)


In [None]:
# Re-definition of remap_labels (same body as in the previous cell); it reads
# the module-level merge_map at call time.
def remap_labels(old_labels):
    """Return a list with each label replaced by ``merge_map[label]`` when present."""
    return [merge_map.get(label, label) for label in old_labels]

# Before evaluating:
# NOTE(review): y_true and y_pred are first assigned in the evaluation cell
# further down, so this cell raises NameError when the notebook is run top to
# bottom on a fresh kernel.
y_true_remapped = remap_labels(y_true)
y_pred_remapped = remap_labels(y_pred)


In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

# 1. Merge rules: original class name -> merged class name
merge_map = {
    '7z': '7z/xz/deb',
    'xz': '7z/xz/deb',
    'deb': '7z/xz/deb',
    'bz2': 'bz2/dmg',
    'dmg': 'bz2/dmg',
    'zip': 'zip/rar',
    'rar': 'zip/rar'
}

# 2. Define remap function
def remap_labels(old_labels):
    """Return a list with each label replaced by ``merge_map[label]`` when present."""
    return [merge_map.get(label, label) for label in old_labels]

# 3. Collect true and predicted label indices over the whole loader
all_preds = []
all_labels = []

# 4. Put model in evaluation mode
model.eval()

# 5. No gradient tracking
with torch.no_grad():
    for X_batch, y_batch in val_loader:   # or test_loader
        X_batch = X_batch.to(device)
        X_batch = X_batch.to(torch.long)
        y_batch = y_batch.to(device).to(torch.long)

        outputs = model(X_batch)
        preds = torch.argmax(outputs, dim=1)

        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())

# 6. Convert indices to names.
# NOTE(review): `classes` must be the name list in the label order the model
# was trained with; confirm it matches the currently loaded model.
y_true = [classes[i] for i in all_labels]
y_pred = [classes[i] for i in all_preds]

# 7. Remap
y_true_remapped = remap_labels(y_true)
y_pred_remapped = remap_labels(y_pred)

# 8. Class labels after merging. Predicted names are included as well;
#    otherwise predictions of a class that never occurs as a true label would
#    be silently dropped from the matrix.
new_classes = sorted(set(y_true_remapped) | set(y_pred_remapped))

# 9. Confusion matrix
# Step 1: raw counts
cm = confusion_matrix(y_true_remapped, y_pred_remapped, labels=new_classes)

# Step 2: normalise row-wise (each row with samples sums to 1; rows without
#         any true samples stay at 0 instead of becoming NaN)
row_totals = cm.sum(axis=1)[:, np.newaxis]
cm_normalized = np.divide(cm, row_totals,
                          out=np.zeros(cm.shape, dtype=float),
                          where=row_totals != 0)

# Step 3: plot
plt.figure(figsize=(10,8))
sns.heatmap(cm_normalized, annot=True, fmt=".2f", cmap='Blues',
            xticklabels=new_classes, yticklabels=new_classes)
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.title('Normalized Confusion Matrix (After Merging)')
plt.show()


In [None]:
import os
import numpy as np
import zipfile
import json

# Merged label id -> archive type names folded into that id
MERGE_GROUPS = {
    0: ['7z', 'xz', 'deb'],
    1: ['bz2', 'dmg'],
    2: ['zip', 'rar'],
}

# Inverted view for lookup: archive type name -> merged label id
merge_name2label = {
    name: merged_id
    for merged_id, members in MERGE_GROUPS.items()
    for name in members
}

print("Merged archive groups:", merge_name2label)

def merge_and_save(split):
    """Relabel one archives-only split with the merged label ids and save it.

    Reads ``./dataset/archives_only/{split}.npz``, replaces every label whose
    class name appears in the module-level ``merge_name2label`` with its
    merged id (other labels are kept unchanged), and writes ``{split}_x.npy``,
    ``{split}_y.npy`` and ``{split}.npz`` to ``./dataset/merged_archives``.
    All samples are kept.

    Args:
        split: Split name used to build the file names, e.g. ``'train'``.

    Raises:
        KeyError: if the source archive has no ``x*.npy`` member.
        ValueError: if the extracted ``x`` array is not ``(N, 4096)`` uint8, or
            if an unmerged label id present in the data equals one of the
            merged ids (two different classes would share one label).
    """
    FRAGMENT_SIZE = 4096   # bytes per fragment, i.e. the row width of x
    COPY_CHUNK = 16384     # rows copied per step (about 64 MiB)

    SRC = f'./dataset/archives_only/{split}.npz'
    DST_DIR = './dataset/merged_archives'
    os.makedirs(DST_DIR, exist_ok=True)
    DST_X = os.path.join(DST_DIR, f'{split}_x.npy')
    DST_Y = os.path.join(DST_DIR, f'{split}_y.npy')
    DST_NPZ = os.path.join(DST_DIR, f'{split}.npz')

    # 1. Load all label names
    with open('classes.json') as f:
        all_labels = json.load(f)['1']

    # 2. Old label id -> merged label id
    merge_id2label = {
        all_labels.index(name): newlab
        for name, newlab in merge_name2label.items()
    }
    print(f"{split}: Merging these label ids:", merge_id2label)

    # 3. Load y; every sample is kept
    with np.load(SRC) as data:
        y = data['y']
    N = len(y)
    count = N
    print(f"{split}: Keeping {count} of {N} samples.")

    # Labels that are not merged keep their original id. If such an id equals
    # a merged id, two distinct classes would end up with the same label.
    untouched_ids = {int(v) for v in np.unique(y)} - set(merge_id2label)
    clashes = sorted(untouched_ids & set(merge_id2label.values()))
    if clashes:
        raise ValueError(
            f"{split}: unmerged label ids {clashes} collide with merged label ids"
        )

    # 4. Extract x to disk if needed, then memory-map it
    MEMMAP_X = SRC.replace('.npz', '_x.npy')
    if not os.path.exists(MEMMAP_X):
        print(f"→ Extracting {MEMMAP_X} from {SRC} ...")
        with zipfile.ZipFile(SRC) as zf:
            candidates = [m for m in zf.namelist() if m.startswith('x') and m.endswith('.npy')]
            if not candidates:
                raise KeyError(f"no 'x*.npy' member found in {SRC}")
            member = candidates[0]
            zf.extract(member, os.path.dirname(MEMMAP_X))
        os.replace(os.path.join(os.path.dirname(MEMMAP_X), member), MEMMAP_X)
    # The extracted member is a complete .npy file, header included; np.load
    # skips the header, whereas a raw np.memmap at offset 0 would read it as
    # fragment data and shift every row.
    x = np.load(MEMMAP_X, mmap_mode='r')
    if x.shape != (N, FRAGMENT_SIZE) or x.dtype != np.uint8:
        raise ValueError(
            f"unexpected x array in {MEMMAP_X}: shape={x.shape}, dtype={x.dtype}; "
            f"expected ({N}, {FRAGMENT_SIZE}) uint8"
        )

    # 5. Allocate the output as a real .npy file backed by a memmap
    out_x = np.lib.format.open_memmap(DST_X, mode='w+', dtype=np.uint8,
                                      shape=(count, FRAGMENT_SIZE))

    # 6. Relabel through a lookup table: identity everywhere except merged ids.
    #    Assumes label ids are below the table size (y is expected to be uint8).
    lut = np.arange(max(256, len(all_labels)), dtype=np.int64)
    for old_id, new_id in merge_id2label.items():
        lut[old_id] = new_id
    out_y = lut[y].astype(np.uint8)

    # 7. Copy x in bounded chunks
    for start in range(0, N, COPY_CHUNK):
        stop = min(start + COPY_CHUNK, N)
        out_x[start:stop] = x[start:stop]

    # 8. Persist the results; previously nothing was flushed or saved, and
    #    DST_Y and DST_NPZ were never written.
    out_x.flush()
    np.save(DST_Y, out_y)
    np.savez(DST_NPZ, x=out_x, y=out_y)
    print(f"{split}: merged set written to {DST_NPZ}")

# Relabel every split; each call reads ./dataset/archives_only/{split}.npz.
for split in ['train', 'val', 'test']:
    merge_and_save(split)