In [1]:
# ! python -m pip install numpy scikit-learn tqdm pandas matplotlib seaborn

In [2]:
"""
This is a module toa be used as a reference for building other modules
"""

import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.validation import check_is_fitted
from sklearn.utils.multiclass import unique_labels
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification, fetch_20newsgroups
from sklearn.preprocessing import LabelEncoder
import gzip
from tqdm import tqdm
from sklearn.preprocessing import LabelEncoder
import pandas as pd


class GzipClassifier(ClassifierMixin, BaseEstimator):
    """An example classifier which implements a 1-NN algorithm.

    For more information regarding how to build your own classifier, read more
    in the :ref:`User Guide <user_guide>`.

    Parameters
    ----------
    demo_param : str, default='demo'
        A parameter used for demonstation of how to pass and store paramters.

    Attributes
    ----------
    X_ : ndarray, shape (n_samples, n_features)
        The input passed during :meth:`fit`.
    y_ : ndarray, shape (n_samples,)
        The labels passed during :meth:`fit`.
    classes_ : ndarray, shape (n_classes,)
        The classes seen at :meth:`fit`.
    """

    def __init__(self, k=3):
        self.k = k
        self.compressor = "gzip"
        self._set_compressor()

    def fit(self, X, y):
        """A reference implementation of a fitting function for a classifier.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            The training input samples.
        y : array-like, shape (n_samples,)
            The target values. An array of int.

        Returns
        -------
        self : object
            Returns self.
        """
        # Check that X and y have correct shape
        # X, y = check_X_y(X, y)
        # Store the classes seen during fit
        self.classes_ = unique_labels(y)

        self.X_ = X
        self.y_ = y
        Cxs = []

        for x in self.X_:
            Cx = self._compress(x)
            Cxs.append(Cx)
        self.Cx_ = Cxs
        # Return the classifier
        return self

    def _set_compressor(self):
        if self.compressor == "gzip":
            self._compress = self._gzip_compressor
        else:
            raise NotImplementedError(
                f"Compressing with {self.compressor} not supported."
            )

    def _gzip_compressor(self, x):
        return len(gzip.compress(str(x).encode()))

    def _ncd(self, Cx1, x1):
        distance_from_x1 = []
        for x2, Cx2 in zip(self.X_, self.Cx_):
            x2 = str(x2)
            x1x2 = " ".join([x1, x2])
            Cx1x2 = self._compress(x1x2)
            ncd = (Cx1x2 - min(Cx1, Cx2)) / max(Cx1, Cx2)
            distance_from_x1.append(ncd)

    def predict(self, X):
        """A reference implementation of a prediction for a classifier.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        y : ndarray, shape (n_samples,)
            The label for each sample is the label of the closest sample
            seen during fit.
        """
        # Check is fit had been called
        check_is_fitted(self, ["X_", "y_", "Cx_", "_compress"])

        # Input validation
        # X = check_array(X)
        results = []
        for x1 in tqdm(X, desc="Predicting...", leave=False):
            x1 = str(x1)
            Cx1 = self._compress(x1)
            distance_from_x1 = []
            for x2, Cx2 in zip(self.X_, self.Cx_):
                x2 = str(x2)
                x1x2 = " ".join([x1, x2])
                Cx1x2 = self._compress(x1x2)
                ncd = (Cx1x2 - min(Cx1, Cx2)) / max(Cx1, Cx2)
                distance_from_x1.append(ncd)
            # distance_from_x1 = self._ncd(Cx1, x1)
            sorted_idx = np.argsort(np.array(distance_from_x1))
            top_k_class = list(self.y_[sorted_idx[: self.k]])
            predict_class = max(set(top_k_class), key=top_k_class.count)
            results.append(predict_class)
        return results

In [3]:
# Newsgroup Data

X, y = fetch_20newsgroups(
    subset="train",
    categories=["alt.atheism", "talk.religion.misc"],
    shuffle=True,
    random_state=42,
    return_X_y=True,
)
y = LabelEncoder().fit(y).transform(y)
X_train, X_test, y_train, y_test = train_test_split(X, y)
newsgroup_model = GzipClassifier(k=3)
newsgroup_model.fit(X_train, y_train)
preds = newsgroup_model.predict(X_test)
print()
print(f"Accuracy score is: {round(accuracy_score(y_test, preds), 3)}")

                                                                


Accuracy score is: 0.912




In [4]:
# KDD

df = pd.read_csv(
    "https://gist.githubusercontent.com/simplymathematics/8c6c04bd151950d5ea9e62825db97fdd/raw/34e546e4813f154d11d4f13869b9e3481fc3e829/kdd-nsl.csv",
    header=None,
)
print("Shape of df:", df.shape)
width = df.shape[1]
y = df[width - 2]
print("Shape of y:", y.shape)
del df[width - 2]
X = np.array(df)
print("Shape of X:", X.shape)
del df
new_y = []
for entry in y:
    if entry == "normal":
        new_y.append(0)
    else:
        new_y.append(1)
y = LabelEncoder().fit(new_y).transform(new_y)
print(f"Set of labels: {set(y)}")
X_train, X_test, y_train, y_test = train_test_split(
    X, y, train_size=1000, test_size=100
)
print("Shape of X_train:", X_train.shape)
print("Shape of X_test:", X_test.shape)
kdd_model = GzipClassifier(k=3)
kdd_model.fit(X_train, y_train)
predictions = kdd_model.predict(X_test)
print("KDD-NSL")
print(f"Accuracy score is: {round(accuracy_score(y_test, predictions), 3)}")

