# Label-Flip Poisoning on ML-based Malware Detectors

Project notebook for an 8–10 minute demo of label-flip poisoning against classic ML malware detectors.

- Paper: Aryal et al., "Analysis of Label-Flip Poisoning Attack on Machine Learning Based Malware Detector" — http://arxiv.org/pdf/2301.01044
- Dataset: Kaggle competition "Malware detection" — https://www.kaggle.com/competitions/malware-detection/data

This notebook trains 8 models and compares baseline vs. 10% and 20% label-flip poisoning on the training set while keeping the test set clean.



### Paper context and goals
- Paper: Aryal et al., "Analysis of Label-Flip Poisoning Attack on Machine Learning Based Malware Detector" (`http://arxiv.org/pdf/2301.01044`)
- Threat model: training-time data poisoning via random label flipping; test set remains clean.
- Goal: quantify robustness of 8 classic ML models under 10% and 20% label-flip poisoning.
- Dataset note: The paper used VirusTotal/VirusShare samples; this notebook instead uses a local tabular dataset (for example the Kaggle "Malware detection" data linked above) with a binary malware label, in the same spirit.


### 1) Environment setup
Install Python dependencies required for data processing, modeling, and plotting.


In [None]:
pip -q install --upgrade pip && \
  pip -q install numpy pandas scikit-learn matplotlib seaborn tabulate kaggle joblib



### 2) Core imports and display settings
Load core libraries (NumPy, Pandas, Seaborn/Matplotlib) and set display options.


In [None]:
import os
import sys
from pathlib import Path
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Let pandas show up to 120 columns when a DataFrame is displayed.
pd.set_option("display.max_columns", 120)
# Seaborn's built-in "deep" colour palette, kept in a notebook-level name.
palette = sns.color_palette("deep")



### 3) Helpers and utilities
Import shared helpers from `utils.py` if available; otherwise fall back to minimal inline implementations.


In [None]:
# Helpers: import utils.py if present; otherwise define minimal fallbacks
import sys
from pathlib import Path
import numpy as np
import pandas as pd

try:
    # Make the working directory importable so a sibling utils.py can be found.
    # Only append when missing: re-running this cell must not grow sys.path.
    if str(Path.cwd()) not in sys.path:
        sys.path.append(str(Path.cwd()))
    from utils import (
        infer_label_column, ensure_binary_labels, split_features_labels,
        build_models, build_pipelines, flip_labels, run_experiment,
    )
    print("Using local utils.py")
