<a href="https://colab.research.google.com/github/thiagoalmeida-TLM/hospedagem_geocode/blob/main/geocode_hospedagem.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# *Latitude e longitude separado*

Utilizado para buscar geolocalização de uma base de hospegadem que contém DATA, CHAPA, NOME, CIDADE.

Arquivos: base_ativos.xlsx; hospedagem.xlsx; registro_ponto.xlsx (clockin)

Foi cotejado com base do Clockin e Empregados Ativos.

Resultado retorna todos os horários com sua geolocalização.

Retorna Cidade e Estado.

Aba Entrada: retorna horários em ordem crescente

Aba Saída: retorna horários em ordem decrescente.

In [10]:
# @title
# Instalar dependências (se necessário)
#!pip install openpyxl pandas geopy tqdm

import pandas as pd
import json
from datetime import datetime
import time
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
from tqdm.auto import tqdm

# Configurar o tqdm para usar o método .progress_apply()
tqdm.pandas()

print("Iniciando o processamento de dados...")

# --- DICIONÁRIO E FUNÇÕES AUXILIARES ---

ESTADOS_ABREVIADOS = {
    'Acre': 'AC', 'Alagoas': 'AL', 'Amapá': 'AP', 'Amazonas': 'AM',
    'Bahia': 'BA', 'Ceará': 'CE', 'Distrito Federal': 'DF', 'Espírito Santo': 'ES',
    'Goiás': 'GO', 'Maranhão': 'MA', 'Mato Grosso': 'MT', 'Mato Grosso do Sul': 'MS',
    'Minas Gerais': 'MG', 'Pará': 'PA', 'Paraíba': 'PB', 'Paraná': 'PR',
    'Pernambuco': 'PE', 'Piauí': 'PI', 'Rio de Janeiro': 'RJ', 'Rio Grande do Norte': 'RN',
    'Rio Grande do Sul': 'RS', 'Rondônia': 'RO', 'Roraima': 'RR', 'Santa Catarina': 'SC',
    'São Paulo': 'SP', 'Sergipe': 'SE', 'Tocantins': 'TO'
}

def abreviar_estado(nome_estado):
    if not nome_estado:
        return None
    nome_limpo = str(nome_estado).strip().title()
    return ESTADOS_ABREVIADOS.get(nome_limpo, nome_estado)

# Função para agregar HORA, LATITUDE e LONGITUDE juntos em colunas sequenciais
def agregar_hora_lat_lon_juntos(df_ponto):
    resultado = {}
    for i, row in enumerate(df_ponto.itertuples(), start=1):
        hora_str = row.HORA.strftime('%H:%M:%S') if hasattr(row.HORA, 'strftime') else None
        resultado[f'HORA_{i}'] = hora_str
        resultado[f'LATITUDE_{i}'] = row.LATITUDE
        resultado[f'LONGITUDE_{i}'] = row.LONGITUDE
    return pd.Series(resultado)

# FUNÇÃO DE LIMPEZA COM CORREÇÃO DO SettingWithCopyWarning
def limpar_base(df, status, cols_map):
    # Usa .copy() explicitamente para evitar o SettingWithCopyWarning
    df_limpo = df[[c for c in cols_map if c in df.columns]].copy()

    df_limpo['CHAPA_LIMPA'] = df_limpo['CHAPA'].astype(str).str.strip()
    df_limpo['CPF_LIMPO'] = df_limpo['CPF'].astype(str).str.strip()
    df_limpo['DESCRICAO'] = df_limpo['DESCRICAO'].astype(str).str.strip()
    df_limpo['STATUS'] = status
    return df_limpo


# --- FUNÇÕES DE BUSCA E ORDENAÇÃO (Mantidas) ---

def buscar_pontos_entrada(row, ponto_validos):
    chapa = row['CHAPA']
    data = row['DATA']
    pontos = ponto_validos[(ponto_validos['CHAPA'] == chapa) & (ponto_validos['DATA'] == data)]
    if pontos.empty:
        return pd.Series()
    pontos_ordenados = pontos.sort_values(by='HORA', ascending=True)
    return agregar_hora_lat_lon_juntos(pontos_ordenados)

