## Setup

In [None]:
%pip install swifter neuralprophet plotly kaleido statsmodels mariadb

In [None]:
import pandas as pd
import numpy as np
import typing
import matplotlib.pyplot as plt
import swifter
import dateutil
import datetime
import kaleido
import plotly
from tqdm import tqdm
from statsmodels.graphics.tsaplots import plot_pacf
from statsmodels.tsa.stattools import adfuller
from pmdarima.arima import KPSSTest, auto_arima
from pmdarima.model_selection import train_test_split
import mariadb

from neuralprophet import NeuralProphet

## Data preparation

### Loading data

In [None]:
import getpass
import os

# Connection settings can be overridden through environment variables; the
# password is never stored in the notebook (it is read from MARIADB_PASSWORD
# or asked for interactively).
mydb = None
try:
    mydb = mariadb.connect(
        host=os.environ.get("MARIADB_HOST", "172.17.0.3"),
        database=os.environ.get("MARIADB_DATABASE", "dados_tribunais"),
        user=os.environ.get("MARIADB_USER", "root"),
        passwd=os.environ.get("MARIADB_PASSWORD") or getpass.getpass("MariaDB password: "),
    )
    query = "select * from processos where grau='JE' and tribunal='TJRN' and orgao_julgador = '4º JUIZADO ESPECIAL DA FAZENDA PÚBLICA';"
    df_tribunal = pd.read_sql(query, mydb)
finally:
    # Close only if the connection was actually opened. Any error is left to
    # propagate, because every following cell needs `df_tribunal`.
    if mydb is not None:
        mydb.close()

In [None]:
# Offline alternative to the database query: the ';'-separated CSV at
# dados/processados/serie_temporal_ajuizamento-TJRN_G1_2018.csv.

# Columns that are not needed for the sentence time series; names that are
# absent from the frame are ignored.
unused_columns = [
    'numero_processo', 'tribunal', 'classe', 'ultima_atualizacao', 'codigo',
    'municipio', 'grau', 'assuntos', 'movimentos', 'data_ajuizamento',
]
df_tribunal.drop(columns=unused_columns, inplace=True, errors='ignore')
print(df_tribunal.head(2))

# Parse each sentence timestamp with dateutil, then keep only its calendar date.
df_tribunal['data_sentenca'] = (
    df_tribunal['data_sentenca']
    .swifter.apply(dateutil.parser.parse)
    .swifter.apply(datetime.datetime.date)
)


In [None]:
# Count the rows per sentence date and expose the result as a two-column
# frame: 'data_sentenca' and 'quantidade'.
dataframe = df_tribunal.groupby('data_sentenca').size()
df_tribunal = dataframe.rename('quantidade').reset_index()
print(df_tribunal)

In [None]:
#df_tribunal['data_ajuizamento'] = pd.to_datetime(df_tribunal['data_ajuizamento'])

### Visualização dos dados

Série temporal do número de sentenças ao longo do tempo

In [None]:
plt.figure(figsize=(18, 6))
# The label is attached to the line itself: plt.legend() treats a bare string
# argument as a sequence of labels, so only its first character would be shown.
plt.plot(df_tribunal['quantidade'], label='número de sentenças')
plt.legend()
plt.savefig('producao_4jefp.png')

## Análise estatística

In [None]:
## PACF

# Build the date-indexed frame as a new object. Calling set_index(inplace=True)
# through an alias of df_tribunal would also strip 'data_sentenca' from
# df_tribunal and make this cell raise KeyError when it is run a second time.
df_pacf = df_tribunal.set_index('data_sentenca')
# Trailing ';' hides the returned Figure so the plot is not rendered twice.
plot_pacf(df_pacf['quantidade']);

In [None]:
# Significance level for the Augmented Dickey-Fuller test. The null hypothesis
# is that the series has a unit root (is non-stationary), so it is rejected
# only when the p-value falls below this level.
ADF_ALPHA = 0.05