except Exception as exc:
    # Deliberately broad: any failure while importing utils.py (missing file,
    # missing name, error raised inside it) falls back to the inline helpers.
    # Report the real cause instead of always claiming the file was not found.
    print(f"utils.py not usable ({type(exc).__name__}: {exc}); using fallback helpers")

    def infer_label_column(columns):
        """Return the first well-known label column name found in ``columns``.

        Candidate names are tried in a fixed priority order; ``None`` is returned
        when none of them is present.
        """
        candidates = (
            "label", "Label", "target", "Target", "class", "Class",
            "malware", "Malware", "is_malware", "HasDetections",
        )
        return next((name for name in candidates if name in columns), None)

    def ensure_binary_labels(y):
        """Map a two-class label Series onto integer 0/1 labels.

        Boolean Series are cast directly (False -> 0, True -> 1). Otherwise the
        distinct values are sorted; the smaller maps to 0 and the larger to 1.

        Raises:
            AssertionError: if ``y`` does not hold exactly two distinct values.
        """
        if y.dtype == bool:
            return y.astype(int)
        vals = sorted(pd.unique(y))
        if len(vals) != 2:
            # Raised explicitly rather than via ``assert`` so the check survives
            # ``python -O``; the exception type callers see is unchanged.
            raise AssertionError(f"Expected binary labels, got {vals}")
        negative, positive = vals
        return y.map({negative: 0, positive: 1}).astype(int)

    from sklearn.pipeline import Pipeline
    from sklearn.compose import ColumnTransformer
    from sklearn.impute import SimpleImputer
    from sklearn.preprocessing import StandardScaler

    from sklearn.linear_model import SGDClassifier, LogisticRegression, Perceptron
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.svm import LinearSVC
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.neural_network import MLPClassifier

    from sklearn.metrics import accuracy_score, precision_score, recall_score, confusion_matrix

    def split_features_labels(df, label_col, drop_non_numeric=True):
        """Separate a frame into features and binary labels.

        Returns ``(X, y, num_cols, cat_cols)``. ``y`` is ``df[label_col]`` passed
        through ``ensure_binary_labels`` and ``num_cols`` lists the numeric feature
        columns. With ``drop_non_numeric`` set, ``X`` keeps only the numeric columns
        and ``cat_cols`` is empty; otherwise ``X`` keeps every feature column and
        ``cat_cols`` lists the non-numeric ones.
        """
        labels = ensure_binary_labels(df[label_col])
        features = df.drop(columns=[label_col])
        numeric = list(features.select_dtypes(include=[np.number]).columns)
        if drop_non_numeric:
            return features[numeric], labels, numeric, []
        non_numeric = [col for col in features.columns if col not in numeric]
        return features, labels, numeric, non_numeric

    def build_models(random_state=42):
        """Create the eight classifiers compared in the experiment, keyed by name.

        ``random_state`` is forwarded to every estimator that accepts one
        (KNeighborsClassifier takes none), so repeated runs use the same seed
        everywhere. LogisticRegression previously did not receive it.
        """
        return {
            "SGD": SGDClassifier(random_state=random_state, max_iter=1000, tol=1e-3),
            "RandomForest": RandomForestClassifier(n_estimators=200, random_state=random_state, n_jobs=-1),
            # The liblinear solver uses random_state when shuffling the data.
            "LogisticRegression": LogisticRegression(
                max_iter=1000, solver="liblinear", random_state=random_state
            ),
            "KNN": KNeighborsClassifier(n_neighbors=5),
            "LinearSVM": LinearSVC(random_state=random_state),
            "DecisionTree": DecisionTreeClassifier(random_state=random_state),
            "Perceptron": Perceptron(random_state=random_state, max_iter=1000),
            "MLP": MLPClassifier(hidden_layer_sizes=(128,), max_iter=100, early_stopping=True, random_state=random_state),
        }

    def _needs_scaling(name):
        """Tell whether the named model gets a StandardScaler step in its pipeline."""
        scaled = frozenset(
            ("SGD", "LogisticRegression", "KNN", "LinearSVM", "Perceptron", "MLP")
        )
        return name in scaled

    def build_pipelines(models, numeric_cols, categorical_cols):
        """Wrap each model in a preprocessing + classifier pipeline.

        Numeric columns are median-imputed, and standardised as well for the models
        named in ``_needs_scaling``; the columns in ``categorical_cols`` are dropped.
        Returns a dict with the same keys (and order) as ``models``.
        """
        def _wrap(name, estimator):
            # One numeric sub-pipeline per model: impute, then optionally scale.
            numeric_steps = [("imputer", SimpleImputer(strategy="median"))]
            if _needs_scaling(name):
                numeric_steps.append(("scaler", StandardScaler()))
            preprocessor = ColumnTransformer([
                ("num", Pipeline(numeric_steps), numeric_cols),
                ("cat", "drop", categorical_cols),
            ])
            return Pipeline([("pre", preprocessor), ("clf", estimator)])

        return {name: _wrap(name, estimator) for name, estimator in models.items()}

    def flip_labels(y, flip_fraction, rng_seed=42):
        """Return a copy of 0/1 labels with a random fraction of them flipped.

        Args:
            y: 0/1 labels as a pandas Series, NumPy array or plain sequence.
                The input is never modified.
            flip_fraction: share of labels to flip, within [0, 1];
                ``int(flip_fraction * len(y))`` labels are flipped.
            rng_seed: seed passed to ``np.random.default_rng``.

        Returns:
            ``(poisoned, idx)``: a NumPy array with the labels after flipping and
            the positional indices that were flipped (empty when none were).

        Raises:
            ValueError: if ``flip_fraction`` lies outside [0, 1].
        """
        if not 0.0 <= flip_fraction <= 1.0:
            # A negative fraction used to be ignored silently and one above 1
            # failed deep inside rng.choice; reject both up front.
            raise ValueError(f"flip_fraction must be in [0, 1], got {flip_fraction}")
        rng = np.random.default_rng(rng_seed)
        # Work on an ndarray copy so Series, arrays and lists behave alike and
        # indexing is always positional.
        y_p = np.array(y, copy=True)
        n = len(y_p)
        k = int(flip_fraction * n)
        if k == 0:
            return y_p, np.array([], dtype=int)
        idx = rng.choice(n, size=k, replace=False)
        y_p[idx] = 1 - y_p[idx]
        return y_p, idx

    def run_experiment(X_train, y_train, X_test, y_test, pipelines, flip_fracs=(0.0,0.1,0.2), seed=42):
        """Train every pipeline at each flip fraction and score it on the test set.

        For each fraction the training labels are poisoned once with ``flip_labels``
        (same seed every time) and each pipeline is re-fitted in place on them;
        ``y_test`` is used unmodified for scoring.

        Returns ``(results, cms)``: a DataFrame with one row per (model, fraction)
        holding accuracy, precision and recall, and a dict mapping
        ``(model, fraction)`` to the confusion matrix.
        """
        records = []
        matrices = {}
        for frac in flip_fracs:
            poisoned, _flipped = flip_labels(pd.Series(y_train), frac, rng_seed=seed)
            for name, pipe in pipelines.items():
                pipe.fit(X_train, poisoned)
                predicted = pipe.predict(X_test)
                record = {"model": name, "flip_frac": frac}
                record["acc"] = accuracy_score(y_test, predicted)
                record["prec"] = precision_score(y_test, predicted, zero_division=0)
                record["rec"] = recall_score(y_test, predicted, zero_division=0)
                records.append(record)
                matrices[(name, frac)] = confusion_matrix(y_test, predicted)
        return pd.DataFrame(records), matrices



