In [None]:
import pandas as pd
import numpy as np
pd.options.display.float_format = '{:.2f}'.format
from datetime import datetime, timedelta
import locale
import holidays
import time

locale.setlocale(locale.LC_ALL, '')  # Configura a localização padrão do sistema

In [None]:
def calcular_dias_nao_contidos(intervalo_principal, intervalos_menores):
    #data_inicio_principal = datetime.strptime(intervalo_principal[0], "%Y-%m-%d")
    #data_fim_principal = datetime.strptime(intervalo_principal[1], "%Y-%m-%d")
    
    data_inicio_principal = intervalo_principal[0]
    data_fim_principal = intervalo_principal[1]

    dias_nao_contidos = 0

    # Percorre cada dia dentro do intervalo principal
    data_atual = data_inicio_principal
    while data_atual <= data_fim_principal:
        contido_em_intervalo = False

        # Verifica se a data atual está contida em algum intervalo menor
        for intervalo in intervalos_menores:
            #data_inicio_intervalo = datetime.strptime(intervalo[0], "%Y-%m-%d")
            #data_fim_intervalo = datetime.strptime(intervalo[1], "%Y-%m-%d")
            data_inicio_intervalo = intervalo[0]
            data_fim_intervalo = intervalo[1]

            if data_inicio_intervalo <= data_atual <= data_fim_intervalo:
                contido_em_intervalo = True
                break

        if not contido_em_intervalo:
            dias_nao_contidos += 1

        data_atual += timedelta(days=1)

    return dias_nao_contidos


def gerar_vetor_dias_efetivos_do_tecnico(vetor_tuplas):
    
    result = []

    for i in range(len(vetor_tuplas)):    
        intervalo_principal = vetor_tuplas[i]    

        if (i + 1) < len(vetor_tuplas):
            intervalos_menores = vetor_tuplas[i+1:]
            dias_nao_contidos = calcular_dias_nao_contidos(intervalo_principal, intervalos_menores)        
        else:
            dias_nao_contidos = (vetor_tuplas[i][1] - vetor_tuplas[i][0]).days

        result.append(dias_nao_contidos)
    
    return result


In [None]:
#Versão melhorada da contagem de datas - usando conjuntos (sets)

def anos_intervalos(intervalos):
    anos = set() 

    for inicio, fim in intervalos:
        data_inicio = pd.to_datetime(inicio)
        data_fim = pd.to_datetime(fim)
        anos.update([data_inicio.year, data_fim.year])

    anos_list = sorted(list(anos))
    
    return anos_list
    

def estimar_dias_efetivos_do_tecnico(vetor_tuplas):

    intervalos = vetor_tuplas
    
    # Set com os dias já contidos em intervalos anteriores
    dias_cobertos = set()
    
    # lista de resultados
    num_dias_novos_list = []

    feriados_brasil = holidays.Brazil(years = anos_intervalos(intervalos))

    #Lista é invertida, para começar pelo fim, com as datas mais antigas - reversed()
    for i, (inicio, fim) in enumerate(reversed(intervalos), start=1):

        dias_intervalo = set(pd.date_range(inicio, fim, freq='B')) # freq='B' -> dias úteis
        dias_intervalo_sem_feriado = set([d for d in dias_intervalo if d not in feriados_brasil])
        dias_intervalo = dias_intervalo_sem_feriado
      
        dias_novos = dias_intervalo - dias_cobertos #operação sobre conjuntos: obtém os dias ainda não cobertos
        dias_cobertos.update(dias_intervalo)
        
        num_dias_novos_list.insert(0, len(dias_novos))

    
    return num_dias_novos_list

def remover_acentos_coluna(df, coluna):
    # Remove acentos e converte para MAIÚSCULAS os valores de uma coluna de texto em um DataFrame pandas.
    return (
        df[coluna]
        .astype(str)                       # garante tipo string
        .str.normalize('NFD')              # separa acentos
        .str.encode('ascii', 'ignore')     # remove acentos
        .str.decode('utf-8')               # volta para string
        .str.upper()                       # converte para maiúsculas
    )


CARREGAMENTO DE DADOS LOCAIS

In [None]:
#LEITURA DE DADOS LOCAIS
data = pd.read_excel(r'C:\Users\pftbm\OneDrive - bnb.gov.br\Convergente\Revisão da produtividade\Dados\Fonte principal\data.xlsx')


LEITURA DE DADOS VIA GOOGLE COLAB

