In [None]:
from __future__ import annotations
import argparse
import json
import re
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple

import duckdb
import pandas as pd
import matplotlib.pyplot as plt

# -------- Embeddings / Vector store (Chroma via LangChain) --------
from langchain_core.documents import Document
from langchain_community.vectorstores.utils import filter_complex_metadata
from langchain_community.embeddings import SentenceTransformerEmbeddings
from langchain_chroma import Chroma
from langchain_text_splitters import (
    MarkdownHeaderTextSplitter,
    RecursiveCharacterTextSplitter,
)

# -------- Parsers --------
import pymupdf4llm  # PDF -> Markdown
from langchain_community.document_loaders import (
    UnstructuredPDFLoader,
    UnstructuredWordDocumentLoader,
)
import pdfplumber  # table extraction
from pypdf import PdfReader  # quick full-text as-needed
from docx import Document as DocxDocument
from dateutil import parser as dateparser

### **Configuration**

In [None]:
# Where source filings are read from, and where every derived artifact is stored.
# Both are plain strings; nothing in this cell creates the directories.
DEFAULT_DATA_DIR = "/home/ssever/InsightViewer/data/test"
STORAGE_DIR = "/home/ssever/InsightViewer/test_storage"

# Chroma vector store (persisted on disk below STORAGE_DIR).
CHROMA_DIR = f"{STORAGE_DIR}/chroma"
COLLECTION_NAME = "filings"

# DuckDB database file holding the extracted metric rows.
DUCKDB_PATH = f"{STORAGE_DIR}/metrics.duckdb"

# Embedding model name and text-splitter sizing (CHUNK_* are passed to
# RecursiveCharacterTextSplitter in md_to_chunks).
EMBED_MODEL = "all-MiniLM-L12-v2"
CHUNK_SIZE = 400
CHUNK_OVERLAP = 60

### **Metadata extraction**

In [None]:
# Exact company-name -> ticker lookup used after a company name is found in the text.
COMPANY_TO_TICKER = {
    "Microsoft Corporation": "MSFT",
}

# ---- Patterns applied to document text ----
# "Form 10-K" / "Form 10 Q" / "Form 8K" (case-insensitive); group 1 is the form code.
RE_FORM = re.compile(r"\bForm\s+(10[-\s]?K|10[-\s]?Q|8[-\s]?K)\b", re.I)
RE_AR   = re.compile(r"\b(Annual\s+Report)\b", re.I)
# Group 1 is the free-text date phrase following "... ended".
RE_FY_ENDED  = re.compile(r"\bfiscal\s+year\s+ended\s+([A-Za-z0-9, ]+)\b", re.I)
RE_Q_ENDED   = re.compile(r"\bquarterly\s+period\s+ended\s+([A-Za-z0-9, ]+)\b", re.I)
# "FY24" / "FY2024"; group 1 is always the two-digit year suffix.
RE_FY_CODE   = re.compile(r"\bFY(?:20)?(\d{2})\b", re.I)
RE_Q_CODE    = re.compile(r"\bQ([1-4])\b", re.I)
# NOTE(review): the trailing \b after `Inc\.` only matches when a word character
# follows the period, so "Apple Inc. " is not recognised. Left as-is because the
# greedy name prefix makes widening this change which suffix wins on mixed text.
RE_COMPANY   = re.compile(r"\b([A-Z][A-Za-z&.,()\- ]{2,}(?:Corporation|Company|Inc\.|Incorporated|PLC))\b")

# ---- Patterns applied to the file name stem ----
TICKER_PREFIX_RE = re.compile(r"^([A-Za-z]{1,6})[ _-]", re.I)
FILENAME_YEAR_RE = re.compile(r"(20\d{2}|FY(?:20)?\d{2})", re.I)
# "AR" must not be embedded in a longer word: without the letter lookarounds the
# case-insensitive pattern matched the "ar" in names such as "Earnings" or
# "Quarterly". Digits and separators around it ("AR2023", "_AR_") are still accepted.
FILING_NAME_RE  = re.compile(
    r"(10[-_]?K|10[-_]?Q|8-K|(?<![A-Za-z])AR(?![A-Za-z])"
    r"|Annual[_-]?Report|PressRelease|Slides|Transcript|Outlook)",
    re.I,
)

