**// IMPORTS**

In [1]:
import duckdb, pathlib, pandas as pd, re, numpy as np
from collections import Counter
import matplotlib.pyplot as plt

# Show complete sentences in DataFrame cells instead of truncating them.
pd.set_option("display.max_colwidth", None)

# DuckDB files, relative to the notebook's working directory.
PROJECT_DB_PATH = pathlib.Path("../data/duckdb/subs_project.duckdb")  # holds the train_data / test_data views
SOURCE_DB_PATH = pathlib.Path("../data/duckdb/subs.duckdb")           # attached as `src`

# POSIX-style strings, as handed to duckdb.connect / ATTACH.
PROJECT_DB_STR, SOURCE_DB_STR = (p.as_posix() for p in (PROJECT_DB_PATH, SOURCE_DB_PATH))

def connect_project(read_only: bool = True) -> duckdb.DuckDBPyConnection:
    """
    Open subs_project.duckdb and make sure the source database is attached as `src`.

    NOTE(review): the project DB presumably holds views that read from `src`, which
    is why the attach happens here; confirm against the view definitions.

    Args:
        read_only: open the project database read-only (default True). The `src`
            database is always attached read-only because this notebook never writes to it.

    Returns:
        An open DuckDB connection; the caller owns it and should close it.
    """
    con = duckdb.connect(PROJECT_DB_STR, read_only=read_only)
    try:
        # Attach src only if it's not there yet (DuckDB may hand back an existing
        # instance for the same file, in which case src is already present).
        dbl = con.execute("PRAGMA database_list").df()
        if not (dbl["name"] == "src").any():
            # Double any single quote so the path stays a valid SQL string literal.
            src_literal = SOURCE_DB_STR.replace("'", "''")
            con.execute(f"ATTACH '{src_literal}' AS src (READ_ONLY)")
    except Exception:
        # Do not leak the connection if the attach fails.
        con.close()
        raise

    return con

# Split views in subs_project.duckdb, queried directly (no materialized copies).
TRAIN_TABLE, TEST_TABLE = 'train_data', 'test_data'

# Sample sizes; the N_* values are presumably consumed by later analyses.
N_RATIO_SAMPLE = 500_000
N_SCATTER_SAMPLE = 50_000
N_TEXT_SAMPLE = 10_000
SAMPLES = 1_000  # rows per split for the quick null/empty diagnostics

con = connect_project(read_only=True)

# Session tuning: cap DuckDB worker threads and allow it to return rows without
# preserving insertion order. execute() returns the connection, so the calls chain.
(
    con.execute("PRAGMA threads=4;")
       .execute("PRAGMA preserve_insertion_order=false;")
)
# To keep DuckDB from grabbing too much RAM, optionally also run:
# con.execute("PRAGMA memory_limit='3GB';")

def both_sides(where_extra: str = "") -> str:
    """
    Build a query returning (text_pt_br, text_pt_pt) from both split views.

    Rows where either side is NULL are excluded in each half of the UNION ALL.
    `where_extra` is wrapped in parentheses before being ANDed on, so a predicate
    containing OR cannot bypass the NULL filters (`... AND a OR b` would otherwise
    parse as `(... AND a) OR b`).

    Args:
        where_extra: optional SQL predicate, interpolated verbatim. Pass trusted
            fragments only, never user input. Empty, whitespace-only or None
            means no extra filter.

    Returns:
        The SQL query text.
    """
    predicate = where_extra.strip() if where_extra else ""
    extra = f" AND ({predicate})" if predicate else ""
    return f"""
      SELECT text_pt_br, text_pt_pt FROM {TRAIN_TABLE}
      WHERE text_pt_br IS NOT NULL AND text_pt_pt IS NOT NULL{extra}
      UNION ALL
      SELECT text_pt_br, text_pt_pt FROM {TEST_TABLE}
      WHERE text_pt_br IS NOT NULL AND text_pt_pt IS NOT NULL{extra}
    """