In [None]:
#LEITURA VIA GOOGLE COLAB
from google.colab import files
uploaded = files.upload()
data = pd.read_excel('data.xlsx')

In [None]:
data.columns

In [None]:
data_sorted = data.sort_values(by=['RESP. TÉCNICO', 'CONCLUSÃO', 'ALOCAÇÃO', 'VALOR DE REFERÊNCIA'], 
                               ascending=[True, False, True, False])

# Definindo a formatação da coluna 'Numero'

#data_sorted['VALOR DE REFERÊNCIA'] = data_sorted['VALOR DE REFERÊNCIA'].apply(lambda x: '{:,.2f}'.format(x).replace(',', ' ').replace('.', ',').replace(' ', '.'))


In [None]:
est = data_sorted['VALOR DE REFERÊNCIA'].describe()
print(est)

valor = est.quantile(0.05)
print(locale.format_string("%.2f", valor, grouping=True))

valor = est.quantile(0.9)
print(locale.format_string("%.2f", valor, grouping=True))


In [None]:
data_sorted.shape[0]

In [None]:
data_sorted.loc[data_sorted['CLIENTE'] == 'MAXIMILIANO RIBEIRO PEREIRA']

### Filtro 1

In [None]:
# FILTROS INICIAIS (executados previamente à estimativa dos prazos de execução):
# 1. MANTER APENAS 'STATUS' = 'Laudo concluído' OU 'Parec concluído'
# 2. REMOVER 'ATIVIDADE' = 'CQ' e 'Sensoriamento'

data_sorted = data_sorted[( (data_sorted.STATUS == 'Concluída') | 
                         (data_sorted.STATUS == 'Parecer gerencial concluído') |
                         (data_sorted.STATUS == 'Em monitoração') ) &
                        ( (data_sorted.ATIVIDADE != 'CQ') & 
                         (data_sorted.ATIVIDADE != 'Av. Qual.') & 
                         (data_sorted.ATIVIDADE != 'Sensoriamento') )].copy()

#data_sorted.loc[(data_sorted['STATUS'] == 'Concluída') & (data_sorted['ALOCAÇÃO'].isna()),'ALOCAÇÃO'] = data_sorted.loc[(data_sorted['STATUS'] == 'Concluída') & (data_sorted['ALOCAÇÃO'].isna()),'SOLICITAÇÃO']

condicao = (data_sorted['STATUS'] == 'Concluída') & (data_sorted['ALOCAÇÃO'].isna())

data_sorted.loc[condicao, 'ALOCAÇÃO'] = data_sorted.loc[condicao, 'SOLICITAÇÃO']

data_sorted['dias efetivos'] = -1
data_sorted['dias uteis efetivos'] = -1
#data_sorted['VALOR DE REFERÊNCIA'] = data_sorted['VALOR DE REFERÊNCIA'].apply(lambda x: '{:,.2f}'.format(x).replace(',', ' ').replace('.', ',').replace(' ', '.'))
data_sorted.shape

In [None]:
data_sorted.loc[data_sorted['CLIENTE'] == 'MAXIMILIANO RIBEIRO PEREIRA']

In [None]:
tecnicos = data_sorted['RESP. TÉCNICO'].unique().tolist()
#tecnicos = ['Ana Emilia - F112933', 'Efren - F105015', 'Wagner - F157643']
vetor_tuplas = []

inicio = time.time()

for tecnico in tecnicos:
    data_aux = data_sorted[data_sorted['RESP. TÉCNICO'] == tecnico][['ALOCAÇÃO', 'CONCLUSÃO']].copy()
    
    vetor_tuplas = [tuple(x[1:]) for x in data_aux.itertuples()]
    
    dias_efetivos = gerar_vetor_dias_efetivos_do_tecnico(vetor_tuplas)
    dias_uteis_efetivos = estimar_dias_efetivos_do_tecnico(vetor_tuplas) #Função nova
    
    data_sorted.loc[data_sorted['RESP. TÉCNICO'] == tecnico, 'dias efetivos'] = dias_efetivos
    data_sorted.loc[data_sorted['RESP. TÉCNICO'] == tecnico, 'dias uteis efetivos'] = dias_uteis_efetivos

fim = time.time()
print(f"Tempo de execução: {fim - inicio:.2f} segundos")

