In [1]:
import numpy as np
import pandas as pd
from scipy.optimize import differential_evolution
from sklearn.base import clone
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import mutual_info_regression, SelectKBest
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score, cross_validate
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC

In [2]:
# Mount Google Drive (Colab only) so the dataset stored there can be read.
from google.colab import drive
drive.mount('/content/drive')

# Load the semicolon-separated Drebin215 table.
drebin = pd.read_csv("/content/drive/MyDrive/Faculdade/malware-hunter/JOWMDroid/Drebin215.csv", sep=";")

# Encode the label column numerically: 'B' -> 1, 'S' -> 0.
# The result is assigned back instead of calling replace(..., inplace=True) on
# the column selection: that form is chained assignment, which does not update
# the frame under pandas Copy-on-Write. infer_objects() turns the resulting
# object column into an integer column when every label was replaced.
drebin['class'] = drebin['class'].replace({'B': 1, 'S': 0}).infer_objects()

In [10]:
def select_features_with_mi(X, y, theshold=0.05, random_state=None):
    """Keep the columns of ``X`` whose mutual information with ``y`` reaches a threshold.

    Parameters
    ----------
    X : pandas.DataFrame
        Feature table; the column labels identify the features.
    y : array-like
        Target values, one per row of ``X``.
    theshold : float, default 0.05
        Minimum score (inclusive) a feature needs in order to be kept. The
        misspelled name is part of the existing interface and is kept as is.
    random_state : int, RandomState instance or None, default None
        Forwarded to ``mutual_info_regression``. Pass an integer to make the
        scores, and therefore the selected columns, reproducible.

    Returns
    -------
    pandas.DataFrame
        ``X`` restricted to the selected columns, in their original order.
    """
    # NOTE(review): mutual_info_regression is scikit-learn's estimator for a
    # continuous target, while this notebook passes a 0/1 class label.
    # mutual_info_classif may be the intended estimator -- confirm.
    mi_scores = mutual_info_regression(X, y, random_state=random_state)
    scores = pd.Series(mi_scores, index=X.columns)
    selected_features = scores[scores >= theshold].index.tolist()
    return X[selected_features]

def get_weights_from_classifiers(X, y, 
                                 classifiers, 
                                 weights_attributes=('coef_', 'feature_importances_'),
                                 train_size=0.8, random_state=1):
    """Fit each classifier on a training split and collect its per-feature weight vector.

    Parameters
    ----------
    X, y :
        Features and target; they are split with ``train_test_split`` and only
        the training part is used for fitting.
    classifiers : dict
        Mapping whose values are estimators exposing ``fit``. They are fitted
        in place.
    weights_attributes : sequence of str
        Attribute names tried, in order, on each fitted classifier; the first
        one that is available provides the weights.
    train_size, random_state :
        Forwarded to ``train_test_split``.

    Returns
    -------
    list
        One 1-D weight vector per classifier for which an attribute was found.
        Classifiers without any of the attributes are reported with ``print``
        and skipped.
    """
    X_train, _, y_train, _ = train_test_split(X, y, train_size=train_size, random_state=random_state)
    weights_list = []
    for classifier in classifiers.values():
        classifier.fit(X_train, y_train)
        for weights_attribute in weights_attributes:
            # hasattr (unlike a lookup in dir()) is also False for attributes
            # that exist on the class but raise AttributeError when read.
            if not hasattr(classifier, weights_attribute):
                continue
            weights = getattr(classifier, weights_attribute)
            if hasattr(weights, 'toarray'):
                # Sparse weight matrix: densify before inspecting its shape.
                weights = weights.toarray()
            weights = np.asarray(weights)
            # Make sure the weights are a flat vector, not a vector of vectors
            # (e.g. [2, 3, 4, 4] and not [[2, 3, 4, 4]]): keep the first row.
            if weights.ndim > 1:
                weights = weights[0]
            weights_list.append(weights)
            break
        else:
            # Runs only when no attribute matched, including when
            # weights_attributes is empty.
            print(f"Vetor de pesos para o classificador {classifier.__class__.__name__} não foi encontrado. Verifique o parametro weights_attributes")
    return weights_list