**// CONFIGS — DuckDB streaming helpers**

In [2]:
import duckdb, pathlib, pandas as pd
from typing import Iterator, Dict, Optional

def stream_from_view(
    view_name: str,
    batch_size: int = 10_000,
    limit: Optional[int] = None,
) -> Iterator[Dict]:
    """
    Stream rows from a view in subs_project.duckdb (e.g. train_data / test_data) as dicts.

    The query runs once, and its result is consumed `batch_size` rows at a time with
    `fetchmany`. This replaces LIMIT/OFFSET paging, which re-scans the skipped prefix
    on every batch (quadratic on large views). Without an ORDER BY, paging is also not
    guaranteed to return disjoint batches, so rows could be skipped or duplicated;
    this notebook additionally sets `preserve_insertion_order=false`.

    Args:
        view_name: view/table to read. Interpolated into SQL verbatim, so pass only
            trusted identifiers.
        batch_size: rows fetched from DuckDB per round trip; must be >= 1.
        limit: maximum number of rows to yield. None streams everything; a value
            <= 0 yields nothing.

    Yields:
        One dict per row, keyed by the selected column names. SQL NULLs arrive as None.

    Raises:
        ValueError: if batch_size < 1 (raised on first iteration, as with any generator).
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    if limit is not None and limit <= 0:
        return

    columns = [
        "dataset", "source", "bucket", "theme", "label",
        "text_pt_br", "text_pt_pt",
        "ref_pt_pt_manual", "ref_pt_pt_deepl",
    ]
    query = f"SELECT {', '.join(columns)} FROM {view_name}"
    if limit is not None:
        query += f" LIMIT {int(limit)}"

    con = connect_project(read_only=True)
    try:
        result = con.execute(query)
        while True:
            rows = result.fetchmany(batch_size)
            if not rows:
                break
            for values in rows:
                yield dict(zip(columns, values))
    finally:
        # Runs on exhaustion, on error, and when the consumer stops early
        # (generator.close() / garbage collection), so the connection is not leaked.
        con.close()

def _stream_split(
    view_name: str,
    limit: Optional[int],
    batch_size: int,
) -> Iterator[Dict]:
    """Announce which view is being read, then delegate to stream_from_view."""
    print(f"Streaming rows from {view_name}...")
    yield from stream_from_view(
        view_name=view_name,
        batch_size=batch_size,
        limit=limit,
    )


def stream_training_rows_from_project(
    train_limit: Optional[int] = None,
    batch_size: int = 10_000,
) -> Iterator[Dict]:
    """
    Stream rows of the training view (TRAIN_TABLE) as dicts.

    Args:
        train_limit: maximum number of rows to yield; None means all rows.
        batch_size: rows fetched from DuckDB per batch.
    """
    yield from _stream_split(TRAIN_TABLE, train_limit, batch_size)


def stream_test_rows_from_project(
    test_limit: Optional[int] = None,
    batch_size: int = 10_000,
) -> Iterator[Dict]:
    """
    Stream rows of the test view (TEST_TABLE) as dicts.

    Args:
        test_limit: maximum number of rows to yield; None means all rows.
        batch_size: rows fetched from DuckDB per batch.
    """
    yield from _stream_split(TEST_TABLE, test_limit, batch_size)


# # -------- Per-dataset counts (do each split separately to avoid huge unions) --------
# train_ds = con.execute(f"""
#     SELECT dataset, COUNT(*) AS n
#     FROM {TRAIN_TABLE}
#     GROUP BY dataset
# """).fetchdf()

# test_ds = con.execute(f"""
#     SELECT dataset, COUNT(*) AS n
#     FROM {TEST_TABLE}
#     GROUP BY dataset
# """).fetchdf()