In [None]:
data_aux_pf = data_sorted[data_sorted['RESP. TÉCNICO'] == 'Paulo Fagner (Pf) - F110523 (U)'][['ALOCAÇÃO', 'CONCLUSÃO']].copy()
vetor_tuplas_pf = [tuple(x[1:]) for x in data_aux_pf.itertuples()]

i = -2
print(vetor_tuplas_pf[i])

datas = pd.date_range(vetor_tuplas_pf[i][0], vetor_tuplas_pf[i][1])
print(len(datas))
datas_uteis = pd.date_range(vetor_tuplas_pf[i][0], vetor_tuplas_pf[i][1], freq='B')
print(len(datas_uteis))

In [None]:
#testes da função nova
dias_efetivos_pf = gerar_vetor_dias_efetivos_do_tecnico(vetor_tuplas_pf)
for dia in dias_efetivos_pf:
    print(dia)

In [None]:
data_sorted.loc[data_sorted['CLIENTE'] == 'MAXIMILIANO RIBEIRO PEREIRA']

In [None]:
data_sorted[data_sorted['RESP. TÉCNICO'] == 'Paulo Fagner (Pf) - F110523 (U)'][['RESP. TÉCNICO', 'ATIVIDADE', 'CLIENTE', 
                                                                                  'PRAZO EXECUÇÃO DD', 'dias efetivos', 'dias efetivos novo']][:60]

### Filtro 2

In [None]:
data_sorted.shape

In [None]:
# FILTROS
# 1. Remover 'VALOR DE REFERÊNCIA' <= 1 OU 'VALOR DE REFERÊNCIA' > 500.000.000
# 2. Manter apenas 0 < 'dias efetivos' < 30
# 3. Remover 'dias efetivos' < 3 SE 'ATIVIDADE' == 'Aval.' ou 'Crít.'

# 1 
data_teste = data_sorted.drop(data_sorted[(data_sorted['VALOR DE REFERÊNCIA'] <= 1) ].index)
print(data_teste.shape)

# 2
data_teste = data_teste[(data_teste['dias efetivos'] > 0) & (data_teste['dias efetivos'] < 30)].copy()
print(data_teste.shape)

# 3
data_teste = data_teste.drop(data_teste[(data_teste['dias efetivos'] < 3) & 
                                        ((data_teste['ATIVIDADE'] == 'Aval.') | (data_teste['ATIVIDADE'] == 'Crít.') )].index)

print(data_teste.shape)

data_teste[:20]

In [None]:
a = pd.pivot_table(data=data_teste, index='RESP. TÉCNICO', columns='ATIVIDADE', values='dias efetivos', aggfunc='mean', margins=False)

#a.div(a.sum(axis=1), axis=0).round(3) #percentuais por linha
a

SALVAR O DATA FRAME

In [None]:
with pd.ExcelWriter(r'C:\Users\pftbm\OneDrive - bnb.gov.br\Convergente\Revisão da produtividade\Dados\Cronos out 2025\data_dias_efetivos_filtro.xlsx') as writer:
    data_teste.to_excel(writer) 

#### CRIAR DATA FRAME DE AVALIAÇÕES E INCLUIR VARIÁVEIS DE DISPONIBILIDADE DE DADOS

In [None]:
def calcular_numero_de_avaliacoes_por_municipio(df_municipio_data):
    result_cont = []
    contagem = {}
    
    for municipio in df_municipio_data['MUNICÍPIO(S)']:
        if municipio not in contagem:
            contagem[municipio] = 0
        else:
            contagem[municipio] += 1

        result_cont.append(contagem[municipio])
    
    return result_cont


def calcular_numero_de_avaliacoes_por_municipio_no_periodo(df_municipio_data, periodo=2):
    result_cont = []
    contagem = {}
    
    for i, linha in df_municipio_data.iterrows():
        municipio = linha['MUNICÍPIO(S)']
        conclusao = linha['CONCLUSÃO']
        limite = conclusao - pd.DateOffset(years=periodo)

        if municipio not in contagem:
            contagem[municipio] = [conclusao]
        else:
            datas = contagem[municipio]
            datas_filtradas = [d for d in datas if d >= limite]
            contagem[municipio] = datas_filtradas
            contagem[municipio].append(conclusao)

        result_cont.append(len(contagem[municipio])-1 )

    
    return result_cont


In [None]:
#FILTRAR AS AVALIAÇÕES URBANAS E MISTAS
data_teste.columns
print(data_teste.shape)