def get_normalized_weights_average(weights_list):
    """Min-max scale every weight vector and return their element-wise average.

    Each vector in ``weights_list`` is rescaled so that its minimum maps to 0
    and its maximum to 1. A vector whose values are all equal is replaced by a
    vector of 0.5 of the same length. The rescaled vectors are then averaged
    along axis 0.
    """
    def _rescale(vector):
        lowest = vector.min()
        highest = vector.max()
        if highest == lowest:
            # Constant vector: no spread to scale by, use the midpoint.
            return np.array([0.5] * len(vector))
        return (vector - lowest) / (highest - lowest)

    rescaled_vectors = [_rescale(vector) for vector in weights_list]
    return np.average(rescaled_vectors, axis=0)

def power(v):
    """Power mapping: return ``y * x ** a`` for ``v`` unpacked as ``(a, y, x)``.

    ``x`` may be a scalar or a NumPy array; the power is computed with
    ``np.power``.
    """
    exponent, scale, base = v
    return scale * np.power(base, exponent)
# One (min, max) pair per entry of ``v``, in the order (a, y, x).
power.bounds = [(0.0, 10.0), (10.0 ** -6, 10.0), (0.0, 1.0)]

def exponential(v):
    """Exponential mapping for ``v`` unpacked as ``(a, b, y, x)``.

    Returns ``y * (a ** (b * x) - 1) / (a ** b - 1)``. ``x`` may be a scalar
    or a NumPy array.

    At ``a == 1`` both the numerator and the denominator are zero and the
    formula would yield NaN; the limit of the expression, ``y * x``, is
    returned instead.
    """
    a, b, y, x = v
    if np.ndim(a) == 0 and a == 1:
        # Removable singularity: (a**(b*x) - 1) / (a**b - 1) -> x as a -> 1.
        return np.multiply(y, x)
    result = y * (np.power(a, (b*x)) - 1) / ((a ** b) - 1)
    return result
# One (min, max) pair per entry of ``v``, in the order (a, b, y, x).
exponential.bounds = [(0.0, 10.0), (10.0 ** -6, 10.0), (10.0 ** -6, 10.0), (0.0, 1.0)]
def logarithmic(v):
    """Logarithmic mapping for ``v`` unpacked as ``(a, y, x)``.

    Returns ``y * log(1 + a * x) / log(1 + a)``. ``x`` may be a scalar or a
    NumPy array.

    At ``a == 0`` both logarithms are zero and the formula would yield NaN;
    the limit of the expression, ``y * x``, is returned instead.
    """
    a, y, x = v
    if np.ndim(a) == 0 and a == 0:
        # Removable singularity: log(1 + a*x) / log(1 + a) -> x as a -> 0.
        return np.multiply(y, x)
    result = y * ((np.log(1 + a*x)) / (np.log(1 + a)))
    return result
# One (min, max) pair per entry of ``v``, in the order (a, y, x).
# The lower bound of y is 10 ** -6, like the other mapping functions; the
# previous value 10 ** 6 was larger than the upper bound.
logarithmic.bounds = [(0.0, 10.0), (10.0 ** -6, 10.0), (0.0, 1.0)]

def hyperbolic(v):
    """Hyperbolic mapping for ``v`` unpacked as ``(a, b, y, x)``.

    Returns ``(y * (a * x / (1 + b * x))) / (a / (1 + b))``. ``x`` may be a
    scalar or a NumPy array.

    At ``a == 0`` both the numerator and the denominator are zero and the
    formula would yield NaN. ``a`` cancels algebraically, so the equivalent
    form ``y * x * (1 + b) / (1 + b * x)`` is returned in that case.
    """
    a, b, y, x = v
    if np.ndim(a) == 0 and a == 0:
        # Removable singularity: use the form in which ``a`` has been cancelled.
        return np.multiply(y, x) * (1 + b) / (1 + (b*x))
    result = (y * ((a*x) / (1 + (b*x)))) / (a / (1 + b))
    return result
# One (min, max) pair per entry of ``v``, in the order (a, b, y, x).
hyperbolic.bounds = [(0.0, 10.0), (10.0 ** -6, 10.0), (10.0 ** -6, 10.0), (0.0, 1.0)]

