# Rodar Teste 3: influência da qtd de antenas

Diferença para os testes anteriores: aqui, cada qtd de antenas deve gerar um modelo diferente.

## Imports

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json
from typing import Optional, Any
from pathlib import Path
from time import time
from sklearn.metrics import accuracy_score, recall_score, precision_score, confusion_matrix
from sklearn.tree import DecisionTreeClassifier
from sklearn.utils import shuffle
from sklearn.model_selection import KFold
from joblib import Parallel, delayed

## Funções

In [2]:
def abrirDataframe(
    dirDatasets:        Path, 
    snr:                float, 
    qtdUsuarios:        int,
    datasetTreinamento: bool
) -> pd.DataFrame:
    
    prefixo    = "train_3_" if datasetTreinamento else "test_3_"        
    caminhoCSV = sorted(dirDatasets.glob(prefixo+str(snr)+"_dB_"+str(qtdUsuarios)+"_users_*.csv"))[-1]
    df         = pd.read_csv(caminhoCSV)
    
    return df

In [3]:
def separarXeY(
    dfTreinamento:        pd.DataFrame,
    dfTeste:              pd.DataFrame, 
    featuresSelecionadas: Optional[list[str]] = None
) -> (pd.Series, pd.Series, pd.Series, pd.Series, pd.Series, pd.Series, pd.Series, pd.Series):
    
    # SELECIONANDO TODAS AS FEATURES SE FEATURESSELECIONADAS VIER NONE
    if featuresSelecionadas is None:
        featuresSelecionadas = df.columns.drop(["potenciaEspiao", "ataquePresente"])
    
    # SEPARANDO OS ARRAYS
    XTrain           = dfTreinamento[featuresSelecionadas]
    XTest            = dfTeste[featuresSelecionadas]
    yTrain           = dfTreinamento["ataquePresente"]
    yTest            = dfTeste["ataquePresente"]
    qtdAntenasTrain  = dfTreinamento["qtdAntenas"]
    qtdAntenasTest   = dfTeste["qtdAntenas"]
    potEspiaoTrain   = dfTreinamento["potenciaEspiao"]
    potEspiaoTest    = dfTeste["potenciaEspiao"]
    
    return XTrain, XTest, yTrain, yTest, qtdAntenasTrain, qtdAntenasTest, potEspiaoTrain, potEspiaoTest

In [4]:
def deteccaoMachineLearning(
    XTrain:           pd.Series, 
    XTest:            pd.Series, 
    yTrain:           pd.Series, 
    objClassificador: Any
) -> (list[bool], np.float64, np.float64):
    
    # PARA MEDIR O TEMPO DE CLASSIFICACAO DE CADA AMOSTRA E GUARDAR OS RESULTADOS
    arrayTempos = pd.Series(data=np.zeros(len(XTest)), index=XTest.index)
    yPredFinal  = pd.Series(data=np.zeros(len(XTest)), index=XTest.index)

    # PARA CADA QTD DE ANTENAS, VAMOS TREINAR UM MODELO DIFERENTE
    rangeQtdAntenas = np.unique(XTrain["qtdAntenas"])
    for qtdAntenas in rangeQtdAntenas:
        
        # PEGO OS DADOS SO DA QTD DE ANTENS QUE QUERO E TIRO A QTD DE ANTENAS DAS FEATURES
        XTrainQtdAntenasAtual = XTrain.loc[XTrain["qtdAntenas"]==qtdAntenas].copy().drop(["qtdAntenas"], axis=1)
        XTestQtdAntenasAtual  = XTest.loc[XTest["qtdAntenas"]==qtdAntenas].copy().drop(["qtdAntenas"], axis=1)
        yTrainQtdAntenasAtual = yTrain.loc[XTrainQtdAntenasAtual.index].copy()
        
        # TREINANDO O CLASSIFICADOR
        objClassificador.fit(XTrainQtdAntenasAtual, yTrainQtdAntenasAtual)

        # PREDIZENDO OS DADOS DE TESTE
        for id, amostraTeste in XTestQtdAntenasAtual.iterrows():
            tic                 = time()
            yPredFinal.loc[id]  = objClassificador.predict([amostraTeste.to_numpy()])[0]
            toc                 = time()
            arrayTempos.loc[id] = toc-tic
            
    # PD -> lista
    arrayTempos = arrayTempos.tolist()
    yPredFinal = yPredFinal.astype(bool).tolist()
        
    # METRICAS DE TEMPO
    mediaTempo, desvioTempo = np.mean(arrayTempos), np.std(arrayTempos)
        
    return yPredFinal, mediaTempo, desvioTempo