condicao = ((data_teste['ATIVIDADE'] == 'Aval. Bens') | (data_teste['ATIVIDADE'] == 'Aval.Im.') | (data_teste['ATIVIDADE'] == 'Conv. Aval.')) & ((data_teste['ÁREA'] == 'Urbano') | (data_teste['ÁREA'] == 'Mista'))

data_teste_avaliacoes_urbanas = data_teste.loc[condicao].copy()

print(data_teste_avaliacoes_urbanas.shape)



In [None]:
#REMOVER AVALIAÇÕES SEM MUNICÍPIO INFORMADO (ORIGEM SIAT)

data_teste_avaliacoes_urbanas = data_teste_avaliacoes_urbanas.loc[data_teste_avaliacoes_urbanas['MUNICÍPIO(S)'].notna()].copy()
data_teste_avaliacoes_urbanas.shape

In [None]:
# CONTAR AVALIAÇÕES POR MUNICÍPIOS
df_municipio_data = data_teste_avaliacoes_urbanas[['MUNICÍPIO(S)', 'CONCLUSÃO']].copy()
df_municipio_data['id'] = [(i+1) for i in range(len(df_municipio_data))]
df_municipio_data_sorted = df_municipio_data.sort_values(by=['CONCLUSÃO'], ascending=[True]).copy()
df_municipio_data_sorted['num_avaliacoes'] = calcular_numero_de_avaliacoes_por_municipio(df_municipio_data_sorted)
df_municipio_data_sorted['num_avaliacoes_2_anos'] = calcular_numero_de_avaliacoes_por_municipio_no_periodo(df_municipio_data_sorted, 2)
df_municipio_data_sorted['num_avaliacoes_1_ano'] = calcular_numero_de_avaliacoes_por_municipio_no_periodo(df_municipio_data_sorted, 1)
#voltando à ordem original
df_municipio_data_sorted.sort_values(by=['id'], ascending=[True], inplace=True)

df_municipio_data_sorted.head(5)

In [None]:
#Adicionar campo calculado ao data frame original
data_teste_avaliacoes_urbanas['num_acumulado_avaliacoes'] = df_municipio_data_sorted['num_avaliacoes']
data_teste_avaliacoes_urbanas['num_acumulado_avaliacoes 2 anos'] = df_municipio_data_sorted['num_avaliacoes_2_anos']
data_teste_avaliacoes_urbanas['num_acumulado_avaliacoes 1 ano'] = df_municipio_data_sorted['num_avaliacoes_1_ano']

#### VARIÁVEIS EXTRÍNSECAS - CARACTERÍSTICAS DOS MUNICÍPIOS - FONTES EXTERNAS

In [None]:
data_teste_avaliacoes_urbanas.columns

In [None]:
data_teste_avaliacoes_urbanas['MUNICÍPIO(S)']

In [None]:
data_ibge = pd.read_excel(r'C:\Users\pftbm\OneDrive - bnb.gov.br\Convergente\Revisão da produtividade\Dados\IBGE\variaveis ibge final prod.xlsx')

In [None]:
data_ibge.columns

In [None]:
data_ibge.shape

In [None]:
data_ibge['municipio/uf'] = remover_acentos_coluna(data_ibge,'municipio/uf')
data_ibge

In [None]:
df_merge = pd.merge(
    data_teste_avaliacoes_urbanas,
    data_ibge,  # seleciona colunas desejadas - todas, no caso
    left_on='MUNICÍPIO(S)',                # chave do df1
    right_on='municipio/uf',                   # chave do df2
    how='left'
)

In [None]:
df_merge.columns

In [None]:
df_merge.drop(columns=['ID'], inplace=True)


In [None]:
df_merge.columns

In [None]:
data_teste_avaliacoes_urbanas = df_merge.copy()

SALVAR ARQUIVO LOCALMENTE

In [None]:
with pd.ExcelWriter(r'C:\Users\pftbm\OneDrive - bnb.gov.br\Convergente\Revisão da produtividade\Dados\Fonte principal\data_avaliacoes_URBANAS.xlsx') as writer:
    data_teste_avaliacoes_urbanas.to_excel(writer) 

BAIXAR ARQUIVOS COM O GOOGLE COLAB

In [None]:
from google.colab import files

data_teste_avaliacoes_urbanas.to_excel('data_avaliacoes_URBANAS.xlsx', index=False)

files.download('data_avaliacoes_URBANAS.xlsx')

## MISCELÂNEA

