# Limpeza e Pré-processamento

In [None]:
import pandas as pd
import numpy as np

# Carregando o dataframe
df = pd.read_csv('<caminho_dados>.csv', sep=',', low_memory=False)

# Criando uma nova coluna 'co_turma' composta por 'co_entidade' e 'co_etapa_ensino'
# Esse código deve aparecer uma vez a cada ano que a turma foi registrada no censo escolar
df['co_turma'] = df['co_entidade'].astype(str) + '-' + df['co_etapa_ensino'].astype(str)

# Levantando o código das turmas que aparecem no censo de 2023
turmas_com_2023 = df[df['nu_ano_censo'] == 2023]['co_turma']

# Filtrando o dataframe apenas pelas turmas que aparecem no censo de 2023
df_filtrado_2023 = df[df['co_turma'].isin(turmas_com_2023)]

# Agrupando por turma e adicionando a quantidade de registros
df_filtrado_2023_agrupado = df_filtrado_2023.groupby(['co_turma']).size().reset_index(name='quantitativo')
df_filtrado_2023_agrupado = df_filtrado_2023_agrupado.sort_values(by='quantitativo', ascending=False)

# Removendo linhas onde o quantitativo de registros é menor que 10 e pegando o código das turmas
df_filtrado_2023_agrupado_maior_igual_10 = df_filtrado_2023_agrupado[df_filtrado_2023_agrupado['quantitativo'] >= 10]
turmas_2023_maior_igual_10 = df_filtrado_2023_agrupado_maior_igual_10['co_turma'].unique()

# Filtrando o dataframe apenas pelas turmas que aparecem no censo de 2023 e tem 10 ou mais registros
df_filtrado_2023_maior_igual_10 = df[df['co_turma'].isin(turmas_2023_maior_igual_10)]

# Copiando o df e convertendo os valores de ano para string para poder usar como nome de coluna
df_filtrado_2023_maior_igual_10_copia = df_filtrado_2023_maior_igual_10.copy()
df_filtrado_2023_maior_igual_10_copia['nu_ano_censo'] = df_filtrado_2023_maior_igual_10_copia['nu_ano_censo'].astype(str)

# Usando pivot_table para montar o df da série temporal
# Os valores de 'qtd_alunos' serão distribuídos conforme 'nu_ano_censo', para cada 'co_turma'
# Valores NaN serão preenchidos com 0 e o name do index será removido
df_serie_temporal = df_filtrado_2023_maior_igual_10_copia.pivot_table(
        index='co_turma',columns='nu_ano_censo', values='qtd_alunos', aggfunc="sum"
    ).reset_index().fillna(0)
df_serie_temporal.columns.name = None

# Adicionando uma coluna com a quantidade de registros no censo para cada turma
anos = [str(ano) for ano in range(2007, 2024)]
df_serie_temporal['nu_valores'] = df_serie_temporal[anos].apply(lambda row: (row != 0).sum(), axis=1)

# Verificando se há interrupção na série temporal
def verifica_interrupcao(row):
    serie_anos = row[anos]
    inicio_atividade = serie_anos[serie_anos != 0].first_valid_index()
    fim_atividade = serie_anos[serie_anos != 0].last_valid_index()
    return (serie_anos.loc[inicio_atividade:fim_atividade] == 0).any()

df_serie_temporal['has_interruption'] = df_serie_temporal.apply(verifica_interrupcao, axis=1)

# Salvando o df da série temporal
df_serie_temporal.to_csv('<caminho_dados_preprocessados>.csv', index=False)

# Análise Exploratória

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

# Transformando para formato longo
df_long = pd.melt(df_serie_temporal, id_vars=['co_turma'], var_name='nu_ano_censo', value_name='qt_censo',
                  value_vars=[str(year) for year in range(2007, 2023)])
df_long = df_long[df_long['qt_censo'] > 0]