### 4) Optional: Kaggle authentication (skip if using a local CSV)
If you prefer downloading via Kaggle in Colab, place `kaggle.json` under `~/.kaggle/`. Otherwise, skip this section.


### (Optional) Kaggle authentication
- Only needed if you want to download from Kaggle. For local files, skip this cell.



### 5) (Optional) Download via Kaggle CLI
If you already have the dataset locally, skip this section entirely.


### 6) Configure dataset path (local CSV)
Point `INPUT_CSV` to your local dataset file. If left blank, the notebook will try to auto-detect a CSV in the current tree.

### 6b) Use a local CSV (recommended)
Set `INPUT_CSV` to the path of your already-downloaded dataset. If left blank, the notebook will try to auto-detect a CSV in the current tree.


Note: the local CSV is used instead of a freshly downloaded Kaggle copy.

### 7) Load, split, build models, and run experiments
- Load the full dataset and infer the label column.
- Split into stratified train/test.
- Build 8 models and their pipelines.
- Run baseline (clean) and poisoned (10%, 20%) training and evaluate on clean test.


In [None]:
# Local CSV path (recommended): set INPUT_CSV to your downloaded dataset file
INPUT_CSV = ""  # e.g., "/Users/you/Downloads/malware/train.csv"

from glob import glob
if not INPUT_CSV:
    # glob() returns matches in arbitrary, filesystem-dependent order; sort them so
    # the auto-detected file is the same on every run and on every machine.
    candidates = sorted(glob("**/*.csv", recursive=True))
    print("Found CSV candidates:", candidates[:5])
    INPUT_CSV = candidates[0] if candidates else None

print("Using:", INPUT_CSV)
import os
# Explicit raise instead of assert: the check must not vanish under `python -O`.
if not (INPUT_CSV and os.path.exists(INPUT_CSV)):
    raise FileNotFoundError("Please set INPUT_CSV to your local CSV path.")



### Sanity checks (paper-aligned)
Quick checks before modeling:
- Dataset shape
- Inferred label column and class balance
- Top missing-value ratios (helps understand imputation impact)



In [None]:
# This cell sits before the modeling cell that loads the data, so `df` and
# `label_col` may not exist yet when the notebook is run top to bottom.
# Create them here if needed, using the same load and inference steps.
if 'df' not in globals():
    df = pd.read_csv(INPUT_CSV)
