# Treinamentos de Detecção de Anomalias para geração de matriz de confusão com exclusão de extremos

In [1]:
import numpy as np
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, roc_auc_score
from sklearn.metrics import f1_score,accuracy_score,precision_score,recall_score
import matplotlib.pyplot as plt
import pandas as pd
import os


import numpy as np

tabelafinal = pd.DataFrame(columns=['algoritmo','contaminacao','acuracia','precisao','recall','fn','fp'])
print(tabelafinal)

def get_confusion_matrix(reais, preditos, labels):
    """
    Uma função que retorna a matriz de confusão para uma classificação binária
    
    Args:
        reais (list): lista de valores reais
        preditos (list): lista de valores preditos pelo modelos
        labels (list): lista de labels a serem avaliados.
            É importante que ela esteja presente, pois usaremos ela para entender
            quem é a classe positiva e quem é a classe negativa
    
    Returns:
        Um numpy.array, no formato:
            numpy.array([
                [ tp, fp ],
                [ fn, tn ]
            ])
    """
    # não implementado
    if len(labels) > 2:
        return None

    if len(reais) != len(preditos):
        return None
    
    # considerando a primeira classe como a positiva, e a segunda a negativa
    true_class = labels[0]
    negative_class = labels[1]

    # valores preditos corretamente
    tp = 0
    tn = 0
    
    # valores preditos incorretamente
    fp = 0
    fn = 0
    
    for (indice, v_real) in enumerate(reais):
        v_predito = preditos[indice]

        # se trata de um valor real da classe positiva
        if v_real == true_class:
            tp += 1 if v_predito == v_real else 0
            fp += 1 if v_predito != v_real else 0
        else:
            tn += 1 if v_predito == v_real else 0
            fn += 1 if v_predito != v_real else 0
    
    return np.array([
        # valores da classe positiva
        [ tp, fp ],
        # valores da classe negativa
        [ fn, tn ]
       
    ])

def acuracia(mc):
  tp = mc[0,0]
  tn = mc[1,1]
  fp = mc[0,1]
  fn = mc[1,0]
  acuracia = (tp+tn)/(tp+fp+tn+fn)
  return acuracia

def recall(mc): 
  tp = mc[0,0]
  fn = mc[1,0]
  return tp/(tp+fn)

def precisao(mc):
  tp = mc[0,0]
  fp = mc[0,1]
  return tp/(tp+fp) 

def fscore(precisao,recall):
  return 2*((precisao*recall)/(precisao+recall))

def avalia(clf_name,Y,y_train_scores,y_train_pred):
    # Avalia e imprime os resultados
    print("Avaliação do modelo "+clf_name+":")
    evaluate_print(clf_name, Y, y_train_scores)
    mc=get_confusion_matrix(reais=Y, preditos=clf.labels_ , labels=[0,1])
    print('Acurácia='+str(acuracia(mc)*100))   
    print('Precisão='+str(precisao(mc)*100))
    print('Recall='+str(recall(mc)*100))
    print('Falso Positivo='+str(mc[0,1]))
    print('Falso Negativo='+str(mc[1,0]))
    print('F1-score='+str(f1_score(Y,y_train_pred)*100))
    print('ROC='+str(roc_auc_score(Y,y_train_scores)))

def gera_matriz_de_confusao(Y,y_train_pred,clf_name):
    cm = confusion_matrix(Y,y_train_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['Normal','Anomalia'])
    disp.plot(cmap=plt.cm.Blues)
    disp.ax_.set(title='Matriz de Confusão do '+clf_name,xlabel='Valores preditos', ylabel='Valores reais')
    plt.show()
      
