In [None]:
from datasets import load_dataset
ds_stream = load_dataset("HuggingFaceFW/fineweb-2", "por_Latn", split="train", streaming=True)

In [2]:
ds_stream

IterableDataset({
    features: ['text', 'id', 'dump', 'url', 'date', 'file_path', 'language', 'language_score', 'language_script', 'minhash_cluster_size', 'top_langs'],
    num_shards: 1
})

In [3]:
import spacy
nlp = spacy.load("pt_core_news_lg")

In [5]:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, Text, Index

# Engine síncrono para criação da tabela
sync_engine = create_engine("postgresql+psycopg2://jadson:jadson@localhost/jadson", echo=True)
metadata = MetaData()

contadores = Table(
    "contadores",
    metadata,
    Column("token", Text, nullable=False, primary_key=True),
    Column("quantidade", Integer, nullable=False),
)

contadores_token_hash_idx = Index(
    "contadores_token_hash_idx",
    contadores.c.token,
    postgresql_using="hash"
)

# Criar tabela e índice
metadata.create_all(bind=sync_engine)
print("Tabela criada (sync).")
sync_engine.dispose()

2025-05-15 16:02:10,599 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2025-05-15 16:02:10,600 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-05-15 16:02:10,601 INFO sqlalchemy.engine.Engine select current_schema()
2025-05-15 16:02:10,601 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-05-15 16:02:10,602 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2025-05-15 16:02:10,602 INFO sqlalchemy.engine.Engine [raw sql] {}
2025-05-15 16:02:10,602 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-05-15 16:02:10,604 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname

In [6]:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import sessionmaker
from concurrent.futures import ThreadPoolExecutor

# Criar engine async
async_engine = create_async_engine("postgresql+asyncpg://jadson:jadson@localhost/jadson", echo=False)

# Async session factory
AsyncSessionLocal = sessionmaker(
    async_engine, expire_on_commit=False, class_=AsyncSession
)

executor = ThreadPoolExecutor(max_workers=20)

In [7]:
def nlp_process(text):
    # Processa texto e retorna tokens (ids)
    doc = nlp(text)
    return [token.text for token in doc]

In [8]:
import asyncio
from collections import Counter

MAX_PARAMS = 30000  # limite seguro, abaixo de 32767

def chunks(data, max_params=MAX_PARAMS):
    max_batch_size = max_params // 2  # 2 params por registro: token e quantidade
    for i in range(0, len(data), max_batch_size):
        yield data[i:i + max_batch_size]

async def processar_batch(textos):
    loop = asyncio.get_event_loop()

    # Processa os textos em executor paralelo
    tasks = [loop.run_in_executor(executor, nlp_process, texto) for texto in textos]
    resultados = await asyncio.gather(*tasks)

    counter = Counter()
    for tokens in resultados:
        counter.update(tokens)

    # Prepara dados convertendo token para decimal.Decimal
    data = [
        {
            "token": token,
            "quantidade": count
        }
        for token, count in counter.items()
    ]

    async with AsyncSessionLocal() as session:
        async with session.begin():
            for data_chunk in chunks(data):
                stmt = insert(contadores).values(data_chunk)
                stmt = stmt.on_conflict_do_update(
                    index_elements=[contadores.c.token],
                    set_={"quantidade": contadores.c.quantidade + stmt.excluded.quantidade},
                )
                await session.execute(stmt)


In [9]:
from tqdm import tqdm
import nest_asyncio

nest_asyncio.apply()

async def processar_tokens_batch(batch_size=1000):
    batch_texts = []
    with tqdm(desc="Batches processados", unit=" batch") as pbar:
        for exemplo in ds_stream:
            texto = exemplo.get("text")
            if texto:
                batch_texts.append(texto)
            if len(batch_texts) >= batch_size:
                await processar_batch(batch_texts)
                batch_texts = []
                pbar.update(1)
        if batch_texts:
            await processar_batch(batch_texts)
            pbar.update(1)

asyncio.run(processar_tokens_batch())

Batches processados: 33 batch [18:15, 33.21s/ batch]


In [None]:
from sqlalchemy import select

async def teste_contagem_simples():
    frases_teste = [
        "Olá, tudo bem?",
        "Estou testando a contagem de tokens.",
        "Olá, tudo bem?",  # repete para testar incremento
    ]
    
    # Processa e insere no banco
    await processar_batch(frases_teste)
    
    # Ler e mostrar os dados do banco para conferir
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(contadores).order_by(contadores.c.quantidade.desc()))
        rows = result.fetchall()
        print("Conteúdo da tabela 'contadores':")
        for row in rows:
            print(f"Token ID: {row.token}, Quantidade: {row.quantidade}, Texto: {nlp.vocab.strings[row.token]}")

# Executar teste
asyncio.run(teste_contagem_simples())