In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

import pandas as pd
import numpy as np
from collections import Counter

from sklearn.svm import LinearSVC
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score,roc_auc_score,accuracy_score

import tensorflow as tf
from tensorflow.keras import Input,Model
from tensorflow.keras.layers import Dense, Activation, Layer, Concatenate, Lambda, Normalization
from tensorflow.keras.metrics import Metric, Precision, Recall, AUC
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import plot_model,get_custom_objects
from tensorflow.keras import backend as K


# Car evaluation

#### Łukasz Andryszewski 151930

The dataset used is the Car Evaluation dataset, which can be found [here](https://en.cs.uni-paderborn.de/is/research/research-projects/software/monotone-learning-datasets).

It consists of six criteria and four classes. The criteria are:
- price
- price of the maintenance
- number of doors
- capacity 
- size of luggage boot
- estimated safety

The criteria are normalized between 0 and 1.

Based on them the alternatives are assigned to four sorted classes, which are:
1. unacceptable 
2. acceptable
3. good
4. very good

However, here they are binarized by splitting between the second and third class, which gives two classes:

1. Bad
2. Good

In [None]:
# Load the headerless CSV; the columns are named below (six criteria followed by the class label).
data = pd.read_csv("./data/monodata/car evaluation.csv",header=None)
# Number of criteria columns (every column except the label); reused later as a
# default argument of the model builders.
features = len(data.columns)-1
crits = ["price","maintaince price","doors","capacity","size of luggage", "safety"]
# Name the criteria columns and the trailing label column.
data.columns = crits+["class"]
data

In [None]:
# Binarize the label: classes <= 2 become 0 ("Bad"), classes >= 3 become 1 ("Good").
# NOTE(review): this mutates `data` in place and is not idempotent - after one run
# every label is 0 or 1 (both <= 2), so re-running this cell sets all labels to 0.
# Reload the CSV before re-executing. Assumes the raw labels are 1..4 - TODO confirm.
data.loc[data["class"]<=2,"class"] = 0
data.loc[data["class"]>=3,"class"] = 1
# Criteria-only copy of the data (label column removed).
data_classless = data.drop(columns="class")
data

In [None]:
# Per-column summary statistics (criteria and the binarized label).
data.describe()

In [None]:
def visualize_data(X,y):
    """Draw every alternative as a jittered polyline, one subplot per class.

    Each row of ``X`` is plotted as a line over the criterion indices
    (x axis) against the criterion values (y axis). Gaussian jitter is added
    on both axes so that overlapping alternatives remain visible, and the
    jittered values are clipped to [0, 1].

    Parameters
    ----------
    X : 2-D array, one row per alternative and one column per criterion.
    y : 1-D array of class labels; ``int(label)`` selects the subplot and the
        colour, so only labels 0 and 1 are supported (two subplots are created).

    Returns
    -------
    None. The figure is rendered by the active matplotlib backend.
    """
    palette = ["red","blue"]
    features = X.shape[1]
    fig,axs = plt.subplots(1,2,figsize=(10,6))

    # Axis decoration does not depend on the data rows, so it is applied once
    # per subplot instead of once per plotted alternative.
    for class_id,ax in enumerate(axs):
        ax.set_title(f"Class {class_id}")
        ax.set_ylabel(f"Criterion value")
        ax.set_xlabel(f"Criterions")
        ax.set_yticks(np.arange(1,step=0.0833333))
        ax.set_xticks(np.arange(features,step=1))

    for row,label in zip(X,y):
        c = int(label)
        # Vertical jitter first, then horizontal jitter (same draw order as before).
        jittered_values = np.clip(row + np.random.normal(0,0.025,len(row)),0,1)
        jittered_positions = np.arange(features,dtype=np.float64) + np.random.normal(0,0.1,features)
        axs[c].plot(jittered_positions,jittered_values,alpha=0.05,color=palette[c])

    # Lay out after everything is drawn so titles and labels are accounted for.
    fig.tight_layout()

In [None]:
# Plot every alternative of the full dataset (before undersampling).
visualize_data(data_classless.to_numpy(),data["class"].to_numpy())

From the names and the distribution of their values in the different classes, it can be inferred that the price and maintenance price criteria are cost-type criteria, while the remaining criteria are gain-type.

In [None]:
# Count how many alternatives fall into each (binarized) class.
for c,count in Counter(data["class"]).items():
    print(f"Class {c} occurences: {count}")

The data is highly imbalanced, so there is a need for undersampling.

In [None]:
# Size of the Cartesian product of the distinct values observed for each criterion.
distinct_per_criterion = [len(np.unique(data[col])) for col in data_classless]

_combs = 1
for n_values in distinct_per_criterion:
    _combs *= n_values

print("Possible combinations of data:",_combs)
print("Number of alternatives:",len(data))

The number of possible combinations of all criterion values is the same as the number of alternatives. Given that suspicious fact, it is safe to assume that the dataset either consists of all possible alternatives or contains quite a number of repeated alternatives.

For the sake of performance, and to avoid learning most of the space of alternatives, the first class needs to be heavily undersampled.

In [None]:
def undersample(X,y,samples=None,class_0=0,random_state=None):
    """Return a shuffled, class-balanced subset of ``(X, y)``.

    ``samples`` rows are drawn from every class present in ``y``. Rows are
    drawn *without* replacement so the subset contains no artificial
    duplicates (duplicates could otherwise end up on both sides of a later
    train/test split). Replacement is used only for a class that has fewer
    than ``samples`` rows, which keeps explicit over-sampling requests working.

    Parameters
    ----------
    X : array of shape (n, d) with the criteria.
    y : array of shape (n,) with the class labels.
    samples : rows to draw per class; defaults to the size of the smallest class.
    class_0 : kept for backward compatibility. The classes are now discovered
        from ``y`` (in ascending order), which matches the old behaviour for
        labels ``class_0, class_0 + 1, ...`` and also supports other labelings.
    random_state : optional seed for a private ``RandomState``; when ``None``
        the global ``np.random`` state is used, as before.

    Returns
    -------
    (new_X, new_y) : the sampled criteria and labels in a common random order.
        Both come from one stacked array, so they share its dtype.

    Raises
    ------
    ValueError : if ``y`` is empty.
    """
    X = np.asarray(X)
    y = np.asarray(y)

    labels,counts = np.unique(y,return_counts=True)
    if labels.size == 0:
        raise ValueError("cannot undersample an empty dataset")
    if samples is None:
        samples = int(counts.min())

    rng = np.random if random_state is None else np.random.RandomState(random_state)

    row_ids = np.arange(len(y))
    picked = []
    for label,available in zip(labels,counts):
        members = row_ids[y == label]
        # Replacement is only needed when more rows are requested than exist.
        picked.append(rng.choice(members,samples,replace=bool(samples > available)))
    picked = np.concatenate(picked)

    # Stack criteria and labels so that one shuffle keeps them aligned.
    new_data = np.column_stack((X[picked],y[picked]))
    rng.shuffle(new_data)

    return new_data[:,:-1],new_data[:,-1]

In [None]:
# Balance the classes; with the default `samples`, each class is reduced to the size of the smallest one.
new_X,new_y = undersample(data_classless.to_numpy(),data["class"].to_numpy())

In [None]:
# Same plot as before, now for the undersampled data.
visualize_data(new_X,new_y)

Split into train, test and validation data

In [None]:
# 60 % of the rows go to training; the remaining 40 % is halved into a test and a
# validation part (20 % each). A fixed seed makes the split repeatable for a
# given undersampled input.
SPLIT_SEED = 42
X_train, X_rest, y_train, y_rest = train_test_split(new_X,new_y,test_size=0.40,random_state=SPLIT_SEED)
X_test, X_val, y_test, y_val = train_test_split(X_rest,y_rest,test_size=0.50,random_state=SPLIT_SEED)

## RankSVM method

### Calculate differences between rows of different classes.

In [None]:
def calculate_2d_differences(array:np.array):
    """Return ``array[i] - array[j]`` for every ordered pair of rows.

    For an input of shape (n, d) the result has shape (n * n, d); the row at
    position ``i * n + j`` holds ``array[i] - array[j]`` (pairs with ``i == j``
    are included and are all zeros).
    """
    n_rows = array.shape[0]
    # a0, a0, ..., a1, a1, ...   minus   a0, a1, ..., a0, a1, ...
    left = np.repeat(array, n_rows, axis=0)
    right = np.tile(array, (n_rows, 1))
    return left - right

def calculate_1d_differences(vector:np.array):
    """Return ``vector[i] - vector[j]`` for every ordered pair, flattened.

    For an input of length n the result has length n * n and the element at
    position ``i * n + j`` is ``vector[i] - vector[j]``.
    """
    pairwise = np.subtract.outer(vector, vector)
    return pairwise.ravel()

In [None]:
# Criterion differences for every ordered pair (i, j) of rows, including i == j.
X_train_difs = calculate_2d_differences(X_train)
X_test_difs = calculate_2d_differences(X_test)

# Matching label differences in the same pair order; 0 means both rows share a class.
y_train_difs = calculate_1d_differences(y_train)
y_test_difs = calculate_1d_differences(y_test)

In [None]:
# Keep only the pairs whose two rows belong to different classes
# (label difference != 0); each mask is computed once and reused.
train_pair_mask = y_train_difs != 0
test_pair_mask = y_test_difs != 0

y_train_filtered = y_train_difs[train_pair_mask]
y_test_filtered = y_test_difs[test_pair_mask]

X_train_filtered = X_train_difs[train_pair_mask]
X_test_filtered = X_test_difs[test_pair_mask]

In [None]:
# Linear SVM fitted on pairwise difference vectors; the target is the non-zero
# class difference of each pair.
# Alternative kept for reference: make_pipeline(StandardScaler(),LinearSVC())
rank_svm = LinearSVC()

rank_svm.fit(X_train_filtered,y_train_filtered)

In [None]:
def show_stats(model,X,y):
    """Print accuracy, F1 score and ROC AUC of a fitted linear model on ``(X, y)``.

    Accuracy and F1 use ``model.predict``. The AUC is computed from the raw
    linear scores ``X . coef_^T`` (the intercept is not added).
    """
    predicted = model.predict(X)

    accuracy = accuracy_score(y,predicted)
    print(f"Accuracy: {accuracy:.4%}")

    f1 = f1_score(y,predicted)
    print(f"F1 score: {f1:.4%}")

    # Ranking scores, not an AUC value: one score per row of X.
    scores = np.dot(X,model.coef_.T)
    print(f"AUC: {roc_auc_score(y,scores):.4%}")

In [None]:
# Report the pairwise-ranking metrics on the training pairs and on the held-out test pairs.
print("Performance on train set:\n")
show_stats(rank_svm,X_train_filtered,y_train_filtered)
print("\nPerformance on test set:\n")
show_stats(rank_svm,X_test_filtered,y_test_filtered)

## Tensorflow solutions

In [None]:
class F1Score(Metric):
    """Streaming F1 score built from Keras ``Precision`` and ``Recall`` metrics.

    Both sub-metrics are created with their default settings and accumulate
    state across batches; ``result`` combines them as the harmonic mean
    ``2 * P * R / (P + R)``.
    """

    def __init__(self, name='f1_score', **kwargs):
        super().__init__(name=name, **kwargs)
        self.precision = Precision()
        self.recall = Recall()

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Forward the batch to both underlying metrics."""
        self.precision.update_state(y_true, y_pred, sample_weight)
        self.recall.update_state(y_true, y_pred, sample_weight)

    def result(self):
        """Return the F1 score of everything seen since the last reset."""
        precision = self.precision.result()
        recall = self.recall.result()
        # epsilon keeps the division defined when precision + recall == 0
        return 2 * ((precision * recall) / (precision + recall + K.epsilon()))

    def reset_state(self):
        """Clear the accumulated state (the hook current Keras versions call)."""
        self.precision.reset_state()
        self.recall.reset_state()

    def reset_states(self):
        """Deprecated spelling, kept so existing callers keep working."""
        self.reset_state()


In [None]:
def train(model,X_train,y_train,val_data=None,epochs=50,loss='categorical_crossentropy',patience=3):
    """Compile ``model`` with Adam and fit it with early stopping.

    Parameters
    ----------
    model : Keras model to compile and train (compiled in place).
    X_train, y_train : training inputs and targets passed to ``model.fit``.
    val_data : optional validation data forwarded as ``validation_data``.
        When given, early stopping monitors ``val_loss``; otherwise ``loss``.
    epochs : maximum number of epochs.
    loss : loss passed to ``model.compile``.
        NOTE(review): the notebook binarizes the labels to 0/1 while the
        default here is the categorical loss - confirm the intended pairing.
    patience : epochs without improvement tolerated by early stopping.

    Returns
    -------
    The ``History`` object returned by ``model.fit``.
    """
    # Identity check instead of `== None`: an array-like `val_data` would be
    # compared element-wise and make the conditional ambiguous.
    monitored = "loss" if val_data is None else "val_loss"
    early_stopping = EarlyStopping(monitor=monitored, patience=patience, restore_best_weights=True)

    model.compile(
        optimizer="adam",
        loss=loss,
        metrics=["accuracy",AUC(name="auc"),F1Score()])

    history = model.fit(
        X_train,y_train,
        epochs=epochs,
        callbacks=[early_stopping],
        validation_data=val_data)

    return history

In [None]:
def show_history(history):
    """Plot accuracy, F1 score and AUC per epoch from a Keras ``History``.

    One subplot is drawn per metric. The training curve is read from
    ``history.history[<metric>]``; when a ``val_<metric>`` entry is present
    (i.e. validation data was used during fitting) it is drawn alongside.

    Parameters
    ----------
    history : object with a ``history`` dict mapping metric names to
        per-epoch value lists (as returned by ``model.fit``).
    """
    metrics = ["Accuracy","F1_score","AUC"]

    fig, axs = plt.subplots(3, 1,figsize=(10, 8))
    fig.suptitle("Model metrics", fontsize=24)

    for ax,metric in zip(axs,metrics):
        key = metric.lower()

        ax.plot(history.history[key], label=f"train {metric}")
        # The same curve used to be plotted twice; draw the validation curve instead.
        val_key = f"val_{key}"
        if val_key in history.history:
            ax.plot(history.history[val_key], label=f"validation {metric}")

        ax.set_xlabel('Epoch')
        ax.set_ylabel(metric)
        ax.legend()

    # Leave room at the top for the figure title.
    fig.tight_layout(rect=[0, 0.03, 1, 0.95])

### ANN-UTADIS

In [None]:
# Slope used by `leaky_hard_sigmoid` outside the [0, 1] interval.
delta = 0.1
def leaky_hard_sigmoid(x):
    """Piecewise-linear activation: identity on [0, 1], slope ``delta`` outside.

    x < 0       ->  delta * x
    x > 1       ->  delta * (x - 1) + 1
    otherwise   ->  x

    The three pieces meet at x = 0 and x = 1. ``delta`` is read from module
    scope when the function is called.
    """
    return K.switch(x < 0, 
                    x*delta, 
                    K.switch(
                        x > 1,
                        delta*(x-1) + 1,
                        x
                    ))

#get_custom_objects().update({'leaky_hard_sigmoid': Activation(leaky_hard_sigmoid)})

In [None]:
class MonotoneBlock(Layer):
    """Per-criterion block: parallel leaky-hard-sigmoid branches merged by a linear layer.

    The input is fed to ``branches`` independent ``Dense(units)`` layers with
    the ``leaky_hard_sigmoid`` activation; their outputs are concatenated on
    the last axis and combined by a final ``Dense(units)`` layer without
    activation.

    NOTE(review): no constraint is placed on the Dense kernels, so
    monotonicity is not enforced by this code - confirm whether it should be.

    Parameters
    ----------
    units : output width of every branch and of the merging layer.
    branches : number of parallel branches; must be at least 2.
    """

    def __init__(self, units=1, branches=3, **kwargs):
        assert branches >= 2
        super().__init__(**kwargs)
        self.units = units
        self.branches = branches

    def build(self, input_shape):
        """Create and build the branch layers and the merging layer."""
        self.sigmoids = [Dense(self.units,activation=Activation(leaky_hard_sigmoid)) for _ in range(self.branches)]
        for sig in self.sigmoids:
            sig.build(input_shape)
        self.linear = Dense(self.units,activation=None)
        # The merging layer sees all branch outputs side by side.
        self.linear.build((None,self.branches*self.units))
        super().build(input_shape)

    def call(self, inputs):
        """Apply every branch to ``inputs`` and merge the results."""
        x = K.concatenate([sig(inputs) for sig in self.sigmoids],axis=-1)
        x = self.linear(x)
        return x

    def get_config(self):
        """Expose the constructor arguments so the layer can be serialised and cloned."""
        config = super().get_config()
        config.update({"units": self.units, "branches": self.branches})
        return config

In [None]:
# class MinMax(Layer):
#     def __init__(self,ideal_alt,anti_ideal_alt,model,**kwargs):
#         super(MinMax,self).__init__(**kwargs)
#         self.ideal_alt = ideal_alt
#         self.anti_ideal_alt = anti_ideal_alt

#     def build(self,input_shape):
#         super(MinMax,self).build(input_shape)

#     def call(self,inputs):


In [None]:
class MinMaxNormalization(Layer):
    """Rescale rows relative to an ideal and an anti-ideal reference row.

    The input is expected to carry the ideal alternative in row 0 and the
    anti-ideal alternative in row 1, followed by the actual data rows. The
    output contains only the data rows, mapped so that the anti-ideal becomes
    0 and the ideal becomes (approximately) 1.

    Note: a later cell in this notebook defines ``MinMaxNormalization`` again;
    after running all cells that later definition is the one in effect.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, inputs):
        ideal = inputs[0]
        anti = inputs[1]
        # Rows 0 and 1 are the references; the data starts at row 2
        # (slicing from 1 would keep the anti-ideal row in the output).
        actual_inputs = inputs[2:]
        # epsilon avoids a division by zero when ideal == anti
        normalized = (actual_inputs - anti) / (ideal - anti + tf.keras.backend.epsilon())
        return normalized