def calcula_anomalias(clf_name,parametros):
    menorfn=len(X)
    melhoracuracia=0
    mcont=0
    contamination=0.01
    while contamination<0.11:
        funcao=clf_name+'('+parametros+'contamination='+str(contamination)+')'
        clf = eval(funcao)
        clf.fit(X)
        mc=get_confusion_matrix(reais=Y, preditos=clf.labels_ , labels=[0,1])
        ac=acuracia(mc)
        fn=mc[1,0]   # inverti
        if fn<menorfn:
           mcont=contamination
           menorfn=fn
           melhoracuracia=ac
        else:
            if fn==menorfn:
                if ac>melhoracuracia:
                   mcont=contamination
                   menorfn=fn
                   melhoracuracia=ac                
        contamination=contamination+0.01
    contamination=mcont
    print('Contaminação utilizada='+str(contamination))
    clf = eval(funcao)
    clf.fit(X)
    return clf

def grava_avaliacao(clf_name,Y,y_train_scores,y_train_pred,contamination):
    # Avalia e imprime os resultados
    print("Avaliação do modelo "+clf_name+":")
    evaluate_print(clf_name, Y, y_train_scores)
    mc=get_confusion_matrix(reais=Y, preditos=clf.labels_ , labels=[0,1])
    ac=round(acuracia(mc)*100,2)
    print('Acurácia='+str(ac))   
    pc=round(precisao(mc)*100,2)
    print('Precisão='+str(pc))
    rc=round(recall(mc)*100,2)
    print('Recall='+str(rc))
    fn=mc[1,0] 
    fp=mc[0,1]
    print('Falso Negativo='+str(fn))
    print('Falso Positivo='+str(fp))
    tabelafinal.loc[len(tabelafinal.index)] = [clf_name,contamination,ac,pc,rc,fn,fp] 

Empty DataFrame
Columns: [algoritmo, contaminacao, acuracia, precisao, recall, fn, fp]
Index: []


# Carrega dados de arquivo CSV  
Separando catmat_id= 445485 -> Descrição:'ÁGUA MINERAL NATURAL, TIPO SEM GÁS MATERIAL EMBALAGEM PLÁSTICO TIPO EMBALAGEM 
RETORNÁVEL', Grupo: '89', SUBSISTÊNCIA Classe: '8960', BEBIDAS NÃO ALCOÓLICAS PDM: '19555', ÁGUA MINERAL NATURAL

In [2]:
def load_itens(path):
    csv_path = os.path.join(path,"aguatabela2.csv")
    return pd.read_csv(csv_path)

df=load_itens(".")
#df = df.drop(['data'],axis=1)
print(df)
dfajustado=df

      data  quantidade  valor_unitario  anomalia
0      1.0         420           12.00         0
1      2.0       28000            5.88         1
2     15.0         360            7.70         0
3     15.0         120            7.70         0
4     16.0        2985            4.20         0
..     ...         ...             ...       ...
389  419.0        1440           11.37         0
390  419.0        4608            0.62         1
391   37.0        1440           10.58         0
392   37.0        4608            0.54         1
393   95.0          10           20.00         1

[394 rows x 4 columns]


# Pré-processamento retira os 2,5% menores valores e os 2,5% maiores valores

In [3]:
dfajuste= pd.read_sql('SELECT valor_unitario FROM siasg.agua', dbConnection);
menor=dfajuste.quantile(0.025)
maior=dfajuste.quantile(0.975)
print(menor[0])
print(maior[0])
dfajuste2= pd.read_sql('SELECT quantidade FROM siasg.agua', dbConnection);
menorqtd=dfajuste2.quantile(0.05)
maiorqtd=dfajuste2.quantile(0.95)
print(menorqtd[0])
print(maiorqtd[0])
dfajustado=df.loc[df["valor_unitario"] > menor[0]]
dfajustado=dfajustado.loc[dfajustado["valor_unitario"] < maior[0]]
dfajustado=dfajustado.loc[dfajustado["quantidade"] > menorqtd[0]]
dfajustado=dfajustado.loc[dfajustado["quantidade"] < maiorqtd[0]]
print(dfajustado)
print(dfajustado.describe())

NameError: name 'dbConnection' is not defined

# Separação do Label dos dados

In [None]:

X = dfajustado.iloc[:, :-1]
Y = dfajustado.iloc[:, -1]
print(X)
print(Y)