def S_curve(v):
    """Sigmoid-shaped mapping for ``v`` unpacked as ``(a, y, x)``.

    With ``g(t) = 1 / (1 + a * exp(-t)) - 1 / (1 + a)`` the result is
    ``y * g(x) / g(1)``. ``x`` may be a scalar or a NumPy array.

    At ``a == 0`` both ``g(x)`` and ``g(1)`` are zero and the formula would
    yield NaN; the limit of the expression,
    ``y * (1 - exp(-x)) / (1 - exp(-1))``, is returned instead.
    """
    a, y, x = v
    if np.ndim(a) == 0 and a == 0:
        # Removable singularity: g(x) / g(1) -> (1 - e**-x) / (1 - e**-1) as a -> 0.
        return y * (1 - np.exp(-x)) / (1 - np.exp(-1))
    result = (y * ((1 / (1 + (a * np.exp(-x)))) - (1 / (1 + a)))) / ((1 / (1 + (a*np.exp(-1)))) - (1 / (1+a)))
    return result
# One (min, max) pair per entry of ``v``, in the order (a, y, x).
S_curve.bounds = [(0.0, 10.0), (10.0 ** -6, 10.0), (0.0, 1.0)]

def objective_function(parameters, *args):
    """Cost function for ``differential_evolution``: negated mean cross-validated score.

    Parameters
    ----------
    parameters : sequence of float
        Candidate vector. Its first ``len(mapping_function.bounds) - 1``
        entries are the mapping-function parameters. When the vector is longer
        than ``mapping_function.bounds``, its last entry is the classifier
        hyperparameter.
    *args :
        ``(mapping_function, initial_weights, classifier, X, y, cv, metric)``
        where ``classifier`` is a dict with a ``'model'`` estimator and, when
        a hyperparameter is optimised, a ``'parameter_name'``.

    Returns
    -------
    float
        ``-mean(score)`` over the cross-validation folds. The sign is flipped
        because ``differential_evolution`` minimises its objective while a
        higher score is better.
    """
    mapping_function, initial_weights, classifier, X, y, cv, metric = args
    n_mapping_parameters = len(mapping_function.bounds) - 1
    mapped_weights = mapping_function(list(parameters[:n_mapping_parameters]) + [initial_weights])

    model = classifier['model']
    # Only treat the last entry as a hyperparameter when the caller appended
    # one after the mapping-function bounds; otherwise the last entry is the
    # placeholder for x and must not be written into the model.
    if len(parameters) > len(mapping_function.bounds):
        model = model.set_params(**{classifier['parameter_name']: parameters[-1]})

    scores = cross_val_score(model, np.multiply(X, mapped_weights), y, cv=cv, scoring=metric)
    return -scores.mean()

