In [None]:
import pandas as pd
import numpy as np
from sklearn.neighbors import KNeighborsClassifier

from datetime import datetime, timedelta
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import os
import sys
import logging
from logging.handlers import TimedRotatingFileHandler

In [None]:
# --- Experiment parameters -------------------------------------------------
ALGORITHM = "knn"
RANDOM_SEED = 42  # NOTE(review): not referenced in the cells visible here
TAMANHO_DA_JANELA_EM_DIAS = 7  # length, in days, of each train window and each test window
TIPO_ILUMINANCIA = "caixa"  # which illuminance column is kept as a feature: "teto" or "caixa"

# --- Output locations --------------------------------------------------------
LOG_SAVE_PATH = "logs_iluminancia"
RESULTS_SAVE_PATH = "logs_iluminancia"

# Stem shared by the files that are specific to one window size.
_RUN_TAG = f"{ALGORITHM}_{TAMANHO_DA_JANELA_EM_DIAS}_dias_iluminancia_{TIPO_ILUMINANCIA}"

LOG_FILENAME = f"{_RUN_TAG}.log"
DAILY_RESULTS_FILENAME = f"{_RUN_TAG}.csv"
# Not tied to a window size: the same summary file is used for every window size.
RESULTS_FILENAME = f"{ALGORITHM}_iluminancia_{TIPO_ILUMINANCIA}.csv"