### **Content-first metadata extraction**

In [None]:
def _norm_form(form_str: str) -> str:
    s = form_str.upper().replace(" ", "").replace("_", "").replace("-", "")
    if s == "10K": return "10-K"
    if s == "10Q": return "10-Q"
    if s == "8K":  return "8-K"
    return form_str.upper()


def _year_from_datephrase(phrase: str) -> Optional[str]:
    try:
        dt = dateparser.parse(phrase, fuzzy=True)
        return str(dt.year)
    except Exception:
        return None


def _read_pdf_pages_quick(path: Path, max_pages: int = 6) -> List[str]:
    txts: List[str] = []
    try:
        r = PdfReader(str(path))
        for p in r.pages[:max_pages]:
            try:
                txts.append(p.extract_text() or "")
            except Exception:
                txts.append("")
    except Exception:
        pass
    return txts


def _read_docx_quick(path: Path, max_chars: int = 12000) -> List[str]:
    try:
        doc = DocxDocument(str(path))
        text = "\n".join(p.text for p in doc.paragraphs if p.text)[:max_chars]
        chunk = 3000
        return [text[i:i+chunk] for i in range(0, len(text), chunk)][:4]
    except Exception:
        return []


def extract_meta_from_content(path: Path) -> Dict[str, Any]:
    """Infer filing metadata from the leading text of a PDF or Word document.

    The text returned by `_read_pdf_pages_quick` (for ".pdf") or
    `_read_docx_quick` (for ".docx"/".doc") is scanned page by page with the
    module-level regexes. Each field keeps the FIRST value found; later pages
    never overwrite it. Any other suffix yields no pages, so all fields stay None.

    Returns a dict with keys:
        ticker, filing_type, fiscal_year, fiscal_period, company_name
            -- each a string or None.
        meta_confidence
            -- the highest heuristic score of any rule that fired (0.0 if none).
        meta_provenance
            -- per-field {"page", "evidence"} records (ticker has only "evidence").
    """
    pages = _read_pdf_pages_quick(path) if path.suffix.lower() == ".pdf" else (
        _read_docx_quick(path) if path.suffix.lower() in {".docx", ".doc"} else []
    )
    filing_type = None
    fiscal_year = None
    fiscal_period = None
    company_name = None
    prov: Dict[str, Any] = {}
    conf = 0.0

    # Page numbers in provenance are 1-based.
    for idx, t in enumerate(pages, start=1):
        if not t.strip():
            continue
        # Filing type: an explicit "Form 10-K/10-Q/8-K" beats a bare "Annual Report"
        # on the same page; across pages whichever appears first wins.
        m_form = RE_FORM.search(t)
        if m_form and not filing_type:
            filing_type = _norm_form(m_form.group(1))
            prov.setdefault("filing_type", {"page": idx, "evidence": m_form.group(0)})
            conf = max(conf, 0.8)
        if not filing_type and RE_AR.search(t):
            filing_type = "AR"
            prov.setdefault("filing_type", {"page": idx, "evidence": "Annual Report"})
            conf = max(conf, 0.6)

        # Fiscal year from "fiscal year ended <date phrase>".
        if not fiscal_year:
            fy_phrase = RE_FY_ENDED.search(t)
            if fy_phrase:
                y = _year_from_datephrase(fy_phrase.group(1))
                if y:
                    fiscal_year = y
                    prov.setdefault("fiscal_year", {"page": idx, "evidence": fy_phrase.group(0)})
                    conf = max(conf, 0.7)
        # Quarterly filings: "quarterly period ended <date phrase>".
        if not fiscal_period:
            q_phrase = RE_Q_ENDED.search(t)
            if q_phrase:
                # The quarter code is searched in the whole page text, not just the phrase.
                q = RE_Q_CODE.search(t)
                if q:
                    fiscal_period = f"Q{q.group(1)}"
                    prov.setdefault("fiscal_period", {"page": idx, "evidence": q.group(0)})
                    conf = max(conf, 0.6)
                # The quarter-end date doubles as a fiscal-year source if none was found yet.
                if not fiscal_year:
                    y = _year_from_datephrase(q_phrase.group(1))
                    if y:
                        fiscal_year = y
                        prov.setdefault("fiscal_year", {"page": idx, "evidence": q_phrase.group(0)})
                        conf = max(conf, 0.6)

        # Lower-confidence fallbacks: bare "FY24"/"FY2024" and "Q1".."Q4" codes.
        if not fiscal_year:
            m_fy = RE_FY_CODE.search(t)
            if m_fy:
                # RE_FY_CODE captures only the two-digit suffix, hence the "20" prefix.
                fiscal_year = "20" + m_fy.group(1)
                prov.setdefault("fiscal_year", {"page": idx, "evidence": m_fy.group(0)})
                conf = max(conf, 0.55)
        if not fiscal_period:
            m_q = RE_Q_CODE.search(t)
            if m_q:
                fiscal_period = f"Q{m_q.group(1)}"
                prov.setdefault("fiscal_period", {"page": idx, "evidence": m_q.group(0)})
                conf = max(conf, 0.5)

        if not company_name:
            m_co = RE_COMPANY.search(t)
            if m_co:
                company_name = m_co.group(0).replace("  ", " ").strip().rstrip(",")
                prov.setdefault("company_name", {"page": idx, "evidence": company_name})
                conf = max(conf, 0.5)

        # Stop early once the three core fields are known with high confidence;
        # fiscal_period is not required for the early exit.
        if filing_type and fiscal_year and company_name and conf >= 0.8:
            break

    # Ticker comes only from the exact-name lookup table.
    ticker = COMPANY_TO_TICKER.get(company_name)
    if ticker:
        prov.setdefault("ticker", {"evidence": f"map:{company_name}->{ticker}"})
        conf = max(conf, 0.85)

    return {
        "ticker": ticker,
        "filing_type": filing_type,
        "fiscal_year": fiscal_year,
        "fiscal_period": fiscal_period,
        "company_name": company_name,
        "meta_confidence": conf,
        "meta_provenance": prov,
    }