# Função para calcular estatísticas de um ano específico
def calcular_estatisticas(df, ano):
    dados_ano = df[df['nu_ano_censo'] == str(ano)]['qt_censo']
    return {
        'Média ao longo dos anos': dados_ano.mean(),
        'Mediana (média entre turmas)': dados_ano.median(),
        'Primeiro Quartil (média entre turmas)': dados_ano.quantile(0.25),
        'Terceiro Quartil (média entre turmas)': dados_ano.quantile(0.75),
    }

# Calculando estatísticas para cada ano
anos = range(2007, 2023)
estatisticas_anuais = {ano: calcular_estatisticas(df_long, ano) for ano in anos}

# Convertendo para DataFrame e calculando a média das estatísticas
df_estatisticas_anuais = pd.DataFrame(estatisticas_anuais).T
estatisticas_media = df_estatisticas_anuais.mean().to_dict()

# Calculando o desvio padrão
df_serie_temporal['desvio_padrao'] = df_serie_temporal[[str(year) for year in anos]].replace(0, np.nan).std(axis=1, ddof=0)
desvio_padrao_medio = df_serie_temporal['desvio_padrao'].mean()

# Calculando o mínimo e máximo
minimo_geral = df_long['qt_censo'].min()
maximo_geral = df_long['qt_censo'].max()

# Adicionando os valores ao dicionário de estatísticas médias
estatisticas_media.update({
    'Desvio Padrão por turma': desvio_padrao_medio,
    'Mínimo Geral': df_long['qt_censo'].min(),
    'Máximo Geral': df_long['qt_censo'].max(),
})

# Criando o boxplot para todos os anos
plt.figure(figsize=(20, 10))
plt.ylim(-50, 300)

plt.title('Boxplot da Quantidade de Alunos por Ano')
plt.xlabel('Ano')
plt.ylabel('Quantidade de Alunos')
plt.xticks(rotation=45)

sns.boxplot(x='nu_ano_censo', y='qt_censo', data=df_long)

plt.show()

# Geração do histograma combinado para todos os anos
plt.figure(figsize=(20, 10))
plt.xlim(0, 250)

plt.title('Histograma da Quantidade de Alunos (Todos os Anos)')
plt.xlabel('Quantidade de Alunos')
plt.ylabel('Frequência')

sns.histplot(data=df_long, x='qt_censo', bins=1000) 

plt.show()

# Determinação de Baselines

In [None]:
import numpy as np
from sklearn.linear_model import LinearRegression

def linear_regression_baseline(row, anos):
    y = row[anos].values
    X = np.array([int(year) for year in anos]).reshape(-1, 1)
    mask = y != 0
    X, y = X[mask], y[mask]
    
    if len(y) > 0:
        model = LinearRegression().fit(X, y)
        return model.predict(np.array([[2023]]))[0]
    else:
        return np.nan 

def exponential_smoothing(series, alpha):
    result = [series[0]]
    for n in range(1, len(series)):
        result.append(alpha * series[n] + (1 - alpha) * result[n-1])
    return result

def exponential_smoothing_baseline(row, anos, alpha=0.5):
    y = row[anos].values
    mask = y != 0
    y = y[mask]
    
    if len(y) > 0:
        smoothed_values = exponential_smoothing(pd.Series(y), alpha)
        return smoothed_values[-1]
    else:
        return np.nan

# Definindo baselines para cada turma (média e repetir o último valor)
anos = [str(ano) for ano in range(2007, 2023)]

# Calculando a média para os anos entre 2010 e 2022 apenas com valores diferentes de zero
df_serie_temporal['baseline_media'] = df_serie_temporal[anos].replace(0, np.NaN).mean(axis=1).round(0)

# Repetindo o valor do ano de 2022 como baseline
df_serie_temporal['baseline_last_year'] = df_serie_temporal['2022']

# Calculando predição através de regressão linear
df_serie_temporal['baseline_linear_regression'] = df_serie_temporal.apply(linear_regression_baseline, axis=1, anos=anos).round(0)

# Calculando predição através de suavização exponencial
df_serie_temporal['baseline_exponential_smoothing'] = df_serie_temporal.apply(exponential_smoothing_baseline, axis=1, anos=anos, alpha=0.5).round(0)