In [5]:
def deteccaoHassan(
    XTest: pd.Series
) -> (np.ndarray, np.float64, np.float64):
    
    arrayTempos = np.zeros(len(XTest))
    yPred       = np.zeros(len(XTest))
    
    for i, (id, amostraTeste) in enumerate(XTest.iterrows()):
        tic            = time()
        yPred[i]       = 1 if amostraTeste["E"] > amostraTeste["eta"] else 0
        toc            = time()
        arrayTempos[i] = toc-tic

    mediaTempo, desvioTempo = np.mean(arrayTempos), np.std(arrayTempos)
        
    return yPred, mediaTempo, desvioTempo

In [6]:
def predizerEsquemaDeteccao(
    esquemaDeteccao:      str,
    XTrain:               pd.Series, 
    XTest:                pd.Series, 
    yTrain:               pd.Series, 
    classificador:        Any
) -> (np.ndarray, np.float64, np.float64):
    
    if esquemaDeteccao == "ML":
        yPred, tempoMedioPredicao, tempoDesvioPredicao = deteccaoMachineLearning(XTrain, XTest, yTrain, classificador)
    elif esquemaDeteccao == "Hassan":
        yPred, tempoMedioPredicao, tempoDesvioPredicao = deteccaoHassan(XTest)
    else:
        raise Exception("O parâmetro esquemaDeteccao deve ser ML ou Hassan!")
        
    return yPred, tempoMedioPredicao, tempoDesvioPredicao

In [7]:
def obterValoresUnicos(
    arrayQtdAntenas: np.ndarray, 
    arrayPotsEspiao: np.ndarray
) -> (np.ndarray, np.ndarray):
    
    rangeQtdAntenas = np.unique(arrayQtdAntenas)
    rangePotEspiao  = np.unique(arrayPotsEspiao)
    
    return rangeQtdAntenas, rangePotEspiao

In [8]:
def calcularMetricas(
    matrizProbabilidadeDeteccaoCadaKFold: list[np.ndarray],
    matrizConfusaoCadaKFold:              list[np.ndarray],
    acuraciaCadaKFold:                    list[np.float64],
    precisaoCadaKFold:                    list[np.float64],
    revocacaoCadaKFold:                   list[np.float64],
    tempoProcessamentoCadaKFold:          list[np.float64],
    rangePotEspiao:                       np.ndarray,
    rangeQtdAntenas:                      np.ndarray
) -> dict[str, Any]:
    
    # MATRIZ DE PROBABILIDADE DE DETECCAO MEDIA E DESVIO EM CADA POT ESP E CADA QtdUsuarios
    matrizProbabilidadeDeteccaoMedia  = np.mean(matrizProbabilidadeDeteccaoCadaKFold, axis=0)
    matrizProbabilidadeDeteccaoMax    = np.max(matrizProbabilidadeDeteccaoCadaKFold, axis=0)
    matrizProbabilidadeDeteccaoMin    = np.min(matrizProbabilidadeDeteccaoCadaKFold, axis=0)
    matrizProbabilidadeDeteccaoDesvio = np.std(matrizProbabilidadeDeteccaoCadaKFold, axis=0)

    # MATRIZ DE CONFUSAO GERAL (SOMA DE TODAS AS ITERACOES)
    somaMatrizesConfusao = np.sum(matrizConfusaoCadaKFold, axis=0)

    # MEDIA E DESVIO DE OUTRAS METRICAS
    acuraciaMedia, acuraciaDesvio                      = np.mean(acuraciaCadaKFold), np.std(acuraciaCadaKFold)
    precisaoMedia, precisaoDesvio                      = np.mean(precisaoCadaKFold), np.std(precisaoCadaKFold)
    revocacaoMedia, revocacaoDesvio                    = np.mean(revocacaoCadaKFold), np.std(revocacaoCadaKFold)
    tempoProcessamentoMedio, tempoProcessamentoDesvio  = np.mean(tempoProcessamentoCadaKFold), np.std(tempoProcessamentoCadaKFold)
    
    # MONTANDO O DICIONARIO COM AS METRICAS
    dictMetricas = {
        "rangePotEspiao":                    rangePotEspiao.tolist(),
        "rangeQtdAntenas":                   rangeQtdAntenas.tolist(),
        "acuraciaMedia":                     float(acuraciaMedia),
        "acuraciaDesvio":                    float(acuraciaDesvio),
        "precisaoMedia":                     float(precisaoMedia),
        "precisaoDesvio":                    float(precisaoDesvio),
        "revocacaoMedia":                    float(revocacaoMedia),
        "revocacaoDesvio":                   float(revocacaoDesvio),
        "tempoProcessamentoMedio":           float(tempoProcessamentoMedio),
        "tempoProcessamentoDesvio":          float(tempoProcessamentoDesvio),
        "somaMatrizesConfusao":              somaMatrizesConfusao.tolist(),
        "matrizProbabilidadeDeteccaoMedia":  matrizProbabilidadeDeteccaoMedia.tolist(),
        "matrizProbabilidadeDeteccaoMax":    matrizProbabilidadeDeteccaoMax.tolist(),
        "matrizProbabilidadeDeteccaoMin":    matrizProbabilidadeDeteccaoMin.tolist(),
        "matrizProbabilidadeDeteccaoDesvio": matrizProbabilidadeDeteccaoDesvio.tolist()
    }
    
    return dictMetricas

