In [None]:
"""
Autor: Lucas Moreira
Projeto: ETL de dados Fenabrave (Novos e Seminovos/Usados)
Descrição: Notebook de captura dos dados no PDF e tratamento
Data: 2025-06
"""

import urllib
import fitz  #PyMuPDF
import sqlite3
import os
import shutil
import pandas as pd
import re
import gspread
from gspread_dataframe import set_with_dataframe
from oauth2client.service_account import ServiceAccountCredentials

### Reading PDFs

In [None]:
path_used = 'input/usados'
lista_arquivos = os.listdir(path_used)

path_new = 'input/novos'
lista_arquivos_new = os.listdir(path_new)

### Seminovos e Usados

1. Cria função para captura da tabela principal (consolidado por segmento)
2. Extrai dados de cada arquivo
3. Extrai o ranking dos 50 Autos e 50 Comerciais Leves mais vendidos no mês

In [None]:
def extract_total_used(texto, nome_arquivo):
    """"
    Extrai via regex os dados consolidados do mês para o total de seminovos e usados transacionados
    """
    total_used = []
    pattern = re.compile(
        r"(A\)\s*Autos|B\)\s*Com\.?\s*Leves|A\s*\+\s*B|C\)\s*Caminh[oõ]es|D\)\s*Ônibus|C\s*\+\s*D|Subtotal|E\)\s*Motos|F\)\s*Impl\.?\s*Rod\.?|Outros|Total)\s+([\d.]+)",
        re.IGNORECASE
    )

    for match in pattern.finditer(texto):
        segmento = match.group(1).strip()
        total = int(match.group(2).replace('.', ''))
        total_used.append({
            "segmento": segmento,
            "total": total,
            "arquivo": nome_arquivo
        })

        # Interrompe após os primeiros registros de interesse
        if segmento.lower() == "total" or len(total_used) == 11:
            break

    return total_used


# Coleta os dados de todos os PDFs
total_data = []

for nome_arquivo in lista_arquivos:
    if nome_arquivo.endswith(".pdf"):
        caminho_pdf = os.path.join(path_used, nome_arquivo)
        try:
            doc = fitz.open(caminho_pdf)
            texto_total = ""
            for pagina in doc:
                texto_total += pagina.get_text("text")
            doc.close()

            resultados = extract_total_used(texto_total, nome_arquivo)
            total_data.extend(resultados)

        except Exception as e:
            print(f"Erro ao processar {nome_arquivo}: {e}")

df_total_used = pd.DataFrame(total_data)

In [None]:
# Dicionário para renomear os segmentos
rename_dict = {
    "A) Autos": "Autos",
    "B) Com. Leves": "Comerciais Leves",
    "C) Caminhões": "Caminhões",
    "D) Ônibus": "Ônibus",
    "E) Motos": "Motos",
    "F) Impl. Rod.": "Implementos Rodoviários",
    "Outros": "Outros"
}
df_total_used["segmento"] = df_total_used["segmento"].str.strip().replace(rename_dict)

# Remove linhas com segmentos agregados
excluir = {"A + B", "C + D", "Subtotal", "Total"}
df = df_total_used[~df_total_used["segmento"].isin(excluir)]

# Extrai o padrão 'YYYY_MM'
df_total_used["ano_mes"] = df_total_used["arquivo"].apply(lambda x: re.search(r"\d{4}_\d{2}", x).group(0) if re.search(r"\d{4}_\d{2}", x) else "N/A")
df_total_used.drop(['arquivo'], axis=1, inplace=True)
df_total_used.reset_index(drop=True, inplace=True)

# Pivota por ano_mes
pivot_df_total_used = df_total_used.pivot(index='ano_mes', columns='segmento', values='total').reset_index()

In [None]:
def extract_ranking_used_autos(text):
    """"
    Extrai via regex os dados do ranking dos 50 Autos mais comercializados no mês
    """
    lines = text.splitlines()
    pattern = r'(\d+º)\s+([\w\s-]+)\s+([\d.]+)'
    data = re.findall(pattern, "\n".join(lines))
    return data[:50]

all_data = []