In [None]:
class Thresholder(Layer):
    """Map a score column to a one-hot class vector using fixed thresholds.

    With ``k`` thresholds there are ``k + 1`` classes. A score is assigned the
    class whose index equals the number of thresholds it has reached, so
    scores below the first threshold fall in class 0 and scores at or above
    the last threshold fall in class ``k``. The thresholds are assumed to be
    sorted in ascending order.

    Note: a later cell in this notebook defines ``Thresholder`` again; after
    running all cells that later definition is the one in effect.

    Parameters
    ----------
    thresholds : sequence of class boundaries; must not be ``None``.
    """

    def __init__(self, thresholds, **kwargs):
        super().__init__(**kwargs)
        assert thresholds is not None
        self.thresholds = thresholds

    def call(self, inputs):
        # `inputs` of shape (batch, 1) is broadcast against (1, k).
        reached = tf.greater_equal(inputs, tf.expand_dims(self.thresholds, axis=0))
        # Counting reached thresholds (instead of argmax over "score < threshold")
        # also handles scores above every threshold: argmax of an all-False row
        # returned 0 and put them in the lowest class.
        indices = tf.reduce_sum(tf.cast(reached, tf.int32), axis=1)
        one_hot = tf.one_hot(indices, depth=len(self.thresholds)+1)
        return one_hot