Shape of df: (125973, 43)
Shape of y: (125973,)
Shape of X: (125973, 42)
Set of labels: {0, 1}
Shape of X_train: (1000, 42)
Shape of X_test: (100, 42)


                                                                

KDD-NSL
Accuracy score is: 0.97




In [5]:
# Truthseeker

df = pd.read_csv(
    "https://gist.githubusercontent.com/simplymathematics/8c6c04bd151950d5ea9e62825db97fdd/raw/34e546e4813f154d11d4f13869b9e3481fc3e829/truthseeker.csv"
)
print("Shape of df:", df.shape)
y = np.array(df["BotScoreBinary"].astype("int"))
print("Shape of y:", y.shape)
del df["BotScoreBinary"]
del df["BotScore"]
X = np.array(df)
print("Shape of X:", X.shape)
print(f"Set of labels: {set(y)}")
X_train, X_test, y_train, y_test = train_test_split(
    X, y, train_size=1000, test_size=100
)
print("Shape of X_train:", X_train.shape)
print("Shape of X_test:", X_test.shape)
truthseeker_model = GzipClassifier(k=7)
truthseeker_model.fit(X_train, y_train)
predictions = truthseeker_model.predict(X_test)
print("Truthseeker")
print(f"Accuracy score is: {round(accuracy_score(y_test, predictions), 3)}")

Shape of df: (134198, 64)
Shape of y: (134198,)
Shape of X: (134198, 62)
Set of labels: {0, 1}
Shape of X_train: (1000, 62)
Shape of X_test: (100, 62)


                                                                

Truthseeker
Accuracy score is: 0.95




In [6]:
# Make Classification Data


X, y = make_classification(
    n_classes=3,
    n_features=10,
    n_informative=10,
    n_redundant=0,
    n_samples=1000,
    n_clusters_per_class=1,
    class_sep=10,
)
y = LabelEncoder().fit(y).transform(y)
X_train, X_test, y_train, y_test = train_test_split(X, y)
model = GzipClassifier(k=1)
model.fit(X_train, y_train)
predictions = model.predict(X_test)
print()
print(f"Accuracy score is: {round(accuracy_score(y_test, predictions), 3)}")

                                                                


Accuracy score is: 0.552




In [30]:
# ART setup
from art.attacks.evasion import HopSkipJump, ZooAttack
from art.estimators.classification import SklearnClassifier

print("Model is BaseEstimator:", isinstance(model, BaseEstimator))
classifier = SklearnClassifier(model=model)
classifier._input_shape = X_train[0].shape
classifier._nb_classes = len(np.unique(y_train))

Model is BaseEstimator: True


In [31]:
print(classifier._input_shape)

(10,)


In [38]:
# Evasion Attack: HopSkipJump
n = 100
attack = HopSkipJump(
    classifier=classifier,
    targeted=False,
    norm=2,
    max_iter=1,
    max_eval=1,
    init_eval=1,
    init_size=1,
)
print("Attack initialized.")
advs = attack.generate(x=X_test[:n])
print("Adversarial examples generated.")
adv_preds = model.predict(advs)
print("HopSkipJump")
print(f"Accuracy score is: {round(accuracy_score(y_test[:n], adv_preds), 3)}")
print(
    f"Attack Success score is: {round(accuracy_score(predictions[:n], adv_preds), 3)}"
)

Attack initialized.


HopSkipJump: 100%|██████████| 100/100 [04:57<00:00,  2.98s/it]  
                                                                

Adversarial examples generated.


                                                                

HopSkipJump
Accuracy score is: 0.37
Attack Success score is: 0.35




In [39]:
# Evasion Attack: ZooAttack
attack = ZooAttack(
    classifier=classifier,
    confidence=0.9,
    targeted=False,
    learning_rate=1e-1,
    max_iter=1,
    binary_search_steps=1,
    initial_const=1e-3,
    abort_early=True,
    use_resize=False,
    use_importance=False,
    nb_parallel=1,
    batch_size=1,
    variable_h=0.01,
)
advs = attack.generate(x=X_test[:n])
adv_preds = model.predict(advs)
print("ZooAttack")
print(f"Accuracy score is: {round(accuracy_score(y_test[:n], adv_preds), 3)}")
print(
    f"Attack Success score is: {round(accuracy_score(predictions[:n], adv_preds), 3)}"
)

ZOO: 100%|██████████| 100/100 [01:00<00:00,  1.66it/s]          
                                                                

ZooAttack
Accuracy score is: 0.54
Attack Success score is: 0.96




In [34]:
# from art.attacks.inference.attribute_inference import AttributeInferenceBlackBox
# n = 10
# attack_feature = 1
# attack = AttributeInferenceBlackBox(classifier)
# attack.fit(X_train[:n])
# predictions = np.array(predictions)

In [27]:
# vars(attack).keys()

dict_keys(['_targeted', '_estimator', '_summary_writer_arg', '_summary_writer', 'norm', 'max_iter', 'max_eval', 'init_eval', 'init_size', 'curr_iter', 'batch_size', 'verbose', 'theta'])

In [35]:
# attack_result = attack.infer(X_test[:n], y_test[:n], pred=predictions[:n])

In [36]:
# from art.attacks.inference.membership_inference import MembershipInferenceBlackBox