adf, pval, usedlag, nobs, crit_vals, icbest = adfuller(df_pacf.quantidade.values)
print('ADF test statistic:', adf)
print('ADF p-values:', pval)
print('ADF number of lags used:', usedlag)
print('ADF number of observations:', nobs)
print('ADF critical values:', crit_vals)
print('ADF best information criterion:', icbest)

if pval < ADF_ALPHA:
    print('\n\nA série temporal é estacionária')
else:
    print('\n\nA série temporal não é estacionária')

# Métodos e modelos

## ARIMA e variantes

In [None]:
def mean_square_error(y_true, y_pred):
    """Return the mean squared error between two equally sized sequences.

    Both inputs are flattened to 1-D arrays before the comparison, in the
    same way as the other metrics in this notebook (plain ``ndarray`` instead
    of the discouraged ``np.matrix`` type).

    Args:
        y_true: Observed values (array-like).
        y_pred: Predicted values (array-like) with the same number of elements.

    Returns:
        The mean of the squared element-wise differences, as a scalar.
    """
    y_true = np.asarray(y_true).reshape(-1)
    y_pred = np.asarray(y_pred).reshape(-1)

    return np.square(np.subtract(y_true, y_pred)).mean()

def root_mean_square_error(y_true, y_pred):
    """Return the square root of ``mean_square_error(y_true, y_pred)``."""
    mse = mean_square_error(y_true, y_pred)
    return mse ** 0.5


def mean_absolute_percentage_error(y_true, y_pred):
    """Return the mean absolute percentage error, in percent.

    Both inputs are flattened to 1-D arrays. If any observed value is zero
    the metric is undefined and ``np.inf`` is returned instead.
    """
    observed = np.asarray(y_true).reshape(-1)
    predicted = np.asarray(y_pred).reshape(-1)

    # A zero in the denominator makes the percentage error undefined.
    if np.any(observed == 0):
        return np.inf

    relative_errors = np.abs((observed - predicted) / observed)
    return np.mean(relative_errors) * 100


def mean_absolute_error(y_true, y_pred):
    """Return the mean of the absolute element-wise differences.

    Both inputs are flattened to 1-D arrays before being compared.
    """
    observed = np.asarray(y_true).reshape(-1)
    predicted = np.asarray(y_pred).reshape(-1)

    absolute_errors = np.abs(observed - predicted)
    return np.mean(absolute_errors)


def u_theil(y_true, y_pred):
    """Return the Theil's U ratio as defined in this notebook.

    Numerator: sum of squared differences between observed and predicted
    values. Denominator: sum of squared differences between consecutive
    predicted values. Both inputs are flattened to 1-D arrays first.
    """
    observed = np.asarray(y_true).reshape(-1)
    predicted = np.asarray(y_pred).reshape(-1)

    squared_forecast_error = np.square(observed - predicted).sum()
    # np.diff yields predicted[1:] - predicted[:-1]; the sign is irrelevant
    # because the differences are squared.
    squared_step_change = np.square(np.diff(predicted)).sum()

    return squared_forecast_error / squared_step_change


def average_relative_variance(y_true, y_pred):
    """Return the average relative variance (ARV) as defined in this notebook.

    Numerator: sum of squared differences between observed and predicted
    values. Denominator: sum of squared differences between the predicted
    values and the mean of the observed values. Inputs are flattened to 1-D.
    """
    observed = np.asarray(y_true).reshape(-1)
    predicted = np.asarray(y_pred).reshape(-1)
    observed_mean = np.mean(observed)

    squared_forecast_error = np.square(observed - predicted).sum()
    squared_deviation_from_mean = np.square(predicted - observed_mean).sum()

    return squared_forecast_error / squared_deviation_from_mean