if 'label_col' not in globals():
    label_col = infer_label_column(df.columns)
if label_col is None:
    raise ValueError("Could not infer label column; please set it manually.")

print("Dataset shape:", df.shape)
print("Label column:", label_col)
print("Class balance (head):")
print(df[label_col].value_counts(normalize=True).head())

# Show top-10 columns by missing ratio
na_ratio = df.isna().mean().sort_values(ascending=False)
print("\nTop-10 missing-value ratios:\n", na_ratio.head(10))



In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, confusion_matrix
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer

from sklearn.linear_model import SGDClassifier, LogisticRegression, Perceptron
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import LinearSVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.neural_network import MLPClassifier

# Try importing local utils; if not present, define minimal helpers inline
try:
    # Add the working directory to the import path only when it is missing: this
    # cell may be re-run, and an earlier cell may already have added it.
    if str(Path.cwd()) not in sys.path:
        sys.path.append(str(Path.cwd()))
    from utils import (
        infer_label_column, ensure_binary_labels, split_features_labels,
        build_models, build_pipelines, flip_labels, run_experiment,
    )
    USE_LOCAL_UTILS = True
except Exception:
    # Broad on purpose: any failure importing utils.py selects the inline helpers.
    USE_LOCAL_UTILS = False

    def infer_label_column(columns):
        """Pick the label column by name.

        Known label names are checked in priority order and the first one contained
        in ``columns`` is returned; the result is ``None`` if there is no match.
        """
        known_names = [
            "label", "Label", "target", "Target", "class", "Class",
            "malware", "Malware", "is_malware", "HasDetections",
        ]
        matches = (candidate for candidate in known_names if candidate in columns)
        return next(matches, None)

    def ensure_binary_labels(y):
        """Convert a two-class label Series to integer 0/1 labels.

        A boolean Series is cast directly (False -> 0, True -> 1). For any other
        dtype the distinct values are sorted; the first becomes 0, the second 1.

        Raises:
            AssertionError: if ``y`` does not hold exactly two distinct values.
        """
        if y.dtype == bool:
            return y.astype(int)
        vals = sorted(pd.unique(y))
        if len(vals) != 2:
            # An explicit raise keeps this validation active under ``python -O``
            # (a bare ``assert`` would be stripped); the exception type is the same.
            raise AssertionError(f"Expected binary labels, got {vals}")
        first, second = vals
        return y.map({first: 0, second: 1}).astype(int)

    def split_features_labels(df, label_col, drop_non_numeric=True):
        """Split ``df`` into a feature frame and binary labels.

        The labels come from ``ensure_binary_labels(df[label_col])``. The return
        value is ``(X, y, num_cols, cat_cols)``: ``num_cols`` names the numeric
        feature columns; ``cat_cols`` names the remaining ones, or is empty when
        ``drop_non_numeric`` is set, in which case ``X`` is reduced to ``num_cols``.
        """
        target = ensure_binary_labels(df[label_col])
        feature_frame = df.drop(columns=[label_col])
        numeric_names = list(feature_frame.select_dtypes(include=[np.number]).columns)
        if not drop_non_numeric:
            other_names = [c for c in feature_frame.columns if c not in numeric_names]
            return feature_frame, target, numeric_names, other_names
        return feature_frame[numeric_names], target, numeric_names, []

    def build_models(random_state=42):
        """Return the eight classifiers under comparison as a name -> estimator dict.

        ``random_state`` is passed to every estimator that accepts one
        (KNeighborsClassifier takes none) so runs share a single seed.
        LogisticRegression previously did not receive it.
        """
        return {
            "SGD": SGDClassifier(random_state=random_state, max_iter=1000, tol=1e-3),
            "RandomForest": RandomForestClassifier(n_estimators=200, random_state=random_state, n_jobs=-1),
            # The liblinear solver uses random_state when shuffling the data.
            "LogisticRegression": LogisticRegression(
                max_iter=1000, solver="liblinear", random_state=random_state
            ),
            "KNN": KNeighborsClassifier(n_neighbors=5),
            "LinearSVM": LinearSVC(random_state=random_state),
            "DecisionTree": DecisionTreeClassifier(random_state=random_state),
            "Perceptron": Perceptron(random_state=random_state, max_iter=1000),
            "MLP": MLPClassifier(hidden_layer_sizes=(128,), max_iter=100, early_stopping=True, random_state=random_state),
        }

    def _needs_scaling(name):
        """Return True for the model names whose pipeline includes a StandardScaler."""
        scale_sensitive = {"SGD", "LogisticRegression", "KNN", "LinearSVM", "Perceptron", "MLP"}
        if name in scale_sensitive:
            return True
        return False

    def build_pipelines(models, numeric_cols, categorical_cols):
        """Build one preprocessing + classifier pipeline per model.

        The numeric columns are median-imputed and, for the models selected by
        ``_needs_scaling``, standardised; ``categorical_cols`` are dropped by the
        column transformer. The returned dict is keyed like ``models``.
        """
        built = {}
        for model_name, estimator in models.items():
            imputer_step = ("imputer", SimpleImputer(strategy="median"))
            if _needs_scaling(model_name):
                numeric_pipeline = Pipeline([imputer_step, ("scaler", StandardScaler())])
            else:
                numeric_pipeline = Pipeline([imputer_step])
            transformer = ColumnTransformer([
                ("num", numeric_pipeline, numeric_cols),
                ("cat", "drop", categorical_cols),
            ])
            built[model_name] = Pipeline([("pre", transformer), ("clf", estimator)])
        return built

    def flip_labels(y, flip_fraction, rng_seed=42):
        """Return a copy of 0/1 labels with a random fraction of them flipped.

        Args:
            y: 0/1 labels as a pandas Series, NumPy array or plain sequence.
                The input is never modified.
            flip_fraction: share of labels to flip, within [0, 1];
                ``int(flip_fraction * len(y))`` labels are flipped.
            rng_seed: seed passed to ``np.random.default_rng``.

        Returns:
            ``(poisoned, idx)``: a NumPy array with the labels after flipping and
            the positional indices that were flipped (empty when none were).

        Raises:
            ValueError: if ``flip_fraction`` lies outside [0, 1].
        """
        if not 0.0 <= flip_fraction <= 1.0:
            # A negative fraction used to be ignored silently and one above 1
            # failed deep inside rng.choice; reject both up front.
            raise ValueError(f"flip_fraction must be in [0, 1], got {flip_fraction}")
        rng = np.random.default_rng(rng_seed)
        # Operate on an ndarray copy: the previous `.iloc`-only code raised
        # AttributeError for NumPy arrays and lists.
        y_p = np.array(y, copy=True)
        n = len(y_p)
        k = int(flip_fraction * n)
        if k == 0:
            return y_p, np.array([], dtype=int)
        idx = rng.choice(n, size=k, replace=False)
        y_p[idx] = 1 - y_p[idx]
        return y_p, idx

    def run_experiment(X_train, y_train, X_test, y_test, pipelines, flip_fracs=(0.0,0.1,0.2), seed=42):
        """Fit and evaluate all pipelines for each label-flip fraction.

        Training labels are poisoned once per fraction through ``flip_labels`` with
        the given seed; each pipeline is then re-fitted in place and evaluated
        against the unmodified ``y_test``.

        Returns a ``(DataFrame, dict)`` pair: one metrics row (accuracy, precision,
        recall) per model and fraction, plus confusion matrices keyed by
        ``(model, fraction)``.
        """
        def _evaluate(name, frac, y_pred):
            # Metrics row for one fitted pipeline at one flip fraction.
            return {
                "model": name,
                "flip_frac": frac,
                "acc": accuracy_score(y_test, y_pred),
                "prec": precision_score(y_test, y_pred, zero_division=0),
                "rec": recall_score(y_test, y_pred, zero_division=0),
            }

        rows, cms = [], {}
        for frac in flip_fracs:
            noisy_labels = flip_labels(pd.Series(y_train), frac, rng_seed=seed)[0]
            for name, pipe in pipelines.items():
                pipe.fit(X_train, noisy_labels)
                y_pred = pipe.predict(X_test)
                rows.append(_evaluate(name, frac, y_pred))
                cms[(name, frac)] = confusion_matrix(y_test, y_pred)
        return pd.DataFrame(rows), cms