In [None]:
def log_setup(filename, log_level):
    """Return a logger that writes to a midnight-rotating file and to stdout.

    The logger is named after ``filename``. Calling this function again with
    the same ``filename`` reconfigures that logger instead of stacking more
    handlers on it, so re-running the notebook cell does not duplicate every
    log line.

    Args:
        filename: Path of the log file; also used as the logger name.
        log_level: Logging level applied to the logger and to the stdout handler.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(filename)
    logger.setLevel(log_level)

    # logging.getLogger hands back the same object for the same name, so any
    # handlers attached by a previous call are still there. Detach and close
    # them (closing also releases the old log file handle).
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    formatter = logging.Formatter(fmt='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
                                  datefmt='%m-%d-%y %H:%M:%S')
    fh = TimedRotatingFileHandler(filename, when='midnight')
    fh.setFormatter(formatter)
    sh = logging.StreamHandler(sys.stdout)
    sh.setLevel(log_level)
    sh.setFormatter(formatter)
    logger.addHandler(fh)
    logger.addHandler(sh)
    return logger

In [None]:
# Make sure both output folders exist before anything is written into them.
for output_dir in (LOG_SAVE_PATH, RESULTS_SAVE_PATH):
    os.makedirs(output_dir, exist_ok=True)

In [None]:
log_file_path = f"{LOG_SAVE_PATH}/{LOG_FILENAME}"

# Start every run from an empty log file.
if os.path.isfile(log_file_path):
    os.remove(log_file_path)

logger = log_setup(log_file_path, logging.INFO)

In [None]:
# Load the dataset from a CSV file; the path is relative to the working directory.
data = pd.read_csv("mai_2022_fev_2023.csv")

In [None]:
# Number of rows loaded.
len(data)

In [None]:
# Preview the first rows.
data.head()

In [None]:
# Preview the last rows.
data.tail()

In [None]:
# Feature columns: every column except the last one, which is taken to be the
# target (assumes the CSV's last column is `output` — TODO confirm).
# A later cell adds a helper `date` column to `data` in place; it is skipped
# here so that re-running this cell after that one does not shift the "last
# column" and silently turn the target into a feature.
features_names = [column for column in data.columns if column != 'date'][:-1]
logger.info(f"features_names: {features_names}")
logger.info(f"len(features_names): {len(features_names)}")

In [None]:
# Two candidate feature sets, each starting as an independent copy of the full list.
features_names_iluminancia_teto = list(features_names)
features_names_iluminancia_caixa = list(features_names)

# Each variant keeps only its own illuminance column, so drop the other one.
# list.remove raises ValueError if the column is missing.
features_names_iluminancia_teto.remove('iluminancia_caixa')
features_names_iluminancia_caixa.remove('iluminancia_teto')

for variant_label, variant_columns in (
    ("features_names_iluminancia_teto", features_names_iluminancia_teto),
    ("features_names_iluminancia_caixa", features_names_iluminancia_caixa),
):
    logger.info(f"{variant_label}: {variant_columns}")

In [None]:
# Map each supported illuminance type to its feature list.
features_by_tipo = {
    "teto": features_names_iluminancia_teto,
    "caixa": features_names_iluminancia_caixa,
}

if TIPO_ILUMINANCIA not in features_by_tipo:
    raise ValueError("Escolha entre os dois tipos de iluminâncias: 'teto' ou 'caixa'")

# Work on a copy so the per-type lists stay untouched.
features_names = features_by_tipo[TIPO_ILUMINANCIA].copy()

logger.info(f"features_names: {features_names}")

In [None]:
# Combine the year/month/day columns into a single datetime column, used later
# to slice the data into date windows. Note: this adds a column to `data` in place.
data['date'] = pd.to_datetime(dict(year=data.data_ano, month=data.data_mes, day=data.data_dia))

In [None]:
# Snapshot of the prepared frame; the evaluation loop works from copies of it.
original_data = data.copy()

In [None]:
# Show the distinct dates present in the dataset.
data['date'].unique()

In [None]:
def save_or_append_avg_metrics(colunas_geral, metrics_geral, path):
    """Write summary metric rows to a CSV file, appending if the file already exists.

    Args:
        colunas_geral: Column names for the rows.
        metrics_geral: List of rows, each row being a list of values.
        path: Destination CSV path.

    When ``path`` exists, its contents are read, the new rows are added after
    them and the whole file is rewritten; otherwise a new file is created.
    A short status message is printed in both cases.
    """
    fresh_rows = pd.DataFrame(data=metrics_geral, columns=colunas_geral)

    if not os.path.isfile(path):
        print("Arquivo não existe")
        fresh_rows.to_csv(path, index=False)
        return

    stored_rows = pd.read_csv(path)
    merged_rows = pd.concat([stored_rows, fresh_rows])
    merged_rows.to_csv(path, index=False)
    print(f"Arquivo já existe: {path}")

In [None]:
colunas = ["Teste", "Precision", "Recall", "F1", "Accuracy"]

# One row per evaluated window goes into `metrics`; the per-metric lists are
# kept alongside so averages can be computed after the loop.
metrics = []
precision_list = []
recall_list = []
f1_list = []
accuracy_list = []

# Not filled in by this cell; kept because other cells may reference them.
metrics_w = []
precision_w_list = []
recall_w_list = []
f1_w_list = []
accuracy_w_list = []

# Number of neighbours used by the classifier (5 is scikit-learn's default).
# Windows with fewer samples than this are skipped.
N_NEIGHBORS = 5

# The dataset is the same for every window, so copy it and compute its date
# span once instead of on every iteration.
data = original_data.copy()
min_date = datetime.strptime(str(data['date'].min())[:10], '%Y-%m-%d')
max_date = datetime.strptime(str(data['date'].max())[:10], '%Y-%m-%d')

i = 0

while True:
    logger.info(f"==================== Teste nº{i+1} (i={i}) ====================")

    # Training data: the window starts i days after the first date in the
    # dataset and spans TAMANHO_DA_JANELA_EM_DIAS days (end date exclusive).
    initial_date_train = min_date + timedelta(days=i)
    final_date_train = initial_date_train + timedelta(days=TAMANHO_DA_JANELA_EM_DIAS)
    if final_date_train > max_date:
        logger.info("A data final de treino é maior que a data máxima")
        break

    # Test data: the window of the same length that immediately follows training.
    initial_date_test = final_date_train
    final_date_test = initial_date_test + timedelta(days=TAMANHO_DA_JANELA_EM_DIAS)

    # initial_date_test equals final_date_train, which was already checked
    # against max_date above, so only the end of the test window can overflow.
    if final_date_test > max_date:
        logger.info("A data final de teste é maior que a data máxima")
        break

    logger.info(f"initial_date_train: {initial_date_train}")
    logger.info(f"final_date_train: {final_date_train}")
    logger.info(f"initial_date_test: {initial_date_test}")
    logger.info(f"final_date_test: {final_date_test}")

    data_train = data.loc[(data['date'] >= initial_date_train) & (data['date'] < final_date_train)]
    data_test = data.loc[(data['date'] >= initial_date_test) & (data['date'] < final_date_test)]

    logger.info(f"min_date_train: {data_train['date'].min()}")
    logger.info(f"max_date_train: {data_train['date'].max()}")
    logger.info(f"min_date_test: {data_test['date'].min()}")
    logger.info(f"max_date_test: {data_test['date'].max()}")

    # A window with no rows (its min/max dates are NaT) cannot be evaluated.
    if data_train.empty or data_test.empty:
        logger.info("Erro: amostras vazias.")
        i += 1
        continue

    logger.info(f"Distribuição de classes - Treino: {data_train['output'].value_counts()}")
    logger.info(f"Distribuição de classes - Teste: {data_test['output'].value_counts()}")

    X_train = data_train[features_names]

    X_train = X_train.values.tolist()
    y_train = data_train['output'].values.tolist()

    X_test = data_test[features_names]
    if len(X_train) < N_NEIGHBORS or len(X_test) < N_NEIGHBORS:
        logger.info("Quantidade de amostras é menor que a quantidade de vizinhos...")
        i += 1
        continue

    X_test = X_test.values.tolist()
    y_test = data_test['output'].values.tolist()

    # Train-Test
    logger.info("-- Train/Test --")
    clf = KNeighborsClassifier(n_neighbors=N_NEIGHBORS)
    try:
        clf.fit(X_train, y_train)
    except Exception as error:
        logger.info(f"Erro: {error}")
        # Move on to the next window; without this increment the same failing
        # window would be retried forever.
        i += 1
        continue

    y_pred = clf.predict(X_test)

    precision = precision_score(y_test, y_pred, average='weighted')
    recall = recall_score(y_test, y_pred, average='weighted')
    f1 = f1_score(y_test, y_pred, average='weighted')
    acc = accuracy_score(y_test, y_pred)

    metrics.append([i+1, precision, recall, f1, acc])
    precision_list.append(precision)
    recall_list.append(recall)
    f1_list.append(f1)
    accuracy_list.append(acc)

    i += 1

In [None]:
# Header of the summary file: window size, then mean and standard deviation of each metric.
colunas_geral = [
    "Janela",
    "Media_Precision", "Media_Recall", "Media_F1", "Media_Accuracy",
    "Desvio_Precision", "Desvio_Recall", "Desvio_F1", "Desvio_Accuracy",
]

# Same metric order as the header above.
per_metric_values = (precision_list, recall_list, f1_list, accuracy_list)

# A single summary row for this window size.
metrics_geral = [[
    TAMANHO_DA_JANELA_EM_DIAS,
    *(np.mean(values) for values in per_metric_values),
    *(np.std(values) for values in per_metric_values),
]]

summary_path = f"{RESULTS_SAVE_PATH}/{RESULTS_FILENAME}"
save_or_append_avg_metrics(colunas_geral, metrics_geral, summary_path)

# Resultados diários

In [None]:
# Export the per-window metrics collected by the evaluation loop, one row per window.
daily_results_path = f"{RESULTS_SAVE_PATH}/{DAILY_RESULTS_FILENAME}"
analise_dias = pd.DataFrame(data=metrics, columns=colunas)
analise_dias.to_csv(daily_results_path, index=False)