def buscar_pontos_saida(row, ponto_validos):
    chapa = row['CHAPA']
    data = row['DATA']
    pontos = ponto_validos[(ponto_validos['CHAPA'] == chapa) & (ponto_validos['DATA'] == data)]
    if pontos.empty:
        return pd.Series()
    pontos_ordenados = pontos.sort_values(by='HORA', ascending=False)
    return agregar_hora_lat_lon_juntos(pontos_ordenados)

def reorganizar_colunas(df, prefixos=['HORA', 'LATITUDE', 'LONGITUDE', 'LOCALIZACAO']):
    colunas_originais = [col for col in df.columns if not any(col.startswith(p) for p in prefixos)]

    indices = set()
    for col in df.columns:
        for p in prefixos:
            if col.startswith(p + '_'):
                try:
                    idx = int(col.split('_')[1])
                    indices.add(idx)
                except:
                    pass
    indices = sorted(indices)

    colunas_ordenadas = colunas_originais.copy()
    for i in indices:
        for p in prefixos:
            colunas_ordenadas.append(f'{p}_{i}')

    return df[colunas_ordenadas]


# --- ETAPAS DE PROCESSAMENTO DE DADOS ---

# 1. Leitura e Preparação das Bases (Ativos e Demitidos Combinadas)
cols_map = {'CHAPA', 'CPF', 'DESCRICAO'}
try:
    base_ativos = pd.read_excel('base_ativos.xlsx')
    base_demitidos = pd.read_excel('base_demitidos.xlsx')
except FileNotFoundError as e:
    print(f"ERRO: Arquivo {e.filename} não encontrado.")
    exit()

base_ativos_limpa = limpar_base(base_ativos, 'ATIVO', cols_map)
base_demitidos_limpa = limpar_base(base_demitidos, 'DEMITIDO', cols_map)

# Combina as bases, priorizando ATIVOS em caso de CHAPA duplicada
base_completa = pd.concat([base_ativos_limpa, base_demitidos_limpa], ignore_index=True)
base_completa = base_completa.drop_duplicates(subset=['CHAPA_LIMPA'], keep='first')

# Criação dos dicionários de mapeamento a partir da base completa
chapa_to_cpf = dict(zip(base_completa['CHAPA_LIMPA'], base_completa['CPF_LIMPO']))
chapa_to_descricao = dict(zip(base_completa['CHAPA_LIMPA'], base_completa['DESCRICAO']))
cpf_to_chapa = dict(zip(base_completa['CPF_LIMPO'], base_completa['CHAPA_LIMPA']))


# 2. Leitura e Preparação da Hospedagem (Mapeamento CHAPA -> CPF e DESCRICAO)
try:
    hosp_entrada = pd.read_excel('hospedagem.xlsx', sheet_name='Entrada')
    hosp_saida = pd.read_excel('hospedagem.xlsx', sheet_name='Saída')
except FileNotFoundError:
    print("ERRO: Arquivo 'hospedagem.xlsx' não encontrado.")
    exit()

# Limpar CHAPA na Hospedagem
hosp_entrada['CHAPA_LIMPA'] = hosp_entrada['CHAPA'].astype(str).str.strip()
hosp_saida['CHAPA_LIMPA'] = hosp_saida['CHAPA'].astype(str).str.strip()

# Adiciona CPF e DESCRICAO
hosp_entrada['CPF'] = hosp_entrada['CHAPA_LIMPA'].map(chapa_to_cpf).fillna('Não Localizado')
hosp_entrada['DESCRICAO'] = hosp_entrada['CHAPA_LIMPA'].map(chapa_to_descricao).fillna('Não Localizado')
hosp_saida['CPF'] = hosp_saida['CHAPA_LIMPA'].map(chapa_to_cpf).fillna('Não Localizado')
hosp_saida['DESCRICAO'] = hosp_saida['CHAPA_LIMPA'].map(chapa_to_descricao).fillna('Não Localizado')