def get_document_meta(path: Path, ticker_default: Optional[str] = None) -> Dict[str, Any]:
    """Combine content-derived metadata with hints parsed from the file name.

    For ticker, filing type and fiscal year the value extracted from the
    document text wins; the file-name hint (or, for the ticker,
    `ticker_default`) is used only when the content produced nothing.
    The remaining content fields are passed through unchanged, and the
    source file name and path are appended.
    """
    name_stem = path.stem

    # Ticker hint: leading letters of the file name, else the caller's default.
    ticker_match = TICKER_PREFIX_RE.match(name_stem)
    if ticker_match:
        name_ticker = ticker_match.group(1).upper()
    elif ticker_default:
        name_ticker = ticker_default.upper()
    else:
        name_ticker = None

    # Filing-type hint.
    filing_match = FILING_NAME_RE.search(name_stem)
    name_filing = _norm_form(filing_match.group(1)) if filing_match else None

    # Year hint: either a plain 20xx or an FYxx / FY20xx code.
    name_year = None
    year_match = FILENAME_YEAR_RE.search(name_stem)
    if year_match:
        token = year_match.group(1).upper()
        if not token.startswith("FY"):
            name_year = token
        else:
            digits = re.sub(r"[^0-9]", "", token[2:])
            if len(digits) == 2:
                name_year = "20" + digits
            elif len(digits) == 4:
                name_year = digits

    content = extract_meta_from_content(path)

    fallbacks = {
        "ticker": name_ticker,
        "filing_type": name_filing,
        "fiscal_year": name_year,
    }
    meta: Dict[str, Any] = {
        field: content.get(field) or hint for field, hint in fallbacks.items()
    }
    for field in ("fiscal_period", "company_name", "meta_confidence", "meta_provenance"):
        meta[field] = content.get(field)
    meta["source_filename"] = path.name
    meta["source_path"] = str(path)
    return meta

### **Markdown parsing and chunking (text → Chroma)**