# Métricas de Avaliação

In [None]:
import numpy as np
from scipy.stats import wilcoxon

def evaluate_metrics(dataframe, column_base, column_predicted):
    # Calculando o Mean Absolute Error (MAE)
    mae = np.abs(dataframe[column_base] - dataframe[column_predicted]).mean()
    
    # Calculando o Standard Deviation (STD)
    std = (dataframe[column_base] - dataframe[column_predicted]).std()
    
    # Calculando o Mean Squared Error (MSE)
    mse = ((dataframe[column_base] - dataframe[column_predicted]) ** 2).mean()
    
    # Calculando o Root Mean Squared Error (RMSE)
    rmse = np.sqrt(mse)
    
    # Calculando o Mean Absolute Percentage Error (MAPE)
    mape = (np.abs((dataframe[column_base] - dataframe[column_predicted]) / dataframe[column_base])).mean() * 100
    
    # Calculando o R-squared (R²)
    ss_res = ((dataframe[column_base] - dataframe[column_predicted]) ** 2).sum()
    ss_tot = ((dataframe[column_base] - dataframe[column_base].mean()) ** 2).sum()
    r_squared = 1 - (ss_res / ss_tot)

    return {
        "mae": mae,
        "std": std,
        "mse": mse,
        "rmse": rmse,
        "mape": mape,
        "r_squared": r_squared
    } 

def apply_wilcoxon_test(df, column1, column2):
    stat, p_value = wilcoxon(df[column1], df[column2])
    return stat, p_value

def test_wilcoxon(df):
    results = {
        'Baseline': [],
        'W-Statistic': [],
        'P-Value': []
    }
    
    baselines = ['baseline_media', 'baseline_last_year', 'baseline_linear_regression', 'baseline_exponential_smoothing']
    
    for baseline in baselines:
        w_stat, p_value = apply_wilcoxon_test(df, baseline, 'predicao_kf')
        results['Baseline'].append(baseline)
        results['W-Statistic'].append(w_stat)
        results['P-Value'].append(p_value)
        
    return pd.DataFrame(results)

# Desenvolvimento do Filtro de Kalman

In [None]:
from pykalman import KalmanFilter
import numpy as np
import pandas as pd

def run_kalman_filter(data, em_iterations, year_gap):
    # Criando DataFrame temporário para pré-processamento
    temp_df = pd.DataFrame(data)
    temp_df = temp_df[temp_df.cumsum() != 0] # Removendo os zeros iniciais das séries temporais    
    temp_df = temp_df.ffill()                # Preenchendo os valores zero no meio da série temporal pelo último valor válido
    temp_df = temp_df.dropna()               # Removendo os NaNs resultantes
    series = temp_df.values.reshape(-1, 1)   # Extraindo os valores e redimensionando

    # Definindo filtro
    kalman_filter = KalmanFilter(em_vars=['transition_covariance', 'observation_covariance'])
    
    # Rodando Filtro de Kalman
    kalman_filter = kalman_filter.em(series, n_iter=em_iterations) # Executando o algoritmo EM para estimativa de parâmetros
    _, _ = kalman_filter.filter(series)                            # Filtrando os dados
    smoothed_state_means, _ = kalman_filter.smooth(series)         # Suavizando os dados

    # Predizendo o próximo valor da série
    last_smoothed_state = smoothed_state_means[-1]                          # Obtendo o último estado suavizado
    next_state = kalman_filter.transition_matrices.dot(last_smoothed_state) # Aplicando a matriz de transição para prever o próximo estado
    if year_gap == 2:
        next_state = kalman_filter.transition_matrices.dot(next_state)      # Aplicando novamente a matriz de transição para casos em que o gap é de 2 anos
    
    next_observation = kalman_filter.observation_matrices.dot(next_state)   # Aplicando a matriz de observação para prever a próxima observação

    # Retornar o valor previsto
    return np.round(next_observation[0]) 