In [9]:
def salvarResultados(
    prefixoKeyDict:        str, 
    snr:                   int, 
    qtdUsuarios:           int, 
    qtdSimbolos:           int,
    repetibilidadeDataset: int,
    qtdFolders:            int, 
    featuresSelecionadas:  list[str],
    dictMetricas:          dict[str, Any],
    arquivoSalvar:         Path
) -> None:
    
    # KEY DO JSON QUE VAI GUARDAR ESSE RESULTADO
    key = prefixoKeyDict + "_" + str(time()).replace(".", "")
    
    # INICIANDO O DICIONARIO DE RESULTADOS
    dictResultado = {
        key: {
            "snr":                   snr,
            "qtdUsuarios":           qtdUsuarios,
            "qtdSimbolos":           qtdSimbolos,
            "repetibilidadeDataset": repetibilidadeDataset,
            "qtdFolders":            qtdFolders,
            "featuresSelecionadas":  featuresSelecionadas
        }
    }
    
    # CONCATENANDO COM O DICIONARIO DE METRICAS MEDIAS E DESVIOS
    dictResultado[key] |= dictMetricas
    
    # SALVANDO
    with open(arquivoSalvar, mode="r+") as file:
        dictJSON  = json.load(file)
        dictJSON |= dictResultado
        file.seek(0)
        json.dump(dictJSON, file, indent=4)
        file.truncate()

In [10]:
def rodarTesteDeteccao(
    esquemaDeteccao:      str,
    featuresSelecionadas: list[str],
    dirDatasets:          Path, 
    snr:                  float, 
    qtdUsuarios:          int,
    qtdFolders:           int,
    classificador:        Any
) -> dict[str, Any]:

    # ARRAYS QUE VAO GUARDAR OS RESULTADOS DE CADA ITERACAO DO KFOLD
    matrizProbabilidadeDeteccaoCadaKFold = []
    acuraciaCadaKFold                    = []
    precisaoCadaKFold                    = []
    revocacaoCadaKFold                   = []
    matrizConfusaoCadaKFold              = []
    tempoProcessamentoCadaKFold          = []

    # ABRINDO OS DATAFRAMES DE TREINAMENTO E TESTE E SEPARANDO X E Y
    dfTreinamento = abrirDataframe(dirDatasets, snr, qtdUsuarios, datasetTreinamento=True)
    dfTeste       = abrirDataframe(dirDatasets, snr, qtdUsuarios, datasetTreinamento=False)
    XTrainGlobal, XTestGlobal, yTrainGlobal, yTestGlobal, qtdAntenasTrainGlobal, qtdAntenasTestGlobal, potEspTrainGlobal, potEspTestGlobal = separarXeY(dfTreinamento, dfTeste, featuresSelecionadas)

    # VALORES UNICOS DE QTDANTENAS E POTENCIA DE ESPIAO PRA EU SABER O INDEX DAS MATRIZES DE RESULTADOS
    rangeQtdAntenas, rangePotEspiao = obterValoresUnicos(qtdAntenasTestGlobal, potEspTestGlobal)

    # OBTENDO INDICES DE TREINAMENTO E TESTE COM KFOLD (NAO DA PRA USAR DO JEITO FACIL PQ SAO DOIS DATAFRAMES DIFERENTES)
    objKFold = KFold(n_splits=qtdFolders, shuffle=True)
    pastasTreinamento, _ = zip(*objKFold.split(XTrainGlobal))
    _, pastasTeste       = zip(*objKFold.split(XTestGlobal))

    # RODANDO O KFOLD
    for indexesTreinamento, indexesTeste in zip(pastasTreinamento, pastasTeste):

        # SEPARANDO X, Y, QTDANTENAS E POTENCIAS DO ESPIAO DE TREINAMENTO E TESTE
        XTrain, XTest                     = XTrainGlobal.loc[indexesTreinamento], XTestGlobal.loc[indexesTeste]
        yTrain, yTest                     = yTrainGlobal.loc[indexesTreinamento], yTestGlobal.loc[indexesTeste]
        qtdAntenasTrain, qtdAntenasTest   = qtdAntenasTrainGlobal.loc[indexesTreinamento], qtdAntenasTestGlobal.loc[indexesTeste]
        potEspTrain, potEspTest           = potEspTrainGlobal.loc[indexesTreinamento], potEspTestGlobal.loc[indexesTeste]

        # PREDICAO
        yPred, tempoMedioPredicao, _ = predizerEsquemaDeteccao(esquemaDeteccao, XTrain, XTest, yTrain, classificador)
        
        # GRAVANDO METRICAS
        acuraciaCadaKFold.append(accuracy_score(yTest, yPred))                    
        precisaoCadaKFold.append(precision_score(yTest, yPred))                    
        revocacaoCadaKFold.append(recall_score(yTest, yPred))
        matrizConfusaoCadaKFold.append(confusion_matrix(yTest, yPred))
        tempoProcessamentoCadaKFold.append(tempoMedioPredicao)          

        # AGREGO NAS MATRIZES CONTADORAS
        matrizContadoraDeteccoes   = np.zeros(shape=(len(rangePotEspiao), len(rangeQtdAntenas)))
        matrizContadoraOcorrencias = np.zeros(matrizContadoraDeteccoes.shape)
        for (indexTesteAtual, predicaoAtual) in zip(yTest.index, yPred):

            # PEGANDO EM QUAL LINHA E EM QUAL COLUNA DA MATRIZ EU VOU COLOCAR O RESULTADO
            indexPotEsp     = np.where(rangePotEspiao==potEspTest[indexTesteAtual])[0][0]
            indexQtdAntenas = np.where(rangeQtdAntenas==qtdAntenasTest[indexTesteAtual])[0][0]

            # COLOCO NAS MATRIZES DE RESULTADOS
            matrizContadoraDeteccoes[indexPotEsp][indexQtdAntenas]   += predicaoAtual
            matrizContadoraOcorrencias[indexPotEsp][indexQtdAntenas] += 1

        # MATRIZ DA PROBABILIDADE DE DETECCAO EM CADA POT ESPIAO (LINHAS) E CADA QtdAntenas (COLUNAS)
        matrizProbabilidadeDeteccaoCadaKFold.append(matrizContadoraDeteccoes/matrizContadoraOcorrencias)

    # CALCULANDO TODAS AS MEDIAS E DESVIOS DAS METRICAS AO LONGO DO KFOLD E COLOCANDO NUM DICIONARIO
    dictMetricas = calcularMetricas(matrizProbabilidadeDeteccaoCadaKFold, matrizConfusaoCadaKFold, acuraciaCadaKFold, precisaoCadaKFold, revocacaoCadaKFold, tempoProcessamentoCadaKFold, rangePotEspiao, rangeQtdAntenas)

    return dictMetricas