# Prepara as colunas originais
hosp_entrada['DATA'] = pd.to_datetime(hosp_entrada['DATA']).dt.date
hosp_saida['DATA'] = pd.to_datetime(hosp_saida['DATA']).dt.date
hosp_entrada['CHAPA'] = hosp_entrada['CHAPA_LIMPA']
hosp_saida['CHAPA'] = hosp_saida['CHAPA_LIMPA']
hosp_entrada = hosp_entrada.drop(columns=['CHAPA_LIMPA'])
hosp_saida = hosp_saida.drop(columns=['CHAPA_LIMPA'])


# 3. Leitura e Preparação do Registro de Ponto (mdmpersonid -> CHAPA)
try:
    registro_ponto = pd.read_excel('registro_ponto.xlsx')
except FileNotFoundError:
    print("ERRO: Arquivo 'registro_ponto.xlsx' não encontrado.")
    exit()

def extrair_data_hora_gmt(eventdatestr):
    if pd.isna(eventdatestr): return None, None, None
    try:
        dt = datetime.fromisoformat(eventdatestr[:-6])
        return dt.date(), dt.time(), eventdatestr[-6:]
    except Exception: return None, None, None

def extrair_lat_lon(coord_str):
    if pd.isna(coord_str): return None, None
    try:
        coord = json.loads(str(coord_str).replace("'", '"'))
        return coord.get('lat'), coord.get('lon')
    except: return None, None

registro_ponto[['DATA', 'HORA', 'GMT']] = registro_ponto['eventdatestr'].apply(
    lambda x: pd.Series(extrair_data_hora_gmt(x))
)
registro_ponto[['LATITUDE', 'LONGITUDE']] = registro_ponto['coordinates'].apply(
    lambda x: pd.Series(extrair_lat_lon(x))
)

registro_ponto['mdmpersonid_LIMPO'] = registro_ponto['mdmpersonid'].astype(str).str.strip()
registro_ponto['CHAPA'] = registro_ponto['mdmpersonid_LIMPO'].map(cpf_to_chapa)
registro_ponto['CHAPA'] = registro_ponto['CHAPA'].fillna('Não Localizado CPF')

ponto_validos = registro_ponto[registro_ponto['CHAPA'] != 'Não Localizado CPF']


# 4. Aplicação da Busca de Pontos com Ordenação
print("\nBuscando e ordenando pontos de registro por horário...")
dados_pontos_entrada = hosp_entrada.apply(lambda row: buscar_pontos_entrada(row, ponto_validos), axis=1)
dados_pontos_saida = hosp_saida.apply(lambda row: buscar_pontos_saida(row, ponto_validos), axis=1)

resultado_entrada = pd.concat([hosp_entrada, dados_pontos_entrada], axis=1)
resultado_saida = pd.concat([hosp_saida, dados_pontos_saida], axis=1)


# --- 5. GEOLOCALIZAÇÃO (Formatando como CIDADE/UF) ---

geolocator = Nominatim(user_agent="meu_app_geolocalizacao", timeout=10)
reverse = RateLimiter(geolocator.reverse, min_delay_seconds=1)

cache_coords = {}

def obter_cidade_estado(lat, lon):
    if pd.isna(lat) or pd.isna(lon): return None

    chave = (round(lat, 5), round(lon, 5))
    if chave in cache_coords: return cache_coords[chave]

    try:
        location = reverse(chave, language='pt', exactly_one=True)
        if location and location.raw and 'address' in location.raw:
            addr = location.raw['address']
            cidade = addr.get('city') or addr.get('town') or addr.get('village') or addr.get('municipality')
            estado_completo = addr.get('state')
            estado_abreviado = abreviar_estado(estado_completo)

            localizacao = f"{cidade}/{estado_abreviado}" if cidade and estado_abreviado else (cidade or estado_abreviado)

            cache_coords[chave] = localizacao
            return localizacao
    except Exception as e:
        pass

    cache_coords[chave] = None
    return None