# Pré-processamento normalização dos dados (min-max)

In [None]:
# -*- coding: utf-8 -*-
"""Example of using SUOD for accelerating outlier detection
"""
# Author: Yue Zhao <zhaoy@cmu.edu>
# License: BSD 2 clause

from __future__ import division
from __future__ import print_function
from sklearn.preprocessing import StandardScaler

import os
import sys

from pyod.models.suod import SUOD
from pyod.models.lof import LOF
from pyod.models.iforest import IForest
from pyod.models.knn import KNN
from pyod.models.pca import PCA
from pyod.models.hbos import HBOS
from pyod.models.ecod import ECOD
from pyod.models.copod import COPOD
from pyod.models.ocsvm import OCSVM
from pyod.utils.utility import standardizer
from pyod.utils.data import generate_data
from pyod.utils.data import evaluate_print
from pyod.utils.example import visualize

X = dfajustado
contamination=0.15



In [None]:
from pyod.models.pca import PCA
# train PCA detector
clf_name = 'PCA'
#clf = PCA(n_components=3,n_selected_components=2,contamination=contamination)
#clf.fit(X)
clf=calcula_anomalias('PCA','n_components=3,n_selected_components=2,')
avalia(clf_name,Y,clf.decision_scores_,clf.labels_)
print('')
gera_matriz_de_confusao(Y,clf.labels_ ,clf_name)

grava_avaliacao(clf_name,Y,clf.decision_scores_,clf.labels_,round(contamination,3))

In [None]:
from pyod.models.ocsvm import OCSVM
# train OCSVM detector
clf_name = 'OCSVM'

clf=calcula_anomalias(clf_name,'')
avalia(clf_name,Y,clf.decision_scores_,clf.labels_)
print('')
gera_matriz_de_confusao(Y,clf.labels_ ,clf_name)
grava_avaliacao(clf_name,Y,clf.decision_scores_,clf.labels_,round(contamination,3))

In [None]:
from pyod.models.lof import LOF
# train LOF detector
clf_name = 'LOF'
itens = len(X)/2
vizinhos=2
macuracia=0
nv=2
while vizinhos<itens: 
    clf = LOF(n_neighbors=vizinhos,contamination=contamination)
    clf.fit(X)
    y_train_pred = clf.labels_  # binary labels (0: inliers, 1: outliers)        
    ac=accuracy_score(Y,y_train_pred)
    if ac>macuracia:
        nv=vizinhos
        macuracia=ac
    vizinhos=vizinhos+1

print(nv)    

clf=calcula_anomalias(clf_name,'n_neighbors='+str(nv)+',')
avalia(clf_name,Y,clf.decision_scores_,clf.labels_)
print('')
gera_matriz_de_confusao(Y,clf.labels_ ,clf_name)
grava_avaliacao(clf_name,Y,clf.decision_scores_,clf.labels_,round(contamination,3))

In [None]:
from pyod.models.cblof import CBLOF
# train CBLOF detector
clf_name = 'CBLOF' 

clf=calcula_anomalias(clf_name,'')
avalia(clf_name,Y,clf.decision_scores_,clf.labels_)
print('')
gera_matriz_de_confusao(Y,clf.labels_ ,clf_name)
grava_avaliacao(clf_name,Y,clf.decision_scores_,clf.labels_,round(contamination,3))

In [None]:
from pyod.models.cof import COF
# train COF detector
clf_name = 'COF' 
clf=calcula_anomalias(clf_name,'')
avalia(clf_name,Y,clf.decision_scores_,clf.labels_)
print('')
gera_matriz_de_confusao(Y,clf.labels_ ,clf_name)
grava_avaliacao(clf_name,Y,clf.decision_scores_,clf.labels_,round(contamination,3))

In [None]:
from pyod.models.hbos import HBOS

# train HBOS detector
clf_name = 'HBOS'