def run_jowmdroid(X, y, weight_classifiers, evaluation_classifiers, mapping_functions,
                  test_size=0.8, random_state=1,
                  popsize=40, maxiter=30, recombination=0.3, disp=False, mutation=0.5, seed=1, 
                  cv=5, scoring=("accuracy", "precision", "recall", "f1"),
                  include_hyperparameter_in_de=True):
    """Optimise weight-mapping parameters per classifier and report cross-validated metrics.

    Steps:
      1. Build the initial feature weights from ``weight_classifiers``.
      2. Split ``(X, y)`` with ``train_test_split``.
      3. For every tunable classifier and mapping function, run
         ``differential_evolution`` on the training part.
      4. Cross-validate each classifier on the held-out part, with the
         features multiplied by the optimised mapped weights.

    Parameters
    ----------
    X, y :
        Feature table and target.
    weight_classifiers : dict
        Passed to ``get_weights_from_classifiers``.
    evaluation_classifiers : iterable of dict
        Each dict has ``'name'`` and ``'model'``. Only entries that also have
        ``'parameter_name'`` and ``'bound'`` are processed; the others are
        skipped.
    mapping_functions : iterable of callables
        Each exposes a ``bounds`` list and accepts ``[*parameters, weights]``.
    test_size, random_state :
        Forwarded to ``train_test_split``.
    popsize, maxiter, recombination, disp, mutation, seed :
        Forwarded to ``differential_evolution``.
    cv :
        Cross-validation setting for both the optimisation and the evaluation.
    scoring : str or sequence of str
        Metric names. The first one drives the optimisation; all are reported.
    include_hyperparameter_in_de : bool
        When true, ``classifier['bound']`` is appended to the search space and
        the optimised value is used for evaluation. Otherwise the model keeps
        the value it was configured with.

    Returns
    -------
    pandas.DataFrame
        One row per (classifier, mapping function) with the mean of every
        metric over the evaluation folds.
    """
    metrics = [scoring] if isinstance(scoring, str) else list(scoring)
    functions_by_name = {function.__name__: function for function in mapping_functions}

    # NOTE(review): the initial weights are fitted on a split of the full
    # (X, y) made inside get_weights_from_classifiers, so they can see rows
    # that end up in X_test below -- confirm this is intended.
    initial_weights = get_normalized_weights_average(get_weights_from_classifiers(X, y, weight_classifiers))
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)

    tunable_classifiers = [classifier for classifier in evaluation_classifiers
                           if 'parameter_name' in classifier and 'bound' in classifier]

    solutions = {}
    configured_values = {}
    for classifier in tunable_classifiers:
        name = classifier['name']
        # Remember the value the model was configured with before the search
        # runs, so evaluation can fall back to it.
        configured_values[name] = classifier['model'].get_params()[classifier['parameter_name']]
        solutions[name] = {}
        for mapping_function in functions_by_name.values():
            bounds = list(mapping_function.bounds)
            if include_hyperparameter_in_de:
                bounds.append(classifier['bound'])
            solution = differential_evolution(objective_function, bounds=bounds,
                                              args=(mapping_function, initial_weights, classifier, X_train, y_train, cv, metrics[0]),
                                              popsize=popsize, maxiter=maxiter, recombination=recombination, 
                                              disp=disp, mutation=mutation, seed=seed).x
            solutions[name][mapping_function.__name__] = solution
        # Report only this classifier's solutions, as the message says.
        print(f"Melhores parâmetros das funções de mapeamento para o algoritmo {classifier['name']}: {solutions[name]}")

    results = []
    for classifier in tunable_classifiers:
        name = classifier['name']
        for mapping_function_name, solution in solutions[name].items():
            mapping_function = functions_by_name[mapping_function_name]
            n_mapping_parameters = len(mapping_function.bounds) - 1
            # Apply the optimised mapping to the weights, as the objective
            # function does during the search.
            mapped_weights = mapping_function(list(solution[:n_mapping_parameters]) + [initial_weights])

            if include_hyperparameter_in_de:
                hyperparameter = solution[-1]
            else:
                hyperparameter = configured_values[name]
            # The hyperparameter is an estimator parameter, not an argument of
            # fit(), so it is set on a fresh copy of the model.
            estimator = clone(classifier['model']).set_params(**{classifier['parameter_name']: hyperparameter})

            scores = cross_validate(estimator, np.multiply(X_test, mapped_weights), y_test, cv=cv, scoring=metrics)
            result = {'classifier': name,
                      'mapping_function': mapping_function_name}
            for metric in metrics:
                result[metric] = scores["test_" + metric].mean()
            results.append(result)
    return pd.DataFrame(results)


In [4]:
# Separate the label column from the features, then keep only the features
# that pass the mutual-information filter.
y = drebin['class']
X = select_features_with_mi(drebin.drop(columns='class'), y)

In [None]:
# Classifiers whose fitted coefficients / feature importances provide the
# initial feature weights.
weight_classifiers = {
    "SVM": SVC(kernel='linear'),
    # Seeded so that reruns produce the same feature importances.
    "RF": RandomForestClassifier(random_state=1),
    "LR": LogisticRegression(),
}
mapping_functions = [power, exponential, logarithmic, hyperbolic, S_curve]
# Entries without both "parameter_name" and "bound" are skipped by
# run_jowmdroid, so only the SVM entry is optimised and evaluated here.
evaluation_classifiers = [
    {"name": "SVM", "model": SVC(kernel='linear'), "parameter_name": "C", "bound": (1.0, 5.0)},
    {"name": "RF", "model": RandomForestClassifier(random_state=1)},
    {"name": "LR", "model": LogisticRegression()},
]

results = run_jowmdroid(X, y, weight_classifiers, evaluation_classifiers, mapping_functions, include_hyperparameter_in_de=False)

In [15]:
# Show the table returned by run_jowmdroid: one row per classifier and
# mapping function, with the mean of each metric.
results