# per_dataset = (
#     pd.concat([train_ds.assign(split="train"), test_ds.assign(split="test")], ignore_index=True)
#       .groupby("dataset", as_index=False)["n"].sum()
#       .sort_values("n", ascending=False)
# )

# # -------- Totals & diagnostics (per split, then sum in pandas) --------
# train_tot = con.execute(f"""
#     SELECT
#       COUNT(*) AS total_rows,
#       SUM(dataset IS NULL) AS null_dataset,
#       SUM(COALESCE(TRIM(text_pt_br),'')='' AND COALESCE(TRIM(text_pt_pt),'')='') AS both_texts_empty_or_null
#     FROM {TRAIN_TABLE}
# """).fetchdf().iloc[0]

# test_tot = con.execute(f"""
#     SELECT
#       COUNT(*) AS total_rows,
#       SUM(dataset IS NULL) AS null_dataset,
#       SUM(COALESCE(TRIM(text_pt_br),'')='' AND COALESCE(TRIM(text_pt_pt),'')='') AS both_texts_empty_or_null
#     FROM {TEST_TABLE}
# """).fetchdf().iloc[0]

# totals = pd.DataFrame([{
#     "total_rows": int(train_tot["total_rows"] + test_tot["total_rows"]),
#     "rows_with_null_dataset": int(train_tot["null_dataset"] + test_tot["null_dataset"]),
#     "rows_with_both_texts_empty_or_null": int(train_tot["both_texts_empty_or_null"] + test_tot["both_texts_empty_or_null"]),
# }])

# per_split = pd.DataFrame([
#     {"split":"train",
#      "total_rows": int(train_tot["total_rows"]),
#      "rows_with_null_dataset": int(train_tot["null_dataset"]),
#      "rows_with_both_texts_empty_or_null": int(train_tot["both_texts_empty_or_null"])},
#     {"split":"test",
#      "total_rows": int(test_tot["total_rows"]),
#      "rows_with_null_dataset": int(test_tot["null_dataset"]),
#      "rows_with_both_texts_empty_or_null": int(test_tot["both_texts_empty_or_null"])},
# ])

# print("=== Rows per dataset (combined) ==="); display(per_dataset)
# print("\n=== Totals & null/empty diagnostics (combined) ==="); display(totals)
# print("\n=== Per-split diagnostics (optional) ==="); display(per_split)


**// MAIN CODE**

In [3]:
# print(con.execute("SELECT COUNT(*) FROM train_data").fetchone()[0])
# print(con.execute("SELECT COUNT(*) FROM test_data").fetchone()[0])

In [4]:
from collections import Counter

def _is_null(value) -> bool:
    """True for a SQL NULL as it may surface from DuckDB/pandas: None or a float NaN."""
    # NaN is the only value that compares unequal to itself.
    return value is None or (isinstance(value, float) and value != value)


def _is_blank(value) -> bool:
    """True when a text cell is NULL or contains only whitespace."""
    return _is_null(value) or str(value).strip() == ""


def _tally_rows(counter: Counter, rows, limit=None) -> None:
    """
    Accumulate null/empty diagnostics for up to `limit` rows into `counter`.

    Keys are only incremented when their condition holds, so absent keys mean 0.
    The row stream is closed afterwards, so an early stop releases its DB connection.
    """
    from itertools import islice

    # islice rejects negative stops; a non-positive limit simply means "no rows".
    stop = None if limit is None else max(int(limit), 0)
    try:
        for row in islice(rows, stop):
            br = row["text_pt_br"]
            pt = row["text_pt_pt"]
            br_null, pt_null = _is_null(br), _is_null(pt)
            br_blank, pt_blank = _is_blank(br), _is_blank(pt)

            counter["total_rows"] += 1
            if br_null:
                counter["null_br"] += 1
            if pt_null:
                counter["null_pt"] += 1
            # empty_* deliberately includes NULLs as well as ""/whitespace-only text.
            if br_blank:
                counter["empty_br"] += 1
            if pt_blank:
                counter["empty_pt"] += 1
            if br_null and pt_null:
                counter["both_null"] += 1
            if br_blank and pt_blank:
                counter["both_empty_or_null"] += 1
    finally:
        close = getattr(rows, "close", None)
        if close is not None:
            close()