def adicionar_cidade_estado(df, nome_aba):
    max_pontos = 0
    for col in df.columns:
        if col.startswith('LATITUDE_'):
            try: max_pontos = max(max_pontos, int(col.split('_')[1]))
            except: pass

    for i in range(1, max_pontos + 1):
        lat_col, lon_col = f'LATITUDE_{i}', f'LONGITUDE_{i}'
        localizacao_col = f'LOCALIZACAO_{i}'

        if lat_col in df.columns and lon_col in df.columns:

            df[localizacao_col] = None

            indices_para_buscar = df[
                (df[lat_col].notna()) & (df[lon_col].notna())
            ].index

            df_para_buscar = df.loc[indices_para_buscar].copy()

            if not df_para_buscar.empty:
                print(f"\nBuscando coordenadas (coluna {i}) para a aba '{nome_aba}' ({len(df_para_buscar)} pontos a buscar)...")

                resultados_busca = df_para_buscar.progress_apply(
                    lambda row: obter_cidade_estado(row[lat_col], row[lon_col]), axis=1
                )

                df.loc[indices_para_buscar, localizacao_col] = resultados_busca
            else:
                 print(f"\nNenhum ponto válido encontrado na coluna {i} da aba '{nome_aba}'. Pulando busca.")

    return df

print("\n" + "="*50)
print("Iniciando consulta Nominatim para LOCALIZAÇÃO (CIDADE/UF) com barra de progresso...")
print("Isto pode levar tempo devido ao limite de 1 segundo por consulta.")
print("="*50)

resultado_entrada = adicionar_cidade_estado(resultado_entrada, "Entrada")
resultado_saida = adicionar_cidade_estado(resultado_saida, "Saída")

print("\nConsulta concluída.")

# Reorganizar colunas
resultado_entrada = reorganizar_colunas(resultado_entrada)
resultado_saida = reorganizar_colunas(resultado_saida)

# --- 6. Exportar resultados ---
print("\nExportando resultados para o arquivo Excel...")
with pd.ExcelWriter('resultado_hospedagem_com_pontos_localizacao.xlsx', engine='openpyxl') as writer:
    resultado_entrada.to_excel(writer, sheet_name='Entrada', index=False)
    resultado_saida.to_excel(writer, sheet_name='Saída', index=False)

print("\nProcessamento concluído.")
print("Arquivo gerado: resultado_hospedagem_com_pontos_localizacao.xlsx com abas 'Entrada' e 'Saída'.")

Iniciando o processamento de dados...

Buscando e ordenando pontos de registro por horário...

Iniciando consulta Nominatim para LOCALIZAÇÃO (CIDADE/UF) com barra de progresso...
Isto pode levar tempo devido ao limite de 1 segundo por consulta.

Buscando coordenadas (coluna 1) para a aba 'Entrada' (229 pontos a buscar)...


  0%|          | 0/229 [00:00<?, ?it/s]


Buscando coordenadas (coluna 2) para a aba 'Entrada' (203 pontos a buscar)...


  0%|          | 0/203 [00:00<?, ?it/s]


Buscando coordenadas (coluna 3) para a aba 'Entrada' (3 pontos a buscar)...


  0%|          | 0/3 [00:00<?, ?it/s]


Buscando coordenadas (coluna 4) para a aba 'Entrada' (1 pontos a buscar)...


  0%|          | 0/1 [00:00<?, ?it/s]


Buscando coordenadas (coluna 1) para a aba 'Saída' (247 pontos a buscar)...


  0%|          | 0/247 [00:00<?, ?it/s]


Buscando coordenadas (coluna 2) para a aba 'Saída' (213 pontos a buscar)...


  0%|          | 0/213 [00:00<?, ?it/s]


Buscando coordenadas (coluna 3) para a aba 'Saída' (2 pontos a buscar)...


  0%|          | 0/2 [00:00<?, ?it/s]


Consulta concluída.

Exportando resultados para o arquivo Excel...

Processamento concluído.
Arquivo gerado: resultado_hospedagem_com_pontos_localizacao.xlsx com abas 'Entrada' e 'Saída'.


In [None]:
# @title
import pandas as pd

print("Iniciando teste de mapeamento completo (Ativos + Demitidos)...")

# --- 1. Leitura e Preparação das Bases (Ativos e Demitidos Combinadas) ---
try:
    # Leitura das bases
    base_ativos = pd.read_excel('base_ativos.xlsx')
    base_demitidos = pd.read_excel('base_demitidos.xlsx')
except FileNotFoundError as e:
    print(f"ERRO: Arquivo {e.filename} não encontrado. Verifique o nome do arquivo.")
    exit()