clf=calcula_anomalias(clf_name,'n_bins=35, alpha=contamination,')
avalia(clf_name,Y,clf.decision_scores_,clf.labels_)
print('')
gera_matriz_de_confusao(Y,clf.labels_ ,clf_name)
grava_avaliacao(clf_name,Y,clf.decision_scores_,clf.labels_,round(contamination,3))

In [None]:
from pyod.models.knn import KNN
# train KNN detector
clf_name = 'KNN'
clf=calcula_anomalias(clf_name,'n_neighbors=35,')

avalia(clf_name,Y,clf.decision_scores_,clf.labels_)
print('')
gera_matriz_de_confusao(Y,clf.labels_ ,clf_name)
grava_avaliacao(clf_name,Y,clf.decision_scores_,clf.labels_,round(contamination,3))

In [None]:
from pyod.models.sod import SOD
# train SOD detector
clf_name = 'SOD' 
clf=calcula_anomalias(clf_name,'')

avalia(clf_name,Y,clf.decision_scores_,clf.labels_)
print('')
gera_matriz_de_confusao(Y,clf.labels_ ,clf_name)
grava_avaliacao(clf_name,Y,clf.decision_scores_,clf.labels_,round(contamination,3))

In [None]:
from pyod.models.copod import COPOD
# train COPOD detector
clf_name = 'COPOD'
clf=calcula_anomalias(clf_name,'')

avalia(clf_name,Y,clf.decision_scores_,clf.labels_)
print('')
gera_matriz_de_confusao(Y,clf.labels_ ,clf_name)
grava_avaliacao(clf_name,Y,clf.decision_scores_,clf.labels_,round(contamination,3))

In [None]:
from pyod.models.ecod import ECOD
# train ECOD detector
clf_name = 'ECOD'
clf=calcula_anomalias(clf_name,'')

avalia(clf_name,Y,clf.decision_scores_,clf.labels_)
print('')
gera_matriz_de_confusao(Y,clf.labels_ ,clf_name)
grava_avaliacao(clf_name,Y,clf.decision_scores_,clf.labels_,round(contamination,3))

In [None]:
from pyod.models.iforest import IForest
# train IForest detector
clf_name = 'IForest'
itens = len(X)
estimadores = len(X)//10
if estimadores<2: estimadores=2

clf=calcula_anomalias(clf_name,'n_estimators=estimadores,')
avalia(clf_name,Y,clf.decision_scores_,clf.labels_)
print('')
gera_matriz_de_confusao(Y,clf.labels_ ,clf_name)
grava_avaliacao(clf_name,Y,clf.decision_scores_,clf.labels_,round(contamination,3))

In [None]:
from pyod.models.loda import LODA
# train LODA detector
clf_name = 'LODA' 
clf=calcula_anomalias(clf_name,'')
avalia(clf_name,Y,clf.decision_scores_,clf.labels_)
print('')
gera_matriz_de_confusao(Y,clf.labels_ ,clf_name)
grava_avaliacao(clf_name,Y,clf.decision_scores_,clf.labels_,round(contamination,3))

In [None]:
from pyod.models.deep_svdd import DeepSVDD
# train DeepSVDD detector
clf_name = 'DeepSVDD'
clf=calcula_anomalias(clf_name,'verbose=0,preprocessing=True,')

avalia(clf_name,Y,clf.decision_scores_,clf.labels_)
print('')
gera_matriz_de_confusao(Y,clf.labels_ ,clf_name)
grava_avaliacao(clf_name,Y,clf.decision_scores_,clf.labels_,round(contamination,3))

In [None]:
from pyod.models.gmm import GMM
# train GMM detector
clf_name = 'GMM' 
clf=calcula_anomalias(clf_name,'')
avalia(clf_name,Y,clf.decision_scores_,clf.labels_)
print('')
gera_matriz_de_confusao(Y,clf.labels_ ,clf_name)
grava_avaliacao(clf_name,Y,clf.decision_scores_,clf.labels_,round(contamination,3))

In [None]:
tabelafinal.sort_values("acuracia", axis = 0, ascending = False,
                 inplace = True, na_position ='last')
print(tabelafinal)