def compute_basic_counts_from_view_stream(
    train_limit=None,
    test_limit=None,
):
    """
    Count NULL / empty text cells in the train and test views.

    Args:
        train_limit: max rows to inspect from the train view (None = all).
        test_limit: max rows to inspect from the test view (None = all).

    Returns:
        {"train": Counter, "test": Counter} with keys total_rows, null_br, null_pt,
        empty_br, empty_pt (NULL or blank), both_null and both_empty_or_null.
    """
    stats = {
        "train": Counter(),
        "test": Counter(),
    }

    _tally_rows(
        stats["train"],
        stream_training_rows_from_project(train_limit=train_limit, batch_size=10_000),
        train_limit,
    )
    _tally_rows(
        stats["test"],
        stream_test_rows_from_project(test_limit=test_limit, batch_size=10_000),
        test_limit,
    )
    return stats

# Quick null/empty diagnostics on up to SAMPLES rows per split.
stats = compute_basic_counts_from_view_stream(
    train_limit=SAMPLES,
    test_limit=SAMPLES,
)
stats


Streaming rows from train_data...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Streaming rows from test_data...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

{'train': Counter({'total_rows': 1000}),
 'test': Counter({'total_rows': 1000,
          'null_br': 817,
          'empty_br': 817,
          'null_pt': 183,
          'empty_pt': 183})}

In [5]:
import random
 
def _has_text(value) -> bool:
    """True when `value` is a string with at least one non-whitespace character."""
    return isinstance(value, str) and value.strip() != ""


def reservoir_sample_pairs(
    n_target: int = 100_000,
    include_test: bool = True,
    seed: Optional[int] = None,
) -> list[tuple[str, str]]:
    """
    Uniform sample (reservoir sampling, Algorithm R) of (pt_br, pt_pt) sentence pairs.

    Streams train_data and, optionally, test_data in a single pass. Only pairs where
    both sides are non-blank strings are eligible. NULL/NaN values and whitespace-only
    text are skipped.

    Args:
        n_target: reservoir size (the maximum number of pairs returned).
        include_test: also stream the test view after the train view.
        seed: seed for a private random.Random, for reproducible samples. None keeps
            using the module-level `random` functions, so a notebook-level
            random.seed(...) still applies.

    Returns:
        Up to n_target (br, pt) tuples. If fewer pairs are eligible, all of them are
        returned in stream order.
    """
    rng = random if seed is None else random.Random(seed)

    reservoir: list[tuple[str, str]] = []
    n_seen = 0  # number of eligible pairs seen so far

    # Generators only start (and print) once iterated, so the train/test order is preserved.
    row_streams = [stream_training_rows_from_project(train_limit=None, batch_size=10_000)]
    if include_test:
        row_streams.append(stream_test_rows_from_project(test_limit=None, batch_size=10_000))

    for rows in row_streams:
        for row in rows:
            br, pt = row["text_pt_br"], row["text_pt_pt"]
            if not (_has_text(br) and _has_text(pt)):
                continue
            n_seen += 1
            if len(reservoir) < n_target:
                reservoir.append((br, pt))
            else:
                # The n_seen-th item replaces a random slot with probability n_target / n_seen.
                j = rng.randint(0, n_seen - 1)
                if j < n_target:
                    reservoir[j] = (br, pt)

    print(f"Reservoir sample size: {len(reservoir)} from {n_seen} eligible pairs")
    return reservoir

# Number of pairs to keep; lower it if streaming the full views is too slow.
N_SAMPLE_PAIRS = 10_000
pairs = reservoir_sample_pairs(N_SAMPLE_PAIRS, include_test=True)