cols_map = {'CHAPA', 'CPF', 'DESCRICAO'}

# Função auxiliar para limpar e padronizar
def limpar_base(df, status):
    df_limpo = df[[c for c in cols_map if c in df.columns]]
    df_limpo['CHAPA_LIMPA'] = df_limpo['CHAPA'].astype(str).str.strip()
    df_limpo['CPF_LIMPO'] = df_limpo['CPF'].astype(str).str.strip()
    df_limpo['DESCRICAO'] = df_limpo['DESCRICAO'].astype(str).str.strip()
    df_limpo['STATUS'] = status
    return df_limpo[['CHAPA_LIMPA', 'CPF_LIMPO', 'DESCRICAO', 'STATUS']]

base_ativos_limpa = limpar_base(base_ativos, 'ATIVO')
base_demitidos_limpa = limpar_base(base_demitidos, 'DEMITIDO')

# Combina as bases, priorizando ATIVOS em caso de CHAPA duplicada
base_completa = pd.concat([base_ativos_limpa, base_demitidos_limpa], ignore_index=True)
base_completa = base_completa.drop_duplicates(subset=['CHAPA_LIMPA'], keep='first')

# Dicionário CHAPA -> CPF
chapa_to_cpf = dict(zip(base_completa['CHAPA_LIMPA'], base_completa['CPF_LIMPO']))

print(f"Base Completa (Ativos + Demitidos) carregada. Total de {len(base_completa)} registros únicos (CHAPA).")


# --- 2. Leitura e Preparação da Hospedagem ---
try:
    hosp_entrada = pd.read_excel('hospedagem.xlsx', sheet_name='Entrada')
    hosp_saida = pd.read_excel('hospedagem.xlsx', sheet_name='Saída')
except FileNotFoundError:
    print("ERRO: Arquivo 'hospedagem.xlsx' não encontrado.")
    exit()

# Limpar CHAPA na Hospedagem
hosp_entrada['CHAPA_LIMPA'] = hosp_entrada['CHAPA'].astype(str).str.strip()
hosp_saida['CHAPA_LIMPA'] = hosp_saida['CHAPA'].astype(str).str.strip()


# --- 3. Mapeamento e Análise ---

def analisar_resultados(df, nome_aba):
    # Tenta mapear o CPF usando a CHAPA limpa da base completa
    df['CPF_ENCONTRADO'] = df['CHAPA_LIMPA'].map(chapa_to_cpf)

    # Conta quantos CPFs foram encontrados (não são NaN)
    encontrados = df['CPF_ENCONTRADO'].notna().sum()
    nao_encontrados = df['CPF_ENCONTRADO'].isna().sum()
    total = len(df)

    print(f"\n--- Resultado para a aba '{nome_aba}' ---")
    print(f"Total de registros: {total}")
    print(f"CPF/CHAPA encontrado(s): {encontrados}")
    print(f"CPF/CHAPA NÃO encontrado(s): {nao_encontrados}")

    if nao_encontrados > 0:
        # Exibe as CHAPAS que falharam
        chapas_falharam = df[df['CPF_ENCONTRADO'].isna()]['CHAPA_LIMPA'].unique()
        print(f"\nCHAPAs únicas que AINDA falharam no mapeamento:")
        # Limita a exibição a 10 para não sobrecarregar
        print(chapas_falharam[:10].tolist())

    return nao_encontrados

falhas_entrada = analisar_resultados(hosp_entrada, 'Entrada')
falhas_saida = analisar_resultados(hosp_saida, 'Saída')

print("\nTeste de mapeamento concluído.")

if falhas_entrada == 0 and falhas_saida == 0:
    print("\n✅ SUCESSO! Todos os registros de hospedagem foram mapeados para um CPF/DESCRICAO na base combinada.")
    print("Você pode prosseguir com a execução do código completo.")
else:
    print("\n⚠️ ATENÇÃO: Ainda há registros que não puderam ser mapeados.")
    print("Isso geralmente indica que a CHAPA não existe em NENHUMA das bases (ativos ou demitidos), ou que o nome do campo nas bases originais está incorreto.")