## Parâmetros iniciais

In [11]:
# PARAMETROS
classificador              = DecisionTreeClassifier()
snr                        = 10
qtdUsuarios                = 64
qtdSimbolos                = 300
repetibilidadeDataset      = 100
qtdFolders                 = 10
nucleosProcessador         = -1
verbose                    = 100
featuresSelecionadasML     = ["qtdAntenas", "E"] # antenas entra como feature aqui, mas vai servir so pra separar os modelos
featuresSelecionadasHassan = ["E", "eta"]
dirDatasets                = Path("../../datasets/test_3/")

## Iniciando o JSON que vai guardar os resultados

In [12]:
# INICIANDO O JSON QUE VAI GUARDAR OS RESULTADOS
dictResultados = {}
arquivoSalvar  = Path("../../results/test_3/detection_probability_" + str(time()).replace(".", "") + ".json")
with open(arquivoSalvar, mode="w") as file:
    json.dump(dictResultados, file)

## Rodando os testes de detecção com machine learning e Hassan

In [13]:
# PARAMETROS INICIAIS
prefixoKeyDict       = classificador.__class__.__name__
esquemaDeteccao      = "ML"
featuresSelecionadas = featuresSelecionadasML

# RODANDO O TESTE E CALCULANDO METRICAS
dictMetricas = rodarTesteDeteccao(esquemaDeteccao, featuresSelecionadas, dirDatasets, snr, qtdUsuarios, qtdFolders, classificador)

# SALVANDO O RESULTADO
salvarResultados(prefixoKeyDict, snr, qtdUsuarios, qtdSimbolos, repetibilidadeDataset, qtdFolders, featuresSelecionadas, dictMetricas, arquivoSalvar)

In [14]:
# PARAMETROS INICIAIS
prefixoKeyDict       = "Hassan"
esquemaDeteccao      = prefixoKeyDict
featuresSelecionadas = featuresSelecionadasHassan

# RODANDO O TESTE E CALCULANDO METRICAS
dictMetricas = rodarTesteDeteccao(esquemaDeteccao, featuresSelecionadas, dirDatasets, snr, qtdUsuarios, qtdFolders, classificador)

# SALVANDO O RESULTADO
salvarResultados(prefixoKeyDict, snr, qtdUsuarios, qtdSimbolos, repetibilidadeDataset, qtdFolders, featuresSelecionadas, dictMetricas, arquivoSalvar)