In [None]:
def pdf_to_markdown(pdf_path: Path) -> str:
    """Convert a PDF to text, trying three extractors in order of fidelity.

    1. pymupdf4llm (Markdown output);
    2. UnstructuredPDFLoader with the "hi_res" strategy;
    3. pypdf page-by-page plain text.

    Each stage is attempted only if the previous one raised. If the last stage
    fails too, whatever pages it managed to read (possibly none) are returned.
    """
    try:
        return pymupdf4llm.to_markdown(str(pdf_path))
    except Exception:
        pass

    try:
        loaded = UnstructuredPDFLoader(str(pdf_path), strategy="hi_res").load()
        return "\n\n".join(doc.page_content for doc in loaded)
    except Exception:
        pass

    page_texts = []
    try:
        for page in PdfReader(str(pdf_path)).pages:
            page_texts.append(page.extract_text() or "")
    except Exception:
        pass
    return "\n\n".join(page_texts)


def docx_to_markdown(docx_path: Path) -> str:
    """Load a Word document via UnstructuredWordDocumentLoader and join the
    loaded documents' text with blank lines. Loader errors propagate."""
    loader = UnstructuredWordDocumentLoader(str(docx_path))
    parts = [doc.page_content for doc in loader.load()]
    return "\n\n".join(parts)