for nome_arquivo in lista_arquivos:
    if nome_arquivo.endswith('.pdf'):
        caminho_pdf = os.path.join(path_used, nome_arquivo)

        pdf_document = fitz.open(caminho_pdf)

        todo_texto = ""
        for num_pagina in range(pdf_document.page_count):
            pagina = pdf_document[num_pagina]
            todo_texto += pagina.get_text("text")

        pdf_document.close()

        data = extract_ranking_used_autos(todo_texto)

        if data:
            match = re.search(r'(\d{4}_\d{2})', nome_arquivo)
            if match:
                ano_mes = match.group(1)
            else:
                print(f"Aviso: padrão 'YYYY_MM' não encontrado em {nome_arquivo}")
                continue  # pula esse arquivo

            for i, dados_entrada in enumerate(data, start=1):
                lista_entrada = list(dados_entrada)
                lista_entrada[0] = f"{i}º"
                lista_entrada.append(ano_mes)
                all_data.append(lista_entrada)

used_autos = pd.DataFrame(all_data, columns=["ranking", "model", "total", "ano_mes"])

In [None]:
def extract_ranking_used_light(text):
    """"
    Extrai via regex os dados do ranking dos 50 Comerciais Leves mais comercializados no mês
    """
    blocos = re.split(r'(?=\d{1,2}º)', text)
    resultados = []
    for bloco in blocos:
        match = re.search(r'(\d{1,2}º)[\s\n]+([A-Z0-9 \-]+)[\s\n]+([\d.]+)', bloco)
        if match:
            ranking = match.group(1)
            modelo = match.group(2).strip()
            total = match.group(3).replace('.', '')
            resultados.append((ranking, modelo, int(total)))
    return resultados[50:]

all_data = []

for nome_arquivo in lista_arquivos:
    if nome_arquivo.endswith('.pdf'):
        caminho_pdf = os.path.join(path_used, nome_arquivo)

        pdf_document = fitz.open(caminho_pdf)

        todo_texto = ""
        for num_pagina in range(pdf_document.page_count):
            pagina = pdf_document[num_pagina]
            todo_texto += pagina.get_text("text")

        pdf_document.close()

        data = extract_ranking_used_light(todo_texto)

        if data:
            match = re.search(r'(\d{4}_\d{2})', nome_arquivo)
            if match:
                ano_mes = match.group(1)
            else:
                print(f"Aviso: padrão 'YYYY_MM' não encontrado em {nome_arquivo}")
                continue

            for i, dados_entrada in enumerate(data, start=1):
                lista_entrada = list(dados_entrada)
                lista_entrada[0] = f"{i}º"
                lista_entrada.append(ano_mes)
                all_data.append(lista_entrada)

used_leves = pd.DataFrame(all_data, columns=["ranking", "model", "total", "ano_mes"])

### Emplacamentos

1. Utiliza da mesma função para capturar a tabela principal (consolidado por segmento)
2. Extrai a tabela de de cada arquivo com o resultado mensal. Faz ajustes finos retirando subtotais e ajustando nomes. Pivota a tabela.
3. Extrai o ranking dos 50 Autos e 50 Comerciais Leves mais vendidos no mês

In [None]:
def extract_total_used(texto, nome_arquivo):
    """"
    Extrai via regex os dados consolidados do mês para o total de emplacamentos
    """
    total_new = []
    pattern = re.compile(
        r"(A\)\s*Autos|B\)\s*Com\.?\s*Leves|A\s*\+\s*B|C\)\s*Caminh[oõ]es|D\)\s*Ônibus|C\s*\+\s*D|Subtotal|E\)\s*Motos|F\)\s*Impl\.?\s*Rod\.?|Outros|Total)\s+([\d.]+)",
        re.IGNORECASE
    )

    for match in pattern.finditer(texto):
        segmento = match.group(1).strip()
        total = int(match.group(2).replace('.', ''))
        total_new.append({
            "segmento": segmento,
            "total": total,
            "arquivo": nome_arquivo
        })

        # Interrompe após os registros de interesse
        if segmento.lower() == "total" or len(total_new) == 11:
            break

    return total_new


# Coleta os dados de todos os PDFs
total_data = []