def prediction_of_change_in_direction(y_true, y_pred):
    """Return the percentage of steps whose direction was predicted correctly.

    A step counts as a hit when the consecutive difference of the observed
    series and the consecutive difference of the predicted series have the
    same non-zero sign. The hit count is divided by ``len(y_true)`` (not by
    the number of steps) and multiplied by 100.
    """
    observed = np.asarray(y_true).reshape(-1)
    predicted = np.asarray(y_pred).reshape(-1)

    # The product of the two step changes is positive exactly when both
    # series moved in the same direction.
    same_direction = (np.diff(observed) * np.diff(predicted)) > 0
    hits = int(np.count_nonzero(same_direction))

    return 100 * (hits / len(observed))


def gerenerate_metric_results(y_true, y_pred):
    """Evaluate every metric defined in this notebook for one forecast.

    Returns:
        A dict mapping the metric label ('MSE', 'RMSE', 'MAPE', 'MAE',
        'theil', 'ARV', 'POCID') to its value, in that order.
    """
    metric_functions = (
        ('MSE', mean_square_error),
        ('RMSE', root_mean_square_error),
        ('MAPE', mean_absolute_percentage_error),
        ('MAE', mean_absolute_error),
        ('theil', u_theil),
        ('ARV', average_relative_variance),
        ('POCID', prediction_of_change_in_direction),
    )
    return {label: metric(y_true, y_pred) for label, metric in metric_functions}


In [None]:
def arima_fit_predict(ts, m, horizon, test_size):
  """Fit an auto-ARIMA model and forecast the last ``test_size`` points.

  The last ``test_size + horizon - 1`` observations are held out; the model
  is selected and fitted on everything before them.

  Args:
    ts: Univariate series (pandas Series or array-like), oldest value first.
    m: Seasonal period forwarded to ``auto_arima``.
    horizon: Number of steps ahead for the rolling forecast (>= 1).
    test_size: Number of final observations to forecast (>= 1).

  Returns:
    A tuple ``(prevs_h_steps, direct_predict)``:
      * ``prevs_h_steps``: list with ``test_size`` rolling forecasts; each one
        is the ``horizon``-th step predicted before the model is updated with
        the next held-out observation.
      * ``direct_predict``: the last ``test_size`` values of a single
        multi-step forecast made right after fitting, without any update.

  Raises:
    ValueError: If ``horizon`` or ``test_size`` is smaller than 1, or if the
      held-out part would leave no training data.
  """
  if horizon < 1 or test_size < 1:
    raise ValueError("horizon and test_size must be >= 1")

  values = np.asarray(ts)
  holdout = test_size + horizon - 1
  if holdout >= len(values):
    raise ValueError("test_size + horizon - 1 must be smaller than len(ts)")

  # Explicit positional split (no negative-slice corner cases).
  split = len(values) - holdout
  ts_train = values[:split]
  ts_test = values[split:]

  model = auto_arima(ts_train, #X=None,
            start_p=2, start_q=2, max_p=6, max_d=2,
            max_q=6, start_P=1, start_Q=1, max_P=2,
            max_D=1, max_Q=2, max_order=15, m=m,
            stepwise=True, trace=True)

  # Must be computed before the rolling loop, which updates the model.
  direct_predict = model.predict(holdout)

  prevs_h_steps = []

  # Only the first `test_size` held-out points produce a forecast that is
  # returned, so the remaining `horizon - 1` updates are skipped.
  for t in tqdm(ts_test[:test_size]):
    prevs_h_steps.append(model.predict(horizon)[horizon - 1])
    model.update(t)

  return prevs_h_steps, direct_predict[-test_size:]

In [None]:
test_size = 180
# Seasonal period handed to auto_arima. It gets its own name because `m` is
# rebound to model objects in later cells, so re-running this cell out of
# order would otherwise pass a model as the period.
seasonal_period = 30
horizon = 2
ts = df_pacf['quantidade']
prevs_h_steps, direct_predict = arima_fit_predict(ts, seasonal_period, horizon, test_size)

In [None]:
# Observed values of the test window side by side with both ARIMA forecasts.
# NOTE(review): the 'arima_1_steps' column holds the rolling forecasts made
# with the `horizon` configured above - confirm the label is still accurate.
df_result = (
    ts.iloc[-test_size:]
    .to_frame(name='real')
    .assign(direct_predict=direct_predict, arima_1_steps=prevs_h_steps)
)
df_result.plot()
plt.xticks(rotation=45)

