In [1]:
%load_ext  autoreload
%autoreload 2

In [2]:
import numpy as np
import pandas as pd

from trickster.trickster.search import a_star_search

from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.linear_model import LogisticRegressionCV
from sklearn.svm import SVC
from scipy.spatial import distance

seed = 2018

In [3]:
# Load the file
df = pd.read_csv('data/german_credit_data.csv')
df = df.drop(df.columns[0], axis=1) # remove the index column

# Quantize credit amount, duration and age into 5 bins
amount_series = df.loc[:, 'Credit amount']
df.loc[:, 'Credit amount'] = pd.qcut(amount_series, 5)

duration_series = df.loc[:, 'Duration']
df.loc[:, 'Duration'] = pd.qcut(duration_series, 5)

duration_series = df.loc[:, 'Age']
df.loc[:, 'Age'] = pd.qcut(duration_series, 5)

# Set Job type to object for one-hot encoding
df.loc[:, 'Job'] = df.loc[:, 'Job'].astype(object)

# Perform one-hot encoding
df = pd.get_dummies(df)
# Drop binary features
df = df.drop(columns=['Sex_male', 'Risk_bad'])

print('Examples are represented as {}-dimensional vectors.'.format(len(df.columns)))

# Separate features from targets
df_X = df.iloc[:, :-1]
df_y = df.iloc[:, -1]

Examples are represented as 39-dimensional vectors.


In [4]:
# Convert to numpy
X = df_X.values.astype('int8')
y = df_y.values.astype('int8')
print('Shape of X: {}. Shape of y: {}.'.format(X.shape, y.shape))

# Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=seed)
X_train.shape, y_train.shape, X_test.shape, y_test.shape

Shape of X: (1000, 38). Shape of y: (1000,).


((900, 38), (900,), (100, 38), (100,))

In [5]:
# Fit logistic regression and perform CV
clf = LogisticRegressionCV(
    Cs=21, 
    cv=5, 
    n_jobs=-1, 
    random_state=seed
)
clf.fit(X_train, y_train)

# Get best score and C value
mean_scores = np.mean(clf.scores_[1], axis=0)
best_idx = np.argmax(mean_scores)
best_score = mean_scores[best_idx]
best_C = clf.Cs_[best_idx]

print('Best score is: {:.2f}%. Best C is: {:.4f}.'.format(best_score*100, best_C))
print('Test score is: {:.2f}%.'.format(clf.score(X_test, y_test)*100))

# Best score is: 73.44%. Best C is: 0.1585.
# Test score is: 71.000%.

Best score is: 73.44%. Best C is: 0.1585.
Test score is: 71.00%.


In [None]:
# Fit SVM with RBF kernel using CV
tuned_parameters = [
    {
        'kernel': ['rbf'], 
        'gamma': [0.01, 0.03, 0.05, 0.07], 
        'C': [1, 3, 5, 7, 10]
    }
]
clf = GridSearchCV(SVC(), tuned_parameters, cv=5, n_jobs=-1)
clf.fit(X_train, y_train)
print('Best params are: {} with score: {:.2f}%.'.format(clf.best_params_, clf.best_score_ * 100))
print('Test score is: {:.2f}%.'.format(clf.score(X_test, y_test)*100))

# Best params are: {'C': 5, 'gamma': 0.03, 'kernel': 'rbf'} with score: 74.44%.
# Test score is: 71.00%.

### Perform adversarial examples search

In [8]:
class Node:
    
    amount_start_idx = df_X.columns.get_loc("Credit amount_(249.999, 1262.0]")
    duration_start_idx = df_X.columns.get_loc("Duration_(3.999, 12.0]")
    purpose_start_idx = df_X.columns.get_loc("Purpose_business")
    
    def __init__(self, x):
        self.root = x
        self.static = self.root[:Node.amount_start_idx]
        self.amount = self.root[Node.amount_start_idx:Node.duration_start_idx]
        self.duration = self.root[Node.duration_start_idx:Node.purpose_start_idx]
        self.purpose = self.root[Node.purpose_start_idx:]
    
    def _roll(self, x, direction='pos'):
        idx = np.argmax(x)
        if direction == 'pos' and idx != len(x) - 1:
            return np.roll(x, 1).tolist()
        elif direction == 'neg' and idx != 0:
            return np.roll(x, -1).tolist()
        return []

    def _expand_roll(self, field):
        child_fields = []
        child_fields.append(self._roll(field, direction='pos'))
        # Comment out next line to prevent duration and amount from decreasing
        child_fields.append(self._roll(field, direction='neg'))
        child_fields = [x for x in child_fields if len(x) > 0]
        return np.array(child_fields, dtype='uint8')
    
    def _expand_all(self, field):
        child_fields = []
        for i in range(1, len(field)):
            child_fields.append(np.roll(field, i))
        return child_fields

    def expand(self):
        children = []
        for c in self._expand_roll(self.amount):
            child = np.concatenate((self.static, c, self.duration, self.purpose))
            children.append(child)
        for c in self._expand_roll(self.duration):
            child = np.concatenate((self.static, self.amount, c, self.purpose))
            children.append(child)
        for c in self._expand_all(self.purpose):
            child = np.concatenate((self.static, self.amount, self.duration, c))
            children.append(child)
        return children
    
    def __repr__(self):
        return '<Node> {}.'.format(self.root)