In [None]:
class Thresholder(Layer):
    """Map a score column to a one-hot class vector using fixed thresholds.

    With ``k`` thresholds there are ``k + 1`` classes. A score is assigned the
    class whose index equals the number of thresholds it has reached, so
    scores below the first threshold fall in class 0 and scores at or above
    the last threshold fall in class ``k``. The thresholds are assumed to be
    sorted in ascending order.

    Parameters
    ----------
    thresholds : sequence of class boundaries; must not be ``None``.
    """

    def __init__(self, thresholds, **kwargs):
        super().__init__(**kwargs)
        assert thresholds is not None
        self.thresholds = thresholds

    def call(self, inputs):
        # `inputs` of shape (batch, 1) is broadcast against (1, k).
        reached = tf.greater_equal(inputs, tf.expand_dims(self.thresholds, axis=0))
        # Counting reached thresholds (instead of argmax over "score < threshold")
        # also handles scores above every threshold: argmax of an all-False row
        # returned 0 and put them in the lowest class.
        indices = tf.reduce_sum(tf.cast(reached, tf.int32), axis=1)
        one_hot = tf.one_hot(indices, depth=len(self.thresholds)+1)
        return one_hot

    def compute_output_shape(self, input_shape):
        """One one-hot vector of length ``k + 1`` per input row."""
        return (input_shape[0], len(self.thresholds) + 1)