In [None]:
#Para remover hora de uma columa datetime
data_sorted['CONCLUSÃO'] = pd.to_datetime(data_sorted['CONCLUSÃO']).dt.date
data_sorted['ALOCAÇÃO'] = pd.to_datetime(data_sorted['ALOCAÇÃO']).dt.date


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Dados de exemplo
#dados = data_sorted['VALOR DE REFERÊNCIA'].copy()
#dados = dados[(dados > 20000) & (dados < 500000000)].copy()
dados = data_sorted['PRAZO EXECUÇÃO'].copy()
dados = dados[dados > 1]

# Plotando o histograma
fig, ax = plt.subplots(figsize=(8, 6))
sns.set_theme(style="darkgrid")
sns.histplot(dados)

# Exibindo o gráfico
plt.show()

In [None]:

# DataFrame de exemplo
df = data_wagner_dates #pd.DataFrame({'A': [1, 2, 3], 'B': ['a', 'b', 'c']})

# Convertendo o DataFrame em um vetor de tuplas
vetor_tuplas = [tuple(x[1:]) for x in df.itertuples()]

# Exibindo o vetor de tuplas
print(vetor_tuplas[0][0])

In [None]:
data_sorted['RESPONSÁVEL TÉCNICO'].unique().tolist()


In [None]:
data_sorted['dias efetivos'] = -1
data_sorted.loc[data_sorted['RESPONSÁVEL TÉCNICO'] == 'Paulo Fagner (Pf) - F110523', 'dias efetivos'] = x
data_sorted[data_sorted['RESPONSÁVEL TÉCNICO'] == 'Romulo - F113875']

In [None]:
x = [i for i in range(26)]

In [None]:
def anos_intervalos(intervalos):
    anos = set() 

    for inicio, fim in intervalos:
        data_inicio = pd.to_datetime(inicio)
        data_fim = pd.to_datetime(fim)
        anos.update([data_inicio.year, data_fim.year])

    anos_list = sorted(list(anos))
    
    return anos_list
    

def calcular_dias_efetivos_teste(intervalos):

    # Conjunto acumulado de dias já cobertos pelos intervalos anteriores
    dias_cobertos = set()
    
    # Lista para armazenar resultados
    resultados = []
    num_dias_novos_list = []
    
    feriados_brasil = holidays.Brazil(years = anos_intervalos(intervalos))
    
    for i, (inicio, fim) in enumerate(reversed(intervalos), start=1):
        
        dias_intervalo = set(pd.date_range(inicio, fim, freq='B')) # freq='B' -> dias úteis
        dias_intervalo_sem_feriado = set([d for d in dias_intervalo if d not in feriados_brasil])
        dias_intervalo = dias_intervalo_sem_feriado
        # Agora queremos apenas os dias que NÃO estão cobertos pelos intervalos já considerados
        dias_novos = dias_intervalo - dias_cobertos
        dias_cobertos.update(dias_intervalo)
        
        num_dias_novos_list.insert(0, len(dias_novos))
        
        #adiciona no início da lista, para voltar à ordem original
        resultados.insert(0, {
            "intervalo": i,
            "inicio": inicio,
            "fim": fim,
            "dias_novos": len(dias_novos),
            "total_dias_intervalo": len(dias_intervalo)
        })

    df_resultados = pd.DataFrame(resultados)
        
    return df_resultados
        
intervalos = [
    ("2022-10-15", "2022-10-18"),
    ("2022-09-30", "2022-10-13"),
    ("2022-10-01", "2022-10-10"),
    ("2022-09-30", "2022-10-08"),
    ("2022-09-25", "2022-10-06"),
]

resultados = calcular_dias_efetivos_teste(intervalos)
print(resultados)

In [None]:
intervalos = [
    ("2025-10-15", "2025-10-16"),
    ("2025-09-30", "2025-10-13"),
    ("2025-10-01", "2025-10-10"),
    ("2025-09-30", "2025-10-08"),
    ("2024-09-24", "2025-10-06"),
]

anos = set() 

for inicio, fim in intervalos:
    data_inicio = pd.to_datetime(inicio)
    data_fim = pd.to_datetime(fim)
    anos.update([data_inicio.year, data_fim.year])

anos_list = sorted(list(anos))

print(anos_list)

feriados_brasil = holidays.Brazil(years = anos_list)

dias_uteis_sem_feriado = [d for d in datas if d not in feriados_brasil]