In [12]:
from trickster.trickster.search import a_star_search

def _expand_fn(x):    
    node = Node(x)
    children = node.expand()
    costs = [distance.euclidean(x, c) for c in children]
    return list(zip(children, costs))

def _goal_fn(x, clf, confidence):
    return clf.predict_proba([x])[:, 1][0] >= confidence

def _heuristic_fn(x, clf, norm):
    return np.abs(clf.decision_function([x]))[0] / np.linalg.norm(clf.coef_, ord=norm)

def _hash_fn(x):
    return hash(x.tostring())

def find_adversarial(x, clf, norm=2, confidence=0.5):
    if not clf.predict([x])[0] == 0:
        raise Exception('Initial example is already classified as good risk.')
        
    return a_star_search(
        start_node=x, 
        expand_fn=_expand_fn, 
        goal_fn=lambda x: _goal_fn(x, clf, confidence), 
        heuristic_fn=lambda x: _heuristic_fn(x, clf, norm), 
        hash_fn=_hash_fn
    )

In [15]:
adversarials = []
labels = list(df_X)

amount_start_idx = df_X.columns.get_loc("Credit amount_(249.999, 1262.0]")
duration_start_idx = df_X.columns.get_loc("Duration_(3.999, 12.0]")
purpose_start_idx = df_X.columns.get_loc("Purpose_business")

i = 0
for x in X_test:
    
    # Check if example is already good risk
    if clf.predict([x])[0] == 1:
        continue
    
    i += 1
    print('\n>>>> Looking into (negative) test example #{}.'.format(i))
    x_adv, cost = find_adversarial(x, clf, norm=2, confidence=0.5)
    if x_adv is None:
        continue
    
    confidence = clf.predict_proba([x_adv])[:, 1][0] * 100
    print('>> Adversarial example found with a cost: {:.2f} and confidence: {:.2f}.'.format(cost**2, confidence))
    
    assert clf.predict([x_adv]) == 1
    
        
    changes_idx = np.where(np.logical_xor(x, x_adv) == True)[0]
    for idx in changes_idx:
        print('>> Label: {}. Change: {} -> {}.'.format(labels[idx], x[idx], x_adv[idx]))


>>>> Looking into (negative) test example #1.
>> Adversarial example found with a cost: 2.00 and confidence: 55.03.
>> Label: Duration_(12.0, 15.0]. Change: 0 -> 1.
>> Label: Duration_(15.0, 24.0]. Change: 1 -> 0.

>>>> Looking into (negative) test example #2.
>> Adversarial example found with a cost: 2.00 and confidence: 60.05.
>> Label: Credit amount_(2852.4, 4720.0]. Change: 0 -> 1.
>> Label: Credit amount_(4720.0, 18424.0]. Change: 1 -> 0.

>>>> Looking into (negative) test example #3.
>> Adversarial example found with a cost: 8.00 and confidence: 51.05.
>> Label: Credit amount_(1906.8, 2852.4]. Change: 1 -> 0.
>> Label: Credit amount_(2852.4, 4720.0]. Change: 0 -> 1.
>> Label: Duration_(12.0, 15.0]. Change: 0 -> 1.
>> Label: Duration_(15.0, 24.0]. Change: 1 -> 0.

>>>> Looking into (negative) test example #4.
>> Adversarial example found with a cost: 2.00 and confidence: 51.46.
>> Label: Credit amount_(2852.4, 4720.0]. Change: 0 -> 1.
>> Label: Credit amount_(4720.0, 18424.0]. Ch