for nome_arquivo in lista_arquivos_new:
    if nome_arquivo.endswith(".pdf"):
        caminho_pdf = os.path.join(path_new, nome_arquivo)
        try:
            doc = fitz.open(caminho_pdf)
            texto_total = ""
            for pagina in doc:
                texto_total += pagina.get_text("text")
            doc.close()

            resultados = extract_total_used(texto_total, nome_arquivo)
            total_data.extend(resultados)

        except Exception as e:
            print(f"Erro ao processar {nome_arquivo}: {e}")

# Cria o DataFrame
df_total_new = pd.DataFrame(total_data)

In [None]:
# Dicionário para renomear os segmentos
rename_dict = {
    "A) Autos": "Autos",
    "B) Com. Leves": "Comerciais Leves",
    "C) Caminhões": "Caminhões",
    "D) Ônibus": "Ônibus",
    "E) Motos": "Motos",
    "F) Impl. Rod.": "Implementos Rodoviários",
    "Outros": "Outros",
    "OUTROS": "Outros"
}
df_total_new["segmento"] = df_total_new["segmento"].str.strip().replace(rename_dict)

# Remove linhas com segmentos agregados
excluir = {"A + B", "C + D", "Subtotal", "Total"}
df_novos = df_total_new[~df_total_new["segmento"].isin(excluir)]

# Extrai o padrão 'YYYY_MM'
df_total_new["ano_mes"] = df_total_new["arquivo"].apply(lambda x: re.search(r"\d{4}_\d{2}", x).group(0) if re.search(r"\d{4}_\d{2}", x) else "N/A")
df_total_new.drop(['arquivo'], axis=1, inplace=True)
df_total_new.reset_index(drop=True, inplace=True)

# Elimina duplicados
df_total_new = df_total_new.groupby(['ano_mes', 'segmento'])['total'].max().reset_index()

# Pivota por ano_mes
pivot_df_total_new = df_total_new.pivot(index='ano_mes', columns='segmento', values='total').reset_index()

In [None]:
def extract_ranking_new_autos(text):
    """"
    Extrai via regex os dados do ranking dos 50 Autos novos mais comercializados no mês
    """
    lines = text.splitlines()
    pattern = r'(\d+º)\s+([A-Za-z0-9/().\- ]*?[A-Za-z][A-Za-z0-9/().\- ]*?)\s+(\d{1,3}(?:\.\d{3})*|\d+)(?=\s|$)'
    data = re.findall(pattern, "\n".join(lines))
    return data[:50]

all_data = []

for nome_arquivo in lista_arquivos_new:
    if nome_arquivo.endswith('.pdf'):
        caminho_pdf = os.path.join(path_new, nome_arquivo)

        pdf_document = fitz.open(caminho_pdf)

        todo_texto = ""
        for num_pagina in range(pdf_document.page_count):
            pagina = pdf_document[num_pagina]
            todo_texto += pagina.get_text("text")

        pdf_document.close()

        data = extract_ranking_new_autos(todo_texto)

        if data:
            match = re.search(r'(\d{4}_\d{2})', nome_arquivo)
            if match:
                ano_mes = match.group(1)
            else:
                print(f"Aviso: padrão 'YYYY_MM' não encontrado em {nome_arquivo}")
                continue

            for i, dados_entrada in enumerate(data, start=1):
                lista_entrada = list(dados_entrada)
                lista_entrada[0] = f"{i}º"
                lista_entrada.append(ano_mes)
                all_data.append(lista_entrada)

new_autos = pd.DataFrame(all_data, columns=['ranking', 'model', 'total', 'ano_mes'])
new_autos.drop_duplicates(inplace=True)
new_autos['total'] = new_autos['total'].str.replace('.', '').astype(int)

In [None]:
def extract_ranking_new_light(text):
    """"
    Extrai via regex os dados do ranking dos 50 Comerciais Leves novos mais comercializados no mês
    """
    blocos = re.split(r'(?=\d{1,2}º)', text)

    resultados = []
    for bloco in blocos:
        match = re.search(r'(\d+º)\s+([\w/.\- ()]+?)\s+(\d{1,3}(?:\.\d{3})*|\d+)(?=\s+\d+º|\s+Ed\.|$)', bloco.strip())
        if match:
            ranking = match.group(1)
            modelo = match.group(2).strip()
            total = match.group(3).replace('.', '')
            resultados.append((ranking, modelo, total))
    return resultados[49:99]