def md_to_chunks(markdown_text: str, base_metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Split Markdown into chunk dicts ready for the vector store.

    The text is first split on #, ## and ### headers (the header titles land
    in the metadata as h1/h2/h3). A section is kept whole when it contains a
    Markdown table row or is at most CHUNK_SIZE characters; otherwise it is
    split further with RecursiveCharacterTextSplitter.

    Args:
        markdown_text: Markdown (or plain text) of one document.
        base_metadata: document-level metadata merged into every chunk;
            header metadata overrides keys of the same name.

    Returns:
        A list of {"text": str, "metadata": dict}. Every chunk owns its own
        metadata dict, so mutating one chunk's metadata cannot leak into its
        siblings from the same section.
    """
    headers_to_split_on = [("#", "h1"), ("##", "h2"), ("###", "h3")]
    header_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
    # Loop-invariant helpers: built once instead of once per section.
    body_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
    )
    table_row_re = re.compile(r"(^|\n)\s*\|.+\|\s*(\n|$)")

    chunks: List[Dict[str, Any]] = []
    for section in header_splitter.split_text(markdown_text):
        content = section.page_content
        has_table = bool(table_row_re.search(content))
        meta = {**base_metadata, **section.metadata, "has_table": has_table}

        # Tables are never split so that rows stay together with their header.
        if has_table or len(content) <= CHUNK_SIZE:
            chunks.append({"text": content, "metadata": meta})
            continue

        for piece in body_splitter.split_text(content):
            # Copy per piece: the original shared one dict across all pieces.
            chunks.append({"text": piece, "metadata": dict(meta)})
    return chunks

### **Table extraction + tidy facts**

In [None]:
# Normalised metric name -> row labels that should be recognised as that metric.
# best_metric_match() fuzzy-matches table row labels against these synonyms.
TARGET_METRICS = {
    "revenue": [
        "revenue",
        "net sales",
        "total revenue",
        "sales",
    ],
    "net_income": [
        "net income",
        "net income attributable to",
        "profit for the year",
        "net earnings",
        "consolidated net income",
        "net profit",
    ],
}


def scan_units_scale(text: str) -> Tuple[Optional[str], Optional[int]]:
    """Guess currency units and numeric scale from free text.

    Returns (units, scale): units is "USD" when the text contains "$", "usd"
    or "dollars" (case-insensitive), else None; scale is 1_000_000 for
    "in millions", 1_000 for "in thousands" / "in 000s", else None.
    "in millions" takes precedence when both appear.
    """
    lowered = text.lower()

    usd_markers = ("$", "usd", "dollars")
    units = "USD" if any(marker in lowered for marker in usd_markers) else None

    if "in millions" in lowered:
        scale = 1_000_000
    elif any(marker in lowered for marker in ("in thousands", "in 000s")):
        scale = 1_000
    else:
        scale = None
    return units, scale


def parse_number(cell: Any) -> Optional[float]:
    """Parse a financial table cell into a float, or None if it holds no number.

    Handles thousands separators, currency symbols ($, €, £), surrounding
    whitespace and accounting-style negatives in parentheses. Blank cells and
    lone dash placeholders ("-", "—", "–") yield None.

    Fixes over the previous version:
      * currency symbols and whitespace are removed BEFORE the parenthesis
        check, so "$(1,234)" and "$ (1,234)" are negative (they used to parse
        as +1234 because the string did not start with "(");
      * the Unicode minus sign (U+2212) is treated as "-", not discarded.
    """
    s = str(cell).strip()
    if s in ("", "-", "—", "–"):
        return None

    s = s.replace("\u2212", "-")
    s = re.sub(r"[,\s$€£]", "", s)

    negative = s.startswith("(") and s.endswith(")")
    if negative:
        s = s[1:-1]

    # Drop everything that cannot be part of a plain decimal literal
    # (letters, percent signs, footnote markers, stray parentheses, ...).
    s = re.sub(r"[^\d.\-]", "", s)
    if not s or s in (".", "-"):
        return None

    try:
        val = float(s)
    except ValueError:
        return None
    return -val if negative else val


def best_metric_match(label: str, threshold: int = 80) -> Optional[str]:
    """Map a table row label to a key of TARGET_METRICS, or None.

    The label is lower-cased and whitespace-collapsed, then fuzzy-matched
    (rapidfuzz token_sort_ratio) against each metric's synonyms. Metrics whose
    best synonym scores below `threshold` are ignored; among the rest the
    highest score wins, with ties going to the metric listed first.
    """
    from rapidfuzz import fuzz, process

    cleaned = re.sub(r"\s+", " ", label.lower()).strip()

    winner: Optional[str] = None
    top_score = None
    for metric_key, aliases in TARGET_METRICS.items():
        hit = process.extractOne(cleaned, aliases, scorer=fuzz.token_sort_ratio)
        if not hit or hit[1] < threshold:
            continue
        # Strict ">" keeps the earliest metric on equal scores.
        if top_score is None or hit[1] > top_score:
            winner, top_score = metric_key, hit[1]
    return winner


def extract_pdf_tables_with_provenance(path: Path) -> List[Dict[str, Any]]:
    """Extract every table of a PDF with pdfplumber, keeping its location.

    Returns one dict per table with at least two rows:
        {"page": 1-based page number,
         "table_id": 1-based index of the table on that page,
         "df": pandas.DataFrame of the cells}

    The first row is promoted to column headers when it looks like a header:
    at least two cells contain letters, OR at least one cell contains a
    four-digit year (19xx/20xx). The year rule matters because
    `melt_wide_years` finds year columns by header text; a header row made
    only of years ("", "2023", "2022") was previously never promoted and the
    table was silently dropped downstream.

    Empty cells are read as "" rather than the string "None"; previously the
    stringified None counted as header text.

    A page whose table extraction raises is skipped; failure to open the file
    propagates to the caller.
    """
    year_re = re.compile(r"(?:19|20)\d{2}")
    letters_re = re.compile(r"[A-Za-z]")

    out: List[Dict[str, Any]] = []
    with pdfplumber.open(str(path)) as pdf:
        for page_no, page in enumerate(pdf.pages, start=1):
            try:
                tables = page.extract_tables() or []
            except Exception:
                tables = []
            for table_no, table in enumerate(tables, start=1):
                if not table or len(table) < 2:
                    continue
                df = pd.DataFrame(table)
                header = [
                    "" if (c is None or (isinstance(c, float) and c != c)) else str(c)
                    for c in df.iloc[0].tolist()
                ]
                text_cells = sum(bool(letters_re.search(c)) for c in header)
                has_year = any(year_re.search(c) for c in header)
                if text_cells >= 2 or has_year:
                    df.columns = header
                    df = df.iloc[1:].reset_index(drop=True)
                out.append({
                    "page": page_no,
                    "table_id": table_no,
                    "df": df,
                })
    return out


def melt_wide_years(df: pd.DataFrame) -> pd.DataFrame:
    """Reshape a wide "label | 2023 | 2022 | ..." table into tidy rows.

    Steps:
      1. Pick the label column: the first column in which at least
         max(2, 20% of rows) cells contain letters (column 0 if none qualifies).
      2. Treat every other column whose HEADER contains a 19xx/20xx token as a
         year column.
      3. Emit one row per (label, year) with the cell parsed by `parse_number`.

    Columns are addressed by position throughout, so duplicate header names
    are safe; a year that appears in several headers produces several rows.

    Returns a DataFrame with columns ["label", "year", "value"]; `year` is the
    four-digit year as a string and `value` is a float or missing. The frame
    is empty when `df` is empty or has no year columns.

    Fix over the previous version: missing cells are read as "" instead of the
    string "None". Previously `astype(str)` ran before `fillna("")`, so a
    column of empty cells looked textual and was chosen as the label column,
    and missing labels came out as "None".
    """
    tidy_columns = ["label", "year", "value"]
    if df.empty:
        return pd.DataFrame(columns=tidy_columns)

    year_re = re.compile(r"(19|20)\d{2}")
    ncols = df.shape[1]

    def cell_text(value: Any) -> str:
        # None / NaN / pd.NA mean "empty cell", not the words "None" or "nan".
        if value is None or value is pd.NA:
            return ""
        if isinstance(value, float) and value != value:
            return ""
        return str(value)

    def text_column(i: int) -> pd.Series:
        return df.iloc[:, i].map(cell_text)

    # 1) pick a likely label column
    min_text_cells = max(2, int(0.2 * len(df)))
    label_idx = 0
    for i in range(ncols):
        text_count = text_column(i).str.contains(r"[A-Za-z]", regex=True, na=False).sum()
        if text_count >= min_text_cells:
            label_idx = i
            break

    # 2) detect year columns by header text
    year_idxs = [
        i for i in range(ncols)
        if i != label_idx and year_re.search(str(df.columns[i]))
    ]
    if not year_idxs:
        return pd.DataFrame(columns=tidy_columns)

    # 3) build a compact frame: one label column + uniquely named year columns.
    #    to_numpy() sidesteps index alignment, which could fail on odd indexes.
    tmp = pd.DataFrame({"label": text_column(label_idx).to_numpy()})
    year_col_map: Dict[str, str] = {}
    for i in year_idxs:
        year = year_re.search(str(df.columns[i])).group(0)
        base = f"y_{year}"
        colname = base
        k = 2
        while colname in tmp.columns:  # same year in several headers
            colname = f"{base}_{k}"
            k += 1
        tmp[colname] = df.iloc[:, i].to_numpy()
        year_col_map[colname] = year

    # 4) melt to tidy, 5) map temp names back to years and parse the numbers
    tidy = tmp.melt(id_vars=["label"], var_name="year_col", value_name="raw_value")
    tidy["year"] = tidy["year_col"].map(year_col_map)
    tidy["value"] = tidy["raw_value"].apply(parse_number)

    return tidy[tidy_columns].reset_index(drop=True)

### **Chroma helpers**

In [None]:
def get_chroma(collection_name: str = COLLECTION_NAME, persist_dir: str = CHROMA_DIR):
    """Open (or create) the persistent Chroma collection.

    A fresh SentenceTransformerEmbeddings instance for EMBED_MODEL is built on
    every call and used as the collection's embedding function.
    """
    embedder = SentenceTransformerEmbeddings(model_name=EMBED_MODEL)
    return Chroma(
        collection_name=collection_name,
        persist_directory=persist_dir,
        embedding_function=embedder,
    )


def upsert_chunks_to_chroma(
    chunks,
    collection_name: str = COLLECTION_NAME,
    persist_dir: str = CHROMA_DIR,
):
    """Embed chunk dicts and write them to the Chroma collection.

    Args:
        chunks: iterable of {"text": str, "metadata": dict} as produced by
            `md_to_chunks`.
        collection_name / persist_dir: target collection and storage location.

    Returns:
        Number of documents written (0 for empty input).

    Each document gets a deterministic id derived from its source path, its
    position in `chunks` and its text. Previously ids were random, so every
    re-run of the notebook appended a second copy of each chunk. With stable
    ids, re-ingesting an unchanged file is expected to overwrite the same
    entries (NOTE(review): relies on the Chroma wrapper upserting on add --
    confirm against the installed langchain_chroma version).
    """
    import hashlib  # local import: only this helper needs it

    if not chunks:
        return 0

    vs = get_chroma(collection_name, persist_dir)

    docs = []
    ids = []
    for position, chunk in enumerate(chunks):
        metadata = chunk["metadata"]
        source = str(metadata.get("source_path", "")) if isinstance(metadata, dict) else ""
        fingerprint = f"{source}|{position}|{chunk['text']}"
        ids.append(hashlib.sha256(fingerprint.encode("utf-8")).hexdigest())
        docs.append(Document(page_content=chunk["text"], metadata=metadata))

    # Chroma accepts only scalar metadata values; nested values such as the
    # provenance dict are dropped here. Documents themselves are never dropped,
    # so `ids` stays aligned with `docs`.
    docs = filter_complex_metadata(docs)

    vs.add_documents(docs, ids=ids)

    # Older vector-store versions need an explicit persist(); newer ones
    # have no such method, hence the AttributeError guard.
    try:
        vs.persist()
    except AttributeError:
        pass

    return len(docs)

### **DuckDB helpers**

In [None]:
# Schema of the `metrics` fact table: one row per (document, table, metric, year).
# The column order here is the order insert_metrics() relies on when it does
# `INSERT ... SELECT * FROM df`, and it matches the row dicts built in ingest_path().
# `id` is the primary key, so INSERT OR REPLACE overwrites rows with the same id.
DDL_METRICS = """
CREATE TABLE IF NOT EXISTS metrics (
    id TEXT PRIMARY KEY,
    ticker TEXT,
    filing_type TEXT,
    fiscal_year INTEGER,
    fiscal_period TEXT,
    metric TEXT,
    period_year INTEGER,
    value DOUBLE,
    units TEXT,
    scale INTEGER,
    source_filename TEXT,
    page INTEGER,
    table_id INTEGER,
    provenance JSON
)
"""

def init_duckdb(path: Path = DUCKDB_PATH) -> duckdb.DuckDBPyConnection:
    """Open the DuckDB database at `path` and make sure the metrics table exists.

    Args:
        path: database file location; a `str` or a `Path` is accepted
            (the default, DUCKDB_PATH, is a plain string).

    Returns:
        An open connection. The caller is responsible for closing it.

    The parent directory is created if missing; previously that step was
    commented out, so a fresh storage directory made the connect call fail.
    """
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    conn = duckdb.connect(str(path))
    conn.execute(DDL_METRICS)
    return conn


def insert_metrics(conn: "duckdb.DuckDBPyConnection", rows: List[Dict[str, Any]]):
    """Insert-or-replace metric rows into the `metrics` table.

    Args:
        conn: open DuckDB connection with the `metrics` table created.
        rows: dicts keyed by metrics column name. Missing keys become NULL and
            unknown keys are ignored.

    Returns:
        Number of rows submitted (0 for empty input, in which case the
        connection is not touched).

    Columns are now matched by NAME. The previous `SELECT * FROM df` matched
    by position and therefore silently depended on the key order of the row
    dicts being identical to the table definition. The temporary view is also
    unregistered even when the INSERT fails.
    """
    if not rows:
        return 0

    columns = [
        "id", "ticker", "filing_type", "fiscal_year", "fiscal_period",
        "metric", "period_year", "value", "units", "scale",
        "source_filename", "page", "table_id", "provenance",
    ]
    frame = pd.DataFrame(rows).reindex(columns=columns)
    column_sql = ", ".join(columns)
    view_name = "metrics_staging"

    conn.register(view_name, frame)
    try:
        conn.execute(
            f"INSERT OR REPLACE INTO metrics ({column_sql}) "
            f"SELECT {column_sql} FROM {view_name}"
        )
    finally:
        conn.unregister(view_name)
    return len(rows)

### **Ingestion**

In [None]:
@dataclass
class IngestStats:
    """Counters describing what one ingestion run (or one file) produced."""
    # Number of text chunks written to the Chroma collection.
    chunk_docs: int = 0
    # Number of metric rows submitted to the DuckDB metrics table.
    metric_rows: int = 0


def ingest_path(path: Path, ticker_default: Optional[str], conn: duckdb.DuckDBPyConnection) -> IngestStats:
    """Ingest one document: text chunks into Chroma, table metrics into DuckDB.

    Args:
        path: a .pdf, .docx or .doc file; anything else is skipped.
        ticker_default: ticker used when neither content nor file name gives one.
        conn: open DuckDB connection with the metrics table created.

    Returns:
        IngestStats with the number of chunks and metric rows written. Both
        are 0 when the file is unsupported or cannot be parsed (a message is
        printed in either case).

    Metrics are extracted from PDFs only. A table cell contributes a row when
    its row label fuzzy-matches a target metric and the cell parses as a
    number. Unparseable cells are now skipped with `pd.isna`: pandas stores
    them as NaN in the value column, which the former `val is None` test did
    not catch, so NaN rows ended up in the database.
    """
    stats = IngestStats()

    suffix = path.suffix.lower()
    if suffix not in {".pdf", ".docx", ".doc"}:
        print(f"Skipping unsupported file: {path.name}")
        return stats

    base_meta = get_document_meta(path, ticker_default=ticker_default)

    # ---- Vector text (Markdown) ----
    try:
        md = pdf_to_markdown(path) if suffix == ".pdf" else docx_to_markdown(path)
    except Exception as e:
        print(f"[WARN] Failed to parse {path.name}: {e}")
        return stats

    chunks = md_to_chunks(md, base_meta)
    stats.chunk_docs += upsert_chunks_to_chroma(chunks)

    # ---- Extract metrics (PDF only) ----
    if suffix != ".pdf":
        return stats

    # Units and scale are guessed once from the whole document text.
    units, scale = scan_units_scale(md)
    fiscal_year = int(base_meta["fiscal_year"]) if base_meta.get("fiscal_year") else None

    rows: List[Dict[str, Any]] = []
    for t in extract_pdf_tables_with_provenance(path):
        tidy = melt_wide_years(t["df"])
        if tidy.empty:
            continue
        columns_sample = [str(c) for c in list(t["df"].columns)[:6]]
        for label, year, val in zip(tidy["label"], tidy["year"], tidy["value"]):
            metric_norm = best_metric_match(str(label))
            if not metric_norm:
                continue
            if pd.isna(val):  # covers both None and NaN
                continue
            rows.append({
                "id": str(uuid.uuid4()),
                "ticker": base_meta.get("ticker"),
                "filing_type": base_meta.get("filing_type"),
                "fiscal_year": fiscal_year,
                "fiscal_period": base_meta.get("fiscal_period"),
                "metric": metric_norm,
                "period_year": int(year) if pd.notnull(year) else None,
                "value": float(val * (scale or 1)),
                "units": units,
                "scale": scale,
                "source_filename": base_meta.get("source_filename"),
                "page": int(t["page"]),
                "table_id": int(t["table_id"]),
                "provenance": json.dumps({
                    "label_raw": str(label),
                    "columns_sample": columns_sample,
                    "meta_provenance": base_meta.get("meta_provenance"),
                }),
            })
    stats.metric_rows += insert_metrics(conn, rows)

    return stats


def cmd_ingest(data_dir, ticker):
    """Ingest every PDF / Word file found recursively under `data_dir`.

    Args:
        data_dir: root directory to scan; a `str` or a `Path`.
        ticker: fallback ticker passed to `ingest_path` for files whose
            content and name do not reveal one.

    Prints one line per file plus a summary. Returns None.

    The DuckDB connection is now closed when the run finishes or fails
    (it was previously left open), and `data_dir` may be a plain string.
    """
    data_dir = Path(data_dir)
    conn = init_duckdb(DUCKDB_PATH)
    try:
        files = [
            p for p in sorted(data_dir.glob("**/*"))
            if p.is_file() and p.suffix.lower() in (".pdf", ".docx", ".doc")
        ]
        if not files:
            print(f"No files found under {data_dir}.")
            return

        total = IngestStats()
        for f in files:
            s = ingest_path(f, ticker_default=ticker, conn=conn)
            total.chunk_docs += s.chunk_docs
            total.metric_rows += s.metric_rows
            print(f"Ingested {f.name}: chunks+{s.chunk_docs}, metrics+{s.metric_rows}")

        print("—" * 60)
        print(f"Total chunks added: {total.chunk_docs}")
        print(f"Total metric rows upserted: {total.metric_rows}")
        print(f"Chroma dir: {CHROMA_DIR}")
        print(f"DuckDB file: {DUCKDB_PATH}")
    finally:
        conn.close()

In [None]:
# Run the full ingestion over DEFAULT_DATA_DIR; "MSFT" is the fallback ticker
# for files whose content and name do not identify one.
cmd_ingest(data_dir=Path(DEFAULT_DATA_DIR), ticker="MSFT")