# Unzip the sampled pairs into one list per variety.
br_texts = [br for br, _pt in pairs]
pt_texts = [pt for _br, pt in pairs]


Streaming rows from train_data...


RuntimeError: Query interrupted

In [None]:
from transformers import AutoTokenizer

# Tokenizer used only to measure sentence lengths in Qwen tokens.
MODEL_ID = "Qwen/Qwen3-0.6B"
tok = AutoTokenizer.from_pretrained(pretrained_model_name_or_path=MODEL_ID, use_fast=True)

# Fall back to EOS as the pad token when the checkpoint defines none.
if tok.pad_token is None:
    tok.pad_token = tok.eos_token

def count_qwen_tokens(texts, tokenizer=None):
    """
    Count tokens per text, without special tokens, padding or truncation.

    Args:
        texts: list/sequence of strings. A single str is treated as a one-item batch.
        tokenizer: callable with the HF tokenizer call signature; defaults to the
            notebook's module-level `tok`.

    Returns:
        list[int] with one token count per input text, in input order.
    """
    if tokenizer is None:
        tokenizer = tok
    if isinstance(texts, str):
        # A bare str would otherwise be encoded as one flat id list, not as a batch.
        texts = [texts]
    texts = list(texts)
    if not texts:
        # Nothing to count; empty batches are also not reliably accepted by HF tokenizers.
        return []

    enc = tokenizer(
        texts,
        add_special_tokens=False,
        padding=False,
        truncation=False,
    )
    return [len(ids) for ids in enc["input_ids"]]

# Token counts per sampled sentence, one list per variety (BR first, then PT).
br_token_lens, pt_token_lens = count_qwen_tokens(br_texts), count_qwen_tokens(pt_texts)

def summarize_lengths(name, arr):
    """
    Summarise a 1-D collection of lengths (tokens or characters).

    Args:
        name: label stored under the "side" key.
        arr: sequence/array of numeric lengths.

    Returns:
        dict with keys side, n, mean, median, p90, p95, p99, max. For an empty
        input, n is 0 and every statistic is NaN; numpy would otherwise raise on
        max() of a zero-size array.
    """
    values = np.asarray(arr)
    if values.size == 0:
        nan = float("nan")
        return {
            "side": name,
            "n": 0,
            "mean": nan,
            "median": nan,
            "p90": nan,
            "p95": nan,
            "p99": nan,
            "max": nan,
        }

    p90, p95, p99 = np.quantile(values, [0.90, 0.95, 0.99])
    return {
        "side": name,
        "n": int(values.size),
        "mean": float(values.mean()),
        "median": float(np.median(values)),
        "p90": float(p90),
        "p95": float(p95),
        "p99": float(p99),
        "max": int(values.max()),
    }

# One summary row per variety, BR first.
qwen_token_stats = pd.DataFrame(
    [
        summarize_lengths(side, lens)
        for side, lens in (("pt_br", br_token_lens), ("pt_pt", pt_token_lens))
    ]
)

print("=== Qwen-token stats on raw sentences (sample) ===")
display(qwen_token_stats)

In [None]:
# Character lengths of the sampled sentences.
len_br = np.array(list(map(len, br_texts)))
len_pt = np.array(list(map(len, pt_texts)))

length_stats = pd.DataFrame(
    [
        summarize_lengths(label, lengths)
        for label, lengths in (("len_br_chars", len_br), ("len_pt_chars", len_pt))
    ]
)

print("\n=== LENGTH STATS (chars) — sampled pairs ===")
display(length_stats)

# Histogram of PT-BR sentence lengths.
plt.figure()
plt.hist(len_br, bins=100)
plt.title("PT-BR length distribution (sampled train+test)")
plt.xlabel("Characters")
plt.ylabel("Frequency")
plt.show()



=== LENGTH STATS (chars) — full dataset via SQL ===