In [None]:
# One row of metrics per forecast column, tagged with the column name.
y_true = df_result['real']
metric_results = [
    {**gerenerate_metric_results(y_true, df_result[model_name]), 'model_name': model_name}
    for model_name in ('direct_predict', 'arima_1_steps')
]

pd.DataFrame(metric_results)

## NeuralProphet

In [None]:
# Build the two-column frame ('ds', 'y') used by NeuralProphet and Prophet.
# reset_index() returns a copy, so df_tribunal is left untouched and the cell
# can be re-run safely; selecting the columns by name works whether
# 'data_sentenca' is currently the index or a regular column.
dados = (
    df_tribunal
    .reset_index()[['data_sentenca', 'quantidade']]
    .rename(columns={'data_sentenca': 'ds', 'quantidade': 'y'})
)
# Store the dates with a proper datetime64 dtype instead of Python objects.
dados['ds'] = pd.to_datetime(dados['ds'])

In [None]:
#dados.to_csv('dados/neural_prophet_TJRN_G1_2018.csv', header=True, sep=';', index=False)

In [None]:
from neuralprophet import set_random_seed

# Offline alternative: load `dados` from dados/neural_prophet_TJRN_G1_2018.csv
# (';'-separated), optionally keeping only the rows with y < 6000.

# Fix the random seed so that repeated runs train the same network.
set_random_seed(0)

m = NeuralProphet(n_lags=1, n_forecasts=7, ar_layers=(16, 64))
m = m.add_country_holidays("BR")
m.set_plotting_backend("plotly-static")
# Hold out the last 25% of the series for validation.
df_train, df_val = m.split_df(dados, valid_p=0.25)
metrics = m.fit(df_train, validation_df=df_val, freq='D', progress="plot")
forecast = m.predict(dados)
m.plot(forecast, xlabel="Data", ylabel="Quantidade de sentenças", figsize=(18, 6))

In [None]:
# Plot the components of `forecast`; assumes `m` is still the NeuralProphet
# model fitted in the cell above.
m.plot_components(forecast)

In [None]:
# Plot the parameters of the fitted model bound to `m`.
m.plot_parameters()

In [None]:
# Last five rows of the metrics object returned by m.fit(...).
print(metrics.tail(5))

In [None]:
# Summary statistics of the target column (daily number of sentences).
print(dados['y'].describe())

## Prophet

In [None]:
# Custom holiday table in the layout Prophet expects: one row per year
# (2018-2025) starting on 19 December, covering that day plus the 18 days
# that follow it.
recess_starts = pd.to_datetime([f'{year}-12-19' for year in range(2018, 2026)])
recesso_judiciario = pd.DataFrame({
    'holiday': 'recesso_judiciario',
    'ds': recess_starts,
    'lower_window': 0,
    'upper_window': 18,
})

In [None]:
from prophet import Prophet
from prophet.diagnostics import cross_validation
from prophet.diagnostics import performance_metrics
from prophet.plot import plot_cross_validation_metric
import itertools

# The Prophet model gets its own name so that it does not overwrite `m`, which
# the NeuralProphet plotting cells earlier in the notebook still rely on.
prophet_model = Prophet(changepoint_prior_scale=0.5, seasonality_prior_scale=0.01,interval_width=0.99, holidays=recesso_judiciario)
prophet_model.add_country_holidays(country_name='BR')
prophet_model.fit(dados)
# Rolling-origin evaluation: 1080 days of initial training data, a new cutoff
# every 60 days, each forecasting 180 days ahead.
df_cv = cross_validation(prophet_model, initial='1080 days', period='60 days', horizon = '180 days', parallel="processes")

df_p = performance_metrics(df_cv)
df_p.head(5)

In [None]:
# Plot the RMSE computed from the cross-validation results in `df_cv`.
fig = plot_cross_validation_metric(df_cv, metric='rmse')
fig.show()