all_data = []

for nome_arquivo in lista_arquivos_new:
    if nome_arquivo.endswith('.pdf'):
        caminho_pdf = os.path.join(path_new, nome_arquivo)

        pdf_document = fitz.open(caminho_pdf)

        todo_texto = ""
        for num_pagina in range(pdf_document.page_count):
            pagina = pdf_document[num_pagina]
            todo_texto += pagina.get_text("text")

        pdf_document.close()

        data = extract_ranking_new_light(todo_texto)

        if data:
            match = re.search(r'(\d{4}_\d{2})', nome_arquivo)
            if match:
                ano_mes = match.group(1)
            else:
                print(f"Aviso: padrão 'YYYY_MM' não encontrado em {nome_arquivo}")
                continue 

            for i, dados_entrada in enumerate(data, start=1):
                lista_entrada = list(dados_entrada)
                lista_entrada[0] = f"{i}º"
                lista_entrada.append(ano_mes)
                all_data.append(lista_entrada)

new_leves = pd.DataFrame(all_data, columns=["ranking", "model", "total", "ano_mes"])
new_leves.drop_duplicates(inplace=True)
new_leves['total'] = new_leves['total'].astype(int)

In [None]:
# Extrai o número do ranking para facilitar ordenação
new_leves["ranking_num"] = new_leves["ranking"].str.extract(r"(\d+)").astype(int)

# Ordena pra garantir que o 1º ranking vem primeiro dentro de cada grupo
new_leves = new_leves.sort_values(by=["ano_mes", "ranking_num"]).reset_index(drop=True)

# Pega o total do 1º colocado por mês
primeiro_colocado = new_leves[new_leves["ranking_num"] == 1][["ano_mes", "total"]]
primeiro_colocado = primeiro_colocado.rename(columns={"total": "total_top1"})

# Faz merge pra comparar todos os totais com o do 1º colocado
leves_merge = new_leves.merge(primeiro_colocado, on="ano_mes", how="left")

# Mantém só linhas cujo total seja menor ou igual ao do primeiro colocado
new_leves_filtrado = leves_merge[leves_merge["total"] <= leves_merge["total_top1"]].copy()

# Remove colunas auxiliares se quiser
new_leves_filtrado = new_leves_filtrado.drop(columns=["ranking_num", "total_top1"])

### Salvar

In [None]:
path_credentials = ''
scope = ["https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/drive"]
creds = ServiceAccountCredentials.from_json_keyfile_name(path_credentials, scope)
client = gspread.authorize(creds)

In [None]:
# Seminovos e Usados
segmento_total_used = client.create("20250601_Seminovos_Usados_Fenabrave_v2")
sheet1 = segmento_total_used.sheet1
set_with_dataframe(sheet1, pivot_df_total_used)
segmento_total_used.add_worksheet(title="Autos", rows=str(len(used_autos)), cols="20")
sheet2 = segmento_total_used.worksheet("Autos")
set_with_dataframe(sheet2, used_autos)
segmento_total_used.add_worksheet(title="Comerciais Leves", rows=str(len(used_leves)), cols="20")
sheet3 = segmento_total_used.worksheet("Comerciais Leves")
set_with_dataframe(sheet3, used_leves)


# Emplacamentos
segmento_total_new = client.create("20250601_Emplacamentos_Fenabrave_v1")
sheet1 = segmento_total_new.sheet1
set_with_dataframe(sheet1, pivot_df_total_new)
segmento_total_new.add_worksheet(title="Autos", rows=str(len(new_autos)), cols="20")
sheet2 = segmento_total_new.worksheet("Autos")
set_with_dataframe(sheet2, new_autos)
segmento_total_new.add_worksheet(title="Comerciais Leves", rows=str(len(new_leves_filtrado)), cols="20")
sheet3 = segmento_total_new.worksheet("Comerciais Leves")
set_with_dataframe(sheet3, new_leves_filtrado)

# Compartilhamento com os emails
emails = ['lucas.nasc.m@gmail.com', 'lucas.moreira@olxbr.com']
for email in emails:
    segmento_total_used.share(email, perm_type='user', role='writer')
    segmento_total_new.share(email, perm_type='user', role='writer')