# Load dataset; a `df` left over from an earlier run of the notebook is reused
assert INPUT_CSV is not None, "Please set INPUT_CSV to a CSV path."
try:
    df
except NameError:
    df = pd.read_csv(INPUT_CSV)

# Infer the label column by name and stop early when none of the known names is present
label_col = infer_label_column(df.columns)
if label_col is None:
    raise ValueError("Could not infer label column; please set it manually.")
print("Label column:", label_col)

# For runtime, optionally sample with stratified approach; default is FULL dataset

def stratified_sample(df, label_col, sample_size=10_000, seed=42):
    """Draw roughly ``sample_size`` rows while keeping the class proportions.

    When ``sample_size`` is not positive or is at least ``len(df)``, ``df`` itself
    is returned unchanged. Otherwise each label group contributes a share of rows
    proportional to its size (at least one row, at most the whole group), and the
    combined sample is shuffled and re-indexed from 0.
    """
    total = len(df)
    if not 0 < sample_size < total:
        return df
    rng = np.random.default_rng(seed)
    picked = []
    for _, group in df.groupby(label_col):
        group_size = len(group)
        quota = max(1, int(round(group_size / total * sample_size)))
        positions = rng.choice(group_size, size=min(quota, group_size), replace=False)
        picked.append(group.iloc[positions])
    shuffled = pd.concat(picked).sample(frac=1.0, random_state=seed)
    return shuffled.reset_index(drop=True)