class MinMaxNormalization(Layer):
    """Rescale rows relative to an ideal and an anti-ideal reference row.

    The input carries the ideal alternative in row 0 and the anti-ideal
    alternative in row 1, followed by the actual data rows. The output
    contains only the data rows, mapped so that the anti-ideal becomes 0 and
    the ideal becomes (approximately) 1.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, inputs):
        ideal = inputs[0:1,:]  # Extract ideal from inputs
        anti = inputs[1:2,:]    # Extract anti-ideal from inputs
        actual_inputs = inputs[2:, :]  # Extract the rest of the inputs
        # epsilon avoids a division by zero when ideal == anti
        normalized = (actual_inputs - anti) / (ideal - anti + tf.keras.backend.epsilon())
        return normalized

    def compute_output_shape(self, input_shape):
        """Same trailing dimensions as the input, with the two reference rows removed."""
        rows = input_shape[0]
        if rows is not None:
            # `call` drops the ideal and anti-ideal rows.
            rows = max(rows - 2, 0)
        return (rows,) + tuple(input_shape[1:])


In [None]:
def create_ann_utadis_model(thresholds,ideal_alt,anti_ideal_alt,L=3,n_criteria=features,n_labels=None):
    """Build the ANN-UTADIS network.

    Every criterion (input column) is routed through its own
    ``MonotoneBlock``; the block outputs are concatenated, reduced to a single
    score by a linear layer and mapped to ``n_labels`` sigmoid outputs.

    Parameters
    ----------
    thresholds : class boundaries; only their count is used here, to derive
        the default number of outputs.
    ideal_alt, anti_ideal_alt : accepted for interface compatibility but not
        used by the current architecture.
    L : number of branches in every ``MonotoneBlock``.
    n_criteria : number of input columns (defaults to the module-level
        ``features`` at definition time).
    n_labels : number of output units; defaults to ``len(thresholds) + 1``.
        The previous default referenced an undefined name, which raised a
        ``NameError`` as soon as the function was defined.

    Returns
    -------
    An uncompiled Keras ``Model`` named ``"ann_utadis"``.

    NOTE(review): the notebook binarizes the labels to 0/1 while this head
    emits ``n_labels`` sigmoid units - confirm the intended target encoding.
    """
    if n_labels is None:
        n_labels = len(thresholds) + 1

    inputs = Input(shape=(n_criteria,))

    def select_column(x, index):
        # Keep the column axis so every block receives a (batch, 1) tensor.
        return x[:, index:index+1]

    # The column index is bound through `arguments` rather than captured by a
    # closure: a closure over the loop variable is evaluated again whenever the
    # layer is re-run and would then see only the final index for every layer.
    splits = [
        Lambda(select_column,arguments={"index": i},name=f"criteria_{i}")(inputs)
        for i in range(n_criteria)
    ]

    monotones = [MonotoneBlock(branches=L)(split) for split in splits]

    concat = Concatenate(axis=1)(monotones)
    linear = Dense(1,activation=None)(concat)
    outputs = Dense(n_labels,activation="sigmoid")(linear)

    return Model(name="ann_utadis",inputs=inputs,outputs=outputs)


In [None]:
# Class boundaries; three thresholds correspond to four classes.
thresholds = [0.25,0.5,0.75]
# Reference alternatives, one value per criterion. The first two criteria (the
# prices) are treated as costs, so their ideal value is 0; the rest are gains.
ideal_alt = [0,0,1,1,1,1]
antiideal_alt = [1,1,0,0,0,0]

In [None]:
# Build the ANN-UTADIS model; the fourth argument is L, the number of branches per monotone block.
uta_model = create_ann_utadis_model(thresholds,ideal_alt,antiideal_alt,5)
uta_model.summary()

In [None]:
# Render the layer graph of the ANN-UTADIS model.
plot_model(uta_model)

In [None]:
# Train with the validation split; early stopping monitors val_loss with a patience of 10 epochs.
history = train(uta_model,X_train,y_train,val_data=(X_val,y_val),patience=10)

In [None]:
# Plot the metric curves of the ANN-UTADIS training run.
show_history(history)

### Conventional Neural Network

In [None]:
def create_nn_model(features=features):
    """Build a plain fully connected baseline network.

    Architecture: input -> 256 -> 128 -> 64 -> 32 (all ReLU) -> 4 sigmoid outputs.

    Parameters
    ----------
    features : number of input columns (defaults to the module-level
        ``features`` at definition time).

    Returns
    -------
    An uncompiled Keras ``Model``.

    NOTE(review): the notebook binarizes the labels to 0/1 while this head
    emits 4 sigmoid units - confirm the intended target encoding.
    """
    inputs = Input((features,))
    layer = Dense(256,activation="relu")(inputs)
    # Chain from the previous layer: feeding `inputs` here again silently
    # dropped the 256-unit layer from the model.
    layer = Dense(128,activation="relu")(layer)
    layer = Dense(64,activation="relu")(layer)
    layer = Dense(32,activation="relu")(layer)
    outputs = Dense(4,activation="sigmoid")(layer)

    return Model(inputs=inputs,outputs=outputs)

In [None]:
# Build the baseline network for the same number of criteria and print its layers.
nn_model = create_nn_model(features)

nn_model.summary()

In [None]:
# Render the layer graph of the baseline network.
plot_model(nn_model)

In [None]:
# NOTE(review): patience=50 equals the default epoch budget of `train` (50), so
# early stopping presumably never triggers in this run - confirm that is intended.
history = train(nn_model,X_train,y_train,val_data=(X_val,y_val),patience=50)

In [None]:
# Plot the metric curves of the baseline network's training run.
show_history(history)

## References


<!--bibtex 

@Article{Tehrani2011/08,
  title={Choquistic Regression: Generalizing Logistic Regression using the Choquet Integral},
  author={Ali Fallah Tehrani and Weiwei Cheng and Eyke Hüllermeier},
  year={2011/08},
  booktitle={Proceedings of the 7th conference of the European Society for Fuzzy Logic and Technology (EUSFLAT-11)},
  pages={868-875},
  issn={1951-6851},
  isbn={978-90-78677-00-0},
  url={https://doi.org/10.2991/eusflat.2011.86},
  doi={10.2991/eusflat.2011.86},
  publisher={Atlantis Press}
}
-->