# Optional down-sampling for a quicker demo; 0 keeps every row
SAMPLE_SIZE = 0  # set >0 (e.g., 10000) to run a faster demo; 0 = full dataset
df_sampled = stratified_sample(
    df,
    label_col=label_col,
    sample_size=SAMPLE_SIZE,
    seed=42,
)
print("Rows used:", len(df_sampled))

# Features/labels with drop_non_numeric=True, then a stratified 80/20 split
X, y, num_cols, cat_cols = split_features_labels(
    df_sampled,
    label_col=label_col,
    drop_non_numeric=True,
)
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y.values,
    test_size=0.2,
    stratify=y.values,
    random_state=42,
)
print("Train/Test:", len(X_train), len(X_test))

# Build the models and pipelines, then train at 0%, 10% and 20% label flips
models = build_models(random_state=42)
pipes = build_pipelines(models, numeric_cols=num_cols, categorical_cols=cat_cols)
results, cms = run_experiment(
    X_train,
    y_train,
    X_test,
    y_test,
    pipelines=pipes,
    flip_fracs=(0.0,0.1,0.2),
    seed=42,
)
results = results.sort_values(["model","flip_frac"])
results



In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Accuracy heatmap: one row per model, one column per flip fraction
pivot_acc = results.pivot(index="model", columns="flip_frac", values="acc")
plt.figure(figsize=(10,4))
sns.heatmap(pivot_acc, annot=True, fmt=".3f", cmap="viridis")
plt.title("Accuracy by Model and Flip Fraction")
plt.show()

# Confusion matrices for a few models at the highest flip fraction that was run
max_frac = results["flip_frac"].max()
for model_name in ("LogisticRegression", "RandomForest", "MLP"):
    cm_key = (model_name, max_frac)
    if cm_key not in cms:
        # Skip models that were not part of the experiment
        continue
    plt.figure(figsize=(3.8,3.2))
    sns.heatmap(
        cms[cm_key],
        annot=True,
        fmt="d",
        cbar=False,
        cmap="Blues",
        xticklabels=["Benign","Malware"],
        yticklabels=["Benign","Malware"],
    )
    plt.title(f"{model_name} @ flip={max_frac}")
    plt.ylabel("True")
    plt.xlabel("Predicted")
    plt.tight_layout()
    plt.show()



### Discussion
- Which models showed the largest degradation as flip increased?
- Did precision or recall suffer more? For malware detection, recall is critical (missed malware).
- How do results align with the paper's observations?

Reference: Aryal et al., http://arxiv.org/pdf/2301.01044

